SparkSQL调优

news2024/9/17 7:25:54

SparkSQL调优

文章目录

  • SparkSQL调优
  • Explain 查看执行计划
    • 语法
    • 执行计划处理流程
  • 资源调优
    • 内存说明
      • spark任务提交到yarn上运行命令
    • CPU优化
  • SparkSQL语法优化
    • 基于RBO优化
    • 基于CBO优化
    • 广播join
      • 方式一:通过参数指定自动广播
      • 方式二:强行广播
    • SMB Join
  • 数据倾斜
    • Join 数据倾斜优化
      • 广播join
      • 拆分大 key 打散大表 扩容小表
  • Job 优化
    • Map 端优化
      • Map 端预聚合
      • 读取小文件优化
      • 增大 map 溢写时输出流 buffer
    • Reduce 端优化
      • 增大 reduce 缓冲区,减少拉取次数
      • 调节 reduce 端拉取数据重试次数
      • 调节 reduce 端拉取数据等待间隔
  • Spark AQE
    • 动态合并分区
    • 动态切换 Join 策略
    • 动态优化 Join 倾斜

Explain 查看执行计划

语法

sparkSession.sql("xxx").explain()
  • explain(mode=“simple”):只展示物理执行计划。
  • explain(mode=“extended”):展示物理执行计划和逻辑执行计划。
  • explain(mode=“codegen”) :展示要 Codegen 生成的可执行 Java 代码。
  • explain(mode=“cost”):展示优化后的逻辑执行计划以及相关的统计。
  • explain(mode=“formatted”):以分隔的方式输出,它会输出更易读的物理执行计划,并展示每个节点的详细信息。

执行计划处理流程

分析 – 逻辑优化 – 生成物理执行计划 – 评估模型分析 – 代码生成

  • == Parsed Logical Plan == :Unresolved 逻辑执行计划
    • Parser 组件检查 SQL 语法上是否有问题,然后生成 Unresolved(未决断)的逻辑计划,不检查表名、不检查列名
  • == Analyzed Logical Plan == :Resolved 逻辑执行计划
    • 通过访问 Spark 中的 Catalog 存储库来解析验证语义、列名、类型、表名等
  • == Optimized Logical Plan == :优化后的逻辑执行计划
    • Catalyst 优化器根据各种规则进行优化
  • == Physical Plan == :物理执行计划
    • HashAggregate 运算符表示数据聚合,一般 HashAggregate 是成对出现,第一个HashAggregate 是将执行节点本地的数据进行局部聚合,另一个 HashAggregate 是将各个分区的数据进一步进行聚合计算
    • Exchange 运算符其实就是 shuffle,表示需要在集群上移动数据。很多时候HashAggregate 会以 Exchange 分隔开来
    • Project 运算符是 SQL 中的投影操作,就是选择列(例如:select name, age…)
    • BroadcastHashJoin 运算符表示通过基于广播方式进行 HashJoin
    • LocalTableScan 运算符就是全表扫描本地的表

资源调优

内存说明

在这里插入图片描述

spark.memory.fraction=(估算 storage 内存+估算 Execution 内存)/(估算 storage 内存+估算 Execution 内存+估算 Other 内存)

spark.memory.storageFraction =(估算 storage 内存)/(估算 storage 内存+估算Execution 内存)

Storage 堆内内存=(spark.executor.memory–300MB)spark.memory.fractionspark.memory.storageFraction

Execution 堆内内存=(spark.executor.memory–300MB)spark.memory.fraction(1-spark.memory.storageFraction)

spark任务提交到yarn上运行命令

${SPARK_DIR}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue root.default \
--driver-memory 4g \
--executor-memory 8g \
--num-executors 6 \
--executor-cores 2 \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \
--conf spark.executor.extraJavaOptions='-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps' \
--class com.test.DataETLMain \
/opt/spark_sql_test.jar
参数说明
–queue任务提交到yarn的队列
–driver-memory每个 driver 的内存
–executor-cores每个 executor 的最大核数(3~6 之间比较合理 )
–num-executorsexecutor 的个数
–executor-memory每个 executor 的内存
–conf任务运行配置

CPU优化

修改并行度(分区个数)

  • rdd:spark.default.parallelism
    • 设置 RDD 的默认并行度,没有设置时,由 join、reduceByKey 和 parallelize 等转换决定
  • sparksql:spark.sql.shuffle.partitions
    • 适用 SparkSQL 时,Shuffle Reduce 阶段默认的并行度,默认 200。此参数只能控制Spark sql、DataFrame、DataSet 分区个数。不能控制 RDD 分区个数

SparkSQL语法优化

基于RBO优化

  • 谓词下推:将过滤条件的谓词逻辑都尽可能提前执行,减少下游处理的数据量
  • 列裁剪:列剪裁就是扫描数据源的时候,只读取那些与查询相关的字段
  • 常量替换

基于CBO优化

CBO 优化主要在物理计划层面,原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划

通过spark.sql.cbo.enabled来开启,默认是 false

参数描述默认值
spark.sql.cbo.enabledCBO 总开关
true 表示打开,false 表示关闭
要使用该功能,需确保相关表和列的统计信息已经生成
false
spark.sql.cbo.joinReorder.enabled使用 CBO 来自动调整连续的 inner join 的顺序
true:表示打开
false:表示关闭要使用该功能,需确保相关表和列的统计信息已经生成,且CBO 总开关打开
false
spark.sql.cbo.joinReorder.dp.threshold使用 CBO 来自动调整连续 inner join 的表的个数阈值
如果超出该阈值,则不会调整 join 顺序
12

广播join

Spark join 策略中,如果当一张小表足够小并且可以先缓存到内存中,那么可以使用Broadcast Hash Join,其原理就是先将小表聚合到 driver 端,再广播到各个大表分区中,那么再次进行 join 的时候,就相当于大表的各自分区的数据与小表进行本地 join,从而规避了shuffle

方式一:通过参数指定自动广播

spark.sql.autoBroadcastJoinThreshold:广播join默认值 10M

可更改参数值:

  • 方式一:在程序里面添加参数值

    sparkConf.set("spark.sql.autoBroadcastJoinThreshold","20m")
    
  • 方式二:在执行命令配置中添加参数值

    --conf spark.sql.autoBroadcastJoinThreshold=20m
    

方式二:强行广播

使用Hint注解方式

//TODO SQL Hint方式
    val sqlstr1 =
      """
        |select /*+  BROADCASTJOIN(sc) */
        |  sc.courseid,
        |  csc.courseid
        |from sale_course sc join course_shopping_cart csc
        |on sc.courseid=csc.courseid
      """.stripMargin

    val sqlstr2 =
      """
        |select /*+  BROADCAST(sc) */
        |  sc.courseid,
        |  csc.courseid
        |from sale_course sc join course_shopping_cart csc
        |on sc.courseid=csc.courseid
      """.stripMargin

    val sqlstr3 =
      """
        |select /*+  MAPJOIN(sc) */
        |  sc.courseid,
        |  csc.courseid
        |from sale_course sc join course_shopping_cart csc
        |on sc.courseid=csc.courseid
      """.stripMargin

SMB Join

SMB JOIN 是 sort merge bucket 操作,需要进行分桶,首先会进行排序,然后根据 key值合并,把相同 key 的数据放到同一个 bucket 中(按照 key 进行 hash)。分桶的目的其实就是把大表化成小表。相同 key 的数据都在同一个桶中之后,再进行 join 操作,那么在联合的时候就会大幅度的减小无关项的扫描

使用条件:

  • 两表进行分桶,桶的个数必须相等
  • 两边进行 join 时,join 列=排序列=分桶列

数据倾斜

Join 数据倾斜优化

广播join

适用于小表 join 大表。小表足够小,可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中

**解决逻辑:**在小表 join 大表时如果产生数据倾斜,那么广播 join 可以直接规避掉此 shuffle 阶段。直接优化掉 stage。并且广播 join 也是 Spark Sql 中最常用的优化方案

拆分大 key 打散大表 扩容小表

适用于 join 时出现数据倾斜

解决逻辑:

  1. 将存在倾斜的表,根据抽样结果,拆分为倾斜 key(skew 表)和没有倾斜 key(common)的两个数据集
  2. 将 skew 表的 key 全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集(old 表)整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍,得到 new 表)
  3. 打散的 skew 表 join 扩容的 new 表
  4. 将 skew 表的 key 去掉前缀

Job 优化

Map 端优化

Map 端预聚合

map-side 预聚合,就是在每个节点本地对相同的 key 进行一次聚合操作

读取小文件优化

读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的 Task 元信息也会给 Spark Driver 的内存造成压力,带来单点问题

设置参数:

# 一个分区最大字节数,默认 128m
spark.sql.files.maxPartitionBytes=128MB
# 打开一个文件的开销,默认 4m
spark.files.openCostInBytes=4194304

增大 map 溢写时输出流 buffer

  1. map 端 Shuffle Write 有一个缓冲区,初始阈值 5m,超过会尝试增加到 2*当前使用内存。如果申请不到内存,则进行溢写**(这个参数是 internal,指定无效,资源足够会自动扩容,所以不需要我们去设置)**
    spark.shuffle.spill.initialMemoryThreshold:5242880
  2. Shuffle 文件涉及到序列化,是采取批的方式读写,默认按照每批次 1 万条去读写**(这个参数是 internal,指定无效)**
    spark.shuffle.spill.batchSize:10000
  3. 溢写时使用输出流缓冲区默认 32k,这些缓冲区减少了磁盘搜索和系统调用次数,适当提高可以提升溢写效率
    spark.shuffle.file.buffer:32

Reduce 端优化

增大 reduce 缓冲区,减少拉取次数

Spark Shuffle 过程中,shuffle reduce task 的 buffer 缓冲区大小决定了 reduce task 每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能

spark.reducer.maxSizeInFlight reduce 端数据拉取缓冲区的大小设置,默认为 48MB

调节 reduce 端拉取数据重试次数

Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试

spark.shuffle.io.maxRetrie reduce 端拉取数据重试次数设置,该参数就代表了可以重试的最大次数。默认为 3

调节 reduce 端拉取数据等待间隔

Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性

spark.shuffle.io.retryWait reduce 端拉取数据等待间隔设置,默认值为 5s

Spark AQE

Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution),即自适应查询执行。AQE 是Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

动态合并分区

动态切换 Join 策略

动态优化 Join 倾斜

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1883860.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

运维锅总详解RocketMQ

本文尝试从Apache RocketMQ的简介、主要组件及其作用、3种部署模式、Controller集群模式工作流程、最佳实践等方面对其进行详细分析。希望对您有所帮助! 一、Apache RocketMQ 简介 Apache RocketMQ 是一个开源的分布式消息中间件,由阿里巴巴集团开发并…

C++初学者指南-3.自定义类型(第一部分)-指针

C初学者指南-3.自定义类型(第一部分)-指针 文章目录 C初学者指南-3.自定义类型(第一部分)-指针1.为什么我们需要它们?2.T 类型的对象指针原始指针:T * 智能指针(C11) 3.操作符地址操作符 &解引用运算符 *成员访问操作符 ->语法重定向 4.nullptr (…

QT5:在窗口右上角显示图标

目录 一、环境与目标 二、实现逻辑(纯代码)与效果 三、参考代码 四、总结 一、环境与目标 qt版本:5.12.7 windows 11 下的 Qt Designer (已搭建) 目标:使用嵌套布局的方式将两个按钮显示在窗口右上角…

首款内置电源的迷你主机,不到千元的办公神器 | 零刻EQ13评测报告

零刻首款内置电源的迷你主机,不到千元的办公神器 | 零刻EQ13评测报告 哈喽小伙伴们好,我是Stark-C~ 众所周知,零刻作为目前国产迷你主机第一品牌,旗下系列众多,产线丰富,比如说它有针对游戏玩家的性能主机…

各类排序方法 归并排序 扩展练习 逆序对数量

七月挑战一个月重刷完Y总算法基础题,并且每道题写详细题解 进度:(3/106) 归并排序的思想也是分而治之 归并优点:速度稳定,排序也稳定 排序也稳定(数组中有两个一样的值,排序之后他们的前后顺序不发生变化,我们就说…

一句话介绍什么是AI智能体?

什么是AI智能体? 一句话说就是利用各种AI的功能的api组合,完成你想要的结果。 例如你希望完成一个关于主题为啤酒主题的小红书文案图片,那么它就可以完成 前面几个步骤类似automa的组件,最后生成一个结果。

手把手搞定报名亚马逊科技认证

引言 亚马逊云科技认证考试为我们这些技术从业者提供了提升专业技能的机会。无论选择线上还是线下考试,每种方式都有其独特的优势和挑战。选择合适的考试方式将帮助我们更好地展示自己的技术水平。以下是我对不同考试方式的优缺点介绍,以及各科目的考试…

tkinter显示图片

tkinter显示图片 效果代码解析打开和显示图像 代码 效果 代码解析 打开和显示图像 def open_image():file_path filedialog.askopenfilename(title"选择图片", filetypes(("PNG文件", "*.png"), ("JPEG文件", "*.jpg;*.jpeg&q…

哈希表(C++实现)

文章目录 写在前面1. 哈希概念2. 哈希冲突3. 哈希函数4.哈希冲突解决4.1 闭散列4.1.1 线性探测4.1.2 采用线性探测的方式解决哈希冲突实现哈希表4.1.3 二次探测 4.2 开散列4.2.2 采用链地址法的方式解决哈希冲突实现哈希表 写在前面 在我们之前实现的所有数据结构中(比如&…

CesiumJS【Basic】- #042 绘制纹理线(Primitive方式)

文章目录 绘制纹理线(Primitive方式)1 目标2 代码2.1 main.ts3 资源文件绘制纹理线(Primitive方式) 1 目标 使用Primitive方式绘制纹理线 2 代码 2.1 main.ts var start = Cesium.Cartesian3.fromDegrees(-75.59777, 40.03883);var

爬虫逆向实战(41)-某巢登陆(AES、MD5、RSA、滑块验证码)

一、数据接口分析 主页地址:某巢 1、抓包 通过抓包可以发现在登录时,网站首先请求captcha/querySlideImage/来获取滑块验证码的图片,然后请求captcha/checkCode/接口来验证滑块验证码。滑块验证码校验成功后,请求noshiro/getPu…

使用explain优化慢查询的业务场景分析

问:你最害怕的事情是什么?答:搓澡问:为什么?答:因为有些人一旦错过,就不在了 Explain 这个词在不同的上下文中有不同的含义。在数据库查询优化的上下文中,“EXPLAIN” 是一个常用的 …

矩阵置零解题

给定一个 m x n 的矩阵,如果一个元素为 0 ,则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1: 输入:matrix [[1,1,1],[1,0,1],[1,1,1]] 输出:[[1,0,1],[0,0,0],[1,0,1]]示例 2: 输入&…

UI(四)布局

文章目录 10、Navigator——路由器组件11、Pannel——可滑动面板12、Refresh——刷新组件13、RelativeContainer——相对布局组件14、Scroll——可滚动容器15、SideBarContainer——侧边栏容器16、Stack——堆叠容器17、Swiper——滑动块视图容器18、Tabs和TabContent——页签和…

mac英语学习工具:Eudic欧路词典 for Mac 激活版

Eudic欧路词典是一款非常受欢迎的英语学习软件,它提供了丰富的词汇解释、例句、同义词、反义词等功能,帮助用户更好地理解和掌握英语单词。 以下是Eudic欧路词典的一些主要特点: 海量词汇库:Eudic欧路词典拥有庞大的词汇库&#…

arm_uart4实验

#include "uart4.h" //UART //初始化 void hal_uart4_init() { //rcc_init //…

Google推出开源模型Gemma 2:性能大幅提升与创新训练方法

引言 近日,Google推出了开源模型Gemma 2,吸引了广大研究人员和开发者的关注。相比上一代模型,Gemma 2在性能和可用性方面实现了显著提升,提供了9B和27B两个版本,并且对外开放免费使用。本文将深入探讨Gemma 2的技术细…

QueryClientProvider is not defined

QueryClientProvider is not defined 运行一个svelte的项目,报错如上,前后查找解决不了,然后没办法, 本来是用yarn 安装的依赖,改用npm install,再次运行就成功了

可燃气体报警器定期检测:优化与改进策略的探讨

在现代化的工业环境中,可燃气体报警器的作用日益凸显。它们像是我们生产现场的安全卫士,时刻警惕着可能发生的危险,确保我们的工作环境安全、稳定。 然而,要确保这些“卫士”始终忠诚可靠,定期检测就显得尤为重要。 …

【JVM面试题】总结-01

【JVM面试题】总结-01 1. 介绍下Java内存区域(运行时数据区)1.1 程序计数器(线程私有)1.2 虚拟机栈(线程私有)1.3 本地方法栈(线程私有)1.4 Java堆(线程共享)1.5 方法区(线程共享)1.5.1 方法区和永久代的关系1.5.2 常用参数1.5.3 为什么要将永久代 (方法区) 替换为元空间 (Meta…