【Spark系列1】Spark作业执行原理

news2025/1/16 18:00:25

 本文字数在7800字左右,预计时间在15分钟

一、整体流程

每个Aciton操作会创建一个JOB,JOB会提交给DAGScheduler,DAGScheduler根据RDD依赖的关系划分为多个Stage,每个Stage又会创建多个TaskSet,每个TaskSet包含多个Task,这个Task就是每个分区的并行计算的任务。DAGScheduler将TaskSet按照顺序提交给TaskScheduler,TaskScheduler将每一个任务去找SchedulerBackend申请执行所需要的资源,获取到资源后,SchedulerBackend将这些Task提交给Executor,Executor负责将这些任务运行起来。

二、JOB提交

2.1、为什么需要action操作

在Spark中,分为transformation操作和action操作。执行用户程序时,transformation操作将一个RDD转换成了新的RDD,并在compute()函数中,记录了如何根据父RDD计算出当前RDD的数据、RDD如何分区等信息,并且能够得出最后一个RDD的数据。 但是RDD中的每个分区中依然是一条一条的分散的数据,那么要对最后一个RDD执行什么操作呢?这就是action操作的作用。

2.2、Job提交

每个action操作都会生成一个Job,这个Job包含了需要计算的RDD对象、需要计算的分区、需要执行什么样的计算。RDD和用户执行的计算都是可以序列化的,RDD序列化之后,在Executor中反序列化之后即可得到该RDD对象,再根据对象compute()函数就可以计算出某个分区的数据。JOB中包含的数据如下所示

2.3、分布式执行

当提交Job以后,就可以将Job划分为多个并行的任务,每个任务计算指定分区的一个分区即可。通过RDD的计算函数即可计算出该分区的数据,今儿计算出分区的结果。

三、Stage划分

3.1、宽依赖和窄依赖

如果一个RDD的每个分区最多只能被一个Child RDD的一个分区所使用, 则称之为窄依赖(Narrow dependency), 如果被多个Child RDD分区依赖, 则称之为宽依赖(wide dependency)

3.2、Stage划分

在用户编写的一系列转换中,多个RDD可能既形成了多次窄依赖,也形成了多次宽依赖,连续的窄依赖可以通过一个任务进行流水线处理,但是如果遇到了宽依赖,就必须先将父RDD的所有数据都进行计算并保存起来,再进行RDD的运算。在一个Job中,action操作知识定义了在最后的RDD中执行何种操作,而最后的RDD会依赖上个RDD,上个RDD又会有其他依赖,这样就形成了一系列的依赖关系。如果为宽依赖的话,就在依赖的地方进行切分,先将宽依赖的父RDD进行计算出来,再计算后续的RDD,按照快依赖被划分的过程,即为Stage划分的过程。

如上图所示,rdd1->rdd2,rdd3->rdd4是窄依赖,rdd2->rdd3,rdd4->rdd5是宽依赖。在发生shuffle的位置,Spark将计算分为两个阶段分别执行,每发生一次shuffle,Spark就将计算划分为先后的两个阶段,如下图

在划分阶段的过程中,对于某个阶段而言其并行的计算任务都完全相同,因此在Job执行的过程中,并行计算就是指每个阶段中任务并行的计算。如在Stage1中,每个分区的数据可以使用一个任务进行计算。10000个分区即可在集群中并行运行10000个任务进行计算。如果集群资源不够,可以将10000个任务依次在集群中运行,直到运行完毕,再进行Stage2的计算。Stage2也会根据分区数启动多个任务并行的加载Stage1生成的数据,完成Stage2的计算。

在一个Job的运行过程中,所有的Stage其实都是为最后一个Stage做准备,因为action操作只需要最后一个RDD的数据。因此最后一个Stage称为ResultStage,之前所有的Stage都是由Shuffle引起的中间计算过程,被称为ShuffleMapStage。其过程如下图

3.3、Spark实现

再Spark实现中,SparkContext将Job提交至DAGScheduler,DAGScheduler获取Job中执行action操作的RDD,将最后执行action操作的RDD划分到最后的ResultStage中,然后遍历该RDD的依赖和所有的父依赖,每遇到宽依赖就将两个RDD划分到两个不同的Stage中,遇到窄依赖就将窄依赖的多个RDD划分到一个Stage中,经过这次操作,一个RDD就划分为有多个依赖关系的Stage。再每个Stage中,所有的RDD之间都是窄依赖的关系,Stage之间的RDD都是宽依赖的关系。DAGScheduler将最初被依赖的Stage提交,计算该Stage中的数据,计算完成后,再将后续的Stage提交,知道最后运行的ResultStage,则整个计算Job完成。ResultStage和ShuffleMapStage结构如下图

在生成ShuffleapStage时,ShuffleDependency起到了承上启下的作用,如果两个RDD之间为宽依赖,子RDD的依赖为ShuffleDependency;在划分Stage的时候,父Stage会保存该ShuffleDependency,以便在执行父Stage的时候,根据ShuffleDependency获取Shuffle的写入器,在子Stage执行的时候,会根据RDD的依赖关系使用相同的ShuffleDependency获取Shuffle的读取器。

在计算过程中,ShuffleMapStage会生成该Stage的结果,为下一个Stage提供数据,计算下一个Stage的RDD的时候,会拉取上一个Stage的计算结果。上一个Stage的计算保存在哪呢?答案是Spark的组件MapOutputTracker。MapOutputTracker也是主从结构,Executor端是MapOutputTrackerWroker,当ShuffleMapStage的任务运行完成后,会通过Executor上的MapOutputTrackerWroker将数据保存的位置发送到Driver上的MapOutputTrackerMaster中。在后续Stage需要上一个Stage的计算结果的时候,就通过MapOutputTrackerMaster询问计算结果的保存位置,进而加载相应的数据。

四、Task划分

DAGScheduler将Job划分为多个Stage之后,下一步就是将Stage划分为多个可以在集群中并行执行的任务,只有将任务并行执行,Stage才能更快的完成。

4.1、任务的个数

由于Stage中都是对RDD的计算,RDD又是分区的,所以在对任务进行划分的时候,每个分区可以启动一个任务进行计算。无论是ResultStage还是ShuffleMapStage,每个阶段能够并行执行的任务数量都取决于该阶段中最后一个Rdd的分区数量

上面已经介绍,在一个Stage中,RDD的依赖关系是窄依赖,所以最后一个RDD的分区数量取决于其依赖的RDD的分区数量,一直依赖到该阶段的开始的RDD的分区。对于第一阶段开始的RDD分为两种情况:

  1. 第一种为初始的RDD,即从数据源加载数据形成的初始RDD,这种情况的分区数量取决于初始RDD的形成分区方式。
  2. 第二种为该阶段的初始RDD为Shuffle阶段的Reduce任务,这种情况下,该RDD的分区数量取决于在Shuffle的Map阶段最后一个RDD的分区器设置的分区数量。

4.2、Task的生成

当确定了每个Stage的分区数量之后,就需要为每个分区生成相应的计算任务,该计算任务就是需要对该阶段的最后一个RDD执行什么操作

在ResultStage中,需要对最后一个RDD的每个分区分别执行用户自定义的action操作,所以在ResultStage中生成的每个Task都包含以下三个部分

  1. 需要对哪个RDD进行操作
  2. 需要对RDD哪个分区进行操作
  3. 需要对分区的内容执行什么样的操作

在ResultStage中划分的Task称为ResultTask,ResultTask中包含了ResultStage中最后一个RDD,即执行action操作的的RDD,需要计算的RDD分区的id和执行action操作的函数。

在ShuffleMapStage中,最终需要完成Shuffle过程中的Map阶段的操作,每个分区按照Shuffle中的Map端定义的过程执行数据的分组操作,将分组结果进行保存,并将保存结果位置通知Driver端的MapOutputTrackerMaster,MapOutputTrackerMaster保存着每一个Shuffle中Map输出的位置。在ShuffleMapStage中划分的Task称为ShuffleMapTask。ShuffleMapTask同样由三个重要的部分组成:Stage中最后的RDD、需要计算的分区的id、划分Stage的ShuffleDependency

4.3、Task的最佳运行位置

生成Task时,还会计算Task的最佳运行位置。虽然RDD包含计算RDD的所有信息,可以在任何节点上运行,但是如果通过为Task计算分配最佳的运行位置,可以将Task调度到含有该Task需要的数据的节点,从而实现移动计算而不是移动数据的目的。Spark会根据RDD可能分布的的情况,将Task的运行位置主要分为Host级别和Executor级别当一个RDD被某个Executor缓存,则对该RDD计算时,优先会把计算的Task调度到该Executor中执行。当一个RDD需要的数据存在某个host中时,则会把该Task调度到这个节点的Executor中

五、Task的执行

5.1、Task执行流程

DAGScheduler将Stage生成TaskSet之后,会将Task交给TaskScheduler进行处理,TaskScheduler负责将Task提交到集群中运行,并负责失败重试,为DAGScheduler返回事件信息等,整体如流程如下:

当任务提交到TaskScheduler时,TaskScheduler会通知SchedulerBackend分配计算资源,SchedulerBackend将所有可用的Executor的资源信息转换成WorkerOffer交给TaskScheduler,WorkerOffer中包含executorId、Executor的hostname、Executor的可用CPU等。TaskScheduler负责根据WorkerOffer在相应的Executor分配TaskSet中的Task,并将Task转换为TaskDescription交给SchedulerBackend。最终有空闲的的CPU的Executor会被分配到一个或者多个TaskDescription,SchedulerBackend将这些TaskDescription提交到对应的Executor中执行。

5.2、集群资源管理

Task运行离不开集群中的计算资源,即在SparkContext初始化过程中创建的Executor资源。在Executor创建完毕后回向SchedulerBackend中注册。Executor在注册时发送的信息包含的内容有:executorId,Executor-Ref引用、Executor的hostname、可用的CPU核数。

SchedulerBackend收到后,会将Executor的注册信息转换为ExecutorData进行保存,并且在SchedulerBackend中使用Map结构保存每个executorId和ExecutorData的关系,ExecutorData中还记录了剩余的可用的CPU核数

在为计算任务分配资源时,只需遍历所有的ExecutorData,分配可用的资源即可。由ExecutorData分配的可用资源使用WorkerOffer表示,WorkerOffer中包含executorId、Executor的hostname、Executor的可用CPU等。

5.3、任务的分配

TaskScheduler在接受到DAGScheduler提交的TaskSet以后,会为每个TaskSet创建一个TaskSetManager,用于管理TaskSet中所有任务的运行。TaskSetManager会根据Task中的最佳运行位置计算TaskSet的所有本地运行级别,本地运行级别决定Task最终在哪个Executor上运行。Spark中本地运行级别从小到达可分为:

  1. 进程本地化
  2. 节点本地化
  3. 无优先位置
  4. 机架本地化
  5. 任意节点

在TaskSetManager初始化时,根据着5个本地运行级别分别创建5个Map,分别记录其下可以运行的所有Task。这些映射关系的建立,时根据生成Task时Task运行的最佳位置确定的。。在这5种映射关系中,某个Task可能会重复存在于几个本地化级别中。

当有新的TaskSet加入、由Task执行完成、由新的Executor加入时,都会触发SchedulerBackend重新计算可用资源。TaskScheduler根据调度的顺序,依次调度TaskSetManager中的TaskSet,对于每个TaskSet遍历所有本地化级别,从小到大尝试在Executor分配Task,根据每个WorkerOffer的executorId和hostname,使用TaskSetManager判断在当前本地化级别中,是否可以在该Executor或Host上分配任务,直到该本地化级别无法分配Task,再将本地化级别提高一级再次尝分配Task。经过对本地化级别的便利,即可实现WorkerOffer分配任务或将所有待执行的任务分配完成。TaskSet中部分任务分配完成以后会生成一组TaskDescription,每个TaskDescription中包含executorId和Task的其他运行信息。SchedulerBackend根据TaskDescription的executorId,将每个任务封装成LaunchTask消息提交到不同的Executor中

六、Task的执行

Executor收到SchedulerBackend提交的LaunchTask消息后,即可运行该消息中包含的Task。Executor将接收到的Task封装到TaskRunner中,TaskRunner是一个Runnable接口,从而可以将该任务提交到线程池中运行。

6.1、Executor可以并行运行Task的数量

在创建Executor时,每个Executor可能会分配多个CPU核数,而Executor运行的所有任务都是在线程池中运行。Executor运行的时候其本身没有记录CPU使用的情况,对于Executor能够同时运行多少个任务是由SchedulerBackend控制的,SchedulerBackend每在一个Executor中提交一个任务时,便在ExecutorData中减少该Executor可用的CPU核数,直到该Executor生成的WorkerOffer可用的CPU核数为0,便不再为Executor分配任务了。默认每个Task使用一个CPU核心运行,该变量可以通过Spark的配置spark.task.CPUs修改

6.2、Executor中资源共享

当在一个Executor上运行多个Task时,多个Task共享Executor中的SparkEnv的所有组件,共用Executor中分配的内存。如使用Spark广播变量时,每个Executor中会存在一份,Executor所有任务共享这一份变量。当Executor中的BlockManager缓存了某个rdd某分区的数据时,在该Executor上调度使用这个RDD的这个分区的数据的Task执行,可以有效的减少网络加载数据的过程,减少网络传输

6.3、ResultTask运行

在执行ResultTask时,首先会反序列化出该Task执行计算的RDD和对该RDD执行的操作。根据是否涉及Shuffle操作,分为两种

  1. 用户编写的RDDtransformation中,不涉及Shuffle操作,一个Job就只涉及一个ResultStage,rdd1直接从数据源中加载

  2. 过程中涉及Shuffle操作,划分为两个Stage,rdd1位Shuffle的Reduce阶段。由于DAGScheduler在划分Stage,必先会先计算父Stage,所以执行到ResultStage时,,其父Stage的Map阶段已经完成,并且计算结果已经保存到了BlockManager中,ResultStage中的rdd1之需要根据MapOutputTrackerMaster的计算结果位置信息加载该分区的数据即可

6.4、ShuffleMapTask运行

在计算ShuffleMapTask时,首先会反序列化出Task包含的计算的RDD和划分此Stage的ShulffleDependency。ShulffleDependency包含RDD需要执行分组操作的分区器partitioner,并且通过ShulffleDependency可以获取ShulffleManager的写入器,将本分区的分组计算结果通过写入器写入文件中进行保存。在这个过程中,一个分区的数据生成的多个分组的数据分别属于下游Reduce阶段的不同的分区的数据

ShuffleMapTask中计算的RDD同样为这个Stage中最后的一个RDD。

下图是多个ShuffleMapStage的RDD转换过程

七、Task结果处理

当Executor中Task运行完成时,需要将Task运行结果返回Driver程序,Driver程序根据结果判断该Stage是否计算完成

7.1、ResultTask结果

ResultTask完成后,会将其结果返回直Driver端。根据运行结果的大小返回的结果 被分为直接运行结果和非直接间接运行结果。 当运行结果大于Spark配置的最大直接结果大小的参数时, 会将运行结果保存至当前Executor的BlockManager中,并将保存的地址序列化后返回,否则直接将运行结果序列化后返回

7.2、ShuffleMapTask结果

ShuffleMapTask运行完成后,会将运行结果直接保存至当前Executor的BlockManager中,并将保存结果的位置封装到MapStatus中,最终ShuffleMapTask运行完成结果都为MapStatus类型

7.3、返回至Driver端

Executor将Task的运行结果序列化后,通过Driver的Endpoint-Ref发送至Driver端,Driver的Endpoint收到运行结果后,通知TaskScheduler Task运行完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1416762.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python简单socket demo

socket说明 socket本质是编程接口(API),对TCP/IP的封装,TCP/IP也要提供可供程序员做网络开发所用的接口,这就是Socket编程接口。除了常见的http请求之外,一些敏感的数据传输常用socket套接字层直接传输数据。一个简单的domo用于熟…

基于Micropython利用ESP32-C3墨水屏电子时钟方法

本篇笔记介绍一下我们设计制作的墨水屏时钟。 1、所需硬件 1)合宙的ESP32-C3: 2)电子价签拆出来的2.9寸墨水屏: ——电子价签型号为:Stellar-L,墨水屏型号为:E029A01。 3)自己设计…

我的2023年度总结

今天和去年一样,没有目录,正文开始: 距离上次更新博客已经过去很久了 我们还是不忘初心,先推荐一本书《培根随笔》。最近有想看马克思的著作,马哲才是世界大法。 这一年,过得很快。如果没有保持写日记的习惯…

多文件开发

当所有的类都写在main.m这个源文件之中、将不利于后期的维护和团队开发 推荐的方式 把1个类写在1个模块之中,而1个模块至少包含两个文件 h头文件 1.写的类声明因为要用到Foundation框架中的类NS0 bject所以在这个头文件中要引入 Foundationa 2.框架的头文件 3.然后…

1.23神经网络框架(sig函数),逆向参数调整法(梯度下降法,链式法则(理解,及处理多层神经网络的方式))

框架 输入层 隐藏层 存在一个阈值,如果低于某一阈值就不激活;高于了就激活 输出层 逆向参数调整方法 初始阶段,随机设置权重值w1,w2 依据训练集 两个数学方法 (梯度下降、链式法则) 调参借助两个数学方法 当导数为…

【归并排序】【图论】【动态规划】【 深度游戏搜索】1569将子数组重新排序得到同一个二叉搜索树的方案数

本文涉及知识点 动态规划汇总 图论 深度游戏搜索 归并排序 组合 LeetCoce1569将子数组重新排序得到同一个二叉搜索树的方案数 给你一个数组 nums 表示 1 到 n 的一个排列。我们按照元素在 nums 中的顺序依次插入一个初始为空的二叉搜索树(BST)。请你统…

了解OpenCV的数据类型

OpenCV是一个开源的计算机视觉库,广泛应用于图像和视频处理领域。在OpenCV中,数据类型扮演着非常重要的角色,它们决定了数据的存储方式和操作方式。本文将介绍OpenCV中常见的数据类型,包括图像数据类型、矩阵数据类型和轮廓数据类…

线程调度(Java Android)

关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、商业变现、人工智能等,希望大家多多支持。 未经允许不得转载 目录 一、导读二、概览2.1、线程的属性 三、…

Dockerfile简介和基础实践

文章目录 1、Dockerfile简介1.1、Dockerfile解决的问题1.2、docker build 构建流程1.3、关键字介绍 2、Dockerfile 实践2.1、基本语法实践 --- golang2.1.1 问题检查 2.2、基本语法实践 --- gcc 总结 1、Dockerfile简介 Dockerfile是一个创建镜像所有命令的文本文件, 包含了一…

C++进阶(七)AVL树

📘北尘_:个人主页 🌎个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上,不忘来时的初心 文章目录 一、AVL树的概念二、AVL树的旋转1、左单旋2、右单旋3、左右双旋4、右左双旋 三、AVL树的基本实…

PCL Kdtree 使用示例

PCL Kdtree 使用示例 文章目录 PCL Kdtree 使用示例一、关于 KDTree二、关于最近邻搜索三、复杂度分析四、C代码示例五、关键函数说明nearestKSearch 函数说明 一、关于 KDTree 点云数据主要是, 表征 目标表面 的海量点集合, 并不具备传统实体网格数据的…

Hive中left join 中的where 和 on的区别

目录 一、知识点 二、测试验证 三、引申 一、知识点 left join中关于where和on条件的知识点: 多表left join 是会生成一张临时表。on后面: 一般是对left join 的右表进行条件过滤,会返回左表中的所有行,而右表中没有匹配上的数…

【Git】项目管理笔记

文章目录 本地电脑初始化docker报错.gitignoregit loggit resetgit statusgit ls-filesgit rm -r -f --cached拉取仓库文件更新本地的项目报错处理! [rejected] master -> master (fetch first)gitgitee.com: Permission denied (publickey).error: remote origin already e…

linux进程(上)

目录 进程的概念 进程的状态 进程状态的理解 特殊的进程 本期我们将进行linux进程的学习。 进程的概念 我们或多或少都听说过进程的概念,但是在操作系统中,进程到底是什么呢? 进程就是程序的一次执行过程。 一个程序要被执行&#xff…

NLP自然语言处理的发展:从初创到人工智能的里程碑

自然语言处理(Natural Language Processing,NLP)人工智能领域中备受关注的重要分支之一。它使得计算机能够理解、解释和使用人类语言。随着技术的不断发展,NLP经历了从初创时期到深度学习时代的巨大演变,推动了互联网产…

AWS云用户创建

问题 需要给工友创建AWS云的用户,这里假设使用分配给自己AWS开发者IAM账号,给别人创建aws IAM账号。 登录系统 打开页面:https://xxx.signin.aws.amazon.com/console,使用分配的开发者账号登录。如下图: 创建用户…

有手就行!阿里云上3分钟搞定幻兽帕鲁联机服务器搭建

幻兽帕鲁最近在社区呈现了爆火的趋势,在线人数已突破百万级别,官方服务器也开始出现不稳定,卡人闪退的情况。对于有一定财力的小伙伴,搭建一个私人服务器是一个最稳定而舒服的解决方案。 本文萝卜哥将讲解一下如何快速搭建 palwo…

【技能---ubuntu上的github常用命令及其将自己的文件夹上传流程】

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言GitHub 作用github常用命令ubuntu上传自己的文件夹到github上1.创建远程仓库2. 生成token3. 上传本地代码 总结 前言 随着自己的代码越来越多,需要…

在线小学数学作业练习册出题网站源码,支持打印转成PDF

源码介绍 小学数学出题网页版源码,加减乘除混合运算,支持自定义数字、小数、混合运算,支持加减乘除运算混合多选(一道题中同时随机出现加减乘除运算符)支持自定义出题数量,支持一键打印成pdf,支…

k8s-配置管理

一、ConfigMap 1.1 创建ConfigMap 1.2 在环境种使用ConfigMap ConfigMap最为常见的使用方式就是在环境变量和Volume中引用。 1.3 在Volume中引用ConfigMap 在Volume中引用ConfigMap,就是通过文件的方式直接将ConfigMap的每条数据填入Volume,每条数据是…