(17)Hive ——MR任务的map与reduce个数由什么决定?

news2024/11/29 1:29:03

一、MapTask的数量由什么决定?

    MapTask的数量由以下参数决定

  • 文件个数
  • 文件大小
  • blocksize

 一般而言,对于每一个输入的文件会有一个map split,每一个分片会开启一个map任务,很容易导致小文件问题(如果不进行小文件合并,极可能导致Hadoop集群资源雪崩)

hive中小文件产生的原因及解决方案见文章:

(14)Hive调优——合并小文件-CSDN博客文章浏览阅读779次,点赞10次,收藏17次。Hive的小文件问题https://blog.csdn.net/SHWAITME/article/details/136108785

maxSize的默认值为256M,minSize的默认值是1byte切片大小splitSize的计算公式:

splitSize=Min(maxSize,Max(minSize,blockSize)) = Min(256M,Max(1 byte ,128M)) = 128M =blockSize

所以默认splitSize就等于blockSize块大小

# minSize的默认值是1byte
set mapred.min.split.size=1

#maxSize的默认值为256M
set mapred.max.split.size=256000000

#hive.input.format是用来指定输入格式的参数。决定了Hive读取数据时使用的输入格式,
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

二、如何调整MapTask的数量

  假设blockSize一直是128M,且splitSize = blockSize = 128M。在不改变blockSize块大小的情况下,如何增加/减少mapTask数量

2.1 增加map的数量

      增加map:需要调小maxSize,且要小于blockSize才有效,例如maxSize调成100byte

      splitSize=Min(maxSize,Max(minSize,blockSize)) = Min(100,Max(1,128*1000*1000)) =100 byte = maxSize

   调整前的map数 = 输入文件的大小/ splitSize = 输入文件的大小/ 128M

   调整后的map数 = 输入文件的大小/ splitSize = 输入文件的大小/ 100byte

2.2 减少map的数量 

     减少map:需要调大minSize ,且要大于blockSize才有效,例如minSize 调成200M

      splitSize=Min(maxSize,Max(minSize,blockSize)) = Min(256m, Max(200M,128M)) = 200M=minSize

   调整前的map数 = 输入文件的大小/ splitSize = 输入文件的大小/ 128M

   调整后的map数 = 输入文件的大小/ splitSize = 输入文件的大小/ 200M

三、ReduceTask的数量决定

    reduce的个数决定hdfs上落地文件的个数(即: reduce个数决定文件的输出个数)。 ReduceTask的数量,由参数mapreduce.job.reduces 控制,默认值为 -1 时,代表ReduceTask的数量是根据hive的数据量动态计算的。

总体而言,ReduceTask的数量决定方式有以下两种:

3.1 方式一:hive动态计算

3.1.1 动态计算公式

 ReduceTask数量 = min (参数2,输入的总数据量/ 参数1)

参数1:hive.exec.reducers.bytes.per.reducer 

      含义:每个reduce任务处理的数据量(默认值:256M)

参数2:hive.exec.reducers.max 

     含义:每个MR任务能开启的reduce任务数的上限值(默认值:1009个)

ps: 一般参数2的值不会轻易变动,因此在普通集群规模下,hive根据数据量动态计算reduce的个数,计算公式为:输入总数据量/hive.exec.reducers.bytes.per.reducer

3.1.2 源码分析

(1)通过源码分析 hive是如何动态计算reduceTask的个数的?

	在org.apache.hadoop.hive.ql.exec.mr包下的 MapRedTask类中
	//方法类调用逻辑
	MapRedTask 
	     | ----setNumberOfReducers 
	                  | ---- estimateNumberOfReducers
	                    		 |---- estimateReducers         

(2)核心方法setNumberOfReducers(读取 用户手动设置的reduce个数)

  /**
   * Set the number of reducers for the mapred work.
   */
  private void setNumberOfReducers() throws IOException {
    ReduceWork rWork = work.getReduceWork();
    // this is a temporary hack to fix things that are not fixed in the compiler
    // 获取通过外部传参设置reduce数量的值 rWork.getNumReduceTasks() 
    Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();

    if (rWork == null) {
      console
          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
    } else {
       
      if (numReducersFromWork >= 0) {
      //如果手动设置了reduce的数量 大于等于0 ,则进来,控制台打印日志
        console.printInfo("Number of reduce tasks determined at compile time: "
            + rWork.getNumReduceTasks());
      } else if (job.getNumReduceTasks() > 0) {
      //如果手动设置了reduce的数量,获取配置中的值,并传入到work中
        int reducers = job.getNumReduceTasks();
        rWork.setNumReduceTasks(reducers);
        console
            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
            + reducers);
      } else {
       //如果没有手动设置reduce的数量,进入方法
        if (inputSummary == null) {
          inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
        }
   // #==========【重中之中】estimateNumberOfReducers 
        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
                                                          work.isFinalMapRed());
        rWork.setNumReduceTasks(reducers);
        console
            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
            + reducers);

      }
      
      //hive shell中所看到的控制台打印日志就在这里
      console
          .printInfo("In order to change the average load for a reducer (in bytes):");
      console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
          + "=<number>");
      console.printInfo("In order to limit the maximum number of reducers:");
      console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname
          + "=<number>");
      console.printInfo("In order to set a constant number of reducers:");
      console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
          + "=<number>");
    }
  }

 (3)如果没有手动设置reduce的个数,hive是如何动态计算reduce个数的?

    int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
                                                          work.isFinalMapRed());
                                                          
  /**
   * Estimate the number of reducers needed for this job, based on job input,
   * and configuration parameters.
   *
   * The output of this method should only be used if the output of this
   * MapRedTask is not being used to populate a bucketed table and the user
   * has not specified the number of reducers to use.
   *
   * @return the number of reducers.
   */
  public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,
                                             MapWork work, boolean finalMapRed) throws IOException {
    // bytesPerReducer 每个reduce处理的数据量,默认值为256M  BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", 256000000L)
    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
    
    //整个mr任务,可以开启的reduce个数的上限值:maxReducers的默认值1009个MAXREDUCERS("hive.exec.reducers.max", 1009)
    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
   
   //#===========对totalInputFileSize的计算
    double samplePercentage = getHighestSamplePercentage(work);
    long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);

    // if all inputs are sampled, we should shrink the size of reducers accordingly.
    if (totalInputFileSize != inputSummary.getLength()) {
      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
    } else {
      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
    }

    // If this map reduce job writes final data to a table and bucketing is being inferred,
    // and the user has configured Hive to do this, make sure the number of reducers is a
    // power of two
    boolean powersOfTwo = conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
        finalMapRed && !work.getBucketedColsByDirectory().isEmpty();
    
    //#==============【真正计算reduce个数的方法】看源码的技巧return的方法是重要核心方法
    return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo);
  }                                                

(4) 动态计算reduce个数的方法 estimateReducers

	public static int estimateReducers(long totalInputFileSize, long bytesPerReducer,
      int maxReducers, boolean powersOfTwo) {
    
  
    double bytes = Math.max(totalInputFileSize, bytesPerReducer);
  // 假设totalInputFileSize 1000M
    // bytes=Math.max(1000M,256M)=1000M


    int reducers = (int) Math.ceil(bytes / bytesPerReducer);
    
    //reducers=(int)Math.ceil(1000M/256M)=4  此公式说明如果totalInputFileSize 小于256M ,则reducers=1 ;也就是当输入reduce端的数据量特别小,即使手动设置reduce Task数量为5,最终也只会开启1个reduceTask
    
  
    reducers = Math.max(1, reducers);
  //Math.max(1, 4)=4 ,reducers的结果还是4

    reducers = Math.min(maxReducers, reducers);
    //Math.min(1009,4)=4; reducers的结果还是4

      
    int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
    int reducersPowerTwo = (int)Math.pow(2, reducersLog);

    if (powersOfTwo) {
      // If the original number of reducers was a power of two, use that
      if (reducersPowerTwo / 2 == reducers) {
        // nothing to do
      } else if (reducersPowerTwo > maxReducers) {
        // If the next power of two greater than the original number of reducers is greater
        // than the max number of reducers, use the preceding power of two, which is strictly
        // less than the original number of reducers and hence the max
        reducers = reducersPowerTwo / 2;
      } else {
        // Otherwise use the smallest power of two greater than the original number of reducers
        reducers = reducersPowerTwo;
      }
    }
    return reducers;
  }

3.2 方式二:用户手动指定

 手动调整reduce个数: set mapreduce.job.reduces = 10

 需要注意:出现以下几种情况时,手动调整reduce个数不生效。

3.2.1 order by 全局排序

   sql中使用了order by全局排序,那只能在一个reduce中完成,无论怎么调整reduce的数量都是无效的。

  
  hive (default)>set mapreduce.job.reduces=5;
  hive (default)> select * from empt order  by length(ename);
  Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1

3.2.2 map端输出的数据量很小

在【3.1.2 源码分析——(4) 】动态计算reduce个数的核心方法 estimateReducers中,有下面这三行代码:

int reducers = (int) Math.ceil(bytes / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);

   如果map端输出的数据量bytes (假如只有1M) 远小于hive.exec.reducers.bytes.per.reducer (每个reduce处理的数据量默认值为256M) 参数值,maxReducers默认为1009个,计算下列值:

  int reducers  = (int) Math.ceil(1 / 256M)=1;
  reducers = Math.max(1, 1)=1;
  reducers = Math.min(1009, 1)=1;

  此时即使用户手动 set mapreduce.job.reduces=10,也不生效,reduce个数最后还是只有1个。

参考文章:

Hive mapreduce的map与reduce个数由什么决定?_hive中map任务和reduce任务数量计算原理-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1450300.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mac终端远程访问Linux

以ubuntu为例 一、查看ubuntu的ip地址 1、下载net-tools localhostubuntu-server:~$ sudo apt install net-tools 2、查看ip地址 localhostubuntu-server:~$ ifconfig ubuntu需要下载net-tools才能使用ifconfig localhostubuntu-server:~$ sudo apt install net-tools 二…

问题:下列不属于影响职业选择的内在因素是()。 #微信#微信

问题&#xff1a;下列不属于影响职业选择的内在因素是&#xff08;&#xff09;。 A.健康 B.个性特征 C.性别 D.家庭的影响 参考答案如图所示

算法刷题day13

目录 引言一、蜗牛 引言 今天时间有点紧&#xff0c;只搞了一道题目&#xff0c;不过确实搞了三个小时&#xff0c;才搞完&#xff0c;主要是也有点晚了&#xff0c;也好累啊&#xff0c;不过也还是可以的&#xff0c;学了状态DP&#xff0c;把建图和spfa算法熟悉了一下&#…

[嵌入式AI从0开始到入土]14_orangepi_aipro小修补含yolov7多线程案例

[嵌入式AI从0开始到入土]嵌入式AI系列教程 注&#xff1a;等我摸完鱼再把链接补上 可以关注我的B站号工具人呵呵的个人空间&#xff0c;后期会考虑出视频教程&#xff0c;务必催更&#xff0c;以防我变身鸽王。 第1期 昇腾Altas 200 DK上手 第2期 下载昇腾案例并运行 第3期 官…

openGauss学习笔记-221 openGauss性能调优-确定性能调优范围-分析作业是否被阻塞

文章目录 openGauss学习笔记-221 openGauss性能调优-确定性能调优范围-分析作业是否被阻塞221.1 操作步骤 openGauss学习笔记-221 openGauss性能调优-确定性能调优范围-分析作业是否被阻塞 数据库系统运行时&#xff0c;在某些业务场景下查询语句会被阻塞&#xff0c;导致语句…

备战蓝桥杯---图论之最短路dijkstra算法

目录 先分个类吧&#xff1a; 1.对于有向无环图&#xff0c;我们直接拓扑排序&#xff0c;和AOE网类似&#xff0c;把取max改成min即可。 2.边权全部相等&#xff0c;直接BFS即可 3.单源点最短路 从一个点出发&#xff0c;到达其他顶点的最短路长度。 Dijkstra算法&#x…

vmware workstation群晖虚拟机vmdk文件导出

为了防止群晖虚拟机中整个挂掉&#xff0c;里面的文件导不出来&#xff0c;尝试直接从vmdk中获取内容。 1、想采用diskgenius去读取文件&#xff0c;发现volume1是空的。只能读取群晖的系统文件。 2、选择另一个linux系统的虚拟机&#xff0c;选择对应的vmdk 3、如果有文件管理…

Python slice函数

在Python编程中&#xff0c;slice&#xff08;切片&#xff09;操作是一种强大且灵活的方式&#xff0c;用于从序列&#xff08;如列表、元组、字符串等&#xff09;中获取子序列。通过切片操作&#xff0c;可以轻松地提取序列中的一部分&#xff0c;进行遍历、修改、复制等操作…

springboot185基于vue.js的客户关系管理系统(crm)的设计与实现

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…

C语言学习day13:嵌套循环+练习题(时钟+乘法表)

嵌套循环通常是外面一层for循环&#xff0c;里面n层for循环 代码&#xff1a; int main1601() {//外层执行一次&#xff0c;内层执行一周for (int i 0; i < 5; i){for (int j 0; j < 5; j){printf("i%d,j%d\n",i,j);}}system("pause");return EX…

每日一练:LeeCode-98、 验证二叉搜索树【二叉搜索树+DFS】

本文是力扣LeeCode-98、 验证二叉搜索树【二叉搜索树DFS】】 学习与理解过程&#xff0c;本文仅做学习之用&#xff0c;对本题感兴趣的小伙伴可以出门左拐LeeCode。 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&am…

Editing While Playing 使用 Easyx 开发的 RPG 地图编辑器 tilemap eaitor

AWSD移动画布 鼠标右键长按拖拽 鼠标左键长按绘制 可以边拖拽边移动画布边绘制。 F1 导出 DLC F2 导入DLC author: 民用级脑的研发记录 1309602336qq.com 开发环境&#xff1a; 内置 easyx 的 devc 5.11 或者 VS 2022 TDM GCC 4.9.2 64-bit c11及以上都可运行 windows 环境运行…

[BIZ] - 1.金融交易系统特点

1. 典型数据汇总 数据 说明 新增数据量(条/天) Qps(条/s) 消息大小(Byte) 实时性 可丢失性 可恢复性 实时行情 1.使用场景&#xff1a;交易&#xff0c;报价&#xff0c;策略验证&#xff1b; 2.冷热分离&#xff1a;彭博行情/其他行情&#xff1b;黄金&期货行情/…

node+vue3+mysql前后分离开发范式——实现对数据库表的增删改查

文章目录 ⭐前言⭐ 功能设计与实现💖 node后端操作数据库实现增删改查💖 vue3前端实现增删改查⭐ 效果⭐ 总结⭐ 结束⭐结束⭐前言 大家好,我是yma16,本文分享关于 node+vue3+mysql前后分离开发范式——实现对数据库表的增删改查。 技术选型 前端:vite+vue3+antd 后端:…

编程语言的实际应用场景(C语言场景)

从应用范围上来说&#xff0c;这些编程语言大致可以分为两种&#xff1a; 一种是专用型语言&#xff0c;也就是针对某个特定领域而设计出来的语言&#xff1b;另一种是通用型语言&#xff0c;它们可以开发多种类型的应用程序&#xff0c;而不是局限在某个特定的领域。 专用型…

[职场] 投资顾问是做什么? #知识分享#其他#微信

投资顾问是做什么&#xff1f; 投资顾问是指专门从事于提供投资建议而获薪酬的人士&#xff0c;它是投资服务中非常重要的角色。投资顾问&#xff0c;有广义和狭义之分。广义的投资顾问&#xff0c;可以是指为金融投资、房地产投资、商品投资等各类投资领域提供专业建议的专业人…

UI文件原理

使用UI文件创建界面很轻松很便捷&#xff0c;他的原理就是每次我们保存UI文件的时候&#xff0c;QtCreator就自动帮我们将UI文件翻译成C的图形界面创建代码。可以通过以下步骤查看代码 到工程编译目录&#xff0c;一般就是工程同级目录下会生成另一个编译目录&#xff0c;会找到…

QT 工具栏 状态栏 停靠部件 核心部件

添加/删除工具栏 删除工具栏方法和删除菜单栏方法一样&#xff0c;不过工具栏可以有多个&#xff0c;所以每次右键MainWindow对象&#xff0c;都可以看到添加工具栏的选项。 工具栏添加动作 新添加的QAction对象会在动作编辑器里找到&#xff08;Action Editor&#xff09;&a…

算法之贪心

1.部分背包问题 代码1&#xff1a; 代码2&#xff1a; 但如果金币不能分割&#xff0c;那贪心就不是最优解&#xff0c;正确的做法是搜索或动态规划。 2.排队接水 3.在规定时间内参加最多的比赛 4.合并果子 使用memset初始化int数组时&#xff0c;第二个参数如果是0&#xff0…

vscode运行C/C++时候cmd.exe界面显示

写了一些命令行传参的程序&#xff0c;需要终端输入参数&#xff0c;默认是输出结果显示在它自己的终端界面 Code-runner: Run In Terminal 打勾就行 效果&#xff1a;