SPARKSQL3.0-各阶段自定义扩展规则源码剖析

news2024/11/24 3:48:34

一、前言

这一节主要介绍如何自定义扩展各阶段规则

虽然spark内部提供了很多优化规则,但在实际工作中,经常因为业务需求需要自定义扩展优化器或解析器,故自己实现一个优化器才对sparksql有更深的理解

二、扩展范围

spark在扩展方便做的很好,几乎所有阶段都开放了扩展点,用户可以自定义Parser/ResolutionRule/CheckRule/OptimizerRulesPlannerStrategy,如下图:

在这里插入图片描述

Spark用户可以在SQL处理的各个阶段扩展自定义实现,如下

injectOptimizerRule – 添加optimizer自定义规则,optimizer负责逻辑执行计划的优化,我们例子中就是扩展了逻辑优化规则。
injectParser – 添加parser自定义规则,parser负责SQL解析。
injectPlannerStrategy – 添加planner strategy自定义规则,planner负责物理执行计划的生成。
injectResolutionRule – 添加Analyzer自定义规则到Resolution阶段,analyzer负责逻辑执行计划生成。
injectPostHocResolutionRule – 添加Analyzer自定义规则到Post Resolution阶段。
injectCheckRule – 添加Analyzer自定义Check规则。

三、示例

代码:

case class MyResolutionRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo("开始应用 MyResolutionRule 优化规则")
    plan
  }
}

case class MyPostHocResolutionRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo("开始应用 MyPostHocResolutionRule 优化规则")
    plan
  }
}

case class MyOptimizerRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo("开始应用 MyOptimizerRule 优化规则")
    plan
  }
}

case class MyCheckRule(spark: SparkSession) extends (LogicalPlan => Unit) with Logging {
  override def apply(plan: LogicalPlan): Unit = {
    logInfo("开始应用 MyCheckRule 优化规则")
  }
}

case class MySparkStrategy(spark: SparkSession) extends SparkStrategy with Logging {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    logInfo("开始应用 MySparkStrategy 优化规则")
    Seq.empty
  }
}

// 自定义injectParser
case class MyParser(spark: SparkSession, delegate: ParserInterface) extends ParserInterface with Logging {
  override def parsePlan(sqlText: String): LogicalPlan =
    delegate.parsePlan(sqlText)
  override def parseExpression(sqlText: String): Expression =
    delegate.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    delegate.parseTableIdentifier(sqlText)
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    delegate.parseFunctionIdentifier(sqlText)
  override def parseTableSchema(sqlText: String): StructType =
    delegate.parseTableSchema(sqlText)
  override def parseDataType(sqlText: String): DataType =
    delegate.parseDataType(sqlText)

  override def parseMultipartIdentifier(sqlText: String): Seq[String] = ???
  override def parseRawDataType(sqlText: String): DataType = ???
}


object Test {

  def main(args: Array[String]): Unit = {
    // TODO 创建SparkSQL的运行环境
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf)
      .withExtensions(e => e.injectResolutionRule(MyResolutionRule))
      .withExtensions(e => e.injectPostHocResolutionRule(MyPostHocResolutionRule))
      .withExtensions(e => e.injectCheckRule(MyCheckRule))
      .withExtensions(e => e.injectOptimizerRule(MyOptimizerRule))
      .withExtensions(e => e.injectPlannerStrategy(MySparkStrategy))
      .withExtensions(e => e.injectParser(MyParser))
      .getOrCreate()

    import spark.implicits._

    Seq(Person("Jack", 12), Person("James", 21), Person("Mac", 30)).toDS().createTempView("person")

    spark.sql("SELECT age FROM PERSON WHERE AGE > 18").explain(true)
  
  }
}

结果打印:

22/11/01 14:43:52 INFO MyResolutionRule: 开始应用 MyResolutionRule 优化规则
22/11/01 14:43:52 INFO MyPostHocResolutionRule: 开始应用 MyPostHocResolutionRule 优化规则
22/11/01 14:43:52 INFO MyCheckRule: 开始应用 MyCheckRule 优化规则
22/11/01 14:43:52 INFO MyResolutionRule: 开始应用 MyResolutionRule 优化规则
22/11/01 14:43:52 INFO MyResolutionRule: 开始应用 MyResolutionRule 优化规则
22/11/01 14:43:52 INFO MyPostHocResolutionRule: 开始应用 MyPostHocResolutionRule 优化规则
22/11/01 14:43:52 INFO MyCheckRule: 开始应用 MyCheckRule 优化规则
22/11/01 14:43:53 INFO MyResolutionRule: 开始应用 MyResolutionRule 优化规则
22/11/01 14:43:53 INFO MyPostHocResolutionRule: 开始应用 MyPostHocResolutionRule 优化规则
22/11/01 14:43:53 INFO MyCheckRule: 开始应用 MyCheckRule 优化规则
22/11/01 14:43:54 INFO MyOptimizerRule: 开始应用 MyOptimizerRule 优化规则
22/11/01 14:43:54 INFO MyOptimizerRule: 开始应用 MyOptimizerRule 优化规则
22/11/01 14:43:54 INFO MySparkStrategy: 开始应用 MySparkStrategy 优化规则
22/11/01 14:43:54 INFO MySparkStrategy: 开始应用 MySparkStrategy 优化规则
22/11/01 14:43:54 INFO MyResolutionRule: 开始应用 MyResolutionRule 优化规则
22/11/01 14:43:54 INFO MyPostHocResolutionRule: 开始应用 MyPostHocResolutionRule 优化规则
22/11/01 14:43:54 INFO MyCheckRule: 开始应用 MyCheckRule 优化规则
22/11/01 14:43:55 INFO MyResolutionRule: 开始应用 MyResolutionRule 优化规则
22/11/01 14:43:55 INFO MyResolutionRule: 开始应用 MyResolutionRule 优化规则
22/11/01 14:43:55 INFO MyPostHocResolutionRule: 开始应用 MyPostHocResolutionRule 优化规则
22/11/01 14:43:55 INFO MyCheckRule: 开始应用 MyCheckRule 优化规则
22/11/01 14:43:55 INFO CodeGenerator: Code generated in 18.564505 ms
22/11/01 14:43:55 INFO MyOptimizerRule: 开始应用 MyOptimizerRule 优化规则
22/11/01 14:43:55 INFO MyOptimizerRule: 开始应用 MyOptimizerRule 优化规则
22/11/01 14:43:55 INFO MySparkStrategy: 开始应用 MySparkStrategy 优化规则
22/11/01 14:43:55 INFO MySparkStrategy: 开始应用 MySparkStrategy 优化规则
== Parsed Logical Plan ==
'Project ['age]
+- 'Filter ('AGE > 18)
   +- 'UnresolvedRelation [PERSON]

== Analyzed Logical Plan ==
age: int
Project [age#3]
+- Filter (AGE#3 > 18)
   +- SubqueryAlias person
      +- LocalRelation [name#2, age#3]

== Optimized Logical Plan ==
LocalRelation [age#3]

== Physical Plan ==
LocalTableScan [age#3]

可以看到自定义扩展生效

四、源码

实例中通过sparkSession.withExtensions函数传递自定义扩展类,点进去看发现是将SparkSessionExtensions类传递给入参【即我们调用的各种inject函数】:

image-20221101150221181

image-20221101150010213

image-20221101150028685

此时我们看一下SparkSessionExtensions类内部的 inject 函数,类较长这里直接贴一下代码:可以发现其内部是将外部自定义类通过 += 的方式赋予内部集合变量*

type RuleBuilder = SparkSession => Rule[LogicalPlan]
type CheckRuleBuilder = SparkSession => LogicalPlan => Unit
type StrategyBuilder = SparkSession => Strategy
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder)
type ColumnarRuleBuilder = SparkSession => ColumnarRule

......

private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]

private[this] val postHocResolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]

private[this] val checkRuleBuilders = mutable.Buffer.empty[CheckRuleBuilder]

private[this] val optimizerRules = mutable.Buffer.empty[RuleBuilder]

......

def injectResolutionRule(builder: RuleBuilder): Unit = {
    resolutionRuleBuilders += builder
}

def injectPostHocResolutionRule(builder: RuleBuilder): Unit = {
    postHocResolutionRuleBuilders += builder
}

def injectCheckRule(builder: CheckRuleBuilder): Unit = {
    checkRuleBuilders += builder
}

def injectOptimizerRule(builder: RuleBuilder): Unit = {
    optimizerRules += builder
}

......

那么赋值之后的SparkSessionExtensions如何应用在各个阶段呢?

这里暂停一下,需要先了解SessionState,这对于我们后面如何应用自定义构建规则会有帮助

SessionState

在之前几节中,我们了解到QueryExecution是一条sql执行的关键类【负责sql的解析,优化,物化等】,而这些阶段都需要使用sparkSession.sessionState.xxx[属性],如下图

image-20221101143747410

SessionState的属性是通过创建SessionState的时的传参,故需要看构建SessionState过程。

image-20221101121516183

而sessionState是保存在SparkSession中,所以需要先来看SparkSession的构建过程,常见创建sparkSession是通过getOrCreate函数

image-20221101153909215

SparkSession

这里回顾一下sparkSession的创建过程

		// TODO 创建SparkSQL的运行环境
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf)
      .getOrCreate()

省略其他代码段后看到 new SparkSession(sparkContext, None, None, extensions) 创建SparkSession,可以看到最后一个参数是:extensions

image-20221101121942319

extensions正是我们前面看到的SparkSessionExtensions类,此时sparkSession中包含了我们扩展好的SparkSessionExtensions类了;

不过只是包含而已,还没和各个阶段整合到一起;接下来看SparkSession中的sessionState属性:期间调用了instantiateSessionState 函数

这里贴一下源码:可以看出构建的SessionState有两种子类,一种hive【HiveSessionStateBuilder】,一种普通memory【SessionStateBuilder】但最终返回的结果是统一的父类:BaseSessionStateBuilder

lazy val sessionState: SessionState = {
    parentSessionState
      .map(_.clone(this))
      .getOrElse {
        val state = SparkSession.instantiateSessionState( // 调用instantiateSessionState函数构建SessionState
          SparkSession.sessionStateClassName(sparkContext.conf), // 调用sessionStateClassName函数确定构建hive还是普通SessionState
          self)
        initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
        state
      }
  }

// 如果构建SparkSession时调用了enableHiveSupport来连接hive,则此时会构建HiveSessionStateBuilder
private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME =
    "org.apache.spark.sql.hive.HiveSessionStateBuilder"


private def sessionStateClassName(conf: SparkConf): String = {
    conf.get(CATALOG_IMPLEMENTATION) match {
      case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
      case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
    }
  }


// 注意:这里将hive | in-memory的 className构建成了统一的父类BaseSessionStateBuilder,并且调用了.build()函数
private def instantiateSessionState(
      className: String,
      sparkSession: SparkSession): SessionState = {
    try {
      // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
      val clazz = Utils.classForName(className)
      val ctor = clazz.getConstructors.head
      ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build() // 这里将sparkSession传参进去
    } catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
    }
  }

image-20221101154911667

此时看一下BaseSessionStateBuilder.build函数:由于BaseSessionStateBuilder类代码较多,这里以优化器阶段为主介绍,其他阶段类似

image-20221101160534673

这里new SparkOptimizer对象,正是QueryExecution中用到的optimizer,并重写了optimizer的extendedOperatorOptimizationRules属性,将父类原有属性集合中加入了扩展:customOperatorOptimizationRules函数返回规则集合

image-20221101160612265


customOperatorOptimizationRules函数调用了SparkSessionExtensions的buildOptimizerRules,如下:将SparkSessionExtensions内部扩展的optimizerRules集合元素进行创建后返回

image-20221101160744613

到这里就可以和上面暂停的地方结合起来看了,最开始我们将自定义的扩展类存储到了各个集合当中,如下:

type RuleBuilder = SparkSession => Rule[LogicalPlan]
type CheckRuleBuilder = SparkSession => LogicalPlan => Unit
type StrategyBuilder = SparkSession => Strategy
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder)
type ColumnarRuleBuilder = SparkSession => ColumnarRule

......

private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]

private[this] val postHocResolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]

private[this] val checkRuleBuilders = mutable.Buffer.empty[CheckRuleBuilder]

private[this] val optimizerRules = mutable.Buffer.empty[RuleBuilder]

......

def injectResolutionRule(builder: RuleBuilder): Unit = {
    resolutionRuleBuilders += builder
}

def injectPostHocResolutionRule(builder: RuleBuilder): Unit = {
    postHocResolutionRuleBuilders += builder
}

def injectCheckRule(builder: CheckRuleBuilder): Unit = {
    checkRuleBuilders += builder
}

def injectOptimizerRule(builder: RuleBuilder): Unit = {
    optimizerRules += builder
}

......


随后在buildOptimizerRules函数中将集合中用户扩展的元素创建出来并返回,至此SparkOptimizer类中的extendedOperatorOptimizationRules属性包含了我们自定义扩展的规则

image-20221102150546912

接下来看一下Optimizer是如何使用extendedOperatorOptimizationRules这个属性?

首先在Optimizer类中的defaultBatches属性中,会将我们扩展的类加入进去,至此Optimizer阶段的新增扩展类放入成功

image-20221101161026642

这里我们回到上一节的优化阶段,优化过程是通过batches集合迭代进行优化,而batches集合在Optimizer被重写

image-20221101165004948

可以看到被重写的batches使用了defaultBatches属性

image-20221101170421445

而defaultBatches中的extendedOperatorOptimizationRules集合正包含了用户自定义扩展的规则

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iMHQFuiO-1669103114622)(/Users/hzxt/Library/Application Support/typora-user-images/image-20221101170442557.png)]

至此优化器自定义扩展全流程结束;关于其他阶段的扩展大致思想相同,感兴趣的小伙看可以看一下源码,方便理解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/25718.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue.js毕业设计,基于vue.js前后端分离教室预约小程序系统设计与实现

功能介绍 【后台管理功能模块】 系统设置:设置关于我们、联系我们、加入我们、法律声明 广告管理:设置小程序首页轮播图广告和链接 留言列表:所有用户留言信息列表,支持删除 会员列表:查看所有注册会员信息&#xff0…

从零开始学前端:DOM、BOM、焦点事件 --- 今天你学习了吗?(JS:Day20)

从零开始学前端:程序猿小白也可以完全掌握!—今天你学习了吗?(JS) 复习:从零开始学前端:CSSOM视图模式 — 今天你学习了吗?(JS:Day19) 文章目录从…

java8 (jdk 1.8) 新特性——Stream ApI

在java8 中,有两个最重要的改变,一个就是之前了解的Lmbda java8 (jdk 1.8) 新特性——Lambda ,还有一个就是Stream Api 1. 什么是Stream API 简单来说就是一个类库,里边有一些方法方便我们对集合数据进行操作,就好像使用 SQL 语…

Windows cmd 命令及Linux 环境下导入导入mysql 数据库

文章目录一、背景二、Windows cmd 导入导出mysql 数据库1.导出数据库三种方式(导出数据库时不需要连接数据库)2. 操作步骤2.导入数据库三、linux 环境下导入导出数据库一、背景 最近在本机上安装了一个WMware 虚拟机,需要从本机(…

从三层架构说起,谈谈对历史项目的小改造

项目背景说明 最近接手一个 “老” 项目的需求修改,项目整体基于 .net core 3.1 平台,以传统的三层架构为基础构建。了解需求后,逐步对原有项目框架进行大概的了解,主要是熟悉一些框架的开发规范,基本工具类库的使用&…

寒亭5.8万亩盐碱稻 国稻种芯·中国水稻节:山东潍坊插秧期

寒亭5.8万亩盐碱稻 国稻种芯中国水稻节:山东潍坊插秧期 新京报讯(记者赵利新)新闻中国采编网 中国新闻采编网 谋定研究中国智库网 中国农民丰收节国际贸易促进会 国稻种芯中国水稻节 中国三农智库网-功能性农业农业大健康大会报道&#xff…

MMRotate 全面升级,新增 BoxType 设计

引言:大大降低水平框检测器改旋转框检测器的难度 MMRotate 是一个基于 PyTorch 和 MMDetection 的开源旋转框目标检测工具箱。它将目标检测从水平框扩展到旋转框,为场景文字、遥感影像、自动驾驶等领域的应用打下了基础,为学术界和产业界提供…

瞄准镜-第12届蓝桥杯Scratch选拔赛真题精选

[导读]:超平老师计划推出Scratch蓝桥杯真题解析100讲,这是超平老师解读Scratch蓝桥真题系列的第82讲。 蓝桥杯选拔赛每一届都要举行4~5次,和省赛、国赛相比,题目要简单不少,再加上篇幅有限,因此我精挑细选…

数据结构——单链表

一.简介 上一篇文章,我们介绍了线性表中的顺序表。 而顺序表拥有一些缺陷 1.空间不够时需要增容,增容需要付出代价 2.为避免重复扩容,我们进行指数扩容,可能会造成空间浪费 3.顺序表从开始位置连续存储,插入删除数…

卡特尔世界杯来了,只喝精酿啤酒不玩望京扑克,其实也是一种缺失

北京时间2022年11月20日,卡特尔世界杯正式拉开了序幕,全球都进入了世界杯时间。世界杯的开幕,最高兴的还是球迷朋友,大家可以欢聚一堂,喝着精酿啤酒看着足球,那滋味别提多舒服了。 世界杯对于广大球迷来说&…

表的增删查改

目录 插入数据 基本查询 更新数据 清空数据 聚合函数 group by子句 内置函数 基本查询练习 多表查询 子查询 合并查询 表的内外连接 插入数据 单行—全列插入 如下图,全列插入可以省略要在哪些列插入! 多行—指定列插入 如下图&#xff0…

安装 Red Hat Enterprise Linux 9.1 虚拟机

目录1. 官方下载链接与新闻2. 安装提示3. 系统安装步骤(1)进入系统引导界面(2)进入【系统语言选择】界面(3)进入【安装信息摘要】界面① 设置【root密码】② 设置【安装目的地】(4)进…

【python】使用python将多个视频合并、延长视频的时间

今天做知识分享的时候,最后保存的视频时长是58分钟,那么如何改变视频的时长,将视频时长改为一个小时呢? 下面提供3个方案: 方案1,重新录,很显然,不合理; 方案2&#xf…

蓝屏page_fault_in_nonpaged_area的解决办法

用户在操作电脑的过程中,难免会遇到蓝屏问题,最近就有用户遇到电脑蓝屏重启无限循环,提示代码page_fault_in_nonpaged_area,这要如何解决呢?下面就来看看详细的解决办法。 page_fault_in_nonpaged_area蓝屏代码解决方法…

【MySQL篇】第一篇——数据库基础

目录 什么是数据库 主流数据库 基本使用 MySQL安装 连接服务器 服务器管理 服务器,数据库,表关系 使用案例 创建数据库 使用数据库 创建数据库表 表中插入数据 查询表中的数据 数据逻辑存储 MySQL架构原理 MySQL整体逻辑架构 MySQL逻辑…

Eureka架构篇 - 服务发现

前言 从客户端与服务端两个角度概述一下Eureka服务发现的原理,如下: 客户端 依赖自动装配机制,客户端启动时就会从Eureka服务端全量获取服务实例的注册信息并缓存到本地。之后每隔30秒向Eureka服务端发起增量获取的请求,如果增…

云原生周刊 | 波音公司允许员工给开源项目做贡献

如果你要问谁对开源项目的贡献最小,那一定是保密等级很高的国防工业机构,但这个魔咒最近被波音公司给打破了。在最近的一次 Linux 基金会成员峰会 keynote 演讲中,波音公司提到他们会在 2022 年成立一个开源办公室,并且从即日起&a…

m基于MATLAB-GUI的GPS数据经纬度高度解析与kalman分析软件设计

目录 1.算法概述 2.仿真效果预览 3.MATLAB部分代码预览 4.完整MATLAB程序 1.算法概述 经度纬度和高度来自GPS信号的中的GPGGA的数据。所以提取这三个信息主要是对GPGGA中的数据进行整理。GPGGA的数据格式如下所示: GPGGA是GPS数据输出格式语句,意思是…

9问502

一、502意味着什么 502 Bad Gateway是指错误网关,无效网关;在互联网中表示一种网络错误。表现在WEB浏览器中给出的页面反馈。它通常并不意味着上游服务器已关闭(无响应网关/代理) ,而是上游服务器和网关/代理使用不一…

latex 模板使用技巧——参考文献篇

参考文献说明: 一、 常用参考文献类型 1、会议 (INPROCEEDINGS) 示例: INPROCEEDINGS{rcnn,title{Rich feature hierarchies for accurate object detection and semantic segmentation},author{Girshick, Ross and Donahue, J…