Spark性能调优指南来了!

news2025/1/15 23:23:19

1、什么是Spark

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
在这里插入图片描述

Spark Core:实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的 API 定义。
Spark SQL:是Spark用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL或者Apache Hive版本的 HQL 来查询数据。Spark SQL支持多种数据源,比如 Hive 表、Parquet以及JSON等。

在这里插入图片描述

2、Spark Shuffle解析

2.1 HashShuffle

  1. 未经优化的 HashShuffleManager
    在这里插入图片描述
  2. 优化后的 HashShuffleManager
    在这里插入图片描述

2.2 SortShuffle

在这里插入图片描述

3、执行计划处理流程

先看下从一个 sql 转化成 Rdd 的过程:
在这里插入图片描述
核心的执行过程一共有 5 个步骤:
在这里插入图片描述
这些操作和计划都是 Spark SQL 自动处理的,会生成以下计划:
Unresolved 逻辑执行计划:Parsed Logical Plan
Parser 组件检查 SQL 语法上是否有问题,比如少写个逗号,少写个FROM,然后生成 Unresolved(未决断)的逻辑计划, 不检查表名、不检查列名。
Resolved 逻辑执行计划:Analyzed Logical Plan
通过访问 Spark 中的 Catalog 存储库来解析验证语义、列名、类型、表名等,就是说验证表名列名到底在不在。
➢ 优化后的逻辑执行计划:Optimized Logical Plan
Catalyst 优化器根据各种规则进行优化,比如谓词下推。
➢ 物理执行计划:Physical Plan
1)HashAggregate 运算符表示数据聚合,一般 HashAggregate 是成对出现,第一个 HashAggregate 是将执行节点本地的数据进行局部聚合,另一个 HashAggregate 是 将各个分区的数据进一步进行聚合计算。
2)Exchange 运算符其实就是 shuffle,表示需要在集群上移动数据。很多时候 HashAggregate 会以 Exchange 分隔开来。
3)Project 运算符是 SQL 中的投影操作,就是选择列(例如:select name, age…)。
4)BroadcastHashJoin 运算符表示通过基于广播方式进行 HashJoin
5)LocalTableScan 运算符就是全表扫描本地的表
CBO代价选择:选择最优的执行计划

4、SparkSQL 语法优化

4.1 大小表join

如果当一张小表足够小并且可以先缓存到内存中,那么可以使用 Broadcast Hash Join,其原理就是先将小表聚合到 driver 端,再广播到各个大表分区中,那么 再次进行 join 的时候,就相当于大表的各自分区的数据与小表进行本地 join,从而规避了 shuffle
1)通过参数指定自动广播 广播 join 默认值为 10MB
spark.sql.autoBroadcastJoinThreshold 参数控制。
2)强行广播join
语法: SELECT /*+ broadcast(a) */ FROM a JOIN b ON

4.2 大表和大表join

SMB JOINsort merge bucket 操作,需要进行分桶,首先会进行排序,然后根据 key 值合并,把相同 key 的数据放到同一个 bucket 中(按照 key 进行 hash)。分桶的目的其实 就是把大表化成小表。相同 key 的数据都在同一个桶中之后,再进行 join 操作,那么在联 合的时候就会大幅度的减小无关项的扫描。
使用条件:
(1)两表进行分桶,桶的个数必须相等
(2)两边进行 join 时,join 列=排序列=分桶列

5、基于 RBO 的优化

5.1 谓词下推(Predicate Pushdown)

将 过 滤 条 件 的 谓 词 逻 辑 都 尽 可 能 提 前 执 行 , 减 少 下 游 处 理 的 数 据 量 。下推的谓词能够大幅减少数据扫描量,降低磁盘 I/O 开销。

5.2 列剪裁(Column Pruning)

列剪裁就是扫描数据源的时候,只读取那些与查询相关的字段。

5.3 常量替换(Constant Folding)

我们在 select 语句中,掺杂了一些 常量表达式,Catalyst 也会自动地用表达式的结果进行替换。

6、基于 CBO 的优化

上文介绍的 RBO 属于逻辑计划的优化,只考虑查询,未考虑数据本身的特点。下面将介绍 CBO 如何利用数据本身的特点优化物理执行计划。
CBO 优化主要在物理计划层面,原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划。

6.1 官方实验

CBO优化前:
在这里插入图片描述
CBO优化后:
在这里插入图片描述
物理执行计划是一个树状结构,其代价等于每个执行节点的代价总合,如下图所示。
在这里插入图片描述
而每个执行节点的代价,分为两个部分
该执行节点对数据集的影响,或者说该节点输出数据集的大小与分布

  • 该执行节点操作算子的代价
  • 每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:
    1. 初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;
    2. 中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。

所以,最终主要需要解决两个问题

  1. 如何获取原始数据集的统计信息
  2. 如何根据输入数据集估算特定算子的输出数据集

6.2 CBO如何优化

1 Statistics 收集(相关信息提前收集好)

需要先执行特定的 SQL 语句来收集所需的表和列的统计信息。
➢ 生成表级别统计信息(扫表):

ANALYZE TABLE 表名 COMPUTE STATISTICS

生成 sizeInBytes (这张表的大小)和 rowCount(这张表多少行)。
从如下示例中,Statistics 一行可见, customer 表数据总大小为 37026233 字节,即 35.3MB,总记录数为 28万。

spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS;
Time taken: 12.888 seconds
​
spark-sql> desc extended customer;
c_customer_sk bigint   NULL
c_customer_id string   NULL
c_current_cdemo_sk     bigint NULL
c_current_hdemo_sk     bigint NULL
c_current_addr_sk       bigint NULL
c_first_shipto_date_sk bigint NULL
c_first_sales_date_sk   bigint NULL
c_salutation   string   NULL
c_first_name   string   NULL
c_last_name   string   NULL
c_preferred_cust_flag   string NULL
c_birth_day   int     NULL
c_birth_month int     NULL
c_birth_year   int     NULL
c_birth_country string NULL
c_login string NULL
c_email_address string NULL
c_last_review_date     string NULL# Detailed Table Information
Database       jason_tpc_ds
Table   customer
Owner   jason
Created Time   Sat Sep 15 14:00:40 CST 2018
Last Access   Thu Jan 01 08:00:00 CST 1970
Created By     Spark 2.3.2
Type   EXTERNAL
Provider       hive
Table Properties       [transient_lastDdlTime=1536997324]
Statistics     37026233 bytes, 280000 rows
Location       hdfs://dw/tpc_ds/customer
Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat   org.apache.hadoop.mapred.TextInputFormat
OutputFormat   org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Storage Properties     [field.delim=|, serialization.format=|]
Partition Provider     Catalog
Time taken: 1.691 seconds, Fetched 36 row(s)

➢ 生成列级别统计信息

ANALYZE TABLE 表名 COMPUTE STATISTICS FOR COLUMNS1,2,3

从如下示例可见,customer 表的 c_customer_sk 列最小值为 1, 最大值为 280000null 值个数为 0,不同值个数为 274368,平均列长度为 8,最大列长度为 8

spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk, c_customer_id, c_current_cdemo_sk;
Time taken: 9.139 seconds
spark-sql> desc extended customer c_customer_sk;
col_name       c_customer_sk
data_type     bigint
comment NULL
min     1
max     280000
num_nulls     0
distinct_count 274368
avg_col_len   8
max_col_len   8
histogram     NULL

2 算子对数据集影响估计

对于中间算子,可以根据输入数据集的统计信息以及算子的特性,可以估算出输出数据集的统计结果。
在这里插入图片描述

本节以 Filter 为例说明算子对数据集的影响。
对于常见的 Column A < value B Filter,可通过如下方式估算输出中间结果的统计信息

  • A.min > B,则无数据被选中,输出结果为空
  • A.max < B,则全部数据被选中,输出结果与 A 相同,且统计信息不变
  • A.min < B < A.max,则被选中的数据占比为 (B.value - A.min) / (A.max - A.min)A.min 不变,A.max 更新为 B.value

3 算子代价估计

SQL 中常见的操作有 Selection(由 select 语句表示),Filter(由 where 语句表示)以及笛卡尔乘积(由 join 语句表示)。其中代价最高的是 join
Spark SQLCBO 通过如下方法估算 join 的代价

Cost = rows * weight + size * (1 - weight)
Cost = CostCPU * weight + CostIO * (1 - weight)

其中 rows 即记录行数代表了 CPU 代价,size 代表了 IO 代价。weightspark.sql.cbo.joinReorder.card.weight 决定,其默认值为 0.7

6.3 CBO优化Build侧选择

对于两表Hash Join,一般选择小表作为build size,构建哈希表,另一边作为 probe side。未开启 CBO 时,根据表原始数据大小选择 t2 作为build side
在这里插入图片描述
开启 CBO 后,基于估计的代价选择 t1 作为 build side。更适合本例
在这里插入图片描述

6.4 优化 Join 类型

Spark SQL 中,Join 可分为 Shuffle based JoinBroadcastJoinShuffle based Join 需要引入 Shuffle,代价相对较高。BroadcastJoin 无须 Join,但要求至少有一张表足够小,能通过 SparkBroadcast 机制广播到每个 Executor 中。
在不开启 CBO 中,Spark SQL 通过 spark.sql.autoBroadcastJoinThreshold 判断是否启用 BroadcastJoin。其默认值为 1048576010 MB。并且该判断基于参与 Join 的表的原始大小。
在下图示例中,Table 1 大小为 1 TBTable 2 大小为 20 GB,因此在对二者进行 join 时,由于二者都远大于自动 BroatcastJoin 的阈值,因此 Spark SQL 在未开启 CBO 时选用 SortMergeJoin 对二者进行 Join
而开启 CBO 后,由于 Table 1 经过 Filter 1 后结果集大小为 500 GBTable 2 经过 Filter 2 后结果集大小为 10 MB 低于自动 BroatcastJoin 阈值,因此 Spark SQL 选用 BroadcastJoin

在这里插入图片描述

6.5 优化多表 Join 顺序

未开启 CBO 时,Spark SQLSQLjoin 顺序进行 Join。极端情况下,整个 Join 可能是 left-deep tree。在下图所示 TPC-DS Q25 中,多路 Join 存在如下问题。

  1. left-deep tree,因此所有后续 Join 都依赖于前面的 Join 结果,各 Join 间无法并行进行。
  2. 前面的两次Join输入输出数据量均非常大,属于大 Join,执行时间较长。

在这里插入图片描述
开启 CBO 后, Spark SQL 将执行计划优化如下:
在这里插入图片描述

6.6 使用 CBO

通过 “spark.sql.cbo.enabled” 来开启,默认是 false。配置开启 CBO 后,CBO 优化器可以 基于表和列的统计信息,进行一系列的估算,最终选择出最优的查询计划。比如:Build 侧 选择、优化 Join 类型、优化多表 Join 顺序等。
下面是相关参数的说明:
在这里插入图片描述

总结

本文首先讲解了 Spark 的底层的 Shuffle 的调优以及从 SQLRDD的生成执行计划的整个处理流程,其次是 Spark SQL 语法优化,最后梳理了 Spark SQL 是如何基于 RBOCBO 的进行优化的!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/818286.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Ansible 的脚本 --- playbook 剧本】

目录 一、playbook 剧本介绍二、示例1、运行playbook2、定义、引用变量 三、使用playbook部署lnmp集群 一、playbook 剧本介绍 playbooks 本身由以下各部分组成 &#xff08;1&#xff09;Tasks&#xff1a;任务&#xff0c;即通过 task 调用 ansible 的模板将多个操作组织在…

从多个基础CMS中学习代码审计

代码审计 概念 什么是代码审计&#xff1f; 代码审计是在一个编程中对源代码旨在发现错误、安全漏洞或违反编程约定的项目。 说人话就是找它这些代码中可能存在问题的地方&#xff0c;然后看它是否真的存在漏洞。(博主小白&#xff0c;可能存在问题&#xff0c;请见谅) 分类…

ScrumMaster认证培训(CSM)记录篇-Leangoo领歌

前不久参加了Leangoo领歌CSM认证公开班&#xff0c;简单记录下我的学习之旅 当初选课程时也很是纠结&#xff0c;最终选择了Leangoo领歌&#xff0c;Leangoo领歌是Scrum中文网旗下的一款敏捷研发管理工具。 Leangoo领歌由Scrum中文网资深的敏捷顾问团队和敏捷研发团队经过近十…

Python scipy Moudle 中的 optimize 方法

Python scipy Moudle 中的 optimize 方法 scipy Moudle 中的 optimize 方法 minimize 最小化一个函数 它提供了多种算法&#xff0c;如 BFGS、Nelder-Mead、Powell 可选参数 fun&#xff1a;要最小化的目标函数x0&#xff1a;函数的初始猜测值。可以是一个数组或列表metho…

Python入门一

目录&#xff1a; python基本操作python基本数据类型python字符串基本操作python的运算符python控制流-判断python控制流-循环python常用数据结构-列表python常用数据结构-元组python常用数据结构-集合python常用数据结构-字典python函数python函数进阶与参数处理pythonlambda…

【7.31】C++编写7254是一个不寻常的数,可以表示为7254 = 39 x 186,这个式子中1~9每个数字正好出现一次

题目题干 7254是一个不寻常的数&#xff0c;因为它可以表示为7254 39 x 186&#xff0c;这个式子中1~9每个数字正好出现一次&#xff0c;输出所有这样的不同的式子&#xff08;乘数交换被认为是相同的式子&#xff09;。结果小的先输出&#xff1b;结果相同的&#xff0c;较小…

IPsec VPN小实验

IPSec 是什么&#xff1a; IPSec是一个框架&#xff0c;它不是具体指某个协议&#xff0c;而是定义了一个框架&#xff0c;由各种协议组和协商而成。该框架涉及到的主要有加密算法、验证算法、封装协议、封装模式、密钥有效期等等。 IPSecVPN建立的前提&#xff1a;要想在两个…

【性能测试】性能测试的概念、策略、指标

一、性能测试的概念 1.1 什么是性能 - 时间&#xff1a;系统处理用户请求的响应时间 -资源&#xff1a;系统运行过程中&#xff0c;系统资源的消耗情况 1.2 什么是性能测试 使用自动化工具&#xff0c;模拟不同的场景&#xff0c;对软件各项性能指标进行测试和评估的过程 …

适配器模式与装饰器模式对比分析:优雅解决软件设计中的复杂性

适配器模式与装饰器模式对比分析&#xff1a;优雅解决软件设计中的复杂性 在软件设计中&#xff0c;我们常常面临着需要将不同接口或类协调工作的情况&#xff0c;同时还要满足灵活性和可扩展性的需求。为了应对这些挑战&#xff0c;适配器模式和装饰器模式应运而生&#xff0c…

12-4_Qt 5.9 C++开发指南_创建和使用共享库

文章目录 1. 创建共享库2. 使用共享库2.1 共享库的调用方式2.2 隐式链接调用共享库2.3 显式链接调用共享库 1. 创建共享库 除了静态库&#xff0c;Qt 还可以创建共享库&#xff0c;也就是 Windows 平台上的动态链接库。动态链接库项目编译后生成 DLL 文件&#xff0c;DLL 文件…

【机器学习】Overfitting and Regularization

Overfitting and Regularization 1. 过拟合添加正则化2. 具有正则化的损失函数2.1 正则化线性回归的损失函数2.2 正则化逻辑回归的损失函数 3. 具有正则化的梯度下降3.1 使用正则化计算梯度&#xff08;线性回归 / 逻辑回归&#xff09;3.2 正则化线性回归的梯度函数3.3 正则化…

SpringCloud集成OpenTelemetry的实现

SpringCloud项目做链路追踪&#xff0c;比较常见的会集成SleuthZipKin来完成&#xff0c;但这次的需求要集成开源框架OpenTelemetry&#xff0c;这里整理下实现过程。相关文章&#xff1a; 【SpringCloud集成SleuthZipkin进行链路追踪】 【OpenTelemetry框架Trace部分整理】 …

JavaSE运算符

大体上&#xff0c;与C语言差不多&#xff0c;不同的地方&#xff0c;我用红色字体标注了 算术运算符 1. 基本四则运算符&#xff1a;加减乘除模 ( - * / %) int a 10 ; int b 20 ; System . out . println ( a b ); // 30 System . out . println ( a - b…

校园跑腿小程序开发需要哪些核心功能?

提起校园跑腿小程序大家都不陌生&#xff0c;尤其是对上大学的伙伴们来说,更是熟悉得不能再熟悉了&#xff0c;和我们的生活息息相关&#xff0c;密不可分。 对于现在的年轻人来说&#xff0c;网购是非常简单和方便的一种购物方式&#xff0c;随之快递也会越来越多。在我们国家…

白盒测试和黑盒测试的区别是什么?

前言 曾言道“黑猫&#xff0c;白猫&#xff0c;只要能抓住老鼠就是好猫”。我们的测试亦是如此&#xff0c;不管是黑盒测试还是白盒测试&#xff0c;只要能测试出来bug&#xff0c;可以找出问题所在&#xff0c;保障软件质量就是好的测试方法。 对于刚入门的软件测试小白来说…

【Git系列】Git到远程仓库

&#x1f433;Git到远程仓库 &#x1f9ca;1. github账号注册&#x1f9ca;2. 初始化本地仓库&#x1f9ca;3. 创建GitHub远程仓库&#x1f9ca;4. 给本地仓库起别名&#x1fa9f;4.1 查看远程库的连接地址&#x1fa9f;4.2 起别名 &#x1f9ca;5. git推送操作&#x1f9ca;6.…

【SEO基础】百度权重是什么意思及网站关键词应该怎么选?

百度权重是什么意思及网站关键词应该怎么选&#xff1f; 正文共&#xff1a;3253字 20图 预计阅读时间&#xff1a;9分钟 ​ 1.什么是网站权重&#xff1f; 这段时间和一些朋友聊到网站权重以及关键词&#xff0c;发现蛮多人对于这两个概念的认知还是存在一些错误的&#xf…

【Python】PySpark 数据计算 ② ( RDD#flatMap 方法 | RDD#flatMap 语法 | 代码示例 )

文章目录 一、RDD#flatMap 方法1、RDD#flatMap 方法引入2、解除嵌套3、RDD#flatMap 语法说明 二、代码示例 - RDD#flatMap 方法 一、RDD#flatMap 方法 1、RDD#flatMap 方法引入 RDD#map 方法 可以 将 RDD 中的数据元素 逐个进行处理 , 处理的逻辑 需要用外部 通过 参数传入 map…

RL— 深度强化学习简介

一、说明 深度强化学习是关于从我们看到和听到的东西中采取最好的行动。不幸的是&#xff0c;强化学习强化学习在学习概念和术语方面存在很高的障碍。在本文中&#xff0c;我们将介绍深度强化学习&#xff0c;并概述一般情况。然而&#xff0c;我们不会回避方程式和术语。它们提…

Linux虚拟机中安装MySQL5.6.34

目录 第一章、xshell工具和xftp的使用1.1&#xff09;xshell下载与安装1.2&#xff09;xshell连接1.3&#xff09;xftp下载安装和连接 第二章、安装MySQL5.6.34&#xff08;不同版本安装方式不同)2.1&#xff09;关闭防火墙&#xff0c;传输MySQL压缩包到Linux虚拟机2.2&#x…