Spark AQE 导致的 Driver OOM问题

news2024/11/24 2:17:41

背景

最近在做Spark 3.1 升级 Spark 3.5的过程中,遇到了一批SQL在运行的过程中 Driver OOM的情况,排查到是AQE开启导致的问题,再次分析记录一下,顺便了解一下Spark中指标的事件处理情况

结论

SQLAppStatusListener 类在内存中存放着 一个整个SQL查询链的所有stage以及stage的指标信息,在AQE中 一个job会被拆分成很多job,甚至几百上千的job,这个时候 stageMetrics的数据就会成百上倍的被存储在内存中,从而导致Driver OOM
解决方法:

  1. 关闭AQE spark.sql.adaptive.enabled false
  2. 合并对应的PR-SPARK-45439

分析

背景知识:对于一个完整链接的sql语句来说(比如说从 读取数据源,到 数据处理操作,再到插入hive表),这可以称其为一个最小的SQL执行单元,这最小的数据执行单元在Spark内部是可以跟踪的,也就是用executionId来进行跟踪的。
对于一个sql,举例来说 :

insert into  TableA select * from TableB;

在生成 物理计划的过程中会调用 QueryExecution.assertOptimized 方法,该方法会触发eagerlyExecuteCommands调用,最终会到SQLExecution.withNewExecutionId方法:

  def assertOptimized(): Unit = optimizedPlan

  ...
  lazy val commandExecuted: LogicalPlan = mode match {
    case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
    case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
    case CommandExecutionMode.SKIP => analyzed
  }
  ...
  lazy val optimizedPlan: LogicalPlan = {
  // We need to materialize the commandExecuted here because optimizedPlan is also tracked under
  // the optimizing phase
  assertCommandExecuted()
  executePhase(QueryPlanningTracker.OPTIMIZATION) {
    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
    // optimizing and planning.
    val plan =
      sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
    // We do not want optimized plans to be re-analyzed as literals that have been constant
    // folded and such can cause issues during analysis. While `clone` should maintain the
    // `analyzed` state of the LogicalPlan, we set the plan as analyzed here as well out of
    // paranoia.
    plan.setAnalyzed()
    plan
  }
  
  def assertCommandExecuted(): Unit = commandExecuted
  ...
  private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
    case c: Command =>
      // Since Command execution will eagerly take place here,
      // and in most cases be the bulk of time and effort,
      // with the rest of processing of the root plan being just outputting command results,
      // for eagerly executed commands we mark this place as beginning of execution.
      tracker.setReadyForExecution()
      val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
      val name = commandExecutionName(c)
      val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
        SQLExecution.withNewExecutionId(qe, Some(name)) {
          qe.executedPlan.executeCollect()
        }
      }  

SQLExecution.withNewExecutionId主要的作用是设置当前计划的所属的executionId:

    val executionId = SQLExecution.nextExecutionId
    sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)

EXECUTION_ID_KEY的值会在JobStart的时候传递给Event,以便记录跟踪整个执行过程中的指标信息。
同时我们在方法中eagerlyExecuteCommands看到qe.executedPlan.executeCollect()这是具体的执行方法,针对于insert into 操作来说,物理计划就是
InsertIntoHadoopFsRelationCommand,这里的run方法最终会流转到DAGScheduler.submitJob方法:

    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      JobArtifactSet.getActiveOrDefault(sc),
      Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleJobSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
        Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理,这里有会涉及到指标信息的存储,这里我们截图出dump的内存占用情况:
在这里插入图片描述

可以看到 SQLAppStatusListener 的 LiveStageMetrics 占用很大,也就是 accumIdsToMetricType占用很大

那在AQE中是怎么回事呢?
我们知道再AQE中,任务会从source节点按照shuffle进行分割,从而形成单独的job,从而生成对应的shuffle指标,具体的分割以及执行代码在AdaptiveSparkPlanExec.getFinalPhysicalPlan中,如下:

      var result = createQueryStages(currentPhysicalPlan)
      val events = new LinkedBlockingQueue[StageMaterializationEvent]()
      val errors = new mutable.ArrayBuffer[Throwable]()
      var stagesToReplace = Seq.empty[QueryStageExec]
      while (!result.allChildStagesMaterialized) {
        currentPhysicalPlan = result.newPlan
        if (result.newStages.nonEmpty) {
          stagesToReplace = result.newStages ++ stagesToReplace
          executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

          // SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting
          // for tasks to be scheduled and leading to broadcast timeout.
          // This partial fix only guarantees the start of materialization for BroadcastQueryStage
          // is prior to others, but because the submission of collect job for broadcasting is
          // running in another thread, the issue is not completely resolved.
          val reorderedNewStages = result.newStages
            .sortWith {
              case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => false
              case (_: BroadcastQueryStageExec, _) => true
              case _ => false
            }

          // Start materialization of all new stages and fail fast if any stages failed eagerly
          reorderedNewStages.foreach { stage =>
            try {
              stage.materialize().onComplete { res =>
                if (res.isSuccess) {
                  events.offer(StageSuccess(stage, res.get))
                } else {
                  events.offer(StageFailure(stage, res.failed.get))
                }
                // explicitly clean up the resources in this stage
                stage.cleanupResources()
              }(AdaptiveSparkPlanExec.executionContext)

这里就是得看stage.materialize()这个方法,这两个stage只有两类:BroadcastQueryStageExec 和 ShuffleQueryStageExec
这两个物理计划稍微分析一下如下:

  • BroadcastQueryStageExec
    数据流如下:
    broadcast.submitBroadcastJob
          ||
          \/
    promise.future
          ||
          \/
    relationFuture
          ||
          \/
    child.executeCollectIterator()
    
    
    其中 promise的设置在relationFuture方法中,而relationFuture 会被doPrepare调用,而submitBroadcastJob会调用executeQuery,从而调用doPrepare,executeCollectIterator()最终也会发送JobSubmitted事件,分析和上面的一样
  • ShuffleQueryStageExec
     shuffle.submitShuffleJob
          ||
          \/
     sparkContext.submitMapStage(shuffleDependency)
          ||
          \/
     dagScheduler.submitMapStage
    
    

submitMapStage会发送MapStageSubmitted事件:

    eventProcessLoop.post(MapStageSubmitted(
      jobId, dependency, callSite, waiter, JobArtifactSet.getActiveOrDefault(sc),
      Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleMapStageSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
        Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理:

  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
   ...
 
  override def onJobStart(event: SparkListenerJobStart): Unit = {
    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    if (executionIdString == null) {
      // This is not a job created by SQL
      return
    }

    val executionId = executionIdString.toLong
    val jobId = event.jobId
    val exec = Option(liveExecutions.get(executionId))

该方法会获取事件中的executionId,在AQE中,同一个执行单元的executionId是一样的,所以stageMetrics内存占用会越来越大。
而这里指标的更新是在AdaptiveSparkPlanExec.onUpdatePlan等方法中。

这样整个事件的数据流以及问题的产生原因就应该很清楚了。

其他

为啥AQE以后多个Job还是共享一个executionId呢?因为原则上来说,如果没有开启AQE之前,一个SQL执行单元的是属于同一个Job的,开启了AQE之后,因为AQE的原因,一个Job被拆成了了多个Job,但是从逻辑上来说,还是属于同一个SQL处理单元的所以还是得归属到一次执行中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1627097.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎么用PHP语言实现远程控制电器

怎么用PHP语言实现远程控制电器呢? 本文描述了使用PHP语言调用HTTP接口,实现控制电器,通过控制电器的电源线路来实现电器控制。 可选用产品:可根据实际场景需求,选择对应的规格 序号设备名称厂商1智能WiFi通断器AC3统…

Vue从0-1学会如何自定义封装v-指令

文章目录 介绍使用1. 理解指令2. 创建自定义指令3. 注册指令4. 使用自定义指令5. 自定义指令的钩子函数6. 传递参数和修饰符7. 总结 介绍 自定义封装 v-指令是 Vue.js 中非常强大的功能之一,它可以让我们扩展 Vue.js 的模板语法,为 HTML 元素添加自定义行…

Kubernetes学习-核心概念篇(一) 初识Kubernetes

🏷️个人主页:牵着猫散步的鼠鼠 🏷️系列专栏:Kubernetes渐进式学习-专栏 🏷️个人学习笔记,若有缺误,欢迎评论区指正 目录 1. 前言 2. 什么是Kubernetes 3. 为什么需要Kubernetes 3.1. 应…

JAVA实现easyExcel批量导入

注解类型描述ExcelProperty导入指定当前字段对应excel中的那一列。可以根据名字或者Index去匹配。当然也可以不写,默认第一个字段就是index0,以此类推。千万注意,要么全部不写,要么全部用index,要么全部用名字去匹配。…

网络安全实训Day15

写在前面 电子垃圾,堂堂恢复连载。本来不想分天数梳理了,但是最后要写实训报告,报告里还要有实训日记记录每日学的东西,干脆发这里留个档,到时候写报告提供一个思路。 网络空间安全实训-渗透测试 渗透测试概述 定义 一…

python flask 假死情况处理+https证书添加

前言 当使用flask编写了后台程序跑在服务器端的时候,有时候虽然后台中显示在运行,但是页面无法访问,出现这个情况可以使用如下方法修改代码,进而防止假死,另外记录下flask下证书的添加。 假死处理 出现进程存在&…

(MSFT.O)微软2024财年Q3营收619亿美元

在科技的浩渺宇宙中,一颗璀璨星辰再度闪耀其光芒——(MSFT.O)微软公司于2024财政年度第三季展现出惊人的财务表现,实现总营业收入达到令人咋舌的6190亿美元。这一辉煌成就不仅突显了微软作为全球技术领导者之一的地位,更引发了业界内外对这家…

华为OD机试 - 跳格子3 - 动态规划(Java 2024 C卷 200分)

华为OD机试 2024C卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试(JAVA)真题(A卷B卷C卷)》。 刷的越多,抽中的概率越大,每一题都有详细的答题思路、详细的代码注释、样例测试…

FORM调用标准AP\AR\GL\FA界面

EBS FORM客户化界面有时候数据需要追溯打开AP\AR\GL\FA等界面: 一种打开日记账的方式: PROCEDURE SHOW_JOURNAL ISparent_form_id FormModule;child_form_id FormModule; BEGINclose_jrn;parent_form_id : FIND_FORM(:SYSTEM.CURRENT_FORM);COPY(TO…

代码随想录算法训练营DAY38|C++动态规划Part.1|动态规划理论基础、509.斐波那契数、70.爬楼梯、746.使用最小花费爬楼梯

文章目录 动态规划理论基础什么是动态规划动态规划的解题步骤DP数组以及下标的含义递推公式DP数组初始化DP数组遍历顺序打印DP数组动态规划五部曲 动态规划应该如何debug 509.斐波那契数什么是斐波那契数列动态规划五部曲确定dp数组下标以及含义确定递推公式dp数组如何初始化确…

免费的单片机物联网MQTT平台选择

目的是多设备接入中控,平台只做转发。 选择巴法云:巴法科技&巴法云-巴法设备云-巴法物联网云平台 clientId是私钥uid: 多设备 clientId 填同一个 uid 都是可以的。平台应该是加了后缀区分。 支持自定义topic,操作简单&#x…

关于我在 uniapp 开发过程中遇到的问题(更新中...)

目录 uniapp 勾选自带的隐私政策 出现的问题 是否忽略版本兼容检查提示 勾选了uniapp的消息推送 打包后弹出 push module was not added when packaging, please refertohttps://ask.dcloud.net.cn /article/283 关于uniapp的真机调试 一直等待问题 或者 正在建立链接 在…

封装 H.264 视频为 FLV 格式然后推流

封装 H.264 视频为 FLV 格式并通过 RTMP 推流 flyfish 协议 RTMP (Real-Time Messaging Protocol) RTSP (Real Time Streaming Protocol) SRT (Secure Reliable Transport) WebRTC RTMP(Real Time Messaging Protocol)是一种用于实时音视频流传输的协…

以更多架构核心专利,推进 SDS 产业创新创造

今天是第 24 个世界知识产权日,今年世界知识产权日活动的主题是:“知识产权和可持续发展目标:立足创新创造,构建共同未来。” 这也正是 XSKY 在软件定义存储领域的目标之一。以“数据常青”为使命的 XSKY,始终立足于软…

Linux基础——Linux基本指令(下)

前言:Linux基本指令学到这里也快接近尾声了,如果对前面内容还有不清楚建议回顾这两篇文章 。 Linux基本指令(上) 和Linux基本指令(中) 接前两篇,接下来让我们再深入学习一下最后几个Linux指令,Linux基本指令将在本篇完结。 在此前&#xff…

将图片添加描述批量写入excel

原始图片 写入excel的效果 代码 # by zengxy chatgpt # from https://blog.csdn.net/imwatersimport os import xlsxwriter from PIL import Imageclass Image2Xlsx():def __init__(self,xls_path,head_list[编号, 图片, 名称, "描述",备注],set_default_y112,se…

StarRocks x Paimon 构建极速实时湖仓分析架构实践

Paimon 介绍 Apache Paimon 是新一代的湖格式,可以使用 Flink 和 Spark 构建实时 Lakehouse 架构,以进行流式处理和批处理操作。Paimon 创新性地使用 LSM(日志结构合并树)结构,将实时流式更新引入 Lakehouse 架构中。 …

Spark原理之Cache Table的工作原理及实现自动缓存重复表的思考

CACHE TABLE的能力 使用此语法,可以由用户自定义要缓存的结果集,实际上就是一个临时表,不过数据存储在Spark集群内部,由Application所分配的executors管理。 一旦定义了一个缓存表,就可以在SQL脚本中随处引用这个表名…

HTTP 网络协议的请求头信息,响应头信息,具体详解(2024-04-26)

1、通用头部 2、常见的 HTTP请求头信息 HTTP 响应头信息是服务器在响应客户端的HTTP请求时发送的一系列头字段,它们提供了关于响应的附加信息和服务器的指令。 3、常见的 HTTP 响应头信息 响应头向客户端提供一些额外信息,比如谁在发送响应、响应者的功…

数据分析:甲基化分析-从DNA methylation的IDAT文件到CpG site的Beta values

介绍 DNA Methylation和疾病的发生发展存在密切相关,它一般通过CH3替换碱基5‘碳的H原子,进而调控基因的转录。常用的DNA methylation是Illumina Infinium methylation arrays,该芯片有450K和850K(也即是EPIC)。 该脚…