SPARKSQL3.0-DataFrameAPI与spark.sql()区别源码分析

news2025/1/15 17:31:35

一、前言:

阅读本节需要先掌握spark-sql内部执行的基本知识:

SessionState

Unresolved阶段

Analyzer阶段中queryExecution的介绍

二、区别

spark.sql的执行顺序为: sql字符串 -> antlr4解析成AST语法树 -> unreolved解析成logicalPlan -> Analyzer解析 -> Optimizer优化 -> 后续物理执行计划

DataFrame执行顺序: 根据api直接构建logicalPlan -> 根据调用不同的api嵌套成新的logicalPlan【部分函数包含Analyzer解析】 -> action算子触发Optimizer优化 -> 后续物理执行计划

可以看出dataFrameAPI 和 spark.sql 的【Optimizer优化 -> 后续物理执行计划】完全一致;

而dataFrameAPI的Analyzer阶段是在调用select等函数时直接触发Analyzer阶段,下面有详细过程

两者唯一的区别是dataFrame省略了【antlr4解析成AST语法树 -> unreolved解析成logicalPlan】步骤

三、示例:

val value = spark
      .range(2)
      .select('id as "_id")
      .filter('_id === 0)

value.explain(true)

结果: 看起来和spark.sql的explan一样

== Parsed Logical Plan ==
'Filter ('_id = 0)
+- Project [id#0L AS _id#2L]
   +- Range (0, 2, step=1, splits=Some(2))

== Analyzed Logical Plan ==
_id: bigint
Filter (_id#2L = cast(0 as bigint))
+- Project [id#0L AS _id#2L]
   +- Range (0, 2, step=1, splits=Some(2))

== Optimized Logical Plan ==
Project [id#0L AS _id#2L]
+- Filter (id#0L = 0)
   +- Range (0, 2, step=1, splits=Some(2))

== Physical Plan ==
*(1) Project [id#0L AS _id#2L]
+- *(1) Filter (id#0L = 0)
   +- *(1) Range (0, 2, step=1, splits=2)

四、源码:

关于spark.sql的执行过程前面已经详细讲过,这里主要介绍dataFrameAPI的执行过程

先来看spark.range(2):可以看到最终是构建了一个Dataset[long]返回,但dateSet的构建需要logicalPlan入参,也就是Range

image-20220708153544744

logicalPlan入参,如下图:

image-20220708163906267

可以看出dataSet的logicalPlan入参就是range函数中的Range,来看一下Range的构建,看出是构建了一个Range类,而range类正是logicalPlan的子类

image-20220708154014212

然后回到构建DateSet-this函数那里:参数logicalPlan = Range

def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = { // 参数logicalPlan = Range
    this(sparkSession.sessionState.executePlan(logicalPlan), encoder)		// 通过executePlan函数构建QueryExecution
  }

......
class Dataset[T] private[sql](
  	// Dataset的构建第一个参数需要是QueryExecution,所以上面的this构造函数需要先构建QueryExecution
    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution, 
    @DeveloperApi @Unstable @transient val encoder: Encoder[T])


......
def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan) // 调用createQueryExecution函数

......
// 实际上就是构建一个新的QueryExecution
protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
    new QueryExecution(session, plan)
  }

此时QueryExecution已经构建完成,主函数中.range(2)函数的返回值是DateSet,DateSet中的logicalPlan为:Range

Range (0, 2, step=1, splits=Some(2))

image-20221122105922127

再看DateSet.select('id as “_id”), 这里贴一下源码:需要主要两个函数: withPlan{}、logicalPlan变量

def select(cols: Column*): DataFrame = withPlan { // 这里的withPlan函数很重要,下面会有介绍
		// untypedCols函数主要是将用户传输的column字段检查
  	val untypedCols = cols.map {	
      ......
    }
    Project(untypedCols.map(_.named), logicalPlan) // 可以看到这里是根据range函数创建的【range】logicalPlan又构建了一个新的Porject【logicalPlan】
  }

再看withPlan函数:调用了Dataset.ofRows函数,参数是上面创建的Project logicalPlan

image-20220708161701280

在ofRows函数中,参数logicalPlan = Project,再调用sparkSession.sessionState.executePlan函数构建新的QueryExecution

再经过assertAnalyzed解析后构建新的DateSet返回,保持和RDD一样的不可变性

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
    sparkSession.withActive {
      val qe = sparkSession.sessionState.executePlan(logicalPlan)  // 这里调用executePlan函数来构建一个新的queryExection
      qe.assertAnalyzed()																					 // 执行解析,可以看出这一步直接执行Analyzer阶段
      new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))				 // 构建新的DateSet返回,和RDD的不可变特性保持一致
  }

......
def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)

......
// 实际上就是构建一个新的QueryExecution
protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
    new QueryExecution(session, plan)
  }


此时主函数.select函数返回的DataSet中的logicalPlan为:

Project [id#0L AS _id#2L]
+- Range (0, 2, step=1, splits=Some(2))

再来看filter函数:根据上一次的Project - logicalPlan 构建一个新的Filter父节点,然后执行withTypedPlan函数,withTypedPlan函数中调用了DataSet的apply函数

def filter(condition: Column): Dataset[T] = withTypedPlan {
    Filter(condition.expr, logicalPlan) // 根据上一次的Project - logicalPlan 构建一个新的Filter父节点
  }

@inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
    Dataset(sparkSession, logicalPlan) // 调用DataSet的apply函数
  }

DataSet的apply函数:可以看到此处是新构建了一个DateSet,并将Filter-logicalPlan传入,并返回

image-20220708162822776

这里我们看一下返回的DateSet:可以看到其内部的QueryExecution的logicalPlan包含了各个节点的语法树,此处已和spark.sql的logicalPlan完全相同

后续的【Analyzer解析 -> Optimizer优化 -> 后续物理执行计划】将保持一致

image-20220708163300890

至此DataFrameAPI与spark.sql(“”)执行区别结束,再来回顾一下两者的异同:

spark.sql的执行顺序为: sql字符串 -> antlr4解析成AST语法树 -> unreolved解析成logicalPlan -> Analyzer解析 -> Optimizer优化 -> 后续物理执行计划

DataFrame执行顺序: 根据api直接构建logicalPlan -> 根据调用不同的api嵌套成新的logicalPlan【部分函数包含Analyzer解析】 -> action算子触发Optimizer优化 -> 后续物理执行计划

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/27481.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云计算正当时!将你的Java项目容器化

将您的 Java 应用程序 Docker 化 长按关注《Java学研大本营》,加入读者群,分享更多精彩 扫码关注《Java学研大本营》,加入读者群,分享更多精彩 随着每个人都转向 Kubernetes,知道如何对应用程序进行 Dockerize 是件好…

IPv6转换难点分析之一:国家监测指标-中科三方

从IPv4过渡到IPv6就像是“打破一个旧世界,创建一个新世界”,注定要经历一个长期的过程,但终究会实现。 一、IPv6过渡转换的障碍 目前互联网上还是以IPv4设备为主,不可能迅速过渡到IPv6,这主要受制于以下几个方面: &…

Nat. Mach. Intell2021 | MolCLR+:基于GNN的分子表征对比学习

原文标题:Molecular Contrastive Learning of Representations via Graph Neural Networks 代码:https://github.com/yuyangw/MolCLR 一、问题提出 首先,分子信息难以完全表示。例如,基于字符串的表示,如SMILES和SE…

未来展望:Starday供应链火力全开,为跨境电商再添动力!

2022年末将至,回顾这“命途多舛”的一年,也是令人非常唏嘘。近日,联合国发布《2022年年中世界经济形势与展望》报告,该报告指出正处于疫情恢复期的全球经济可能又到了一场新危机的边缘。据相关统计显示,2022年相比于20…

HttpServlet学习中的常见问题(个人珍藏笔记)

目录 一、HttpServlet 1.1核心方法 1.2、面试:谈谈Servlet的生命周期 二、HttpServletRequest 2.1、核心方法 2.2、如何获取请求头 三、HttpServletResponse 3.1核心方法 四、setCharacterEncoding和setContentType区别? 五、Json格式的转换问题…

串行通讯协议,只需要一文就可以给你讲懂

前言 最近在做一个通过ESP8266和STM32通讯从而实现远程控制,中间需要用到串口来发送报文,通过报文来实现两者之间的通讯。 今天刚好趁着这个机会来给大家讲解一下串行通讯,希望能给大家以后的学习提供一些思路。 串行通讯介绍 串口通信线…

磨金石教育摄影干货分享|怎样拍出唯美有内涵的“中国风”照片

之前有网友私聊我,说这两年很多人都在拍中国元素的照片,他们的照片不仅仅是对古建筑简单的拍摄,照片的内容拥有丰富的文化内涵。想问我这样的照片应该怎么拍才能达到有内涵的水平。 其实这个问题确实有一定的难度,每个人都有自己…

腾格尔成影视圈的香饽饽,十月天传媒正式邀请演唱主题曲

俗话说:姜还是老的辣,酒还是陈酿好。这句话用到音乐人腾格尔身上,就再贴切不过了。说起音乐人腾格尔,这位来自草原的雄鹰和苍狼,有很多部音乐作品,都被歌迷朋友们一直传唱。 如今的腾格尔老师,已…

六.初阶指针

前言:大家好哇!今天带大家认识下C语言中的指针,指针的用法等,希望对大家有所帮助! 目录 一.指针是什么 1.指针是什么? 2.如何理解指针变量 二.指针和指针的类型 1.指针类型 2.指针类型的意义 &#x…

http 跨域资源共享详解

http 跨域资源共享详解 由于浏览器同源策略限制,会导致出现跨域问题。而跨域资源共享(CORS)可以突破浏览的同源策略的限制,不过需要服务端配合设置相应的响应头,从而使跨源数据传输得以安全进行。 跨域资源共享新增了…

进销存软件对中小型企业管理有什么作用?

进销存软件对中小型企业管理有什么作用? 01 更加有序 库存不乱单据不乱价格不乱 使用进销存软件可以把这些都记录下来,有条不紊,出现什么问题也有据可查,不像纸质单据,会丢会坏,乱成一团。 02 能打印正式…

[Spring Cloud] Hystrix通过配置文件统一设置参数/与OpenFeign结合使用

✨✨个人主页:沫洺的主页 📚📚系列专栏: 📖 JavaWeb专栏📖 JavaSE专栏 📖 Java基础专栏📖vue3专栏 📖MyBatis专栏📖Spring专栏📖SpringMVC专栏📖SpringBoot专…

网站favion.ico图标

Favicon.ico一般用于作为缩略的网站标志,它显示在浏览器的地址栏或者标签上。 目前主要的浏览器都支持favicon.ico图标 一 制作favicon图标 1 把品优购图标切成png图片 2 把png图片转换为ico图标,这需要借助第三方转换网站,例如&#xff1…

刷题经验分享(一)

文章目录删除公共字符:组队竞赛:删除公共字符: 第一题:删除公共字符 方法一: 思路: 1.将第二个字符串的字符都映射到一个hashtable数组中,用来判断一个字符在这个字符串。 2. 判断一个字符在第…

140.深度学习分布式计算框架-3

140.1 Horovod Horovod是 Uber 开源的又一个深度学习工具,它的发展吸取了 Facebook「一小时训练 ImageNet 论文」与百度 Ring Allreduce 的优点,可为用户实现分布式训练提供帮助。Horovod 支持通过用于高性能并行计算的低层次接口 – 消息传递接口 (MPI…

Vue脚手架环境中简单使用MarkDown(只入门)

目录 入门 高级使用 入门 1 所在终端输入 npm install vue-meditor 2 复制以下代码 先新建一个组件 <template><div><MavonEditor v-model"myMarkDownData"/><button click"submit">提交</button></div> </te…

vue配置

首先安装node.js 在cmd node -v查看 然后 cmd命令行执行 : npm install -g vue/cli // 加-g是安装到全局 安装vue cli 安装vue cli 1 看一下 这是在cmd输入的内容,参考第一条链接 在vscode中怎么配置他? 文件-打开文件夹,选择一个文件夹 这里是firstvue 在下面新建文件夹…

可变长子网划分

目录 IP地址 子网划分 可变长子网划分 IP地址 在学习子网划分之前应该先清楚什么是IP地址和IP地址的类型 IP 地址的格式0网络地址主机地址10网络地址 主机地址 110网络地址主机地址1110组播地址11110保留 A 1.0.0.0~127.255.255.255 B 128.0.0.0~191.255.255…

C++编程进阶

目录 new运算符 new关键字的使用案例 C的引用 C中引用案例 引用的注意事项 引用做函数参数 引用做函数的返回值 前言&#xff1a; 具体案例 引用的本质 常量引用 常量引用原理 经典案例 函数的提高 函数的默认参数 注意&#xff1a; 具体案例 函数的占位参数…

已解决:树莓派外接硬盘 usb 或者sata 导致wifi无法链接 无线网卡无法使用问题

我的环境是树莓派4b 买了一个有硬盘的盒子 看上图的连接方式&#xff0c;是占用了树莓派的一个usb3.1进行了sata的转接&#xff0c;实现挂载硬盘。 但是我发现&#xff0c;安装系统开机之后&#xff0c;可以看到有硬盘接入&#xff0c;但是无法连wifi&#xff0c;如果拔掉硬盘…