大数据技术架构(组件)32——Spark:Spark SQL--Execute Engine

news2024/9/25 3:17:34

2.2、Spark SQL

2.2.1、Execute Engine

SparkSql的整体提交执行流程和Hive的执行流程基本上一致。站在通用的角度,对于SparkSql来说,从Sql到Spark的RDD执行需要经历两个大的阶段:逻辑计划物理计划

逻辑计划层面会把用户提交的sql转换成树型结构,把sql中的逻辑映射到逻辑算子树的不同节点,该阶段并不会真正的进行提交执行,只是作为中间阶段。在这个过程中会经历三个阶段:

1、未解析的逻辑算子树(Unresolved LogicalPlan),该阶段只是通过Antlr Parser把sql进行词法分析,语法验证得到数据结构,并不包含任何数据信息。

2、解析后的逻辑算子数(Analyzed LogicalPlan),这个阶段会结合Catalog元数据信息对第一阶段得到的节点进行绑定

3、优化逻辑算子树(Optimized LogicalPlan),该阶段结合节点数据信息,应用一些优化规则对一些低效的逻辑计划进行转换。

物理计划层面会把上一步优化后的逻辑算子树进行进一步的转换,生成物理算子树,物理算子树上的节点会直接生成RDD或者对RDD进行transformation操作,并最终执行。那么对物理计划进行细分的话,又可以分为三个子阶段:

1、物理算子树列表(Iterable[PhysicalPlan]):根据优化后得到的逻辑算子树进行转换生成物理算子树的列表。

2、最优物理算子树(SparkPlan):从物理算子树列表中按照一定的策略选取最优的物理算子树。

3、准备算子树(Prepared SparkPlan):得到最优的算子树之后,那么就开始准备一些执行工作,如执行代码生成、确保分区操作正确、物理算子树节点重用等工作。

最后会对生成的RDD执行Action操作进行真正的作业执行。以上所有的流程均是在Spark的Driver端完成的,这个时候还不涉及到集群环境。

上述的所有流程可以通过SparkSession类的sql方法作为入口,调用SessionState各种对象(SparkSqlParser、Analyzer、Optimizer、SparkPlanner),最后封装一个QueryExecution对象。所以上面的每一步流程都有单独独立的类功能实现,对于我们日常开发工作中进一步剥离分析进行二次加工提供了很大的。

Spark SQL在执行SQL之前,会将SQL或者Dataset程序解析成逻辑计划,然后经历一系列的优化,最后确定一个可执行的物理计划。最终选择的物理计划的不同对性能有很大的影响。如何选择最佳的执行计划,这便是Spark SQL的Catalyst优化器的核心工作。Catalyst早期主要是基于规则的优化器(RBO),在Spark 2.2中又加入了基于代价的优化(CBO)。

2.2.1.1. RBO

根据上面的执行流程,SparkSql在逻辑优化层面主要是基于规则的优化,即RBO(Rule-Based-Optimization)

1、每个优化都是以Rule的形式存在,每条Rule都是对Analyzed Plan的等价转换

2、RBO易于扩展,新增规则可以非常方便嵌入到Optimizer中

3、RBO优化的主要思路在于减少参与计算的数据量以及计算本身的代价。

如常见的谓词下推、常量合并、列裁剪等优化手段

2.2.1.2、CBO

RBO层面的优化主要是针对逻辑计划,未考虑到数据本身的特点(数据分布、大小)以及算子执行(中间结果集分布、大小)的代价,因此sparksql又引入了CBO优化机制(Cost-Based Optimized),该优化主要在物理计划层面,其原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理计划,其核心在于评估一个给定的物理执行计划的代价,其代价等于每个执行节点的代价总和。而每个执行节点的代价,又分为两个部分:

1、该执行节点对数据集的影响,或者说该节点输出数据集的大小和分布。

2、该执行节点操作算子的代价。操作算子的代价相对比较固定,可以用规则来描述。

而执行节点输出数据集主要分为两部分:

1、初始数据集,例如原始文件,其数据集的大小和分布可以直接统计得到的。

2、中间节点输出数据集的大小和分布可以根据输入数据集的信息和操作本身的特点来推算。

因此CBO优化最主要需要先解决两个问题:

1、怎么样子可以获取到原始数据集的统计信息

2、如何根据输入数据集估算特定算子的输出数据特征情况

2.2.1.2.1、如何统计到原始数据集的信息

可以通过Analyze table来分析统计出原始数据集的大小(略)

2.2.1.2.2、算子代价估计

SQL中最常见的就是Join操作,这里以Join方法为例,说明SparkSql的CBO是如何进行估价的。主要是通过以下公式:

Cost = rows * weight + size * (1-weight) ;其中rows为行数代表CPU代价,Size为大小代表IO代价。

Cost = CostCpu * weight + CostIO * (1-weight)

Weight权重的配置可以通过spark.sql.cbo.joinRecorder.card.weight决定,默认为0.7

2.2.1.3、AE

2.2.1.3.1、背景

在生产环境中,往往需要提前配置好分区数以及使用资源,然后在运行的过程中或者事后进行不断的调整参数值来达到最优。但是由于每次计算的数据量可能会变化很大,那么可能需要每次都会人工干涉进行调优,这也意味sql作业很难以最优的性能去运行。而且Catalyst优化器的一些优化工作是在计划阶段,一旦优化完成之后,在运行期间就不能改变。因此需要在运行期间拿到更多的运行信息,不断调整执行计划来达到最优,因此在Spark2.3之后引入了一个Adaptive(自适应)执行机制,需要通过spark.sql.adaptive.enabled参数来开启其机制

2.2.1.3.2、执行原理

根据Spark作业执行流程可知是先根据RDD的DAG图进行划分生成Stage然后提交作业执行,因此在执行过程中计划是不会发生变化的。那么

自适应执行的基本思路是在执行计划中事先划分好stage,然后按stage提交执行,在运行时收集当前stage的shuffle统计信息,以此来优化下一个stage的执行计划,然后再提交执行后续的stage。

对于图中两表join的执行计划来说会创建3个QueryStage。最后一个QueryStage中的执行计划是join本身,它有2个QueryStageInput代表它的输入,分别指向2个孩子的QueryStage。在执行QueryStage时,我们首先提交它的孩子stage,并且收集这些stage运行时的信息。当这些孩子stage运行完毕后,我们可以知道它们的大小等信息,以此来判断QueryStage中的计划是否可以优化更新。例如当我们获知某一张表的大小是5M,它小于broadcast的阈值时,我们可以将SortMergeJoin转化成BroadcastHashJoin来优化当前的执行计划。我们也可以根据孩子stage产生的shuffle数据量,来动态地调整该stage的reducer个数。在完成一系列的优化处理后,最终我们为该QueryStage生成RDD的DAG图,并且提交给DAG Scheduler来执行

2.2.1.3.3、实现点

该机制主要有三个功能点:

1、自动设置shuffle分区数

主要解决的问题有以下几点:

1.1、如果设置分区数过小可能会导致每个task处理大量的数据,会发生溢写磁盘的情况影响性能,甚至发生频繁GC或者OOM。

1.2、如果设置分区数过大可能会导致每个task处理小量的数据,而且会有可能产生小文件,甚至会出现资源空闲的情况。

1.3、设置分区数是对所有的Stage都会生效,而每个Stage所处理的数据量和分布都不太一样,所以全局的分区数只能对某些Stage是最优的,无法做到全局最优。

例如我们设置的shufflepartition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)

2、动态调整执行计算

以join操作为例,在Spark中最常见的策略是BroadcastHashJoin和SortMergeJoin。BroadcastHashJoin属于map side join,其原理是当其中一张表存储空间大小小于broadcast阈值时,Spark选择将这张小表广播到每一个Executor上,然后在map阶段,每一个mapper读取大表的一个分片,并且和整张小表进行join,整个过程中避免了把大表的数据在集群中进行shuffle。而SortMergeJoin在map阶段2张数据表都按相同的分区方式进行shuffle写,reduce阶段每个reducer将两张表属于对应partition的数据拉取到同一个任务中做join。CBO根据数据的大小,尽可能把join操作优化成BroadcastHashJoin。Spark中使用参数spark.sql.autoBroadcastJoinThreshold来控制选择BroadcastHashJoin的阈值,默认是10MB。然而对于复杂的SQL查询,它可能使用中间结果来作为join的输入,在计划阶段,Spark并不能精确地知道join中两表的大小或者会错误地估计它们的大小,以致于错失了使用BroadcastHashJoin策略来优化join执行的机会。但是在运行时,通过从shuffle写得到的信息,我们可以动态地选用BroadcastHashJoin。

3、动态处理数据倾斜

在SQL作业中,数据倾斜是很常见的问题,但都是事后人为通过一些手段进行解决的,那么能不能在运行时自动处理掉呢?

假设A表和B表做inner join,并且A表中第0个partition是一个倾斜的partition。

一般情况下,A表和B表中partition 0的数据都会shuffle到同一个reducer中进行处理,由于这个reducer需要通过网络拉取大量的数据并且进行处理,它会成为一个最慢的任务拖慢整体的性能。

在自适应执行框架下,一旦我们发现A表的partition 0发生倾斜,我们随后使用N个任务去处理该partition,每个任务只读取若干个mapper的shuffle 输出文件,然后读取B表partition 0的数据做join。最后,我们将N个任务join的结果通过Union操作合并起来。

为了实现这样的处理,我们对shuffle read的接口也做了改变,允许它只读取部分mapper中某一个partition的数据。在这样的处理中,B表的partition 0会被读取N次,虽然这增加了一定的额外代价,但是通过N个任务处理倾斜数据带来的收益仍然大于这样的代价。

如果B表中partition 0也发生倾斜,对于inner join来说我们也可以将B表的partition 0分成若干块,分别与A表的partition 0进行join,最终union起来。但对于其它的join类型例如Left Semi Join我们暂时不支持将B表的partition 0拆分

4、Left join build left side map

对于left join的情况,可以对左表进行HashMapBuild。可以实现小左表left join 大右表的情况下进行ShuffledHashJoin调整。

原理:

1、在构建左表Map的时候,额外维持一个“是否匹配成功”的映射表。

2、在和右表join结束之后,把所有没有匹配到的key,用null来join填充。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/340550.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2022级上岸浙理工MBA的复试经验提炼和备考建议

在等待联考成绩出来的那段时间,虽然内心很忐忑,但还是为复试在积极的做准备,虽然也进行了估分大概有201分,但成绩和分数线没下来之前,只能尽量多做些一些准备把。因为笔试报了达立易考的辅导班,对于浙江理工…

复现随记~

note(美团2022) 比较简单的越界漏洞,堆本身并没有什么漏洞,而且保护并没全开,所以逆向思维。必然是ROP类而非指针类,故我们着重注意unsigned int等无符号数前后是否不一致 int __fastcall edit(__int64 a1) {int idx; // [rsp14…

[全栈工程师]从0到封神

全栈工程师 一个可以独立完成产品开发的人 目标规划全栈工程师参与社区的问题回答gitCode来自选择专业的问答为什么选择软件工程当初对软件工程这个专业的期待和想象是什么当初希望自己是如何投入这个专业的学习的曾经做过什么准备,或者立下过什么FLAG吗CSDN的我的介…

【idea】idea生产类注释和方法注释

网上有很多类似的文章,但是我在按照他们的文章设置后,出现了一些问题,因此我这边在解决了问题后,总结一篇文章,发出来给大家借鉴一下。在此先说明一下idea的版本,是2020.1.3 设置动态模板,File…

应用场景二:西门子PLC通过无线WIFI连接上位机

应用场景描述: 西门子PLC通过桥接器的无线WIFI连接上位机通讯,可以同时支持S7TCP、ModbusTCP和MQTT协议,上位机可以支持西门子编程软件(Micro/WIN、STEP7、博途),组态软件(Wincc、组态王、OPC软…

基于卷积神经网络的立体视频编码质量增强方法_余伟杰

基于卷积神经网络的立体视频编码质量增强方法_余伟杰提出的基于TSAN的合成视点质量增强方法全局信息提取流像素重组局部信息提取流多尺度空间注意力机制提出的基于RDEN的轻量级合成视点质量增强方法特征蒸馏注意力块轻量级多尺度空间注意力机制概念扭曲失真孔洞问题失真和伪影提…

【OpenCV图像处理系列一】OpenCV开发环境的安装与搭建(Ubuntu + Window都适用)

🔗 运行环境:OpenCV,Ubuntu,Windows 🚩 撰写作者:左手の明天 🥇 精选专栏:《python》 🔥 推荐专栏:《算法研究》 #### 防伪水印——左手の明天 #### &#x…

Hadoop集群搭建详细步骤

目录 一、模板虚拟机环境准备 1.新建一台虚拟机hadoop100,并且配置好网络 3.安装 epel-release 4.其他工具 5. 配置普通用户具有root权限,方便后期加sudo执行root权限的命令 6.删除/opt/目录下的所有文件 7.在/opt/目录下创建文件夹,并…

RocketMQ底层源码解析——事务消息的实现

1. 简介 RocketMQ自身实现了事务消息,可以通过这个机制来实现一些对数据一致性有强需求的场景,保证上下游数据的一致性。 以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统…

基于javaee的电影碟片租赁管理系统的设计

技术:Java、JSP、框架等摘要:随着信息技术在管理中的广泛应用,管理信息系统(MIS)的实施在技术上逐渐成熟。为了适应时代的发展,降低管理成本,提高工作效率,企业需要加强对内部资源(人、钱、物)的有效管理&a…

Android测试包安装方式汇总

背景:作为一名测试,尤其是移动端测试,掌握app的安装方式是必备的基本技能,因此将Android测试包不同格式不同方式的安装方式进行一个总结分享​,仅供大家学习参考。 一、设备调试准备 1、设备打开开发者模式&#xff…

医学生考研考博太卷,一篇文章轻松助力上岸(一)

考研考博太卷了,卷不过,想没想过本科发一篇文章呢? 330分考研人淘汰390分考研人这个故事,大家应该都知道吧。 本专栏带你六个月内,搞定一篇文章,本科生发文章也很容易。 在卷考研的同时,再卷…

应用场景一:西门子PLC通过桥接器连接MQTT服务器

应用场景描述: 云平台、MES等数据采集、设备管理系统,需要通过MQTT的方式,上传和下发数据,MQTT服务器可以获取PLC的实时状态数据,也可以下发控制指令。桥接器提供4G、WIFI和有线三种连接方式。 网络拓扑:…

GRBL源码简单分析

结构体说明 GRBL里面的速度规划是带运动段前瞻的,所以有规划运动段数据和微小运动段的区分 这里的“规划运动段”对应的数据结构是plan_block_t,前瞻和加减速会使用到,也就是通过解析G代码后出来的直接直线数据或是圆弧插补出来的拟合直线数据…

【链式二叉树】数据结构链式二叉树的(万字详解)

前言: 在上一篇博客中,我们已经详解学习了堆的基本知识,今天带大家进入的是二叉树的另外一种存储方式----“链式二叉树”的学习,主要用到的就是“递归思想”!! 本文目录1.链式二叉树的实现1.1前置说明1.2结…

【蓝桥杯单片机】Keil5中怎么添加STC头文件;从烧录软件中添加显示添加成功后新建工程时依旧找不到

蓝桥杯单片机的芯片型号:IAP15F2K61S2 添加头文件:STC15F2K60S2.H 【1】如何通过烧录软件添加STC头文件: 从ATC-ISP的Keil仿真设置中添加(同时自动下载仿真驱动)仔细阅读添加说明 KEIL5添加STC芯片库_Initdev的博客-…

UVa The Morning after Halloween 万圣节后的早晨 双向BFS

题目链接:The Morning after Halloween 题目描述: 给定一个二维矩阵,图中有障碍物和字母,你需要把小写字母移动到对应的大写字母位置,不同的小写字母可以同时移动(上下左右四个方向或者保持不动 &#xff0…

概论_第8章_假设检验的基本步骤__假设检验的类型

一. 假设检验的基本步骤如下:第1步 根据实际问题提出原假设 及备择假设 , 要求 与 有且仅有一个为真;第2步 选取适当的检验统计量, 并在原假设 成立的条件下确定该检验统计量的分布;第3步 按问题的具体要求, 选取适当…

【java】OpenFeign源码解析学习

本文主要针对 spring-cloud-starter-openfeign 的 2.2.3.RELEASE 版本进行源码的解析。 OpenFeign是什么? 作为Spring Cloud的子项目之一,Spring Cloud OpenFeign以将OpenFeign集成到Spring Boot应用中的方式,为微服务架构下服务之间的调用提…

SQL Serve 日志体系结构

SQL Server 事务日志记录着 undo 及 redo 日志,为了保证数据库在崩溃后恢复,或者在正常数据库操作期间进行回滚,保证数据库事务完整性和持久化。如果没有事务日志记录,数据库在事务上将不一致,并且在数据库崩溃后可能导…