MapReduce实现TopN

news2025/1/22 21:10:44

目录

1、先导知识

2、案例

2.1 需求

 2.2 代码实现

FlowBean类

Mapper类

Reducer类

Driver类

3、总结


1、先导知识

TreeMap底层是根据红黑树的数据结构构建的,默认是根据key的自然排序来组织(比如integer的大小,String的字典排序),如果key是自定义类,可以通过重写compareTo方法自定义排序。

firstKey ()方法 用于返回此TreeMap中具有最小键值的第一个键元素。.

lastKey ()方法 用于返回此TreeMap中具有最大键值的最后一个键元素。.

2、案例

2.1 需求

 2.2 代码实现

setup()与cleanup()方法:

 1、setup(),此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高!

 2、cleanup(),此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高!

这里就使用了cleanup方法,map方法和reduce方法保持TreeMap只有n个元素;cleanup用于输出TreeMap的元素给下一个环节用,只需要执行一次,就放在cleanup。

FlowBean类

package com.atguigu.mr.top;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{

	private long upFlow;
	private long downFlow;
	private long sumFlow;
	
	
	public FlowBean() {
		super();
	}

	public FlowBean(long upFlow, long downFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow = in.readLong();
		downFlow = in.readLong();
		sumFlow = in.readLong();
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	public void set(long downFlow2, long upFlow2) {
		downFlow = downFlow2;
		upFlow = upFlow2;
		sumFlow = downFlow2 + upFlow2;
	}

	@Override
	public int compareTo(FlowBean bean) {
		
		int result;
		
		if (this.sumFlow > bean.getSumFlow()) {
			result = -1;
		}else if (this.sumFlow < bean.getSumFlow()) {
			result = 1;
		}else {
			result = 0;
		}
		
		return result;
	}
}

Mapper类

package com.atguigu.mr.top;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
	
	// 定义一个TreeMap作为存储数据的容器(天然按key排序)
	private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();
	private FlowBean kBean;
	
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
		
		kBean = new FlowBean();
		Text v = new Text();
		
		// 1 获取一行
		String line = value.toString();
		
		// 2 切割
		String[] fields = line.split("\t");
		
		// 3 封装数据
		String phoneNum = fields[0];
		long upFlow = Long.parseLong(fields[1]);
		long downFlow = Long.parseLong(fields[2]);
		long sumFlow = Long.parseLong(fields[3]);
		
		kBean.setDownFlow(downFlow);
		kBean.setUpFlow(upFlow);
		kBean.setSumFlow(sumFlow);
		
		v.set(phoneNum);
		
		// 4 向TreeMap中添加数据
		flowMap.put(kBean, v);
		
		// 5 限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据
		if (flowMap.size() > 10) {
//		flowMap.remove(flowMap.firstKey());
			flowMap.remove(flowMap.lastKey());		
}
	}
	
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		
		// 6 遍历treeMap集合,输出数据
		Iterator<FlowBean> bean = flowMap.keySet().iterator();

		while (bean.hasNext()) {

			FlowBean k = bean.next();

			context.write(k, flowMap.get(k));
		}
	}
}

Reducer类

package com.atguigu.mr.top;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

	// 定义一个TreeMap作为存储数据的容器(天然按key排序)
	TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

	@Override
	protected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

		for (Text value : values) {

			 FlowBean bean = new FlowBean();
			 bean.set(key.getDownFlow(), key.getUpFlow());

			 // 1 向treeMap集合中添加数据
			flowMap.put(bean, new Text(value));

			// 2 限制TreeMap数据量,超过10条就删除掉流量最小的一条数据
			if (flowMap.size() > 10) {
				// flowMap.remove(flowMap.firstKey());
flowMap.remove(flowMap.lastKey());
			}
		}
	}

	@Override
	protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

		// 3 遍历集合,输出数据
		Iterator<FlowBean> it = flowMap.keySet().iterator();

		while (it.hasNext()) {

			FlowBean v = it.next();

			context.write(new Text(flowMap.get(v)), v);
		}
	}
}

Driver类

package com.atguigu.mr.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNDriver {

	public static void main(String[] args) throws Exception {
		
		args  = new String[]{"e:/output1","e:/output3"};
		
		// 1 获取配置信息,或者job对象实例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 6 指定本程序的jar包所在的本地路径
		job.setJarByClass(TopNDriver.class);

		// 2 指定本业务job要使用的mapper/Reducer业务类
		job.setMapperClass(TopNMapper.class);
		job.setReducerClass(TopNReducer.class);

		// 3 指定mapper输出数据的kv类型
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		// 4 指定最终输出的数据的kv类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 5 指定job的输入原始文件所在目录
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

3、总结

MapReduce实现TopN的步骤:

(1)利用TreeMap排序, 每过来一个数据 先放入TreeMap中, 只要TreeMap的size超过n,就移除firstKey或者lastKey对应的(看是从小到大还是从大到小排序);

(2)在众多的Mapper的端,首先计算出各端Mapper的TopN,然后在将每一个Mapper端的TopN汇总到Reducer端进行计算最终的TopN,这样就可以最大化的提高运行并行处理的能力,同时极大的减少网络的Shuffle传输数据,从而极大的加快的整个处理的效率。

参考:mapreduce求topN - hdc520 - 博客园 (cnblogs.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/195026.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一刷代码随想录——回溯算法

1.理论基础【1】本质回溯法也可以叫做回溯搜索法&#xff0c;它是一种搜索的方式。回溯是递归的副产品&#xff0c;只要有递归就会有回溯。因为回溯的本质是穷举&#xff0c;穷举所有可能&#xff0c;然后选出我们想要的答案&#xff0c;如果想让回溯法高效一些&#xff0c;可以…

线性DP与真题

目录 一、前言 二、最长公共子序列&#xff08;lanqiaoOJ题号1189&#xff0c;类似于1054&#xff09; 三、最长递增子序列 1、蓝桥骑士&#xff08;lanqiaoOJ题号1188&#xff09; 四、编辑距离 1、字符串转换&#xff08;lanqiaoOJ题号1507&#xff09; 五、网络图上的…

JavaScript两数之和

两数之和 两层for循环 // O(n^2) const twoNum function(nums,target){for(let i 0;i<nums.length;i){for(let ji1 ;j<nums.length;j){if(nums[i]nums[j]target){return[i,j]}}} }双指针 // 当数组为有序的时候O(n) const twoNum2 function(nums,target){let i 0 …

SpringCloud学习

由于Spring Cloud基于Spring Boot构建&#xff0c;而Spring Cloud Alibaba又基于Spring Cloud Common的规范实现&#xff0c;所以当我们使用Spring Cloud Alibaba来构建微服务应用的时候&#xff0c;需要知道这三者之间的版本关系。 目前Spring Cloud Alibaba的版本与Spring Bo…

1-1MySql复习

MySql复习 一 数据类型 数值 字符串 ​ char(5) 定长字符串 varchar(5) 可变长度字符串 日期 ​ timestamp 记录行数据的最后修改事件 二 基本查询 1 聚合函数 avg count sum max min 2 排序 order by ​ asc ​ desc 3 分组 group by … having … 分组通常跟…

Python语言的重要性(模式识别与图像处理课程作业)

Python语言的重要性&#xff08;模式识别与图像处理课程作业&#xff09;Python语言的重要性1 Python的优点主要有&#xff1a;1.1、简单1.2、易学1.3、速度快1.4、免费1.5、高层语言1.6、解释性1.7、面向对象1.8、可扩展性1.9、可嵌入性1.10、丰富的库1.11、规范的代码2 Pytho…

TCP/IP网络编程——套接字的多种可选项

完整版文章请参考&#xff1a; TCP/IP网络编程完整版文章 文章目录第 9 章 套接字的多种可选项9.1 套接字可选项和 I/O 缓冲大小9.1.1 套接字多种可选项9.1.2 getsockopt & setsockopt9.1.3 SO_SNDBUF & SO_RCVBUF9.2 SO_REUSEADDR9.2.1 发生地址分配错误&#xff08;B…

高效学 C++|编程实例之计算器

本节将实现一个能进行实数间加、减、乘、除运算的简易计算器。首先创建一个基于QWidget带界面的Qt项目&#xff0c;然后按照如下步骤进行操作&#xff1a; 01、计算器界面设计 在界面中拖入两个单行文本框和十七个按钮&#xff0c;按钮上显示的文字、按钮对象和单行文本框对象…

百分百拿捏offer的自动化测试面试题全套教程

最近很多咨询我&#xff0c;有没有软件测试方面的面试题&#xff0c;尤其是Python自动化测试相关的最新面试题&#xff0c;所以今天给大家整理了一份&#xff0c;希望能帮助到你们。 接口测试基础 1、公司接口测试流程是什么&#xff1f; 从开发那边获取接口设计文档、分析接口…

VUE3 指令 插槽

指令 指令是 Vue 模板语法里的特殊标记&#xff0c;在使用上和 HTML 的 data-* 属性十分相似&#xff0c;统一以 v- 开头&#xff08; e.g. v-html &#xff09;。 它以简单的方式实现了常用的 JavaScript 表达式功能&#xff0c;当表达式的值改变的时候&#xff0c;响应式地…

1x1卷积、Inception网络

目录1.1x1卷积(1x1 convolution)又称网络中的网络(network in network)池化层只能压缩图像的宽和高&#xff0c;1x1卷积能压缩通道数量&#xff0c;减少计算成本。如上图&#xff0c;输入维度的通道数为192&#xff0c;用32个1x1x192的filters&#xff0c;就能将输出的通道数压…

java基础—面试题一

文章目录1.和equals区别是什么&#xff1f;2.Java中的 <<、>>、>>> 是什么3.if-else-if-else与switch的区别4.while和do-while的区别5.switch 是否能作用在 byte 上&#xff0c;是否能作用在 long 上&#xff0c;是否能作用在String上6.&和&&…

大数据技术架构(组件)16——Hive:内置UDTF函数

1.4.11、内置UDTF函数1.4.11.1、explodeselect explode(array(100,200,300));Array<int> myCol[100,200,300][400,500,600]得到的结果如下&#xff1a;(int) myNewCol1002003004005006001.4.11.2、posexplodeselect posexplode(array(A,B,C));1.4.11.3、parse_url_tuples…

2023云原生安全值得关注的3个方向

如果说过去几年教会了我们什么的话&#xff0c;那就是云原生和开源环境中安全的重要性。 Log4j 等漏洞产生的重大影响&#xff0c;在无数的行业中浮现&#xff0c;对于云原生环境中的其他安全问题也越来越受到重视。 组织不再质疑是否要迁移到云端&#xff0c;而是在寻找最快、…

centos下安装docker 并通过docker安装gitlab

一:安装docker1、若之前安过docker&#xff0c;可以先卸载yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-selinux \docker-engine-selinux \docker-engine \docker-ce2、更新yum…

软件测试基础(四) 之 软件测试的覆盖率

一、什么是软件测试的覆盖率&#xff1f;软件测试覆盖率是软件测试技术有效性的一个度量手段&#xff0c;用来度量测试完整性。意思概括的说&#xff0c;软件测试的工作中会有非常非常多的item&#xff08;任务&#xff09;&#xff0c;执行过的任务和总任务数的一个比值&#…

尚医通 (二)项目搭建

目录一、工程结构介绍1、工程结构2、模块说明二、创建父工程1、创建sprigboot工程yygh_parent2、删除 src 目录3、配置 pom.xml4、在pom.xml中添加依赖的版本三、搭建model模块1、在父工程yygh_parent下面创建模块model2、添加项目需要的依赖3、复制项目实体类和VO类四、搭建se…

require和important区别

1.require是赋值过程&#xff0c;就是把一个值赋值给另一个&#xff0c;important是对这个值的引用 2 . require 是赋值过程并且是运行时才执行&#xff0c;也就是同步加载&#xff0c;import 是解构过程并且是编译时执行&#xff0c;理解为异步加载 3.require 的性能相对于 im…

Linux部署达梦数据库超详细教程

陈老老老板&#x1f9b8;&#x1f468;‍&#x1f4bb;本文专栏&#xff1a;国产数据库-达梦数据库&#xff08;主要讲一些达梦数据库相关的内容&#xff09;&#x1f468;‍&#x1f4bb;本文简述&#xff1a;本文讲一下达梦数据库的下载与安装教程&#xff08;Linux版&#x…

百度网盘秒传链接生成及提取方法

百度网盘秒传链接生成及提取方法 1.认识秒传链接 首先&#xff0c;我们认识一下秒传链接的格式&#xff1a; 秒传链接是由标准提取码文件名组成。例如下面的格式&#xff1a; fd00338387f50ee5919eb3df4cfce6e3#5048587008#/影视/电影/救火奶爸.mp4 百度网盘秒传链接的提取主…