真正帮你实现—MapReduce统计WordCount词频,并将统计结果按出现次数降序排列

news2024/9/21 2:44:44

项目整体介绍

对类似WordCount案例的词频统计,并将统计结果按出现次数降序排列。

网上有很多帖子,均用的相似方案,重写某某方法然后。。。运行起来可能会报这样那样的错误,这里实现了一种解决方案,分享出来供大家参考:编写两个MapReduce程序,第一个程序进行词频统计,第二个程序进行降序处理,由于是降序,还需要自定义对象,在对象内部实现降序排序。

一、项目背景及数据集说明

现有某电商网站用户对商品的收藏数据,记录了用户收藏的商品id以及收藏日期,名为buyer_favorite1。buyer_favorite1包含:买家id,商品id,收藏日期这三个字段,数据以“\t”分割,样例展现如下:在这里插入图片描述

二、编写MapReduce程序,统计每个买家收藏商品数量。(即统计买家id出现的次数)

前置说明

1.配置好Hadoop集群环境,并开启相应服务、
2.在hdfs对应路径上先上传好文件,可以自己根据文件路径定义,这里是"hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1"。同时再定义好输出路径
3.这里是整个程序(词频降序)的入口,若只是想统计词频,请注释掉WordCountSortDESC.mainJob2();

package mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		conf.set("yarn,resourcemanager", "bym@d2e674ec1e78");
		try {
			Job job = Job.getInstance(conf, "111");
			job.setJobName("WordCount");
			job.setJarByClass(WordCount.class);
			job.setMapperClass(doMapper.class); // 这里就是设置下job使用继承doMapper类,与定义的内容保持一致
			job.setReducerClass(doReducer.class); // 同上,设置Reduce类型

			job.setMapOutputKeyClass(Text.class); // 如果map的输出和reduce的输出不一样,这里要分别定义好格式
			job.setMapOutputValueClass(IntWritable.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);

			Path in = new Path(
					"hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1");
			Path out = new Path("hdfs://localhost:9000/mymapreduce1/out");
			FileInputFormat.addInputPath(job, in);
			FileOutputFormat.setOutputPath(job, out);
			if (job.waitForCompletion(true)) {
				System.out.println("WordCount completition");
				WordCountSortDESC.mainJob2();
				System.out.println("diaoyong");
			}
		} catch (Exception e) {
			e.printStackTrace();
		}

		// System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	// 第一个Object表示输入key的类型、是该行的首字母相对于文本文件的首地址的偏移量;
	// 第二个Text表示输入value的类型、存储的是文本文件中的一行(以回车符为行结束标记);
	// 第三个Text表示输出键的类型;第四个IntWritable表示输出值的类型
	public static class doMapper extends
			Mapper<LongWritable, Text, Text, IntWritable> {
		public static final IntWritable one = new IntWritable(1);
		public static Text word = new Text();

		@Override
		// 前面两个Object key,Text value就是输入的key和value,第三个参数Context
		// context是可以记录输入的key和value。
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// StringTokenizer是Java工具包中的一个类,用于将字符串进行拆分
			StringTokenizer tokenizer = new StringTokenizer(value.toString(),
					"\t");
			// 返回当前位置到下一个分隔符之间的字符串, 并把字符串设置成Text格式
			word.set(tokenizer.nextToken());
			context.write(word, one);
		}
	}

	// 参数依次表示是输入键类型,输入值类型,输出键类型,输出值类型
	public static class doReducer extends
			Reducer<Text, IntWritable, Text, Text> {

		@Override
		// 输入的是键值类型,其中值类型为归并后的结果,输出结果为Context类型
		protected void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			context.write(key, new Text(Integer.toString(sum)));
		}
	}
}

三、核心问题:再次编写MapReduce程序,将上一步统计的结果降序排列

前置说明

1.这里将上一步统计的结果作为输入,进行第二次mapreduce程序的运行。因此要注意输入路径与上一步的输出路径保持一致。
2.由于是降序排列,只能自定义FlowBean对象,内部实现排序方式。否则,升序可以利用shuffle机制默认的排序策略不用自定义对象排序,这里不再叙述。

package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountSortDESC {
	public static void mainJob2() {
		Configuration conf = new Configuration();
		conf.set("yarn,resourcemanager", "bym@d2e674ec1e78");
		try {
			Job job = Job.getInstance(conf, "1111");
			job.setJobName("WordCountSortDESC");
			job.setJarByClass(WordCountSortDESC.class);
			job.setMapperClass(TwoMapper.class); // 这里就是设置下job使用继承doMapper类,与定义的内容保持一致
			job.setReducerClass(TwoReducer.class); // 同上,设置Reduce类型
			
			job.setMapOutputKeyClass(FlowBean.class);
			job.setMapOutputValueClass(Text.class);

			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(FlowBean.class);

			Path in = new Path("hdfs://localhost:9000/mymapreduce1/out");
			Path out = new Path("hdfs://localhost:9000/mymapreduce1/out555");
			FileInputFormat.addInputPath(job, in);
			FileOutputFormat.setOutputPath(job, out);
			if (job.waitForCompletion(true)) {
				System.out.println("DESC Really Done");
			}
		} catch (Exception e) {
			System.out.println("errormainJob2-----------");
		}
	}

	public static class TwoMapper extends Mapper<Object, Text, FlowBean, Text> {
		private FlowBean outK = new FlowBean();
		private Text outV = new Text();

		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// 由于真实的数据存储在文件块上,这里是因为数据量较小,可以保证只在一个文件块
			FileSplit fs = (FileSplit) context.getInputSplit();
			if (fs.getPath().getName().contains("part-r-00000")) {

				// 1 获取一行数据
				String line = value.toString();

				// 2 按照"\t",切割数据
				String[] split = line.split("\t");

				// 3 封装outK outV
				outK.setNumber(Long.parseLong(split[1]));
				outV.set(split[0]);

				// 4 写出outK outV
				context.write(outK, outV);
			} else {
				System.out.println("error-part-r-------------------");
			}
		}
	}

	public static class TwoReducer extends
			Reducer<FlowBean, Text, Text, FlowBean> {
		@Override
		protected void reduce(FlowBean key, Iterable<Text> values,
				Context context) throws IOException, InterruptedException {

			// 遍历values集合,循环写出,避免总流量相同的情况
			for (Text value : values) {
				// 调换KV位置,反向写出
				context.write(value, key);
			}
		}
	}


	public static class FlowBean implements WritableComparable<FlowBean> {

		private long number;

		// 提供无参构造
		public FlowBean() {
		}

		public long getNumber() {
			return number;
		}

		public void setNumber(long number) {
			this.number = number;
		}

		// 实现序列化和反序列化方法,注意顺序一定要一致
		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(this.number);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.number = in.readLong();
		}

		@Override
		public String toString() {
			return number + "\t";
		}

		@Override
		public int compareTo(FlowBean o) {
			// 按照总流量比较,倒序排列
			if (this.number > o.number) {
				return -1;
			} else if (this.number < o.number) {
				return 1;
			} else {
				return 0;
			}
		}
	}

}

四、结果展示:

执行查看文件命令

hadoop fs -cat /mymapreduce1/out555/part-r-00000

在这里插入图片描述
可以发现已经进行了降序排列,其他数据集结果应类似。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/812254.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DHCP防护原理

电脑刚连接到网络 是没有IP地址的 。 通过发送广播到DHCPO服务器。 DHCP服务器响应对应的 IP地址&#xff08;简要过程&#xff09;。 如果有人私自挂接WIFI&#xff0c;相当于DHCP服务器&#xff0c;但这个DHCP服务器是假的&#xff0c;就会引起电脑接入获取家用WIFI的地址&…

十三.redis主从复制

概念 主从复制&#xff0c;指将一台redis服务器的数据&#xff0c;复制到其它的redis服务器。前者称为主节点(master)&#xff0c;后者称为从节点(slave)&#xff1b;数据的复制是单向的&#xff0c;只能由主节点到从节点。master以写为主&#xff0c;slave以读为主。 默认情况…

小研究 - 面向 Spring 的热点代码在线部署方法研究(三)

随着Spring生态不断发展,越来越先进的部署方式降低了部署的复杂度,提高了不同环境下的部署效率,但是在预生产环境下,对频繁改动的热点代码,其部署效率不是很理想,一些简单的代码修改就会引发对所有依赖服务的重新编译部署,给项目部署、运维以及测试带来很多预期之外的影响。在线…

设计模式再探——代理模式

目录 一、背景介绍二、思路&方案三、过程1.代理模式简介2.代理模式的类图3.代理模式代码4.代理模式还可以优化的地方5.代理模式的项目实战&#xff0c;优化后(只加了泛型方式&#xff0c;使用CGLIB的代理) 四、总结五、升华 一、背景介绍 最近在做产品过程中对于日志的统一…

【Git】分支管理之创建、切换、合并、删除分支以及冲突处理

目录 一、理解分支 二、创建、切换、合并分支 三、删除分支 四、冲突处理 五、合并模式 六、合并策略 七、Bug分支处理 八、强制删除分支 一、理解分支 master其实就是一个指针 &#xff0c;他指向的是主分支最近一次commit。我们可以创建新的分支&#xff0c;在新的分…

antv x6将节点拖动到两连线的节点中,自动插入

1、找到节点相交的边 /*** * 将节点拖入两节点之间自动插入【找相交的边】* date 2023-07-29*/export const findIntersectsEdge (graph, node) > {const edges graph.getEdges();const bbox node.getBBox();const lines [bbox.leftLine, bbox.rightLine, bbox.topLine…

《零基础入门学习Python》第073讲:GUI的终极选择:Tkinter10

我们不难发现&#xff0c;几乎每一个应用程序都有一些相同的地方&#xff0c;比如说&#xff1a;标题栏、状态栏、边框、滚动条、工作区。还有的就是 菜单。 传统的菜单有大家熟悉的 File&#xff0c;Edit&#xff0c;Help等&#xff0c;点开之后&#xff0c;是下拉菜单&#…

点云可视化工具2

文章目录 1. 序2. 开发环境2.1 QT PCL 3. 程序3.1 新建项目3.2 修改.pro文件3.2.1 添加头文件目录3.2.2 添加依赖的库文件 3.3 软件界面3.3.1 ui文件3.3.2 按钮图标3.3.3 其他界面设置 3.4 点云处理3.4.1 点云读取显示3.4.2 上/下一张显示点云3.4.3 状态栏显示点云信息3.4.5 线…

0基础五分钟学会使用shardingJDBC实现分表 及测试

1.引入相关依赖 <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> <version>4.1.1</version> </dependency> 2.添加配置 不懂的地方自己看注释 主…

vue基础-虚拟dom

vue基础-虚拟dom 1、真实dom目标2、虚拟dom目标 1、真实dom目标 在真实的document对象上&#xff0c;渲染到浏览器上显示的标签。 2、虚拟dom目标 本质是保存节点信息、属性和内容的一个JS对象 更新会监听变化的部分 给真实的DOM打补丁

SpringBoot多环境开发-配置文件

在Spring Boot中进行多环境开发时&#xff0c;你可以使用配置文件来定义每个环境的属性。Spring Boot提供了一种方便的方式来管理和加载不同环境的配置文件。 以下是一些常见的配置文件命名约定&#xff1a; application.properties: 默认的配置文件&#xff0c;适用于所有环…

Unity 性能优化五:渲染模块压力

CPU压力 Batching 在GPU渲染前&#xff0c;CPU会把数据按batch发送给GPU&#xff0c;每发送一次&#xff0c;都是一个drawcall&#xff0c;GPU在渲染每个batch的时候&#xff0c;会切换渲染状态&#xff0c;这里的渲染状态指的是&#xff1a;影响对象在屏幕上的外观的渲染属性…

【公益】Q学友联合福田人力资源局开展“侨香社区促就业 技能培训强本领”

落实《“十四五”就业促进规划》文件精神&#xff0c;进一步提高就业劳动者就业技能水平&#xff0c;提高居民就业率&#xff0c;侨香社区党委坚持以党建为引领&#xff0c;整合多方资源&#xff0c;深入开展“我为群众办实事”&#xff0c;切合群众实际、满足群众需求&#xf…

Huggingface基本使用

目录 0.install 1.tokenizer 2.datasets 3.metrics 0.install !pip install transformers !pip install datasets 1.tokenizer from transformers import BertTokenizer#加载预训练字典和分词方法 tokenizer BertTokenizer.from_pretrained(pretrained_model_name_or…

力扣 509. 斐波那契数

题目来源&#xff1a;https://leetcode.cn/problems/fibonacci-number/description/ C题解1&#xff1a;根据题意&#xff0c;直接用递归函数。 class Solution { public:int fib(int n) {if(n 0) return 0;else if(n 1) return 1;else return(fib(n-1) fib(n-2));} }; C题…

【物联网无线通信技术】UWB定位从理论到实现(DW1000)

超宽带&#xff08;UWB&#xff09;是一种基于IEEE 802.15.4a和802.15.4z标准的无线电技术&#xff0c;可以非常精确地测量无线电信号的飞行时间&#xff0c;从而实现厘米级精度的距离/位置测量。UWB技术除了提供定位功能外&#xff0c;它本身是一种通信技术&#xff0c;其提供…

Java在线OJ项目(三)、前后端交互API模块

Java在线OJ项目&#xff08;三&#xff09;、前后端交互API模块 1. 客户端向服务器请求所有题目 或者 单个题目前端获取所有题目获取一个题目 后端 2. 后端读取前端提交的代码&#xff0c;进行编译运行&#xff0c;返回结果前端提交代码后端处理 1. 客户端向服务器请求所有题目…

【程序设计】一文讲解程序设计原则SOLDI

前言 设计原则&#xff0c;是指导我们如何设计出低耦合、高内聚的代码&#xff0c;让代码能够更好的应对变化&#xff0c;从而降本提效。 设计原则的关键&#xff0c;是从『使用方的角度』看『提供方』的设计&#xff0c;一句话概括就是&#xff1a;请不要要我知道太多&#…

VBA技术资料MF36:VBA_在Excel中排序

【分享成果&#xff0c;随喜正能量】一个人的气质&#xff0c;并不在容颜和身材&#xff0c;而是所经历过的往事&#xff0c;是内在留下的印迹&#xff0c;令人深沉而安谧。所以&#xff0c;优雅是一种阅历的凝聚&#xff1b;淡然是一段人生的沉淀。时间会让一颗灵魂&#xff0…

IO流(2)-缓冲流

1. 缓冲流的简单介绍 我们上贴说到了 FileInputStream&#xff0c;FileOutputStream&#xff0c;FileReader&#xff0c;FileWriter。 其实这四个流&#xff0c;我们通常把它叫做原始流&#xff0c;它们是比较偏底层的&#xff1b;而今天我们要说的四个缓冲流&#xff0c;如…