🍊 This article works through four classic MapReduce cases hands-on
🍊 The code is modeled on the official Hadoop examples and aims to stay clean and idiomatic
🍊 Every step is explained in detail
1. Introduction
MapReduce is a programming framework for distributed computation. Its core job is to combine the user's business-logic code with the framework's own default components into one complete distributed program that runs in parallel on a Hadoop cluster.
The overall processing pipeline looks like this:

| Phase | What it does |
| --- | --- |
| Map | Reads the input and performs simple per-record processing |
| Shuffle | Groups and merges the output of the Map phase |
| Reduce | Computes the final result from the grouped data produced by the Shuffle |
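As a rough illustration of this pipeline, here is how a single line of text would move through the three phases in the WordCount example developed below (a conceptual sketch, not actual framework output):

    // Map:     "hello world hello"  ->  (hello, 1) (world, 1) (hello, 1)
    // Shuffle: group by key         ->  (hello, [1, 1]) (world, [1])
    // Reduce:  sum each group       ->  (hello, 2) (world, 1)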
2. WordCount
2.1 Problem
Count how many times each word appears in a file. The left side below shows the raw input, the right side the expected output.
2.2 Analysis
WordCount is the most basic MapReduce exercise. Beyond simply meeting the requirement, we want the code to be as clean as possible, so it is modeled on the official WordCount example shipped with Hadoop.
The data flow is: <line offset, line text> enters the Mapper, which emits <word, 1>; the Shuffle groups these into <word, (1, 1, ...)>; and the Reducer sums each group into <word, count>.
2.3 Mapper
The key thing to note in the Mapper is the generic signature Mapper<LongWritable, Text, Text, IntWritable>.
<LongWritable, Text> is the input pair and <Text, IntWritable> is the output pair. It may be hard to see at first why the input is <LongWritable, Text>: the Text value is one line of the file, and the LongWritable key is the byte offset of the start of that line within the whole file. To see this, we print the Text and the LongWritable on every map call (see the illustration after the Mapper code).
package com.bcn.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Input:  <byte offset of the line, one line of text>
 * Output: <word, 1>
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Create just one Text and one IntWritable object and reuse them to avoid wasting resources
    Text outK = new Text();
    IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get one line and print it together with its offset (for illustration only)
        String line = value.toString();
        System.out.println(line);
        System.out.println(key);
        // Split the line into words by space
        String[] words = line.split(" ");
        // Emit <word, 1> for every word
        for (String word : words) {
            outK.set(word);
            context.write(outK, outV);
        }
    }
}
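For example, assuming a hypothetical word.txt whose first two lines are "hello world" and "hello hadoop" (with single-byte line endings), the two map() calls would behave roughly like this:

    // 1st call: key = 0   (the first line starts at byte 0)        value = "hello world"
    // 2nd call: key = 12  ("hello world\n" occupies 12 bytes)      value = "hello hadoop"
    // Emitted pairs: (hello, 1) (world, 1) (hello, 1) (hadoop, 1)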
2.4 Reducer
Again we focus on the data flow: Reducer<Text, IntWritable, Text, IntWritable>. The input pair <Text, IntWritable> matches the Mapper's output exactly.
Many people are puzzled that the declared input is <Text, IntWritable>, yet the overridden reduce method receives <Text key, Iterable<IntWritable> values>. The reason is the Shuffle phase that runs invisibly in between: it groups all values with the same key, so a word the Mapper emitted three times arrives as, for example, <apple, (1, 1, 1)>.
package com.bcn.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum up all counts for this word; the input looks like <apple, (1, 1, 1)>
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Output <word, total count>
        outV.set(sum);
        context.write(key, outV);
    }
}
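Continuing the hypothetical two-line word.txt from above, the Reducer would be invoked once per distinct word:

    // Input after shuffle:  (hadoop, [1]) (hello, [1, 1]) (world, [1])
    // Output:               (hadoop, 1)   (hello, 2)      (world, 1)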
2.5 Driver
Finally we write the driver class, i.e. the main method, which wires up the seven standard steps; with it in place the whole MapReduce job can be run.
package com.bcn.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the config and the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Connect the driver with the jar
        job.setJarByClass(WordCountDriver.class);
        // 3. Connect with the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the classes of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the classes of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\Hadoop and Spark\\data\\word.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\Hadoop and Spark\\output\\wordCount"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
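Note that the input and output paths are hard-coded to a local Windows directory here, which is convenient for local testing; also remember that the output directory must not already exist when the job starts, or the job will fail. If you package the job and submit it to a cluster, you would typically take the paths from the command line instead; a minimal sketch, where the jar name and paths are placeholders:

    // In the driver, read the paths from args instead of hard-coding them:
    //   FileInputFormat.setInputPaths(job, new Path(args[0]));
    //   FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Then submit the packaged jar to the cluster:
    //   hadoop jar wordcount.jar com.bcn.mapreduce.wordcount.WordCountDriver /input/word.txt /output/wordCount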
3. Grade Sort
3.1 Problem
3.2 Analysis
3.3 Entity
3.4 Mapper
3.5 Reducer
3.6 Driver
4. Document Reverse
4.1 Problem
4.2 Analysis
4.3 Entity
4.4 Mapper
4.5 Reducer
4.6 Driver
5. Weather Top3
5.1 Problem
5.2 Analysis
5.3 Entity
5.4 Mapper
5.5 Reducer
5.6 Driver