Spark(27):Spark任务调度机制

news2025/1/10 6:10:55

目录

0. 相关文章链接

1. Spark任务调度概述

2. Spark Stage级调度

3. Spark Task级调度

3.1. 调度策略

3.1.1. FIFO调度策略

3.1.2. FAIR调度策略

3.2. 本地化调度

3.3. 失败重试与黑名单机制


0. 相关文章链接

 Spark文章汇总 

1. Spark任务调度概述

        在生产环境下,Spark 集群的部署方式一般为 YARN-Cluster 模式,之后的内核分析内容中我们默认集群的部署方式为 YARN-Cluster 模式。在上一章中我们讲解了 Spark YARN-Cluster 模式下的任务提交流程,但是我们并没有具体说明 Driver 的工作流程, Driver 线程主要是初始化 SparkContext 对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲 Executor 上。 

        当 ResourceManager 向 ApplicationMaster 返回 Container 资源时,ApplicationMaster 就尝试在对应的 Container 上启动 Executor 进程,Executor 进程起来后,会向 Driver 反向注册,注册成功后保持与 Driver 的心跳,同时等待 Driver 分发任务,当分发的任务执行完毕后,将任务状态上报给 Driver。 

        当 Driver 起来后,Driver 则会根据用户程序逻辑准备任务,并根据 Executor 资源情况逐步分发任务。在详细阐述任务调度前,首先说明下 Spark 里的几个概念。一个 Spark 应用程序包括 Job、Stage 以及 Task 三个概念: 

  • Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个 Job; 
  • Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 做一次划分; 
  • Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task。 

Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度,一路是 Task 级的调度,总体调度流程如下图所示: 

        Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘(依赖)关系图,即 DAG,最后通过 Action 的调用,触发 Job 并调度执行,执行过程中会创建两个调度器:DAGScheduler和 TaskScheduler。 

  • DAGScheduler 负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage 打包成 TaskSet 交给 TaskScheduler 调度。 
  • TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源,其中 SchedulerBackend 有多种实现,分别对接不同的资源管理系统。 

        Driver 初始化 SparkContext 过程中,会分别初始化 DAGScheduler、TaskScheduler、 SchedulerBackend 以及 HeartbeatReceiver,并启动 SchedulerBackend 以及 HeartbeatReceiver。 SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的 Task 分发到 Executor 执行。HeartbeatReceiver 负责接收 Executor 的心跳信息,监控 Executor 的存活状况,并通知到 TaskScheduler。 

2. Spark Stage级调度

        Spark 的任务调度是从 DAG 切割开始,主要是由 DAGScheduler 来完成。当遇到一个 Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交,下图是涉及到 Job提交的相关方法调用流程图。 

  • Job 由最终的 RDD 和 Action 方法封装而成; 
  • SparkContext 将 Job 交给 DAGScheduler 提交,它会根据 RDD 的血缘关系构成的 DAG 进行切分,将一个 Job 划分为若干 Stages,具体划分策略是,由最终的 RDD 不断通过依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 之间被划分到同一个 Stage 中,可以进行 pipeline 式的计算。划分的 Stages 分两类,一类叫做ResultStage,为 DAG 最下游的 Stage,由 Action 方法决定,另一类叫做ShuffleMapStage,为下游 Stage 准备数据,下面看一个简单的例子 WordCount。 

        Job 由 saveAsTextFile 触发,该 Job 由 RDD-3 和 saveAsTextFile 方法组成,根据 RDD 之间的依赖关系从 RDD-3 开始回溯搜索,直到没有依赖的 RDD-0,在回溯搜索过程中,RDD3 依赖 RDD-2,并且是宽依赖,所以在 RDD-2 和 RDD-3 之间划分 Stage,RDD-3 被划到最后一个 Stage,即 ResultStage 中,RDD-2 依赖 RDD-1,RDD-1 依赖 RDD-0,这些依赖都是窄依赖,所以将 RDD-0、RDD-1 和 RDD-2 划分到同一个 Stage,形成 pipeline 操作,。即 ShuffleMapStage 中,实际执行的时候,数据记录会一气呵成地执行 RDD-0 到 RDD-2 的转化。不难看出,其本质上是一个深度优先搜索(Depth First Search)算法。 

        一个 Stage 是否被提交,需要判断它的父 Stage 是否执行,只有在父 Stage 执行完毕才能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。Stage 提交时会将 Task 信息(分区信息以及方法等)序列化并被打包成 TaskSet 交给 TaskScheduler,一个 Partition 对应一个 Task,另一方面 TaskScheduler 会监控 Stage 的运行状态,只有 Executor 丢失或者 Task 由于 Fetch 失败才需要重新提交失败的 Stage 以调度运行失败的任务,其他类型的 Task 失败会在 TaskScheduler 的调度过程中重试。 

        相对来说 DAGScheduler 做的事情较为简单,仅仅是在 Stage 层面上划分 DAG,提交Stage 并监控相关状态信息。TaskScheduler 则相对较为复杂,下面详细阐述其细节。 

3. Spark Task级调度

        Spark Task 的调度是由 TaskScheduler 来完成,由前文可知,DAGScheduler 将 Stage 打包到交给 TaskScheTaskSetduler,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 加入到调度队列中,TaskSetManager 结构如下图所示:

TaskSetManager 负责监控管理同一个 Stage 中的 Tasks,TaskScheduler 就是以TaskSetManager 为单元来调度任务。 

        前面也提到,TaskScheduler 初始化后会启动 SchedulerBackend,它负责跟外界打交道,接收 Executor 的注册信息,并维护 Executor 的状态,所以说 SchedulerBackend 是管“粮食” 的,同时它在启动后会定期地去“询问”TaskScheduler 有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler 在 SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择 TaskSetManager 去调度运行,大致方法调用流程如下图所示: 

        上图中,将 TaskSetManager 加入 rootPool 调度池中之后,调用 SchedulerBackend 的riviveOffers 方法给 driverEndpoint 发送 ReviveOffer 消息;driverEndpoint 收到 ReviveOffer 消息后调用 makeOffers 方法,过滤出活跃状态的 Executor(这些 Executor 都是任务启动时反向注册到 Driver 的 Executor),然后将 Executor 封装成 WorkerOffer 对象;准备好计算资源(WorkerOffer)后,taskScheduler 基于这些资源调用 resourceOffer 在 Executor 上分配 task。 

3.1. 调度策略

TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。在 TaskScheduler 初始化过程中会实例化 rootPool,表示树的根节点,是 Pool 类型。

3.1.1. FIFO调度策略

        如果是采用 FIFO 调度策略,则直接简单地将 TaskSetManager 按照先来先到的方式入队,出队时直接拿出最先进队的 TaskSetManager,其树结构如下图所示,TaskSetManager 保存在一个 FIFO 队列中。 

3.1.2. FAIR调度策略

FAIR 调度策略的树结构如下图所示:

        FAIR 模式中有一个 rootPool 和多个子 Pool,各个子 Pool 中存储着所有待分配的TaskSetMagager。 在 FAIR 模式中, 需要先对子 Pool 进行排序,再对子 Pool 里面的 TaskSetMagager 进行排序,因为 Pool 和 TaskSetMagager 都继承了 Schedulable 特质, 因此使用相同的排序算法。

        排序过程的比较是基于 Fair-share 来比较的,每个要排序的对象包含三个属性:runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值, minShare 值以及 weight 值。

注意, minShare、 weight 的值均在公平调度配置文件 fairscheduler.xml 中被指定,调度池在构建阶段会读取此文件的相关配置。

  • 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么 B 排在 A 前面;(runningTasks 比 minShare 小的先执行) 
  • 如果 A、B 对象的 runningTasks 都小于它们的 minShare,那么就比较 runningTasks 与 minShare 的比值(minShare 使用率),谁小谁排前面;(minShare 使用率低的先执行) 
  • 如果 A、B 对象的 runningTasks 都大于它们的 minShare,那么就比较 runningTasks 与 weight 的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行) 
  • 如果上述比较均相等,则比较名字。

        整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare 使用率和权重使用率少(实际运行 task 比例较少)的先运行。 

        FAIR 模式排序完成后,所有的 TaskSetManager 被放入一个 ArrayBuffer 里,之后依次被取出并发送给 Executor 执行。 从调度队列中拿到 TaskSetManager 后,由于 TaskSetManager 封装了一个 Stage 的所有 Task,并负责管理调度这些 Task,那么接下来的工作就是 TaskSetManager 按照一定的规则一个个取出 Task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到 Executor 上执行。 

3.2. 本地化调度

        DAGScheduler 切割 Job,划分 Stage, 通过调用 submitStage 来提交一个 Stage 对应的tasks,submitStage 会调用 submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的 preferredLocations,通过调用 getPreferrdeLocations()得到 partition 的优先位置,由于一个partition 对应一个 Task,此 partition 的优先位置就是 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个 Task,该 task 优先位置与其对应的 partition 对应的优先位置一致。 

        从调度队列中拿到 TaskSetManager 后,那么接下来的工作就是 TaskSetManager 按照一定的规则一个个取出 task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到Executor 上执行。前面也提到,TaskSetManager 封装了一个 Stage 的所有 Task,并负责管理调度这些 Task。 

根据每个 Task 的优先位置,确定 Task 的 Locality 级别,Locality 一共有五种,优先级由高到低顺序: 

        在调度执行时,Spark 调度总是会尽量让每个 task 以最高的本地性级别来启动,当一个 task 以 X 本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以 X 本地性级别来启动该 task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。 

        可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的 Executor 可能就会有相应的资源去执行此 task,这就在在一定程度上提到了运行性能。 

3.3. 失败重试与黑名单机制

        除了选择合适的 Task 调度运行外,还需要监控 Task 的执行状态,前面也提到,与外部打交道的是 SchedulerBackend,Task 被提交到 Executor 启动执行后,Executor 会将执行状态上报给 SchedulerBackend,SchedulerBackend 则告诉 TaskScheduler,TaskScheduler 找到该Task 对应的 TaskSetManager,并通知到该 TaskSetManager,这样 TaskSetManager 就知道 Task 的失败与成功状态,对于失败的 Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中,否则整个 Application 失败。 

        在记录 Task 失败次数过程中,会记录它上一次失败所在的 Executor Id 和 Host,这样下次再调度这个 Task 时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录 Task 上一次失败所在的 Executor Id 和 Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个 Task 了。 


注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/766598.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#为什么不能成为大学编程入门的首选?

大学编程入门不以C#作为首选的原因有多个因素。虽然C#是一种功能强大的编程语言,但在大学编程入门阶段,通常会选择其他语言作为首选,以下是一些可能的原因: 我这里刚好有嵌入式、单片机、plc的资料需要可以私我或在评论区扣个6 …

投个 3D 冰壶,上班玩一玩 | 物理引擎

本篇文章将介绍如何使用物理引擎和图扑 3D 可视化技术来呈现冰壶运动的模拟。 Oimo.js 物理引擎 Oimo.js 是一个轻量级的物理引擎,它使用 JavaScript 语言编写,并且基于 OimoPhysics 引擎进行了改进和优化。Oimo.js 核心库只有 150K ,专门用…

抢滩“返校季”!这些品类拉开爆单之旅!

“返校季”作为仅次于“黑五网一”的购物旺季。在开学的前四到六周,家长与学生就会开始陆续采购返校季所需的物品,从七月下旬一直持续到九月,都将是“返校季”的购物高峰。今年的“返校季”又将呈现什么样的消费趋势?消费者的消费…

Julia变量作用域问题

变量作用域问题 1.软作用域与硬作用域 1.1软作用域 软作用域是指在代码块内部定义的变量,如果外部有同名变量,则内部变量会被优先使用,即“遮蔽”外部的同名变量,而不影响外部变量。 1.2硬作用域 硬作用域是指在代码块内部定…

DataGrip使用随笔

由于公司不让使用NAVIcat,顾用datagrip作为替代软件 1.下载和安装 从官网下载安装包https://download.jetbrains.com.cn/datagrip/datagrip-2023.1.2.exe后,选择安装位置并试用 2.链接数据库 需要先新建个项目存储所有的db连接信息,然后选…

安卓通过adb pull和adb push 手机与电脑之间传输文件

1.可以参考这篇文章 https://www.cnblogs.com/hhddcpp/p/4247923.html2.根据上面的文章,我做了如下修改 //设置/system为可读写: adb remount //复制手机中的文件到电脑中。需要在电脑中新建一个文件夹,我新建的文件夹为ce文件夹 adb pull …

2.9Frame 框架

2.9Frame 框架 这一次的效果将会像下面的图片一样. Frame 部件 Frame 是一个在 Windows 上分离小区域的部件, 它能将 Windows 分成不同的区,然后存放不同的其他部件. 同时一个 Frame 上也能再分成两个 Frame, Frame 可以认为是一种容器. ###定义一个label显示on the windo…

视频问答新增或修改视频问答

通过问答id新增或修改视频问答题目 新增或修改视频问答 图3:视频问答功能(观看效果) 图4:视频问答功能(观看效果) 图5:视频问答功能(观看效果) 单元测试 Testpublic voi…

Linux下九个实用脚本

目录 1.批量创建用户并设置密码脚本 2.查看网卡实时流量脚本 3.nginx访问日志脚本 4.dos防范攻击(自动屏蔽攻击脚本) 5.监控多台服务器磁盘利用率脚本 6.监控MySQL主从同步异常脚本 7.批量检查网站异常脚本 8.查看服务器资源利用率脚本 9.查找占…

高效出报表的工具有哪些?奥威BI报表工具怎样?

随着企业精细化数据分析的展开,数据分析报表的制作压力也随之增加。对企业而言,拥有一个高效出报表的工具十分重要。高效出报表的工具有哪些?奥威BI报表工具的效率够不够高? 高效出报表的工具有很多,奥威BI报表工具就…

java多线程之并发容器集合

一、多线程操作容器存在的问题 如下代码 public class NotSafeDemo {public static void main(String[] args) {List list new ArrayList();for (int i 0; i < 100; i) {new Thread(() -> {list.add(UUID.randomUUID().toString());System.out.println(list);}, &quo…

Vue导入Echarts实现散点图 axios解析excel流数据 echarts数据可视化前端展示

为实现从本地服务器下载xlsx文件至前端vue echarts中展示&#xff0c;踩过许多坑&#xff0c;现将完整流程和源码分享。 1、 vue axios get请求 返回304 Not Modified 不更新数据 原因&#xff1a;由于浏览器缓存了get请求&#xff0c;导致不管如何刷新&#xff0c;数据都不更…

Echarts:柱状图的第一个柱子超出了y轴

问题 使用Echarts绘制柱状图的时候&#xff0c;第一个柱子超出了y轴&#xff1a; 解决 boundaryGap 此属性与坐标轴两端空白有关。默认值为true即留空&#xff0c;false则表示柱子顶头,而出现上述现象,是因为代码中参数 boundaryGap设置为了false 将boundaryGap的值改为tru…

sadtalker-- 本地生成数字人

文章目录 概要工具讲解安装流程1. 安装Python2. 安装FFmpeg3. 安装Sadtalker4. 导入模型 使用流程1. 生成语音2. 上传图片跟语音3. 点击生成&#xff0c;漫长等待4. 虚拟内存 小结 概要 sadtalker&#xff0c;可以根据图片以及语音&#xff0c;生成动态视频&#xff0c;跟现在…

怎么将拼接的字符串element组件通过“v-html“渲染到页面

如上图所示&#xff0c;没那么多废话&#xff0c;直接上代码 html中 <div id"app"><div :html"tempHtml"></div> </div>vue中 new Vue({el: #app,data() {return {tempHtml: }},created() {this.getHtml()},mounted() {window.…

Java学到什么程度可以在当下面试找工作?

今年的面试主打一个字&#xff1a;卷&#xff01; 随着经济环境下行&#xff0c;大厂降本增效、筛除了一部分冗余岗位&#xff0c;原本荒蛮的IT行业发展正在逐步进入正轨中。虽说今年就业环境不容乐观&#xff0c;但数据不会骗人&#xff0c;以Java为例&#xff0c;在职友集上…

gd32f103vbt6 串口OTA升级3-linux端的部分

一. 简介 本文主要是对linux端升级单片机程序的功能部分做一些介绍&#xff0c;包括一些软件流程。 二.硬件部分 2.1 rk3399cpugd32f103 2.2 连接方式&#xff1a;串口&#xff08;115200&#xff0c;8N1&#xff09;或者iic&#xff08;本文没有介绍iic&#xff09; 三、其…

[nlp] GPT

一、联合训练任务 1.1 NTP(Next Token Prediction) gpt预训练的一个目标函数有两个,第一个是基础的下一个词预测任务,选择一个K窗口,将窗口中的K个词的embedding作为条件去预测下一个词。 1.2 TC(Text Classification) 第二个是一个分类任务,一段话给一个标签,然后去预…

Qt 获得QTableview所选中的行的某一列数据

1、点击QtableView控件-》右键-》跳到槽-》选择 2、编写槽函数信息 void XXX::on_tableView_CalTable_clicked(const QModelIndex &index) {int rowindex.row();//获得当前行索引int colindex.column();//获得当前列索引QModelIndex index1 CalViewModel->index(row,2)…

开鸿智谷与华秋达成生态共创合作,共同打造OpenHarmony硬件生态

7月11日&#xff0c;在2023慕尼黑上海电子展现场&#xff0c;开鸿智谷数字产业发展有限公司(以下简称“开鸿智谷”)与深圳华秋电子有限公司(以下简称“华秋”)签署了生态共创战略合作协议&#xff0c;共同推动开源鸿蒙OpenAtom OpenHarmony(以下简称“OpenHarmony”)硬件生态繁…