spark sql解析sql主要基于Catalyst框架,它将复杂的sql解析分为很多的阶段,每个阶段基本都有专属的工具类和扩展接口,最终实现将sql转换为DataFrame或RDD任务的功能。如果对于这些中间阶段和工具类没有一个整体概念性的了解,那阅读源码会很吃力。下面我们先大致讲解一下spark sql比较关键的阶段、然后再从源码角度追踪探查一下。
特别提醒:很多概念名词文中给出的是一个笼统的讲解,为的是后续阅读源码时不那么吃力,实际上很多概念包含丰富的理论知识,感兴趣的可以自己在额外查阅了解。
1、spark sql核心阶段
未解析的逻辑计划:
原始sql经过语法解析后(底层使用的是Antlr框架)就变成未解析的逻辑计划,这里的逻辑计划可以理解为语法树。后续所有的解析和操作也是通过遍历树以及修改匹配的树节点来实现的。
解析工具:sqlParser
逻辑计划(也称为解析后的逻辑计划):
这一步是在上一步的语法树基础上操作的,上一步我们获得了基本的语法树,但是语法树中要查的表、字段等信息并没有经过验证。所以这一步就是和数据源连接,将语法树中未解析的表、字段等信息转换为已解析状态。
解析工具:analyzer
优化后逻辑计划:
通过前两个阶段,spark sql可以获得完整的sql语法树信息。这个阶段主要的功能是对于一些可以优化的场景,spark sql都会自动进行一个sql语法树(逻辑计划)的改写。这个过程的实现主要依赖于模式匹配以及访问者模式的使用。
解析工具:optimizer
物理计划:
将语法树上关键节点的动作转换为spark内部的方法,
解析工具:planner
完整的解析流程可以用下图表示,可以看上述几个阶段其实都是Catalyst流程中的一部分,而且在物理计划之后还有一个基于代价挑选最优物理计划的过程(在3.0.1版本中是直接挑选第一个物理计划)。这块在后续代码中也会简单看一下。
除了上述几个关键的解析工具外,还要再留意两个对象,一个是SessionState,它包含了上述所有的解析工具;另外一个是QueryExecution,spark 将sql解析的各个中间阶段状态以及所用的工具封装成QueryExecution对象,所以通过QueryExecution可以获得各个阶段的语法树。
2、catalyst三个重要概念
2.1 InternalRow体系
在spark sql内部实现中,InternalRow就是用来表示一行行数据的类。物理算子树节点产生和转换的RDD类型即为RDD[InternalRow]。另外要注意的是InternalRow中每一列所对应的数据类型其实是Catalyst内部定义的数据类型,所以当自定义进行一些数据转换时,可能需要进行适当的类型转换。
2.2 TreeNode体系
无论逻辑计划还是物理计划,都离不开承载内容的中间数据结构,在catalyst中,对应的是TreeNode体系。TreeNode类是spark sql中所有树结构的基类,定义了一系列通用的集合操作和树遍历操作接口。
TreeNode一直在内存里维护,不会dump到磁盘以文件形式存储,且树的修改都是以替换已有节点的方式进行的。
2.3 Expression体系
表达式一般指的是不需要触发执行引擎而能直接进行计算的单元,例如加减乘除四则运算、转换操作、过滤操作等。在Expression类中,主要定义了5个方面的操作,包括基本属性、核心操作、输入输出、字符串表示和等价性判断。
在spark sql中,Expression本身也是TreeNode类的子类,因此能够调用所有TreeNode的方法。也可以通过多级的子Expression组合成复杂的Expression。
3、源码追踪
3.1 demo代码
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.appName("test")
.master("local")
.getOrCreate
import org.apache.spark.sql.types._
val schema = StructType(
List(StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false))
)
val rdd = sparkSession.sparkContext.parallelize(Seq(
Row(1, "xiaohong", 34),
Row(2, "xiaoli", 42),
Row(3, "xiaoming", 28),
))
val df = sparkSession.sqlContext.createDataFrame(rdd, schema)
df.createOrReplaceTempView("person")
val df2 = sparkSession.sql("select id,name from person where age > 30")
df2.show()
}
3.2 debug
前面创建临时表的过程直接跳过(底层其实还是走的catalyst框架),直接从sparkSession.sql这里开始看:
这一步有三个需要留意的点。
第一个就是withActive,它的功能是将当前会话对象存入线程局部变量,在withActive中的代码块执行结束后再替换为原先的会话对象, 它的语法特性是入参是一个方法体,所以用大括号括起来。
第二个是追踪对象tracker,它会被一直传递下去,进而记录sql解析各个阶段的指标信息,注意measurePhase是一个高阶函数,它将非公共功能抽象成字符串入参,将公共功能抽象成函数体入参。(在源码阅读时如果不知道当前解析到哪个阶段了,也可以通过这个来大致判断)
第三个是比较核心的一行代码,其调用sessionstate对象中的sqlparser工具类对sql文件进行解析。查看parsePlan的返回类型可以知道,在这一步实现了spark sql解析的第一阶段,获得了未解析的逻辑计划。
通过查看plan的详细信息可以知道,我们要查询的person还属于一个未解析的状态,因此也可以判定此时仅获取了未解析的逻辑计划。
引申:sql语法树不知道大家以前熟悉不,它正常的顺序大体是:先是各种字段映射、其次是操作、最后是底层源,通过它可以形象的理解记忆常说的谓词下推其实就是将各种计算操作尽量下移接近具体的数据源,这样上层需要处理的数据量就会小一些。
接着向下看:
这一步一开始又有withActive,不再重复介绍。我们前面概念部分介绍的两个关键对象sessionState、QueryExecution,前者在上一步出现了,后者在这一步出现。可以看到其构造器入参有当前的sparkSession、未解析的逻辑计划、统计不同解析阶段指标的追踪器tracker。
接着又到了关键性的一步:
首先根据assertAnalyzed方法找到analyzed方法变量 ,也就是图中标示的从1到2,接下来看关键点3,跟前面获取未解析的逻辑计划操作基本一样,因此可以判断下面的4肯定是获取解析后逻辑计划的操作。在4操作中首先是从sessionState中获取工具类analyzer,然后再执行解析逻辑。
接下来看下executeAndCheck方法的逻辑:
标记1是一个记录方法,记录当前是否处于解析阶段。标记2和标记3是标记1方法中的入参函数。其中最重要的就是标记2,它里面包含了未解析逻辑计划转换为解析后逻辑计划的具体过程。标记3则是对解析后的逻辑计划进行校验,有不符合的条件情况直接抛出异常。
再具体看标记2的逻辑之前,再引入一些知识。没有这些知识,源码可能读的还是很似懂非懂。
Analyzed和Optimizer都继承RuleExecutor,最后都会调用RuleExecutor中的executeAndCheck、execute等方法。不过execute方法中迭代遍历的batches确是来源于Analyzed和Optimizer类。
接着我们向下看:
可以看到当我们执行executeAndTrack时,我们从子类Analyzer跳到了父类RuleExecution中。然后在父类中会执行一些公共操作,比较核心的是迭代一批batches到所有的语法树节点。进而达到改变语法树(逻辑计划)的目的。
可以看到当前的batched为空,所以具体的规则批还需要到具体的实现类中去看。这里我们到Analyzer中去看。
由于篇幅问题,这里不全部展示,感兴趣的可以打开源码看下。
再接着向下看:
其实sparkSession.sql的执行基本就是获得解析后的逻辑计划,剩下优化的逻辑计划和物理计划需要行为算子触发,所以我们接着看show方法。
上面是一些流程性代码,这里不多讲解,接下来这个代码很重要了:
这里有三处重点:第一是withAction函数的使用,表明后面肯定有行为操作 ,通过查看withAction的入参类型可以知道collectFromPlan是一个行为操作;第二是limit(n).queryExecution方法返回的具体是什么,其实点击查看limit方法可以知道:
它是在当前logicalPlan的基础上又封了一层limit算子,至于logicalPlan就不点击查看了,它返回的正是该DataSet所对应的解析的逻辑计划。 最后通过withTypedPlan方法处理,返回的是一个新的DataSet数据集。因此limit(n)返回的是一个包含前面所有解析结果的新DataSet,然后queryExecution其实就是获取包含之前结果的新的QueryExecution对象;第三是collectFromPlan,它用于触发计算。
接下来回过头来看下withAction的实现:
这一步调用了withNewExecutionId方法,其中函数体入参中还有action(qe.executedPlan)方法调用,因此可以推断在withNewExecutionId方法中,在函数体执行前一定会有后续解析阶段的执行。接下来我们到withNewExecutionId中看一下:
可以看到,优化后的逻辑计划以及物理计划都出现了,下面我们先看下优化逻辑计划的调用流程。
这块几乎跟ANALYSIS的解析阶段一致,首先是入参标识当前解析的阶段,其次调用sessionState中该阶段解析工具类进行解析。唯一不同的是ANALYSIS阶段解析时直接传入的逻辑计划,在当前OPTIMIZATION阶段传入的是withCacheData.clone(),该方法首先是判断当前是否已经是解析后的逻辑计划,如果不是要先进行解析。其次是从共享状态中复制已解析后的计划从而进行优化。再之后的流程仍是调用父类execute方法,但是规则批batches从子类optimizer中获取。因为类似,所以这里不在过多讲解,我们接着向下看。
接下来物理计划的生成:
可以看到首先进入的是100行处的executedPlan代码,该方法中首先判断优化阶段是否已经被执行过,其次传参标识当前为PLANNING阶段,PLANNING阶段又由两个阶段组成,分别是物理计划阶段和可执行物理计划阶段组成。在第187行代码可以看到,在可执行物理计划生成之前会去87行先调用生成物理计划阶段。这两个阶段全用同一个标识PLANNING标识。
我们先看下物理阶段的生成,也就是87行处的方法:
这里的参数planner正是用于物理计划阶段进行处理的工具类
优化后的逻辑计划获取方式与解析后的逻辑计划获取方式一样,这里不过多赘述。我们来看下createSparkPlan方法:
这里的注释 TODO 注释可以留意一下,大意就是产生的物理计划我们取第一个,后续会实现挑选最优的物理计划返回。因此我们后面的代码中可能会返回多个物理计划,但是选择是只选择了第一个。
plan方法是物理计划生成中的关键一步,首先是在标记1的地方,它根据策略迭代处理优化后的计划进而生成不同的物理计划候选者。因此有可能一个逻辑计划会生成多个物理计划。其次在标记2的地方,会完善生成的物理计划。最后在标记3处,对生成的物理计划进行剪枝处理,进而优化查询效率。三个地方的细节都很多,为了不让本文的主题跑偏,这里就点到为止。
可执行物理计划的生成其实是在物理计划的基础上进一步迭代应用了相关的规则进行处理。由于这不是本文的重点,所以具体的细节不在赘述。另外如何从可执行物理计划获取最终的RDD这里也没有讲解,后续会出专门的文章进行介绍。这里讲的话会失去本文的主题。
到此为止,spark sql整体的解析流程讲解完毕,但是对于可执行物理计划的处理讲解的不是很细致,但是如果点击源码查看的话会发现可讲的流程与之前都类似。至于最后如何从物理计划获取RDD,即action算子的底层实现代码。这里没有进行讲解。后续会有专门文章进行介绍。
4、总结:
1)从SQL语句的解析一直到提交之前,整个转换过程都在spark集群的Driver端进行,不涉及分布式环境。
2)Analyzed和Optimizer都继承RuleExecutor,最后都会调用RuleExecutor中的executeAndCheck、execute等方法。不过execute方法中迭代遍历的batches确是来源于Analyzed和Optimizer类,所以在看到这一块源码时一定要明白。不要误以为两个阶段的处理都是一样的。
3)解析过程中很多操作其实都是懒加载或者函数当做入参的,如果debug追踪的时候有断点不生效等问题,遇到这种问题可以先把其它断点都去掉,只打目标处的断点。然后从目标处断点再接着向下追查。
4)本文主要阐述一个清晰的sql解析流程,对于比较重要的analyzer解析原理,以及有哪些解析规则。Optimizer拥有的解析规则等都将放在后续文章中讲解。
5)文中语法树的用词在某些地方可能不太准确,之所以这么叫主要为了方便形象话的理解记忆。如果从标准出发的话,统一叫做树应该是没问题的。
6)3.0.1版本的spark sql中,物理计划的选择是选取第一个,后续会实现挑选最优物理计划的逻辑。