【大数据面试知识点】Spark的DAGScheduler

news2025/1/16 14:57:20

Spark数据本地化是在哪个阶段计算首选位置的?

先看一下DAGScheduler的注释,可以看到DAGScheduler除了Stage和Task的划分外,还做了缓存的跟踪和首选运行位置的计算。

DAGScheduler注释: 

The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.
Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage.
When looking through this code, there are several key concepts:

  • Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.
  • Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
  • Tasks are individual units of work, each sent to one machine.
  • Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
  • Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
  • Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.

To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.
Here's a checklist to use when making or reviewing changes to this class:

  • All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.
  • When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.

DAGScheduler的运行时机

DAGScheduler运行时机:Driver端初始化SparkContext时。DAGScheduler是在整个Spark Application的入口即 SparkContext中声明并实例化的。在实例化DAGScheduler之前,巳经实例化了SchedulerBackend和底层调度器 TaskScheduler。

如果是SQL任务的话,SparkSQL通过Catalyst(Spark SQL的核心是Catalyst优化器)将SQL先翻译成逻辑计划再翻译成物理计划,再转换成RDD的操作。之后运行时再通过DAGScheduler做RDD任务的划分和调度。

DAGScheduler如何划分Stage的?

用户提交的计算任务是一个由RDD依赖构成的DAG,Spark会把RDD的依赖以shuffle依赖为边界划分成多个Stage,这些Stage之间也相互依赖,形成了Stage的DAG。然后,DAGScheduler会按依赖关系顺序执行这些Stage。

要是把RDD依赖构成的DAG看成是逻辑执行计划(logic plan),那么,可以把Stage看成物理执行计划,为了更好的理解这个概念,我们来看一个例子。

下面的代码用来对README.md文件中包含整数值的单词进行计数,并打印RDD之间的依赖关系(Lineage):

scala> val counts = sc.textFile("README.md")
               .flatMap(x=>x.split("\\W+"))
               .filter(_.matches(".*\\d.*"))
               .map(x=>(x,1))
               .reduceByKey(_+_)
 // 调用一个action函数,用来触发任务的提交和执行
 scala> counts.collect()
 ​
 // 打印RDD的依赖关系(Lineage)
 scala> counts.toDebugString
 res7: String =
 (2) ShuffledRDD[17] at reduceByKey at <console>:24 []
  +-(2) MapPartitionsRDD[16] at map at <console>:24 []
     |  MapPartitionsRDD[15] at filter at <console>:24 []
     |  MapPartitionsRDD[14] at flatMap at <console>:24 []
     |  README.md MapPartitionsRDD[13] at textFile at <console>:24 []
     |  README.md HadoopRDD[12] at textFile at <console>:24 []

DAGScheduler会根据Shuffle划分前后两个Stage:即StageShuffleMapStage和ResultStage

ShuffleMapStage

先看下ShuffleMapStage的注释,核心就是再讲ShuffleMapStage是做ShuffleWrite的Stage,Stage中是算子的pipline。

ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter). When executed, they save map output files that can later be fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of, and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
 
ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage. 

ShuffleMapStages是在DAG执行过程中产生的Stage,用来为Shuffle产生数据。ShuffleMapStages发生在每个Shuffle操作之前,在Shuffle之前可能有多个窄转换操作,比如:map,filter,这些操作可以形成流水线(pipeline)。当执行ShuffleMapStages时,会产生Map的输出文件,这些文件会被随后的Reduce任务使用。

ShuffleMapStages也可以作为Jobs,通过DAGScheduler.submitMapStage函数单独进行提交。对于这样的Stages,会在变量mapStageJobs中跟踪提交它们的ActiveJobs。 要注意的是,可能有多个ActiveJob尝试计算相同的ShuffleMapStages。

它为一个shuffle过程产生map操作的输出文件。它也可能是自适应查询规划/自适应调度工作的最后阶段。

ResultStage

再看ResultStage的注释

ResultStages apply a function on some partitions of an RDD to compute the result of an action.
The ResultStage object captures the function to execute, `func`, which will be applied to each partition, and the set of partition IDs, `partitions`. Some stages may not run on all partitions of the RDD, for actions like first() and lookup().

ResultStage是Job的最后一个Stage,该Stage是基于执行action函数的rdd来创建的。该Stage用来计算一个action操作的结果。该类的声明如下:

 private[spark] class ResultStage(
     id: Int,
     rdd: RDD[_],
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
     parents: List[Stage],   //依赖的父Stage
     firstJobId: Int,
     callSite: CallSite)
   extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

为了计算action操作的结果,ResultStage会在目标RDD的一个或多个分区上使用函数:func。需要计算的分区id集合保存在成员变量:partitions中。但对于有些action操作,比如:first(),take()等,函数:func可能不会在所有分区上使用。

另外,在提交Job时,会先创建ResultStage。但在提交Stage时,会先递归找到该Stage依赖的父级Stage,并先提交父级Stage。如下图所示:

举个例子:

思考题 

如下rdd运算,为什么最终只划分了3个Stage

scala> val rdd1 = sc.textFile("/root/tmp/a.txt",3).flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)
val rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:1

scala> val rdd2 = sc.textFile("/root/tmp/a.txt",3).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
val rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:1

scala> val rdd3 = rdd1.join(rdd2)
val rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:1

scala> val rdd4 = rdd3.groupByKey()
val rdd4: org.apache.spark.rdd.RDD[(String, Iterable[(Int, Int)])] = MapPartitionsRDD[13] at groupByKey at <console>:1

scala> rdd4.collect().foreach(println)
(c,Seq((2,2)))                                                                  
(d,Seq((1,1)))
(a,Seq((2,2)))
(b,Seq((1,1)))

scala> rdd4.toDebugString
val res8: String =
(3) MapPartitionsRDD[13] at groupByKey at <console>:1 []
 |  MapPartitionsRDD[12] at join at <console>:1 []
 |  MapPartitionsRDD[11] at join at <console>:1 []
 |  CoGroupedRDD[10] at join at <console>:1 []
 |  ShuffledRDD[4] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[3] at map at <console>:1 []
    |  MapPartitionsRDD[2] at flatMap at <console>:1 []
    |  /root/tmp/a.txt MapPartitionsRDD[1] at textFile at <console>:1 []
    |  /root/tmp/a.txt HadoopRDD[0] at textFile at <console>:1 []
 |  ShuffledRDD[9] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[8] at map at <console>:1 []
    |  MapPartitionsRDD[7] at flatMap at <console>:1 []
    |  /root/tmp/a.txt MapPartitionsRDD[6] at textFile at <console>:1 []
    |  /root/t...

参考:DAGScheduler-Stage的划分与提交 - 知乎Spark SQL 源码分析之Physical Plan 到 RDD的具体实现_physicalplan到rdd的具体实现-CSDN博客

一文搞定Spark的DAG调度器(DAGScheduler)_spark dagscheduler-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1348753.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【随口一说】最近的CSDN

这段时间随便发的一篇博文很快就有“点赞”、“收藏”、“关注”的信息&#xff0c; 而且简单看了一眼用户&#xff0c;很多都是空的或者一堆转载&#xff0c; 机器人也太明显了点&#xff0c;很让人不舒服&#xff0c; 不花点心思设计文章评优推送算法反倒用机器人刷热门&…

【C++】类与对象

文章目录 1. 面向过程和面向对象的初步认识2. 类的引入3. 类的定义4. 类的访问限定符及封装4.1 访问限定符4.2 封装 5. 类的作用域6. 类的实例化7. 类对象模型7.1 如何计算类对象的大小7.2 类对象的存储方式猜测7.3 结构体内存对齐规则 8. this指针8.1 this指针的引出8.2 this指…

3294 李白的酒

#include<bits/stdc.h> using namespace std; int main(){int n;double ans;scanf("%d",&n);for(int i1;i<n;i)ans1,ans/2;printf("%.5f",ans); }

一元函数微分学——刷题(10

目录 1.题目&#xff1a;2.解题思路和步骤&#xff1a;3.总结&#xff1a;小结&#xff1a; 1.题目&#xff1a; 2.解题思路和步骤&#xff1a; 首先题目中给了一个要点&#xff0c;就是周期为5&#xff0c;显然要求的那个点和题目没任何关系&#xff0c;所以利用周期为5&…

[DAU-FI Net开源 | Dual Attention UNet+特征融合+Sobel和Canny等算子解决语义分割痛点]

文章目录 概要I Introduction小结 概要 提出的架构&#xff0c;双注意力U-Net与特征融合&#xff08;DAU-FI Net&#xff09;&#xff0c;解决了语义分割中的挑战&#xff0c;特别是在多类不平衡数据集上&#xff0c;这些数据集具有有限的样本。DAU-FI Net 整合了多尺度空间-通…

数据的分片和路由

之前我们讲解了数据的复制&#xff0c;也就是说让多台机器保留相同的副本&#xff0c;适合用于读多写少的问题&#xff0c;我们这里讲的是数据的分片&#xff0c;分片的目的是将数据进行切分让他们分布到不同的机器上&#xff0c;其中一个动机是数据可能很大如果单纯存放到一个…

macos系统中,获取系统的“文件夹”图标,比如“下载文件夹”有不同的文件夹图标,怎么获得这个文件夹图标呢

在macOS系统中&#xff0c;如果您想要获取“下载”文件夹的图标以用作其他用途&#xff08;例如制作快捷方式、自定义应用图标等&#xff09;&#xff0c;可以按照以下步骤操作&#xff1a; 打开Finder。使用Spotlight搜索或者直接导航到 /System/Library/CoreServices/CoreTy…

记矩阵基础概念

转自up&#xff1a;Naruto_Qcsdn&#xff1a;三维空间几何变换矩阵 先贴个站里分享的基础概念。 learn form 肥猫同学VFX b站&#xff1a;会用transform就会用矩阵 移动 旋转 缩放 1.transofrm ——输出变化矩阵 可以移动transform查看变化去理解 位移 缩放 旋转 由此—…

B+树的插入删除

操作 插入 case2的原理,非叶子节点永远和最右边的最左边的节点的值相等。 case3:的基本原理 非叶子节点都是索引节点 底层的数据分裂之后 相当于向上方插入一个新的索引(你可以认为非叶子节点都是索引),反正第二层插入160 都要分裂,然后也需要再插入(因为索引部分不需要重…

redisson作为分布式锁的底层实现

1. redisson如何实现尝试获取锁的逻辑 如何实现在一段的时间内不断的尝试获取锁 其实就是搞了个while循环&#xff0c;不断的去尝试获取锁资源。但是因为latch的存在会在给定的时间内处于休眠状态。这个事件&#xff0c;监听的是解锁动作&#xff0c;如果解锁动作发生。会调用…

YOLOv8改进 | 检测头篇 | ASFF改进YOLOv8检测头(全网首发)

一、本文介绍 本文给大家带来的改进机制是利用ASFF改进YOLOv8的检测头形成新的检测头Detect_ASFF&#xff0c;其主要创新是引入了一种自适应的空间特征融合方式&#xff0c;有效地过滤掉冲突信息&#xff0c;从而增强了尺度不变性。经过我的实验验证&#xff0c;修改后的检测头…

12/31

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 摘要Abstract文献阅读&#xff1a;用于密集预测的多路径视觉Transformer1、研究背景2、方法提出3、相关方法3.1、Vision Transformers for dense predictions3.2、C…

「网络编程」其他重要的协议或技术_ DNS协议 | ICMP协议 | NAT技术

「前言」文章内容是DNS协议、ICMP协议、NAT技术的讲解。 「归属专栏」网络编程 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 一、DNS协议1.1 背景1.2 域名简介1.3 域名解析的过程 二、ICMP协议2.1 ICMP简介2.2 ping命令2.3 traceroute命令 三、NAT技术3.1 NAT技术背景3.2 …

小型企业网设计-课设实验-爆款实验

可以按照我的配置依次配置&#xff0c;成品打包文件&#xff0c;请&#xff1a;Ensp888 <Huawei>sys Enter system view, return user view with CtrlZ. [Huawei]un in en Info: Information center is disabled. [Huawei]# [Huawei]sysname SW5 [SW5]# [SW5]vlan batch…

(C++) 拷贝构造函数

目录 一、基本介绍 二、为什么需要拷贝构造函数 三、拷贝构造函数 四、传参时的问题 五、完整代码 一、基本介绍 拷贝构造函数是C中一个特殊的构造函数&#xff0c;用于创建一个类的对象作为另一个同类对象的副本。当一个对象以值的形式被传递给函数、从函数返回&#xff0…

深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第一节 理解堆与栈

深入浅出图解C#堆与栈 C# HeapingVS Stacking第一节 理解堆与栈 [深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第一节 理解堆与栈](https://mp.csdn.net/mdeditor/101021023)[深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第二节 栈基本工作原理](https://mp.csdn.n…

Avalonia学习(十六)-Mapsui

今天开始继续Avalonia练习。 本节&#xff1a;Mapsui 1.引入 Mapsui.Avalonia 2.项目引入 前台代码 <Window xmlns"https://github.com/avaloniaui"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:vm"using:MapsuiAvalonia.Vi…

C语言之分支与循环【附6个练习】

文章目录 前言一、什么是语句&#xff1f;1.1 表达式语句1.2 函数调用语句1.3 控制语句1.4 复合语句1.5 空语句 二、分支语句&#xff08;选择结构&#xff09;2.1 if语句2.1.1 悬空else2.1.2 练习&#xff08;1. 判断一个数是否为奇数 2. 输出1-100之间的奇数&#xff09; 2.2…

postman win7 低版本 postman7.0.9win64 postman7.0.9win32

百度网盘&#xff1a; postman7.0.9win64&#xff1a; 链接: https://pan.baidu.com/s/18ck9tI0r9Pqoz36MOwwnnQ 提取码: rkf7 postman7.0.9win32&#xff1a; 链接: https://pan.baidu.com/s/1HrpGPrgvVzyAcjdHuwVOpA 提取码: ke5k win7系统安装postman&#xff0c;可能会…

侯捷C++ 2.0 新特性

关键字 nullptr and std::nullptr_t auto 一致性初始化&#xff1a;Uniform Initialization 11之前&#xff0c;初始化方法包括&#xff1a;小括号、大括号、赋值号&#xff0c;这让人困惑。基于这个原因&#xff0c;给他来个统一&#xff0c;即&#xff0c;任何初始化都能够…