Spark 5:Spark Core 内核调度

news2025/1/15 7:55:39

DAG
Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的资源高效地完成任务计算。

 

f7522e1437864665a29ee5b880091a0f.png

43922b7dbf7e443d9bc027b8eb98bbdb.png

15779273999d4f66a9a20633f74057fe.png

5be8f9f537934951bb8ddaa4c243170d.png

8ee312eea62148df974f4223c628377e.png

a78adb3bf058417a99ed043987074ec8.png

68755757d7cc41dcab9f3efcd522d77a.png

DAG的宽窄依赖和阶段划分

2c79eedc54fb4eaa97ede39c43457c43.png

ba46cf039429404f84128b3eace7a521.png

e3c2e36941f4465fa804d15984a10f7e.png

3cc03cdbb5744299a3d61f81ff26d6ac.png

内存迭代计算

17731734dc2c4fbe84affbc2c902e14a.png

caace358e9b149fb8bca5655a6b00275.png

17cf980bd4fe417b9768cb6b691279a6.png

Spark 并行度

8c564b4080754fbf822a3b0feec1d360.png

e6fd1101c29a4786be19c7a9b6add2f0.png

d3e9484e08034b05a334567dde8c06e9.png

e04a272b65b446cbab955c4a6b499c3f.png

d4a5c2e4674041699fe3bc38e5cae53d.png

b78cf6d47dc24dd1be337fdb18b80dc7.png

c1d7340c955340119699f5413fc7e1fd.png

c4a9c8b512d84614983c9e260698d751.png

Spark运行中的概念名词大全

b7f1bd1529f84502bc105c8f2c306f6c.png

d2e0d281631c4000ac291aa7394562d1.png

Spark Shuffle

首先回顾MapReduce框架中Shuffle过程,整体流程图如下:

aa181b1ab2994ebaa7bba6cb06c2964e.png

Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。

3c0878de8d4d4230a45beb5242429f25.png

Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步。

执行Shuffle的主体是Stage中的并发任务,这些任务分ShuffleMapTask和ResultTask两种,ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。如果要按照map端和reduce端来分析的话,ShuffleMapTask可以即是map端任务,又是reduce端任务,因为Spark中的Shuffle是可以串行的;ResultTask则只能充当reduce端任务的角色。

d6e463cbc7be468aa5d1b17d9ac07d79.png

Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式,到1.1版本时参考Hadoop MapReduce的实现开始引入Sort Shuffle,在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用,在1.6中将Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式,到的2.0版本,Hash Shuffle已被删除,所有Shuffle方式全部统一到Sort Shuffle一个实现中。

在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManagerHashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

Hash Shuffle
Shuffle阶段划分:
shuffle write:mapper阶段,上一个stage得到最后的结果写出
shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并

1)未经优化的hashShuffleManager:
HashShuffle是根据task的计算结果的key值的hashcode%ReduceTask来决定放入哪一个区分,这样保证相同的数据一定放入一个分区,Hash Shuffle过程如下:

d89dc7cdb26342ae8227f30882a02011.png

根据下游的task决定生成几个文件,先生成缓冲区文件在写入磁盘文件,再将block文件进行合并。
未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。提出如下解决方案
2)经过优化的hashShuffleManager:
在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,每一个Group磁盘文件的数量与下游stage的task数量是相同的。

a7dee6741c35492d85185db81994d402.png

未经优化:
上游的task数量:m
下游的task数量:n
上游的executor数量:k (m>=k)
总共的磁盘文件:m*n
优化之后的:
上游的task数量:m
下游的task数量:n
上游的executor数量:k (m>=k)
总共的磁盘文件:k*n

Sort Shuffle Manager
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle write task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

ea5e1980f28641b5be8975e868736c97.png

(1)该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。
(2)接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
(3)排序:在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。
(4)溢写:排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
(5)merge:一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件,这就是merge过程。
由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

Sort Shuffle bypass机制
bypass运行机制的触发条件如下:
1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
2)不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。

28024cc33f1440ceb4e817cee1f55a6e.png

此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

总结:
SortShuffle也分为普通机制和bypass机制。
普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。
而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。

Shuffle的配置选项
Shuffle阶段划分:
shuffle write:mapper阶段,上一个stage得到最后的结果写出
shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并

Shuffle的配置选项:

spark 的shuffle调优:主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等
spark.shuffle.file.buffer
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写
到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight:
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现
,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait:
spark.shuffle.io.maxRetries :shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试
的最大次数。(默认是3次)
spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
spark.shuffle.memoryFraction:
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作内存比例。
spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认为sort).Spark1.5x以后有三个可选项:
Hash:spark1.x版本的默认值,HashShuffleManager
Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制
spark.shuffle.sort.bypassMergeThreshold
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些

DAG是什么有什么用? 
DAG有向无环图, 用以描述任务执行流程,主要作用是协助DAG调度器构建Task分配用以做任务管理
内存迭代\阶段划分?
基于DAG的宽窄依赖划分阶段,阶段内部都是窄依赖可以构建内存迭代的管道
DAG调度器是?
构建Task分配用以做任务管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/814909.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

跨境电商ERP源码选型指南,如何找到最适合你的?

在跨境电商行业,一个高效的ERP系统是保证业务顺利进行和管理的关键。选择适合自己的跨境电商ERP源码至关重要。本指南将帮助你了解如何找到最适合你的跨境电商ERP源码。 跨境电商ERP源码的重要性 跨境电商ERP源码在现代电商营运中起着至关重要的作用。它提供了一套…

音频开发-小程序和H5

微信录音 1、引入sdk 2、录音操作 浏览器录音 参考文献:前端H5实现调用麦克风,录音功能_h5 录音_Darker丨峰神的博客-CSDN博客 function record() { window.navigator.mediaDevices.getUserMedia({ audio: { sampleRate: 44100, // 采样率 channelCount…

游戏APP开发:创新设计的秘诀

在游戏 APP开发中,创新设计是游戏开发公司的一大追求,为了可以为用户带来更好的游戏体验,这就需要对游戏 APP开发进行创新设计。那么,游戏 APP开发中的创新设计是什么呢?接下来,我们就一起来看看吧。 想要…

一起学算法(递推篇)

前言:递推最通俗的理解就是数列,递推和数列的关系就好比算法和数据结构的关系,数列有点像数据结构中的顺序表,而递推就是一个循环或者迭代的过程的枚举过程 1.斐波那契数列 斐波那契数形成的序列称为斐波那契数列,该…

【Java|golang】143. 重排链表---快慢指针

给定一个单链表 L 的头节点 head ,单链表 L 表示为: L0 → L1 → … → Ln - 1 → Ln 请将其重新排列后变为: L0 → Ln → L1 → Ln - 1 → L2 → Ln - 2 → … 不能只是单纯的改变节点内部的值,而是需要实际的进行节点交换。 …

python中有哪些异常,怎么处理

目录 python报的错误怎么处理 1. 使用 try-except 语句块 2. 使用 finally 语句块 3. 主动引发异常 python中有哪些异常 不知道是什么异常时怎么操作 总结 python报的错误怎么处理 在Python中,当程序执行时遇到错误,Python会抛出异常。要处理Pyt…

孩子近视有必要用全光谱灯吗?全光谱led灯推荐

当然,有必要!全光谱LED灯的光源分布更加均匀,使空间更加美观舒适,而普通灯的光源分布可能会在一定范围内分布不均匀。全光谱它的使用寿命长达20-30万小时,而普通灯的使用寿命仅为1000-2000小时,因此在长期使用上&#…

list模拟

之前模拟了string,vector,再到现在的list,list的迭代器封装最让我影响深刻。本次模拟的list是双向带头节点的循环链表,该结构虽然看起来比较复杂,但是却非常有利于我们做删除节点的操作,结构图如下。 由于其节点结构特…

收发存和进销存有什么区别?

一、什么是收发存和进销存 1、收发存 收发存是供应链管理中的关键概念,用于描述企业在供应链中的物流和库存管理过程。 收发存代表了企业在采购、生产和销售过程中的物流活动和库存水平。 收(Receiving) 企业接收供应商送达的物料或产品…

归并排序算法

归并排序 算法说明与实现代码: 归并排序(Merge Sort): 归并排序是一种分治算法,它将列表分成两个子列表,分别进行排序,然后将排序好的子列表合并成一个有序列表。 package mainimport "fmt"fu…

手机商城免费搭建之java商城 开源java电子商务Spring Cloud+Spring Boot+mybatis+MQ+VR全景+b2b2c

1. 涉及平台 平台管理、商家端(PC端、手机端)、买家平台(H5/公众号、小程序、APP端(IOS/Android)、微服务平台(业务服务) 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框…

微信小程序代码优化3个小技巧

抽取重复样式 样式复用 我们会发现很多时候在开发的过程中会存在多个页面中都用到了同样的样式,那么其实之前有提到过,公用样式可以放在app.wxss里面这样就可以直接复用。 如:flex布局的纵向排列,定义在app.wxss里面 .flex-co…

win10安装cygwin

参考 Cygwin简介及其下载安装卸载_cygwin是什么软件_徐晓康的博客的博客-CSDN博客https://blog.csdn.net/weixin_42837669/article/details/114381405这个文章写的非常好,不过现在如果想安装多个包的话,采用gui的方式可以不行了,我采用的方式…

JavaScript 简单实现观察者模式和发布-订阅模式

JavaScript 简单实现观察者模式和发布-订阅模式 1. 观察者模式1.1 什么是观察者模式1.2 代码实现 2. 发布-订阅模式2.1 什么是发布-订阅模式2.2 代码实现2.2.1 基础版2.2.2 取消订阅2.2.3 订阅一次 1. 观察者模式 1.1 什么是观察者模式 概念:观察者模式定义对象间…

【Windows11】家庭版开启组策略指南

目录 背景新建一个cmd文件运行运行结果 背景 Win11找不到gpedit.msc怎么办?有用户通过命令窗口想要去打开本地组策略的时候,系统突然弹出了一个错误提示,显示系统缺少了gpedit.msc导致无法开启本地组策略编辑器了。那么这个情况要怎么去进行…

【Web开发指南】如何用MyEclipse进行JavaScript开发?

由于MyEclipse中有高级语法高亮显示、智能内容辅助和准确验证等特性,进行JavaScript编码不再是一项繁琐的任务。 MyEclipse v2023.1.2离线版下载 JavaScript项目 在MyEclipse 2021及以后的版本中,大多数JavaScript支持都是开箱即用的JavaScript源代码…

【Minio怎么用】Minio上传图片并Vue回显

流程: 目录 1.文件服务器Minio的安装 1.1 下载Minio安装后,新建1个data文件夹。并在该安装目录cmd 敲命令。注意不要进错目录。依次输入 1.2 登录Minio网页端 1.3 先建1个桶(buckets),点击create a bucket 2. Spr…

前端小练-仿掘金导航栏

文章目录 前言项目结构导航实现创作中心移动小球消息提示 完整代码 前言 闲的,你信嘛,还得开发一个基本的门户社区网站,来给到Hlang,不然我怕说工作量不够。那么这个的话,其实也很好办,主要是这个门户网站的UI写起来麻…

操作系统_进程与线程(三)

目录 3. 同步与互斥 3.1 同步与互斥的基本概念 3.1.1 临界资源 3.1.2 同步 3.1.3 互斥 3.2 实现临界区互斥的基本方法 3.2.1 软件实现方法 3.2.1.1 算法一:单标志法 3.2.1.2 算法二:双标志法先检查 3.2.1.3 算法三:双标志法后检查 …

HarmonyOS/OpenHarmony元服务开发-卡片使用自定义绘制能力

ArkTS卡片开放了自定义绘制的能力,在卡片上可以通过Canvas组件创建一块画布,然后通过CanvasRenderingContext2D对象在画布上进行自定义图形的绘制,如下示例代码实现了在画布的中心绘制了一个笑脸。 Entry Component struct Card { private c…