MapReduce定义:
MapReduce可以分解为Map (映射) + Reduce (规约) , 具体过程:
- Map : 输入数据集被切分成多个小块,并分配给不同的计算节点进行处理
- Shuffle and Sort:洗牌和排序,在 Map 阶段结束后,将每个 Mapper 生成的键值对按照键进行排序,并将相同键的值归并在一起,并将相同的键发送给后续的reduce
- Reduce: 规约计算,每个计算节点独立处理它们的键值对,并生成最终的输出结果。
MapReduce是一个分布式运算程序的编程框架,用于用户开发“基于Hadoop的数据分析应用”的核心框架。f核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
优点:
- 易于编程:用户只关心,业务逻辑。实现框架的接口。
- 良好扩展性:可以动态增加服务器,解决计算资源不够问题
- 高容错性:任何一台机器挂掉,可以将任务转移到其他节点。
- 并行处理:能够有效地利用集群中多个计算节点进行并行计算,提高处理速度。
- 适合海量数据计算(TB、PB)几千台服务器共同计算
缺点:
- 不擅长实时计算。Mysql
- 不擅长流式计算。Sparkstreaming flink
- 不擅长DAG有向无环图计算。Spark
MapReduce架构:
MapReduce中,执行MapReduce任务的机器角色有两种: JobTracker 和 TaskTracker, 其中JobTracker 用于任务调度, TaskTracker用于执行任务。 一个Hadoop集群中, 只有一台JobTracker。
当Client向JobTracker提交作业时, JobTracker会讲作业拆分到多个TaskTracker去执行, TaskTracker会定时发送心跳信息,如果一段时间JobTracker未收到TaskTracker的心跳信息,则认定该TaskTracker出现故障, 会讲该TaskTracker的任务分配给其他TackTracker。
MapReduce执行过程:
- 客户端启动一个job
- 客户端向JobTracker请求一个JobID
- JobClient讲运行作业所需要的资源复制到HDFS上, 包括jar文件、配置文件、客户端计算所得的输入划分信息,并存档在以JobID 为名的文件夹中。
- JobClient 提交任务给JobTracker.
- JobTracker 调度作业,并根据输入划分信息为每一个划分创建一个map任务,并将map任务分配给taskTracker执行。【图中的5/6步骤】
- TaskTracker每隔一段时间给JobTracker发送一个Heartbeat告诉JobTracker它仍然在运行,同时心跳还携带很多比如map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把作业设置成“成功”,JobClient再传达信息给用户。
MapReduce wordCount的案例:
Java类型 | Hadoop Writable类型 |
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritabl |
统计文档中单词出现的频次
1、引入pom依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
2、序列化类 Writable
【当前案例中并未使用到自定义的序列化接口】
Hadoop有自己的序列化机制--Writable, 相比于Java的序列化,hadoop的序列化更紧凑、快速、支持多语言。
Hadoop的序列化步骤:
- 实现Writable接口
- 反序列化时需要调用无参构造,所以序列化对象必须要有无参构造
- 重写序列化方法write()
- 重写反序列化方法readFidlds()
- 反序列化的顺序和序列化的顺序必须完全一致
- 重写toString() ,将结果显示在文件中
- 实现Comparable接口,将自定义的序列化对象放在key中传输
//1 实现Writable接口
@Data
public class FlowBeanWritable implements Writable, Comparable<FlowBeanWritable> {
private long upFlow;
private long downFlow;
private long sumFlow;
//2 提供无参构造
public FlowBeanWritable() { }
//4 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//5 重写ToString
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
// 6 如果作为Key传输,则还需要实现compareTo方法
@Override
public int compareTo(FlowBeanWritable o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
}
3、编写Mapper 类,实现Mapper接口
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outK = new Text();
private IntWritable outV = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 1 获取一行并将其转成String类型来处理
String line = value.toString();
// 2 将String类型按照空格切割后存进String数组
String[] words = line.split(" ");
// 3 依次取出单词,将每个单词和次数包装成键值对,写入context上下文中供后续调用
for (String word : words) {
// 先将String类型,转为text,再包装成健值对
outK.set(word);
context.write(outK, outV);
}
}
}
Mapper<LongWritable, Text, Text, IntWritable> 泛型里面有四个类, 这里其实是两对键值对:
- LongWritable 、Text :表示输入数据,LongWritable表示数据的索引,类似于第几行数据; Text表示读取的文件内容。一般使用系统默认的键值对。
- Text、IntWritable: 表示输出数据, Text表示输入的单词, IntWritable表示该单词出现的次数。这个键值对需要根据业务需求来确定。
4、编写Reducer类,继承Reduce抽象类
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
//写出
context.write(key,outV);
}
}
Reducer<Text, IntWritable, Text, IntWritable> 泛型里面有四个类, 这里也是两对键值对:
- Text, IntWritable:第一个键值对,要跟Mapper的输出泛型保持一致
- Text, IntWritable:第二个键值对,表示输出的结果数据,因为这里要输出的是单词出现的次数,所以还是 Text、IntWritable 类型
Reduce是每组会执行一次,就是相同的key是会分到同一组的,所以此处只需计算每个key的count叠加即可
5、编写Driver驱动类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取配置信息以及job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 关联当前Driver程序的jar
job.setJarByClass(WordCountDriver.class);
// 指定Mapper和Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置输入、输出的k、v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 将job提交给yarn运行
Boolean result = job.waitForCompletion(Boolean.TRUE);
}
}