Spark3.0中的AOE、DPP和Hint增强

news2024/11/16 23:50:00

1 Spark3.0 AQE

Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution),即自适应查询执行。AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。

1.1 动态合并分区

在Spark中运行查询处理非常大的数据时,shuffle通常会对查询性能产生非常重要的影响。shuffle是非常昂贵的操作,因为它需要进行网络传输移动数据,以便下游进行计算。

最好的分区取决于数据,但是每个查询的阶段之间的数据大小可能相差很大,这使得该数字难以调整:

(1)如果分区太少,则每个分区的数据量可能会很大,处理这些数据量非常大的分区,可能需要将数据溢写到磁盘(例如,排序和聚合),降低了查询。

(2)如果分区太多,则每个分区的数据量大小可能很小,读取大量小的网络数据块,这也会导致I/O效率低而降低了查询速度。拥有大量的task(一个分区一个task)也会给Spark任务计划程序带来更多负担。

 为了解决这个问题,我们可以在任务开始时先设置较多的shuffle分区个数,然后在运行时通过查看shuffle文件统计信息将相邻的小分区合并成更大的分区。

例如,假设正在运行select max(i) from tbl group by j。输入tbl很小,在分组前只有2个分区。那么任务刚初始化时,我们将分区数设置为5,如果没有AQE,Spark将启动五个任务来进行最终聚合,但是其中会有三个非常小的分区,为每个分区启动单独的任务这样就很浪费。

取而代之的是,AQE将这三个小分区合并为一个,因此最终聚只需三个task而不是五个

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AQEPartitionTunning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

结合动态申请资源:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.DynamicAllocationTunning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

1.2 动态切换Join策略

Spark支持多种join策略,其中如果join的一张表可以很好的插入内存,那么broadcast shah join通常性能最高。因此,spark join中,如果小表小于广播大小阀值(默认10mb),Spark将计划进行broadcast hash join。但是,很多事情都会使这种大小估计出错(例如,存在选择性很高的过滤器),或者join关系是一系列的运算符而不是简单的扫描表操作。

为了解决此问题,AQE现在根据最准确的join大小运行时重新计划join策略。从下图实例中可以看出,发现连接的右侧表比左侧表小的多,并且足够小可以进行广播,那么AQE会重新优化,将sort merge join转换成为broadcast hash join。

 对于运行是的broadcast hash join,可以将shuffle优化成本地shuffle,优化掉stage 减少网络传输。Broadcast hash join可以规避shuffle阶段,相当于本地join。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeDynamicSwitchJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

1.3 动态优化Join倾斜

当数据在群集中的分区之间分布不均匀时,就会发生数据倾斜。严重的倾斜会大大降低查询性能,尤其对于join。AQE skew join优化会从随机shuffle文件统计信息自动检测到这种倾斜。然后它将倾斜分区拆分成较小的子分区。

 例如,下图 A join B,A表中分区A0明细大于其他分区

因此,skew join 会将A0分区拆分成两个子分区,并且对应连接B0分区

 没有这种优化,会导致其中一个分区特别耗时拖慢整个stage,有了这个优化之后每个task耗时都会大致相同,从而总体上获得更好的性能。

可以采取第4章提到的解决方式,3.0有了AQE机制就可以交给Spark自行解决。Spark3.0增加了以下参数。

1)spark.sql.adaptive.skewJoin.enabled  :是否开启倾斜join检测,如果开启了,那么会将倾斜的分区数据拆成多个分区,默认是开启的,但是得打开aqe。

2)spark.sql.adaptive.skewJoin.skewedPartitionFactor :默认值5,此参数用来判断分区数据量是否数据倾斜,当任务中最大数据量分区对应的数据量大于的分区中位数乘以此参数,并且也大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes参数,那么此任务是数据倾斜。

3)spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes :默认值256mb,用于判断是否数据倾斜

4)spark.sql.adaptive.advisoryPartitionSizeInBytes :此参数用来告诉spark进行拆分后推荐分区大小是多少。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

如果同时开启了spark.sql.adaptive.coalescePartitions.enabled动态合并分区功能,那么会先合并分区,再去判断倾斜,将动态合并分区打开后,重新执行:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

修改中位数的倍数为2重新执行

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

2 Spark3.0 DPP

Spark3.0支持动态分区裁剪Dynamic Partition Pruning,简称DPP,核心思路就是先将join一侧作为子查询计算出来,再将其所有分区用到join另一侧作为表过滤条件,从而实现对分区的动态修剪。如下图所示

 将select t1.id,t2.pkey from t1 join t2 on t1.pkey =t2.pkey and t2.id<2 优化成了select t1.id,t2.pkey from t1 join t2 on t1.pkey=t2.pkey and t1.pkey in(select t2.pkey from t2 where t2.id<2)

触发条件:

(1)待裁剪的表join的时候,join条件里必须有分区字段

(2)如果是需要修剪左表,那么join必须是inner join ,left semi join或right join,反之亦然。但如果是left out join,无论右边有没有这个分区,左边的值都存在,就不需要被裁剪

(3)另一张表需要存在至少一个过滤条件,比如a join b on a.key=b.key and a.id<2

参数spark.sql.optimizer.dynamicPartitionPruning.enabled 默认开启。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.dpp.DPPTest spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar 

3 Spark3.0 Hint增强

在spark2.4的时候就有了hint功能,不过只有broadcasthash join的hint,这次3.0又增加了sort merge join,shuffle_hash join,shuffle_replicate nested loop join。

Spark的5种Join策略:https://www.cnblogs.com/jmx-bigdata/p/14021183.html

3.1 broadcasthast join

sparkSession.sql("select /*+ BROADCAST(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ BROADCASTJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MAPJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

3.2 sort merge join

sparkSession.sql("select /*+ SHUFFLE_MERGE(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MERGEJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MERGE(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

3.3 shuffle_hash join

sparkSession.sql("select /*+ SHUFFLE_HASH(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

3.4 shuffle_replicate_nl join

使用条件非常苛刻,驱动表(school表)必须小,且很容易被spark执行成sort merge join。

sparkSession.sql("select /*+ SHUFFLE_REPLICATE_NL(school) */ *  from test_student student inner join test_school school on student.id=school.id").show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1205568.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[工业自动化-19]:西门子S7-15xxx编程 - 软件编程 - PLC程序块、组织块OB与PLC多线程原理、OB、FC、FB、DB

目录 一、PLC的块的种类 1.1 什么是块 1.2 块的种类 1.3 不同块之间的相互调用关系 1.4 OB、FC、FB和DB 二、PLC程序组织块OB 2.1 什么是程序块OB 2.2 为什么需要程序块OB 2.3 PLC有哪些程序块 2.4 如何使用程序块 - OB块的执行顺序和规则 2.5 PLC用户程序主函数&am…

2023.11.12使用flask对图片进行黑白处理(base64编码方式传输)

2023.11.12使用flask对图片进行黑白处理&#xff08;base64编码方式传输&#xff09; 由前端输入图片并预览&#xff0c;在后端处理图片后返回前端显示&#xff0c;可以作为图片处理的模板。 关键点在于对图片进行base64编码的转化。 使用Base64编码可以更方便地将图片数据嵌入…

sass 生成辅助色

背景 一个按钮往往有 4 个状态。 默认状态hover鼠标按下禁用状态 为了表示这 4 个状态&#xff0c;需要设置 4 个颜色来提示用户。 按钮类型一般有 5 个&#xff1a; 以 primary 类型按钮为例&#xff0c;设置它不同状态下的颜色&#xff1a; <button class"btn…

【Android】配置Gradle打包apk的环境

目录 生成jks签名文件 配置build.gradle&#xff08;app&#xff09; 打包 生成jks签名文件 Java 密钥库&#xff08;.jks 或 .keystore&#xff09;是用作证书和私钥存储库的二进制文件。用于为用户设备上安装的 APK 签名的密钥。 详细解释请看官方文档&#xff1a; 为应用…

OpenCV踩坑笔记使用笔记入门笔记整合SpringBoot笔记大全

springboot开启摄像头抓拍照片并上传实现&问题记录 NotAllowedErrot: 请求的媒体源不能使用&#xff0c;以下情况会返回该错误: 当前页面内容不安全&#xff0c;没有使用HTTPS没有通过用户授权NotFoundError: 没有找到指定的媒体通道NoReadableError: 访问硬件设备出错Ov…

Linux线程创建,退出,等待

目录​​​​​​​ 一 为什么使用线程 1.1概念 1.2使用线程的理由 二 线程的创建&#xff0c;退出&#xff0c;等待 2.1 线程创建 2.2 线程退出 2.3.线程等待 2.4. 线程ID获取及比较 一 为什么使用线程 1.1概念 概念&#xff1a;"进程——资源分配的最小单位&…

【算法训练-链表 零】链表高频算法题看这一篇就够了

一轮的算法训练完成后&#xff0c;对相关的题目有了一个初步理解了&#xff0c;接下来进行专题训练&#xff0c;以下这些题目就是汇总的高频题目 题目题干直接给出对应博客链接&#xff0c;这里只给出简单思路、代码实现、复杂度分析 反转链表 依据难度等级分别为反转链表、…

2023数字科技生态展,移远通信解锁新成就

11月10日&#xff0c;以“数字科技&#xff0c;焕新启航”为主题的中国电信2023数字科技生态大会暨2023数字科技生态展在广州盛大启幕。作为物联网行业的龙头标杆&#xff0c;同时更与中国电信连续多年维持稳定友好的合作关系&#xff0c;移远通信受邀参加本次展会。 在本次展会…

Docker - DockerFile

Docker - DockerFile DockerFile 描述 dockerfile 是用来构建docker镜像的文件&#xff01;命令参数脚本&#xff01; 构建步骤&#xff1a; 编写一个dockerfile 文件docker build 构建成为一个镜像docker run 运行脚本docker push 发布镜像&#xff08;dockerhub&#xff0…

你真的会使用 MySQL中EXPLAIN吗

EXPLAIN是MySQL数据库中一个强大的工具&#xff0c;用于查询性能分析和优化。通过EXPLAIN&#xff0c;你可以查看MySQL查询的执行计划&#xff0c;了解MySQL是如何执行你的查询语句的。这篇文章将详细介绍EXPLAIN的使用&#xff0c;帮助你更好地理解和优化MySQL查询。 为什么使…

卫星通信和800MHz双管齐下,中国电信对中国移动发起新挑战

依靠国内某科技企业的宣传&#xff0c;卫星通信大热&#xff0c;中国电信也由此成为受益者&#xff0c;日前中国电信又大举招标25万座800MHz 5G基站&#xff0c;显示出中国电信积极以技术优势挑战中国移动。 一、中国电信急起直追 自从4G时代以来&#xff0c;中国电信就在国内通…

web3 React dapp进行事件订阅

好啊&#xff0c;上文web3 React Dapp书写订单 买入/取消操作 我们已经写好了 填充和取消订单 这就已经是非常大的突破了 但是 留下了一个问题 那就是 我们执行完之后 订单的数据没有直接更新 每次都需要我们手动刷新 才能看到结果 那么 今天我们就来看解决这个问题的事件订阅 …

ISP图像处理Pipeline

参考&#xff1a;1. 键盘摄影(七)——深入理解图像信号处理器 ISP2. Understanding ISP Pipeline3. ISP图像处理流程介绍4. ISP系统综述5. ISP(图像信号处理)之——图像处理概述6. ISP 框架7. ISP(图像信号处理)算法概述、工作原理、架构、处理流程8. ISP全流程简介9. ISP流程介…

spring boot中使用Bean Validation做优雅的参数校验

一、Bean Validation简介 Bean Validation是Java定义的一套基于注解的数据校验规范&#xff0c;目前已经从JSR 303的1.0版本升级到JSR 349的1.1版本&#xff0c;再到JSR 380的2.0版本&#xff08;2.0完成于2017.08&#xff09;&#xff0c;目前最新稳定版2.0.2&#xff08;201…

互联网Java工程师面试题·微服务篇·第二弹

目录 18、什么是 Spring 引导的执行器&#xff1f; 19、什么是 Spring Cloud&#xff1f; 20、Spring Cloud 解决了哪些问题&#xff1f; 21、在 Spring MVC 应用程序中使用 WebMvcTest 注释有什么用处&#xff1f; 22、你能否给出关于休息和微服务的要点&#xff1f; 23、…

正点原子嵌入式linux驱动开发——Linux DAC驱动

上一篇笔记中学习了ADC驱动&#xff0c;STM32MP157 也有DAC外设&#xff0c;DAC也使用的IIO驱动框架。本章就来学习一下如下在Linux下使用STM32MP157上的DAC。 DAC简介 ADC是模数转换器&#xff0c;负责将外界的模拟信号转换为数字信号。DAC刚好相反&#xff0c;是数模转换器…

mysql数据库可以执行定时任务

在一些业务需要中&#xff0c;经常需要一些定时任务。如Java的schedule&#xff0c;nodejs的node-schedule等。今天第一次接触了使用数据库的存储过程来执行定时任务。 本篇文章以MySQL数据库为例&#xff0c;介绍通过数据库设置定时任务的方法。本文中以介绍操作过程为主&…

注册并实名认证华为开发者账号流程

文 | Promise Sun 1. 打开华为开发者网址&#xff1a; https://www.harmonyos.com 2.注册华为开发者账号&#xff1a; 1&#xff09;注册时可以选择手机号或者邮箱两种方式注册&#xff0c;建议选择手机号注册。 2&#xff09;根据提示填写信息注册即可。 3.开发者实名认证&am…

P6入门:项目初始化7-项目详情之代码/分类码Code

前言 使用项目详细信息查看和编辑有关所选项目的详细信息&#xff0c;在项目创建完成后&#xff0c;初始化项目是一项非常重要的工作&#xff0c;涉及需要设置的内容包括项目名&#xff0c;ID,责任人&#xff0c;日历&#xff0c;预算&#xff0c;资金&#xff0c;分类码等等&…

lc307.区域和检索 - 数组可修改

暴力解法 创建方法&#xff0c;通过switch-case判断所需要调用的方法。 public class RegionsAndSertches {public static void main(String[] args) {String[] str new String[]{"NumArray", "sumRange", "update", "sumRange"};i…