spark sql(一)源码分析sql解析流程

news2024/12/28 18:28:24

        spark sql解析sql主要基于Catalyst框架,它将复杂的sql解析分为很多的阶段,每个阶段基本都有专属的工具类和扩展接口,最终实现将sql转换为DataFrame或RDD任务的功能。如果对于这些中间阶段和工具类没有一个整体概念性的了解,那阅读源码会很吃力。下面我们先大致讲解一下spark sql比较关键的阶段、然后再从源码角度追踪探查一下。

        特别提醒:很多概念名词文中给出的是一个笼统的讲解,为的是后续阅读源码时不那么吃力,实际上很多概念包含丰富的理论知识,感兴趣的可以自己在额外查阅了解。

1、spark sql核心阶段

未解析的逻辑计划:

        原始sql经过语法解析后(底层使用的是Antlr框架)就变成未解析的逻辑计划,这里的逻辑计划可以理解为语法树。后续所有的解析和操作也是通过遍历树以及修改匹配的树节点来实现的。        

        解析工具:sqlParser

逻辑计划(也称为解析后的逻辑计划):

        这一步是在上一步的语法树基础上操作的,上一步我们获得了基本的语法树,但是语法树中要查的表、字段等信息并没有经过验证。所以这一步就是和数据源连接,将语法树中未解析的表、字段等信息转换为已解析状态。

        解析工具:analyzer

优化后逻辑计划:

        通过前两个阶段,spark sql可以获得完整的sql语法树信息。这个阶段主要的功能是对于一些可以优化的场景,spark sql都会自动进行一个sql语法树(逻辑计划)的改写。这个过程的实现主要依赖于模式匹配以及访问者模式的使用。

        解析工具:optimizer

物理计划:

        将语法树上关键节点的动作转换为spark内部的方法,        

        解析工具:planner

完整的解析流程可以用下图表示,可以看上述几个阶段其实都是Catalyst流程中的一部分,而且在物理计划之后还有一个基于代价挑选最优物理计划的过程(在3.0.1版本中是直接挑选第一个物理计划)。这块在后续代码中也会简单看一下。

除了上述几个关键的解析工具外,还要再留意两个对象,一个是SessionState,它包含了上述所有的解析工具另外一个是QueryExecution,spark 将sql解析的各个中间阶段状态以及所用的工具封装成QueryExecution对象,所以通过QueryExecution可以获得各个阶段的语法树。

2、catalyst三个重要概念

2.1 InternalRow体系

        在spark sql内部实现中,InternalRow就是用来表示一行行数据的类。物理算子树节点产生和转换的RDD类型即为RDD[InternalRow]。另外要注意的是InternalRow中每一列所对应的数据类型其实是Catalyst内部定义的数据类型,所以当自定义进行一些数据转换时,可能需要进行适当的类型转换。

2.2 TreeNode体系

        无论逻辑计划还是物理计划,都离不开承载内容的中间数据结构,在catalyst中,对应的是TreeNode体系。TreeNode类是spark sql中所有树结构的基类,定义了一系列通用的集合操作和树遍历操作接口。

        TreeNode一直在内存里维护,不会dump到磁盘以文件形式存储,且树的修改都是以替换已有节点的方式进行的

2.3 Expression体系

        表达式一般指的是不需要触发执行引擎而能直接进行计算的单元,例如加减乘除四则运算、转换操作、过滤操作等。在Expression类中,主要定义了5个方面的操作,包括基本属性、核心操作、输入输出、字符串表示和等价性判断。

        在spark sql中,Expression本身也是TreeNode类的子类,因此能够调用所有TreeNode的方法。也可以通过多级的子Expression组合成复杂的Expression。

3、源码追踪

3.1 demo代码

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder
      .appName("test")
      .master("local")
      .getOrCreate

    import org.apache.spark.sql.types._

    val schema = StructType(
      List(StructField("id", IntegerType, nullable = false),
        StructField("name", StringType, nullable = false),
        StructField("age", IntegerType, nullable = false))
    )

    val rdd = sparkSession.sparkContext.parallelize(Seq(
      Row(1, "xiaohong", 34),
      Row(2, "xiaoli", 42),
      Row(3, "xiaoming", 28),
    ))

    val df = sparkSession.sqlContext.createDataFrame(rdd, schema)

    df.createOrReplaceTempView("person")

    val df2 = sparkSession.sql("select id,name from person where age > 30")

    df2.show()

  }

3.2 debug

前面创建临时表的过程直接跳过(底层其实还是走的catalyst框架),直接从sparkSession.sql这里开始看:

这一步有三个需要留意的点。

        第一个就是withActive,它的功能是将当前会话对象存入线程局部变量,在withActive中的代码块执行结束后再替换为原先的会话对象, 它的语法特性是入参是一个方法体,所以用大括号括起来。

        第二个是追踪对象tracker,它会被一直传递下去,进而记录sql解析各个阶段的指标信息,注意measurePhase是一个高阶函数,它将非公共功能抽象成字符串入参,将公共功能抽象成函数体入参。(在源码阅读时如果不知道当前解析到哪个阶段了,也可以通过这个来大致判断)

        第三个是比较核心的一行代码,其调用sessionstate对象中的sqlparser工具类对sql文件进行解析。查看parsePlan的返回类型可以知道,在这一步实现了spark sql解析的第一阶段,获得了未解析的逻辑计划

         通过查看plan的详细信息可以知道,我们要查询的person还属于一个未解析的状态,因此也可以判定此时仅获取了未解析的逻辑计划。

        引申:sql语法树不知道大家以前熟悉不,它正常的顺序大体是:先是各种字段映射、其次是操作、最后是底层源,通过它可以形象的理解记忆常说的谓词下推其实就是将各种计算操作尽量下移接近具体的数据源,这样上层需要处理的数据量就会小一些。

接着向下看:

        这一步一开始又有withActive,不再重复介绍。我们前面概念部分介绍的两个关键对象sessionState、QueryExecution,前者在上一步出现了,后者在这一步出现。可以看到其构造器入参有当前的sparkSession、未解析的逻辑计划、统计不同解析阶段指标的追踪器tracker。

接着又到了关键性的一步:

         首先根据assertAnalyzed方法找到analyzed方法变量 ,也就是图中标示的从1到2,接下来看关键点3,跟前面获取未解析的逻辑计划操作基本一样,因此可以判断下面的4肯定是获取解析后逻辑计划的操作。在4操作中首先是从sessionState中获取工具类analyzer,然后再执行解析逻辑。

接下来看下executeAndCheck方法的逻辑:

        标记1是一个记录方法,记录当前是否处于解析阶段。标记2和标记3是标记1方法中的入参函数。其中最重要的就是标记2,它里面包含了未解析逻辑计划转换为解析后逻辑计划的具体过程。标记3则是对解析后的逻辑计划进行校验,有不符合的条件情况直接抛出异常。

        再具体看标记2的逻辑之前,再引入一些知识。没有这些知识,源码可能读的还是很似懂非懂。

        Analyzed和Optimizer都继承RuleExecutor,最后都会调用RuleExecutor中的executeAndCheck、execute等方法。不过execute方法中迭代遍历的batches确是来源于Analyzed和Optimizer类。

接着我们向下看:

        可以看到当我们执行executeAndTrack时,我们从子类Analyzer跳到了父类RuleExecution中。然后在父类中会执行一些公共操作,比较核心的是迭代一批batches到所有的语法树节点。进而达到改变语法树(逻辑计划)的目的。

        可以看到当前的batched为空,所以具体的规则批还需要到具体的实现类中去看。这里我们到Analyzer中去看。

         由于篇幅问题,这里不全部展示,感兴趣的可以打开源码看下。

 再接着向下看:

        其实sparkSession.sql的执行基本就是获得解析后的逻辑计划,剩下优化的逻辑计划和物理计划需要行为算子触发,所以我们接着看show方法。

 

 上面是一些流程性代码,这里不多讲解,接下来这个代码很重要了:

        这里有三处重点:第一是withAction函数的使用,表明后面肯定有行为操作 ,通过查看withAction的入参类型可以知道collectFromPlan是一个行为操作;第二是limit(n).queryExecution方法返回的具体是什么,其实点击查看limit方法可以知道:

        它是在当前logicalPlan的基础上又封了一层limit算子,至于logicalPlan就不点击查看了,它返回的正是该DataSet所对应的解析的逻辑计划。 最后通过withTypedPlan方法处理,返回的是一个新的DataSet数据集。因此limit(n)返回的是一个包含前面所有解析结果的新DataSet,然后queryExecution其实就是获取包含之前结果的新的QueryExecution对象;第三是collectFromPlan,它用于触发计算。

接下来回过头来看下withAction的实现:

         这一步调用了withNewExecutionId方法,其中函数体入参中还有action(qe.executedPlan)方法调用,因此可以推断在withNewExecutionId方法中,在函数体执行前一定会有后续解析阶段的执行。接下来我们到withNewExecutionId中看一下:

        可以看到,优化后的逻辑计划以及物理计划都出现了,下面我们先看下优化逻辑计划的调用流程。

        这块几乎跟ANALYSIS的解析阶段一致,首先是入参标识当前解析的阶段,其次调用sessionState中该阶段解析工具类进行解析。唯一不同的是ANALYSIS阶段解析时直接传入的逻辑计划,在当前OPTIMIZATION阶段传入的是withCacheData.clone(),该方法首先是判断当前是否已经是解析后的逻辑计划,如果不是要先进行解析。其次是从共享状态中复制已解析后的计划从而进行优化。再之后的流程仍是调用父类execute方法,但是规则批batches从子类optimizer中获取。因为类似,所以这里不在过多讲解,我们接着向下看。

接下来物理计划的生成:

         可以看到首先进入的是100行处的executedPlan代码,该方法中首先判断优化阶段是否已经被执行过,其次传参标识当前为PLANNING阶段,PLANNING阶段又由两个阶段组成,分别是物理计划阶段和可执行物理计划阶段组成。在第187行代码可以看到,在可执行物理计划生成之前会去87行先调用生成物理计划阶段。这两个阶段全用同一个标识PLANNING标识。

        我们先看下物理阶段的生成,也就是87行处的方法:

         这里的参数planner正是用于物理计划阶段进行处理的工具类

        优化后的逻辑计划获取方式与解析后的逻辑计划获取方式一样,这里不过多赘述。我们来看下createSparkPlan方法:

        这里的注释 TODO 注释可以留意一下,大意就是产生的物理计划我们取第一个,后续会实现挑选最优的物理计划返回。因此我们后面的代码中可能会返回多个物理计划,但是选择是只选择了第一个。

 

         plan方法是物理计划生成中的关键一步,首先是在标记1的地方,它根据策略迭代处理优化后的计划进而生成不同的物理计划候选者。因此有可能一个逻辑计划会生成多个物理计划。其次在标记2的地方,会完善生成的物理计划。最后在标记3处,对生成的物理计划进行剪枝处理,进而优化查询效率。三个地方的细节都很多,为了不让本文的主题跑偏,这里就点到为止。

        可执行物理计划的生成其实是在物理计划的基础上进一步迭代应用了相关的规则进行处理。由于这不是本文的重点,所以具体的细节不在赘述。另外如何从可执行物理计划获取最终的RDD这里也没有讲解,后续会出专门的文章进行介绍。这里讲的话会失去本文的主题。

        到此为止,spark sql整体的解析流程讲解完毕,但是对于可执行物理计划的处理讲解的不是很细致,但是如果点击源码查看的话会发现可讲的流程与之前都类似。至于最后如何从物理计划获取RDD,即action算子的底层实现代码。这里没有进行讲解。后续会有专门文章进行介绍。

 4、总结:

1)从SQL语句的解析一直到提交之前,整个转换过程都在spark集群的Driver端进行,不涉及分布式环境

2)Analyzed和Optimizer都继承RuleExecutor,最后都会调用RuleExecutor中的executeAndCheck、execute等方法。不过execute方法中迭代遍历的batches确是来源于Analyzed和Optimizer类,所以在看到这一块源码时一定要明白。不要误以为两个阶段的处理都是一样的。

3)解析过程中很多操作其实都是懒加载或者函数当做入参的,如果debug追踪的时候有断点不生效等问题,遇到这种问题可以先把其它断点都去掉,只打目标处的断点。然后从目标处断点再接着向下追查。

4)本文主要阐述一个清晰的sql解析流程,对于比较重要的analyzer解析原理,以及有哪些解析规则。Optimizer拥有的解析规则等都将放在后续文章中讲解。

5)文中语法树的用词在某些地方可能不太准确,之所以这么叫主要为了方便形象话的理解记忆。如果从标准出发的话,统一叫做树应该是没问题的

6)3.0.1版本的spark sql中,物理计划的选择是选取第一个,后续会实现挑选最优物理计划的逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/389138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JUC并发编程与源码分析笔记11-Java对象内存布局和对象头

先从阿里及其它大厂面试题说起 你觉得目前面试,你还有那些方面理解的比较好,我没问到的,我说了juc和jvm以及同步锁机制那先说juc吧,说下aqs的大致流程cas自旋锁,是获取不到锁就一直自旋吗?cas和synchronized区别在哪…

国内的PMP考试通过率高达97%?

自认为是虚高,虽然国人在考试方面的确独树一帜的强,应该也没有这样夸张。 如果自学,大概是50%,如果有老师教,那大概是60%到80%,还是比较高的。 为什么自学那么低?除了自身的自制力的问题&…

【编程基础之Python】9、Python中的变量

【编程基础之Python】9、Python中的变量Python中的变量变量的定义和赋值变量的命名规范变量的类型变量的作用域变量的赋值特殊的变量删除变量总结Python中的变量 在Python中,变量是用来存储数据的一种方式。Python是一种动态类型语言,因此在声明变量时不…

JWT利用在ctfhub-easy_login拿到flag

目录 什么是JWT? jwt由三个部分组成:header.payload.signature header部分: payload部分:声明 signature部分: JWT验证过程: ctfhub-easy_login 目的:拿到flag 过程分析以及实操&#x…

阿里云轻量服务器--Docker--Nacos安装(使用外部Mysql数据存储)

前言:docker 安装nacos 如果不设置外部的mysql 默认使用内嵌的内嵌derby为数据源,这个时候如果,重新部署nacos 则会造成原有数据丢失情况; 1 默认安装的nacos 启动后使用的是内嵌的存储: 2 使用外部mysql 作为存储&a…

Ubuntu 18.04 出现GLIBC_2.28 not found的解决方法

关于/lib/x86_64-linux-gnu/libc.so.6: version GLIBC_2.28’ not found出现报错,建议不要使用源码包去编译并升级。在下文有分享一个使用官方的Debian软件包去升级使用的方法。仅供参考! 环境 # uname -a Linux Ubuntu 5.4.0-144-generic #161~18.04.…

[1.4]计算机系统概述——操作系统的体系结构

第一章 计算机系统概述 操作系统的体系结构 大内核/单内核/宏内核微内核 通过之前的学习,我们知道计算机系统的层次结构是这样的。 但是操作系统的内部其实还可以再进一步地划分。 一部分是内核的功能,一部分是非内核的功能。 操作系统最核心的功能&…

计及需求响应的粒子群算法求解风能、光伏、柴油机、储能容量优化配置(Matlab代码实现)

👨‍🎓个人主页:研学社的博客💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密…

【云原生】Trace、Metrics、Logging 选型

背景分布式追踪的起源自从微服务的兴起开始,整个系统架构开始变得极为庞大和复杂,但是服务之间的调用关系,调用消耗时间等等信息却依然是半黑盒的状态。为了能够将调用的链路进行串联,将系统的各种指标数据展示出来以使得系统的链…

哈希->模拟实现+位图应用

致前行路上的人: 要努力,但不要着急,繁花锦簇,硕果累累都需要过程! 目录 1. unordered系列关联式容器 1.1 unordered_map 1.1.1概念介绍: 1.1.2 unordered_map的接口说明 1.2unordered_set 1.3常见面试题oj…

到底什么才是幻读?

💗推荐阅读文章💗 🌸JavaSE系列🌸👉1️⃣《JavaSE系列教程》🌺MySQL系列🌺👉2️⃣《MySQL系列教程》🍀JavaWeb系列🍀👉3️⃣《JavaWeb系列教程》…

【NLP相关】基于现有的预训练模型使用领域语料二次预训练

❤️觉得内容不错的话,欢迎点赞收藏加关注😊😊😊,后续会继续输入更多优质内容❤️👉有问题欢迎大家加关注私戳或者评论(包括但不限于NLP算法相关,linux学习相关,读研读博…

《七》JavaScript 中的作用域、作用域链、执行上下文、执行上下文栈

JS 引擎会在执行所有代码之前,先在堆内存中创建一个全局对象(Global Object、GO),包含 String、Math、Date、parseInt() 等属性和方法。所有作用域都可以访问这个全局对象。 在浏览器中 Global Object 就是 Window 对象。 执行上…

不用机器学习不用大数据,给你讲通ChatGPT的深层原理

ChatGPT现在看来已经异常火爆了,很多人已经熟知,并且开始练习使用或者开始利用他开始实践了。但仍然有很多人在观望,在疑惑,今天狗哥不用那些高端大气的机器学习亦或是大数据还给你讲通ChatGPT深层到底是个啥逻辑。 目录 1. 聊家…

CV——dy83 接昨天的论文中DAM模块:压缩-激励的宽残差网络在图像分类中的应用

压缩-激励的宽残差网络在图像分类中的应用(ICIP 2019)1. INTRODUCTION2. PROPOSED METHODS2.1 总体框架2.2 通道的重要性3. EXPERIMENTS3.1 Datasets3.2 训练和测试的设置3.3 分类结果及分析4. CONCLUSIONSQUEEZE-AND-EXCITATION WIDE RESIDUAL NETWORKS…

CSS 选择器以及CSS常用属性

目录 🐇今日良言:可以不光芒万丈,但不要停止发光 🐯一、写CSS的三种方法 🐯二、CSS选择器的常见用法 🐯三、CSS常用属性 🐇今日良言:可以不光芒万丈,但不要停止发光 🐯一、写CSS的三种方法 CSS的基本语…

目标检测开源数据集汇总

导 读本文汇总了一些开源目标检测类的数据集,附下载链接。多显著性对象数据集数据集链接:http://m6z.cn/5AsmXB本数据集共有 1224 张图像来自四个公共图像数据集:COCO、VOC07、ImageNet 和 SUN。Amazon Mechanic Turk 工作人员将每个图像标记…

Firebase入门使用 01

官网 firebase.google.com 解决问题 firebase 帮助解决 数据库 和 API之间的问题 这样我们就可以 集中精力开创应用。 快速上手样例指南 https://github.com/firebase 提供的服务 其中80%用不到,下面是一些我们可以用到的服务。 Authentication:用户认证管理…

Qt安装与使用经验分享;无.pro文件;无QTextCodec file;Qt小试;界面居中;无缝;更换Qt图标;更换Qt标题。

1、切换安装下载源 《Qt安装教程》先推荐一篇安装文章:《Qt安装教程》 Qt 5.15 之后已经不提供离线安装包了,就是那个 3.7G 的 exe 安装包。请看官方说明,所以只能用在线安装包。 1,下载在线安装包 QT 在线安装包链接&#xff…

基于WSL2和Clion搭建Win下C开发环境

系列文章目录 一、基于WSL2和Clion搭建Win下C开发环境 二、make、makeFile、CMake、CMakeLists的使用 三、全面、详细、通俗易懂的C语言语法和标准库 文章目录系列文章目录前言WSL2安装WSL常用命令VSCode连接WSLroot密码以systemd启动配置sshClion结语前言 Win下C语言开发环境…