MapReduce概述
是一个分布式的编程框架,MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
- 优点:
- 易于编程,简单的实现一些接口,就可以完成一个分布式程序。
- 良好的扩展性,可以增加机器来扩展计算能力。
- 高容错性,子任务失败后可以重试4次。
- 缺点:
- 不擅长实时计算
- 不能进行流式计算,MapReduce的输入数据集是静态的,不能动态变化。
- 不擅长DAG有向无环图:下一段计算的起始数据取决于上一个阶段的结果。
MapReduce核心思想
- Map: 读单词,进行分区
- Shuffle:排序是框架内固定的代码,必须排序。进行快排
- Reduce:对区间有序的内容进行归并排序,累加单词
- 在MapReduce过程中只能有一个Map和一个Reduce
- MapReduce进程
- MrAppMaster:负责整个程序的过程调度及状态协调。
- MapTask:负责Map阶段的整个数据处理流程。
- ReduceTask:负责Reduce阶段的整个数据处理流程。
- 序列化
- 变量类型后面加上Writable,转换成可以序列化的类型
- String类型有点特别,相应的序列化类型为Text
- java类型转hadoop类型
private IntWritable key = new IntWritable();
key.set(java_value);
- 构造器转换
new intWritable(1);
- hadoop类型转java类型
int value = key.get();
WordCount案例
Driver类的8个步骤
该类中的步骤是使用hadoop框架的核心,这8个步骤是写死的,无法更改,具体为:
- 获取配置信息,获取job对象实例
- 指定本程序的jar包所在的本地路径
- 关联Mapper/Reducer业务类
- 指定Mapper输出数据的kv类型
- 指定最终输出数据的kv类型,部分案例不需要reduce这个步骤
- 指定job的输入原始文件所在目录
- 指定job的输出结果所在目录
- 提交作业
WordCountMapper类的实现
- 继承mapper类,选择mapreduce包,新版本,老版本的叫mapred包
- 根据业务需求设定泛型的具体类型,输入的kv类型,输出的kv类型。
- 输入类型
- keyIn:起始偏移量,是字节偏移量。一般不参与计算,类型为longwritable
- valueIn:每一行数据,类型为Text
- 输出类型
- keyOut: 每个单词
- valueOut:数字1
- 输入类型
- 重写map方法,ctrl + o 快捷键重写方法
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//1.获取一行
// atguigu atguigu
String line = value.toString();
//2.切割
//atguigu
//atguigu
String[] words = line.split(" ");
//3.循环写出
for (String word : words) {
//封装outKey
context.write(new Text(word),new IntWritable(1));
}
}
WordCountReducer类的实现
- 继承Reduce类
- 设置输入输出类型
- 输入: 跟Map的输出类型对应即可
- 输出:
- keyInt, 单词,类型为Text
- keyOut, 次数,类型为IntWritable
- 重写reduce方法
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
//atguigu(1,1)
//累加
for (IntWritable value : values) {
sum += value.get();
}
//写出
context.write(key, new IntWritable(sum);//注意
}
在处理大数据时,不要在循环中new对象,创建对象是很消耗资源的。可以使用ctrl + alt + F
将这两个变量提升为全局变量,作为Reducer类的属性值。但其实还有更好的方法,可以使用Mapper类中的setup()方法来实现该需求。该方法是框架里面原本就设定好的方法,在map阶段前只会执行1次。
private Text outKey;
private IntWritable outValue;
@Override
protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
outKey = new Text();
outValue = new IntWritable(1);
}
Driver类的实现
该类的写法基本上是固定的,不同需求只需要在此基础上修改一下map和reduce业务类即可。
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1.获取Job
Configuration conf = new Configuration();
Job job = Job.getInstance();
//2.设置jar包路径,绑定driver类
job.setJarByClass(WordCountDriver.class);
//3.关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.设置map的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\inputOutput\\input\\wordcount"));
FileOutputFormat.setOutputPath(job, new Path("D:\\inputOutput\\output\\output666"));
//7.提交job
job.waitForCompletion(true);
}
打包本地程序到集群中运行
- 修改本地程序Driver类中的输入输出路径
//6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
- 使用maven的package命令打包本地程序成.jar文件,生成的jar包在target目录下
- 使用jar命令解压运行
jar 包名 数据输入路径 数据输出路径
可以直接在window中的IDEA中发送任务到Linux集群中,但配置方式较为烦琐,生产环境中使用该方法的人较少。
序列化
java序列化
序列化目的是将内存的对象放进磁盘中进行保存,序列化实际上是为了传输对象中的属性值,而不是方法。序列化本质上是一种数据传输技术,IO流。
- ObjectOutputStream 序列化流 writeObject()
- ObjectInputStream 反序列化流 readObject(Object obj)
- 序列化前提条件:
- 实现Serializable接口
- 空参构造器
- 私有的属性
- 准备get和set方法
总结:java序列化是一个比较重的序列化,序列化的内容很多,比如属性+校验+血缘关系+元数据。
hadoop序列化
特点: 轻量化,只有属性值和校验
hadoop中自定义bean对象步骤:
- 实现writable接口
- 空参构造器
- 私有的属性
- get和set方法
- 重写
write
方法和readFields
方法(write方法出现的属性顺序必须和readFields的读取顺序一致) - 如果自定义的bean对象要作为reduce的输出结果,需要重写
toString
方法,否则存入磁盘的是地址值 - 如果自定义的bean对象要作为map输出结果中的key进行输出,并进行reduce操作,必须实现
comparable
接口。
MapReduce框架原理
- 粗略流程:
mapreduce = map + reduce
- 大体流程:
mapreduce = inputFormat --> map --> shuffle(排序) --> reduce -->outputFormat
- 源码流程:
- inputformat
- map
- map
- sort: 按照字典序进行快排
- reduce
- copy:拉取map的处理结果
- sort:由于结果是局部有序,不是整体有序,进行归并排序
- reduce:之后再进行数据合并规约
InputFormat/OutputFormat基类
实现类有TextInputFormat和TextOutputFormat, 其中重点是切片逻辑和读写逻辑,读写部分的代码框架已经写死了,主要关注如何切片即可。
切片与MapTask并行度决定机制
数据块:Block是物理上真的分开存储了。
数据切片:只是逻辑上进行分片处理,每个数据切片对应一个MapTask。
正常来说,如果数据有300M,我们按照常理来说会平均划分成3 x 100 M,但是物理上每个物理块是128M,每个MapTask进行计算时需要从另外那个主机读取数据,跨越主机读取数据需要进行网络IO,这是很慢的。
所有MR选择的是按照128M来进行切分,尽管这样会导致划分的数据块并不是十分均匀,但是对于网络IO的延迟来说,还是可以接受的。
Hadoop提交流程源码和切片源码
提交源码主要debug节点
- job.waitForCompletion()
- JobState枚举类,DEFINE、RUNING
- submit()
- connect()
- 匿名内部类、new方法 无法进入,打好断点,点击快速运行进入
- ctrl + alt + 左方向键,可以返回原先的位置
- initProviderList(): 添加了本地客户端协议和Yarn客户端协议
- create(conf): 根据配置文件来决定代码的运行环境(Yarn分布式环境/本地单机环境)
- submitter 根据运行环境获取相应的提交器
- checkSpecs(job):检查输出路径是否正确
- jobStagingArea: job的临时运行区域,给定一个绝对路径的目录D:\tmp\hadoop\mapred\staging,里面存放了:
- local: 切片结果+8个配置文件总和
- yarn:切片结果+8个配置文件总和+jar包
- copyAndConfigureFiles(): 读取任务需要的支持文件读取到job的临时运行环境中,在Yarn环境中,会上传jar包到该路径中
- writeSplits():给数据添加切片标记,实际还未切分,会生成切片文件到临时区域中
- writeConf(): 写配置文件到目录中
提交源码总结:
- mapreduce.framework.name这个参数决定了运行环境
- 切片个数决定了MapTask个数