Spark SQL执行计划到RDD全流程记录

news2025/1/11 0:03:29

目录

0、样例说明

1、解析词义,语义,生成语法树

1.1、概念

1.2、根据样例跟踪Spark代码

2、Unresolved Logical Plan

3、Analyzed Logical Plan

源码

SessionCatalog

Analyzer Rule Batch

对比

4、Optimized Logical Plan

5、Physical Plan

SparkPlan 


记录Spark SQL生成执行计划的全流程和代码跟踪。Spark版本是2.3.2。

 上图流程描述了Spark SQL 怎么转成Spark计算框架可以执行的分布式模型,下面结合一个样例,跟踪每个步骤。

0、样例说明

SQL样例:

select 
    t1.*,
    t2.id
from 
    test.tb_hive_test t1
join 
    test.test t2
on 
    t1.a == t2.name and imp_date == '20221216'

test.tb_hive_test是分区表,test.test是非分区表,两张表的建表语句:

CREATE TABLE test.tb_hive_test(
    a string, 
    b string
)
PARTITIONED BY (imp_date string COMMENT '分区时间')
Stored as ORC


CREATE TABLE test.test(
    id int, 
    name string,
    create_time timestamp
)
Stored as ORC

1、解析词义,语义,生成语法树

1.1、概念

SparK基于ANTLR语法解析SQL,ANTLR是可以根据输入自动生成语法树并可视化的显示出来的开源语法分析器。 ANTLR 主要包含词法分析器,语法分析器和树分析器。

  • 词法分析器又称 Scanner、Lexer 或 Tokenizer。词法分析器的工作是分析量化那些本来毫无意义的字符流, 抽取出一些单词,例如关键字(例如SELECT)、标识符(例如STRING)、符号(例如=)和操作符(例如UNION) 供语法分析器使用。
  • 语法分析器又称编译器。在分析字符流的时候,词法分析器只关注单个词语,不关注上下文。语法分析器则将收到的 Token 组织起来,并转换成为目标语言语法定义所允许的序列。
  • 树分析器可以用于对语法分析生成的抽象语法树进行遍历,并能执行一些相关的操作。

1.2、Spark源码分析

当提交上述SQL是,会调用该方法:

package org.apache.spark.sql

class SparkSession private(...){
    //sqlText为输入的SQL
    def sql(sqlText: String): DataFrame = {
        Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
      }
}

sessionState.sqlParser.parsePlan又会调用AbstractSqlParser中的parse方法解析SQL:

package org.apache.spark.sql.catalyst.parser


abstract class AbstractSqlParser extends ParserInterface with Logging {
    //1、调用parse函数
    override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
        //2、astBuilder遍历语法树
        astBuilder.visitSingleStatement(parser.singleStatement()) match {
          case plan: LogicalPlan => plan
          case _ =>
            val position = Origin(None, None)
            throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
        }
    }


    protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
        logDebug(s"Parsing command: $command")
        //词法分析器
        val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
        lexer.removeErrorListeners()
        lexer.addErrorListener(ParseErrorListener)

      
        val tokenStream = new CommonTokenStream(lexer)
        val parser = new SqlBaseParser(tokenStream)  //语法分析器
        parser.addParseListener(PostProcessor)
        parser.removeErrorListeners()
        parser.addErrorListener(ParseErrorListener)
        ...
    }
}

debug可以看到,执行到这里,sql被拆解成了很多类型的**Context

这些Context类都是根据ANTLR语法去解析SqlBase.g4这个DSL脚本,然后生成的,比如下面这个querySpecification就会生成QuerySpecificationContext类,里面就能通过各类参数记录下Select开头的某类SQL的词义和值。

 把SQL解析成AST语法树之后,就可以开始构建逻辑执行计划。

2、Unresolved Logical Plan

回到上面得parsePlan函数,第一步生成语法树之后,第二步是使用 AstBuilder (树遍历)将语法

树转换成 LogicalPlan。这个 LogicalPlan 也被称为 Unresolved LogicalPlan。

package org.apache.spark.sql.catalyst.parser


abstract class AbstractSqlParser extends ParserInterface with Logging {
    //1、调用parse函数
    override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
        //2、astBuilder遍历语法树
        astBuilder.visitSingleStatement(parser.singleStatement()) match {
          case plan: LogicalPlan => plan
          case _ =>
            val position = Origin(None, None)
            throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
        }
    }
}

从代码上看,第二步执行完成后,应该得到的是如下的执行计划 

但是从Spark history看,logical plan如下,这个是spark自己的优化:

3、Analyzed Logical Plan

源码

Unresolved LogicalPlan 通过绑定catalog元数据,得到Analyzed Logical Plan。这一步骤主要通过Analyzer来实现。先通过源码跟踪,回到第一步:

package org.apache.spark.sql

class SparkSession private(...){
    //sqlText为输入的SQL
    def sql(sqlText: String): DataFrame = {
        Dataset.ofRows(
            self, 
            //1、得到Unresolved Logical Plan
            sessionState.sqlParser.parsePlan(sqlText) 
        )
      }
}

private[sql] object Dataset {

    def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
        //2、构造一个QueryExecution
        val qe = sparkSession.sessionState.executePlan(logicalPlan) 
        qe.assertAnalyzed()  //3、analyzer
        new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
    }
}


package org.apache.spark.sql.execution
    class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
        def assertAnalyzed(): Unit = analyzed
        
        lazy val analyzed: LogicalPlan = {
            SparkSession.setActiveSession(sparkSession)
            //3.1、对逻辑计划进行关联分析,得到分析后的逻辑计划
            sparkSession.sessionState.analyzer.executeAndCheck(logical) 
      }
}

private[sql] class SessionState(
    sharedState: SharedState,
    val conf: SQLConf,
    val experimentalMethods: ExperimentalMethods,
    val functionRegistry: FunctionRegistry,
    val udfRegistration: UDFRegistration,
    catalogBuilder: () => SessionCatalog,
    val sqlParser: ParserInterface,
    analyzerBuilder: () => Analyzer,
    optimizerBuilder: () => Optimizer,
    val planner: SparkPlanner,
    val streamingQueryManager: StreamingQueryManager,
    val listenerManager: ExecutionListenerManager,
    resourceLoaderBuilder: () => SessionResourceLoader,
    createQueryExecution: LogicalPlan => QueryExecution,
    createClone: (SparkSession, SessionState) => SessionState) {

然后看下Analyzer源码,构造Analyzer对象的第一个入参是SessionCatalog。

package org.apache.spark.sql.catalyst.analysis

class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {

    //1、analyzer调用入口参数
    def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
        val analyzed = execute(plan) //2、对逻辑执行计划进行遍历
        try {
          checkAnalysis(analyzed)
          EliminateBarriers(analyzed)
        } catch {
            case e: AnalysisException =>
            val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
            ae.setStackTrace(e.getStackTrace)
            throw ae
        }
      }



package org.apache.spark.sql.catalyst.rules
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {

    def execute(plan: TreeType): TreeType = {
        var curPlan = plan
        val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

        //2、对逻辑执行计划进行遍历
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true

          // Run until fix point (or the max number of iterations as specified in the strategy.
          while (continue) {
              curPlan = batch.rules.foldLeft(curPlan) {
                case (plan, rule) => 
                    queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
                    queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)

              }
           }    
          ...
        }
    }
}



package org.apache.spark.sql.catalyst.analysis
class Analyzer...{
    lazy val batches: Seq[Batch] = Seq(
      ...
      Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ...
     )
}

SessionCatalog

在 Spark 2.x 中,Spark SQL中的 Catalog 体系实现以 SessionCatalog 为主体,SessionCatalog起到了一个代理的作用,对底层的元数据信息、临时表信息、视图信息和函数信息进行了封装,用来管理数据库(Databases)、数据表(Tables)、数据分区(Partitions)和函数(Functions)的接口。

SessionCatalog 的主体入参主要分为两类:ExternalCatalog和InMemoryCatalog,ExternalCatalog利用Hive原数据库来实现持久化的管理,在生产环境中广泛应用,InMemoryCatalog将信息存储在内存中,一般用于测试或比较简单的SQL处理;

Analyzer Rule Batch

Analyzer 会使用事先定义好的 Rule 以及 SessionCatalog 对 Unresolved LogicalPlan 进行转换操作;多个性质类似的 Rule 组成一个 Batch,例如下面的Resolution Batch,RuleExecutor 执行时,是先按Batch顺序,串行执行第一个Batch的Rule列表,再执行第二个Batch的Rule列表。

package org.apache.spark.sql.catalyst.analysis

class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
//0、Analyzer继承RuleExecutor,最终的转换是在RuleExecutor实现
extends RuleExecutor[LogicalPlan] with CheckAnalysis {

    //1、analyzer调用入口参数
    def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
        val analyzed = execute(plan) //2、最终跳转到RuleExecutor中的execute
        try {
          checkAnalysis(analyzed)
          EliminateBarriers(analyzed)
        } catch {
            case e: AnalysisException =>
            val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
            ae.setStackTrace(e.getStackTrace)
            throw ae
        }
      }



package org.apache.spark.sql.catalyst.rules
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    def execute(plan: TreeType): TreeType = {
        var curPlan = plan
        val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

        //3、对预定义好的rule进行遍历
        //根据rule将UnresolvedLogicalPlan转换成RelateLogicalPlan
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true

          // Run until fix point (or the max number of iterations as specified in the strategy.
          while (continue) {
              curPlan = batch.rules.foldLeft(curPlan) {
                ...
              }
          }    
       }
    }
}


//4、rule在Analyser已经预定义好
package org.apache.spark.sql.catalyst.analysis
class Analyzer...{
    lazy val batches: Seq[Batch] = Seq(
      ...
      Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions :: //解析表的函数
      ResolveRelations :: //解析表或者视图的函数
      ResolveReferences :: //解析列的函数
      ...
     )
}

对比

下面是Unresolved LogicalPlan和Analyzed Logical Plan的对比,其实我这个例子不太明显,没有复杂的子查询,然后spark对Unresolved LogicalPlan得到的logicalPlan已经做了一些元数据的关联,因此两个步骤的结果相差不大,只是把AnalysisBarrier去掉了。

4、Optimized Logical Plan

Optimized Logical Plan的生成和第三步的逻辑差不多,主要看下Optimizer自己预定义了哪些Rule Batch。

package org.apache.spark.sql.catalyst.optimizer
abstract class Optimizer(sessionCatalog: SessionCatalog)
  extends RuleExecutor[LogicalPlan] {

    def batches: Seq[Batch] = {
        val operatorOptimizationRuleSet =
          Seq(
            // Operator push down
            PushProjectionThroughUnion,
            ReorderJoin,
            EliminateOuterJoin,
            PushPredicateThroughJoin,
            PushDownPredicate,
            LimitPushDown,
            ColumnPruning,
            InferFiltersFromConstraints,
            // Operator combine
            CollapseRepartition,
            CollapseProject,
            CollapseWindow,
            CombineFilters,
            CombineLimits,
            CombineUnions,
            // Constant folding and strength reduction
            NullPropagation,
            ...
          )
        }
    }
}

在这个例子里,主要做了两类:

  • 谓词下推:把join中的imp_date == '20221216'条件,下推到了tb_hive_test表,这样可以减少数据源要扫描的数据。
  • 空值传导:默认判断分区字段不为空,以及join on条件中的的a字段不为空。

5、Physical Plan

这一步主要将逻辑计划树转换成物理计划树,以获取真实可执行的物理计划。代码还是回到QueryExecution。

package org.apache.spark.sql.execution
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {  
    
    lazy val sparkPlan: SparkPlan = {
        SparkSession.setActiveSession(sparkSession)
        planner.plan(ReturnAnswer(optimizedPlan)).next()
    }

    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

    protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
        preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
    }
}

SparkPlan 

上面executedPlan返回的就是SparkPlan,它是所有物理计划的抽象类。有了SparkPlan Tree,才能将其转换成 RDD的DAG。 SparkPlan分四类:

  • LeafExecNode,叶子节点,是不存在子节点的。在所有SparkPlan中最基础的,是生成RDD的开始,和数据源相关,例如HiveTableScanExec。
  • UnaryExecNode,一元节点,拥有一个子节点的SparkPlan,这种物理执行计划一般是对RDD进行转换,一进一出,例如Exchange重分区。
  • BinaryExecNode,二元节点,拥有两个子节点的SparkPlan,这种物理计划有LeftNode和RightNode组成,例如Join。

  • 其他节点,例如CodeGenSupport和UnionExec。

package org.apache.spark.sql.execution
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

    trait LeafExecNode extends SparkPlan {
      override final def children: Seq[SparkPlan] = Nil
      override def producedAttributes: AttributeSet = outputSet
    }

    object UnaryExecNode {
      def unapply(a: Any): Option[(SparkPlan, SparkPlan)] = a match {
        case s: SparkPlan if s.children.size == 1 => Some((s, s.children.head))
        case _ => None
      }
    }

    trait BinaryExecNode extends SparkPlan {
      def left: SparkPlan
      def right: SparkPlan

      override final def children: Seq[SparkPlan] = Seq(left, right)
    }
}

在本例子中,Relation 算子变为 FileScan,Join 算子变为BroadcastHashJoin(因为我的测试数据量很小)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/97634.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3组件基础,组件引用与使用、向子组件传递数据与事件prop、emit

vue3组件基础&#xff0c;组件引用与使用、向子组件传递数据与事件prop、emit 一、组件模板 组成&#xff1a;template(必要)&#xff0c;script&#xff0c;style 例子&#xff1a;模板名称 Hello.vue <template><div class"msgStyle">{{ msg }}</di…

Java项目:ssm校园在线点餐系统源码

作者主页&#xff1a;源码空间站2022 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 项目介绍 系统主要分为前台和后台&#xff0c;分为管理员与普通用户两种角色&#xff1b; 前台主要功能有&#xff1a;用户注册、用户登录、我的购物车、…

大学生零基础学编程要多久呢

众所周知现在的程序员都是高薪工作&#xff0c;很多人会通过自学或培训去获得一些编程知识&#xff0c;那“学编程需要什么基础呢&#xff1f;” 编程培训已经成为了很多大学毕业生缓解就业压力的一种方式&#xff0c;毕业之后找一份高薪工作是许多大学生最基本想法&#xff0…

【学习打卡03】可解释机器学习笔记之CAM类激活热力图

可解释机器学习笔记之CAM类激活热力图 文章目录可解释机器学习笔记之CAM类激活热力图CAM介绍CAM算法原理GAP全局平均池化GAP VS GMPCAM算法的缺点及改进CAM可视化同张图&#xff0c;不同类别不同图&#xff0c;同个类别CAM弱监督定位用语义特征编码进行分类CAM各种有意思的应用…

Linux系统中DDR3硬件初始化实验

大家好&#xff0c;我是ST。 今天的话&#xff0c;主要和大家聊一聊&#xff0c;如何使用Cortex-A芯片自带的RAM&#xff0c;很多时候要运行Linux的话是完全不够用的&#xff0c;必须要外接一片RAM芯片&#xff0c;驱动开发板上的DDR3。 目录 第一&#xff1a;何为RAM和ROM …

为什么要学习Python爬虫与数据可视化?

提到Python爬虫与数据可视化&#xff0c;我们都不陌生。因为我们早已身在大数据驱动的时代&#xff0c;数据分析已然成为了一项必备技能。可能有人会问&#xff0c;为什么要学习Python爬虫与数据可视化&#xff1f; 答案是显而易见的&#xff0c;无论是出于时代发展的要求&…

redis之如何支持秒杀场景

写在前面 本文一起看下Redis在秒杀场景中的应用。 1&#xff1a;秒杀都有哪些阶段 redis并非在秒杀的所有阶段都需要使用到&#xff0c;为了更好的了解redis在秒杀场景中的应用&#xff0c;我们先来看下秒杀的不同阶段&#xff0c;基本可以分为秒杀前&#xff0c;秒杀进行时&…

什么是用户增长? (超详细)

一.概况 原因&#xff1a;随着人口红利的衰减&#xff0c;互联网流量红利的马太效应显现&#xff0c;这意味着成本的大幅度增加&#xff0c;企业必须改变过去粗放型的营销和运营方式&#xff0c;用更高效更低成本实现快速增长 定义&#xff1a;通过实验和数据驱动&#xff0c…

5.Linux实用操作

文章目录零、学习目标一、软件安装1、Linux系统的应用商店2、yum命令3、apt命令 - 扩展二、systemctl命令三、软连接四、下载和网络请求1、ping命令2、wget命令3、curl命令五、端口1、概念2、查看端口占用六、进程管理1、概念2、查看进程3、查看指定进程4、关闭进程七、主机状态…

SAP ABAP 开发管理 代码增强标记 位置使用清单(Mark of enhancement)

SAP ABAP 开发管理 代码增强标记 位置使用清单&#xff08;Mark of enhancement&#xff09; 引言&#xff1a; 代码增强标记 &#xff08;Mark of enhancement&#xff09;是我在 ABAP 开发中对增强管理的方法之一&#xff0c;是对 SAP 系统增强管理工具的补充。通过对代码增…

自学Python找不到工作?只要掌握这七大块,月薪15K轻轻松松

1. 开发环境和开发工具 python3.6下载 Download Python sublime Text 3 Sublime Text - Download pycharm下载 PyCharm :: Download Latest Version of PyCharm 2. python语法知识 个人推荐《Python从入门到实践》、《Python编程快速上手》 3. web框架 djangoh中文文档 D…

九、Docker 复杂安装之mysql主从复制

前面我们介绍了Docker 安装单机版mysql,如果没有看可以先看下:https://blog.csdn.net/u011837804/article/details/128315385 本篇学习的前提是懂得mysql主从复制的原理,话不多说,我们开始。 1、下载mysql5.7镜像 涉及命令: 查看本地镜像命令:docker images拉取mysql5…

最近邻 M 点

一 问题描述 在 K 维空间中有很多点&#xff0c;给定一个点&#xff0c;找出最近的 M 个点。点 p 和点 q 之间的距离是连接它们的直线段的长度。 二 输入和输出 1 输入 有多个测试用例。第 1 行包含两个非负整数 n 和 k &#xff0c;分别表示点数和维数&#xff0c;1≤n≤5…

Python: unittest框架

目录 1.0 接口自动化框架设计 2.0 分层设计框架 3.0 接口配置文件 3.1 测试用例数据 4.0 框架执行入口 4.1 测试函数 4.1.1 参数替换 4.1.2 发送请求处理 4.1.3 响应断言 4.1.4 提取全局变量 4.1.5 数据库断言 5.0 工具类 5.1.1 excel文件处理 5.1.2 数…

直呼内行阿里离职带出内网专属“高并发系统设计”学习笔记

前言 我们知道&#xff0c;高并发代表着大流量&#xff0c;高并发系统设计的魅力就在于我们能够凭借自己的聪明才智设计巧妙的方案&#xff0c;从而抵抗巨大流量的冲击&#xff0c;带给用户更好的使用体验。这些方案好似能操纵流量&#xff0c;让流量更加平稳得被系统中的服务…

python 之 pandas数据处理

目录 一&#xff1a;读取文件 二&#xff1a;查看数据 三&#xff1a;获取数据 四&#xff1a;按标签选择 五&#xff1a;按照位置选择 六&#xff1a;布尔索引 七&#xff1a;缺失值 一&#xff1a;读取文件 read_csv 加载文件 df pd.read_csv("classify.csv&quo…

SPRING-了解2-XML

两种bean Spring中有两种bean:一种普通bean,另外一种工厂bin (Factory Bin&#xff0c;注意不是前面说的BeanFactory类) 普通Bean:xml中定义什么类型返回的就是什么类型 <bean id"book" class"com.i7i8i9.spring5.collectiontype.Book"> xml中clas…

Linux 应用基础 Framebuffer应用编程

文章目录前言一、了解Framebuffer二、了解LCD1.LCD的操作原理2.LCD坐标三. Framebuffer 程序分析1. 打开设备&#xff1a;&#xff08;open&#xff09;2. 获取LCD参数 : ( ioctl )3. 映射 framebuffer: ( mmap )四. 描点函数&#xff1a; &#xff08; lcd_put_pixel &#xf…

【OpenFOAM】-olaFlow-算例3- currentWaveFlume

算例路径&#xff1a; olaFlow\tutorials\currentWaveFlume 算例描述&#xff1a; 波流耦合模拟&#xff0c;该算例提供了四种工况&#xff1a;(1) Waves and forward current&#xff0c;(2) Waves and backward current&#xff0c;(3) Forward current only&#xff0c;(4) …

【LSTM预测】基于卷积神经网络结合双向长短时记忆CNN-BiLSTM(多输入单输出)数据预测含Matlab源码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …