99-Hadoop-MapReduce-排序:
WritableComparable 排序
排序是MapReduce框架中最重要的操作之一。 MapTask和ReduceTask均会对数据按 照key进行排序。该操作属于
Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。 默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数 据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到
一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完 毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
(1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序:(GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
(4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
WritableComparable 排序案例实操(全排序)(尚硅谷案例测试)
1)需求
根据案例 2.3 序列化案例产生的结果再次对总流量进行倒序排序。
(1)输入数据
原始数据 第一次处理后的数据
phone_data .txt part-r-00000(输入数据)
(2)期望输出数据
13509468723 7335 110349 117684
13736230513 2481 24681 27162
13956435636 132 1512 1644
13846544121 264 0 264
。。。 。。。
2)需求分析
代码地址:https://gitee.com/HaoZhouRS/bigdata-study-code/tree/master/big-data-study/MapReduce-Demom/src/main/java/com/zh/mapreduce/writableCompare
WritableComparable 排序案例实操(区内排序)
1)需求
要求每个省份手机号输出的文件中按照总流量内部排序。
2)需求分析
基于前一个需求,增加自定义分区类,分区按照省份手机号设置。
3)案例实操
(1)增加自定义分区类
package com.zh.mapreduce.partitionandwritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitionerTwo extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
//获取手机号前三位
String phone = text.toString();
String prePhone = phone.substring(0, 3);
//定义一个分区号变量 partition,根据 prePhone 设置分区号
int partition;
switch (prePhone) {
case "136":
partition = 0;
break;
case "137":
partition = 1;
break;
case "138":
partition = 2;
break;
case "139":
partition = 3;
break;
default:
partition = 4;
break;
}
//最后返回分区号 partition
return partition;
}
}
(2)在驱动类中添加分区类
job.setPartitionerClass(ProvincePartitionerTwo.class);
job.setNumReduceTasks(5);
103-Hadoop-MapReduce-Combiner合并:
Combiner 合并
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行;
Reducer是接收全局所有Mapper的输出结果;
(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv 应该跟Reducer的输入kv类型要对应起来。 (场景,不能对结果有影响)
Mapper Reducer
3 5 7 ->(3+5+7)/3=5 (3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2
2 6 ->(2+6)/2=4
(6)自定义 Combiner 实现步骤
(a)自定义一个 Combiner 继承 Reducer,重写 Reduce 方法
public class WordCountCombiner extends Reducer<Text, IntWritable, Text,
IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context
context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
context.write(key,outV);
} }
(b)在 Job 驱动类中设置:
job.setCombinerClass(WordCountCombiner.class);
Combiner 合并案例实操 (尚硅谷案例)
1)需求
统计过程中对每一个 MapTask 的输出进行局部汇总,以减小网络传输量即采用 Combiner 功能。
(1)数据输入
(2)期望输出数据
期望:Combine 输入数据多,输出时经过合并,输出数据降低。
2)需求分析
3)案例实操——方案一
(1)增加一个 WordCountCombiner 类继承 Reducer
package com.zh.mapreduce.combinerone;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
context.write(key,outV);
}
}
(2)在 WordcountDriver 驱动类中指定 Combiner
// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordCountCombiner.class);
运行日志,输入122,输出5
job.setNumReduceTasks(0);//shuff机制在map、和reducer中间,混洗数据部分,没有reduce,直接会从map返回
4)案例实操——方案二
(1)将 WordcountReducer 作为 Combiner 在 WordcountDriver 驱动类中指定
// 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑 ,使用自己的reduce
job.setCombinerClass(WordCountReducer.class);
学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230