spark sql(四)物理计划解析

news2025/4/7 15:45:36

1、流程解析

在该系列第二篇文章中介绍了spark sql整体的解析流程,我们知道整体的sql解析分为未解析的逻辑计划(Unresolved LogicalPlan)、解析后的逻辑计划(LogicalPlan)、优化后的逻辑计划(Optimized LogicalPlan)、物理计划(PhysiclPlan)等四个阶段。物理计划是sql转换执行的最后一个环节,过程比较复杂,其内部又分三个阶段,如下图所示:

这3个阶段所做的工作分别如下:

1)由SparkPlanner将各种物理计划策略(Strategy,可自定义扩展)作用于对应的LogicalPlan节点上,生成SparkPlan列表(注:一个 LogicalPlan 可能产生多种 SparkPlan)。

2)选取最佳的SparkPlan,在Spark3.0.1版本中的实现较为简单,在候选列表中直接用 next()方法获取第一个。

3)提交前进行准备工作,进行一些分区排序方面的处理,确保SparkPlan各节点能够正确执行,这一步通过 prepareForExecution方法调用若干规则(Rule,可自定义扩展)进行转换。

 注:如果对Strategy和Rule的扩展入口感兴趣的,可以参考该系列的第二篇文章:spark sql(二)sql解析流程扩展_Interest1_wyt的博客-CSDN博客

2、节点分类

根据SparkPlan的子节点数目,可以大致将其分为4类。分别为 LeafExecNode、UnaryExecNode、BinaryExecNode和其他不属于这3种子节点的类型。

1)LeafExecNode

叶子节点类型的物理执行计划不存在子节点,物理执行计划中与数据源相关的节点都属于该类型。LeafExecNode类型的SparkPlan负责对初始RDD的创建。例如:
angeExec会利用SparkContext中的parallelize方法生成给定范围内的64位数据的RDD;HiveTableScanExec会根据Hive数据表存储的HDFS信息直接生成HadoopRDD;FileSourceScanExec根据数据表所在的源文件生成FileScanRDD。

2)UnaryExecNode

UnaryExecNode类型的物理执行计划的节点是一元的,意味着只包含1个子节点,UnaryExecNode类型的物理计划也是数量最多的类型。UnaryExecNode节点的作用主要是对RDD进行转换操作。例如:
ProjectExec和FilterExec分别对子节点产生的RDD进行列剪裁与行过滤操作;
Exchange负责对数据进行重分区;
SampleExec对输入RDD中的数据进行采样;
SortExec按照一定条件对输入RDD中数据进行排序;
WholeStageCodegenExec类型的SparkPlan将生成的代码整合成单个Java函数。

3)BinaryExecNode

BinaryExecNode类型的SparkPlan具有两个子节点,这种二元类型的物理执行计划大多与数据的关联处理有关,比如常见的join处理。

4)其它节点

除上述3种类型的SparkPlan外,SparkSQL中还有许多其他类型的物理执行计划,如Union等。

3、源码追踪

spark sql物理计划这一块内容很杂,特别是还涉及分区与排序。所以理解起来很困难,本人也是处于知其然不知其所有然的尴尬状态,故下面通过源码的追踪串一下整体的逻辑。具体的细节则需要时间慢慢打磨。代码demo如下:

  def main(args: Array[String]): Unit = {

    //1、创建sparkSession
    val sparkSession = SparkSession.builder
      .appName("test")
      .master("local")
      .getOrCreate

    //2、编辑mysql连接参数
    val url: String = "jdbc:mysql://127.0.0.1:3306/olap"
    val table: String = "person"
    var prop: Properties = new Properties
    prop.put("user", "root")
    prop.put("password", "123456")

    //3、创建临时表
    val dataset: Dataset[Row] = sparkSession.read.format("mysql").jdbc(url, table, prop)
    dataset.toDF.createOrReplaceTempView("temp")

    //4、查询展示
    val dataset2: Dataset[Row] = sparkSession.sql("select id,name,birthday from temp")
    dataset2.show()

  }

前三部分主要是构建环境和创建临时表,这里不过多解释,我们从 第四部分查询开始追踪源码。因为大体流程在前面讲过,所以这里对于物理计划之前的流程简单介绍,在物理计划解析时进行详细介绍。首先是执行sql语句生成Dataset数据集(注意这里的数据集还没数据,只有在执行show方法时才会触发计算操作):

 

 这里tracker是一个追踪器,追踪sql计划各个阶段的起止时间以及各个规则作用的时间段等。plan则是生成最基础的未解析的逻辑计划。最后通过ofRows返回一个Dataset数据集。我们接着到ofRows中查看:

 ofRows方法中首先是创建一个QueryExecution对象,这是很重要的一个对象,它包含了sql计划的各个阶段。其次是通过assertAnalyzed将未解析的逻辑计划转换为解析后的逻辑计划。最后封装成Dataset返回(此时的Dataset还没有数据,只有遇到action算子才会触发后续流程)。

最后一路返回到最后的demo代码中,接着从show方法开始看:

 

这里第一行代码是获取要展示数据的行数,第二行getRows则是获取具体的数据,所以进入getRows看一下:

 这里首先是重新生成一个Dataset数据集,随后对新数据集的数据字段的类型做一些转换——非日期字段统一转为字符串类型。然后执行select转换和take行动操作。select主要是基于逻辑计划做一些转换,不是此次的重点,感兴趣的自己看下。最后一步操作则是拼装返回结果,也不是重点。这里我们直接看下take的操作:

 

 这里withAction是一个高阶函数,乍一看可能不知道这个函数的执行逻辑。这里我大致介绍下:首先当前所在的类是Dataset,所以执行limit(n).queryExecution其实就是在当前数据集所对应的逻辑计划上加一层limit封装,然后返回当前sql计划的QueryExecution对象。至于最后的collectFromPlan则是一个函数,其当做参数传递到withAction方法中。这里我们接着到withAction中看一下:

这里的逻辑也是比较绕,首先最外层SQLExecution.withNewExecutionId还是一个高阶函数,其封装整个action操作。具体的封装细节不是此次重点,感兴趣的可以再看。这里关键操作只有两行代码,第一行是重置SparkPlan物理计划的度量信息,其次是从物理计划获取结果,注意这里的action对象其实是上一步我们传递进来的collectFromPlan方法。这里我们先看下比较重要的qe.executedPlan这个方法,它返回了物理计划SparkPlan。具体我们到源码中看一下:

 

 首先第一行是判断是否执行逻辑计划的优化,如果没执行则执行。其次是执行物理计划的准备,不过在这一步还有一个容易忽略的点。就是sparkPlan.clone方法。它是获取物理计划的第一步。我们进入源码看一下:

 createSparkPlan是物理计划的第一步,其将逻辑计划LogicPlan转换为SparkPlan。我们到plan方法中接着看下其具体的转换流程:

 

可以看到是将各种strategy应用到逻辑计划上,进而生成PhysicalPlan,随后通过map函数对PhysicalPlan进行简单的处理进而转换成SparkPlan。再回到createSparkPlan中可以看到,即使有多个物理计划返回,默认也只返回第一个物理计划进行使用(当前spark版本3.0.1)。至此我们获得了一个可用的SparkPlan物理计划。接着回到executedPlan属性方法中:

这里的QueryExecution.prepareForExecution,主要是将各种规则作用于物理计划上,使得所有逻辑计划树转换为物理算子树。我们接着回到withAction方法中。

 当我们获得了SparkPlan物理计划并重置物理计划的度量信息后,接着执行action操作,这里的action其实就是一开传进来的collectFromPlan方法,所以接着到该方法中查看一下:

在该方法中可以看到会调用物理计划的executeCollect方法。而这里的物理计划通过断点可以看到是一个CollectLimitExec算子对象,所以我们接着到该对象的方法中看一下:

 可以看到这里其实是调用WholeStageCodegenExec中的方法,而该对象其实是用于生成代码的。随后将代码发送到各个节点执行。后续是不是感觉不知道去哪追踪?去除代码生成和任务下发,其实后续剩下的内容不多,主要就是去哪拿数据。这里我们可以一层层展开WholeStageCodegenExec算子,看看其底层的RDD数据到底来自于哪:

 可以看到最底层RDD数据是JDBCRDD,而RDD最核心的就是其compute方法,所以我们直接看下JDBCRDD的compute方法:

可以看到其最终还是通过将语法树转换成sql语句,然后通过jdbc连接发送请求并获得数据。至此是不是感觉豁然开朗,原来sparksql底层也是用jdbc查询的。为了防止大家被误导,这里要介绍下,本文之所以最后是jdbc查询,是跟我查询mysql数据库有关的,如果你查的是文件或其它非数据库的源,最后的物理算子肯定不是通过jdbc去查。但是整个spark sql计划转换流程和物理计划执行过程都是一样的。后期想扩展或者排查问题也可以参考该流程。

至此,我们物理计划获取数据的整个流程就完全清晰了。后续还会有些其它文章进一步探讨spark sql中问题定位以及功能扩展的用法。

4、总结:

1)因为spark sql计划的转换执行,很多都是懒加载,所以有的时候加的断点会不生效。这个时候可以吧其它断点都放开,只在目标位置放一个断点。然后重启程序即可。

2)spark sql物理计划转换的内容真的太多,本文主要注重将整个流程穿起来,所以很多细节讲解的不够到位。感兴趣或者有需要的可以自己再看。

3)物理计划内部包含Physical、SparkPlan、PreparedSparkPlan三个小阶段。

4)spark 3.0.1版本物理计划即使有多个也是默认使用第一个

5)本文之所以最后是jdbc查询,是跟我查询mysql数据库有关的,如果你查的是文件或其它非数据库的源,最后的物理算子肯定不是通过jdbc去查。但是整个spark sql计划转换流程和物理计划执行过程都是一样的。后期想扩展或者排查问题也可以参考该流程。

参考文献:

《spark sql内核剖析》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/578882.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HDFS学习笔记

HDFS1.0 1 什么是HDFS? HDFS的全称是:Hadoop DistributeFiles System,分布式文件系统。 在整个Hadoop技术体系中,HDFS提供了数据分布式存储的底层技术支持。 HDFS 由三个组件构成:NameNode(NN&#xff…

排序算法:堆排序

朋友们、伙计们,我们又见面了,本期来给大家解读一下栈和队列方面的相关知识点,如果看完之后对你有一定的启发,那么请留下你的三连,祝大家心想事成! 数据结构与算法专栏:数据结构与算法 个 人 …

算法|9.从暴力递归到动态规划2

9.算法|从暴力递归到动态规划2 1.数字字符串转英文字符串 题意:规定1和A对应、2和B对应、3和C对应…26和Z对应,那么一个数字字符串比如"111”就可以转化为:“AAA”、“KA"和"AK” 给定一个只有数字字符组成的字符串str,返回…

windows安装python开发环境

最近因工作需要,要学习一下python,所以先安装一下python的开发环境,比较简单 下载和安装Python 首先,在浏览器中打开Python的官方网站(https://www.python.org/downloads/) 然后,从该网站下载与你的操…

NCI架构-1

1、NFCC和DH通过物理连线相连,物理连线对应为Transport Layer(传输层),支持SPI、I2C、UART、USB等; 2、DH中所有和NFC相关的应用程序都可视为DH-NFCEE(EE:Execution Enviroment),图左的NFCEE模块可运行一些…

Linux系统中源码安装1.8.x版本Arduino IDE

本文内容参考: Ubuntu22.04安装Arduino IDE及Arduino UNO(使用CH341驱动)调试方法__KILLMILEDC_的博客-CSDN博客 在Linux上下载arduino_不说话的白帽子的博客-CSDN博客 https://guoqing.blog.csdn.net/article/details/88913063?spm1001.…

【JVM】8. 对象实例化及直接内存

文章目录 8.1. 对象实例化8.1.1. 创建对象的方式8.1.2. 创建对象的步骤1. 判断对象对应的类是否加载、链接、初始化2. 为对象分配内存3. 处理并发问题4. 初始化分配到的内存5. 设置对象的对象头6. 执行init方法进行初始化 8.2. 对象内存布局8.2.1. 对象头(Header&am…

python+vue新能源汽车在线租赁管理系统pycharm项目

开发语言:Python 框架:django/flask Python版本:python3.7.7 数据库:mysql 数据库工具:Navicat 开发软件:PyCharm 在当今高度发达的信息中,信息管理改革已成为一种更加广泛和全面的趋势。 “新…

SpringBoot——原理(自动配置+原理分析@Conditional)

在上一篇有说到,进行源码跟踪时可以看见一个以Conditional开头的注解,这些都是条件装配的注解。 加在方法上时只对该方法生效,加在类上时是对整个配置类都有效。 这里只说三个常用的Conditional的子注解 案例演示 在启动类上加上一个Enabl…

第二章:ShardingSphere简介

什么是ShardingSphere 何为ShardingSphere呢?其实我们总结如下三点就能很好的理解: 1、一整套开源的分布式数据库中间件解决方案 2、有三个产品组成:Sharding-JDBC、Sharding-Proxy、Sharding-Sidecar(规划中) 3、他的定位是关系型数据库的中间件,在分布式环境下合理的…

【20】SCI易中期刊推荐——计算机信息系统工程电子与电气(中科院3区)

💖💖>>>加勒比海带,QQ2479200884<<<💖💖 🍀🍀>>>【YOLO魔法搭配&论文投稿咨询】<<<🍀🍀 ✨✨>>>学习交流 | 温澜潮生 | 合作共赢 | 共同进步<<<✨✨ 📚📚>>>人工智能 | 计算机视觉…

(转载)matlab遗传算法工具箱

以下内容大部分来源于《MATLAB智能算法30个案例分析》&#xff0c;仅为学习交流所用。 1理论基础 1.1遗传算法概述 遗传算法(genetic algorithm,GA)是一种进化算法,其基本原理是仿效生物界中的“物竞天择、适者生存”的演化法则。遗传算法是把问题参数编码为染色体,再利用迭代…

Qiskit系列(1)---Qiskit安装

1.qiskit与anaconda简介 Qiskit并不是一门独立的语言&#xff0c;它是基于Python的一个框架&#xff0c;就好比Pytorch, Tensorflow。而Qiskit这个框架需要配套一些其他的package&#xff08;各种大小DLC&#xff09;一起运行&#xff0c;这些运行Qiskit所必须的package就构成了…

vue前端分页功能怎么实现

Vue前端分页功能可以通过以下几个步骤实现&#xff1a; 1. 安装分页组件库&#xff08;如vue-pagination-2&#xff09;&#xff1a; bash npm install vue-pagination-2 2. 在Vue项目中引入并注册分页组件&#xff1a; javascript import Vue from vue; import Pagination fr…

【商品详情 +关键词搜索】API 接口系列

首先&#xff0c;大家要到官方主页去申请一个 appkey&#xff0c;这个是做什么用的呢&#xff1f;App Key 是应用的唯一标识&#xff0c;TOP 通过 App Key 来鉴别应用的身份。AppSecret 是 TOP 给应用分配的密钥&#xff0c;开发者需要妥善保存这个密钥&#xff0c;这个密钥用来…

增强语言模型导读

以ChatGPT为主的大语言模型出现已有半年时间&#xff0c;研究逐渐从针对模型本身的进化和功能&#xff0c;延展到如何更为有效地利用大模型&#xff0c;将它与其它工具结合&#xff0c;落地&#xff0c;以解决实际领域中的问题。 这里的增强主要指让大语言模型&#xff08;LM&…

【21】SCI易中期刊推荐——计算机科学人工智能领域(中科院4区)

💖💖>>>加勒比海带,QQ2479200884<<<💖💖 🍀🍀>>>【YOLO魔法搭配&论文投稿咨询】<<<🍀🍀 ✨✨>>>学习交流 | 温澜潮生 | 合作共赢 | 共同进步<<<✨✨ 📚📚>>>人工智能 | 计算机视觉…

Unity 动画系统基本概念

一、动画的基本概念 1、帧 在古代&#xff0c;一幅字画叫一帧&#xff0c;而在计算机中&#xff0c;每次渲染完毕一幅画面并显示出来&#xff0c;这一幅画就是一帧。 连续切换的帧就形成了动态的画面。每秒刷新帧的次数称为频率&#xff0c;单位是FPS&#xff08;Frames Per…

JavaEE Tomcat Servelet第一个helloworld程序

Tomcat & Servelet第一个程序helloworld&#xff01; 文章目录 JavaEE & Tomcat & 第一个Servelet程序1. HTTP服务器 - Tomcat1.1 Tomcat的目录结构&#xff1a;1.2 启动Tomcat1.3 Tomcat的优点 2. Servelet框架2.1 创建Maven项目2.2 引入依赖2.3 创建目录2.4 写代…

Mac电脑读写移动硬盘软件Tuxera NTFS2023中文版

日常工作中&#xff0c;我们经常会使用移动硬盘拷贝文件&#xff0c;因为移动硬盘传输文件方便、传输速度快。但我们在mac电脑上使用移动硬盘却发现硬盘无法正常读写。本文向大家介绍mac能读写的移动硬盘有哪些以及移动硬盘怎么在mac上读写。 一、Mac能读写的移动硬盘有哪些 移…