SPARK调优:AQE特性(含脑图总结)

news2024/11/28 4:44:21

学完AQE需要能够回答如下的几个问题:

  1. 什么是AQE?
  2. AQE的实现原理是什么?
  3. AQE的特性有哪些?使用什么参数实现?
  4. AQE每个特性可以解决什么问题?什么问题是AQE不能解决的
  • HL:学习脑图如下

在这里插入图片描述

SparkAQE是spark 3.0引入的一大重要功能,今天我们来聊一聊AQE的实现原理。

了解一个功能,先来了解其面临的问题。当涉及到大型集群中的复杂查询性能时,处理的并行度和正确Join策略选择已被证明是影响性能的关键因素。但Spark SQL在易用性和性能方面仍然存在极具挑战的问题:

一 应运而生,没有AQE时候的挑战

SparkSQL只能设置固定的Shuffle 分区数:
在 Spark SQL 中,shuffle 分区数是通过 spark.sql.shuffle.partition 配置的,默认值为 200。它决定了 reduce 任务的数量,对查询性能影响很大。
当我们配置spark.sql.shuffle.partition 后会默认给所有的join或agg过程中的shuffle设置统一的分区数,这是不合适的。相同的shuffle分区数不能适合单个查询的所有stage,因为每个stage都有不同的输出数据大小和分配。
如果分区数太小,则并行度较低,每个reduce任务必须处理更多数据。由于内存可能无法保存所有数据,甚至会发生溢出到磁盘。在最坏的情况下,它可能会导致严重的 GC 问题或 OOM。此外,如果某些任务处理的数据比其他任务多得多(即数据倾斜),或者由于磁盘较旧、CPU 频率较低等原因运行时间较长,整个集群的资源利用率就会降低,当前阶段的执行时间会更长。这时候就需要通过增加分区数,减少某个分区处理的数据量,可以减轻这类问题的影响。
当然,如果分区值太大,可能会增加调度的开销,因为会有太多的小reduce任务和许多小文件生成,也会导致性能问题。
总之,shuffle 分区数既不能太小也不能太大。为了获得最佳性能,我们经常需要在非生产环境中为多次调整 shuffle 分区数(只能在非SQL的DataSet或DataFrame中设置)。
并不能选择出最优执行计划:
在没有开启
CBO
前,Spark默认从逻辑计划生成的物理计划列表中选择第一个作为物理计划。
而CBO的优化并非是万能的,首先CBO 仅支持注册到 Hive Metastore 的数据表的优化。另外开发者还需预先统计收集表信息,存在性能消耗。最重要的是CBO也是一种静态的优化策略,它结合各类统计信息制定执行计划,一旦执行计划交付运行,CBO 的使命就算完成了。换句话说,Spark在启动后不能在对物理计划进行修改。
但其实在Spark运行中会涉及些必须落盘的操作,例如shuffle等操作。通过落盘信息收集更多的运行时信息,可以提供更加精准的Statistics。
从SparkSQL统计信息文章可以看出,SparkSQL默认对数据的评估是非常不准确的。如果真实表的数据量小于BroadcastHashJoin的阈值,就可以将SortMergeJoin转换为BroadcastHashJoin,它避免了大表在集群中的Shuffle, 这大大提高了查询性能。
数据倾斜严重影响SparkSQL的稳定性:
数据倾斜指的是某些分区的数据大小远大于其他分区的情况,那些相应的任务可能会运行更长时间,依据木桶理论,整个Query变慢。

目前有一些处理数据倾斜的方法:
增加 shuffle 分区数,这样数据更有可能被散列到不同的分区。不幸的是,当大量数据共享相同的hash key时,这将无济于事。
增加 BroadcastHashJoin 阈值以将更多先前plan的 SortMergeJoin 转换为 BroadcastHashJoin。这样可以避免在有限的情况下shuffle造成的数据倾斜。
手动过滤倾斜的key并向这些数据添加随机"加盐"处理。在另一个表中,相应的数据也需要复制。但当其中一个连接表取自中间结果,这种解决办法很难实现。
这些问题的原因一方面是SparkSQL并不能准确的评估处理的数据量,另一方面是不能根据运行中的状态动态的调整运行计划和配置。

为了解决这些问题,Spark社区引入在RDBMS 世界中广泛使用多年基于成本的优化(CBO)。然而,在分布式系统中使用 CBO 是一个“极其复杂的问题”,在Spark中收集和维护一组准确和最新的统计数据是昂贵的。

为了解决这些问题,Spark 社区在 2015 年提出了 Adaptive Execution 的基本思想,并在 DAGScheduler 中,增加了一个新的 API 来支持提交单个map阶段,以及运行时更改 shuffle 分区数。基于社区工作,英特尔和百度团队重新设计了 Adaptive Execution,实现了更加灵活的AQE框架。

二 AQE是什么?它是什么实现原理?

简单来说,AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

首先,AQE 赖以优化的统计信息与 CBO 不同,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件。学习过 Shuffle 的工作原理之后,我们知道,每个 Map Task 都会输出以 data 为后缀的数据文件,还有以 index 为结尾的索引文件,这些文件统称为中间文件。每个 data 文件的大小、空文件数量与占比、每个 Reduce Task 对应的分区大小,所有这些基于中间文件的统计值构成了 AQE 进行优化的信息来源。

其次,AQE 从运行时获取统计信息,在条件允许的情况下,优化决策会分别作用到逻辑计划和物理计划。

那么它是怎么实现基于统计信息修改尚未执行的逻辑计划和物理计划的呢?

在非AQE的情况下,Spark会在规划阶段确定了物理执行计划后,根据每个算子的定义生成RDD对应的DAG。然后 Spark DAGScheduler通过shuffle来划分RDD Graph并创建stage,然后提交Stage以供执行。

下面我们来看看在启用AQE后的情况:

首先,会将逻辑计划拆分为QueryStage的独立子图,这可以更早地拆分Stage。通过单独提交mapStage,收集它们的MapOutputStatistics对象。

在AQE的plan中定义了两种类型的QueryStage, 分别为:

Shuffle query stages: 将其输出物化为Shuffle文件。
Broadcast query stages:将其输出物化到Driver内存中的数组。

顾名思义,它们分别是基于Shuffle和broadcast来进行划分的。同时这里划分出来的是QueryStage而非Stage。实现上述这个功能的实现是基于DAGScheduler现在支持提交单个mapStage。

其次,AQE还包含对物理规划和执行规划的修改。

在物理规划中,会在原执行计划中找到Exchange,并引入了两个新的操作节点。

QueryStage是一个阶段的根节点,负责运行时决策
QueryStageInput是一个stage的叶子节点,主要目的是在物理计划更新后将子stage的结果提供给它的父亲
在这里插入图片描述
如上图所示,QueryStages 和 QueryStageInputs 是通过在执行计划中查找 Exchange 来添加的。

在执行阶段中,plan树上的任何 QueryStage 都会引用其子阶段并递归执行它们。在 QueryStage 的所有子节点完成后,将收集运行时 shuffle-write 统计信息并用于进一步细化。然后 Spark 重新启动逻辑优化和物理规划阶段,并根据这些新信息动态更新查询计划。
在这里插入图片描述

总之,如图所示,在非AQE的情况下,SparkSQL会转换为DAG图,然后DAGScheduler基于shuffle将其划分为多个stage, 然后再执行stage。

在AQE的情况下,首先会将plan树拆分为多个QueryStages, 在执行时先将它的子 QueryStages 被提交。在所有子节点完成后,收集 shuffle 数据大小。根据收集到的 shuffle 数据统计信息,将当前 QueryStage 的执行计划优化为更好的执行计划。然后转换为DAG图再执行Stage。

三 目前AQE主要有三大特性:

自动分区合并:在 Shuffle 过后,Reduce Task 数据分布参差不齐,AQE 将自动合并过小的数据分区。
Join 策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从 Shuffle Sort Merge Join 降级(Demote)为执行效率更高的 Broadcast Hash Join。
自动倾斜处理:结合配置项,AQE 自动拆分 Reduce 阶段过大的数据分区,降低单个 Reduce Task 的工作负载。
下面我们依次来简单了解下。

1 自动分区合并

分区合并的原理比较简单,在 Reduce 阶段,当 Reduce Task 把数据分片从map端拉回,AQE 按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起。

举个例子,假设 shuffle 分区数为 5,每个 reducer 的目标数据大小为 64MB。map阶段完成后,我们知道每个分区的大小分别为70MB、30MB、20MB、10MB和50MB。为了尽量在不拆分分区的情况下,让每个 post-shuffle 分区的大小小于目标数据大小,确定在运行时使用 3 个 reducer 进行平衡。第一个 reducer 处理分区 0 (70MB)。第二个 reducer 处理 3 个连续的分区(分区 1 到 3,总共 60MB)。第三个 reducer 处理分区 4 (50MB)。

在合并分区时,会按照分区编号依次将 shuffle 分区打包到单个coalesced 分区,直到添加另一个 shuffle 分区会导致 coalesced 分区的大小大于目标大小。

此外,在新的AQE框架中,每个 QueryStage 都知道它的所有子阶段,这使得它可以很好地处理 3 个以上的表join,避免引入更多的shuffle。

调整自动合并分区大小主要有两个参数:

1.spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分区的推荐目标大小
2.spark.sql.adaptive.coalescePartitions.minPartitionNum 分区合并后的最小分区数

val maxTargetSize = math.max(  math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
val targetSize = math.min(maxTargetSize, advisoryTargetSize)

从上面的代码可以看出,目标分区的大小是由两个参数共同决定。shuffle数据的总大小除以最小分区数与推荐目标大小间的最小值即为目标分区的大小。

2 Join 策略调整

在AQE中,shuffle数据的统计信息可以为当前的 QueryStage 的plan提供更好的优化建议。那么当其中一个join表的实际大小小于阈值时,如果使用BroadcastHashJoin而不是SortMergeJoin可以获得更好的性能。

这里有两个优化规则,一个逻辑规则和一个物理策略分别是:DemoteBroadcastHashJoin 和 OptimizeLocalShuffleReader。

DemoteBroadcastHashJoin 规则的作用,是把 Shuffle Sort Merge Joins 降级为 Broadcast Joins。

对于参与 Join 的两张表来说,在它们分别完成 Shuffle Map 阶段的计算之后,DemoteBroadcastHashJoin 会判断中间文件是否满足如下条件:

自动降级为BHJ的参数配置

1.spark.sql.autoBroadcastJoinThreshold 中间文件尺寸总和小于广播阈值
2.spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 空文件占比小于配置项

只要有任意一张表的统计信息满足这两个条件,Shuffle Sort Merge Join 就会降级为 Broadcast Hash Join。

在这里插入图片描述
但是,AQE 依赖的统计信息来自于 Shuffle Map 阶段生成的中间文件。这意味什么呢?

即使已经将Shuffle Sort Merge Join 就会降级为 Broadcast Hash Join,但join两表都已经按照 Sort Merge Join 的方式走了一半(即大表和小表都进行了shuffle操作)。

在常规的 Shuffle 计算流程中,Reduce 阶段的计算需要跨节点访问中间文件拉取数据分片。如果遵循常规步骤,即便 AQE 在运行时把 Shuffle Sort Merge Join 降级为 Broadcast Join,大表的中间文件还是需要通过网络进行分发。这个时候,AQE 的动态 Join 策略调整也就失去了实用价值。原因很简单,负载最重的大表 Shuffle 计算已经完成,再去决定切换到 Broadcast Join 已经没有任何意义。

在这样的背景下,OptimizeLocalShuffleReader 物理策略就非常重要了。既然大表已经完成 Shuffle Map 阶段的计算,这些计算可不能白白浪费掉。采取 OptimizeLocalShuffleReader 策略可以省去 Shuffle 常规步骤中的网络分发,Reduce Task 可以就地读取本地节点(Local)的中间文件,完成与广播小表的关联操作。

举个例子:假设 shuffle 分区数为 5。在 map 阶段,任一连接都包含 2 个 map 任务。在 reduce 阶段,为 SortMergeJoin 启动了 5 个 reducer,每个 reducer 进行远程 shuffle 读取。但是,如果将 BroadcastHashJoin 转换为,则只需要启动 2 个 reducer,并且每个 reducer 在本地读取一个 mapper 的完整 shuffle 输出。
在这里插入图片描述

这里有3个优点:

没有数据通过网络传输,因此节省了网络I/O。
顺序读取文件比正常随机读取文件的一小部分要快得多。
防止shuffle,避免数据歪斜。

3 自动倾斜处理

在AQE中,执行子 QueryStages 后,收集每个分区的 shuffle 数据大小和记录数。如果一个分区的数据量或记录数比中位数大N倍,也比预先配置的值大,则判断为倾斜分区,该连接判断为倾斜连接。这是在 Reduce 阶段,当 Reduce Task 所需处理的分区尺寸大于一定阈值时,利用 OptimizeSkewedJoin 策略,AQE 会把大分区拆成多个小分区。

OptimizeSkewedJoin规则使 Spark 通过遵循一个简单的规则来实现更好的负载均衡。其中 F 为倾斜因子,S 为倾斜大小和 R 为倾斜行数,一个分区被认为是倾斜的条件是:

它的大小大于 S 并且大于分区大小中位数乘以 F。
它的行数比 R 大,并且大于分区行数的中位数乘以 F。

假设表 A 和表 B 执行内连接并且表 A 中的分区 0 是倾斜的。对于正常执行,表 A 和 B 的分区 0 都被洗牌到单个 reducer 进行处理。由于这个 reducer 需要通过网络和进程获取大量数据,因此它可能是延长整个阶段时间的最慢任务
在这里插入图片描述

如上图所示,N个task用于处理表A的偏斜分区0,每个task只读取表A的少数mapper的shuffle输出,并与表B的分区0进行join,将这N个task的结果合并得到最终的join结果. 为了实现这一点,我们更新了 shuffle read API 以允许仅从几个映射器而不是全部读取分区。

在处理过程中,我们可以看到表 B 的分区 0 将被多次读取。尽管引入了开销,但性能改进仍然很显着。

不过这种解决数据倾斜的方式针对的是Task级别的数据倾斜,主要是将同一个executor内的倾斜task进行拆分,而对于数据全集中在个别executor内的情况就无济于事了。
在这里插入图片描述

对于倾斜的分区集中在某些executor中,我们可以使用两阶段方式:手动加盐复制。

优化数据倾斜的配置参数

spark.sql.adaptive.skewJoin.enabled = true 
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 10  ,判定倾斜的膨胀系数
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB ,判定倾斜的最低阈值
spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位,定义拆分粒度

参考文档,嘎嘎好:

  1. https://time.geekbang.org/column/article/369687?utm_campaign=geektime_search&utm_content=geektime_search&utm_medium=geektime_search&utm_source=geektime_search&utm_term=geektime_search
  2. https://zhuanlan.zhihu.com/p/533982903

不理解的地方:

  1. querystage
  2. 更完整的执行流图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2186857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MES系统适用于哪些行业?MES系统对于企业的作用和价值

MES系统(制造执行系统)广泛应用于多个行业,并在这些行业中发挥着重要作用,为企业带来了显著的价值。以下是对MES系统适用行业及其对企业作用和价值的详细分析: 一、MES系统适用的行业 电子信息行业: 随着市…

大功率LED模块(5V STM32)

目录 一、介绍 二、模块原理 1.尺寸介绍 2.模块原理图 3.引脚描述 三、程序设计 main.c文件 timer.h文件 timer.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 大功率LED模块是一种照明设备,它将大功率高亮度发光二极管(LED)集成在铝基板上&…

Linux学习笔记(二):深入理解用户管理、运行级别与命令行操作

Linux学习笔记(二):深入理解用户管理、运行级别与命令行操作 Linux学习笔记(一):Linux学习环境的安装及远程连接工具的使用 1. 用户管理 1.1 用户密码管理 创建用户密码 使用 passwd 命令可以为指定用户…

封装了一个iOS水平方向动态宽度layout

我们有时候会遇到这样的情形,就是需要展示一些动态的标签,宽度是动态的, 水平方向是一行,其实这种情况还是比较容易处理的,只是一下子想不起来, 这里做了一个相关的需求,将思路和代码记录下来&a…

第5章 总线与微命令实验

第5章 总线与微命令实验 5.1实验目的 (1)理解总线的概念和作用。 (2)连接运算器与存储器,熟悉计算机的数据通路。 (3)理解微命令与微操作的概念。 5.2实验要求 (1)做…

69 BERT预训练_by《李沐:动手学深度学习v2》pytorch版

系列文章目录 文章目录 系列文章目录NLP里的迁移学习Bert的动机Bert架构对输入的修改五、预训练任务1、2、3、 六、1、2、3、 七、1、2、3、 八、1、2、3、 NLP里的迁移学习 之前是使用预训练好的模型来抽取词、句子的特征,例如 word2vec 或语言模型这种非深度学习…

香酥胡麻饼 一口沦陷的传统美食

🥯美食发现 | 胡麻饼,一口咬出的千年韵味🥯😋宝子们,今天我要给大家分享一款超级有历史底蕴的美食 —— 食家巷胡麻饼。 ✨食家巷胡麻饼,那可是有着悠久历史的传统美食。在古代,它就备受人们喜…

【算法】链表:160.相交链表(easy)+双指针

系列专栏 《分治》 《模拟》 《Linux》 目录 1、题目链接 2、题目介绍 3、解法(双指针) 返回结果 算法正确性 时间复杂度 4、代码 1、题目链接 160. 相交链表 - 力扣(LeetCode) 2、题目介绍 ​ 3、解法(…

MISC -第十天(音符加解密、敲击码、NtfsStreamsEditor工具)

前言 各位师傅大家好,我是qmx_07,今天继续讲解MISC的相关知识 [MRCTF2020]你能看懂音符吗 附件信息: rar文件无法打开,显示损坏,先放到hxd查看 头标识错误,尝试修复 rar标识头(52 61 72 21) 压缩包里有一个d…

4个顶级的大模型推理引擎

LLM 在文本生成应用中表现出色,例如具有高理解度和流畅度的聊天和代码完成模型。然而,它们的庞大规模也给推理带来了挑战。基本推理速度很慢,因为 LLM 会逐个生成文本标记,需要对每个下一个标记进行重复调用。随着输入序列的增长&…

【游戏模组】星际争霸1代模组燃烧之地,泰伦帝国对决UED。特效华丽兵种巨多特别好玩

各位星际争霸1代的粉丝大家好,今天小编给大家带来一个星际争霸1代的模组,这个模组的名字叫燃烧之地,主要是2个阵营。玩家可以扮演UED或者泰伦帝国中的一个来进行比赛。 这个模组设定的世界线背景是在异虫入侵并随后被星灵消灭后,…

昇思MindSpore进阶教程--梯度累加

大家好,我是刘明,明志科技创始人,华为昇思MindSpore布道师。 技术上主攻前端开发、鸿蒙开发和AI算法研究。 努力为大家带来持续的技术分享,如果你也喜欢我的文章,就点个关注吧 正文开始 本教程介绍梯度累加的训练算法…

C(十一)scanf、getchar(第三弹)

问题引入:如何实现输入一串密码,如:“123 xxxx” ,然后读取并确认,是 -- Y;否 -- N。 自然的,我们想到用scanf,但是在使用过程中你是否遇到跟我一样的困惑呢?如下&…

【Linux】进程管理:状态与优先级调度的深度分析

✨ 山海自有归期,风雨自有相逢 🌏 📃个人主页:island1314 🔥个人专栏:Linux—登神长阶 ⛺️ 欢迎关注:👍点赞 &#x1…

华为技术经理总结Java技术栈思维导图

最近帮在读大四的学生做了一个java项目,使用Spring bootVue3做了一个机器学习在线训练和服务的平台,发现他的Java技术栈并不全面,希望在下面分享一下Java技术栈,能够帮助到正在参加秋招或者是准备找工作的朋友们: 完整…

Vue.js 组件开发知识详解

✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…

为什么我可以做系统架构师?

人人都可以做产品经理,但是却没有人随随便便成为系统架构师,系统架构师为什么这么稀缺,因为系统架构师要求的能力是十分全面的,首先技术基本功要非常扎实,其次是你的统筹能力,一个项目交到你手上&#xff0…

软件管理【1.10】

软件管理【1.10】 13、软件管理13.1.包管理工具rpm13.2.yum和dnf13.2.1.配置yum源13.2.2.只下载安装包,不安装13.2.3.配置本地光盘ISO文件安装13.2.4.配置阿里云epel源13.2.5.base-for-centos7.repo13.2.6.base-for-rocky8.repo13.3.搭建私有yum仓库13.3.1.Apache实现网战功能…

力扣题解 1928

题目描述(困难) 规定时间内到达终点的最小费用 一个国家有 n 个城市,城市编号为 0 到 n - 1 ,题目保证 所有城市 都由双向道路 连接在一起 。道路由二维整数数组 edges 表示,其中 edges[i] [xi, yi, timei] 表示城市…

提示词工程实践

本讲义主要以text2SQL为场景进行讲解,包括提示词的编写和闭源模型调用、本地开源模型部署调用以及基于开源模型的微调。 回顾下大模型学习思路: 1. 开发环境准备 1.1 代码 代码位于【算法管理】->【公共算法】->【prompts_engineering_04】&…