大数据技术架构(组件)26——Spark:Shuffle

news2025/1/12 22:56:51

2.1.6、Shuffle

2.1.6.0 Shuffle Read And Write

  MR框架中涉及到一个重要的流程就是shuffle,由于shuffle涉及到磁盘IO和网络IO,所以shuffle的性能直接影响着整个作业的性能。Spark其本质也是一种MR框架,所以也有自己的shuffle实现。但是和MR中的shuffle流程稍微有些不同(Spark相当于Mr来说其中一些环节是可以省略的),比如MR中的Shuffle过程是必须要有排序的,且不能省略掉,但Spark中的Shuffle是可以省略的;另对于MR的Shuffle中间结果是要落盘的,而对于Spark Shuffle来说,可以根据存储策略存储在内存或者磁盘中。

Shuffle阶段中涉及到一个很重要的插拔式接口ShuffleManager,该接口可以作为一个入口,可以获取用于数据读写处理句柄ShuffleHandle,然后通过ShuffleHandle获取特定的读写接口即ShuffleWriter和ShuffleReader,以及获取块数据信息解析接口ShuffleBlockResolver。

目前Spark提供了两种ShuffleManager:sort和tungsten-sort

2.1.6.0.1、Shuffle Writer

Shuffle写数据的时候,在内存中是有一个Buffer缓冲区,同时本地磁盘也有对应的文件(具体位置可以通过spark.local.dir配置);因此该部分内存中主要被两部分内容所占用:1、存储Buffer数据;2、管理文件句柄。

如果shuffle过程中写入大量的文件,那么内存消耗也是一种压力,很容易产生OOM,频繁GC。

扩展:关于GC引发的shuffle fetch不到文件

 有那么一种现象:即Reduce端的Stage去拉取上一个Stage的产生结果,但是因为找不到文件而抛出异常,其实并不是不存在,而是可能由于正在进行GC操作而未回应。

Spark2.X提供了三种Shuffle Writer模式:

2.1.6.0.1.1 BypassMergeSortShuffleWriter

该种模式是带了Hash风格的基于Sort的Shuffle机制,为每个reduce端生成一个文件。

适用场景:该种模式适用于分区数比较少的场景下,可以作为一种优化方案。

上图的合并机制即就是BypassMergeSortShuffleWriter的部分流程。

写入文件命名:

该种模式的缺点:

1、不能使用aggregator,以32条记录批次直接写入的(通过spark.shuffle.file.buffer参数配置),所以会造成后续的网络IO开销比较大。

2、每个分区都会生成一个对应的磁盘写入器DiskBlockObjectWriter,先对每个reduce产生的数据写入临时文件中,最后合并输出一个文件。所以分区数不能设置过大,避免同时打开过多实例加大内存开销

3、不能指定Ordering,也就是说该种模式的排序是采用分区Id进行的,分区内的数据是不保证有序的。

2.1.6.0.1.2 SortShuffleWriter

流程:

1、Sort Shuffle Writer模式首先会实例化一个ExternalSorter,根据是否在map端聚合来决定是否在实例化的时候传入aggregator和Ordering变量。

2、把所有的记录放到外部排序器中ExternalSorter(会调用Sorter.insertAll和writePartitionedFile两个方法)

3、Sorter.insertAll内部会根据是否进行合并采用不同的存储。如果需要进行合并,那么就会使用AppendOnlyMap在内存中进行合并;如果不需要进行合并,那么就会存放到Buffer中。

3.1、无论是否进行合并,都会进行的是否溢写检查(即调用maybeSpillCollection检查是否溢写到磁盘),其底层内部调用的是maybeSpill方法。

4.其溢写策略:

4.1、首先检查是否需要spill;判断依据为:

4.1.1、当前记录数是否是32的倍数--即对小批量的数据集进行spill

4.1.2、检查当前需要的内存大小是否达到或者超过了当前分配的内存阈值spark.shuffle.spill.initialMemoryThreshold=510241025

4.2、如果以上条件都满足的话,那么会向Shuffle内存池申请当前2倍内存,然后再次判断是否需要spill。

4.3、再次判断的依据是:

4.3.1、当前判断结果为true|从上次spill之后读取的记录数是否超过了配置的阈值spark.shuffle.spill.numElementsForceSpillThreshold

缺点:

1、内存中的数据是以反序列化的形式存储的,这样会增加内存的开销,同时也意味着增加GC负载。

2、存储到磁盘的时候会对数据进行序列化,而反序列化和序列化操作会增加CPU的开销。

2.1.6.0.1.3 UnsafeShuffleWriter

和Sort Shuffle Writer基本一致,主要不同在于使用的是序列化排序模式。

上述中说到在spark.shuffle.manager设置为sort时,内部会自动选择具体的实现机制。

Tungsten-Sort Shuffle内部的写入器是使用的UnsafeShuffleWriter,该类在构建的时候会传入一个context.taskMemoryManager(),构建一个TaskMemoryManager实例,主要负责管理分配task内存。

该写入器有以下三个关键步骤:

1、通过循环遍历将记录写入到外部排序器中

2、closeAndWriteOutput方法写数据文件和索引文件,在写的过程中会先合并外部排序器在插入过程中生成的中间文件。该方法主要有三个步骤:

2.1、触发外部排序器,获取spill信息

2.2、合并Spill中间文件,生成数据文件,并返回各个分区对应的数据量信息。

2.3、根据各个分区的数据量信息生成数据文件对应的索引文件。

3、sorter.cleanupresources最后释放外部排序器的资源。

2.1.6.0.2、Shuffle Read

2.1.6.1、Hash Shuffle(Spark2.X abandoned)

早期引入Hash Shuffle主要是为了避免不必要的排序(MR中的Shuffle过程sort是必经的一个过程)。

在Spark1.1之前,每个Mapper阶段的Task都会为每个Reduce阶段的Task生成一个文件,那么也就会生成M*R个中间文件(M表示Mapper阶段的Task个数,R表示Reduce阶段的Task个数)。

后来为了缓解这种大量文件产生的问题,基于Hash Shuffle实现又引入了Shuffle Consolidate机制,也就是将中间文件进行合并。通过配置spark.shuffle.consolidateFiles=true减少中间文件生成的个数。该种机制把中间文件生成方式调整为每个执行单元(类似于Slot)为每个Reduce阶段的Task生成一个文件,那么最后生成的文件个数为E(C/T)R;

E:表示Executors个数

C:表示Mapper阶段可用Cores个数

T:表示Mapper阶段Task分配的Cores个数。

从抽象的角度来说,Consolidate Shuffle是通过ShuffleFileGroup的概念,即每个ShuffleFileGroup对应一批Shuffle文件,文件数量和Reducer端的Task个数一样。同个Core上执行的MapTask任务会往这一批Shuffle文件里写,这样可以进行复用,在一定程度上对多个task进行了合并。

2.1.6.2、Sort Shuffle

2.1.6.2.1、引入背景

基于Hash的Shuffle实现方式,生成的中间结果文件个数取决于Reduce阶段的Task个数,即Reduce端的并行度。虽然引入了consolidate机制,但是仍然解决不了大量文件生成的问题。

因此在Spark1.1中又引入了基于Sort的Shuffle方式,在2.X中废弃掉了hash shuffle。也就是说现在1.1之后所有的版本中默认都是Sort Shuffle(早期版本其实可以调整ShuffleManager为hash方式)。

为什么说Sort Shuffle解决了Hash Shuffle生成大量文件的问题?那么最后又是会生成多少个文件呢?

解答:基于sort shuffle的模式是将所有的数据写入到一个数据文件里,同时会生成一个索引文件。那么最终文件生成的个数变成了2M;

M表示Mapper阶段的Task个数,每个Mapper阶段的Task分别生成两个文件(1个数据文件、1个索引文件)

其中索引文件存储了数据文件通过Partitioner的分类的信息,所以下一个阶段Stage中的Task就是根据这个index文件获取自己所需要的上一个Stage中ShuffleMapTask产生的数据。而ShuffleMapTask产生数据写入是顺序写的(根据自身的Key写进去的,同时也是按照Partition写进去的)

2.1.6.2.2、原理

 Sort Shuffle主要是在Mapper阶段,在Mapper阶段,会进行两次排序(第一次是根据PartitionId进行排序;第二次是根据数据本身的Key进行排序,当然第二次排序除非调用了带排序的方法,在方法里指定了Key值的Ordering实例,这个时候才会对分区内的数据进行排序)。

 sort shuffle其核心借助于ExternalSorter首先会把每个ShuffleMapTask的输出排序内存中,当超过内存容纳的时候,会spill到一个文件中(FileSegmentGroup),同时还会写一个索引文件用来区分下一个阶段Reduce Task不同的内容来告诉下游Stage的并行任务哪些数据是属于自己的。

2.1.6.2.3、缺点

1、sort shuffle产生的文件数量为2M,那么这个文件数量的大小也是取决于M的个数,也就是Map端的TASK个数。如果task数过多,那么这个时候Reduce端需要大量记录并进行反序列化,同样会造成OOM,甚至full GC

2、Mapper端强制排序(和MR中的Shuffle是一样的)

3、如果分区内也需要进行排序,那么就都要在mapper端和reducer端进行排序。

4、sort shuffle是基于记录本身进行排序的,会有一定的性能消耗。

2.1.6.3、Tungsten Sort Shuffle

tungen-sort shuffle对排序算法进行了改造优化了排序的速度。其优化(从避免反序列化的数据量过大消耗内存方面考虑;借助于Tungsten内存管理模型,可以直接处理序列化的数据,同时也降低了CPU开销。

使用该模式需要具备以下几个条件:

1、shuffle依赖中不存在聚合操作或者没有对输出排序的要求

2、shuffle的序列化器支持序列化值的重定位(目前仅支持KryoSerializer以及SparkSQL子框架自定义的序列化器)

3、Shuffle过程重化工的输出分区个数少于16777216个。

所以使用基于Tungsten-sort的Shuffle实现机制条件还是比较苛刻的。

2.1.6.4、Shuffle & Storage (TODO)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/331625.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

九龙证券|军工股全线走强!中航电测又涨停,这一板块所有个股都在涨

今天早盘,A股全体低开高走,到午间收盘,主要股指均红盘报收,两市成交仍旧低迷。 盘面上,国防军工、酒店餐饮、芯片、钙钛矿电池等板块涨幅居前,文教休闲、锂矿、水产品、供销社等板块跌幅居前。北上资金净流…

使用openai-whisper 语音转文字

前言:最近由于ChatGPT 的大热,AI 应用领域再次进入大众的视线,今天介绍一款AI应用whisper 可以较为准确的将人声转换为文字(支持多国语言)一、安装安装有两种方式pip 和源码编译安装,这里介绍pip安装方式安…

尚医通(八) Nginx

目录一、项目中的服务地址二、配置nginx反向代理1、安装window版的nginx2、配置nginx代理3、重启nginx4、测试三、配置开发环境1、修改文件内2、重启前端程序一、项目中的服务地址 只有一个api地址的配置位置,而我们实际的后端有很多微服务,所以接口地址…

C#【必备技能篇】序列化与反序列化(json、xml、二进制文件)

文章目录一、序列化为json1、序列化与反序列化【基本使用】实例代码:2、封装成泛型方法【可以公用】实例代码:二、序列化为xml1、序列化与反序列化【基本使用】实例代码:2、封装成泛型方法【可以公用】实例代码:三、序列化为二进制…

Windows sshfs挂载远程文件夹

Windows sshfs挂载远程文件夹 Windows系统通过sshfs,远程挂载文件服务,实现远程文件夹共享的功能 目录 Windows sshfs挂载远程文件夹 1.安装WinFsp 2.安装SSHFS-Win 3.挂载Linux文件服务 4.断开Linux文件服务 1.安装WinFsp 下载地址:…

Vue3配置路由(vue-router)

文章目录前言一、配置路由(vue-router)1、安装路由2、新建页面3、创建路由配置文件4.特殊报错!前言 紧接上篇文章,vue3的配置与vue2是有所差别的,本文就讲述了如何配置,如果本文对你有所帮助请三连支持博主…

2023-02-09 Elasticsearch 模糊搜索

1 prefix 前缀搜索 以前缀开头的搜索,不计算相关度得分 前缀搜索匹配的是term,而不是field。 前缀搜索的性能很差 前缀搜索没有缓存 前缀搜索尽可能把前缀长度设置的更长 针对于中文分词器 index_prefixes: 默认 “min_chars” : 2, “max_chars” : …

CMMI落地常见4大问题及改进措施

(一)、CMMI落地常见的4大问题: 1、组织成员并非全部认可与参与 在CMMI推行过程中,过程改进小组EPG负责整个改进工作,但组织其他成员并不是全部认可和自愿参与,甚至有些成员认为与自己无关。从而造成EPG在推…

Spring-Data-Jpa实现继承实体类

写在前面:从2018年底开始学习SpringBoot,也用SpringBoot写过一些项目。现在对学习Springboot的一些知识总结记录一下。如果你也在学习SpringBoot,可以关注我,一起学习,一起进步。 相关文章: 【Springboot系…

ZooKeeper 避坑实践: Zxid溢出导致选主

作者:子葵 背景 线上 flink 用户使用 ZooKeeper 做元数据中心以及集群选主,一些版本的 flink 在 ZooKeeper 选主时,会重启 Job,导致一些非预期的业务损失。而 ZooKeeper 在 zxid溢出时,会主动触发一次选主&#xff0…

复习0206

目录 一、访问修饰符 一、权限范围 二、注意事项 二、封装(面向对象的三大特征之一) 一、封装的好处 二、封装的实现步骤 三、和构造器结合 四、练习题中的细节 一、访问修饰符 一、权限范围 访问修饰符用于控制方法和属性(成员变量…

Kylin构建引擎的衍生维度

目录1. 衍生维度(derived dimension)1. 衍生维度(derived dimension) 衍生维度的构建和查询过程: 当有一张事实表和维度表如下: 我们需要以city为维度字段,sum(salary)为度量字段,进行cube的构建。因为定义了city为衍生维度字段…

C++多态(上)

文章目录1. 多态的概念2. 多态的定义及实现2.1多态的构成条件2.2 虚函数2.3 虚函数的重写2.4 虚函数重写的两个例外2.4.1 协变(基类与派生类虚函数返回值类型不同)2.4.2 析构函数的重写(基类与派生类析构函数的名字不同)2.5 重载、覆盖(重写)、隐藏(重定义)的对比3. C11 overri…

小程序酷炫动态登录页源码(动态水滴)

1. 页面效果 登陆页面一般都要酷炫好看一点&#xff0c;这里分享一个动态登录页面&#xff0c;页面有三个流动的小水滴。一个水滴放登录框。剩下两个水滴跳转页面和打开弹窗。 2. 代码内容 <template><view class"login-page"><u-gap height"…

【c语言技能树】文件

Halo&#xff0c;这里是Ppeua。平时主要更新C语言&#xff0c;C&#xff0c;数据结构算法......感兴趣就关注我吧&#xff01;你定不会失望。 &#x1f308;个人主页&#xff1a;主页链接 &#x1f308;算法专栏&#xff1a;专栏链接 我会一直往里填充内容哒&#xff01; &…

JVM堆内存详解

一、简介 JAVA堆内存管理是影响性能主要因素之一。 堆内存溢出是JAVA项目非常常见的故障&#xff0c;在解决该问题之前&#xff0c;必须先了解下JAVA堆内存是怎么工作的。 JVM内存划分为堆内存和非堆内存&#xff0c;堆内存分为年轻代&#xff08;Young Generation&#xff09…

C语言基础(九)—— 复合类型(自定义类型)

1. 结构体1.1 概述数组&#xff1a;描述一组具有相同类型数据的有序集合&#xff0c;用于处理大量相同类型的数据运算。有时我们需要将不同类型的数据组合成一个有机的整体&#xff0c;如&#xff1a;一个学生有学号/姓名/性别/年龄/地址等属性。显然单独定义以上变量比较繁琐&…

ArkTS语法(声明式UI)

页面级变量的状态管理 装饰器装饰内容说明State基本数据类型&#xff0c;类&#xff0c;数组修饰的状态数据被修改时会触发组件的build方法进行UI界面更新。Prop基本数据类型修改后的状态数据用于在父组件和子组件之间建立单向数据依赖关系。修改父组件关联数据时&#xff0c;…

你真的看好低代码开发吗?

低代码开发前景如何&#xff0c;大家真的看好低代码开发吗&#xff1f;之前有过很多关于低代码的内容&#xff0c;这篇就来梳理下国内外低代码开发平台发展现状及前景。 01、国外低代码开发平台现状 2014年&#xff0c;研究机构Forrester Research发表的报告中提到“面向客户…

【Java基础】017 -- 面向对象进阶二(包、代码块、抽象类、接口、内部类)

目录 四、包和final 1、什么是包&#xff1f; 2、使用其它类的规则 ①、规则实现 ②、小结 3、final ①、示例代码 ②、常量 ③、练习 ④、小结 五、权限修饰符和代码块 1、权限修饰符的分类 2、权限修饰符的使用规则 3、代码块 ①、局部代码块&#xff08;写在…