一、前言
在上一节【SPARKSQL3.0-Unresolved[Parsed]阶段源码剖析】中已经介绍了Unresolved Logical Plan未解析阶段,建议先看完上一篇文章后再来看本文
由于Unresolved LogicalPlan阶段生成的logicalPlan仅仅是一种数据结构,不包含任何数据信息。故在分析器(Analyzer) 阶段会使用事先定义好的规则(Rule)以及 Catalog 等信息对未解析的逻辑计划(Unresolved Logical Plan) 进行补充和替换【logicalPlan】中的各个节点,让新的语法树包含元数据信息
Catalog 主要用于各种函数资源信息和元数据信息(数据库、数据表、数据视图、数据分区与函数等)的统一管理。
二、示例:
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
case class Person(name: String, age: Int)
Seq(Person("Jack", 12), Person("James", 21), Person("Mac", 30)).toDS().createTempView("person")
spark.sql("SELECT * FROM PERSON WHERE AGE > 18").explain(true) // 打印执行计划
执行计划:
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('AGE > 18)
+- 'UnresolvedRelation `PERSON`
== Analyzed Logical Plan ==
name: string, age: int
Project [name#10, age#11]
+- Filter (AGE#11 > 18)
+- SubqueryAlias person
+- LocalRelation [name#10, age#11]
三、源码
根据上面的例子我们进入到sparkSession.sql中:
plan就是sqlParser解析出来的logicalPlan【上一节已经讲过其详细过程】,然后就进入到Dataset.ofRows,可以看到只要进入到此函数都会构建了一个QueryExecution【可以理解成spark会为每一个sql语句创建一个QueryExecution】,然后调用QueryExecution的assertAnalyzed函数,接下来看QueryExecution类
QueryExecution极其重要,它是sql查询的主要工作流,它包含了一条sql语句需要执行的所有计划:
- logical是未解析的逻辑执行计划
- analyzed是解析后的逻辑执行计划
- optimizedPlan是优化后的逻辑执行计划
- executePlan是物理执行计划
- sparkPlan是Spark执行计划
可以看出其内部都是使用的sparkSession.sessionState的 分析器,优化器,语法器等,关于sessionState的讲解可以看之前的文章
我们回头看qe.assertAnalyzed()调用,下图可以看到是调用了sessionState.analyzer的executeAndCheck函数
接下来看sessionState.analyzer是哪个变量
Session是由BaseSessionStateBuilder构建出来,analyzer变量构建的是Analyzer类,并且在类中重写了一些规则
找到了Analyzer,接下来看其中的executeAndCheck函数,发现其内部调用了executeAndTrack
而Analyzer中没有executeAndTrack函数,此处是调用的的父类RuleExecutor的executeAndTrack函数,函数内部又调用了execute函数
execute函数被子类Analyzer重写,但重写函数最终上还是调用父类RuleExecutor的execute
在RuleExecutor的execute函数中,最重要的是循环调用batches,这里贴一下源码:
核心思路是curPlan迭代一次会获新的curPlan并检查一下是否等于上一次的lastPlan,如果相等则无需下一次迭代,否则继续迭代,直到和lastPlan相等或者超过最大迭代次数[默认100]
def execute(plan: TreeType): TreeType = {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
val planChangeLogger = new PlanChangeLogger()
val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
val beforeMetrics = RuleExecutor.getCurrentMetrics()
// 针对初始输入plan,运行结构完整性检查
if (!isPlanIntegral(plan)) {
val message = "The structural integrity of the input plan is broken in " +
s"${this.getClass.getName.stripSuffix("$")}."
throw new TreeNodeException(plan, message, null)
}
// 遍历不同子类实现的batches中定义的 batchs 变量, 此处batches是用的Analyzer子类的实现
batches.foreach { batch =>
// 用来对比执行规则前后,初始的plan有无变化
val batchStartPlan = curPlan
var iteration = 1
var lastPlan = curPlan
var continue = true
// 执行直到达到稳定点或者最大迭代次数
while (continue) {
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val startTime = System.nanoTime()
val result = rule(plan)
val runTime = System.nanoTime() - startTime
val effective = !result.fastEquals(plan)
if (effective) {
queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
planChangeLogger.logRule(rule.ruleName, plan, result)
}
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
queryExecutionMetrics.incNumExecution(rule.ruleName)
tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
if (!isPlanIntegral(result)) {
val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
"the structural integrity of the plan is broken."
throw new TreeNodeException(result, message, null)
}
result
}
iteration += 1
// 到达最大迭代次数, 不再执行优化
if (iteration > batch.strategy.maxIterations) {
// 只对最大迭代次数大于1的情况打log
if (iteration != 2) {
val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
"."
} else {
s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
}
val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
s"$endingMsg"
if (Utils.isTesting || batch.strategy.errorOnExceed) {
throw new TreeNodeException(curPlan, message, null)
} else {
logWarning(message)
}
}
// 检查一次幂等
if (batch.strategy == Once &&
Utils.isTesting && !excludedOnceBatches.contains(batch.name)) {
checkBatchIdempotence(batch, curPlan)
}
continue = false
}
// plan不变了,到达稳定点,不再执行优化
if (curPlan.fastEquals(lastPlan)) {
logTrace(
s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
continue = false
}
lastPlan = curPlan
}
planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
}
planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)
curPlan
}
}
可以看到将每个Batch中所有的规则Rule对象实施于该Unsolved LogicalPlan,并且该Batch中规则可能要执行多轮,直到执行的批数等于batch.strategy.maxIterations或者logicalplan与上个批次的结果比没有变化,则退出执行, batch.strategy.maxIterations次数每个batch都不同,有的batch是1,有的batch则是通过spark的配置文件中的默认值,此默认值在spark3.0中是100次,下面有讲到
RuleExecutor使用模版设计模式,其batches抽象函数由其子类实现,也就是Analyzer的batches
Analyzer的batches = Seq[Batch],是由多个Batch 组成 ,这里贴一下源码:
lazy val batches: Seq[Batch] = Seq(
// Hint策略组
Batch("Hints", fixedPoint,
new ResolveHints.ResolveJoinStrategyHints(conf),
new ResolveHints.ResolveCoalesceHints(conf)),
// 简单检查策略组
Batch("Simple Sanity Check", Once,
LookupFunctions),
// 替换策略组
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
new SubstituteUnresolvedOrdinals(conf)),
// 最关键的关系策略组
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveNamespace(catalogManager) ::
new ResolveCatalogs(catalogManager) ::
ResolveInsertInto ::
ResolveRelations ::
ResolveTables ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
ResolveNewInstance ::
ResolveUpCast ::
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveAggAliasInGroupBy ::
ResolveMissingReferences ::
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
ResolveSubquery ::
ResolveSubqueryColumnAliases ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
ResolveOutputRelation ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
ResolveInlineTables(conf) ::
ResolveHigherOrderFunctions(v1SessionCatalog) ::
ResolveLambdaVariables(conf) ::
ResolveTimeZone(conf) ::
ResolveRandomSeed ::
ResolveBinaryArithmetic ::
TypeCoercion.typeCoercionRules(conf) ++
extendedResolutionRules : _*),
// 执行的钩子策略组
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
// Alter Table策略组
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
// 移除未解析的Hint策略组
Batch("Remove Unresolved Hints", Once,
new ResolveHints.RemoveAllHints(conf)),
// 非确定策略组
Batch("Nondeterministic", Once,
PullOutNondeterministic),
// UDF策略组
Batch("UDF", Once,
HandleNullInputsForUDF),
// 更新为null策略组
Batch("UpdateNullability", Once,
UpdateAttributeNullability),
// 子查询策略组
Batch("Subquery", Once,
UpdateOuterReferences),
// 清理策略组
Batch("Cleanup", fixedPoint,
CleanupAliases)
)
再来看Batch,一个 Batch 包含 name , Strategy 和一个或多个Rule 构成的 Rules
Strategy 定义了该 Batch 的最大执行次数,最大迭代次数
Rules 是logcial解析的关键, Analyzer 对 LogicalPlan 分析的过程其实就是对 plan 逐个应用 Rule 的过程。
Rule有一堆的子类,每个子类的解析各不相同
接下来回到RuleExecutor的execute函数【对照上面源码】
处理的流程为依次遍历 Batches ,使用每一个 Batch 处理 LocgicalPlan 。对于每一个 Batch ,依次使用该 Batch 中的 Rule 处理 plan ,处理完的 plan 作为该 Batch 中的下一条 Rule 处理的输入继续处理。经过该 Batch 所有 Rules 处理过的 plan 再重复上述的 Rule 处理过程,直至达到该 Batch 的最大运行次数或该 plan 不再变化为止。然后接着使用下一个 Batch 继续处理该 plan 。具体流程可参考以下流程图。
补充一下,有些batch的策略中最大迭代次数为100:
再回到上一节我们得到的logicalPlan,可以看出此处到达execute的plan正是Project【logicalPlan】
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('AGE > 18)
+- 'UnresolvedRelation `PERSON`
而经过execute阶段我们应该返回的是:
== Analyzed Logical Plan ==
name: string, age: int
Project [name#10, age#11]
+- Filter (AGE#11 > 18)
+- SubqueryAlias person
+- LocalRelation [name#10, age#11]
这中间的变化就是batches.foreach循环执行不同策略后的结果
我们回到batches.foreach循环策略组阶段,这里直接告诉大家,当循环到Resolution【关系策略】中的ResolveRelations规则时才会真正执行解析
这里贴一下Resolution【关系策略】的规则列表
在经历了ResolveRelations规则后,plan变成了新的Project:
'Project [*]
+- 'Filter ('AGE > 18)
+- SubqueryAlias person
+- LocalRelation [name#2, age#3]
可以看出根节点从之前的UnresolvedRelation `PERSON -> LocalRelation [name#2, age#3]
并且加上了一层 SubqueryAlias person
那么接下来看一下ResolveRelations策略的执行逻辑
ResolveRelations的apply函数中,将project交给了ResolveTempViews【object类】
在ResolveTempViews 类中的apply函数中调用了resolveOperatorsUp函数,并将函数参数传进去
在resolveOperatorsUp函数总会判断当前project是否已经被analyzed解析,如果没有解析则会递归调用所有子节点,自下而上
既然是自下而上,那么最底层就是 ± 'UnresolvedRelation PERSON
节点,此时就用到了函数传参的解析规则:
identifier便是 person字符串
此时用到了v1SessionCatalog :SessionCatalog,进入到SessionCatalog类的lookupTempView
在getTempView函数中调用tempViews获取值,而tempViews是SessionCatalog中用于保存临时视图的map,还记得我们的程序中有创建视图的函数:
Seq(Person("Jack", 12), Person("James", 21), Person("Mac", 30)).toDS().createTempView("person")
故此处tempViews有值
此时获取的值正是:LocalRelation [name#2, age#3]
其中[name#2, age#3]中的数字是spark默认为列生成的唯一id
并且可以看到dataType类型 = StringType,故字段类型schema是在Analyzer阶段生成
再回到lookupTempView函数,catalog默认给这个获取到的临时视图套上了一层SubqueryAlias 【logicalPlan】设置了一个默认别名节点
层层向上,我们回到RuleExecutor的execute函数中,result返回的结果为:
'Project [*]
+- 'Filter ('AGE > 18)
+- SubqueryAlias person
+- LocalRelation [name#2, age#3]
至此子节点analyzer解析成功,其余节点转换过程大体一致,感兴趣的可以看各个规则即可,这里不在赘述;