文章目录
- MapReduce案例-TopN(倒序排序)
- 一、案例分析
- 1、TopN分析法介绍
- 2、案例需求及分析
- 二、MapReduce 倒序排序代码实现
- 1、准备数据文件
- (1) 在虚拟机上创建文本文件
- (2) 上传文件到HDFS指定路径
- 2、map阶段实现
- (1) 创建前N成绩映射器类
- 3、Reduce阶段实现
- 4、Driver程序主类实现
- 5、运行前N驱动器类,查看结果
MapReduce案例-TopN(倒序排序)
一、案例分析
1、TopN分析法介绍
TopN分析法
是指从研究对象中按照某一个指标进行倒序或正序排列,取其中所需的N个数据,并对这N个数据进行重点分析的方法。
2、案例需求及分析
- 现假设有数据文件
num.txt
,现要求使用MapReduce技
术提取上述文本中最大的5个数据,并最终将结果汇总到一个文件中。 - 先设置
MapReduce
分区为1,即ReduceTask
个数一定只有一个。我们需要提取TopN
,即全局的前N条数据,不管中间有几个Map
、Reduce
,最终只能有一个用来汇总数据。 - 在
Map
阶段,使用TreeMap
数据结构保存TopN
的数据,TreeMap
默认会根据其键的自然顺序进行排序,也可根据创建映射时提供的Comparator
进行排序,其firstKey()
方法用于返回当前集合最小值的键。 - 在
Reduce
阶段,将Map
阶段输出数据进行汇总,选出其中的TopN
数据,即可满足需求。这里需要注意的是,TreeMap
默认采取正序排列,需求是提取5个最大的数据,因此要重写Comparator
类的排序方法进行倒序排序.
二、MapReduce 倒序排序代码实现
1、准备数据文件
启动hadoop服务
(1) 在虚拟机上创建文本文件
创建topn
目录,在里面创建num.txt
文件
(2) 上传文件到HDFS指定路径
创建/topn/input
目录,执行命令:hdfs dfs -mkdir -p /topn/input
将文本文件num.txt
,上传到HDFS
的/topn/input
目录
2、map阶段实现
使用IntelliJ
开发工具创建Maven
项目TopN
,并且新建hsl.aex.mr
包,在该路径下编写自定义Mapper
类TopNMapper
,主要用于将文件中的每行数据进行切割提取,并把数据保存到TreeMap
中,判断TreeMap
是否大于5
,如果大于5就需要移除最小的数据
。TreeMap
保存了当前文件最大5条数据
后,再输出到Reduce
阶段。
(1) 创建前N成绩映射器类
创建`hsl.aex.mr`包,在包里创建`TopNScoreMapper`类
package hsl.aex.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.TreeMap;
public class TopNMapper extends Mapper<LongWritable,Text,NullWritable,IntWritable>{
private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();
// <0,10 3 8 7 6 5 1 2 9 4>
// <xx,11 12 17 14 15 20>
@Override
public void map(LongWritable key, Text value, Context context) {
String line = value.toString();
String[] nums = line.split(" ");
for (String num : nums) {
repToRecordMap.put(Integer.parseInt(num), " ");
if (repToRecordMap.size() > 5) {
repToRecordMap.remove(repToRecordMap.firstKey());
}
}
}
@Override
protected void cleanup(Context context) {
for (Integer i : repToRecordMap.keySet()) {
try {
context.write(NullWritable.get(), new IntWritable(i));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
3、Reduce阶段实现
根据Map
阶段的输出结果形式,同样在hsl.aex.mr
包下,自定义Reducer
类TopNReducer
,主要用于编写TreeMap
自定义排序规则,当需求取最大值时,只需要在compare()
方法中返回正数即可满足倒序排列,reduce()
方法依然是要满足时刻判断TreeMap
中存放数据是前五个数,并最终遍历输出最大的5个数。
创建前N归并器类
在hsl.aex.mr
包里创建TopNReducer
类
package hsl.aex.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;
public class TopNReducer extends Reducer<NullWritable,IntWritable,NullWritable,IntWritable>{
private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
/**
* 谁大排后面
*
* @param a
* @param b
* @return 一个整数
*/
public int compare(Integer a, Integer b) {
return b - a;
}
});
public void reduce(NullWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
for (IntWritable value : values) {
repToRecordMap.put(value.get(), " ");
if (repToRecordMap.size() > 5) {
repToRecordMap.remove(repToRecordMap.firstKey());
}
}
for (Integer i : repToRecordMap.keySet()) {
context.write(NullWritable.get(), new IntWritable(i));
}
}
}
4、Driver程序主类实现
编写MapReduce
程序运行主类TopNDriver
,主要用于设置MapReduce
工作任务的相关参数,对HDFS
上/topn/input
目录下的源文件求前N数据,并将结果输入到HDFS
的/topn/output
目录下。
创建前N驱动器类
在hsl.aex.mr
包里创建TopNDriver
类
package hsl.aex.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
public class TopNDriver {
public static void main(String[] args) throws Exception {
// 创建配置对象
Configuration conf = new Configuration();
// 设置数据节点主机名属性
conf.set("dfs.client.use.datanode.hostname", "true");
// 获取作业实例
Job job = Job.getInstance(conf);
// 设置作业启动类
job.setJarByClass(TopNDriver.class); //这几个
// 设置Mapper类
job.setMapperClass(TopNMapper.class);
// 设置map任务输出键类型
job.setMapOutputKeyClass(NullWritable.class);//类型的和Mapper里面的对应起来
// 设置map任务输出值类型
job.setMapOutputValueClass(IntWritable.class);
// 设置Reducer类
job.setReducerClass(TopNReducer.class);
// 设置reduce任务输出键类型
job.setOutputKeyClass(Text.class); //类型的和Reduce里面的对应起来
// 设置reduce任务输出值类型
job.setOutputValueClass(NullWritable.class);
// 定义uri字符串
String uri = "hdfs://master:9000";
// 创建输入目录
Path inputPath = new Path(uri + "/topn/input/num.txt");
// 创建输出目录
Path outputPath = new Path(uri + "/topn/output");
// 获取文件系统
FileSystem fs = FileSystem.get(new URI(uri), conf);
// 删除输出目录
fs.delete(outputPath, true);
// 给作业添加输入目录
FileInputFormat.addInputPath(job, inputPath);
// 给作业设置输出目录
FileOutputFormat.setOutputPath(job, outputPath);
// 等待作业完成
job.waitForCompletion(true);
// 输出统计结果
System.out.println("======统计结果======");
FileStatus[] fileStatuses = fs.listStatus(outputPath);
for (int i = 1; i < fileStatuses.length; i++) {
// 输出结果文件路径
System.out.println(fileStatuses[i].getPath());
// 获取文件输入流
FSDataInputStream in = fs.open(fileStatuses[i].getPath());
// 将结果文件显示在控制台
IOUtils.copyBytes(in, System.out, 4096, false);
}
}
}
5、运行前N驱动器类,查看结果
可以看到排名前五的,