SparkSQL-执行流程

news2024/9/25 19:14:17

一、示例代码

我们从一句常用的sql开始分析

import spark.implicits._
import spark.sql

sql("SELECT * FROM ods.personal_info limit 10").show()

其中包含两个函数调用,sql()和show(),我们依次来分析下

二、sql()

1、SparkSession

它是使用Dataset和DataFrame API对Spark编程的入口点

  //使用Spark执行SQL查询,将结果作为“DataFrame”返回。
  def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }

这里要分析来两处:

        1、sessionState.sqlParser.parsePlan(sqlText)

        2、Dataset.ofRows()

我们先看下第1处,它是通过ParserInterface调用其子类AbstractSqlParser来实现

2、AbstractSqlParser

  //为给定的SQL字符串创建逻辑计划
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    //获取将ParseTree转换为AST的构建器(访问者) 
    //这里用到了ANTLR的知识
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw QueryParsingErrors.sqlStatementUnsupportedError(sqlText, position)
    }
  }


  protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")

    //1、将sql转换成字符流
    //2、将字符流全部转换成大写,这大大简化了对流的词法分析,同时我们可以保持原始命令
    //3、词法分析是基于Hive的org.apache.hadoop.hive.ql.parse.ParseDriver.ANTLRNoCaseStringStream
    //4、Hive词法分析是基于ANTLR4
    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)
 
    //使用指定的令牌源和默认令牌通道 构造一个新的 CommonTokenStream
    val tokenStream = new CommonTokenStream(lexer)
    //解析
    val parser = new SqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    parser.removeErrorListeners()
    parser.addErrorListener(ParseErrorListener)
    //spark.sql.legacy.setopsPrecedence.enabled  默认false
    //当设置为true并且括号未指定求值顺序时,设置操作将在查询中从左向右执行。当设置为false且括号未指定求值顺序时,INTERSECT操作将在任何UNION、EXCEPT和MINUS操作之前执行。
    parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
    //spark.sql.legacy.exponentLiteralAsDecimal.enabled 默认值 false
    //当设置为true时,具有指数的文字(例如1E-30)将被解析为Decimal而不是Double。
    parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
    //spark.sql.ansi.enabled   默认 false
    //如果为true,Spark SQL将使用符合ANSI的方言,而不是符合Hive的方言。
    //
    parser.SQL_standard_keyword_behavior = conf.ansiEnabled

    try {
      try {
        // 首先,尝试使用可能更快的SLL模式进行解析
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
      }
      catch {
        case e: ParseCancellationException =>
          // 如果失败,则使用LL模式解析
          tokenStream.seek(0) // 倒带输入流
          parser.reset()

          // 重试
          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
          toResult(parser)
      }
    }
    catch {
      //......
    }
  }

接下来我们看看第2步:Dataset.ofRows()

3、Dataset.ofRows()

private[sql] object Dataset {

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    //根据逻辑计划获取一个QueryExecution 
    //使用Spark执行关系查询的主要工作流程。旨在让开发人员轻松访问查询执行的中间阶段。
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    //断言分析
    qe.assertAnalyzed()
    //最后构建一个row类型的Dataset也就是DataFrame返回
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }
}

我们先整体看看这个逻辑:

        1、数据方面:根据执行计划获取QueryExecution,并对该执行计划进行分析(只是分析这一步时懒惰的,需要真正触发执行时才校验)

        2、schema:根据数据构建RowEncoder将数据进行类型转换,适配程序

接下来我们看看qe.assertAnalyzed()都做了什么

4、QueryExecution

  def assertAnalyzed(): Unit = {
    // Analyzer在try块外调用,以避免在下面的catch块内再次调用它
    //它是一个懒执行的方法,只有触发action算子时才会执行
    analyzed
    try {
      //这里会对sql做分析校验
      sparkSession.sessionState.analyzer.checkAnalysis(analyzed)
    } catch {
      case e: AnalysisException =>
        //解析异常,这里会对sql进行解析并根据不同规则约束抛出不同的异常
    }
  }

  lazy val analyzed: LogicalPlan = {
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.execute(logical)
  }

接下来我们看看是如何对sql进行检查的

5、CheckAnalysis

当sql无法分析时,抛出面向用户的错误。

  def checkAnalysis(plan: LogicalPlan): Unit = {
    //我们对规则进行升级和排序,以捕捉第一个可能的失败,而不是级联解决失败的结果。
    //这里就不展开了,这里列举几个
    //1、跳过已分析的子计划
    //2、逻辑计划不应具有char/varchar类型的输出
    //3、Namespace 、Table、View、Hint等不存在
    //4、将 Table 的操作用在了 View 上 
    //5、表没有分区、不支持分区等等
    //......
    plan.foreachUp (......)
    //度量指标操作
    checkCollectedMetrics(plan)
    //覆盖以提供额外检查以进行正确分析。这些规则将在我们的内置检查规则之后进行评估。
    extendedCheckRules.foreach(_(plan))
    //如果有解析异常直接将错误抛给用户
    plan.foreachUp {
      case o if !o.resolved =>
        failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")
      case _ =>
    }
    //递归地将此计划树中的所有节点标记为已分析
    plan.setAnalyzed()

  }

在构建Dataset时还需要构建一个RowEncoder,下面我们就来看看它

6、RowEncoder

它用来处理Spark SQL类型与其允许的外部类型之间的映射,比如:

BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal or scala.math.BigDecimal or Decimal

DateType -> java.sql.Date if spark.sql.datetime.java8API.enabled is false
DateType -> java.time.LocalDate if spark.sql.datetime.java8API.enabled is true

TimestampType -> java.sql.Timestamp if spark.sql.datetime.java8API.enabled is false
TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is true

TimestampNTZType -> java.time.LocalDateTime

DayTimeIntervalType -> java.time.Duration
YearMonthIntervalType -> java.time.Period

BinaryType -> byte array
ArrayType -> scala.collection.Seq or Array
MapType -> scala.collection.Map
StructType -> org.apache.spark.sql.Row

三、show()

sql()方法会返回一个DataFrame(其实也是一个Dataset),因此show()也是Dataset身上的。

1、Dataset

  //以表格形式显示数据集的前20行。超过20个字符的字符串将被截断,所有单元格将向右对齐。
  def show(): Unit = show(20)

  //数据样例:
  //year  month AVG('Adj Close) MAX('Adj Close)
  //1980  12    0.503218        0.595103
  //1981  01    0.523289        0.570307
  //1982  02    0.436504        0.475256
  //1983  03    0.410516        0.442194
  //1984  04    0.450090        0.483521
  def show(numRows: Int): Unit = show(numRows, truncate = true)

  //truncate 含义:
  //是否截断长字符串。如果为true,则超过20个字符的字符串将被截断,所有单元格将正确对齐
  def show(numRows: Int, truncate: Boolean): Unit = if (truncate) {
    println(showString(numRows, truncate = 20))
  } else {
    println(showString(numRows, truncate = 0))
  }

  //将查询到的数据构造成控制台可展示的字符串
  private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {
    //可展示的最大行数
    val numRows = _numRows.max(0)
    //从这里就看出来,toDF()函数执行就已经将数据拿到了,因此才可以take出前21条来
    //我们后面重点看toDF()做了什么
    val takeResult = toDF().take(numRows + 1)
    val hasMoreData = takeResult.length > numRows
    val data = takeResult.take(numRows)

    lazy val timeZone =
      DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)

    // 对于数组值,将Seq和array替换为方括号。
    //对于超出“truncate”字符的单元格,将其替换为第一个“truncate-3”和“…”
    val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row =>
      row.toSeq.map { cell =>
        val str = cell match {
          case null => "null"
          case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
          case array: Array[_] => array.mkString("[", ", ", "]")
          case seq: Seq[_] => seq.mkString("[", ", ", "]")
          case d: Date =>
            DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
          case ts: Timestamp =>
            DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(ts), timeZone)
          case _ => cell.toString
        }
        if (truncate > 0 && str.length > truncate) {
          // 对于长度小于4个字符的字符串,不要显示省略号。
          if (truncate < 4) str.substring(0, truncate)
          else str.substring(0, truncate - 3) + "..."
        } else {
          str
        }
      }: Seq[String]
    }

    val sb = new StringBuilder
    val numCols = schema.fieldNames.length

    // 将每列的宽度初始化为最小值“3”
    val colWidths = Array.fill(numCols)(3)

    // 计算每列的宽度
    for (row <- rows) {
      for ((cell, i) <- row.zipWithIndex) {
        colWidths(i) = math.max(colWidths(i), cell.length)
      }
    }

    // 创建分隔线
    val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()

    //列名填充
    rows.head.zipWithIndex.map { case (cell, i) =>
      if (truncate > 0) {
        StringUtils.leftPad(cell, colWidths(i))
      } else {
        StringUtils.rightPad(cell, colWidths(i))
      }
    }.addString(sb, "|", "|", "|\n")

    sb.append(sep)

    // 数据填充
    rows.tail.map {
      _.zipWithIndex.map { case (cell, i) =>
        if (truncate > 0) {
          StringUtils.leftPad(cell.toString, colWidths(i))
        } else {
          StringUtils.rightPad(cell.toString, colWidths(i))
        }
      }.addString(sb, "|", "|", "|\n")
    }

    sb.append(sep)

    // 对于具有多个“numRows”记录的数据
    if (hasMoreData) {
      val rowsString = if (numRows == 1) "row" else "rows"
      sb.append(s"only showing top $numRows $rowsString\n")
    }

    sb.toString()
  }

2、构建一个新的Dataset对象

sql()返回的是一个新的Dataset吗,并不是,而是自带的伴生对象Dataset

而调用了toDF()后,真的会new一个Dataset出来

def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))

那这个时候我们就很有必要看看Dataset类中有什么属性和自动调起的方法了

class Dataset[T] private[sql](
    @transient val sparkSession: SparkSession,
    @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
    encoder: Encoder[T])
  extends Serializable {

  //sql()中的那个断言解析是在这里调用的
  queryExecution.assertAnalyzed()

  def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
    //根据逻辑计划创建一个queryExecution
    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
  }

  //这个应该是为了向前兼容
  def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
    this(sqlContext.sparkSession, logicalPlan, encoder)
  }

  @transient private[sql] val logicalPlan: LogicalPlan = {
    // 对于各种命令(如DDL)和具有副作用的查询,我们强制立即执行查询,让这些副作用迅速发生。
    queryExecution.analyzed match {
      case c: Command =>
        //根据需要插入shuffle操作和内部行格式转换,为执行准备一个计划的[[SparkPlan]]
        //SparkPlan 再调用 executeCollect()  运行此查询,将结果作为数组返回。
        //最后还是调用RDD的collect(),运行一个Job来执行sql
        LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
      case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
        LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
      case _ =>
        queryExecution.analyzed
    }
  }

  //目前[[ExpressionEncoder]]是[[Encoder]]的唯一实现,
  //在这里我们显式地将传入的编码器转换为[[ExpressionEncoder]],并隐式标记它,
  //以便我们在构建具有相同对象类型(可能会解析为不同模式)的新Dataset对象时使用它。
  private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)

  //编码器主要用作Dataset中serde表达式的容器。
  //我们通过这些serde表达式构建逻辑计划,并在查询框架内执行。
  //但是,出于性能原因,我们可能希望使用编码器作为函数,将内部行反序列化为自定义对象,
  //例如collect。在这里,我们解析并绑定编码器,以便稍后调用它的`fromRow`方法。
  private val boundEnc =
    exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)

  //sqlContext必须为val,因为导入隐式时需要一个稳定的标识符
  @transient lazy val sqlContext: SQLContext = sparkSession.sqlContext

  //将数据集的内容表示为“T”的“RDD”
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }


}

四、总结

源码已经大概型的过了一遍,下面我们把SparkSQL执行的整个过程来捋一下

sql()

构建逻辑计划:

    1、将sql字符串转换成大写的字符流
    2、用ANTLR4对其进行词法分析
    3、构建一个CommonTokenStream并进行解析
    4、对有歧义的情况做设置,比如1E-30应该被解析为Decimal还是Double
    5、首先,尝试使用可能更快的SLL模式进行解析,如果失败,则使用LL模式解析

构建DataFrame:

    1、根据逻辑计划创建QueryExecution
    2、断言分析sql预计的异常情况(如表、视图、库是否存在等)
    3、new一个RowEncoder,为DataFrame准备schema
    4、返回由预期数据(QueryExecution)和schema组成的DataFrame

show()

    1、用户可以设置展示多少行结果,默认是20行
    2、每列结果最多显示20个字符串,用户可以设置是截断还是...代替
    3、调用toDF() new一个新的Dataset 这里面会做两件事情(1、规则优化。2、转化为RDD进行任务提交)如果任务执行成功,最终会获取到结果数据
    4、3获取的是全量数据,需要根据用户设置的显示行数做截取
    5、设置每列的宽度(最小值为3个字符)、分割线、表头和数据
    6、控制台展示结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2164535.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从传统到智能:低代码平台在生产型企业中的应用实践

在全球数字化浪潮的推动下&#xff0c;生产型企业正面临前所未有的变革压力。为了在激烈的市场竞争中保持竞争力&#xff0c;企业迫切需要通过技术手段实现业务流程的优化和创新。然而&#xff0c;传统的软件开发方式往往耗时耗力&#xff0c;难以快速响应市场需求。低代码平台…

一些依赖库的交叉编译步骤

交叉编译链版本&#xff1a;12.3.0 一、curl-7.43.0库交叉编译 libcurl是一个跨平台的网络协议库&#xff0c;支持http, https, ftp, gopher, telnet, dict, file, 和ldap 协议。libcurl同样支持HTTPS证书授权&#xff0c;HTTP POST, HTTP PUT, FTP 上传, HTTP基本表单上传&a…

Django学习实战篇六(适合略有基础的新手小白学习)(从0开发项目)

前言&#xff1a; 上一章中&#xff0c;我们完成了页面样式的配置&#xff0c;让之前简陋的页面变得漂亮了些。 整理一下目前已经完成的系统&#xff0c;从界面上看&#xff0c;已经完成了以下页面: 首页分类列表页标签列表页口博文详情页 这离我们的需求还有些距离&#xff0…

哪款手机软件适合记事?记事本软件推荐

在这个信息爆炸的时代&#xff0c;手机已经成为我们生活中不可或缺的一部分。它不仅携带方便&#xff0c;而且功能强大&#xff0c;几乎可以完成我们日常所需的所有任务。随着生活节奏的加快&#xff0c;人们越来越需要一个可靠的工具来帮助自己记录重要信息和工作事项。这时候…

德勤校招网申笔试综合能力测试SHL题库与面试真题攻略

德勤的综合能力测试&#xff08;General Ability&#xff09;是其校园招聘在线测评的关键环节&#xff0c;旨在评估应聘者的多项认知能力。以下是对这部分内容的全面整合&#xff1a; 综合能力测试&#xff08;General Ability&#xff09; 测试时长为46分钟&#xff0c;包含…

ORA-12560:TNS:协议适配器错误

今天准备在数据库服务器创建一个用户&#xff0c;使用管理员账号进行登录 sqlplus / as sysdba时&#xff0c;突然报了个ORA-12560&#xff1a;TNS&#xff1a;协议适配器错误&#xff0c;吓的我一激灵&#xff0c;不应该啊&#xff0c;之前一直都是正常的&#xff0c;也是在网…

大漠yolo-数据集标注

参考 【按键精灵】大漠插件yolo环境配置_哔哩哔哩_bilibili 1. 2. 3.启动

MySQL高阶1873-计算特殊奖金

目录 题目 准备数据 分析数据 总结 题目 编写解决方案&#xff0c;计算每个雇员的奖金。如果一个雇员的 id 是 奇数 并且他的名字不是以 M 开头&#xff0c;那么他的奖金是他工资的 100% &#xff0c;否则奖金为 0 。 返回的结果按照 employee_id 排序。 准备数据 Crea…

记录踩坑 uniapp 引入百度地图(微信小程序,H5,APP)

前言 因为公司要求一定要用百度地图,网上引入百度地图的方法说的就三种(插件,异步,webview组件),因为我用的是VUE3 第一种方法引入插件(插件名vue-baidu-map)一直报错vue2没试过反正vue3引进去就是报错第二种方法用异步引入 如果只开发app和h5可以用,微信小程序反正不显示,但…

android studio 批量修改包名 app package name

1、批量修改包名&#xff1a;project view模式 我们可以看到&#xff0c;只可以修改myapplication的部分包名&#xff0c;前面的com.demo这个修改了&#xff0c;可以进行如下设置来达到修改demo的目的。 2、设置下&#xff0c;通过不同的目录来达到批量修改的目的&#xff1a;…

2024最新甄选7款超好用的文档加密软件 | 好用的企业文档加密软件大盘点!赶快码住!

在数字化时代&#xff0c;文档如同古代的锦书密函&#xff0c;承载着企业的智慧与机密。 正如古诗所云&#xff1a;"锦书难托云中雁&#xff0c;密语常藏月下窗。" 2024年&#xff0c;我们不仅要传承古人的智慧&#xff0c;更要借助现代科技的力量&#xff0c;守护…

张朝阳的物理课第三卷:量子力学的硬核探索与启发

&#x1f482; 个人网站:【 摸鱼游戏】【神级代码资源网站】【海拥导航】&#x1f91f; 找工作&#xff0c;来万码优才&#xff1a;&#x1f449; #小程序://万码优才/HDQZJEQiCJb9cFi&#x1f485; 想寻找共同学习交流&#xff0c;摸鱼划水的小伙伴&#xff0c;请点击【全栈技…

使用Prometheus进行系统监控,包括Mysql、Redis,并使用Grafana图形化表示

Prometheus是一个开源的的监控工具&#xff0c;而且还免费。这一次我们用Prometheus来对之前安装的所有服务&#xff0c;包括Mysql、Redis、系统状况等进行监控&#xff0c;并结合Grafana进行图形化展示 Prometheus下载和安装 下载地址&#xff08;以下所有插件的官方下载地址…

二叉搜索树(来学包会) C++经验+1

目录 什么是二叉搜索树 解二叉搜索树 二叉搜索树的操作 二叉搜索树的插入&#xff08;三步走&#xff09; 二叉搜索树的搜索 二叉搜索树的删除 1.删除的节点是叶子节点 2.删除的节点只有一边的子树 3.删除的节点左子树和右子树都有 详细完整代码 什么是二叉搜索树 二…

MT76X8、MT7621、MT7981和QCA9531的GPIO列表

一、 MT76X8 GPIO列表; 二、 MT7621 GPIO列表; 三、MTK7981 GPIO列表; 四、QCA9531 GPIO列表;

CentOS 7 aarch64制作openssh 9.9p1 rpm包 —— 筑梦之路

本篇文章还是基于开源项目openssh-rpms制作。 https://github.com/boypt/openssh-rpms.git 官方发行说明&#xff1a; OpenSSH: Release Notes 1. 修改version.env 2. 下载源码包 openssl网站改版&#xff0c;下载地址和之前不一样了 # 下载openssl1.1.1w源码包cd downlo…

nacos 快速入门

目录 什么是 Nacos Nacos 的主要特点&#xff1a; Dockerfiledocker-compose.yml 快速搭建 nacos 单机 什么是 Nacos Nacos/nɑ:kəʊs/ 是“动态命名和配置服务”的缩写&#xff0c;是一个用于构建云原生应用的易于使用的动态服务发现、配置和服务管理平台。 Nacos 致力于…

【JAVA开源】基于Vue和SpringBoot的图书馆管理系统

本文项目编号 T 044 &#xff0c;文末自助获取源码 \color{red}{T044&#xff0c;文末自助获取源码} T044&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析5.4 用例设计 六、核…

Linux·进程概念(上)

1.操作系统 任何计算机系统都包含一个基本的程序合集&#xff0c;称为操作系统(Operator System)。笼统的理解&#xff0c;操作系统包括&#xff1a; 内核(进程管理&#xff0c;内存管理&#xff0c;文件管理&#xff0c;驱动管理) 其他程序(函数库&#xff0c;shell程序) OS的…

知乎知+推广怎么做?投放费用是多少?

知乎以其独特的问答形式不仅吸引了大量高质量的用户群体&#xff0c;也成为了一个不可多得的品牌营销阵地。为了帮助企业更好地利用这一平台进行品牌推广&#xff0c;知乎推出了“知”推广服务&#xff0c;而作为专业的数字营销解决方案提供商&#xff0c;云衔科技更是全面支持…