实训笔记7.21
- 7.21
- 一、MapReduce编程代码的打包问题与大数据集群环境中运行问题
- 1.1 MR程序在运行的时候,job提交作业的时候会自动识别我们的运行环境,如果我们是在windows本地运行的话,MR程序识别的环境未LocalRunner这么一个环境,这个环境是windows的模拟分布式的环境,因此我们MR程序基本上都是在windows上测试没有问题之后,打成jar包,提交给Hadoop集群的YARN进行运行。
- 1.2 如果将代码打成JAR包,部署到大数据集群上运行,也不一定是分布式运行,这个得看我们的配置
- 1.2.1 本地安装模式
- 1.2.2 伪分布式安装模式
- 1.2.3 完全分布式安装模式
- 1.2.4 HA高可用安装模式
- 1.3 MR程序运行中报错问题
- 1.3.1 运行MR程序报错HDFS的权限问题
- 1.3.2 当MR程序打成JAR包以后,在Hadoop集群的YARN上运行的时候,报错ClassNotFoundException: xxxxx.xxMapper
- 1.3.3 当MR程序打成JAR包以后,在Hadoop集群的YARN上运行的时候,报错资源不足的问题
- 二、InputFormat的常见实现类的知识点
- 2.1 cInputFortmat在MR程序中作用:
- 2.2 TextInputFormat
- 2.3 KeyValueTextInputFormat
- 2.4 NLineInputFormat
- 2.5 CombineTextInputFormat
- 2.6 SequenceFileInputFormat
- 三、MR程序Job提交的流程的源码分析
- 四、Mapper阶段的核心知识点
- 五、MR程序运行的Shuffle知识点
- 5.1 Shuffle阶段的执行逻辑
- 5.1.1 Map方法执行之后的逻辑
- 5.1.2 Reduce方法执行之前的逻辑
- 5.2 Shuffle中map输出开始执行源码解读
- 5.3 Shuffle阶段中通过自定义分区实现数据的分区规则定义
- 5.3.1 定义Java类继承Partitioner类
- 5.3.2 重写Partitioner类中getPartition 方法自定义分区规则即可
- 5.3.3 分区的数量必须和ReduceTask的数量保持一致,如果两者不一致,出现以下三种情况
- 5.4 Shuffle阶段中通过自定义排序规则保证输出结果有序
- 5.5 Shuffle阶段中的Combiner操作(MR程序的可选组件)
- 5.5.1 Combiner的使用规则
- 5.5.2 Combiner的执行时机
- 5.6 shuffle阶段reduce聚合数据的时候,哪些数据为相同的key值?
- 六、代码示例
7.21
一、MapReduce编程代码的打包问题与大数据集群环境中运行问题
1.1 MR程序在运行的时候,job提交作业的时候会自动识别我们的运行环境,如果我们是在windows本地运行的话,MR程序识别的环境未LocalRunner这么一个环境,这个环境是windows的模拟分布式的环境,因此我们MR程序基本上都是在windows上测试没有问题之后,打成jar包,提交给Hadoop集群的YARN进行运行。
1.2 如果将代码打成JAR包,部署到大数据集群上运行,也不一定是分布式运行,这个得看我们的配置
1.2.1 本地安装模式
有一个特点,如果是在本地安装模式下运行,MR程序也不是分布式运行,采用的也是模拟的运行环境,而非YARN
1.2.2 伪分布式安装模式
1.2.3 完全分布式安装模式
1.2.4 HA高可用安装模式
1.2.2~1.2.4 需要修改配置文件,其中在mapred-site,xml文件中专门配置了MR的运行环境在YARN上运行的
mapreduce.framework.name
yarn模式
如果在三种安装模式当中,如果没有配置上述的选项,那么就算YARN启动成功了,MR程序也不会在YARN上运行,还是使用local本地模拟环境
1.3 MR程序运行中报错问题
1.3.1 运行MR程序报错HDFS的权限问题
-
-
问题的原因:MR程序运行过程中需要在HDFS创建目录,并且向目录中写入MR程序运行结果,但是如果我们是在windows本地运行代码,MR程序在运行中,会使用windows上的用户名当作HDFS用户进行写操作权限,但是默认情况下,HDFS上除了root用户以外,其他用户基本上都是无权限写入的
-
报错解决方案
-
简单粗暴,但是不安全:给HDFS的根目录赋予一个777最高权限,不安全----禁止大家操作
-
MR程序在运行的时候,指定HDFS的用户为root用户而非windows本地的用户(建议大家使用) 在MR程序的 vm options中增加一个配置项:
-DHADOOP_USER_NAME=root
-
在HDFS集群中配置忽略权限检查,这个效果等同于第一种设置的方式hdfs-site.xml 必须在hdfs集群中配置,而非MR代码中
<property> <name>dfs.permissions.enabled</name> <value>false</value> </property>
-
1.3.2 当MR程序打成JAR包以后,在Hadoop集群的YARN上运行的时候,报错ClassNotFoundException: xxxxx.xxMapper
- 报错原因不是因为类的class文件没有打包到jar包当中,而是因为hadoop运行jar包的时候,不知道如何在JAR包中寻找这个类
- 解决方案:只需要让Hadoop运行jar包能找到类即可,在Driver驱动程序当中配置一行代码即可
job.setJarByClass(xxxDriver.class);
1.3.3 当MR程序打成JAR包以后,在Hadoop集群的YARN上运行的时候,报错资源不足的问题
-
-
报错原因
-
虚拟机的资源太少,MR程序运行的时候,每一个map任务默认需要1024MB的内存mapred-site.xml
<property> <name>mapreduce.map.memory.mb</name> <value>250</value> </property> <property> <name>mapreduce.map.java.opts</name> <value>-Xmx250M</value> </property> <property> <name>mapreduce.reduce.memory.mb</name> <value>300</value> </property> <property> <name>mapreduce.reduce.java.opts</name> <value>-Xmx300M</value> </property>
-
资源不足之后,YARN会把一些已经分配了资源的MapTask强制杀死,之所以会杀死,是因为YARN会进行资源的检查,如果不想报这个错,还有一种方案,关闭YARN的资源检测yarn-site.xml
<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
-
-
【注意】:MR程序的jar包的运行命令如下:
hadoop jar jar包的路径 jar包中的Driver驱动程序的全限定类名 参数1 参数2 .......
二、InputFormat的常见实现类的知识点
2.1 cInputFortmat在MR程序中作用:
- 如何对输入的文件进行切片,切片很重要,切片直接关系到Mapper阶段的MapTask的任务个数
getSplits()
- Mapper阶段的map方法在读取切片数据,如何读取,以及如何将读取的数据转换成为key-value格式的数据
createRecordReader()
2.2 TextInputFormat
- 切片机制:每一个文件单独进行切片,根据splitSize来确定
- kv读取机制:一行一行读取,每一行以偏移量为key 以当前行的数据为value
2.3 KeyValueTextInputFormat
- 切片机制和TextInputFormat机制一样的
- kv读取机制:一行一行读取,每一行以指定的分隔符进行切割,将切割之后的第一个字符串当作key值,剩余的字符串当作value
2.4 NLineInputFormat
- 切片机制:每一文件单独进行切片,根据行数来确定
- kv读取机制:和TextInputFormat机制一样的
2.5 CombineTextInputFormat
- 切片机制:所有的输入文件整体进行切片,基于设置的虚拟切片容量进行虚拟切片和物理切片两部分来完成
- kv读取机制:和TextInputFormat机制是一样的
2.6 SequenceFileInputFormat
三、MR程序Job提交的流程的源码分析
- 底层会先识别我们的运行环境
- 生成一个资源提交目录,如果是本地运行模式,那么资源提交到本地的某个路径下,如果是YARN运行模式,那么资源提交给HDFS的某个路径,生成一个JobID
- 基于InputFormat的切片机制生成切片规划文件job.split文件,并且把文件写入到资源提交目录
- 将MR程序中所有的配置项写入到一个job.xml文件,文件也写入到资源提交目录
- 程序开始申请运行资源,运行Map任务和reduce任务
四、Mapper阶段的核心知识点
- Mapper阶段需要启动多个MapTask任务,多个mapTask是并行运行,互不干扰的。
- 多个MapTask可能会在多个节点上运行,那么到底这些MapTask要在哪些节点上启动运行,MR程序也是有规矩的,移动数据不如移动计算。 启动MapTask的时候,一般要求MapTask最好在该MapTask负责的切片节点启动。这样的话我们MapTask计算数据的时候就不需要移动数据了。如果切片节点没有资源能启动MapTask了,那我们也会在距离这个数据最近的节点启动MapTask(网络拓扑原则)
- MapTask的任务个数是和切片的个数是对应,默认情况切片机制下,一个切片就是一个block块
【注意】我们之所以可以在切片所在节点启动计算任务,是因为当初我们配置Hadoop集群的时候,DataNode和NodeManager是同时配置的
五、MR程序运行的Shuffle知识点
shuffle指的是重新洗牌,逻辑上指的是将数据全部重新通过网络进行分发和洗牌,Shuffle指的是从map方法输出开始到reudce方法执行之前的这一段计算逻辑称之为Shuffle阶段
5.1 Shuffle阶段的执行逻辑
5.1.1 Map方法执行之后的逻辑
- map方法输出kv数据时,先根据指定的Partitioner计算kv数据的分区,计算成功之后,将kv数据的分区编号、kv数据本身、key、value分别在内存的其实地址,key、value数据的长度等信息写入到一个内存的环形缓冲区中(100M),
- 当环形缓冲区到达设定的阈值(80%),将环形缓冲区的数据溢写到磁盘文件,溢写数据之前,环形缓冲区的数据会根据不同分区进行一次分区排序(根据key值进行排序,默认使用快速排序算法),将排好序的分区数据溢写到磁盘文件中
- 可能Map阶段进行多次溢写,每一次溢写都需要先在环形缓冲区进行分区排序,然后再溢写文件,每一次溢写都会产生一个新的溢写文件
- 如果溢写文件的数量的超过3个,那么就会触发自己设置的combiner操作,对已经溢写完成的数据先进行一次map端的聚合操作。Combiner操作可选的。
- 当map阶段执行完成,会将产生的多个溢写文件,以及环形缓冲区剩余的还没有溢写的数据进行一次合并操作,合并成为一个大文件,只不过再合并的时候也需要进行一次排序(排序也是基于每一个分区进行,基于key值大小,使用的排序算法是归并排序算法)。
- 归并排序生成大文件之后,还会进行一次自定义的Combiner操作,对map阶段输出的数据进行一次局部汇总
【注意】Combiner操作可选的组件,如果加上的操作,第4和第6步就会执行,如果没有加,第4步和第6步一定不会执行 Combiner就算你指定了,可能一次也不执行,当map任务的计算负担很重,如果map任务的计算压力很大,那么combiner操作就算设置了,MR程序也不会执行的
5.1.2 Reduce方法执行之前的逻辑
- Copy阶段:Reduce任务根据负责的分区,从不同的MapTask上把对应的分区数据拉取到ReduceTask的内存中,如果ReduceTask内存放不下这些数据,把数据写到文件
- merge阶段:会把我们从不同maptask拉去回来的数据进行一次整体的合并
- sort阶段:合并拉取的不同mapTask分区的数据得时候,还需要对数据进行一次排序,排序可以单独指定规则,如果没有指定,默认还是使用key值得大小规则,排序算法也是归并排序
5.2 Shuffle中map输出开始执行源码解读
-
collecotr收集器往环形缓冲区写出数据,只不过写出数据的时候先根据Partitioner计算数据的分区,partitioner分区计算默认情况下有两种计算方式
- 如果reduceTask的数量等于1的时候,采用一个内部类的分区器进行分区,分区器是吧所有的数据都分配到0号分区
- 如果reduceTask的数量大于1的时候,采用一个HashPartitioner分区机制,按照key的hashcode值和Integer.MAX_VALUE进行一次&位运算,然后和reduceTask取%余数得到一个分区编号。 分区编号看ReduceTask数量,[0,reduceTask-1]
- 如果你想自己控制分区的数据,那么就得需要自定义Partitioner来完成
-
collector将数据写入环形缓冲区,环形缓冲区代码的体现就是一个字节数组,字节数组默认100M,超过80M,需要把缓冲区的数据写入到一个文件中
缓冲区可以设置大小,阈值可以设置
mapreduce.task.io.sort.mb 100 指定MR程序运行中环形缓冲区的默认大小 100M mapreduce.map.sort.spill.percent 0.80 指定MR程序运行中缓冲区的阈值 默认是0.8
也可以再mapred-site.xml配置,如果在这个文件配置了,以后所有在Hadoop集群上运行的MR程序的缓冲区和阈值都是配置文件的值了。但是这样的配置我们不建议
因为不同的计算程序环形缓冲区和阈值配置不同的参数,因此一般在MR的驱动程序使用Configuration配置,虽然这个配置只是对当前的MR生效。但是这是最常用的。
配置有个规则:缓冲区越大,溢写的次数越小,计算的速度越高。
5.3 Shuffle阶段中通过自定义分区实现数据的分区规则定义
5.3.1 定义Java类继承Partitioner类
5.3.2 重写Partitioner类中getPartition 方法自定义分区规则即可
5.3.3 分区的数量必须和ReduceTask的数量保持一致,如果两者不一致,出现以下三种情况
- reduceTask的数量大于分区数,那么会产生多个结果文件,只不过有些结果文件就是一个空白文件,多余的reduceTask没有分区数据处理才会产生空白文件
- reduceTask的数量小于分区数,而且大于1的,报错
- reduceTask的数量小于分区数,但是等于1 正常执行,只不过分区不执行了
5.4 Shuffle阶段中通过自定义排序规则保证输出结果有序
整体Shuffle阶段,一共对数据进行三次排序,而且最终输出结果文件里面的数据其实是有顺序的。三次排序分别发生在:
- 当环形缓冲区超过阈值之后溢写磁盘的时候,会先在环形缓冲区进行第一次排序操作,排序基于key值的比较器进行排序,底层采用的快速排序的算法
- 当map阶段产生了多个溢写文件之后,合并多个溢写文件以及缓冲区中的数据的之后会进行第二次排序操作,排序基于key值得比较器进行排序的,底层采用是归并排序的算法
- 当ReduceTask把它所负责的分区数据拉去到ReduceTask节点之后,也需要对拉取的多个MapTask上的数据在进行一次归并排序,默认情况下我们排序也是基于key值的比较器进行排序,但是reduce比较特殊,也可以单独指定另外一种排序规则。
5.5 Shuffle阶段中的Combiner操作(MR程序的可选组件)
Combiner其实也是一个Reducer,只不过和Reducer不一样的地方在于,Reducer是对所有的MapTask计算的结果进行聚合操作,Combiner只对当前的MapTask计算的结果进行一次局部汇总,目的是为了减少了Map阶段向Reduce阶段传输的数据量,从而提升MR程序的计算效率。
5.5.1 Combiner的使用规则
- 一般默认情况下,Combiner就是Reducer,Reducer可以当作Combiner来使用
- 如果你不想用Reducer充当Combiner,也可以自定义Combiner,如果自定义Combiner,那么必须满足以下要求
- 自定义的Combiner的类必须继承Reducer
- Combiner的输入的KV是map阶段输出的kv类型 Combiner输出的kv类型必须是Reducer阶段输入的key value类型
- 使用在Map阶段给Reduce阶段传输的数据量过大的情况下,可以使用Combine进行一次map的局部汇总,减少数据的传输量
5.5.2 Combiner的执行时机
- 当Map阶段的的溢写文件超过三个,自动触发Combiner操作
- 当map阶段执行完成之后,把所有的溢写合并之后也会触发一次Combiner操作
- Combiner在有些极端的情况下,就算我们设置了,它也可能不会执行,如果map端的计算压力过大,那么Combiner就不会执行了,而是直接执行Reducer
5.6 shuffle阶段reduce聚合数据的时候,哪些数据为相同的key值?
除了需要借助自定义类型的hashCode和equals方法以外,还需要通过比较器判断。
六、代码示例
package com.sxuek.wc02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* 现在想使用mapreduce去实现单词技术案例,案例需求:
* 1、要求可以统计出输入文件中每一个单词出现的总次数
* 2、要求输出文件有两个,其中如果单词的首字母是大写,那么单词的统计结果写出到part-r-00000文件
* 如果单词的首字母是小写,那么单词的统计结果写出到part-r-00001文件中
*
* 逻辑实现:
* 因为结果需要两个文件,因此我们需要两个ReduceTask(因为MR程序中一个reduceTask默认只输出一个文件)
* 而且现在我们还指定了分区的数据规则,MR程序的默认分区机制无法满足我们的需求,因此我们还得需要自定义分区机制
* 剩余的操作就是基本的求单词计数案例的代码
*/
public class WCDriver02 {
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://192.168.68.101:9000");
Job job = Job.getInstance(conf);
//这一行代码用来指定程序打成JAR包之后在集群中运行时避免ClassNotFound异常问题
job.setJarByClass(WCDriver02.class);
//封装InputFormat 默认使用是TextInputFormat
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job,new Path("/wordcount.txt"));
//封装Mapper阶段
job.setMapperClass(WCMapper02.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//封装分区Partitioner
// job.setPartitionerClass(WCPartitioner.class);
job.setPartitionerClass(WCPartitioner01.class);
//封装reducer阶段
job.setReducerClass(WCReducer02.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
/**
* 设置reduceTask的数量,默认情况下 我们规定ReduceTask的数量必须和自定义的分区数保持一致
* 但是规定是规定 是可以打破的,但是打破规则是要接受代价的
* 代价:如果reduceTask的数量和分区返回的数量不一致,会出现以下三种情况:
* 1、reduceTask的数量大于分区数,那么会产生多个结果文件,只不过有些结果文件就是一个空白文件,
* 多余的reduceTask没有分区数据处理才会产生空白文件
* 2、reduceTask的数量小于分区数,而且大于1的,报错
* 3、reduceTask的数量小于分区数,但是等于1 正常执行,只不过分区不执行了
*/
job.setNumReduceTasks(3);
//封装OutputFormat
// job.setOutputFormatClass();
Path path = new Path("/output");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.35.101:9000"), conf, "root");
if (fs.exists(path)){
fs.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
//提交程序运行
boolean flag = job.waitForCompletion(true);
System.exit(flag?0:1);
}
}
class WCMapper02 extends Mapper<LongWritable, Text,Text,LongWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word),new LongWritable(1L));
}
}
}
class WCReducer02 extends Reducer<Text,LongWritable,Text,LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum = 0L;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key,new LongWritable(sum));
}
}
/**
* 自定义Partitioner实现数据分区机制
* 1、自定义的Partitioner需要传递两个泛型,两个泛型就是map阶段输出的key-value的类型,
* 因为partitioner分区是map阶段输出数据的时候触发的
* 2、重写getPartition方法
*/
class WCPartitioner extends Partitioner<Text,LongWritable>{
/**
* 方法传递了三个参数
* @param key map阶段输出的key值
* @param value map阶段输出的value值
* @param numPartitions 设置的reduceTask的数量
* @return 返回值整数类型 整数代表的是数据的分区编号 分区编号从0开始 而且分区编号必须是连贯的
*/
@Override
public int getPartition(Text key, LongWritable value, int numPartitions) {
String word = key.toString();
/**
* 分区逻辑是 如果单词的首字母是大写 那么把数据分配给0号分区处理
* 如果单词的首字母是小写 那么把数据分配给1号分区处理
* 分区的编号从0开始
*/
char first = word.charAt(0);
if (first>=65 && first <= 90){
return 0;
}else{
return 1;
}
}
}
class WCPartitioner01 extends Partitioner<Text,LongWritable>{
@Override
public int getPartition(Text key, LongWritable value, int numPartitions) {
String word = key.toString();
//首字母是全部转成了小写形式 这样的话我们就可以实现类似于忽略大小写判断的规则
char first = word.toLowerCase().charAt(0);
if(first == 'h'){
return 0;
}else if(first == 's'){
return 1;
}else{
return 2;
}
}
}
package com.sxuek.wordcount;
import com.google.common.base.Charsets;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import java.io.IOException;
import java.util.List;
/**
* 自定义InputFormat 需要指定两个泛型,两个泛型就是读取kv的类型
*/
public class MyInputFormat extends InputFormat<LongWritable, Text> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
return null;
}
/**
* 也是一行一行读取的 以每一行的偏移量为key 以每一行的数据为value进行读取
* @param split the split to be read
* @param context the information about the task
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter){
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(recordDelimiterBytes);
}
}
package com.sxuek.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class WCDriver {
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
//1、准备一个配置文件对象
Configuration configuration = new Configuration();
// configuration.set("mapreduce.task.io.sort.mb","80");
// configuration.set("mapreduce.map.sort.spill.percent","0.90");
//在配置文件指定HDFS的地址,因此MR处理的数据一般都是HDFS的,但是我这里不指定了,因此在resources目录下已经通过core-site.xml文件指定了
// configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR," ");
//2、创建一个封装MR程序使用Job对象
Job job = Job.getInstance(configuration);
job.setJarByClass(WCDriver.class);
/**
* 3、封装指定的InputFormat类 如果没有指定,默认使用TextInputFormat
*/
// job.setInputFormatClass(KeyValueTextInputFormat.class);
// job.setInputFormatClass(NLineInputFormat.class);
// NLineInputFormat.setNumLinesPerSplit(job,3);//指定3行做一个切片
// job.setInputFormatClass(CombineTextInputFormat.class);
// CombineTextInputFormat.setMaxInputSplitSize(job,8);
//指定输入文件路径 输入路径默认是本地的,如果你想要是HDFS上的 那么必须配置fs.defaultFS 指定HDFS的路径
FileInputFormat.setInputPaths(job,new Path("/wordcount.txt"));
/**
* 4、封装Mapper阶段
*/
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
/**
* 5、封装Partitioner分区类,分区类可以不用指定,默认分区机制
*/
// job.setPartitionerClass();
job.setCombinerClass(WCCombiner.class);
/**
* 6、封装Reducer阶段
*/
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
/**
* 7、封装指定的OutputFormat,如果没有指定OutputFormat 默认使用TextOutputFormat
*/
// job.setOutputFormatClass();
//封装输出路径 输出路径不能提前存在,因此代码在中先判断是否存在,如果存在删除了
Path path = new Path("/output2");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.68.101:9000"), configuration, "root");
if (fs.exists(path)){
fs.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
/**
* 8、提交程序运行
* 提交的时候先进行切片规划,然后将配置和代码提交给资源调度器
*/
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
class WCMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
System.out.println("map通过inputFormat机制读取的key值为"+key.get()+",读取的value值为"+line);
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word),new LongWritable(1L));
}
}
}
class WCReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum =0l;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key,new LongWritable(sum));
}
}
class WCCombiner extends Reducer<Text,LongWritable,Text,LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum =0l;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key,new LongWritable(sum));
}
}