目录
1.MapReduce概述
2.MapReduce的基本工作原理
2.1.Map阶段
2.1.1.Map源码解析
2.1.2.map端代码总结
2.2.Shuffle and Sort阶段
2.3.Reduce阶段
2.3.1.Reduce源码解析
2.3.2.Reduce端源码总结
3.数据流与任务执行
3.1.数据输入与输出格式
3.1.1.TextInputFormat
3.1.2.SequenceFileInputFormat
3.1.3.KeyValueTextInputFormat
3.1.4.TextOutputFormat
3.1.5.SequenceFileOutputFormat
3.2.任务调度执行流程
3.2.1.MapReduce1.X
3.2.2.MapReduce2.x
4.性能优化
4.1.常见优化策略
4.1.1.合并器(Combiner)
4.1.2.Partitioner(分区器)
4.1.3.调整Map和Reduce任务数
4.1.4.压缩中间数据
4.1.5.数据本地化
4.1.6.增加并行度
5.MapReduce的局限性与扩展
5.1.局限性
5.1.1.迭代任务效率低
5.1.2.延迟高
5.1.3.模型单一
5.1.4.资源管理不足
1.MapReduce概述
Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠、容错的方式在大型集群(数千个节点)的商用硬件上并行大量数据(数TB数据集)。
MapReduce作业通常将输入数据集分割成独立的块,这些块由映射任务以完全并行的方式进行处理。该框架对映射的输出进行排序,然后将其输入到Reduce任务中。通常,作业的输入和输出都存储在文件系统中。该框架负责安排任务,监控它们并重新执行失败的任务。
通常,计算节点和存储节点是相同的,即MapReduce框架和Hadoop分布式文件系统(参见HDFS架构指南)在同一组节点上运行。此配允许框架在已存在数据的节点上有效地调度任务,从而导致整个集群的聚合带宽非常高。
2.MapReduce的基本工作原理
2.1.Map阶段
2.1.1.Map源码解析
public class WordMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase());
context.write(word, one);
}
}
}
当我输入“hello world hello”并调用这段代码时,以下时代码的执行过程:
public class WordMapper extends
Mapper<Object, Text, Text, IntWritable> {
初始化Mapper类
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
这里将输入的数据“hello world hello”,这个字符以Value参数传递给map方法。也就是说此时的map是这样的(偏移量,“hello world hello”)
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase());
context.write(word, one);
}
}
执行map方法
String line = value.toString();
输入Text对象value被转换为Java的String类型,并存储在变量line中。此时line的值就是“hello world hello”
StringTokenizer itr = new StringTokenizer(line);
StringTokenizer类用于将字符串line分解成单词。默认情况下是将根据空格来拆分成单词的。
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase());
context.write(word, one);
}
循环遍历每个单词
word.set(itr.nextToken().toLowerCase()); 将当前单词转换为小写,并存储在word中
context.write(word, one);输出键值对,也就是一个单词对应一个one,one表示单词的计数固定为1。所以得出来就是(hello,1),(world,1),(hello,1)。它不仅用于输出键值对,还在整个MapReduce框架中充当与框架交互的接口,允许Mapper将中间数据传递给Reduce阶段。
2.1.2.map端代码总结
Map阶段是MapReduce作业的第一个步骤,主要负责将原始输入数据分解为一系列键值对(key-value pairs)。这个过程通常涉及到如下步骤:
输入数据的分片:原始数据通常存储在HDFS中,并被分割成若干个分片(splits),每个 分片通常对应一个HDFS块。MapReduce框架会为每个分片启动一个Map任务。
数据解析与映射:每个Map任务读取输入分片中的数据,并将其解析为键值对。例如,如果处理的是一个文本文件,Map任务可以将每一行文本作为一个记录,然后将该记录拆分成键值对,通常是单词作为键,出现次数作为值。
中间键值对的生成:通过自定义的Map函数,输入记录被转换为中间的键值对。这些中间的键值对。这些中间结果在整个集群存储,并为后续的shuffle阶段做准备。
示例:在一个单词计数应用中,map函数会将每个单词映射为一个键,值为1( 即出现次数)。比如,“hello world hello” 会被映射为(hello,1),(world,1),(hello,1)。
2.2.Shuffle and Sort阶段
该阶段是mapreduce作业的中间步骤,连接map阶段和reduce阶段。其中主要目的是将所有相同的键值对聚合到一起,并对他们进行排序。这个阶段可以分为以下几个步骤:
shuffle(数据洗牌):在map阶段生成的中间建制对被分发到相应的Reduce任务。具体来说,相同的建会被发送到同一个Reduce任务,以确保这些建可以一起被处理。
sort(排序):在Reduce任务接收到所有属于自己的键值对后,它们会根据键进行排序。排序的目的是为了保证Reduce阶段处理的数据是按键顺序排列的,这对某些需要顺序处理的数据非常重要。
数据合并与压缩(可选):为了跳效率,Mapreduce可以在shuffle过程合并与压缩中间数据,从而减少网络传输的负担。
示例:在前速的单词计数应用中,shuffle and sort阶段会将相同的单词汇聚在一起,如(hello,1),(hello,1) 会汇总成(hello,[1,1])。
2.3.Reduce阶段
2.3.1.Reduce源码解析
public class WordReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, new IntWritable(sum));
}
}
经过上面2步的操作之后,此时我们得到的数据是(hello,[1,1]),(world,1)。
public class WordReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
首先初始化Reducer类,result 用于存储最终计算结果的IntWritable 对象。在Reduce函数中,我们将把相同键(单词)对应的所有值相加,最终将和存储在result中。
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, new IntWritable(sum));
}
执行Reduce函数
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
key:这是来自Map阶段的一个键(在我们的例子中,就是一个单词)
values:这是一个可迭代的的IntWritable对象集合,表示所有与该key相关联的值(计数)。
在我们的例子中:key对应的是hello,values则是[1,1]。
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
遍历求和
result.set(sum);
context.write(key, new IntWritable(sum));
}
result.set(sum);将计算结果设置为result值。
context.write(key, new IntWritable(sum));输出最终的键值对。
我们例子中则是(hello,2),(world,1)。
2.3.2.Reduce端源码总结
reduce阶段是maprecduce作业最后一步,负责对shuffle and sort阶段输出的中间键值对进行汇总和处理,最终生成作业的输出结果。主要步骤包括:
汇总与处理:Reduce任务接收到某个键的所有值后,会调用用户第一reduce函数,对这些值进行汇总操作。例如,进行求和,求平均值,最大值,最小值等聚合操作。
输出结果:Reduce函数处理完毕后,生成最终的键值对结果,并将其写回hdfs中作为最终结果输出。通常,这些结果会存储微多个文件,以适应hadoop的分布式存储结构。
示例:继续前面的单词计数应用,Reduce函数会对每个单词的出现次数进行求和,如(hello,[1,1])将被转换为(hello,2),并输出到最终的结果文件中。
3.数据流与任务执行
3.1.数据输入与输出格式
3.1.1.TextInputFormat
TextInputFormat是MapReduce中最常见的输入格式之一。它将输入文件按行划分为记录,每一行会作为一个键值对。键通常是行的字节偏移量,值则是该行的内容。这种格式适用于处理纯文本文件,比如日志文件或CSV文件。
3.1.2.SequenceFileInputFormat
SequenceFileInputFormat用于处理hadoop的二进制文件格式。SequenceFile是一种用于存储键值对的hadoop特定格式,通常用于中间数据的存储和分布式缓存。这种格式适合存储大量小文件或需要高效读取的大规模数据。
3.1.3.KeyValueTextInputFormat
KeyValueTextInputFormat将输入文件的每一行作为一个键值对进行处理,行中的第一次单词作为键,其余部分作为值。这种格式在处理结构化文本数据时非常有用。
3.1.4.TextOutputFormat
TextOutputFormat是最常用的输出格式,默认情况下,输出为文本格式。每一行包含一个键值对,键和值之间由制表符分隔。这适合与需要将处理结果以文本形式存储的场景。
3.1.5.SequenceFileOutputFormat
SequenceFileOutputFormat将结果存储为hadoop的二进制格式SequenceFile,这样的文本具有较高的读取效率和压缩比,适合用于中间结果存储或者更大规模的数据存储需求。
3.2.任务调度执行流程
3.2.1.MapReduce1.X
1.提交作业:客户端首先向JobTracker提高一个作业。作业包含了需要执行的Map和Reduce任务、输入输出路径以及作业的配置参数等。
2.JobTracker分配任务:JobTracker负责整个集群的资源管理和任务调度。它会将作业分解成多个Map任务和Reduce任务,并将这些任务分配给集群中的各个TaskTracker。
3.TaskTracker执行任务:TaskTracker运行在每个集群节点上,负责接收并执行来自JobTracker的任务。TaskTracker会将任务分配给本地的工作线程,执行Map或Reduce操作。
4.任务执行与心跳机制:TaskTracker在执行任务的同时,会定期向JobTracker发送心跳信息,报告任务的执行状态以及当前的资源使用情况。心跳信息用于通知JobTracker节点的健康状态以及任务的执行进度。
5.失败处理与重试:如果JobTracker没有在规定时间内收到某个TaskTracker的心跳信息,或者TaskTracker报告了任务失败,JobTracker会将该任务重新分配给其他可用的TaskTracker执行。
6.任务完成:当所有的Map和Reduce任务都完成后,JobTracker会通知客户端作业完成,并输出最终的结果。
3.2.2.MapReduce2.x
1.启动作业:客户端的MapReduce程序通过JobClient启动作业,作业由客户端节点发起。
2.申请新的Application:jobClient向ResourceManager(在YARN架构中,它负责资源管理)申请一个新的Application ID。
3.复制作业资源:jobClient将作业所需的资源(例如JAR文件、配置文件等)复制到分布式文件系统(HDFS)中,以便作业在集群中执行时可以访问这些资源。
4.提交申请:jobClient将作业提交给ResourceManager,ResourceManager随后启动一个ApplicationMaster来管理该作业的执行。
5.初始化:jobTracker(在图中应是ResourceManager,负责集群资源的分配和管理)初始化并准备管理作业。
6.获取输入拆分:输入数据被分割为多个逻辑分片(input splits),每个Map任务将处理一个分片。这些分片是由InputFormat类管理的。
7.发送心跳信息:TaskTracker节点(在YARN中被称为NodeManager)定期向ResourceManager(或图中的jobTracker)发送心跳信息,报告其状态并请求新的任务。
8.获取工作资源:TaskTracker节点从HDFS获取作业所需的资源,这些资源包括输入数据和运行任务的必要文件。
9.启动JVM:TaskTracker节点启动一个子JVM(Java虚拟机),这个子JVM用于执行具体的Map或Reduce任务。
10.运行任务:子JVM开始执行分配给它的任务,可能是Map任务或Reduce任务,直至任务完成。
4.性能优化
4.1.常见优化策略
4.1.1.合并器(Combiner)
Combiner是一种本地Reduce操作,它在Map任务的输出阶段起作用,用来合并相同的键值对,从而减少需要传输到Reduce任务的数据量。
如果Map阶段的输出可以通过局部合并得到较小的中间结果,使用Combiner可以显著减少网络传输的负担。Combiner通常用于诸如计数、求和等操作。
4.1.2.Partitioner(分区器)
Partitioner负责将Map阶段输出的键值对分配到不同的Reduce任务中。默认的分区方式是根据键的哈希值,但在某些情况下,自定义分区器可以帮助数据均衡地分配到各个Reduce任务中,从而避免某些Reduce任务过载。
自定义Partitioner可以确保数据按某种逻辑进行分配,例如基于键的范围进行分区,使每个Reduce任务处理的工作量更加均衡。
4.1.3.调整Map和Reduce任务数
Map任务的数量由输入数据的分片大小决定。可以通过调整 mapreduce.input.fileinputformat.split.minsize 来控制每个Map任务处理的分片大小。适当增加分片大小可以减少Map任务的数量,减少调度开销。
Reduce任务数量的设置应根据中间数据量和集群资源进行调整。过少的Reduce任务会导致负载集中在少数几个节点,过多则会增加调度和管理的开销。一般建议设置Reduce任务数量为集群Reduce slot的1-1.5倍。
4.1.4.压缩中间数据
MapReduce的中间数据传输量可能很大,通过启用压缩可以减少网络传输时间以及磁盘I/O的开销。推荐使用Snappy和LZO是高效的压缩格式,适合MapReduce任务。
4.1.5.数据本地化
MapReduce会优先在数据存储所在的节点上运行Map任务,这被称为数据本地化。为了提高性能,应尽量确保数据分布均匀,并避免热点问题。
在HDFS上,确保数据块被均匀分布在集群的不同节点上,以最大化数据本地化的机会。
4.1.6.增加并行度
MapReduce作业中的每个Map和Reduce任务都可以在各自的JVM中并行运行。增加集群的并行度可以提升处理速度。
使用YARN可以动态调整集群的资源分配,确保MapReduce作业能够使用到更多的资源。
5.MapReduce的局限性与扩展
5.1.局限性
5.1.1.迭代任务效率低
MapReduce模型在处理迭代任务(如机器学习中的多轮训练)时表现不佳。每次迭代都需要从磁盘读取数据,处理完成后再将结果写回磁盘,这导致了大量的I/O操作。
这种I/O密集型的操作导致迭代任务的整体执行效率低下,增加了计算的时间成本。
5.1.2.延迟高
MapReduce模型的工作流程设计使得作业的启动和数据传输具有一定的延迟。任务调度、数据分发以及之间结果的处理都需要花费时间,尤其是在处理实时数据或低延迟需求的应用时、MapReduce表现不够理想。
延迟问题使得MapReduce难以应用于需要实时响应的数据处理场景,如实时数据分析和流式数据处理。
5.1.3.模型单一
MapReduce主要适用于批处理任务,对于诸如图计算、流处理等需要更加复杂数据处理模型的场景,MapReduce显得力不从心。它缺乏对这些模型的原生支持,开发者需要通过复杂的工作流程和代码来弥补。
这限制了MapReduce在多样化数据处理任务中的适用性,迫使开发者寻求更加灵活的计算框架。
5.1.4.资源管理不足
MapReduce 1.x中,JobTracker既负责任务调度,又负责资源管理,这种集中式架构导致了系统的可扩展性问题。当集群规模变大时,JobTracker容易成为性能瓶颈,并且存在单点故障的风险。
这种资源管理方式限制了集群规模的扩展性,影响了大规模集群的稳定性和高效性。