Spark AQE

news2025/1/24 5:00:51

Spark AQE

  • AQE/RBO/CBO
  • AQE
  • AQE特点
    • Join 策略调整
    • 自动分区合并
    • 自动倾斜处理

Spark 3.0 添加了自适应查询执行(AQE)、动态分区剪裁(DPP)、扩展的 Join Hints

AQE/RBO/CBO

Spark SQL 2.0前,仅支持启发式、静态的优化过程。RBO (Rule Based Optimization,基于规则的优化,启发式的优化):基于一些规则和策略实现(谓词下推、列剪枝),这些规则和策略来源于数据库领域已有的应用经验。即:启发式的优化是一种经验主义

Spark SQL 2.2 后,推出 CBO (Cost Based Optimization,基于成本的优化)
CBO 特点 : “实事求是”,基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。一般 CBO 优于 RBO

RBO/CBO 缺点 : 窄、慢、静

  • 窄: 适用面太窄,CBO 仅支持注册到 Hive Metastore 的数据表,但很多数据源是存储在分布式文件系统的各类文件(Parquet、ORC、CSV)
  • 慢: 统计信息的搜集效率较低。对注册到 Hive Metastore 的数据表,用户需要调用 ANALYZE TABLE COMPUTE STATISTICS 来收集统计信息,而各类信息的收集会消耗大量时间
  • 静: 静态优化。CBO 会结合各类统计信息制定执行计划,CBO只执行计划交付运行。即:运行时数据分布发生动态变化,CBO 执行计划并不会跟着调整

AQE

Spark 3.0 推出了 AQE (Adaptive Query Execution,自适应查询执行)

AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

  • AQE 优化机制触发的时机是 Shuffle Map 阶段执行完毕。即:AQE 次数与Shuffle 的次数一致。无 Shuffle , AQE 就不会触发。
  • AQE 依赖优化 Shuffle Map 阶段输出的中间文件的统计信息(每个 data 文件的大小、空文件数量与占比、每个 Reduce Task 对应的分区大小)
  • Shuffle 的每个 Map Task 会输出中间文件(data 的数据文件/index 的索引文件)

在这里插入图片描述

AQE 的优化决策分别作用:逻辑计划/物理计划,AQE 分:1 个逻辑优化规则/ 3 个物理优化策略:

优化类型规则与策略AQE特性统计信息
逻辑计划DemoteBroadcastHashJoinJoin策略调整Map 阶段中间文件总大小
物理计划OptimizeLocalShuffleReaderJoin策略调整中间文件空文件占比
物理计划CoalesceShufflePartitions自动分区合并每个 Reduce Task 分区大小
物理计划OptimizeSkewedJoin自动倾斜处理每个 Reduce Task 分区大小

AQE特点

AQE 的三大特性:

  • Join 策略调整:当某张表过滤后,尺寸小于广播变量阈值,该表参与的数据关联就会从 Shuffle Sort Merge Join 到 Broadcast Hash Join
  • 自动分区合并:Shuffle 后,Reduce Task 数据分布过小,AQE 会自动合并过小的数据分区
  • 自动倾斜处理:AQE 自动拆分 Reduce 过大的数据分区,降低单个 Reduce Task 的工作负载

Join 策略调整

Join 策略调整涉及1个逻辑规则/1个物理策略,分别是 DemoteBroadcastHashJoin /OptimizeLocalShuffleReader

DemoteBroadcastHashJoin 的作用:把 Shuffle Joins 降为 Broadcast Joins。注意:仅适用 Shuffle Sort Merge Join。

Join 的两个表分别完成 Shuffle Map 后, DemoteBroadcastHashJoin 会判断该中间文件是否满足如下条件:

  • 中间文件尺寸总和小于广播阈值: spark.sql.autoBroadcastJoinThreshold
  • 空文件占比小于配置项 : spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
  • 只要一张表的统计信息满足这两个条件, Shuffle Sort Merge Join 就会降为 Broadcast Hash Join

当两张表都超过了广播阈值时,Spark SQL 最初的执行计划会选择 Sort Merge Join。AQE 会判断Shuffle Map 的中间文件,是否能降为 Broadcast Join。为了避免 Reduce 数据在网络中的全量分发,采取 OptimizeLocalShuffleReader :Reduce Task 从读取本地节点(Local)的中间文件,完成与广播小表的关联操作

OptimizeLocalShuffleReader 物理策略的生效配置: spark.sql.adaptive.localShuffleReader.enabled=True

自动分区合并

分区合并的原理:当 Reduce Task 从全网把数据分片拉回,AQE 按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起

在这里插入图片描述

目标分区尺寸的两个参数:

  • 分区合并后的推荐尺寸:spark.sql.adaptive.advisoryPartitionSizeInBytes
  • 分区合并后,分区数不能低于该值:spark.sql.adaptive.coalescePartitions.minPartitionNum

在 Shuffle Map 完成后, AQE 触发, CoalesceShufflePartitions 策略会添加到物理计划中

自动倾斜处理

自动倾斜处理的原理:当 Reduce Task 的分区大于一定阈值时,利用 OptimizeSkewedJoin 策略,AQE 会把大分区拆成多个小分区

倾斜分区/拆分粒度的决定配置项:

  • 倾斜的膨胀系数:spark.sql.adaptive.skewJoin.skewedPartitionFactor
  • 倾斜的最低阈值:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
  • 拆分粒度,以字节为单位:spark.sql.adaptive.advisoryPartitionSizeInBytes

自动倾斜处理的局限:在同一 Executor 内,由一个 Task 处理的大分区,被 AQE 拆成多个小分区并交给多个 Task 计算。Task 之间的计算负载就能平衡。但是不能解决不同 Executors 之间的负载均衡问题

例子:Shuffle的 Map 阶段有 3 个分区,Reduce 阶段有 4 个分区。但4 个分区中有两个都是倾斜的大分区,而且这两个倾斜的大分区刚好都分发到了 Executor 0。
尽管两个大分区被拆分,但整个作业的主要负载还在 Executor 0 上。Executor 0 的计算能力依然是整个作业的瓶颈,这点并没有因为分区拆分而解决

在这里插入图片描述

例子:Join 的两张表(表1/表2),如果表 1 有数据倾斜,表 2 无倾斜,关联时,AQE 要对表1拆分,还要对表2的数据分区做复制,来保证关联关系不被破坏

在这里插入图片描述

如果表1/表 2都有数据倾斜,为了不破坏逻辑上的关联关系,表1/表2 拆分出的分区还要各自复制一份,左表拆出 M 个分区,右表拆出 N 个分区,那每张表都需要M x N 个分区数据,才能保证关联逻辑的一致性。当 M/N 逐渐变大时, AQE 处理数据倾斜的计算开销会很大

在这里插入图片描述

  • 当简单的数据倾斜(如: 有倾斜但数据分布均匀/只有一边倾斜),完全可以依赖 AQE 的自动倾斜处理机制
  • 但当数据倾斜变得复杂(如: 数据的不同 Key 的分布悬殊/两表有大量的倾斜),就需要衡量 AQE 的自动化机制或手工处理倾斜

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/392924.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mybatis面试题 一

一、MyBatis工作原理? 1、 创建SqlSessionFactory 2、 通过SqlSessionFactory创建SqlSession 3、 通过sqlsession执行数据库操作 4、 调用session.commit()提交事务 5、 调用session.close()关闭会话 1)读取 MyBatis 配置文件:mybatis-c…

zynq7000系列芯片介绍

ZYNQ从架构上可以划分为两大模块,一个是PS(处理器系统),另一个是PL(可编程逻辑) PS由APU、内存接口、IO外设、互连线4大模块组成。 1、APU(Application Processor Unit)应用处理单元 即PS【可编…

面向对象设计模式:行为型模式之状态模式

文章目录一、引入二、状态模式2.1 Intent 意图2.2 Applicability 适用性2.3 类图2.4 Collaborations 合作2.5 Implementation 实现2.5 状态模式与策略模式的对比2.5 状态模式实例:糖果机2.6 状态模式实例:自行车升降档一、引入 State Diagram 状态图&am…

docker数据卷及软件部署方式

目录 一、docker数据卷管理 1.docker数据卷概念 2.数据卷命令 3.创建容器并挂载数据卷 二、docker软件部署 1.docker部署mysql方式 下载MySQL5.7镜像文件 创建所需的数据卷目录 创建mysql容器并挂载数据卷 进入数据库授权远程连接用户访问 2. docker部署nginx方式 下…

C语言-基础了解-16-C字符串

C字符串 一、C字符串 在 C 语言中,字符串实际上是使用空字符 \0 结尾的一维字符数组。因此,\0 是用于标记字符串的结束。 空字符(Null character)又称结束符,缩写 NUL,是一个数值为 0 的控制字符&#x…

剑指 Offer 31. 栈的压入、弹出序列

一、题目描述 输入两个整数序列,第一个序列表示栈的压入顺序,请判断第二个序列是否为该栈的弹出顺序。假设压入栈的所有数字均不相等。 例如,序列 {1,2,3,4,5} 是某栈的压栈序列,序列 {4,5,3,2,1} 是该压栈序列对应的一个弹出序列…

centos7 安装 hyperf

​​​​​​PHP > 7.4 Swoole PHP 扩展 > 4.5,并关闭了 Short Name OpenSSL PHP 扩展 JSON PHP 扩展 PDO PHP 扩展 Redis PHP 扩展 Protobuf PHP 扩展 composer create-project hyperf/hyperf-skeleton 推荐安装项 全部选n php.ini [swoole] extens…

LQB小板焊接V3版本的小板原理图,PCB图,注意事项和步骤

第一部分,这个部分,可以不焊接,直接用买的下载器进行下载代码,外接一个下载器,网上大概是10元左右,以后学习stm32的芯片的时候,这个下载器就是一个串口转换器,也可以使用。。 当然也…

realloc也可以缩容了??

作者:小树苗渴望变成参天大树 作者宣言:认真写好每一篇博客 作者gitee:gitee 如 果 你 喜 欢 作 者 的 文 章 ,就 给 作 者 点 点 关 注 吧! realloc的细节前言一、realloc的原地扩容和异地扩容二、关于realloc是否可以缩容问题前…

快速搭建本地服务器

一、anywhere 1、npm install anywhere -g 2、打开位于文件夹下的终端页,输入anywhere 9999 9999这里是设置端口号,端口号自行设置,也可以不输入xxx会默认8080端口号 二、http-server 1、npm install http-server -g 2、打开位于文件夹下…

[golang]Go语言从入门到实践-反射

反射三定律: 1.变量---->反射变量 2.变量---->反射变量---->接口 3.变量----->(通过取地址&)反射变量---->修改变量的值 反射的类型和种类: 切片、集合、结构体、指针、函数与反射...... 总结: 内置包函数 reflect 的…

darknet测试yolo

原文链接:https://wangguo.site/posts/38432.html YOLO: Real-Time Object Detection 1、下载yolo权重文件 mkdir model #新建文件夹放权重文件 cd model wget https://pjreddie.com/media/files/yolov3.weights2、测试图片 执行命令 ./darknet detect cfg/yolo…

(socket编程实验中遇到的问题)connect error no route to host

在编写网络编程的时候遇到了这个问题connect error no route to host socket编程(服务端与客户端) 上网一搜全是: 两台机器进行socket通信时,可能在连接时出现错误: connect error: No route to host(errno:113) 出…

产品新人如何培养产品思维?

什么是产品思维?其实很难定义,不同人有不同的定义。有的人定义为以用户为中心打磨一个完美体验的产品;有的定义为从需求调研到需求上线各个步骤需要思考的点,等等。本文想讨论的产品思维是:怎么去发现问题,…

23.3.6打卡 AtCoder Beginner Contest 277 A~D

E题最短路有点生疏了先不写, 之后再补 A 题意 给出一个排列和X 问X在排列中出现的下标是多少 代码 void solve() {cin>>n>>m;for(ll i1;i<n;i) {cin>>arr[i];if(arr[i]m) ansi;}cout<<ans<<endl;return; }B题 题意 这机翻翻译的挺正确的…

5.3中断系统中的设备树——中断号的演变与irq_domain

通过上一节我们知道&#xff0c;在内核中有一个irq_desc数组&#xff0c;数组里面的每一项对应一个中断&#xff0c;数组的下标就是对应中断的虚拟中断号&#xff08;virq&#xff09;。 假设只有一个中断控制器&#xff0c;有32个中断&#xff0c;那么中断和irq_desc数组可以…

654. 最大二叉树

题目 leetcode题目地址 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点&#xff0c;其值为 nums 中的最大值。 递归地在最大值 左边 的 子数组前缀上 构建左子树。 递归地在最大值 右边 的 子数组后缀上 构建右子树。 返…

项目黑马面面-学科列表-增删改查

查 1.布局2.定义api3.导入api4.进入页面就调用api5.获取数据6.存储并渲染7.与分页建立关联a.请求参数值要与分页组件绑定b.total值存储并绑定到分页组件c.页码改变与页容量改变都要请求api1.布局 <template><div><el-card><el-form :inline"true&q…

C语言例程:猜数字游戏

猜数字游戏 实现一个简单的猜数字游戏&#xff0c;学习 while 循环语句的用法。 实例解析 while 循环语句 while 语句的一般形式为&#xff1a; while(表达式)语句; 其中表达式是循环条件&#xff0c;语句为循环体。 while 语句的语义是&#xff1a;计算表达式的值&#xf…

Vue使用ElemenUI对table的指定列进行合算

前言 最近有一个想法&#xff0c;就是记录自己花销的时候&#xff0c;table中有一项内容是花销的金额。然后想在table的底部有一项内容是该金额的总计。 然后我就顺着elemetui的table组件寻找相关的demo&#xff0c;还真发现了一个这样的demo。 对于这个demo&#xff0c;官方…