SPARKSQL3.0-Analyzer阶段源码剖析

news2025/1/18 17:14:45

一、前言

在上一节【SPARKSQL3.0-Unresolved[Parsed]阶段源码剖析】中已经介绍了Unresolved Logical Plan未解析阶段,建议先看完上一篇文章后再来看本文

由于Unresolved LogicalPlan阶段生成的logicalPlan仅仅是一种数据结构,不包含任何数据信息。故在分析器(Analyzer) 阶段会使用事先定义好的规则(Rule)以及 Catalog 等信息对未解析的逻辑计划(Unresolved Logical Plan) 进行补充和替换【logicalPlan】中的各个节点,让新的语法树包含元数据信息

Catalog 主要用于各种函数资源信息和元数据信息(数据库、数据表、数据视图、数据分区与函数等)的统一管理。

二、示例:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")

val spark = SparkSession.builder()
  .config(sparkConf)
  .getOrCreate()

case class Person(name: String, age: Int)

Seq(Person("Jack", 12), Person("James", 21), Person("Mac", 30)).toDS().createTempView("person")

spark.sql("SELECT * FROM PERSON WHERE AGE > 18").explain(true) // 打印执行计划

执行计划:

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('AGE > 18)
   +- 'UnresolvedRelation `PERSON`

== Analyzed Logical Plan ==
name: string, age: int
Project [name#10, age#11]
+- Filter (AGE#11 > 18)
   +- SubqueryAlias person
      +- LocalRelation [name#10, age#11]

三、源码

根据上面的例子我们进入到sparkSession.sql中:

image-20220705164335283

plan就是sqlParser解析出来的logicalPlan【上一节已经讲过其详细过程】,然后就进入到Dataset.ofRows,可以看到只要进入到此函数都会构建了一个QueryExecution【可以理解成spark会为每一个sql语句创建一个QueryExecution】,然后调用QueryExecution的assertAnalyzed函数,接下来看QueryExecution类

image-20220705164431965

QueryExecution极其重要,它是sql查询的主要工作流,它包含了一条sql语句需要执行的所有计划:

  • logical是未解析的逻辑执行计划
  • analyzed是解析后的逻辑执行计划
  • optimizedPlan是优化后的逻辑执行计划
  • executePlan是物理执行计划
  • sparkPlan是Spark执行计划

image-20220705165740939

可以看出其内部都是使用的sparkSession.sessionState的 分析器,优化器,语法器等,关于sessionState的讲解可以看之前的文章

我们回头看qe.assertAnalyzed()调用,下图可以看到是调用了sessionState.analyzer的executeAndCheck函数

接下来看sessionState.analyzer是哪个变量

image-20220705165834007

Session是由BaseSessionStateBuilder构建出来,analyzer变量构建的是Analyzer类,并且在类中重写了一些规则

image-20220705170158838

image-20220705170220859

找到了Analyzer,接下来看其中的executeAndCheck函数,发现其内部调用了executeAndTrack

image-20220705170539520

而Analyzer中没有executeAndTrack函数,此处是调用的的父类RuleExecutor的executeAndTrack函数,函数内部又调用了execute函数

image-20220707142820120

image-20220707142703447

execute函数被子类Analyzer重写,但重写函数最终上还是调用父类RuleExecutor的execute

image-20220707143115913

在RuleExecutor的execute函数中,最重要的是循环调用batches,这里贴一下源码:

核心思路是curPlan迭代一次会获新的curPlan并检查一下是否等于上一次的lastPlan,如果相等则无需下一次迭代,否则继续迭代,直到和lastPlan相等或者超过最大迭代次数[默认100]

def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
    val planChangeLogger = new PlanChangeLogger()
    val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
    val beforeMetrics = RuleExecutor.getCurrentMetrics()

    // 针对初始输入plan,运行结构完整性检查
    if (!isPlanIntegral(plan)) {
      val message = "The structural integrity of the input plan is broken in " +
        s"${this.getClass.getName.stripSuffix("$")}."
      throw new TreeNodeException(plan, message, null)
    }
		// 遍历不同子类实现的batches中定义的 batchs 变量, 此处batches是用的Analyzer子类的实现
    batches.foreach { batch =>
      // 用来对比执行规则前后,初始的plan有无变化
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // 执行直到达到稳定点或者最大迭代次数
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            val effective = !result.fastEquals(plan)

            if (effective) {
              queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
              queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
              planChangeLogger.logRule(rule.ruleName, plan, result)
            }
            queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
            queryExecutionMetrics.incNumExecution(rule.ruleName)
            tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
            if (!isPlanIntegral(result)) {
              val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                "the structural integrity of the plan is broken."
              throw new TreeNodeException(result, message, null)
            }
            result
        }
        iteration += 1
        // 到达最大迭代次数, 不再执行优化
        if (iteration > batch.strategy.maxIterations) {
          // 只对最大迭代次数大于1的情况打log
          if (iteration != 2) {
            val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
              "."
            } else {
              s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
            }
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
              s"$endingMsg"
            if (Utils.isTesting || batch.strategy.errorOnExceed) {
              throw new TreeNodeException(curPlan, message, null)
            } else {
              logWarning(message)
            }
          }
          // 检查一次幂等
          if (batch.strategy == Once &&
            Utils.isTesting && !excludedOnceBatches.contains(batch.name)) {
            checkBatchIdempotence(batch, curPlan)
          }
          continue = false
        }
        // plan不变了,到达稳定点,不再执行优化
        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }
      planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
    }
    planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)
    curPlan
  }
    
}

可以看到将每个Batch中所有的规则Rule对象实施于该Unsolved LogicalPlan,并且该Batch中规则可能要执行多轮,直到执行的批数等于batch.strategy.maxIterations或者logicalplan与上个批次的结果比没有变化,则退出执行, batch.strategy.maxIterations次数每个batch都不同,有的batch是1,有的batch则是通过spark的配置文件中的默认值,此默认值在spark3.0中是100次,下面有讲到

RuleExecutor使用模版设计模式,其batches抽象函数由其子类实现,也就是Analyzer的batches

image-20220705170945214

Analyzer的batches = Seq[Batch],是由多个Batch 组成 ,这里贴一下源码:

lazy val batches: Seq[Batch] = Seq(
  	// Hint策略组
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveJoinStrategyHints(conf),
      new ResolveHints.ResolveCoalesceHints(conf)),
  	// 简单检查策略组
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
  	// 替换策略组
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
  	// 最关键的关系策略组
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveNamespace(catalogManager) ::
      new ResolveCatalogs(catalogManager) ::
      ResolveInsertInto ::
      ResolveRelations ::
      ResolveTables ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ResolveOutputRelation ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveHigherOrderFunctions(v1SessionCatalog) ::
      ResolveLambdaVariables(conf) ::
      ResolveTimeZone(conf) ::
      ResolveRandomSeed ::
      ResolveBinaryArithmetic ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
  	// 执行的钩子策略组
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
  	// Alter Table策略组
    Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
  	// 移除未解析的Hint策略组
    Batch("Remove Unresolved Hints", Once,
      new ResolveHints.RemoveAllHints(conf)),
  	// 非确定策略组
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
  	// UDF策略组
    Batch("UDF", Once,
      HandleNullInputsForUDF),
  	// 更新为null策略组
    Batch("UpdateNullability", Once,
      UpdateAttributeNullability),
  	// 子查询策略组
    Batch("Subquery", Once,
      UpdateOuterReferences),
  	// 清理策略组
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )

再来看Batch,一个 Batch 包含 name , Strategy 和一个或多个Rule 构成的 Rules

image-20220705171202277

Strategy 定义了该 Batch 的最大执行次数,最大迭代次数

image-20220705171314562

Rules 是logcial解析的关键, Analyzer 对 LogicalPlan 分析的过程其实就是对 plan 逐个应用 Rule 的过程。

Rule有一堆的子类,每个子类的解析各不相同

image-20220705171543126

接下来回到RuleExecutor的execute函数【对照上面源码】

处理的流程为依次遍历 Batches ,使用每一个 Batch 处理 LocgicalPlan 。对于每一个 Batch ,依次使用该 Batch 中的 Rule 处理 plan ,处理完的 plan 作为该 Batch 中的下一条 Rule 处理的输入继续处理。经过该 Batch 所有 Rules 处理过的 plan 再重复上述的 Rule 处理过程,直至达到该 Batch 的最大运行次数或该 plan 不再变化为止。然后接着使用下一个 Batch 继续处理该 plan 。具体流程可参考以下流程图。

补充一下,有些batch的策略中最大迭代次数为100:

image-20220712172010713

再回到上一节我们得到的logicalPlan,可以看出此处到达execute的plan正是Project【logicalPlan】

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('AGE > 18)
   +- 'UnresolvedRelation `PERSON`

image-20220707143939498

而经过execute阶段我们应该返回的是:

== Analyzed Logical Plan ==
name: string, age: int
Project [name#10, age#11]
+- Filter (AGE#11 > 18)
   +- SubqueryAlias person
      +- LocalRelation [name#10, age#11]

image-20220707145824247

这中间的变化就是batches.foreach循环执行不同策略后的结果

我们回到batches.foreach循环策略组阶段,这里直接告诉大家,当循环到Resolution【关系策略】中的ResolveRelations规则时才会真正执行解析

image-20220707154212566

这里贴一下Resolution【关系策略】的规则列表

image-20220707154503339

在经历了ResolveRelations规则后,plan变成了新的Project:

'Project [*]
+- 'Filter ('AGE > 18)
   +- SubqueryAlias person
      +- LocalRelation [name#2, age#3]

image-20220707154612015

可以看出根节点从之前的UnresolvedRelation `PERSON -> LocalRelation [name#2, age#3]

并且加上了一层 SubqueryAlias person

那么接下来看一下ResolveRelations策略的执行逻辑

ResolveRelations的apply函数中,将project交给了ResolveTempViews【object类】

image-20220707162500892

在ResolveTempViews 类中的apply函数中调用了resolveOperatorsUp函数,并将函数参数传进去

image-20220707162557917

在resolveOperatorsUp函数总会判断当前project是否已经被analyzed解析,如果没有解析则会递归调用所有子节点,自下而上

image-20220707162841983

既然是自下而上,那么最底层就是 ± 'UnresolvedRelation PERSON节点,此时就用到了函数传参的解析规则:

image-20220707163544147

identifier便是 person字符串

image-20220707163619067

此时用到了v1SessionCatalog :SessionCatalog,进入到SessionCatalog类的lookupTempView

image-20220707163655575

在getTempView函数中调用tempViews获取值,而tempViews是SessionCatalog中用于保存临时视图的map,还记得我们的程序中有创建视图的函数:

Seq(Person("Jack", 12), Person("James", 21), Person("Mac", 30)).toDS().createTempView("person")

故此处tempViews有值

image-20220707163850465

此时获取的值正是:LocalRelation [name#2, age#3]

image-20220707163756143

其中[name#2, age#3]中的数字是spark默认为列生成的唯一id

并且可以看到dataType类型 = StringType,故字段类型schema是在Analyzer阶段生成

image-20220707164630063

再回到lookupTempView函数,catalog默认给这个获取到的临时视图套上了一层SubqueryAlias 【logicalPlan】设置了一个默认别名节点

image-20220707164104182

层层向上,我们回到RuleExecutor的execute函数中,result返回的结果为:

'Project [*]
+- 'Filter ('AGE > 18)
   +- SubqueryAlias person
      +- LocalRelation [name#2, age#3]

image-20220707172856908

至此子节点analyzer解析成功,其余节点转换过程大体一致,感兴趣的可以看各个规则即可,这里不在赘述;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/23653.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ggplot2 | 世界杯赛程的可视化就交给我吧!~

11. 写在前面 昨天卡塔尔🇶🇦输了比赛真是让人大跌眼镜啊😱,打破了世界杯东道主必胜的神律,也不知道王子们是怎么想的。🤣 今天是英格兰🏴󠁧󠁢󠁥&#xe006e…

Linux 调试之strace

文章目录前言一、strace 例子1.1 strace 跟踪 free1.2 strace 跟踪 dd1.3 strace 其他一些使用选项二、strace 原理2.1 ptrace简介2.2 strace 原理总结参考资料前言 strace命令是Linux系统调用跟踪器,可以跟踪系统调用,为每个系统调用打印一行信息&…

浅谈SQL Server索引视图(物化视图)以及索引视图与查询重写

目录 (一)前言 (二)正文 1. 物化视图(索引视图)与查询重写的基本概念 2. 创建测试环境 (1)建表 (2)写数据 3. 索引视图创建 (1&#xff0…

一篇文章详解Linux的用户和权限

教程推荐:Linux零基础快速入门到精通 认知root用户 root用户(超级管理员) 无论是Windows、MacOS、Linux均采用多用户的管理模式进行权限管理。 •在Linux系统中,拥有最大权限的账户名为:root(超级管理员&a…

第十七届全国人机语音通讯学术会议(NCMMSC 2022) | 早鸟票开放注册了

全国人机语音通讯学术会议是国内语音领域广大专家、学者和科研工作者交流最新研究成果,促进该领域研究和开发工作不断进步的重要舞台。该系列会议自1990年开创以来已成功召开了十六届。2022年第十七届全国人机语音通讯学术会议(National Conference on M…

移动WEB开发之流式布局--移动端常见布局--流式布局

移动端技术选型 移动端布局和以前我们学习的PC端有所区别: 1. 单独制作移动端页面(主流) 流式布局(百分比布局) flex 弹性布局(强烈推荐) lessrem媒体查询布局 混合布局 2. 响应式页面兼…

ConcurrentHashMap的实现原理是分段锁?你Out了

前言 Java后端开发面试的时候,一场好的面试,是无论如何也绕不开并发编程的。并发编程里面往往有个很重要的类可能会被拿出来探讨:ConcurrentHashMap。 ConcurrentHashMap是HashMap的线程安全版。大家都知道HashMap的高性能,但是H…

动静态库的制作

目录 一.动静态库的原理 二.静态库 2.1制作静态库 2.2使用静态库 三.动态库 3.1制作动态库 3.2动态库的使用 一.动静态库的原理 首先要知道可执行程序的生成过程:1,预处理 2,编译 3,汇编 4 ,链接 1.预处理 头…

03 LaTex之标题页摘要

1.标题页 \title{{ABC}\footnote{explain}}%生成标题和标题的脚注\author{\small $^a$lay \qquad $^b$winter \footnote{super star}\\%换行符 %作者信息 \small $^a$ lays brief\\ \small lays address, 710021\\%换行 \small $^b$ winters introduction \\ \small winters …

0101 蓝桥杯真题04

/* * 马虎的算式 * 小明是个急性子,上小学的时候经常把老师写在黑板上的题目抄错了。 * 有一次,老师出的题目是:36 x 495 ? * 他却给抄成了:396 x 45 ? * 但结果却很戏剧性,他的答案竟然是对的!&a…

同花顺_代码解析_技术指标_Z_3

本文通过对同花顺中现成代码进行解析,用以了解同花顺相关策略设计的思想 目录 ZNZ_DPCYC1 ZNZ_DPCYR ZNZ_HLD ZNZ_HUO ZNZ_MYL1 ZNZ_MYP1 ZNZ_PAS ZNZ_PAS1 ZNZ_RPY1 ZNZ_RPY2 ZNZ_SDR ZNZ_TAO ZNZ_YHBOL1 ZNZ_YHCBB ZX ZNZ_DPCYC1 大盘成本均线 行…

python 给图片添加噪声

import numpy as np import cv2 import matplotlib.pyplot as plt import skimage from skimage import io import randomdef addGaussNoise(origin,var0.0005):#添加高斯噪声函数var random.uniform(0.0001, 0.04)noisy skimage.util.random_noise(origin, modegaussian, va…

idea iu 2021 Mac版本的使用,如何创建java web项目,包括tomcat和web包

Java web系列文章目录 第一章 前端学习入门之idea iu 2021版本的使用 目录Java web系列文章目录前言一、Java web是什么?二、配置步骤1.下载Tomcat服务器2.idea iu 2021版本界面总结前言 随着前端的学习路径,java web项目不可避免要学习使用&#xff0…

YUV与RGB 以及之间的转换

目录 一、RGB 二、YUV 三、YUV类型和存储方式 1、类型 2、存储方式 四、分析YUV 4:2:0 1、YU12(I420,YUV420P) 2、YV12 3、NV12(YUV420SP) 4、NV21(YUV420SP) 5、占用空间大小比较 五、RGB与YUV之间的转换 1、转换标准 2、Color Range 3、计算公式 在…

【ArcGIS】属性表导出及乱码问题

这玩意其实说难不难,但是乱码有时候还是烦人 直接复制到EXCEL 部分表细节被我删掉了 直接点击全选,然后复制,再到EXCEL里粘贴。我有时候就是这么干的。而且量大概是二十万行左右。 Table to Table 如果你的属性文件大于65533行&#xff…

十一、组合API(1)

本章概要 为什么要引入组合APIsetup() 函数 组合(Composition)API 是在 Vue 3.0 中引入的,它是一组附加的、基于函数的 API ,允许灵活地组合组件逻辑。 组合 API 并没有引入新的概念,更多地是将 Vue 的核心功能&…

项目相互依赖调用解决方法两种方法

Bmodel依赖于Amodel,但是Amodel又需要BModel的信息。原来是在Amodel创建一块内存,在Bmodel中将内存地址赋给这块内存,然后在Amodel去做其他操作。 方法一:采用静态变量static链接:C开发中一个解决方案里,两…

LeetCode 0808. 分汤:好题【感叹号】

【LetMeFly】808.分汤:好题! 力扣题目链接:https://leetcode.cn/problems/soup-servings/ 有 A 和 B 两种类型 的汤。一开始每种类型的汤有 n 毫升。有四种分配操作: 提供 100ml 的 汤A 和 0ml 的 汤B 。提供 75ml 的 汤A 和 2…

Google Earth Engine(GEE)—— 各矿区时序NDVI变化图(包含具体的运行函数)

函数: ee.Filter.eq(name, value) Filter to metadata equal to the given value. Returns the constructed filter. Arguments: name (String): The property name to filter on. value (Object): The value to compare against. Returns: Filter ui.Chart.image.s…

7、Jedis测试

文章目录7、Jedis测试7.1. Jedis所需要的jar包7.2. 连接Redis注意事项7.3. Jedis常用操作7.3.1. 创建动态的工程7.3.2. 创建测试程序7.4. 测试相关数据类型7.4.1. Jedis-API:Key7.4.2. Jedis-API:String7.4.3. Jedis-API:List7.4.4. Jedis-AP…