Spark系列之Spark的Shuffle详解及相关参数调优

news2025/1/13 10:29:21

title: Spark系列


第七章 Spark的Shuffle详解及相关参数调优

​ 大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网 络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也 必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜, shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则, 千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

7.1 ShuffleManager发展概述

​ 在 Spark 的源码中,负责 shuffle 过程的执行、计算和处理的组件主要就是 ShuffleManager,也即 shuffle 管理器。而随着Spark的版本的发展,ShuffleManager 也在不断迭代,变得越来越先进。

在 Spark 1.2 以前,默认的shuffle计算引擎是 HashShuffleManager。该 ShuffleManager 而 HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘 IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的 ShuffleManager 改成了 SortShuffleManager。SortShuffleManager 相较于 HashShuffleManager 来说,有了一定的改进。主要就在于,每个 Task在进行 shuffle 操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个stage的 shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

​ 下面我们详细分析一下 HashShuffleManagerSortShuffleManager 的原理。

7.2 HashShuffleManager的运行原理

7.2.1 未经优化的HashShuffleManager

​ 下图说明了未经优化的HashShuffleManager的原理。这里我们先明确一个假设前提:每个 Executor 只有 1 个CPU core,也就是说,无论这个 Executor 上分配多少个 task 线程,同一时间都只能执行一 个 task 线程。

​ 我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个 stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所 谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文 件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲 填满之后,才会溢写到磁盘文件中去。

​ 那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

​ 接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

​ shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

在这里插入图片描述

7.2.2 优化后的HashShuffleManager

​ 下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

​ 开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

​ 当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

​ 假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

在这里插入图片描述

7.3 SortShuffleManager运行原理

​ SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当 shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为 200),就会启用bypass机制。

7.3.1 普通运行机制

​ 下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

​ 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

​ 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

​ SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

在这里插入图片描述

7.3.2 bypass运行机制

​ 下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下: * shuffle maptask数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。 * 不是聚合类的shuffle算子(比如reduceByKey)。

​ 此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

​ 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

​ 而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

在这里插入图片描述

7.4 shuffle相关参数调优

​ 以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出 的调优建议。

7.4.1 spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数 据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k), 从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在 实践中发现,合理调节该参数,性能会有1%~5%的提升。

7.4.2 spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能 够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发 现,合理调节该参数,性能会有1%~5%的提升。

7.4.3 spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络 异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数 之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60 次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于 针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

7.4.4 spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

7.4.5 spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默 认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调 高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写 磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

7.4.6 spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort 和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的 版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的 堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排 序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行 排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避 免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前 发现了一些相应的bug。

7.4.7 spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个 阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成 一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调 大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进 行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

7.4.8 spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启 consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情 况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以 尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启 consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高 出10%~30%。


声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/45291.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数图互通高校房产管理——公房管理功能详解

数图互通房产管理系统在这方面做得比较全面; 公用房管理 1 房屋档案 可维护校区信息、公房的楼栋信息、公房的房间信息,记录校区的名称、建成年份、占地面积、建筑面积等基本信息。记录楼栋的编号、名称、建筑面积、使用面积等基本信息,同…

[附源码]计算机毕业设计springboot超市商品管理

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

企业研发数据:省级工业企业研发费用、企业非真实研发支出原始数据加计算stata do代码两大维度指标

1、省级工业企业研发费用 1、数据来源:Wind 2、时间跨度:2008-2019年 3、区域范围:全国31省(区分了东中西部地区) 4、指标说明: 包含如下指标: 省级规模以上工业企业R&D经费 省级规模以上工业企…

Kotlin高仿微信-第8篇-单聊

Kotlin高仿微信-项目实践58篇详细讲解了各个功能点,包括:注册、登录、主页、单聊(文本、表情、语音、图片、小视频、视频通话、语音通话、红包、转账)、群聊、个人信息、朋友圈、支付服务、扫一扫、搜索好友、添加好友、开通VIP等众多功能。 Kotlin高仿…

http和dubbo接口调用主动设置超时时间

http接口超时方案 方案1:多个resttemplate,不同超时时间的使用不同的template,优点:简单,扩展起来复制粘贴,缺点:代码冗余,多个template占用内存不够优雅 方案2:单个res…

MySQL事务和锁

目录 1、四大特性 2、事务引发的问题 3、事务控制演进 3.1、排队 3.2、排它锁 3.3、读写锁 3.4、MVCC 4、事务的隔离级别 4.1、四种隔离级别 4.2、事务隔离级别和锁的关系 4.3、MySQL隔离级别控制 5、锁机制和实战 5.1、锁分类 5.1.1、按操作粒度分类 5.1.2、按…

CDMP选修课都有什么?

大家都知道CDMP认证考试有四个级别。分别是A级(基础级)P级(实践级)M级(专业级)F级(大师级)。级别越高,考试难度就越大,分数比例要求也更高,相对应…

在Linux命令行中查找空目录

在 Linux 系统中,出现空的目录这是很正常的事情,而且,也是有办法一次性把它们都找出来的。 但是,仅仅列出空目录并不是我们的目的,我们今天了解一下如何删除这些空的目录。 在Linux中查找空目录 查找空目录&#xf…

ThinkPHP5文档学习——配置

文章目录一、配置目录二、配置格式PHP数组定义其它格式的支持二级配置三、配置加载惯例配置应用配置拓展配置场景配置四、读取配置参数五、动态配置设置配置参数六、独立配置独立配置文件V5.0.1版本已经废除该写法自动读取扩展配置七、配置作用域八、环境变量配置一、配置目录 …

语文课内外杂志语文课内外杂志社语文课内外杂志社2022年第14期目录

幼儿教育《语文课内外》投稿:cn7kantougao163.com 家园协同视域下幼儿心理危机的预防与干预对策 曹锭1-3 幼小衔接阶段幼儿时间观念的培养对策 陈晶晶4-6 有效支持 助力幼儿在书海中徜徉 胡玲珊7-9 东西部幼儿园结对帮扶,助力乡村教育扶贫——以广州市人民政府机关幼…

虹科分享|终端安全防护|网络安全术语列表(二)

如果你的工作或者生活与网络安全有关,你就知道它使用了自己独特的、不断发展的语言。术语和缩略语受到网络安全专家的喜爱。因此,我们创建了一个全面的网络安全词汇表,解释了常用的网络安全术语、短语和技术。我们设计此列表是为了揭开安全专…

[附源码]Python计算机毕业设计Django贷款申请审核管理系统论文

项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等等。 环境需要 1.运行环境:最好是python3.7.7,我…

你知道吗?小程序组件≠小程序插件

一直以为小程序组件和小程序插件是一回事,只是措辞不一样,导致造成乌龙,其实完全是两回事,插件是可以直接提供服务的,组件是给开发者提供的轮子,不能直接提供服务。 先看看微信是如何定义小程序插件的&…

什么是深度卷积可分离Depthwise Separable conv

假设加载进来一张GRB三通道的图片 我现在就把三个通道拆开 注意哦 传统的就是 一个卷积核filter 和三个通道channel 进行卷积 现在这个深度卷积可分离 就要用三个不同的卷积核来对每一个通道进行卷积 小细节 :如果是传统意义上的卷积,但用一个的话&a…

【SQL Server + MySQL一】数据库基本知识、关系数据模型、关系数据语言--关系代数

极其感动!!!当时学数据库的时候,没白学!! 时隔很长时间回去看数据库的笔记都能看懂,每次都靠这份笔记巩固真的是语雀分享要花钱,要不一定把笔记给贴出来(;༎ຶД༎ຶ) ,除…

详解设计模式:装饰器模式

装饰器模式(Decorator Pattern)也称为包装模式(Wrapper Pattern),是 GoF 的 23 种设计模式中的一种结构型设计模式。装饰器模式 是指在不改变原有对象的基础之上,将功能附加到对象上,提供了比继…

3.7.2、IP地址(网际层)

我们日常的大多数网络应用中,属于数据链路层的 MAC 地址,和属于网络层的 IP 地址都在使用,它们之间存在一定的关系。这里主要介绍 IP 地址的作用 1、基本介绍 IP 地址是因特网(Internet)上的主机和路由器所使用的地址,用于标识两…

osgEarth示例分析——osgearth_graticule

前言 本示例最具有借鉴的功能:绘制网格、网格上的文字显示、拾取地球的坐标。在地球网格示例中,可以设置4种网格。执行命令如下: // --geodetic osgearth_graticuled.exe --geodetic earth_image\china-simple.earth // --utm osgearth_gr…

[附源码]Python计算机毕业设计Django大学生心理测评系统

项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等等。 环境需要 1.运行环境:最好是python3.7.7,我…

磁盘划分和磁盘格式化

文章目录列出装置的 UUID 等参数parted 列出磁盘的分区表类型与分区信息磁盘分区:gdisk、fdisk用 gdisk 新增分区槽用 gdisk 删除一个分区槽磁盘格式化(建立文件系统)XFS 文件系统 mkfs.xfsXFS 文件系统 for RAID 效能优化(Option…