SPARKSQL3.0-SessionState构建源码剖析

news2024/12/26 9:30:47

一、介绍

Apache Spark 2.0引入了SparkSession,其目的是为用户提供了一个统一的切入点来使用Spark的各项功能,不再需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。此外SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。

那么在sparkSql模块中,sql各个阶段的解析的核心类则是SessionState,在后续的文章中会多次使用到SessionState的变量,故本节将介绍SessionState是如何构建的

二、构建过程

常见构建sparkSession写法:

// TODO 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf)
  .getOrCreate()

// hive如下:
val spark = SparkSession.builder().config(sparkConf)
  .enableHiveSupport()
  .getOrCreate()

getOrCreate函数

def getOrCreate(): SparkSession = synchronized {
		  // SparkSession只能在Driver端创建和访问 
      assertOnDriver()
      // 首先检查是否存在有效的线程本地SparkSession,如果session不为空,且session对应的sparkContext未停止了,返回现有session
      var session = activeThreadSession.get()
      if ((session ne null) && !session.sparkContext.isStopped) {
        applyModifiableSettings(session)
        return session
      }

      // 线程同步执行
      SparkSession.synchronized {
        // 如果当前线程没有活动会话,请从全局会话获取它。
        session = defaultSession.get()
        if ((session ne null) && !session.sparkContext.isStopped) {
          applyModifiableSettings(session)
          return session
        }

        // 没有活动或全局默认会话。创建一个新的sparkContext
        val sparkContext = userSuppliedContext.getOrElse {
          val sparkConf = new SparkConf()
          options.foreach { case (k, v) => sparkConf.set(k, v) }

          // set a random app name if not given.
          if (!sparkConf.contains("spark.app.name")) {
            sparkConf.setAppName(java.util.UUID.randomUUID().toString)
          }

          SparkContext.getOrCreate(sparkConf)
          // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
        }
				
        // 这里是扩展用户指定的自定义扩展,后面会有一节单独介绍
        applyExtensions(
          sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
          extensions)
				
        // 重点此处构建SparkSession;extensions入参为自定义扩展类,后面会有一节单独介绍。
        session = new SparkSession(sparkContext, None, None, extensions)
        options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
        setDefaultSession(session)
        setActiveSession(session)
        registerContextListener(sparkContext)
      }

      return session
    }

在SparkSession类中有一个核心属性:SessionState,该属性存储着sparksql各个阶段的执行过程,十分重要:

image-20221101211708569

这里看一下SessionState的属性,单单从属性就能看出其重要性,属性中包含了各个阶段的执行类;sql处理的各个阶段几乎都在使用这些变量

image-20221101211940561

**回到SparkSession构建SessionState过程,这里贴一下源码: **

可以看到先是通过config的CATALOG_IMPLEMENTATION属性分辨构建出两种SessionState:

1、HiveSessionStateBuilder

2、SessionStateBuilder

但最终返回的是统一父类:BaseSessionStateBuilder,而且是通过BaseSessionStateBuilder.build创建SessionState。

lazy val sessionState: SessionState = {
    parentSessionState
      .map(_.clone(this))
      .getOrElse {
        val state = SparkSession.instantiateSessionState( // 调用instantiateSessionState函数构建SessionState
          SparkSession.sessionStateClassName(sparkContext.conf), // 调用sessionStateClassName函数确定构建hive还是普通SessionState
          self)
        initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
        state
      }
  }

// HiveSessionStateBuilder全类名,用于后面反射
private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME = 
	"org.apache.spark.sql.hive.HiveSessionStateBuilder"

// 当调用enableHiveSupport函数时会将CATALOG_IMPLEMENTATION = hive
def enableHiveSupport(): Builder = synchronized {
  if (hiveClassesArePresent) {
    config(CATALOG_IMPLEMENTATION.key, "hive")
  } else {
    throw new IllegalArgumentException(
      "Unable to instantiate SparkSession with Hive support because " +
        "Hive classes are not found.")
  }
}

// 通过config的CATALOG_IMPLEMENTATION属性分辨构建出两种SessionState
private def sessionStateClassName(conf: SparkConf): String = {
    conf.get(CATALOG_IMPLEMENTATION) match {
      case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
      case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
    }
  }


// 注意:这里将hive | in-memory的 className构建成了统一的父类BaseSessionStateBuilder,并且调用了.build()函数
private def instantiateSessionState(
      className: String,
      sparkSession: SparkSession): SessionState = {
    try {
      // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
      val clazz = Utils.classForName(className)
      val ctor = clazz.getConstructors.head
      ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build() // 这里将sparkSession传参进去
    } catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
    }
  }

image-20221101154911667

接下来看一下BaseSessionStateBuilder的build函数,内容较多这里贴一下核心的代码:

// 构建SessionState
def build(): SessionState = {
    new SessionState(
      session.sharedState,
      conf,
      experimentalMethods,
      functionRegistry,
      udfRegistration,
      () => catalog, // catalog元数据,后面会在catalog一节单独介绍
      sqlParser,		// sql解析核心类
      () => analyzer,	// analyzer阶段核心类
      () => optimizer, // optimizer阶段核心类
      planner,			   // 物理计划和锡类
      () => streamingQueryManager,
      listenerManager,
      () => resourceLoader,
      createQueryExecution,
      createClone,
      columnarRules)
  }

// 创建sql解析器
protected lazy val sqlParser: ParserInterface = {
    extensions.buildParser(session, new SparkSqlParser(conf))
}

// 创建Analyzer解析器
protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) {
    ......
}

// 创建optimizer优化器类
protected def optimizer: Optimizer = {
    new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
      ......
    }
  }

// 创建planner物理计划类
protected def planner: SparkPlanner = {
    new SparkPlanner(session, conf, experimentalMethods) {
      ......
    }
  }

至此sql各个阶段的核心类创建准备完成,每种核心类的使用会在后面各个阶段的文种中详细展开,这里不赘述。

本节主要介绍了SessionState的构建过程和其核心函数的创建,这在后面各个阶段中会多次提及和使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/23337.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

字节跳动提出KVM内核热升级方案,效率提升5.25倍!

背景 作为云计算最重要的底层基础之一,KVM 虚拟化软件在现代的数据中心中应用非常广泛。基于 KVM 的 hypervisor 包括了构成宿主机的软硬件,共同为虚拟机中的应用程序提供高性能的 CPU、内存和 IO 设备等资源。在大规模部署的生产环境中,作为…

周年更名,元宇宙产业委再上新台阶

今天,2022年11月10日,全球元宇宙大会在鹏城隆重举行,这个日子也是中国移动通信联合会元宇宙产业工作委员会成立一周年的日子。会上,我们宣布了这个更名消息,这也是元宇宙产业工作委员会迈上一个新台阶的标志。有20多家…

ES学习笔记

01:REST 指的是客户端和服务器之间的交互在请求之间是无状态的,从客户端到服务器的每个请求都必须包含理解请求所必须的信息,同时在请求之间的任意间隔时间点,若服务器重启,那么客户端是得不到相应的通知的.所以无状态的请求可以由任何可用的服务器回答. 在REST样式的Web服务中…

LeetCode 42.接雨水

这篇记录一下刷接雨水这道题的过程,日后回顾 目录 法1: 法2: 法3: 法4: 法5: 务必掌握123 写这道题要知道雨水怎么算。核心就是要算当前列雨水的高度就要取决于这列左右两侧比自己搞得柱子中较矮的那…

【26-业务开发-基础业务-品牌管理-图片管理-上传图片功能实现-基于阿里云OSS服务-解决跨域问题-设置跨域规则-修改ACL权限为公共读】

一.知识回顾 【0.三高商城系统的专题专栏都帮你整理好了,请点击这里!】 【1-系统架构演进过程】 【2-微服务系统架构需求】 【3-高性能、高并发、高可用的三高商城系统项目介绍】 【4-Linux云服务器上安装Docker】 【5-Docker安装部署MySQL和Redis服务】…

springmvc源码之Web上下文初始化

系列文章目录 springmvc源码之Web上下文初始化 文章目录系列文章目录Web上下文初始化ContextLoaderListenerinitWebApplicationContext初始化创建WebApplicationContext上下文Web上下文初始化 web上下文与SerlvetContext的生命周期应该是相同的,springmvc中的web上…

公众号免费查题

公众号免费查题 本平台优点: 多题库查题、独立后台、响应速度快、全网平台可查、功能最全! 1.想要给自己的公众号获得查题接口,只需要两步! 2.题库: 题库:题库后台(点击跳转) 题…

Flink概念及应用场景

1、Flink实时应用场景 Flink在实时计算领域内的主要应用场景主要分为四类: 实时数据同步流式ETL实时数据分析复杂事件处理2、实时数据体系架构 实时数据体现大致分为三类场景: 流量类业务类特征类在数据模型上,流量类是扁平化的宽表&…

论文管理系统(用户列表显示功能)

上次我们已经实现了论文管理系登录功能,这次我们要实现登录之后的跳转到首页,并且让首页列表显示出数据库的信息并在Mapper中写入模糊查询功能语句,这次我们不使用MybatisPlus写这个功能,这次使用Mybatis来写,区别就是Plus是继承于<BaseDAO>Mapper,而Mybatis则是我们通过…

JAVASE(复习)——static

static 关键字是静态的意思,是Java中的一个修饰符,可以修饰成员方法,成员变量 一、被static修饰的特点 被类的所有对象共享&#xff08;在堆内存共享&#xff09; 随着类的加载而加载&#xff0c;优先于对象存在&#xff08;就是你加载类的时候就加载了&#xff09; 可以通过…

@Transactional注解为何会失效

使用 Transactional 注解能保证方法内多个数据库操作要么同时成功、要么同时失败。但是有很多细节需要注意&#xff0c;不然Transactional可能会失效。 1.注解应用在非 public 的方法上 如果Transactional注解应用在非public 修饰的方法上&#xff0c;Transactional将会失效。…

FLink源码 1.13 3 种 命令客户端 Generic CLI 、 yarn-cluster、DefaultCLI使用

先说结论:对于三种Cli,Generic CLI mode、yarn-cluster mode、default mode,无法同时使用,源码使用顺序为Generic CLI mode优先判断,接着是 yarn-cluster mode ,最后是default mode,所以对于三种Cli的参数,不能混用,否则会出现命令不生效的情况,具体使用以及源码见下…

【滤波跟踪】基于matlab扩展卡尔曼滤波的无人机路径跟踪【含Matlab源码 2236期】

⛄一、EKF算法简介 扩展卡尔曼滤波是利用泰勒级数展开方法将非线性滤波问题转化成近似的线性滤波问题,利用线性滤波的理论求解非线性滤波问题的次优滤波算法。其系统的状态方程和量测方程分别如式(1)、式(2)所示: 式中,X(k)为n维的随机状态向量序列,Z(k)为n维的随机量测向量序…

Matlab 对连续时间信号的运算

Matlab 对连续时间信号的运算 1、连续时间系统零状态响应 题目&函数说明 连续时间系统的微分方程为 y"(t) 4y’(t) 3y(t) f’(t) 2f(t) 当 输入信号 f(t) 20e-2tu(t) 时&#xff0c;初始值 y(0-) 2, y’(0-) 1, 求系统的零状态响应 Matlab 库函数中的 lsim(…

GNN Tensorflow packages

tf framework定义 tf.name_scope()函数 tf.name_scope(name)&#xff0c;用于定义python op的上下文管理器。 此上下文管理器将推送名称范围&#xff0c;这将使其中添加的所有操作的名称带有前缀。 例如&#xff0c;定义一个新的Python op my_op&#xff1a; def my_op(a,…

scratch踢足球 电子学会图形化编程scratch等级考试一级真题和答案解析2022年9月

目录 scratch踢足球 一、题目要求 1、准备工作 2、功能实现 二、案例分析

代码随想录算法训练营第三十九天| LeetCode62. 不同路径、LeetCode63. 不同路径 II

一、LeetCode62. 不同路径 1&#xff1a;题目描述&#xff08;62. 不同路径&#xff09; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下…

python 视角下的 6 大程序设计原则

众所周知&#xff0c;python 是面向对象的语言。 但大多数人学习 python 只是为了写出“能够实现某些任务的自动化脚本”&#xff0c;因此&#xff0c;python 更令人熟知的是它脚本语言的身份。 那么&#xff0c;更近一步&#xff0c;如果使用 python 实现并维护一个大的项目…

基于SpringBoot的CSGO赛事管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SpringBoot 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目…

为研发效能而生|一场与 Serverless 的博弈

2022 年 11 月 3 日&#xff0c;第三届云原生编程挑战赛即将迎来终极答辩&#xff0c;18 支战队、32 位云原生开发者入围决赛&#xff0c;精彩即将开启。 云原生编程挑战赛项目组特别策划了《登顶之路》系列选手访谈&#xff0c;期待通过参赛选手的故事&#xff0c;看到更加生…