Spark_Spark中 Stage, Job 划分依据 , Job, Stage, Task 高阶知识

news2025/1/8 4:57:18

上一篇文章中 : Spark_Spark 中 Stage, Job 划分依据 , Job, Stage, Task 基础知识_spark stage job_高达一号的博客-CSDN博客

主要解读了Stage, job, task 的划分标准,这篇文章将对这些信息进行进一步解读。

一. Job、Stage、Task的概念


在讲Spark的任务的划分和调度之前,需要明确Spark中Job、Stage、Task的概念。

  • Job:Spark中的算子分为转换(transformation)算子和行动(action)算子,一个action就会触发一个Job。
  • Stage:一个Job分为一个或者多个Stage,Stage以RDD宽依赖(也就是shuffle)为界,shuffle前后的RDD属于不同的Stage,Stage的数量等于shuffle操作的数量+1,如图

  •  Task:一个Stage包含一个或者多个Task,一个Stage包含的Task的数量等于这个Stage最后一个RDD的partition的数量。Task中包含了这个计算任务的计算逻辑以及数据位置等信息,Task是Executor执行任务的最小单位。

二. Spark任务执行的流程


在上一篇文章中,我们讲解了Spark提交任务的整个流程(Spark on Yarn提交任务过程)。以yarn cluster模式提交一个Spark任务之后,会依次做以下几件事情:

  1. 启动ApplicationMaster
  2. ApplicationMaster会启动Driver线程
  3. Driver线程进行SparkContext的初始化,SparkContext中有三个重要的组件:DAGScheduler,TaskScheduler,SchedulerBackend
  4. ApplicationMaster向yarn ResourceManager申请Container资源,申请成功后启动Executor,Executor会向Driver反向注册
  5. action行动算子触发,向Driver线程提交一个Job
  6. Job执行完毕,Spark程序执行完成

这篇文章主要就是讲解第5步。action行动算子触发后,会生成一个Job,然后向Driver提交,整个过程如何呢?

1. DAGScheduler,TaskScheduler,SchedulerBackend


DAGScheduler,TaskScheduler,SchedulerBackend是Driver中三个非常重要的组件,他们的作用如下:

  • DAGScheduler:根据RDD的依赖关系,将Job划分为一个或多个Stage,每个Stage会依据最后一个RDD的partition的数量生成一个或多个Task,同一Stage的Task属于同一TaskSet(任务集),DAGScheduler向TaskScheduler提交任务是以TaskSet为单位
  • TaskScheduler:接收来自DAGScheduler提交的TaskSet,向Executor分发Task
  • SchedulerBackend:TaskScheduler与Executor进行RPC通信的后台
     

 

2. Job提交的流程


RDD经过一系列transformation算子,形成RDD的血缘关系图,并得到ResultRDD。ResultRDD提交给DAGScheduler,DAGScheduler能通过ResultRDD得到所有RDD的依赖关系(DAG图),并依据DAG图将Job划分得到一个或多个Stage,每一个Stage会形成一个TaskSet,DAGScheduler会依次向TaskScheduler提交这些TaskSet。TaskScheduler负责Task级别的调度,调度过程由SchedulerBackend向TaskScheduler返回可用的Executor列表,TaskScheduler依据一定的策略从TaskPool中取出TaskSet,然后将TaskSet中的Task分发给Executor执行,分发Task的命令同样由SchedulerBackend通过RPC向Executor传达。各个模块之间的交互如图所示:

 

parititon和task的关系

来源 :spark task partition 并行度 线程 stage等的关系

Task是Spark中最新的执行单元。RDD一般是带有partitions的,每个partition的在一个executor上的执行可以任务是一个Task。
每个Task执行的结果就是生成了目标RDD的一个partiton。
每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task

三. DAGScheduler


DAGScheduler的runJob方法用来对RDD的行动算子生成一个Job,并对Job进行调度和提交,整个过程如下:

  1. 为这个Job生成一个JobId,这个JobId是全局唯一的。
  2. 生成finalStage,finalStage是RDD的DAG图中最后一个Stage。
  3. 依据DAG图,从finalStage一级一级向上找他的parent stage,直到第一级的Stage,第一级的Stage没有parent stage,即他不依赖其他的Stage。
  4. 将第一级Stage生成为一个TaskSet,TaskSet包含了一系列Task,每一个Task就是这个Stage的RDD的一个partiton的计算任务,Task中包含了这个计算任务的计算逻辑以及partiton的数据位置等信息。
  5. 将这个TaskSet提交给TaskScheduler。
  6. TaskScheduler执行完TaskSet中的所有Task,会通知DAGScheduler,DAGScheduler对这个TaskSet对应的Stage的childStage执行第4步,如此往复循环,直到执行完finalStage,整个Job就算完成了。
     

 

四. TaskScheduler


TaskScheduler接收来自DAGScheduler提交的TaskSet,并放入任务池(Pool),通过一定的策略不断地从Pool中取出TaskSet,然后将TaskSet中的Task分发给Executor执行。整个过程如下:

  1. 将TaskSet封装为一个TaskSetManager
  2. 将TaskSetManager加入到Pool中
  3. TaskScheduler向SchedulerBackend发出一条ReviveOffers命令
  4. SchedulerBackend接收到命令,向TaskScheduler返回可用的Executor列表以及这些Executor的相关的信息
  5. TaskScheduler按照一定的策略和任务优先级从Pool中依次取出TaskSetManager,然后将TaskSetManager中的Task分发给Executor,分发的原则是尽可能地将Task均匀地分发给Executor,同时会考虑节点本地性
  6. TaskScheduler将Task->Executor的映射信息打包成TaskDescription发送给SchedulerBackend。
  7. SchedulerBackend根据TaskDescription将每个Task分发到其对应的Executor
  8. Executor执行完分发给他的Task,通知TaskScheduler
  9. 如果TaskScheduler发现TaskSet中的所有Task都已经完成了,则会通知DAGScheduler,然后DAGScheduler继续向TaskScheduler提交下一个TaskSet

 

五. TaskScheduler的调度TaskSet和分配Task的原理


1. TaskSet的调度


TaskScheduler内部有两个调度算法:FIFO(先进先出算法)和FAIR(公平调度算法),默认是使用FIFO算法,也就是哪个TaskSet先提交,哪个TaskSet的优先级就越高。而FAIR算法则会综合考虑TaskSet的Task数量以及Task运行所需要的资源,总的来说就是,Task数越少、Task运行所需资源越少的TaskSet优先级越高。

2. Task的分配


Task的分配指的是将TaskSet中的哪个Task分配给哪个Executor,依据的原则主要是Task的节点本地性(TaskLocality)。

2.1 什么是节点本地性

Task内部有一个成员变量:
 

def preferredLocations: Seq[TaskLocation] = Nil

 preferredLocations表明了这个Task的位置偏好,这个变量的值是根据Task的数据的位置得到的,可以是一个hostName或者execotorId。例如,如果Task的数据是在192.168.5.101和192.168.5.102这两台机器上,那么:

preferredLocations=["192.168.5.101","192.168.5.102"]

然后,TaskSetManager内部有这么几个变量,用来保存他的所有的Task的节点偏好:

  // Set of pending tasks for each executor.
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  // Set of pending tasks for each host. Similar to pendingTasksForExecutor, but at host level.
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  // Set containing pending tasks with no locality preferences.
  val noPrefs = new ArrayBuffer[Int]
  // Set of pending tasks for each rack -- similar to the above.
  val forRack = new HashMap[String, ArrayBuffer[Int]]
  // Set containing all pending tasks (also used as a stack, as above).
  val all = new ArrayBuffer[Int]
  • forExecutor是一个HashMap,key为executorId,value是preferredLocations为这个executor的所有task的taskId
  • forHost是一个HashMap,key为hostName,value是preferredLocations为这个host的所有task的taskId
  • noPrefs是一个Array,保存了所有无任何preferredLocations的task的taskId
  • forRack是一个HashMap,key为rackName,value是preferredLocations为这个rack的所有task的taskId
  • all是一个Array,保存了所有task的taskId,可以理解为是上面4个集合的taskId的并集

TaskSetManager中还有一个重要的成员变量:

private[scheduler] var myLocalityLevels:Array[TaskLocality.TaskLocality]

这个变量保存了TaskSetManager的本地性级别(locality levels),这个变量是根据上述5个集合是否为空来确定的:

//伪码:
myLocalityLevels=new Array[TaskLocality.TaskLocality]
if forExecutor.isNotEmpty:
	myLocalityLevels += PROCESS_LOCAL
if forHost.isNotEmpty:
	myLocalityLevels += NODE_LOCAL
if noPrefs.isNotEmpty:
	myLocalityLevels += NO_PREF
if forRack.isNotEmpty:
	myLocalityLevels += RACK_LOCAL
myLocalityLevels += Any

举个例子:
TaskSetManager中有3个Task,他们的preferredLocations分别为:

task1:preferredLocations=["192.168.5.101"]  //task1的数据在192.168.5.101这台机器上
task2:preferredLocations=["executor 1"]     //task2的数据在executor 1这个executor上
task3:preferredLocations=["192.168.5.102"]  //task3的数据在192.168.5.102这台机器上

那么5个集合的值为:

forExecutor:["executor 1"->[2]]  //task2的preferredLocations为executor 1
forHost:["192.168.5.101"->[1],"192.168.5.102"->[3]] //task1的preferredLocations为"192.168.5.101",task3的preferredLocations为"192.168.5.102"
noPrefs:None
forRack:None
all:[1,2,3] //一共有三个task,task1,task2和task3

那么myLocalityLevels的值为:

myLocalityLevels = [PROCESS_LOCAL,NODE_LOCAL,ANY]

2.2 如何按照preferredLocations来分配Task


TaskScheduler为Executor分配Task时,会遍历myLocalityLevels,依次按照PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、Any的顺序来为Executor分配Task,如果某个Executor能找到符合当前TaskLocality的Task,那么就把这个Task分配给这个Executor。仍然以上面那个例子为例,TaskSetManager的5个集合以及myLocalityLevels为:
 

forExecutor:["executor 1"->[2]]  //task2的preferredLocations为executor 1
forHost:["192.168.5.101"->[1],"192.168.5.102"->[3]] //task1的preferredLocations为"192.168.5.101",task3的preferredLocations为"192.168.5.102"
noPrefs:None
forRack:None
all:[1,2,3] //一共有三个task,task1,task2和task3
myLocalityLevels = [PROCESS_LOCAL,NODE_LOCAL,ANY]

Executor及其所在的及其的hostname为:

"executor 1" -> "192.168.5.101" 
"executor 2" -> "192.168.5.102"
"executor 3" -> "192.168.5.103"
  1. 当前TaskLocality为PROCESS_LOCAL,遍历所有的Executor,发现 “executor 1” 在forExecutor的keySet中,并且对应的value为[2],因此将task2分配为“executor 1”
  2. 当前TaskLocality为NODE_LOCAL,遍历所有的Executor,发现 “executor 1” 所在的host为 “192.168.5.101”,且 “192.168.5.101” 在forHost的keySet中,对应的value为[1],因此将task1分配给“executor 1”;发现"executor 2" 所在的host为 “192.168.5.102”,且 “192.168.5.102” 在forHost的keySet中,对应的value为[3],因此将task3分配给“executor 2”
  3. 当前TaskLocality为ANY,但所有Task都已经分配完了,因此不进行分配
  4. 分配结束

用伪码表示这个过程:

for currentLocality <- taskSetManager.myLocalityLevels:
	for executor <- allExecutors:
		if executor in currentLocality对应的集合:
			task = 集合中executor对应的value
			if task未分配:
				将task分配给executor

2.3 一个小坑


在Standalone模式下,如果Task的数据源是HDFS,那么Task在计算他的preferredLocations时,计算出的是这个Task的数据所在机器的hostname,例如,task1的数据在HDFS集群的192.168.5.101这个节点上有一个副本,而192.168.5.101这台服务器的hostname为“hadoop1”那么他的preferredLocations为:
 

preferredLocations=["hadoop1"]

然而,Spark计算某个Executor所在的节点时,默认情况下使用的是这个节点的IP地址,如果executor 1所在的host为192.168.5.101,那么在遍历forHost集合时,会认为“192.168.5.101”不在forHost的keySet中(因为forHost的keySet保存的是服务器的hostname,而不是ip地址),而实际上“192.168.5.101”和"hadoop1"是同一台服务器,因此在按照TaskLocality分配Task的过程中,可能不会正确地将task1分配给executor 1。
那么怎么解决这个问题呢?只需要在每个节点的spark-env.sh配置文件中显示地指定这个节点的hostname:
 

export SPARK_LOCAL_HOSTNAME=*hostname of this node*

注意,这个坑只在Standalone模式下才会出现,yarn模式下是不会出现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/880080.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

.netcore grpc服务端流方法详解

一、服务端流式处理概述 客户端向服务端发送请求&#xff0c;服务端可以将多个消息流式传输回调用方和客户端流相反&#xff0c;客户端流发出请求&#xff0c;服务端可以传输一批消息给客户端&#xff0c;直至本次请求响应完全结束。针对文件分段传输下载&#xff0c;该方式非…

ssm基于Java ssm的校园驿站管理系统源码和论文

ssm基于Java ssm的校园驿站管理系统源码和论文016 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方…

铁是地球科学争论的核心

一项新的研究调查了地球内部铁的形态。这些发现对理解内核的结构产生了影响。 一项新的研究探索了地球内核的铁结构&#xff0c;如图中的黄色和白色所示。 资料来源&#xff1a;地球物理研究快报 地球内核以铁为主&#xff0c;铁可以多种晶体形式作为固体材料存在。&#xff08…

K8S系列三:单服务部署

写在前面 本文是K8S系列第三篇&#xff0c;主要面向对K8S新手同学&#xff0c;阅读本文需要读者对K8S的基本概念&#xff0c;比如Pod、Deployment、Service、Namespace等基础概念有所了解。尚且不熟悉的同学推荐先阅读本系列的第一篇文章《K8S系列一&#xff1a;概念入门》[1]…

如何读取文件夹内的诸多文件,并选择性的保留部分文件

目录 问题描述: 问题解决: 问题描述: 当前有一个二级文件夹,第一层是文件夹名称是“Papers(LNAI14302-14304)",第二级文件夹目录名称如下图蓝色部分所示。第三层为存放的文件,如下下图所示,每一个文件中,均存放三个文件,分别为copyright.pdf, submission.pdf, s…

【CSS】禁用元素鼠标事件(例如实现元素禁用效果)

文章目录 基本用法 基本用法 pointer-events 属性指定在什么情况下 (如果有) 某个特定的图形元素可以成为鼠标事件。实际运用中可以通过对auto 和none动态控制&#xff0c;来动态实现元素的禁用效果。 属性描述auto与pointer-events属性未指定时的表现效果相同&#xff0c;对…

对方发送的文件已过期如何恢复,这样做很简单

我们常常使用微信来发送文件、传输文件&#xff0c;但很多人也会遇到文件过期的情况。每当发现文件已过期&#xff0c;都会懊恼自己当初为什么没有早点下载保存。 大家要知道&#xff0c;微信文件如果7天内没有及时下载是会被清理的。不过&#xff0c;大家不要着急&#xff0c…

励志长篇小说《周兴和》书连载之十八 内外交困搞发明

内外交困搞发明 路灯发出昏黄而惺忪的光影。 周兴和疲惫地从车间出来&#xff0c;拖着沉重的腿爬上几级石阶&#xff0c;准备回到家里去。可走到家门口&#xff0c;他想了想&#xff0c;又折了回去&#xff0c;在车间的一条长条椅子上&#xff0c;他用一块试验用的废料当枕头&…

这些款式多样的运动式蓝牙耳机哪种好?看完你就懂了

正所谓运动式蓝牙耳机是专为运动而生的&#xff0c;运动时戴上耳机&#xff0c;再来点动感、或舒缓的音乐&#xff0c;提高我们运动的效率。运动式耳机比普通的蓝牙耳机更加的适合在运动中使用&#xff0c;而纵观当下耳机市场&#xff0c;运动式的蓝牙耳机众多&#xff0c;各类…

​比特丛林用量子纠缠对抗高智商犯罪

世界上没有绝对完美的犯罪&#xff0c;但是预谋和统筹良久的高智商犯罪都几乎接近于完美和无比烧脑。 警局的洽谈室&#xff0c;只有我和嫌疑人两个人。 各自坐在桌子两边&#xff0c;门已关。在这个封闭的空间里&#xff0c;我一手拿着筷子吃着盒饭&#xff0c;一边撇了一下…

MounRiver 从模板中抽取自定义自己工程

MounRiver 序言准备依赖资源工程历程建立自己工程 步骤一 资源链接步骤二步骤三 包含汇编路径步骤四 添加源文件路径步骤五 添加链接文件步骤六社区版 添加编译启动文件![请添加图片描述](https://img-blog.csdnimg.cn/10969073d7f341abafad8232cab3c16b.jpeg)专业版 序言 准…

问AI一个严肃的问题

chatgpt的问世再一次掀起了AI的浪潮&#xff0c;其实我一直在想&#xff0c;AI和人类的关系未来会怎样发展&#xff0c;我们未来会怎样和AI相处&#xff0c;AI真的会完全取代人类吗&#xff0c;带着这个问题&#xff0c;我问了下chatgpt&#xff0c;看一看它是怎么看待这个问题…

spring-boot-maven-plugin插件详解

一、 为什么Spring Boot项目自带这个插件 当我们在SpringBoot官方下载一个脚手架时,会发现pom.xml会自带spring-boot-maven-plugin插件 那为什么会自带这个插件呢&#xff1f; 我们知道Spring Boot项目&#xff0c;是可以通过java -jar 包名启动的 打包命令 mvn clean pac…

冠达管理:“股债汇”三杀!总统大选结果意外,这国汇率暴跌两成

当地时间8月14日&#xff0c;美股三大股指全线收涨&#xff0c;纳指涨超1%。本周商场重视多家重要零售商的财报与7月零售出售数据。 大型科技股大都上涨&#xff0c;英伟达低开高走涨7.09%&#xff0c;创5月26日以来最大单日涨幅&#xff0c;早盘总市值一度跌破万亿美元。摩根士…

QQ附近人引流的几个详细方法,qq附近人引流脚本实操演示教程

大家好我是你们的小编一辞脚本&#xff0c;今天给大家分享新的知识&#xff0c;很开心可以在CSDN平台分享知识给大家,很多伙伴看不到代码我先录制一下视频 在给大家做代码&#xff0c;给大家分享一下qq引流脚本的知识和视频演示 不懂的小伙伴可以认真看一下&#xff0c;我们一…

idea常见错误大全之:解决全局搜索失效+搜索条件失效(条件为空)+F8失灵

问题一&#xff1a;全局搜索快捷键ctrlshiftf 突然失灵了&#xff0c;键盘敲烂了 都没反应&#xff0c;这是为什么呢&#xff1f; 肯定不是idea本身的原因&#xff0c;那么就是其它外在因素影响到了idea的快捷键&#xff0c;那么其它的快捷键为什么没失效呢&#xff0c;原因只有…

煜邦转债,华设转债,兴瑞转债,神通转债上市价格预测

煜邦转债 基本信息 转债名称&#xff1a;煜邦转债&#xff0c;评级&#xff1a;A&#xff0c;发行规模&#xff1a;4.10806亿元。 正股名称&#xff1a;煜邦电力&#xff0c;今日收盘价&#xff1a;8.82元&#xff0c;转股价格&#xff1a;10.12元。 当前转股价值 转债面值 / …

nodejs+vue+elementui医院电子病历管理系统5a4x5

此系统任何人都可以使用&#xff0c;哪怕对代码完全不懂&#xff0c;只会电脑的基础操作并且安装这几款软件就可以对本系统进行操作&#xff0c;实现了人员使用方面的自由&#xff0c;不必有过多的限制。 语言 node.js 框架&#xff1a;Express 前端:Vue.js 数据库&#xff1a;…

安卓快速开发

1.环境搭建 Android Studio下载网页&#xff1a;https://developer.android.google.cn/studio/index.html 第一次新建工程需要等待很长时间&#xff0c;新建一个Empty Views Activity 项目&#xff0c;右上角选择要运行的机器&#xff0c;运行就安装上去了(打开USB调试)。 2…

冠达管理:融券卖出交易规则?

融券卖出买卖是指投资者在没有实际持有某只股票的情况下&#xff0c;经过借入该股票并卖出来取得赢利的一种股票买卖方式。融券卖出买卖规矩针对不同市场、不同证券公司之间可能会存在一些差异&#xff0c;但基本的规矩包含如下几个方面。 一、融资融券的资历要求 在进行融券卖…