二、Spark 调度系统

news2025/1/20 15:41:19

目录

  • Spark 调度系统
    • DAGScheduler
    • SchedulerBackend
    • TaskScheduler
    • ExecutorBackend
    • Spark 任务调度流程

Spark 调度系统

分布式计算的精髓,在于如何把抽象的计算图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行

Spark调度系统流程:

  1. 将 DAG 拆分为不同的 Stages ;
  2. 根据Stages创建分布式任务 (Tasks) 和任务组 (TaskSet) ;
  3. 获取集群内可用的硬件资源情况;
  4. 按照调度规则决定优先调度哪些 Tasks/ TaskSet ;
  5. 依序将分布式任务分发到执行器 Executor 。
    在这里插入图片描述

DAGScheduler

  • 根据用户代码构建 DAG;

  • 以 Shuffle 为边界切割 Stages;

  • 基于 Stages 创建 TaskSets,并将 TaskSets 提交给 TaskScheduler 请求调度。

从 DAG 到 Stages 的拆分过程:以 Actions 算子为起点,从后 向前回溯 DAG,以 Shuffle 操作为边界去划分 Stages。

DAGScheduler 根据 Stage 内 RDD 的 partitions 属性创建分布式任务集合 TaskSet。TaskSet 包含一个又一个分布式任务 Task,RDD 有多少数据 分区,TaskSet 就包含多少个 Task。换句话说,Task 与 RDD 的分区,是一一对应的。

在这里插入图片描述
stageId、stageAttemptId 标记了 Task 与执行阶段 Stage 的所属关系;

taskBinary 则封装了隶属于这个执行阶段的用户代码;

partition RDD 数据分区;

locs 属性以字符串的形式记录了该任务倾向的计算节点或是 Executor ID。

SchedulerBackend

获取集群内可用资源

在分发任务之前,调度系统得先判断哪些节点的计算资源空闲,然后再把任务分发过去。

对于集群中可用的计算资源,SchedulerBackend 用 ExecutorDataMap 来记录每一个计算节点中 Executors 的资源状态。ExecutorDataMap 是一种 HashMap,它的 Key 是标记 Executor 的字符串,Value 是ExecutorData Object,ExecutorData 用于封装 Executor 的资源状态,如 RPC 地址、主机地址、可用 CPU 核数和满配 CPU 核数等。

对内,SchedulerBackend 使用ExecutorDataMap 对Executor 做“资源画像”;

对外,SchedulerBackend 以 WorkerOffer 为粒度提供计算资源。WorkerOffer 封装了 Executor ID、主机地址和 CPU 核数,它用来表示一份可用于调度任务的空闲资源。

SchedulerBackend 与集群内所有 Executors 中的 ExecutorBackend 保持周期性通信, 双方通过LaunchedExecutor、RemoveExecutor、StatusUpdate 等来更新可用计算资源。

在这里插入图片描述

TaskScheduler

按照调度规则决定任务优先级,完成任务调度;

TaskScheduler 的调度策略分为两个层次:

  • 不同 Stages 之间的调度优先级,TaskScheduler 提供了两种模式 : FIFO(先到先得)和 FAIR(公平调度)

    FIFO调度:先到先得

    FAIR调度:哪个 Stages 优先被调度,取决于用户在配置文件 fairscheduler.xml 中的定义。

  • Stages 内不同任务之间的调度优先级

    WorkerOffer 封装了 Executor ID、主机地址和 CPU 核数,用来表示一份可用于调度任务的空闲资源。

    当 TaskScheduler 接收到来自 SchedulerBackend 的 WorkerOffer 后,TaskScheduler会优先挑选那些满足本地性级别要求的任务进行分发。本地性级别有 4 种:Process local < Node local < Rack local < Any。
    在这里插入图片描述

    TaskScheduler 接收到 WorkerOffer 之后,优先调度本地性倾向为 PROCESS_LOCAL 的 Task,而 NODE_LOCAL 次之,RACK_LOCAL 为再次,最后是 ANY。

    **Spark 调度系统的核心思想,是“数据不动、代码动”。**在任务调度的过程 中,为了完成分布式计算,Spark 倾向于让数据待在原地、保持不动,而把计算任务(代码)调度、分发到数据所在的地方,从而消除数据分发引入的性能隐患。相比分发 数据,分发代码要轻量得多。

    DAGScheduler 划分 Stages、创建分布式任务的过程中,会为每一个任务指定本地性级别,本地性级别中会记录该任务有意向的计算节点地址,甚至是 Executor 进程 ID。换句话说,任务自带调度意愿,它通过本地性级别告诉 TaskScheduler 自己更乐意被调度到哪里去。

    对于给定的 WorkerOffer,TaskScheduler 是按照任务的本地倾向 性,来遴选出 TaskSet 中适合调度的 Tasks。

    Task 与 RDD 的 partitions 是一一对应的,在创建 Task 的过程中, DAGScheduler 会根据数据分区的物理地址,来为 Task 设置 locs 属性。locs 属性记录了数据分区所在的计算节点、甚至是 Executor 进程 ID。

    举例来说,当调用 textFile API 从 HDFS 文件系统中读取源文件时,Spark 会根据 HDFS NameNode 当中记录的元数据,获取数据分区的存储地址,例如 node0:/rootPath/partition0-replica0,node1:/rootPath/partition0-replica1 和 node2:/rootPath/partition0-replica2。 那么,DAGScheduler 在为该数据分区创建 Task0 的时候,会把这些地址中的计算节点记录到 Task0 的 locs 属性。 当TaskScheduler 需要调度 Task0 这个分布式任务,根据 Task0 的 locs 属性,它就知道:“Task0 所需处理的数据分区,在节点 node0、node1、node2上存有副本,如果 WorkOffer 是来自这 3 个节点的计算资源,那对 Task0 来说就是投其所好”。

    每个任务都是自带本地倾向性的,换句话说,每个任务都有自己的“调度意愿”。

总的来说,TaskScheduler 根据本地性级别遴选出待计算任务之后,先对这些任务进行序列化。然后,交给 SchedulerBackend,SchedulerBackend 根据 ExecutorData 中记录的 RPC 地址和主机地址,再将序列化的任务通过网络分发到目的主机的 Executor 中去。 最后,Executor 接收到任务之后,把任务交由内置的线程池,线程池中的多线程则并发地在不同数据分片之上执行任务中封装的数据处理函数,从而实现分布式计算。

ExecutorBackend

ExecutorBackend 获取到tasks,分发给Executors 线程池,每个CPU线程负责处理一个 Task。
每当 Task 处理完毕,这些线程便会通过 ExecutorBackend,向 Driver 端的 SchedulerBackend 发送StatusUpdate 事件,告知 Task 执行状态。接下来, TaskScheduler 与 SchedulerBackend 通过接力的方式,最终把状态汇报给 DAGScheduler。

计算向数据移动:Spark 调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。

Spark 任务调度流程

任务调度分为如下 5 个步骤:

  1. DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行 Stages,然后为每个 Stage 创建任务集 TaskSet。

  2. SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。

  3. 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源。

  4. 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task。

  5. 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/997079.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mojo安装使用初体验

一个声称比python块68000倍的语言 蹭个热度&#xff0c;安装试试 系统配置要求&#xff1a; 不支持Windows系统 配置要求: 系统&#xff1a;Ubuntu 20.04/22.04 LTSCPU&#xff1a;x86-64 CPU (with SSE4.2 or newer)内存&#xff1a;8 GiB memoryPython 3.8 - 3.10g or cla…

华为云云耀云服务器L实例评测 | 分分钟完成打地鼠小游戏部署

前言 在上篇文章【华为云云耀云服务器L实例评测 | 快速部署MySQL使用指南】中&#xff0c;我们已经用【华为云云耀云服务器L实例】在命令行窗口内完成了MySQL的部署并简单使用。但是后台有小伙伴跟我留言说&#xff0c;能不能用【华为云云耀云服务器L实例】来实现个简单的小游…

车载诊断数据库——诊断问卷调查表与CDD关联关系

车载诊断数据库——诊断问卷调查表与CDD关联关系 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 没有人关注你。也无需有人关注你。你必须承认自己的价值,你不能站在他人的角度来反对自己。人生…

超级电容-电池-超级电容混合储能系统能量管理simulink仿真建模模型

建立混合储能系统模型 在Simulink中&#xff0c;首先需要建立一个超级电容和蓄电池并联的混合储能系统模型。其中&#xff0c;超级电容和蓄电池的荷电状态&#xff08;SOC&#xff09;需要根据实际情况进行管理。荷电状态可以通过对电池和超级电容的电压、电流等进行测量&…

说透 Nacos 一致性协议

1 Nacos ⼀致性协议 1.1 为什么 Nacos 需要⼀致性协议 Nacos尽可能减少用户部署以及运维成本&#xff0c;做到用户只需要⼀个程序包&#xff0c;就快速单机模式启动 Nacos 或集群模式启动 Nacos。而 Nacos 是⼀个需要存储数据的组件&#xff0c;为实现目标&#xff0c;就要在…

透视俄乌网络战之二:Conti勒索软件集团(上)

透视俄乌网络战之一&#xff1a;数据擦除软件 Conti勒索软件集团&#xff08;上&#xff09; 1. Conti简介2. 组织架构3. 核心成员4. 招募途径5. 工作薪酬6. 未来计划参考 1. Conti简介 Conti于2019年首次被发现&#xff0c;现已成为网络世界中最危险的勒索软件之一&#xff0…

汇川PLC学习Day3:轴控代码编写、用户程序结构说明与任务配置示例、

汇川PLC学习Day3&#xff1a;轴控代码编写、用户程序结构说明、任务配置示例 一、新建轴与轴控代码编写 1. 新建轴 (1)新建一个轴 &#xff08;2&#xff09;将轴名字更新为实际名字 可以后面实例化后再更改&#xff0c;汇川可以在更新名字时同步更新其他编写的代码名字&a…

GStreamer时钟同步

播放复杂媒体时&#xff0c;每个audio和video sample必须在特定时间按特定顺序播放。为此&#xff0c;GStreamer提供了一种同步机制&#xff0c;通过使用 GstClock object、buffer timestamps和SEGMENT event来实现&#xff1a; &#xff08;1&#xff09;GstClock&#xff1a;…

Java中如何获取一个字符串是什么类型

Java中如何获取一个字符串是什么类型&#xff1f; 在Java中&#xff0c;您可以使用一些方法来确定一个字符串的类型。下面是一些常用的方法&#xff1a; 使用正则表达式&#xff1a;您可以使用正则表达式来匹配字符串是否符合特定的模式或格式&#xff0c;以确定其类型。例如&…

【Linux入门指北】Linux磁盘扩容

文章目录 1、给 / 分区扩容 Linux在使用过程中由于数据量不断增大&#xff0c;导致磁盘空间不足&#xff0c;需要增加磁盘空间&#xff0c;主要有以下三种方式: 直接给 / 分区&#xff08;或者某一分区&#xff09;扩容&#xff0c;直接在原有磁盘上增大空间给虚拟机新增一块磁…

typeScript 学习笔记(二)

类接口 TypeScript 入门教程 (xcatliu.com) 十四.类 ① 类 类&#xff1a;定义了一件事物的抽象特点&#xff0c;包含它的属性和方法对象&#xff1a;类的实例&#xff0c;通过new生成面向对象&#xff08;OOP&#xff09;的三大特性&#xff1a;封装、继承、多态封装&…

C++学习笔记(重载、类)

C 1、函数重载2、类2.1、类的方法和属性2.2、类的方法的定义2.3、构造器和析构器2.4、基类与子类2.5、类的public、protected、private继承2.6、类的方法的重载2.7、子类方法的覆盖2.8、继承中的构造函数和析构函数 1、函数重载 函数重载大概可以理解为&#xff0c;定义两个名…

Rethink LSTMGRU

LSTM 设计思想 姑且不看偏置。 W W W 和 U U U 是加权的矩阵&#xff0c;写模型的时候用 nn.Linear(in_dim, out_dim) 就成&#xff1b; σ \sigma σ 是 Sigmoid 函数 第一条&#xff0c;遗忘门&#xff0c;定义为 有多少内容需要被遗忘&#xff1b;第二条&#xff1a;输入门…

ES8生产实践——pod日志采集(Fluentd方案)

Fluentd介绍 Fluentd是一个是一个开源的日志收集和传输工具&#xff0c;旨在解决日志数据的收集、传输和处理问题&#xff0c;它可以收集来自于各种系统或应用的日志&#xff0c;转化为用户指定的格式后&#xff0c;转发到用户所指定的日志存储系统之中。 用图来说明问题的话&…

【安装mysql(基础安装+主从复制)】

由于我的 centos 版本是 aarch64 版本 安装链接&#xff1a; 1、aarch64 版本 linux 系统安装 mysql 2、安装完成之后是不能用 navicat 进行直接访问的&#xff0c;需要如下设置&#xff1a; mysql -uroot -proot&#xff08;明文登陆&#xff0c;记得 -uroot 和 -proot之间…

Redis 基础总结

1、NoSQL概述 1.1 数据库分类 目前数据库分&#xff1a;关系型数据库与非关系型数据库 常用的关系型数据库&#xff1a; Oracle&#xff0c;MySQL&#xff0c;SqlServer&#xff0c;DB2 常用的非关系数据库&#xff1a;Redis&#xff0c;MongoDB&#xff0c;ElasticSearch&…

MIT 6.S081学习笔记(第一章)

〇、前言 本章主要是关于实验环境的搭建和完成 LAB UTIL。 平台&#xff1a;阿里云 Ubuntu20.04VScode on macOS&#xff08;M1 Apple Silicon&#xff09;。 一、环境搭建 1、QEMU QEMU&#xff08;quick emulator&#xff09;是一款由法布里斯贝拉&#xff08;Fabrice Bel…

C++中多态的底层实现

1.先来看一波比较容易出错的题 会打印出来什么&#xff1f; 其实打印出来的是B->1;为什么呢&#xff1f;看我如何讲解的。 2.思考为什么只有引用或则指针才能触发多态 结论&#xff1a;子类赋值给父类对象切片&#xff0c;不会拷贝虚标 我听老师上面的解释是&#xff1a;如…

敏捷工具敏捷项目管理实践管理

​Scrum是目前运用最为广泛的敏捷开发方法&#xff0c;是一个轻量级的项目管理和产品研发管理框架&#xff0c;旨在最短时间内交付最大价值。 Leangoo领歌是一款永久免费的专业敏捷研发管理工具&#xff0c;提供敏捷研发解决方案&#xff0c;解决研发痛点&#xff0c;打造成功…