SPARKSQL3.0-PhysicalPlan物理阶段源码剖析

news2025/1/15 23:37:25

一、前言

阅读本节需要先掌握【SPARKSQL3.0-Optimizer阶段源码剖析】

本质:物理计划阶段将optimizer阶段优化后的逻辑算子树【LogicalPlan】进行进一步转换,生成物理算子树【SparkPlan】,物理算子树的节点可以直接生成 RDD 或对 RDD 进行 transformation 操作

最终完成了从sql字符串到生成可执行的RDD算子,再由RDD算子去执行操作的过程

这也正是为何spark官网强烈建议使用sparksql,而不是底层RDD算子,因为sparksql模块在执行rdd操作之上做了很多的优化

二、示例

这里沿用一开始的示例,将action操作改为collect,代码:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")

val spark = SparkSession.builder()
  .config(sparkConf)
  .getOrCreate()

case class Person(name: String, age: Int)

Seq(Person("Jack", 12), Person("James", 21), Person("Mac", 30)).toDS().createTempView("person")

spark.sql("SELECT * FROM PERSON WHERE AGE > 18").collect // action执行操作

打印:

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('AGE > 18)
   +- 'UnresolvedRelation [PERSON]

== Analyzed Logical Plan ==
name: string, age: int
Project [name#2, age#3]
+- Filter (AGE#3 > 18)
   +- SubqueryAlias person
      +- LocalRelation [name#2, age#3]

== Optimized Logical Plan ==
LocalRelation [name#2, age#3]

== Physical Plan ==
LocalTableScan [name#2, age#3]

三、源码

我们debug进入到collect算子中,其中将queryExecution[执行sql的关键类]和collectFromPlan函数作为参数传入到withAction函数中

image-20221015191536818

collectFromPlan函数

image-20221015191613903

withAction函数

image-20221015191638339

随后将qe.executedPlan变量值的结果传递给action函数【也就是上面的collectFromPlan函数】

image-20221015191831128

我们先来看executedPlan变量,是在QueryExecution类中;

sql物理执行计划过程是首先调用executedPlan函数 -> 调用sparkPlan函数,可以看到sparkPlan变量的返回值是SparkPlan,我们之前接触的都是logicalPlan,那么就需要先来了解一下SparkPlan

image-20221014164409259

在optimized阶段我们的sql还是logicalPlan逻辑树,但经过了sparkPlan函数则变成SparkPlan物理树,这里需要先介绍一下SparkPlan,先看结构:sparkPlan 和 LogicalPlan是并列关系

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UWbhiF7F-1669101955160)(https://segmentfault.com/img/remote/1460000039649880/view)]

image-20221014164449216

LogicalPlan主要用于逻辑树的解析和优化,而SparkPlan则用于物理执行计划

Spark SQL 将 SQL 语句经过逻辑算子树转换成物理算子树,在物理算子树中,叶子类型的 SparkPlan 节点负责从无到有地创建 RDD,可以说sql -> rdd中的最后一步就是由SparkPlan节点完成的

在SparkPlan类中有execute函数用于负责将算子转化成RDD,其中调用了doExecute抽象函数,由各个叶子节点子类来实现

image-20221014165715398 image-20221014165919394

不同的子类有不同的doExecute实现,感兴趣的小伙伴可以点进去看看不同的实现

image-20221014165935761

那么sparkPlan物理树又是如何生成的呢?回到最开始sparkPlan函数中:

image-20221014171403343 image-20221014171435390

可以看到是调用planner.plan 来构建SparkPlan,接下来看一下planner是什么:

image-20221014171530561

planner来自sessionState中,类型为SparkPlanner

image-20221014171601265

SparkPlanner继承于QueryPlanner,QueryPlanner中的plan函数是将logicalPlan逻辑树转化成SparkPlan物理树的关键

image-20221014172128014

这里贴一下plan的代码: 这块和Analyzer、Optimizer阶段的规则迭代很像,通过strategies策略列表调用localPlan迭代执行,最终得到物理树;

abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  /** 可以使用的执行策略列表 */
  def strategies: Seq[GenericStrategy[PhysicalPlan]]

  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...

    // 通过strategies不同的策略来调用logicalPlan
    val candidates = strategies.iterator.flatMap(_(plan))

    // 如果该集合中存在 PlanLater 类型的
		// SparkPlan 通过 aceholder 中间变量取出对应的 LogicalPlan 后,递归调用 plan()方法,将
	  // planLater 替换为子节点的物理计划
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)
			
      // 对物理计划列表进行过滤,去掉一些不够高效的物理计划
      if (placeholders.isEmpty) {
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)

            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan
                candidateWithPlaceholders.transformUp {
                  case p if p.eq(placeholder) => childPlan
                }
              }
            }
        }
      }
    }

    val pruned = prunePlans(plans)
    assert(pruned.hasNext, s"No plan for $plan")
    pruned
  }
}

可以看到strategies集合中包含logicalPlan转化成sparkPlan的策略,strategies集合由SparkPlanner子类实现:其中extraStrategies是可以让用户指定的额外策略,每一个策略都有其实现的apply函数,感兴趣的话可以看一下不同策略的实现。

image-20221014172956139

这里贴一下常见的策略描述

image-20221015194532991

那么将logicalPlan转化为SparkPlan后,我们在网上会经常看到如下图片内容:costModel阶段

一条SQL在Spark之旅

Cost Model 对应的就是基于代价的优化(Cost-based Optimizations,CBO,详见 SPARK-16026 ),核心思想是计算每个物理计划的代价,然后得到最优的物理计划。但是截止到spark3.0,这一部分并没有实现,直接返回多个物理计划列表的第一个作为最优的物理计划,如下:

在QueryPlanner中的plan函数中会调用prunePlans抽象函数,此函数便是cost Model计划,由SparkPlanner实现

image-20221014173513103

在SparkPlanner中的实现是将集合原封不动返回,注释解释到会在后期实现此函数

image-20221014173700386

回到前面QueryExecution的createSparkPlan函数,将planner.plan得到的集合中第一个计划[next]返回

image-20221014173949939

此时sparkPlan阶段结束,我们回到executionPlan函数:可以看到有一个prepareForExecution准备执行函数,此函数是将preparations集合中不同的规则作用在sparkPlan物理树,从而获得优化后的可执行的sparkPlan

image-20221014174749027

image-20221014174813927

调用preparations函数,并将InsertAdaptiveSparkPlan传递进去

image-20221015225852013

image-20221014175010059

在上面的规则中有几个重头戏,一个是传进来的参数InsertAdaptiveSparkPlan 就是spark3.0中新增功能AQE【自适应执行】,SPARK-9850 在 Spark 中提出了自适应执行的基本思想,关于功能实现不在这里过多陈述,可查看相关文献;由此可以看出AQE功能目前只能通过sparksql才能使用

上面的 Rule 中还有一个 CollapseCodegenStages 也是重头戏,这就全代码阶段生成,Catalyst 全阶段代码生成的入口就是这个规则。当然,如果需要 Spark 进行全阶段代码生成,需要将 spark.sql.codegen.wholeStage 设置为 true(默认)。

剩余常用的规则参考下图

image-20221015194621820

CollapseCodegenStages这一阶段也叫做Tungsten阶段,这部分机制实现较为复杂,会在后面单独开一节讲解,这里简单提一下:Tungsten阶段是建立在现代编译器和MPP数据库的基础上,并且应用到数据的处理中。主要的思想是将那些拖慢整个程序执行速度的代码放到一个单独的函数中,消除虚拟函数的调用,并使用寄存器来存放中间结果。这项技术被称作“whole-stage code generation.

经过了prepareForExecution函数后,executedPlan的任务就完成了,我们回到一开始的withAction函数中:

image-20221122151633999

此时会调用action【collectFromPlan】函数,并将executedPlan生成的sparkPlan传递过去,collectFromPlan函数中调用了sparkPlan的executeCollect函数【此函数在部分子类中有重写】

image-20221015195002590

在executeCollect中主要是通过getByteArrayRdd函数获得RDD

image-20221122151843278

可以看到getByteArrayRdd函数中调用了execute函数,而execute函数中又调用了doExecute抽象函数,从而让sparkPlan的子类迭代生成所需要的RDD

image-20221015195617351 image-20221015195653055

生成好RDD后,再回到executeCollect函数中,通过byteArrayRdd.collect()函数提交runJob到达RDD提交阶段,到此sparksql阶段结束。

image-20221122152338948

image-20221122152357443

至此PhysicalPlan物理阶段结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/27412.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

javaweb 使用element + vue 完善项目 servlet 优化

我们先定义一个BaseServlet,继承HttpServlet 重写Service方法 &#xff08;因为HttpServlet就是在Service方法里做的通过请求方式进行方法分发&#xff0c;我们就重写改成通过请求路径分发&#xff09; 根据资源路径进行方法分发&#xff0c;利用反射得到调用者的class字节码文…

C++-指针:void*(不确定类型指针)简介【void *可以接受任何类型的赋值】【void *可以赋值给任何类型的变量】【void *不可以解引用】

void* 是一种特殊的指针类型&#xff0c;可用于存放任意对象的地址。一个 void* 指针存放着一个地址&#xff0c;这一点和其他指针类似。 void *可以接受任何类型的赋值&#xff0c;任何类型的指针都可以直接赋值给void *型指针&#xff0c;无需进行强制类型转换&#xff0c;相…

超大规模研究发现,缺乏维生素D,增加早逝风险

晒太阳是日常生活中最常见的一件事情了&#xff0c;但是很多人为了避免晒黑不喜欢晒太阳&#xff0c;更是把自己在夏天裹得严严实实&#xff0c;恨不得只露两只眼睛。其实每天晒太阳对身体有很多益处&#xff0c;杀菌消毒、促进维生素D的生成、促进血液循环和加速脂肪燃烧等。维…

IBM MQ 通道数量查看,以及最大通道数的修改

一&#xff0c;说明 在实践生产环境中&#xff0c;我们的运维人员很多时候都想关注通道使用了多少&#xff0c;离限定的对大通道数还有多少&#xff1f;下面我们就查看通道数量提供相应的办法。 二&#xff0c;示例 我有两个队列管理器&#xff0c;TEST_QM和 TEST_RQM&#…

C. Random Events(思维+概率)

Problem - 1461C - Codeforces 罗恩是一个长度为n的排列组合的快乐主人。 一个长度为n的排列组合是一个由1到n的n个不同的整数按任意顺序组成的阵列。例如&#xff0c;[2,3,1,5,4]是一个排列组合&#xff0c;但是[1,2,2]不是一个排列组合&#xff08;2在数组中出现了两次&…

动手学习深度学习

动手学习深度学习内容安排深度学习介绍内容安排 深度学习基础&#xff1a;线性神经网络、多层感知机卷积神经网络&#xff1a;LeNet、AlexNet、VGG、Inception、ResNet循环神经网络&#xff1a;RNN、GRU、LSTM、seq2seq注意力机制&#xff1a;Attention、Transformer优化算法&…

GPU是什么?GPU有多重要?

前段时间&#xff0c;MD和英伟达相继接到通知要对我国断供高端GPU芯片&#xff0c;很多人不知道GPU到底有什么用&#xff1f;下面IC修真院就带大家来一起了解一下GPU。 首先来了解一下GPU是什么&#xff1f; GPU–图形处理器&#xff08;Graphics Processing Unit&#xff09…

Assignment写作需要做好哪些练习?

有些澳洲留学小伙伴在被Assignment难住后往往会选择多练习来完成&#xff0c;那么如何顺利完成一篇Assignment的呢&#xff1f;小编就来为大家详解一番。 Some students studying in Australia often choose to practice more to complete assignments when they are baffled b…

【pwn】2022 极客大挑战

【pwn】2022 极客大挑战 前言 又是一年的极客大挑战&#xff0c;又老了一岁&#xff0c;也只有打打新生赛才能有第一次接触ctf快乐了&#xff0c;现在各种比赛的pwn都是纯纯的坐牢~ 本次题解的所有脚本使用的类库都是本人自己整合的一个库&#xff0c;github地址&#xff1a…

2022京东双十一全品类销售额变化情况一览:50%增长,50%下滑

面对外界的风风雨雨&#xff0c;京东一直稳如泰山。有人认为京东全线都出现销售额大幅下滑&#xff0c;有人则认为京东今年的销售额整体可观。 那么相较于去年同期&#xff0c;究竟有哪些品类在如此大环境下依然处于上升期&#xff0c;又有哪些品类遭遇滑铁卢&#xff0c;面临短…

Redis数据类型之list

文章目录listⅠ. 增删查改Ⅱ. 业务场景Ⅲ. 注意事项提示&#xff1a;以下是本篇文章正文内容&#xff0c;Redis系列学习将会持续更新 list ● 数据存储需求&#xff1a;存储多个数据&#xff0c;并对数据进入存储空间的顺序进行区分 ● 需要的存储结构&#xff1a;一个存储空间…

问题 D: 是否为有效的拓扑序列

题目描述 在一个有向无环图中&#xff0c;可能存在多种有效拓扑序列。以下图为例&#xff1a; 存在两种可行的拓扑序列&#xff1a;0 1 2 3 40 2 1 3 4 本题会给出一个图&#xff0c;以及多个序列&#xff0c;你来判断每一个序列是否是该图的有效拓扑序列。 输入格式 第一…

(Transferrin)TF-PEG-PCL/PLA/PAA 转铁蛋白-聚乙二醇-聚已内酯/聚乳酸/聚丙烯酸

产品名称&#xff1a;转铁蛋白-聚乙二醇-聚已内酯 英文名称&#xff1a;TF-PEG-PCL;Transferrin-PEG-PCL 纯度&#xff1a;95% 存储条件&#xff1a;-20C&#xff0c;避光&#xff0c;避湿 外观:固体或粘性液体&#xff0c;取决于分子量 PEG分子量可选&#xff1a;350、550、75…

外汇天眼:美国10月份成屋销售连续第九个月下降!利率上升和通胀飙涨吓跑潜在买家!

10月份美国房屋销售连续第9个月下滑&#xff0c;因利率上升和通胀飙升令买家持观望态度。 具体付 全美房地产经纪人协会(National Association of Realtors)的数据显示&#xff0c;9月至10月&#xff0c;成屋销售下降5.9%。这是自2011年12月以来的最慢速度&#xff0c;除了在C…

企业真实面试:父子类之间到底是怎么实例化的?

一. 问题展现 今天有粉丝向波哥询问了这样一道题目&#xff0c;这道题目是粉丝在面试时遇到的&#xff0c;如下图所示&#xff1a; 波哥把上图这道题目的考察重点给大家梳理一下&#xff1a; 有一个父类People&#xff0c;它有一个子类Child&#xff1b; 父类的的无参构造方法…

Redis——》过期删除策略

推荐链接&#xff1a; 总结——》【Java】 总结——》【Mysql】 总结——》【Redis】 总结——》【Spring】 总结——》【SpringBoot】 总结——》【MyBatis、MyBatis-Plus】 Redis——》过期删除策略一、过期删除策略1、定时删除2、惰性删除3、定期删…

通达信交易dll接口怎么实现程序化交易?

现在很多交易者选择量化投资与传统的股票交易之间&#xff0c;往往会选择自动化交易跟量化交易的比较多&#xff0c;毕竟现在很多可以开发出来的交易软件都具备了量化的特点&#xff0c;能够及时的把握更多的盈利的机会&#xff0c;就比如说常使用的到通达信交易dll接口就是一个…

LeetCode | 850. 矩形面积 II

我们给出了一个&#xff08;轴对齐的&#xff09;二维矩形列表 rectangles 。 对于 rectangle[i] [xi1, yi1, xi2, yi2], 表示第 i 个矩形的坐标&#xff0c; (xi1, yi1) 是该矩形 左下角 的坐标&#xff0c; (xi2, yi2) 是该矩形 右上角 的坐标。 计算平面中所有 rectangles…

java maven pom application 生产prod/开发dev/测试test

前言 pom 和 application.properties&#xff08;application.yml)里的定义的环境不太一样&#xff0c; pom 是maven对应的配置文件&#xff0c;编译阶段使用 application.properties&#xff08;application.yml) 是Spring配置文件&#xff0c;程序运行阶段使用 POM pom文…

每次打开百度太麻烦?用程序直接打开网页 Python实现百度划词搜索功能(获取剪切板数据)

浏览顺序实现划词功能运行错误解决问题转载声明实现划词功能 说是划词翻译&#xff0c;实际上我们是通过获取用户的剪切板内容&#xff0c;通过一系列的操作得到的。首先呢&#xff0c;我们就先实现如何获取剪切板内容的程序 首先先在桌面创建一个文件夹&#xff0c;命名为“…