【Hadoop】MapReduce数据倾斜问题解决方案

news2024/11/17 7:47:05

       默认情况下Map任务的数量与InputSplit数量保持一致,Map阶段的执行效率也与InputSplit数量相关,当遇到大量的小文件时我们采用SequenceFile合并成一个大文件,以此来提高运行效率(【Hadoop】MapReduce小文件问题解决方案(SequenceFile,MapFile));默认情况下只有一个Reduce任务,那么解决了map阶段的小文件问题后,Reduce阶段的运行效率就是MapReduce运行效率的短板,我们当然可以通过增加Reduce任务的数量来提高数据处理的并行度,以此提高效率,但这种处理是“治标不治本”的。若遇到数据倾斜问题,仅增加Reduce任务的数量是于事无补的。

文章目录

  • 1. 增加Reduce任务数量(分区数量)
  • 2. 增加Reduce任务数量+数据打散
    • 2.1 数据打散执行
    • 2.2 执行结果比对
    • 2.3 运行结果处理整合
      • 2.3.1 Mapper代码
      • 2.3.2 Reducer代码
      • 2.3.3 main代码
      • 2.3.4 执行结果

1. 增加Reduce任务数量(分区数量)

在这里插入图片描述
       MapReduce默认使用哈希方法进行分区,getPatition方法相关源码为:(key.hashcode()&Inyeger.MAX_VALUE)%numReduceTask,其中numReduceTask默认为1,而任何数向1取余都为0,因此默认只有一个分区,又因为一个分区对应一个Reduce任务,所以只有也一个Ruduce。若要提高并行度,增加Reduce任务数,只需要修改numReduceTask数值即可(在Main函数中添加源码job.setNumReduceTasks(N);其中N为需要指定的Reduce数量)。

2. 增加Reduce任务数量+数据打散

       还有一种思路,既然有大量的数据映射到同一个分区并最终进入同一个Reduce任务处理,那么我们就在这些数据后面增加一个便于切割的随机数,让他们均匀映射到不同的分区,最终让每个Reduce处理的数据量大致相同,随后将输出再进行一次MapReduce任务,把数据切割还原并会汇总。
       对统计文本单词数量这个小实践项目的代码进行修改即可实现上述操作。

实验针对的文本数据改为写入901 0000个数字5,1~10的其余数字各10000次。

2.1 数据打散执行

  1. MyMapper类中的map函数重写

在每个5的后面加一个下划线并拼接十以内的随机数。

        @Override
        protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            //k1表示每一行的偏移量,v1表示该行内容
            //首先把每一行的单词切割出来
            String key =  v1.toString();
            if ("5".equals(key))
                key += "_" + random.nextInt(10);
            Text k2 = new Text(key);
            LongWritable v2 = new LongWritable(1L);
            context.write(k2, v2);
        }
  1. 设置10个Reduce任务

在main函数中增加代码:job.setNumReduceTasks(10);

然后打包执行即可。

2.2 执行结果比对

  1. 十个分区:
    在这里插入图片描述

  2. 数据打散+十个分区:
    在这里插入图片描述

       在YARN的界面可以很明显看到仅使用十个分区,需要的运行时间为3mins, 3sec;而进行数据打散后再使用十个分区仅需要1mins, 24sec,运行速度提升了一倍。

2.3 运行结果处理整合

       由于之前进行过数据打散,改变了原始数据,现在再编写一个MapReduce任务处理之前得到的结果。第一次MapReduce的结果如下:

[root@bigData01 ~]# hdfs dfs -ls /out0002
Found 11 items
-rw-r--r--   1 root supergroup          0 2023-01-28 22:23 /out0002/_SUCCESS
-rw-r--r--   1 root supergroup         19 2023-01-28 22:22 /out0002/part-r-00000
-rw-r--r--   1 root supergroup         19 2023-01-28 22:22 /out0002/part-r-00001
-rw-r--r--   1 root supergroup         19 2023-01-28 22:22 /out0002/part-r-00002
-rw-r--r--   1 root supergroup         19 2023-01-28 22:22 /out0002/part-r-00003
-rw-r--r--   1 root supergroup         11 2023-01-28 22:22 /out0002/part-r-00004
-rw-r--r--   1 root supergroup         19 2023-01-28 22:23 /out0002/part-r-00005
-rw-r--r--   1 root supergroup         19 2023-01-28 22:23 /out0002/part-r-00006
-rw-r--r--   1 root supergroup         19 2023-01-28 22:23 /out0002/part-r-00007
-rw-r--r--   1 root supergroup         28 2023-01-28 22:23 /out0002/part-r-00008
-rw-r--r--   1 root supergroup         11 2023-01-28 22:23 /out0002/part-r-00009
[root@bigData01 ~]# hdfs dfs -cat /out0002/*
1	10000
5_3	901618
2	10000
5_4	900945
3	10000
5_5	902323
4	10000
5_6	900200
5_7	899259
5_8	900541
6	10000
5_9	900792
7	10000
5_0	901125
8	10000
10	10000
5_1	900987
9	10000
5_2	902210

现在要做的就是把5_n这样的数据转化为5,并这些文件整合为一个文件。

2.3.1 Mapper代码

public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        /**
         * map函数接收<k1,v1>,产生<k2,v2>
         *
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        Random random = new Random();
        @Override
        protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            //k1表示每一行的偏移量,v1表示该行内容
            //首先把每一行的单词切割出来
            String key =  v1.toString().split("\t")[0].split("_")[0];
            String value =  v1.toString().split("\t")[1];
            System.out.println("v1:"+v1.toString()+",key:"+key+",value:"+value);
            Text k2 = new Text(key);
            LongWritable v2 = new LongWritable(Long.valueOf(value));
            context.write(k2, v2);
        }
    }

2.3.2 Reducer代码

 public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        /**
         * 对v2s的数据进行累加,保存v2s的和
         *
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text,
                LongWritable>.Context context) throws IOException, InterruptedException {
            //sum计算v2s的和
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            context.write(k3, v3);
        }
    }

2.3.3 main代码

public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(100);
            }
            //配置项
            Configuration configuration = new Configuration();
            //创建一个job
            Job job = Job.getInstance(configuration);
            //不设置的话,集群就找不到这个类
            job.setJarByClass(WordCountSkew_2.class);
            //指定输入路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            //指定输出路径
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setOutputValueClass(LongWritable.class);

            //指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            //指定k3的类型
            job.setOutputKeyClass(Text.class);
            //指定v3的类型
            job.setOutputValueClass(LongWritable.class);

            //提交job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

2.3.4 执行结果

再经过MapReduce整合后得到如下结果,统计出了每个数字的出现频率。

[root@bigData01 wordCountSkew]# hdfs dfs -ls /out0006
Found 2 items
-rw-r--r--   1 root supergroup          0 2023-01-29 16:24 /out0006/_SUCCESS
-rw-r--r--   1 root supergroup         83 2023-01-29 16:24 /out0006/part-r-00000
[root@bigData01 wordCountSkew]# hdfs dfs -cat /out0006/part-r-00000
1	10000
10	10000
2	10000
3	10000
4	10000
5	9010000
6	10000
7	10000
8	10000
9	10000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/184433.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OJ万题详解––高考排名(C++详解)

题目 题目描述 高考成绩的排名规则是按总分由高到低排&#xff0c;总分相同的人排名应相同&#xff0c;例如有 5 个同学的考高成绩&#xff1a; 考号姓名成绩001c1567002ygh605003gl690004xtb605005wzs567按照成绩排序后&#xff0c;成绩如下&#xff1a; 排名考号姓名成绩1003…

C/C++ 相关低耦合代码的设计

在我们设计C/C 程序的时候&#xff0c;有时需要两个类或者两个模块相互“认识”&#xff0c;或者两个模块间函数互相调用&#xff0c;假设我们正在开发一个网上商店&#xff0c;代表的网店客户的类必须要知道相关的账户。UML图如下&#xff0c;这被称为环依赖&#xff0c;这两个…

【GIS前沿】什么是新型基础测绘、内容、产品体系、特征?

《测绘法》指出&#xff0c;基础测绘是建立和维护全国统一的测绘基准和测绘系统&#xff0c;进行航天航空影像获取&#xff0c;建立和更新维护基础地理信息数据库&#xff0c;提供测绘地理信息应用服务等。 文章目录一、什么是新型基础测绘&#xff1f;二、新型基础测绘的特征三…

6、场景法

为什么使用场景法 现在的系统基本上都是由事件来触发控制流程的。如&#xff1a;我们申请一个项目&#xff0c;需先提交审批单据&#xff0c;再由部门经理审批&#xff0c;审核通过后由总经理来最终审批&#xff0c;如果部门经理审核不通过&#xff0c;就直接退回。每个事件触…

1.Docker Desktop安装设置

1.下载最新版本Download Docker Desktop | Docker 2.进行安装 2.1进行4.x版本安装 2.2最新版本出现问题 出现 docker desktop stopped 过一会后 quit退出&#xff0c;下载3.x版本 2.3继续安装 Enable Hyper-V windows Features 启动Hyper-V windows 虚拟化功能 百度百科-验证…

【GD32F427开发板试用】一、环境搭建与freertos移植

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;chenjie 【GD32F427开发板试用】一、环境搭建与freertos移植 【GD32F427开发板试用】二、USB库移植与双USB CDC-ACM功能开发 【GD32F427开发板…

java集合类(属于工具类)概述

Java集合类可用于存储数量不等的对象&#xff0c;并可以实现常用的数据结构&#xff0c;如栈、队列等。除此之外&#xff0c;Java集合还可用于保存具有映射关系的关联数组。 Java集合大致可分为Set、List、Queue和Map四种体系&#xff1a; 其中Set代表无序、不可重复的集合&…

限制系统性能瓶颈的因素、衡量系统性能的指标

文章目录限制系统性能瓶颈的因素cpu内存磁盘IO网络IO异常数据库锁竞争衡量系统性能的指标响应时间吞吐量计算机资源分配使用率负载承受能力有时候我们的程序性能不高,需要提升性能,这个时候可以从以下几个角度去考虑是什么限制了我们的性能瓶颈.限制系统性能瓶颈的因素 cpu 有…

spring-bean的生命周期-【源码解析】-上

一、spring的bean概念Spring最重要的功能就是帮助程序员创建对象&#xff08;也就是IOC&#xff09;&#xff0c;而启动Spring就是为创建Bean对象做准备&#xff0c;所以我们先明白Spring到底是怎么去创建Bean的&#xff0c;也就是先弄明白Bean的生命周期。Bean的生命周期就是指…

RocketMq-dashboard:topic 5min trend 原理和源码分析(一)

本文阅读基础&#xff1a;使用或了解过rocketMq&#xff1b;想了解"topic 5min trend"背后的原理&#xff1b;想了解监控模式如何实现。RocketMq的dashboard&#xff0c;有运维页面&#xff0c;驾驶舱&#xff0c;集群页面&#xff0c;主题页面&#xff0c;消费者页面…

[羊城杯 2020]easyre 1题解

一步一个脚印地耐心攀登&#xff0c;就是别去看顶峰&#xff0c;而要专注于在爬的路。 ——黑泽明 目录 1.查壳 2.IDA静态分析main函数 3.研究三重加密 第一重加密 第二重加密 第三重加密 4.解密 1.查壳 64bit exe文件 2.IDA静态分析main函数 拖入IDA&#xff0c;找到…

芯片验证系列——Checker

在产生了有效的激励后&#xff0c;需要判断出不符合功能描述的行为。Checker就是用于查看DUT是否按照功能描述做出期望的行为&#xff0c;识别出所有的设计缺陷。 按照激励的生成方式和检查的功能点分布可以将验证划分为三种方式&#xff1a; 黑盒验证&#xff1a;验证环境不…

【Vue】前端工程化与 webpack

一、前端工程化前端开发1.1 小白眼中的前端开发会写 HTML CSS JavaScript 就会前端开发需要美化页面样式&#xff0c;就拽一个 bootstrap 过来需要操作 DOM 或发起 Ajax 请求&#xff0c;再拽一个 jQuery 过来需要快速实现网页布局效果&#xff0c;就拽一个 Layui 过来1.2 实…

redis事务详解

事务是逻辑上对数据的一组操作&#xff0c;这操作要么一次全部成功或者这操作全部失败&#xff0c;是不可分割的单位 四大特性 原子性&#xff0c;一致性&#xff0c;隔离性&#xff0c;持久性(ACID) redis的事务 redis是弱事务型数据库&#xff0c;并不具备ACID的全部特性 re…

python情感分析:基于jieba的分词及snownlp的情感分析!

情感分析&#xff08;sentiment analysis&#xff09;是2018年公布的计算机科学技术名词。 它可以根据文本内容判断出所代表的含义是积极的还是负面的&#xff0c;也可以用来分析文本中的意思是褒义还是贬义。 一般应用场景就是能用来做电商的大量评论数据的分析&#xff0c;…

【Linux】基础IO文件操作

目录 基础IO 重谈文件 重谈C语言的文件操作 系统文件IO 理解文件 文件描述符fd 0 & 1 & 2 文件描述符的分配规则 重定向 使用 dup2 系统调用 在minishell中添加重定向功能 缓冲区 理解缓冲区 再次理解缓冲区 基础IO 重谈文件 1、空文件&#xff0c;也要…

C++STL入门:string的基本使用小笔记

目录 一.string类简介 二.string类的常用成员接口 1.string类对象的构造函数接口 2. string类对象的容量操作接口 std::string::size std::string::length std::string::empty std::string::clear std::string::resize std::string::reserve 3.string类对象的访问及遍历操作…

【精品】k8s的CKA考题17道解析

目标一:记住命令关键单词 第4道题:scale replicas 第5道题:cordon、uncordon、drain 第8道题:target-port 目标二:完成操作要求 NoSchedule 查看工作节点的健康状态 ,确定集群中有多少节点为 Ready 状态,并且去除包含 NoSchedule 污点的节点。之后将数字写到/opt/repl…

Mybatis-Plus使用指南

1、了解Mybatis-Plus 1.1、Mybatis-Plus介绍 MyBatis-Plus&#xff08;简称 MP&#xff09;是一个 MyBatis 的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。 官网&#xff1a;https://mybatis.plus/ 或 https://mp.baomi…

基础IO详解

目录 一、系统文件IO 1.1 open 1.1.1 open的第一个参数 1.1.2 open的第二个参数 1.1.3 open的第三个参数 1.1.4 open的返回值 1.2 close 1.3 write 1.4 read 二、文件描述符 2.1 进程与文件描述符 2.2 文件描述符的分配规则 三、重定向 3.1 自实现重定向原理 3.…