【Spark系列1】DAG中Stage和Task的划分全流程

news2025/1/10 7:48:06

 本文字数在7800字左右,预计时间在15分钟

一、整体流程

每个Aciton操作会创建一个JOB,JOB会提交给DAGScheduler,DAGScheduler根据RDD依赖的关系划分为多个Stage,每个Stage又会创建多个TaskSet,每个TaskSet包含多个Task,这个Task就是每个分区的并行计算的任务。DAGScheduler将TaskSet按照顺序提交给TaskScheduler,TaskScheduler将每一个任务去找SchedulerBackend申请执行所需要的资源,获取到资源后,SchedulerBackend将这些Task提交给Executor,Executor负责将这些任务运行起来。

二、JOB提交

2.1、为什么需要action操作

在Spark中,分为transformation操作和action操作。执行用户程序时,transformation操作将一个RDD转换成了新的RDD,并在compute()函数中,记录了如何根据父RDD计算出当前RDD的数据、RDD如何分区等信息,并且能够得出最后一个RDD的数据。 但是RDD中的每个分区中依然是一条一条的分散的数据,那么要对最后一个RDD执行什么操作呢?这就是action操作的作用。

2.2、Job提交

每个action操作都会生成一个Job,这个Job包含了需要计算的RDD对象、需要计算的分区、需要执行什么样的计算。RDD和用户执行的计算都是可以序列化的,RDD序列化之后,在Executor中反序列化之后即可得到该RDD对象,再根据对象compute()函数就可以计算出某个分区的数据。JOB中包含的数据如下所示

2.3、分布式执行

当提交Job以后,就可以将Job划分为多个并行的任务,每个任务计算指定分区的一个分区即可。通过RDD的计算函数即可计算出该分区的数据,今儿计算出分区的结果。

三、Stage划分

3.1、宽依赖和窄依赖

如果一个RDD的每个分区最多只能被一个Child RDD的一个分区所使用, 则称之为窄依赖(Narrow dependency), 如果被多个Child RDD分区依赖, 则称之为宽依赖(wide dependency)

3.2、Stage划分

在用户编写的一系列转换中,多个RDD可能既形成了多次窄依赖,也形成了多次宽依赖,连续的窄依赖可以通过一个任务进行流水线处理,但是如果遇到了宽依赖,就必须先将父RDD的所有数据都进行计算并保存起来,再进行RDD的运算。在一个Job中,action操作知识定义了在最后的RDD中执行何种操作,而最后的RDD会依赖上个RDD,上个RDD又会有其他依赖,这样就形成了一系列的依赖关系。如果为宽依赖的话,就在依赖的地方进行切分,先将宽依赖的父RDD进行计算出来,再计算后续的RDD,按照快依赖被划分的过程,即为Stage划分的过程。

如上图所示,rdd1->rdd2,rdd3->rdd4是窄依赖,rdd2->rdd3,rdd4->rdd5是宽依赖。在发生shuffle的位置,Spark将计算分为两个阶段分别执行,每发生一次shuffle,Spark就将计算划分为先后的两个阶段,如下图

在划分阶段的过程中,对于某个阶段而言其并行的计算任务都完全相同,因此在Job执行的过程中,并行计算就是指每个阶段中任务并行的计算。如在Stage1中,每个分区的数据可以使用一个任务进行计算。10000个分区即可在集群中并行运行10000个任务进行计算。如果集群资源不够,可以将10000个任务依次在集群中运行,直到运行完毕,再进行Stage2的计算。Stage2也会根据分区数启动多个任务并行的加载Stage1生成的数据,完成Stage2的计算。

在一个Job的运行过程中,所有的Stage其实都是为最后一个Stage做准备,因为action操作只需要最后一个RDD的数据。因此最后一个Stage称为ResultStage,之前所有的Stage都是由Shuffle引起的中间计算过程,被称为ShuffleMapStage。其过程如下图

3.3、Spark实现

再Spark实现中,SparkContext将Job提交至DAGScheduler,DAGScheduler获取Job中执行action操作的RDD,将最后执行action操作的RDD划分到最后的ResultStage中,然后遍历该RDD的依赖和所有的父依赖,每遇到宽依赖就将两个RDD划分到两个不同的Stage中,遇到窄依赖就将窄依赖的多个RDD划分到一个Stage中,经过这次操作,一个RDD就划分为有多个依赖关系的Stage。再每个Stage中,所有的RDD之间都是窄依赖的关系,Stage之间的RDD都是宽依赖的关系。DAGScheduler将最初被依赖的Stage提交,计算该Stage中的数据,计算完成后,再将后续的Stage提交,知道最后运行的ResultStage,则整个计算Job完成。ResultStage和ShuffleMapStage结构如下图

在生成ShuffleapStage时,ShuffleDependency起到了承上启下的作用,如果两个RDD之间为宽依赖,子RDD的依赖为ShuffleDependency;在划分Stage的时候,父Stage会保存该ShuffleDependency,以便在执行父Stage的时候,根据ShuffleDependency获取Shuffle的写入器,在子Stage执行的时候,会根据RDD的依赖关系使用相同的ShuffleDependency获取Shuffle的读取器。

在计算过程中,ShuffleMapStage会生成该Stage的结果,为下一个Stage提供数据,计算下一个Stage的RDD的时候,会拉取上一个Stage的计算结果。上一个Stage的计算保存在哪呢?答案是Spark的组件MapOutputTracker。MapOutputTracker也是主从结构,Executor端是MapOutputTrackerWroker,当ShuffleMapStage的任务运行完成后,会通过Executor上的MapOutputTrackerWroker将数据保存的位置发送到Driver上的MapOutputTrackerMaster中。在后续Stage需要上一个Stage的计算结果的时候,就通过MapOutputTrackerMaster询问计算结果的保存位置,进而加载相应的数据。

四、Task划分

DAGScheduler将Job划分为多个Stage之后,下一步就是将Stage划分为多个可以在集群中并行执行的任务,只有将任务并行执行,Stage才能更快的完成。

4.1、任务的个数

由于Stage中都是对RDD的计算,RDD又是分区的,所以在对任务进行划分的时候,每个分区可以启动一个任务进行计算。无论是ResultStage还是ShuffleMapStage,每个阶段能够并行执行的任务数量都取决于该阶段中最后一个Rdd的分区数量

上面已经介绍,在一个Stage中,RDD的依赖关系是窄依赖,所以最后一个RDD的分区数量取决于其依赖的RDD的分区数量,一直依赖到该阶段的开始的RDD的分区。对于第一阶段开始的RDD分为两种情况:

  1. 第一种为初始的RDD,即从数据源加载数据形成的初始RDD,这种情况的分区数量取决于初始RDD的形成分区方式。
  2. 第二种为该阶段的初始RDD为Shuffle阶段的Reduce任务,这种情况下,该RDD的分区数量取决于在Shuffle的Map阶段最后一个RDD的分区器设置的分区数量。

4.2、Task的生成

当确定了每个Stage的分区数量之后,就需要为每个分区生成相应的计算任务,该计算任务就是需要对该阶段的最后一个RDD执行什么操作

在ResultStage中,需要对最后一个RDD的每个分区分别执行用户自定义的action操作,所以在ResultStage中生成的每个Task都包含以下三个部分

  1. 需要对哪个RDD进行操作
  2. 需要对RDD哪个分区进行操作
  3. 需要对分区的内容执行什么样的操作

在ResultStage中划分的Task称为ResultTask,ResultTask中包含了ResultStage中最后一个RDD,即执行action操作的的RDD,需要计算的RDD分区的id和执行action操作的函数。

在ShuffleMapStage中,最终需要完成Shuffle过程中的Map阶段的操作,每个分区按照Shuffle中的Map端定义的过程执行数据的分组操作,将分组结果进行保存,并将保存结果位置通知Driver端的MapOutputTrackerMaster,MapOutputTrackerMaster保存着每一个Shuffle中Map输出的位置。在ShuffleMapStage中划分的Task称为ShuffleMapTask。ShuffleMapTask同样由三个重要的部分组成:Stage中最后的RDD、需要计算的分区的id、划分Stage的ShuffleDependency

4.3、Task的最佳运行位置

生成Task时,还会计算Task的最佳运行位置。虽然RDD包含计算RDD的所有信息,可以在任何节点上运行,但是如果通过为Task计算分配最佳的运行位置,可以将Task调度到含有该Task需要的数据的节点,从而实现移动计算而不是移动数据的目的。Spark会根据RDD可能分布的的情况,将Task的运行位置主要分为Host级别和Executor级别当一个RDD被某个Executor缓存,则对该RDD计算时,优先会把计算的Task调度到该Executor中执行。当一个RDD需要的数据存在某个host中时,则会把该Task调度到这个节点的Executor中

五、Task的执行

5.1、Task执行流程

DAGScheduler将Stage生成TaskSet之后,会将Task交给TaskScheduler进行处理,TaskScheduler负责将Task提交到集群中运行,并负责失败重试,为DAGScheduler返回事件信息等,整体如流程如下:

当任务提交到TaskScheduler时,TaskScheduler会通知SchedulerBackend分配计算资源,SchedulerBackend将所有可用的Executor的资源信息转换成WorkerOffer交给TaskScheduler,WorkerOffer中包含executorId、Executor的hostname、Executor的可用CPU等。TaskScheduler负责根据WorkerOffer在相应的Executor分配TaskSet中的Task,并将Task转换为TaskDescription交给SchedulerBackend。最终有空闲的的CPU的Executor会被分配到一个或者多个TaskDescription,SchedulerBackend将这些TaskDescription提交到对应的Executor中执行。

5.2、集群资源管理

Task运行离不开集群中的计算资源,即在SparkContext初始化过程中创建的Executor资源。在Executor创建完毕后回向SchedulerBackend中注册。Executor在注册时发送的信息包含的内容有:executorId,Executor-Ref引用、Executor的hostname、可用的CPU核数。

SchedulerBackend收到后,会将Executor的注册信息转换为ExecutorData进行保存,并且在SchedulerBackend中使用Map结构保存每个executorId和ExecutorData的关系,ExecutorData中还记录了剩余的可用的CPU核数

在为计算任务分配资源时,只需遍历所有的ExecutorData,分配可用的资源即可。由ExecutorData分配的可用资源使用WorkerOffer表示,WorkerOffer中包含executorId、Executor的hostname、Executor的可用CPU等。

5.3、任务的分配

TaskScheduler在接受到DAGScheduler提交的TaskSet以后,会为每个TaskSet创建一个TaskSetManager,用于管理TaskSet中所有任务的运行。TaskSetManager会根据Task中的最佳运行位置计算TaskSet的所有本地运行级别,本地运行级别决定Task最终在哪个Executor上运行。Spark中本地运行级别从小到达可分为:

  1. 进程本地化
  2. 节点本地化
  3. 无优先位置
  4. 机架本地化
  5. 任意节点

在TaskSetManager初始化时,根据着5个本地运行级别分别创建5个Map,分别记录其下可以运行的所有Task。这些映射关系的建立,时根据生成Task时Task运行的最佳位置确定的。。在这5种映射关系中,某个Task可能会重复存在于几个本地化级别中。

当有新的TaskSet加入、由Task执行完成、由新的Executor加入时,都会触发SchedulerBackend重新计算可用资源。TaskScheduler根据调度的顺序,依次调度TaskSetManager中的TaskSet,对于每个TaskSet遍历所有本地化级别,从小到大尝试在Executor分配Task,根据每个WorkerOffer的executorId和hostname,使用TaskSetManager判断在当前本地化级别中,是否可以在该Executor或Host上分配任务,直到该本地化级别无法分配Task,再将本地化级别提高一级再次尝分配Task。经过对本地化级别的便利,即可实现WorkerOffer分配任务或将所有待执行的任务分配完成。TaskSet中部分任务分配完成以后会生成一组TaskDescription,每个TaskDescription中包含executorId和Task的其他运行信息。SchedulerBackend根据TaskDescription的executorId,将每个任务封装成LaunchTask消息提交到不同的Executor中

六、Task的执行

Executor收到SchedulerBackend提交的LaunchTask消息后,即可运行该消息中包含的Task。Executor将接收到的Task封装到TaskRunner中,TaskRunner是一个Runnable接口,从而可以将该任务提交到线程池中运行。

6.1、Executor可以并行运行Task的数量

在创建Executor时,每个Executor可能会分配多个CPU核数,而Executor运行的所有任务都是在线程池中运行。Executor运行的时候其本身没有记录CPU使用的情况,对于Executor能够同时运行多少个任务是由SchedulerBackend控制的,SchedulerBackend每在一个Executor中提交一个任务时,便在ExecutorData中减少该Executor可用的CPU核数,直到该Executor生成的WorkerOffer可用的CPU核数为0,便不再为Executor分配任务了。默认每个Task使用一个CPU核心运行,该变量可以通过Spark的配置spark.task.CPUs修改

6.2、Executor中资源共享

当在一个Executor上运行多个Task时,多个Task共享Executor中的SparkEnv的所有组件,共用Executor中分配的内存。如使用Spark广播变量时,每个Executor中会存在一份,Executor所有任务共享这一份变量。当Executor中的BlockManager缓存了某个rdd某分区的数据时,在该Executor上调度使用这个RDD的这个分区的数据的Task执行,可以有效的减少网络加载数据的过程,减少网络传输

6.3、ResultTask运行

在执行ResultTask时,首先会反序列化出该Task执行计算的RDD和对该RDD执行的操作。根据是否涉及Shuffle操作,分为两种

  1. 用户编写的RDDtransformation中,不涉及Shuffle操作,一个Job就只涉及一个ResultStage,rdd1直接从数据源中加载

  2. 过程中涉及Shuffle操作,划分为两个Stage,rdd1位Shuffle的Reduce阶段。由于DAGScheduler在划分Stage,必先会先计算父Stage,所以执行到ResultStage时,,其父Stage的Map阶段已经完成,并且计算结果已经保存到了BlockManager中,ResultStage中的rdd1之需要根据MapOutputTrackerMaster的计算结果位置信息加载该分区的数据即可

6.4、ShuffleMapTask运行

在计算ShuffleMapTask时,首先会反序列化出Task包含的计算的RDD和划分此Stage的ShulffleDependency。ShulffleDependency包含RDD需要执行分组操作的分区器partitioner,并且通过ShulffleDependency可以获取ShulffleManager的写入器,将本分区的分组计算结果通过写入器写入文件中进行保存。在这个过程中,一个分区的数据生成的多个分组的数据分别属于下游Reduce阶段的不同的分区的数据

ShuffleMapTask中计算的RDD同样为这个Stage中最后的一个RDD。

下图是多个ShuffleMapStage的RDD转换过程

七、Task结果处理

当Executor中Task运行完成时,需要将Task运行结果返回Driver程序,Driver程序根据结果判断该Stage是否计算完成

7.1、ResultTask结果

ResultTask完成后,会将其结果返回直Driver端。根据运行结果的大小返回的结果 被分为直接运行结果和非直接间接运行结果。 当运行结果大于Spark配置的最大直接结果大小的参数时, 会将运行结果保存至当前Executor的BlockManager中,并将保存的地址序列化后返回,否则直接将运行结果序列化后返回

7.2、ShuffleMapTask结果

ShuffleMapTask运行完成后,会将运行结果直接保存至当前Executor的BlockManager中,并将保存结果的位置封装到MapStatus中,最终ShuffleMapTask运行完成结果都为MapStatus类型

7.3、返回至Driver端

Executor将Task的运行结果序列化后,通过Driver的Endpoint-Ref发送至Driver端,Driver的Endpoint收到运行结果后,通知TaskScheduler Task运行完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1418036.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决 微信退款,本地退款可以,但是测试环境退款失败问题

1.问题描述 微信小程序退款操作,测试环境一直退款失败,但是本地测试退款却没问题 2.原因分析 本地退款可以,但是测试环境不行,说明问题出在测试环境上 经过排查发现是测试环境的微信商户证书是另一个小程序的,不是正在…

【Delphi】系统菜单中增加菜单项

目录 一、问题提出 二、程序截图 ​编辑 ​编辑 三、程序代码: 一、问题提出 我们在开发windows程序的时候,可能会希望在窗体的系统菜单中增加一个菜单项,那么如何实现呢,实际上通过调用windows API是可以实现的,…

C++初阶:C/C++内存管理、new与delete详解

之前结束了类与对象:今天进行下面部分内容的学习 文章目录 1.C/C内存分布2.C语言中动态内存管理方式:malloc/calloc/realloc/free3.C动态内存管理方式3.1new/delete操作内置类型3.2new和delete操作自定义类型 4.operator new与operator delete函数5.new和…

安科瑞AAFD系列故障电弧探测器应用以及选型

功能: 故障电弧探测器(以下简称探测器)对接入线路中的故障电弧(包括故障并联电弧、故障串联电弧)进行有效的检测,当检测到线路中存在引起火灾的故障电弧时,可以进行现场的声光报警,…

MYSQL中group by分组查询的用法详解(where和having的区别)!

文章目录 前言一、数据准备二、使用实例1.如何显示每个部门的平均工资和最高工资2.显示每个部门的每种岗位的平均工资和最低工资3.显示平均工资低于2000的部门和它的平均工资4.having 和 where 的区别5.SQL查询中各个关键字的执行先后顺序 前言 在前面的文章中,我们…

MySQL知识点总结(一)——一条SQL的执行过程、索引底层数据结构、一级索引和二级索引、索引失效、索引覆盖、索引下推

MySQL知识点总结(一)——一条SQL的执行过程、索引底层数据结构、一级索引和二级索引、索引失效、索引覆盖、索引下推 一条SQL的执行过程索引底层数据结构为什么不使用二叉树?为什么不使用红黑树?为什么不使用hash表?为什么不使用…

elementUI的el-select传递item对象或其他参数的2种方法

方法1 :value“item” 绑定对象 只要:value绑定item对象就可以 value-key"value" 必须是item里的一个属性&#xff0c;绑定值为对象类型时必填 <el-select v-model"value" placeholder"请选择" value-key"value" change"cha…

nginx部署前端(vue)项目及配置修改

目录 一、前端应用打包 二、部署前端应用 1、上传前端文件夹 2、修改nginx配置文件 3、重启nginx 三、查看效果 nginx安装参考&#xff1a;linux安装nginx-CSDN博客 一、前端应用打包 打包命令 npm run build 打包成功如下&#xff0c;会在项目路径下生成dist文件夹 二…

为什么说2023年是AI元年

前言 思考者~ 2023年被称为AI元年&#xff0c;主要是因为在这一年中&#xff0c;人工智能技术在各个领域取得了突破性的进展和应用&#xff0c;这些技术的广泛应用深刻地影响了人们的生活和工作方式&#xff0c;也预示着未来AI技术的更大潜力和发展空间。 首先&#xff0c;…

【数据分析】numpy基础第二天

文章目录 前言数组的形状变换reshape的基本介绍使用reshapereshape([10, 1])运行结果reshape自动判断形状reshape([-1, 1])运行结果 合并数组使用vstack和hstackvstack和hstack的运行结果使用concatenateconcatenate运行结果 分割数组array_split运行结果 数组的条件筛选条件筛…

C++ 数论相关题目 求组合数I

直接按照公式求解会超时。 常用组合数递推式。 因此用递推式先预处理出来&#xff0c;然后查表。 #include <iostream> #include <algorithm>using namespace std;const int N 2010, mod 1e9 7;int c[N][N];void init() {for(int i 0; i < N; i )for(in…

Git 教程 | 将本地修改后的文件推送到 Github 指定远程分支上

Git 是一种分布式版本控制系统&#xff0c;用于敏捷高效地处理任何大小的项目。它是由 Linus Torvalds 为了帮助管理 Linux 内核开发而开发的开源版本控制软件。Git 的本地克隆就是一个完整的版本控制存储库&#xff0c;无论脱机还是远程都能轻松工作。开发人员会在本地提交其工…

AWS 专题学习 P10 (Databases、 Data Analytics)

文章目录 专题总览1. Databases1.1 选择合适的数据库1.2 数据库类型1.3 AWS 数据库服务概述Amazon RDSAmazon AuroraAmazon ElastiCacheAmazon DynamoDBAmazon S3DocumentDBAmazon NeptuneAmazon Keyspaces (for Apache Cassandra)Amazon QLDBAmazon Timestream 2. Data & …

专业做护眼灯的有哪些品牌?专业做护眼灯的品牌

相信很多家长都有过帮助孩子选购台灯的经历&#xff0c;但或许并不清楚市面上存在许多质量不合格的台灯。曾在央视一期节目曝光了许多LED台灯存在严重不合格的问题。尽管现在台灯通常需要执行强制性产品认证&#xff0c;并且进行不定期的抽样检查&#xff0c;但仍有一些没有接受…

【JaveWeb教程】(36)SpringBootWeb案例之《智能学习辅助系统》的详细实现步骤与代码示例(9)登录问题分析与会话控制技术讲解

目录 SpringBootWeb案例092. 登录校验2.1 问题分析2.2 会话技术2.2.1 会话技术介绍2.2.2 会话跟踪方案2.2.2.1 方案一 - Cookie2.2.2.2 方案二 - Session2.2.2.3 方案三 - 令牌技术 SpringBootWeb案例09 2. 登录校验 2.1 问题分析 我们已经完成了基础登录功能的开发与测试&a…

2024年搭建幻兽帕鲁服务器价格多少?如何自建Palworld?

自建幻兽帕鲁服务器租用价格表&#xff0c;2024阿里云推出专属幻兽帕鲁Palworld游戏优惠服务器&#xff0c;配置分为4核16G和4核32G服务器&#xff0c;4核16G配置32.25元/1个月、3M带宽96.75元/1个月、8核32G配置10M带宽90.60元/1个月&#xff0c;8核32G配置3个月271.80元。ECS…

【C++】引用、内联函数、auto关键字等

前言&#xff1a;在前面我们讲解了C入门基础的一些学习例如命名空间、缺省参数、函数重载等。今天我们将进一步的学习&#xff0c;跟着博主的脚步再次往前迈一步吧。 &#x1f496; 博主CSDN主页:卫卫卫的个人主页 &#x1f49e; &#x1f449; 专栏分类:高质量&#xff23;学习…

高分文献解读|乳酸通过与可溶性腺苷酸环化酶结合调控铁代谢

乳酸(LA)的过量产生可能发生在运动期间或者许多疾病中&#xff0c;例如癌症中。个人伴有高乳酸血症的患者常表现为贫血、血清铁减少以及一种铁代谢关键调控因子—铁调素&#xff08;hepcidin&#xff09;升高。然而&#xff0c;目前尚不清楚乳酸是否以及如何调节铁调素的表达。…

Android Clear架构最强官方指南Kotlin版

Android Clear架构最强官方指南Kotlin版 在这篇文章中&#xff0c;我将介绍关于Android应用程序架构的一些内容。尽管自从早期更稳健的Android架构方法在移动开发中变得流行以来已经说了很多话&#xff0c;但改进和演进的空间总是存在的。 基于上述文章中的清晰架构示例&…

证券公司怎么选择?福州开股票账户佣金最低是多少?怎么开低佣金账户?

股票交易佣金是指投资者在进行股票交易时&#xff0c;需要向券商支付的手续费。具体的佣金费用根据券商的政策而有所不同&#xff0c;一般分为固定佣金和按比例佣金两种方式。 固定佣金是指交易每一笔固定收取一定金额的佣金&#xff0c;通常适用于较小交易量的投资者&#xf…