一、概述
MapReduce 核心功能是将 用户编写的业务逻辑代码 和 自带默认组件 整合成一个完整的分布式运算程序 ,并发运行在一个 Hadoop 集群上。1、优缺点:优点:1 ) MapReduce 易于编程它简单的实现一些接口,就可以完成一个分布式程序, 这个分布式程序可以分布到大量廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。2 )良好的扩展性当你的计算资源不能得到满足的时候,你可以通过 简单的增加机器 来扩展它的计算能力。3 )高容错性MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如 其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败 ,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。4 )适合 PB 级以上海量数据的离线处理可以实现上千台服务器集群并发工作,提供数据处理能力。缺点:1 )不擅长实时计算:MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。2 )不擅长流式计算流式计算的输入数据是动态的,而 MapReduce 的 输入数据集是静态的 ,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。3 )不擅长 DAG (有向无环图)计算多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后, 每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO ,导致性能非常的低下。2、核心流程( 1 )分布式的运算程序往往需要分成至少 2 个阶段。(2)第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。(3)第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。(4) MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。3、MapReduce 具体 进程一个完整的 MapReduce 程序在分布式运行时有三类实例进程:( 1 ) MrAppMaster :负责整个程序的过程调度及状态协调。(2) MapTask :负责 Map 阶段的整个数据处理流程。(3) ReduceTask :负责 Reduce 阶段的整个数据处理流程。4、常用数据序列化类型
5、MapReduce 编码流程
用户编写的程序分成三个部分: Mapper 、 Reducer 和 Driver 。1 . Mapper 阶段( 1 )用户自定义的 Mapper 要继承自己的父类( 2 ) Mapper 的输入数据是 KV 对的形式( KV 的类型可自定义)( 3 ) Mapper 中的业务逻辑写在 map() 方法中( 4 ) Mapper 的输出数据是 KV 对的形式( KV 的类型可自定义)( 5 ) map() 方法( MapTask 进程)对每一个 <K,V> 调用一次( 1 )用户自定义的 Reducer 要继承自己的父类2 . Reducer 阶段( 2 ) Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV( 3 ) Reducer 的业务逻辑写在 reduce() 方法中( 4 ) ReduceTask 进程对每一组相同 k 的 <k,v> 组调用一次 reduce() 方法3 . Driver 阶段相当于 YARN 集群的客户端,用于提交我们整个程序到 YARN 集群,提交的是封装了 MapReduce 程序相关运行参数的 job 对象6、实操WordCount案例(统计每个单词出现的次数)
需求:统计文件中每个单词出现的次数
1)、Mapper类
package com.hadoop.mapreduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行数据 String line = value.toString(); // 2 切割(获取单词数组) String[] words = line.split(" "); // 3 输出(包装成k-v形式输出) for (String word : words) { k.set(word); context.write(k, v); } } }
2)、Reduce
package com.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ int sum; IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 1 累加求和(计算各个单词出现的次数) sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2 输出结果 v.set(sum); context.write(key,v); } }
3)、Driver
package com.hadoop.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息对象以及获取 job 对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 关联本 Driver 程序的 jar job.setJarByClass(WordCountDriver.class); // 3 关联 Mapper 和 Reducer 的 jar job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 4 设置 Mapper 输出的 kv 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置最终输出 kv 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path("D:\\aa.txt")); FileOutputFormat.setOutputPath(job, new Path("D:\\bb.txt")); // 7 提交 job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
此时设置输入文件aa.txt
输出结果是个文件夹:
二、 Hadoop 序列化
1、序列化概述
序列化 就是 把内存中的对象,转换成字节序列 (或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。反序列化 就是将收到字节序列(或其他数据传输协议)或者是 磁盘的持久化数据,转换成内存中的对象。~为什么不用 Java 的序列化?Java 的序列化是一个重量级序列化框架( Serializable ),一个对象被序列化后,会附带很多额外的信息(各种校验信息, Header ,继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable)。Hadoop 序列化特点:( 1 )紧凑 : 高效使用存储空间。( 2 )快速: 读写数据的额外开销小。( 3 )互操作: 支持多语言的交互2、 自定义 bean 对象实现序列化接口( Writable )在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部传递一个 bean 对象,那么该对象就需要实现序列化接口。具体实现 bean 对象序列化步骤如下 7 步。( 1 )必须实现 Writable 接口(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造、public FlowBean() { super(); }
(3)重写序列化方法
@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); }
(4)重写反序列化方法@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); }
(5)注意反序列化的顺序和序列化的顺序完全一致(6)要想把结果显示在文件中,需要重写 toString() ,可用 "\t" 分开,方便后续用。(7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。 详见后面排序案例。@Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
3、序列化案例实操
需求:统计每一个手机号耗费的总上行流量、总下行流量、总流量
--输入数据格式:7 13560436666 120.196.100.99 1116 954 200id 手机号码 网络ip 上行流量 下行流量 网络状态码--期望输出数据格式13560436666 1116 954 2070手机号码 上行流量 下行流量 总流量1)、自定义 bean 对象实现序列化接口(Writable)
package com.hadoop.serialize; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; //1 继承 Writable 接口 public class FlowBean implements Writable { private long upFlow; //上行流量 private long downFlow; //下行流量 private long sumFlow; //总流量 //2 提供无参构造 public FlowBean() { } //3 提供三个参数的 getter 和 setter 方法 public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; } //4 实现序列化和反序列化方法,注意顺序一定要保持一致 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readLong(); this.downFlow = dataInput.readLong(); this.sumFlow = dataInput.readLong(); } //5 重写 ToString @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } }
2)、FlowMapper
package com.hadoop.serialize; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> { private Text outK = new Text(); private FlowBean outV = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1 获取一行数据,转成字符串 String line = value.toString(); //2 切割数据 String[] split = line.split("\t"); //3 抓取我们需要的数据:手机号,上行流量,下行流量 String phone = split[1]; String up = split[split.length - 3]; String down = split[split.length - 2]; //4 封装 outK outV(hadoop序列化的bean) outK.set(phone); outV.setUpFlow(Long.parseLong(up)); outV.setDownFlow(Long.parseLong(down)); outV.setSumFlow(); //5 写出 outK outV context.write(outK, outV); } }
3、FlowReducer
package com.hadoop.serialize; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> { private FlowBean outV = new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long totalUp = 0; long totalDown = 0; //1 遍历 values,将其中的上行流量,下行流量分别累加 for (FlowBean flowBean : values) { totalUp += flowBean.getUpFlow(); totalDown += flowBean.getDownFlow(); } //2 封装 outKV outV.setUpFlow(totalUp); outV.setDownFlow(totalDown); outV.setSumFlow(); //3 写出 outK outV context.write(key,outV); } }
4、FlowDriver
package com.hadoop.serialize; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1 获取 job 对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2 关联本 Driver 类 job.setJarByClass(FlowDriver.class); //3 关联 Mapper 和 Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //4 设置 Map 端输出 KV 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //5 设置程序最终输出的 KV 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //6 设置程序的输入输出路径 FileInputFormat.setInputPaths(job, new Path("D:\\inputflow")); FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput")); //7 提交 Job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
1、InputFormat 数据输入MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。思考: 1G 的数据,启动 8 个 MapTask ,可以提高集群的并发处理能力。那么 1K 的数据,也启动 8 个 MapTask ,会提高集群性能吗? MapTask 并行任务是否越多越好呢?哪些因素影响了 MapTask 并行度?MapTask 并行度决定机制:数据块: Block 是 HDFS 物理上把数据分成一块一块。 数据块是 HDFS 存储数据单位 。数据切片: 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。 数据切片是 MapReduce 程序计算输入数据的单位 ,一个切片会对应启动一个 MapTask 。1 )一个 Job 的 Map 阶段并行度由客户端在提交 Job 时的切片数决定2 )每一个 Split 切片分配一个 MapTask 并行实例处理3 )默认情况下,切片大小 =BlockSize4 )切片时不考虑数据集整体,而是逐个针对每一个文件单独切片2、Job 提交流程源码和切片源码详解1)、Job 提交流程源码waitForCompletion() submit(); // 1 建立连接 connect(); // 1)创建提交 Job 的代理 new Cluster(getConfiguration()); // (1)判断是本地运行环境还是 yarn 集群运行环境 initialize(jobTrackAddr, conf); // 2 提交 job submitter.submitJobInternal(Job.this, cluster) // 1)创建给集群提交数据的 Stag 路径 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)获取 jobid ,并创建 Job 路径 JobID jobId = submitClient.getNewJobID(); // 3)拷贝 jar 包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)计算切片,生成切片规划文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向 Stag 路径写 XML 配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交 Job,返回提交状态 status = submitClient.submitJob(jobId, submitJobDir.toString(),job.getCredentials());
2)、FileInputFormat 切片源码解析(input.getSplits(job))
( 1 )程序先找到你数据存储的目录。( 2 )开始遍历处理(规划切片)目录下的每一个文件。( 3 )遍历第一个文件 ss.txt。a )获取文件大小 fs.sizeOf(ss.txt)b)计算切片大小 computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128Mc)默认情况下,切片大小 =blocksized )开始切,形成第 1 个切片: ss.txt—0:128M 第 2 个切片 ss.txt—128:256M 第 3 个切片ss.txt—256M:300M ( 每次切片时,都要判断切完剩下的部分是否大于块的 1.1 倍,不大 于 1.1 倍就划分一块切片 )e )将切片信息写到一个切片规划文件中f )整个切片的核心过程在 getSplit() 方法中完成g ) InputSplit 只记录了切片的元数据信息 ,比如起始位置、长度以及所在的节点列表等。( 4 )提交切片规划文件到 YARN 上, YARN 上的 MrAppMaster 就可以根据切片规划文件计算开启 MapTask 个数。FileInputFormat切片机制:
FileInputFormat切片大小的参数配置:
3、FileInputFormat 的切片实现类
FileInputFormat 实现类思考: 在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。 那么,针对不同的数据类型, MapReduce 是如何读取这些数据的呢?FileInputFormat 常见的接口实现类包括: TextInputFormat 、 KeyValueTextInputFormat 、NLineInputFormat 、 CombineTextInputFormat 和自定义 InputFormat 等。1)、TextInputFormat
TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在
整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符), Text 类型。2)、CombineTextInputFormat
框架默认的 TextInputFormat 切片机制是对任务按文件规划切片, 不管文件多小,都会是一个单独的切片 ,都会交给一个 MapTask ,这样如果有大量小文件,就 会产生大量的MapTask ,处理效率极其低下。1 )应用场景:CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。2 )虚拟存储切片最大值设置CombineTextInputFormat.setMaxInputSplitSize(job, 4194304 );// 4m注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。3 )切片机制生成切片过程包括:虚拟存储过程和切片过程二部分。( 1 )虚拟存储过程:将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块; 当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时将文件均分成 2 个虚拟存储块(防止出现太小切片)。例如 setMaxInputSplitSize 值为 4M ,输入文件大小为 8.02M ,则先逻辑上分成一个4M 。剩余的大小为 4.02M ,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储文件,所以将剩余的 4.02M 文件切分成( 2.01M 和 2.01M )两个文件。(2)切片过程:(a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独形成一个切片。(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。(c)测试举例:有 4 个小文件大小分别为 1.7M 、 5.1M 、 3.4M 以及 6.8M 这四个小文件,则虚拟存储之后形成 6 个文件块,大小分别为:1.7M ,( 2.55M 、 2.55M ), 3.4M 以及( 3.4M 、 3.4M )最终会形成 3 个切片,大小分别为:( 1.7+2.55 ) M ,( 2.55+3.4 ) M ,( 3.4+3.4 ) M4、CombineTextInputFormat 案例实操需求:将输入的大量小文件合并成一个切片统一处理。4个小文件用一个切片实现过程( 1 )不做任何处理,运行 1.8 节的 WordCount 案例程序,观察切片个数为 4 。number of splits:4(2)在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 3 。(a)驱动类中添加代码如下:// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置 4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
(b)运行如果为 3 个切片。
number of splits:3(3)在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 1 。(a) 驱动中添加代码如下:// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置 20m CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
(b)运行如果为 1 个切片
number of splits:1
(7)合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)