spark sql(二)sql解析流程扩展

news2025/1/18 2:53:39

1、前言

        通过前面的文章我们了解到,spark sql通过catalyst框架解析sql,而在将sql语句转变为可执行的任务过程中会将大的sql解析流程划分为未解析的逻辑计划、解析后的逻辑计划、优化后的逻辑计划、物理计划、可执行物理计划等阶段。大概的解析流程如下所述:

1)SQL语句经过SqlParser解析成Unresolved LogicalPlan;

2)使用analyzer结合数据字典(catalog)进行绑定,生成Resolved LogicalPlan;

3)使用optimizer对Resolved LogicalPlan进行优化,生成Optimized LogicalPlan;

4)使用SparkPlan将LogicalPlan转换成PhysiclPlan;

5)使用prepareForException将PhysicalPlan转换成可执行物理计划

        spark sql将解析流程划分为多个阶段,每个阶段都有其特有的功能和解析工具,这种清晰的划分使得我们更容易理解catalyst框架的功能作用。除此之外,该框架在每个解析阶段都提供了自定义扩展点,即在每个解析阶段都可以引入我们的自定义处理逻辑。

2、自定义扩展点

        spark sql大体在四个阶段提供了扩展能力,分别是未解析的逻辑计划、解析后的逻辑计划、优化后的逻辑计划、物理计划。在这四个阶段后面我们都可以引入自己的处理逻辑,具体的引入方法如下:

1)Parser 阶段:

        injectParser – 添加parser自定义规则,parser负责SQL解析。

2)Analyzer阶段(都是analysis解析后才执行):

        injectCheckRule – 添加Analyzer自定义Check规则。

        injectResolutionRule – 添加Analyzer自定义规则到Resolution阶段,analyzer负责逻辑执行计划生成。

        injectPostHocResolutionRule – 添加Analyzer自定义规则到Post Resolution阶段。

3)optimizer阶段:

        injectOptimizerRule – 添加optimizer自定义规则,optimizer负责逻辑执行计划的优化,我们例子中就是扩展了逻辑优化规则。

4)planner strategy阶段(物理计划阶段):

        injectPlannerStrategy – 添加planner strategy自定义规则,planner负责物理执行计划的生成。

3、扩展demo

        首先是通用的main方法,因为所有示例都用的同一个查询,所以这里只展示一次:

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder
      .appName("test")
      .master("local")
      .withExtensions(e => e.injectParser(ExtendParser))
      .withExtensions(e => e.injectResolutionRule(ExtendAnalyzerResolutionRule))
      .withExtensions(e => e.injectPostHocResolutionRule(ExtendAnalyzerPostHocResolutionRule))
      .withExtensions(e => e.injectCheckRule(ExtendAnalyzerCheckRule))
      .withExtensions(e => e.injectOptimizerRule(ExtendOptimizerRule))
      .withExtensions(e => e.injectPlannerStrategy(ExtendSparkStrategy))
      .getOrCreate

    import org.apache.spark.sql.types._

    val schema = StructType(
      List(StructField("id", IntegerType, nullable = false),
        StructField("name", StringType, nullable = false),
        StructField("age", IntegerType, nullable = false))
    )

    val rdd = sparkSession.sparkContext.parallelize(Seq(
      Row(1, "xiaohong", 34),
      Row(2, "xiaoli", 42),
      Row(3, "xiaoming", 28),
    ))

    val df = sparkSession.sqlContext.createDataFrame(rdd, schema)

    df.createOrReplaceTempView("person")

    val df2 = sparkSession.sql("select id,name from person where age > 30")

    df2.show()

  }

        另外这里有个小小的语法知识,就是case classs 不用我们主动new创建对象,因为样例类默认提供了apply方法,因此我们可以直接写类名,下面所有的扩展对象我们都用case class进行定义。

3.1 Parser 阶段

        这个阶段是将sql语句初始转换为语法树。通常这个阶段扩展的场景比较少,难度也较大。

case class ExtendParser(sparkSession: SparkSession, delegate: ParserInterface) extends ParserInterface with Logging{
  override def parsePlan(sqlText: String): LogicalPlan = {
    logInfo("parser: extend ourselves parser ")
    delegate.parsePlan(sqlText)
  }

  override def parseExpression(sqlText: String): Expression = delegate.parseExpression(sqlText)

  override def parseTableIdentifier(sqlText: String): TableIdentifier = delegate.parseTableIdentifier(sqlText)

  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = delegate.parseFunctionIdentifier(sqlText)

  override def parseMultipartIdentifier(sqlText: String): Seq[String] = delegate.parseMultipartIdentifier(sqlText)

  override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)

  override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)

  override def parseRawDataType(sqlText: String): DataType = delegate.parseRawDataType(sqlText)
}

        运行后可以看到如下一行日志:

3.2 Analyzer阶段

        这个阶段是将语法树和数据源关联上,将语法树上的表、字段等信息转换为解析后的状态

case class ExtendAnalyzerResolutionRule(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo("Analyzer: extend ourselves analyzer - ResolutionRule")
    plan
  }
}

case class ExtendAnalyzerPostHocResolutionRule(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo("Analyzer: extend ourselves analyzer - PostHocResolutionRule")
    plan
  }
}

case class ExtendAnalyzerCheckRule(sparkSession: SparkSession) extends (LogicalPlan => Unit) with Logging {
  override def apply(v1: LogicalPlan): Unit = {
    logInfo("Analyzer: extend ourselves analyzer - CheckRule")
  }
}

        运行后的日志效果如下:

        可以看到有很多重复的解析日志,这是因为catalyst框架解析语法树时是以访问者的模式将规则应用到语法树所有节点上。而不是将语法树传入解析规则中。

3.3 optimizer阶段

        这个阶段主要是针对解析的逻辑计划做各种优化处理,目前spark中提供了很多针对固定场景的优化规则。但是有些特定业务场景下的sql优化,spark sql中并没有提供,此时就需要我们通过手动扩展的方式引入我们的优化逻辑。这个阶段的扩展也是最常用的扩展阶段

case class ExtendOptimizerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo("Optimizer: extend ourselves optimizer - OptimizerRule")
    plan
  }
}

        运行后的日志效果如下:

3.4 planner strategy阶段

        这个阶段主要跟物理计划的生成有关,比如你想更改查询源的方式,从jdbc方式改为odbc或者直接查询文件。

case class ExtendSparkStrategy(sparkSession: SparkSession) extends SparkStrategy with Logging {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    logInfo("Planner strategy: extend ourselves strategy - SparkStrategy")
    Seq.empty
  }
}

        运行后日志效果如下:

4、总结

        本文章主要讲解的是spark sql解析中的扩展方式,这些内容更偏向于实战。具体的逻辑计划、物理计划等的细节会在后续系列文章中进行讲解。因为Parser 阶段主要是将sql语句转换为基础的语法树;Analyzer阶段主要是将语法树和数据源关联上,将语法树上的表、字段等信息转换为解析后的状态。这两个阶段扩展的需求很少,因此工作中常用的扩展方式是在optimizer阶段和planner strategy阶段

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/387122.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Handler与线程

简介 Handler提供的种异步消息处理机制是:当它发出一个消息进入消息队列后,发送消息的函数立刻返回,接着主线程会逐个地从消息队列中把消息取出,然后对消息进行处理。明显,Handler发送消息和接收消息是异步进行的&…

三八送什么数码产品好?适合送礼的数码产品

数码产品是我们生活中比较常见到的物品,相比较于一般礼物的观赏性,它的实用性更强一些,所以如果你不知道送什么礼物给别人的话,数码产品也是不错的选择。 一、南卡小音舱蓝牙耳机 这个时代的女性,变得越来越自信了&am…

ChatGPT解答:根据使用者输入的字符串,自动判断规则,并给出各种正则表达式,用Python实现

ChatGPT解答: 根据使用者输入的字符串,自动判断规则,并给出各种正则表达式,用Python实现 根据输入的字符串,自动给出正则表达式 根据使用者输入的字符串,自动判断规则,并给出各种正则表达式&am…

JVM系统优化实践(7):垃圾回收器与垃圾回收算法

您好,我是湘王,这是我的CSDN博客,欢迎您来,欢迎您再来~上回说到了年轻代、老年代与数据计算的一个案例。接下来就先讲一讲年轻代和老年代的两个垃圾回收器:ParNew和CMS。和Serial垃圾回收器一样&#xff0c…

实战:yaml方式安装ingress-nginx-2023.3.2(测试成功)

实战:yaml方式安装ingress-nginx-2023.3.2(测试成功) 目录 文章目录实战:yaml方式安装ingress-nginx-2023.3.2(测试成功)目录实验环境实验软件1、安装过程2、第一个示例关于我最后最后实验环境 实验环境: 1、win10,vmwrokstation虚机&#x…

AI_News周刊:第四期

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 News 1.对抗“唤醒人工智能”马斯克招募团队开发 OpenAI 竞争对手 据两位直接了解这项工作的人士和另一位了解情况的人士透露,埃隆马斯克最近几周与人工智能研究人员接洽,商讨成…

详细分析什么是进程?如何理解进程状态?

什么是进程? 比较官方一点的回答是:当一个程序加载到内存的时候,就是一个进程。 但是这是不准确的回答,进程是怎么在内存中形成的,以及内存是如何管理进程的,是通过什么描述进程的?下面我们将…

Neo4j数据库部署配置

这里写目录标题一、neo4j图形数据库安装与部署1.1配置JDK运行环境(注意jdk与neo4j版本对应)1.2部署Neo4j(注意jdk与neo4j版本对应)二、数据库基本操作演示一、neo4j图形数据库安装与部署 1.1配置JDK运行环境(注意jdk与…

centos安装rocketmq

centos安装rocketmq1 下载rocketmq二进制包2 解压二进制包3 修改broker.conf4 修改runbroker.sh和runserver.sh的JVM参数5 启动NameServer和Broker6 安装rockermq dashboard(可视化控制台)1 下载rocketmq二进制包 点击rocketmq二进制包下载地址,下载完成之后通过ft…

javaEE 初阶 — 数据链路层中的以太网数据帧

文章目录以太网帧格式1. MAC 地址2. MAC 地址是如何与 IP 地址相互配合的3. 以太网帧格式中的类型MTU(了解)以太网帧格式 数据链路层主要考虑的是相邻的两个结点之间的传输。 这里最知名的协议就是 以太网。 一个以太网数据帧有三个部分组成。帧头载荷…

【GlobalMapper精品教程】055:GM坐标转换器的巧妙使用

GM软件提供了一个简单实用的坐标转换工具,可以实现地理坐标和投影坐标之间的高斯正反算及多种转换计算。 文章目录 一、坐标转换器认识二、坐标转换案例1. 地理坐标←→地理坐标2. 地理坐标←→投影坐标三、在输出坐标上创建新的点四、其他转换工具的使用一、坐标转换器认识 …

653600-56-7,Ac4GaINAz,N-叠氮四酰化半乳糖用于PROTAC合成

基础产品数据:CAS号:653600-56-7中文名:N-叠氮四酰化半乳糖,叠氮修饰半乳糖英文名: Ac4GaINAzAc4GaINAz结构式(Structural):详细产品数据:分子式:C16H22N4O10…

python学习——【第二弹】

前言 上一篇文章 python学习——【第一弹】给大家介绍了python中的基本数据类型等,这篇文章接着学习python中的运算符的相关内容。 运算符 python中的运算符主要有:算术运算符,赋值运算符,比较运算符,布尔运算符以及…

NPP夜间灯光遥感数据读取与可视化

1、Google Earth EngineGoogle Earth Engine是Google推出的行星尺度的遥感云计算平台,提供了大量遥感数据的集成与运算工具。同时也包括DMSP和NPP夜间灯光遥感数据(月尺度和年尺度)。这里给出样例的可视化代码。var dataset ee.ImageCollect…

1.2 CSS标签选择器,类选择器

CSS选择器: 根据不同的需求选出不同的标签,进行美化装饰 1. 标签选择器 标签选择器(元素选择器):用 HTML标签名作为选择器,按标签名称进行分类,为页面某一类标签指定统一的CSS样式 作用: 可以把某一类标签全部选中&…

UWB通道选择、信号阻挡和反射对UWB定位范围和定位精度的影响

(一)介绍检查NLOS操作时需要考虑三个方面:(1)由于整体信号衰减,通信范围减小。(2)由于直接路径信号的衰减,导致直接路径检测范围的减小。(3)由于阻…

记录--手摸手带你撸一个拖拽效果

这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前言 最近看见一个拖拽效果的视频(抖音:艾恩小灰灰),看好多人评论说跟着敲也没效果,还有就是作者也不回复大家提出的一些疑问,本着知其然必要知其所以然…

栈帧之局部变量表(Local Variables)解读

局部变量表也被称之为局部变量数组或本地变量表 定义为一个数字数组,主要用于存储方法参数和定义在方法体内的局部变量,这些数据类型包括各类基本数据类型、对象引用(reference),以及returnAddress类型。由于局部变量表…

2023最新版本RabbitMQ下载安装教程

一、RabbitMQ简介 RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。主要用于在进程、应用程序和服务器之间交换数据,可以通过插件支持进行扩展,支持许多协议,并提供高性能、可靠性、集群和高可用队列。 AMQP :Advanced Me…

2023年疫情开放,国内程序员薪资涨了还是跌了?大数据告诉你答案

自从疫情开放,国内各个行业都开始有复苏的迹象,尤其是旅游行业更是空前暴涨,那么互联网行业如何? 有人说今年好找工作多了,有人说依然是内卷得一塌糊涂,那么今年开春以来,各个岗位的程序员工资…