Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)

news2025/1/11 20:05:14

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

结论

OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。

分析

直接到OptimizeSkewInRebalancePartitions中的代码中来:

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
      return plan
    }

    plan transformUp {
      case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
        tryOptimizeSkewedPartitions(stage)
    }
  }

如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

在这里插入图片描述
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。

在看合并的计算公式,该数据流如下:

 tryOptimizeSkewedPartitions
      ||
      \/
 optimizeSkewedPartitions
      ||
      \/
 ShufflePartitionsUtil.createSkewPartitionSpecs
      ||
      \/
 ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的参数解释如下 :

  • 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
    表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

在这里插入图片描述
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.

  • 参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB
  • 参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默认是0.2
    这里有个计算公式:
    def tryMergePartitions() = {
      // When we are going to start a new partition, it's possible that the current partition or
      // the previous partition is very small and it's better to merge the current partition into
      // the previous partition.
      val shouldMergePartitions = lastPartitionSize > -1 &&
        ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
        (currentPartitionSize < targetSize * smallPartitionFactor ||
          lastPartitionSize < targetSize * smallPartitionFactor))
      if (shouldMergePartitions) {
        // We decide to merge the current partition into the previous one, so the start index of
        // the current partition should be removed.
        partitionStartIndices.remove(partitionStartIndices.length - 1)
        lastPartitionSize += currentPartitionSize
      } else {
        lastPartitionSize = currentPartitionSize
      }
    }
    。。。
    while (i < sizes.length) {
      // If including the next size in the current partition exceeds the target size, package the
      // current partition and start a new partition.
      if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
        tryMergePartitions()
        partitionStartIndices += i
        currentPartitionSize = sizes(i)
      } else {
        currentPartitionSize += sizes(i)
      }
      i += 1
    }
    tryMergePartitions()
    partitionStartIndices.toArray

这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:

    val targetSize = 100
    val smallPartitionFactor2 = 0.5
    // merge last two partition if their size is not bigger than smallPartitionFactor * target
    val sizeList5 = Array[Long](50, 50, 40, 5)
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
      sizeList5, targetSize, smallPartitionFactor2).toSeq ==
      Seq(0))

    val sizeList6 = Array[Long](40, 5, 50, 45)
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
      sizeList6, targetSize, smallPartitionFactor2).toSeq ==
      Seq(0))

这种情况下,就会只有一个reduce任务运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1535188.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSS弹性盒模型(学习笔记)

一、厂商前缀 1.1 作用 解决浏览器对C3新特性的兼容&#xff0c;不同的浏览器厂商&#xff0c;定义了自己的厂商前缀 1.2 语法 浏览器 厂商前缀内核(渲染引擎)&#xff1a;解析htmlcssjs谷歌 -webkit-blink苹果-webkit-webkit欧朋-o-blink火狐 -moz-geckoIE-ms- trid…

sentinel流控规则详解(图形化界面)

1、QPS 每秒的请求数&#xff0c;超过这个请求数&#xff0c;直接流控。 2、BlockException统一异常处理 2.1、创建异常返回类 Result.class public class Result<T> {private Integer code;private String msg;private T data;public Result(Integer code, String ms…

算法设计与分析-贪心算法的应用动态规划算法的应用——沐雨先生

1&#xff0e; 理解贪心算法的概念&#xff1b; 2&#xff0e;掌握贪心算法的基本思想。 &#xff08;1&#xff09;删数问题 问题描述&#xff1a;给定 n 位正整数 a &#xff0c;去掉其中任意 k ≤ n 个数字后&#xff0c;剩下的数字按原次序排列组成一个新的正整数。对于…

Eclipse For ABAP:安装依赖报错

1.安装好Eclipse后需要添加依赖,这里的地址: https://tools.hana.ondemand.com/latest 全部勾选等待安装结束; 重启后报错:ABAP communication layer is not configured properly. This might be caused by missing Microsoft Visual C++ 2013 (x64) Runtime DLLs. Consu…

【面试经典150 | 数组】分发糖果

文章目录 写在前面Tag题目来源解题思路方法一&#xff1a;贪心两次遍历 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并附带一些对于本题涉及到的数据结构等内容…

STM32最小核心板使用HAL库实现CAN接口通讯(轮询方式)

这里使用了CAN1的接口&#xff0c;具体使用MX创建项目就不放了 需要注意的是&#xff0c;由于是最小核心没有CAN的收发模块需要外接一个 STM32核心板接CAN收发模块不需要交叉 /**CAN GPIO ConfigurationPA11 ------> CAN_RXPA12 ------> CAN_TX */ CAN收发模块…

数据结构知识总结

二叉树 满二叉树 特性 所有叶子结点都集中在二叉树的最下面一层上&#xff0c;而且结点总数为&#xff1a;2^n-1 (n为层数 / 高度&#xff09; 完全二叉树 特性 若设二叉树的高度为h&#xff0c;除第h层外&#xff0c;其他各层的节点数都达到最大个数&#xff0c;第h层有…

Windows 设置多显示器显示

Windows 设置多显示器显示 1. Windows 7 设置 HDMI 输出2. Windows 11 设置多显示器显示References 1. Windows 7 设置 HDMI 输出 2. Windows 11 设置多显示器显示 ​​​ References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

MAC本安装telnet

Linux运维工具-ywtool 目录 1.打开终端1.先安装brew命令2.写入环境变量4.安装telnet 1.打开终端 访达 - 应用程序(左侧) - 实用工具(右侧) - 终端 #注意:登入终端用普通用户,不要用MAC的root用户1.先安装brew命令 /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/H…

Android Studio实现内容丰富的安卓校园助手班级成绩天气管理

获取源码请点击文章末尾QQ名片联系&#xff0c;源码不免费&#xff0c;尊重创作&#xff0c;尊重劳动 1.开发环境 android stuido3.6 jak1.8 eclipse mysql tomcat 2.功能介绍 安卓端&#xff1a; 1.注册登录 2.校园公告 3.课程列表 4.成绩列表&#xff0c;天气列表 5.个人中心…

python + tensorflow 开局托儿所自动点击脚本

python开局托儿所自动点击脚本 屏幕截图图片数字识别消除算法自动点击 屏幕截图 python 屏幕截图可以使用pyautogui或者PIL。我使用的是PIL中的ImageGrab(要授权)。 image ImageGrab.grab(bbox(0, 0, tool.static_window_width, tool.static_window_height)) image np.arra…

人脸表情识别系统项目完整实现详解——(三)训练MobileNet深度神经网络识别表情

摘要&#xff1a;之前的表情识别系统升级到v3.0版本&#xff0c;本篇博客详细介绍使用PyTorch框架来构建并训练MobileNet V3模型以进行实现表情识别&#xff0c;给出了完整实现代码和数据集可供下载。从构建数据集、搭建深度学习模型、数据增强、早停等多种技术&#xff0c;到模…

Django日志(一)

一、概念与配置 1.1、概述 日志是程序员经常在代码中使用快速和方便的调试工具。它在调试方面比print更加的优雅和灵活 而且日志记录对于调试很有用,可以提供更多,更好的结构化,有关应用程序的状态和运行状况的信息 Django框架的日志通过python内置的logging模块实现的,可…

inputStream.avaliable()方法网络操作读取不全BUG

一、问题描述 公司有个需求&#xff0c;就是调用方&#xff08;我&#xff09;需要把pdf文件转为Base64字符串作为参数传递为被调用方&#xff0c;以下是大致转换过程&#xff1a; URL url new URL("http://xxxx.pdf");HttpURLConnection uc (HttpURLConnection) …

XR“黑话”

MTP&#xff08;Motion-To-Photon Latency&#xff09;&#xff1a;实际人体发生运动到图像显示到屏幕上的时间延迟。早期一些vr产生晕动症的主要原因。 ATW&#xff08;Asynchronous Timewarp&#xff09;&#xff1a;主要解决两个问题&#xff0c;一是延迟&#xff0c;二是补…

R语言Meta分析核心技术:回归诊断与模型验证

R语言作为一种强大的统计分析和绘图语言&#xff0c;在科研领域发挥着日益重要的作用。其中&#xff0c;Meta分析作为一种整合多个独立研究结果的统计方法&#xff0c;在R语言中得到了广泛的应用。通过R语言进行Meta分析&#xff0c;研究者能够更为准确、全面地评估某一研究问题…

Django日志(三)

内置TimedRotatingFileHandler 按时间自动切分的log文件,文件后缀 %Y-%m-%d_%H-%M-%S , 初始化参数: 注意 发送邮件的邮箱,开启SMTP服务 filename when=h 时间间隔类型,不区分大小写 S:秒 M:分钟 H:小时 D:天 W0-W6:星期几(0 = 星期一) midnight:如果atTime未指定,…

Avue框架实现图表的基本知识 | 附Demo(全)

目录 前言1. 柱状图2. 折线图3. 饼图4. 刻度盘6. 仪表盘7. 象形图8. 彩蛋8.1 饼图8.2 柱状图8.3 折线图8.4 温度仪表盘8.5 进度条 前言 以下Demo&#xff0c;作为初学者来说&#xff0c;会相应给出一些代码注释&#xff0c;可相应选择你所想要的款式 对于以下Demo&#xff0c…

管理类联考–复试–英文面试–问题–WhatWhyHow--纯英文汇总版

文章目录 Do you have any hobbies? What are you interested in? What do you usually do in your spare time? Could you tell me something about your family&#xff1f; Could you briefly introduce your family? What is your hometown like? Please tell me so…

HTTPS 协议原理

目录 HTTPS VS HTTP HTTPS是什么 概念准备 常见的加密方式 对称加密 一个简单的对称加密例子 非对称加密 数据摘要&&数据指纹 数字签名 HTTPS 的工作过程探究 方案1-只使用对称加密 方案2-只使用非对称加密 方案3-双方都使用非对称加密 方案4-非对称加密…