Spark Shuffle解析

news2025/1/9 15:56:55

1 Shuffle的核心要点

1.1 ShuffleMapStage与ResultStage

 ShuffleMapStage与ResultStage

在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。

ShuffleMapStage的结束伴随着shuffle文件的写磁盘

ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束

1.2 Shuffle中的任务个数

我们知道,Spark Shuffle分为map阶段和reduce阶段,或者称之为ShuffleRead阶段和ShuffleWrite阶段,那么对于一次Shuffle,map过程和reduce过程都会由若干个task来执行,那么map task和reduce task的数量是如何确定的呢?

假设Spark任务从HDFS中读取数据,那么初始RDD分区个数由该文件的split个数决定,也就是一个split对应生成的RDD的一个partition,我们假设初始partition个数为N。

初始RDD经过一系列算子计算后(假设没有执行repartition和coalesce算子进行重分区,则分区个数不变,仍为N,如果经过重分区算子,那么分区个数变为M),我们假设分区个数不变,当执行到Shuffle操作时,map端的task个数和partition个数一致,即map task为N个。

reduce端的stage默认取spark.default.parallelism这个配置项的值作为分区数,如果没有配置,则以map端的最后一个RDD的分区数作为其分区数(也就是N),那么分区数就决定了reduce端的task的个数

1.3 reduce端数据的读取

根据stage的划分我们知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task会先执行,那么后执行的reduce task如何知道从哪里去拉取map task落盘后的数据呢?

reduce端的数据拉取过程如下:

  1. map task 执行完毕后会将计算状态以及磁盘小文件位置等信息封装到MapStatus对象中,然后由本进程中的MapOutPutTrackerWorker对象将mapStatus对象发送给Driver进程的MapOutPutTrackerMaster对象;
  2. 在reduce task开始执行之前会先让本进程中的MapOutputTrackerWorker向Driver进程中的MapoutPutTrakcerMaster发动请求,请求磁盘小文件位置信息
  3. 当所有的Map task执行完毕后,Driver进程中的MapOutPutTrackerMaster就掌握了所有的磁盘小文件的位置信息。此时MapOutPutTrackerMaster会告诉MapOutPutTrackerWorker磁盘小文件的位置信息;
  4. 完成之前的操作之后,由BlockTransforService去Executor0所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M(reduce task每次最多拉取48M数据,将拉来的数据存储到Executor内存的20%内存中)。

2 HashShuffle解析

以下的讨论都假设每个Executor有1个CPU core。

1. 未经优化的HashShuffleManager

shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“划分”。所谓“划分”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

shuffle read阶段,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,map task给下游stage的每个reduce task都创建了一个磁盘文件,因此shuffle read的过程中,每个reduce task只要从上游stage的所有map task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

未优化的HashShuffleManager工作原理如图1-7所示:

 

图1-7 未优化的HashShuffleManager工作原理

2. 优化后的HashShuffleManager

为了优化HashShuffleManager我们可以设置一个参数,spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制,通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor(Executor CPU个数为1),每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量,也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

优化后的HashShuffleManager工作原理如图1-8所示:

 

图1-8 优化后的HashShuffleManager工作原理

3 SortShuffle解析

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

  1. 普通运行机制

在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

普通运行机制的SortShuffleManager工作原理如图1-9所示:

 图1-9 普通运行机制的SortShuffleManager工作原理

  1. bypass运行机制

bypass运行机制的触发条件如下:

  1. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  2. 不是聚合类的shuffle算子。

此时,每个task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销

bypass运行机制的SortShuffleManager工作原理如图1-10所示:

 

图1-10 bypass运行机制的SortShuffleManager工作原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/385349.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

16 Nacos服务端服务注册源码分析

Nacos服务端服务注册源码分析 服务端调用接口 我们已经知道客户端在注册服务的时候实际上是调用的NamingService.registerInstance这个方法来完成实例的注册,而且在最后我们也告诉了大家实际上从本质上讲服务注册就是调用的对应接口nacos/v1/ns/instance&#xff…

浅谈一下mysql8.0与5.7的字符集

修改字符集 修改步骤 在MySQL8.0版本之前,默认字符集为1atin1,utf8字符集指向的是utf8mb3。网站开发人员在数据库设计的时候往往会将编码修改为ut8字符集。如果遗忘修改默认的编码,就会出现乱码的问题。从MySQL8.0开始,数据库的默认编码将改…

《强化学习导论》之6.5 Q-Learning

Q-Learning:Off-Policy TD Control强化学习的早期突破之一是开发了一种称为Q学习的非策略TD控制算法(Watkins,1989)。其最简单的形式,定义为(6.8)在这种情况下,学习的动作-值函数Q直接近似于最优动作-值函数&#xff0…

【C++PrimerPlus】第三章 处理数据

文章目录前言内容目录3.1 简单变量3.1.2 变量名3.1.2 整形3.1.3 整形short,int,long,long long3.1.4 无符号类型3.1.5 选择整形类型3.1.6 整形字面值3.1.7 C如何确定常量的类型3.1.8 char类型:字符和小整数3.1.9 bool类型3.2 const修饰符3.3浮点数3.3.1 书写浮点数3…

【存储】存储协议

存储协议SCSI协议SCSI协议和存储系统SCSI协议寻址方式iSCSI产生的原因--->基于IP网络的SCSIiSCSI启动器-->目标器模型iSCSI体系结构iSCSI和SCSI、TCP和IP的关系SAS协议为什么要发展SASSAS协议层次结构SAS特点SAS的可扩展性SAS与其他传输技术的比较FCFC协议结构FC拓扑结构…

vueCli实现一个自定义loader

生活只会欺负穷人,爱情也是 webpack 作为前端项目的打包工具,具有很好的学习价值。下面来学习下其中的 Loader Loader可以帮助webpack将不同类型的文件转换为webpack可识别的模块 webpack中Loader使用:https://www.webpackjs.com/loaders/…

用VAE生成图像

用VAE生成图像自编码器AE,auto-encoderVAE讲讲为什么是log_var为什么要用重参数化技巧用VAE生成图像变分自编码器是自编码器的改进版本,自编码器AE是一种无监督学习,但它无法产生新的内容,变分自编码器对其潜在空间进行拓展&#…

二、Redis安装配置(云服务器、vmware本地虚拟机)

一、自己购买服务器 自己购买阿里云、青牛云、腾讯云或华为云服务器, 自带CentoOS或者Ubuntu环境,直接开干 二、Vmware本地虚拟机安装 1、VMWare虚拟机的安装,不讲解,默认懂 2、如何查看自己的linux是32位还是64位 getconf L…

云HIS医院管理系统源码 云HIS系统源码 SaaS模式 springboot开发

▶ SaaS运维平台多医院入驻强大的电子病历模板 ,有源码,有演示! ▶ 云HIS系统技术框架: 总体框架: SaaS应用,全浏览器访问 前后端分离,多服务协同 服务可拆分,功能易扩展 ▶ 云HI…

初阶C语言——实用调试技巧【详解】

文章目录1. 什么是bug?2. 调试是什么?有多重要?2.1 调试是什么?2.2 调试的基本步骤2.3 Debug和Release的介绍3.学会使用快捷键4.调试的时候查看程序当前信息4.1 查看临时变量的值4.2 查看内存信息4.3 查看调用堆栈4.4 查看汇编信息…

混凝土搅拌站远程监控解决方案

一、项目背景 随着大规模的基础设施建设,对混凝土搅拌设备的需求量日益增加,对其技术指标的要求也日益提高,其技术性能将直接关系到工程的质量和使用寿命。而混凝土生产的质量是在生产过程中形成的,而非最终强度的检测。混凝土生…

10 面向接口编程(上):一切皆服务,服务基于协议

按照面向接口编程的理念,将每个模块看成是一个服务,服务的具体实现我们其实并不关心,我们关心的是服务提供的能力,即接口协议。那么框架主体真正要做的事情是什么呢?其实是:定义好每个模块服务的接口协议&a…

That引导的宾语从句

That引导的宾语从句指的是that为宾语从句的引导词。宾语从句:置于动词、介词等词性后面,在句子中起宾语作用的从句叫做宾语从句。宾语从句分为三类:动词的宾语从句,介词的宾语从句和形容词的宾语从句。 一、that引导的宾语从句(在…

《数据万象带你玩转视图场景》第一期:avif图片压缩详解

前言随着硬件的发展,不管是手机还是专业摄像设备拍出的图片随便可能就有几M,甚至几十M,并且现在我们处于随处可及的信息海洋里,海量的图片带来了存储问题、带宽问题、加载时延问题等等。对图片信息进行有效的压缩处理无疑会极大的…

ARM架构Ubuntu下使用Docker安装MySQL

大家好,我是中国码农摘星人。 欢迎分享/收藏/赞/在看! 由于ARM架构的限制,许多软件还没有做到完全适配,CentOS、MySQL等软件安装频繁出错。于是决定做一栏相关软件环境安装的文章。 基础信息 Apple M1 ProUbuntu 22.04 运行 使…

Python 如何安装 MySQLdb ?

人生苦短 我用python Python 标准数据库接口为 Python DB-API, Python DB-API为开发人员提供了数据库应用编程接口。 Python 数据库接口支持非常多的数据库, 你可以选择适合你项目的数据库: GadFlymSQLMySQLPostgreSQLMicrosoft SQL Serve…

来 CSDN 三年,我写了一本Python书

大家好,我是朱小五。转眼间已经来 CSDN 3年了,其中给大家一共分享了252篇Python文章。 但这三年,最大的收获还是写了一本Python书! 在这个自动化时代,我们有很多重复无聊的工作要做。想想这些你不再需要一次又一次地做…

站内信箱系统的设计与实现

技术:Java、JSP等摘要:在经济全球化和信息技术成为发展迅速的今时今日,人们通过电子邮件收发进行信息传递已经成为主流。随着互联网和网络办公的发展,电子邮件正在被广泛应用在人们的日常生活中。跟据调查研究统计,在全…

文件系统-

文件系统 是一个面向用户的可视化管理类型的操作系统 其实就是管理硬盘的基本单位扇区,然后将存储数据可视化管理给用户 文件系统包含两个部分 文件的集合和目录结构 对于用户和系统来说文件系统时不一样的 操作系统只解释可执行文件 文件内部结构 文件就是基本…

【JVM】详解Java内存区域和分配

这里写目录标题一、前言二、运行时数据分区2.1程序计数器(PC)2.2 Java虚拟机栈2.3 本地方法栈2.4 Java堆2.5 方法区2.5.1 运行时常量池2.6 直接内存三、HotSpot虚拟机对象探秘3.1 对象的创建3.2 对象的内存布局3.3 对象的访问定位一、前言 C/C需要自行回收和释放已经没用的对象…