搞懂 Spark 系列之 Spark Shuffle 的前世今生

news2025/3/9 23:02:33

注:本文已首发于PowerData公众号!

1 Spark Shuffle 是什么?

Shuffle 中文意思是“洗牌,混洗”,而在 Hadoop 的 MapReduce 框架中,Shuffle 是 Map 和 Reduce 中间必不可少的连接桥梁。数据在从Map 阶段结束经过 Shuffle 到 Reduce 阶段的过程中,涉及到磁盘的读写、网络传输和数据序列化,Shuffle 操作还会在磁盘上生成大量中间
文件,这些都是直接影响程序的性能的,因此,Shuffle 过程的性能高低能够直接影响整个程序的性能高低。

Spark 使用 Hadoop 的 MapReduce 分布式计算框架作为基础,自然也是实现了 Shuffle 的逻辑,而且还进行了优化改进。 Spark Shuffle 的发展时间线如下:

Spark Shuffle的发展时间线

2 为什么会产生 Shuffle

2.1 产生Shuffle的过程

要了解 Shuffle 的产生,首先我们得知道什么是 RDD 的依赖关系。RDDs 通过操作算子进行转换,转换得到的新 RDD 包含了从其他 RDDs衍生所必需的信息,RDDs 之间维护着这种血缘关系(lineage),也称之为依赖。依赖包括两种:窄依赖和宽依赖。

  • 窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖,即 RDDs 之间分区是一一对应的(1:1 或 n:1)
  • 宽依赖:子 RDD 每个分区与父 RDD 的每个分区都有关,是多对多的关系(即 n:m)

RDD的窄依赖和宽依赖图示

从上图我们可以看出,在窄依赖的过程中,并不会出现原本同属于父 RDD 同一个分区的数据分散到子 RDD 的不同分区,因此而不会产生Shuffle。相反,宽依赖的过程属于是多对多的情况,自然会产生 Shuffle。

2.2 没有Shuffle行不行

首先明确一下,没有Shuffle是不行的。计算过程之所以需要 Shuffle,往往是由计算逻辑、或者说业务逻辑决定的,比如在 Word Count 的例子中,我们的“业务逻辑”是对单词做统计计数,那么对单词“Spark”来说,在做“加和”之前,我们就是得把原本分散在不同 Executors 中的“Spark”,拉取到某一个 Executor,才能完成统计计数的操作。因此,Spark 在做分布式计算的过程中,没有Shuffle就没办法去完成一些聚合计算。

3 Shuffle 包括什么?

Spark 的 Shuffle 包括 Shuffle 写操作和 Shuffle 读操作两类操作。下面将对这两类操作进行详细介绍。

3.1 Shuffle 写操作

Spark 的 Shuffle 相对于 Hadoop 阶段的 Shuffle 进行了一些改动,比如为了避免 Hadoop 多余的排序操作(Reduce 之前按获取的数据需要经过排序),提出了基于哈希的 Shuffle 操作:Hash Shuffle。不过这种方式也有问题,即当 Map 和 Reduce 数量较多的情况下会导致写文件数量大和缓存开销过大,因此,在此基础上,Spark1.2 版本对 Shuffle 又进行了改进,提出了基于排序的 Shuffle 操作:Sort Shuffle。

3.1.1 Hash Shuffle

使用历程

Spark1.1 版本之前,它是 Spark 唯一的 Shuffle 方式,Spark 版本后,默认将 Shuffle 方式为了 Sort Shuffle,在之后的 Spark2.0 中,Hashshuffle 被弃用。

产生原因:

Hadoop 中 Reduce 所处理的很多数据是需要经过排序的,但是实际的数据处理过程中,很多场景并不会对数据进行排序,因此省去外部排序,从而产生了 Hash Shuffle。
其处理流程如下图:

Hash Shuffle处理流程图

在该机制中每个Mapper会根据Reduce的数量创建出相应的bucket,bucket的数据是M * R,其中M是Map 的个数,R是Reduce的个数;Mapper生成的结果会根据设置的Partition算法填充到每个bucket中。这里的bucket是一个抽象的概念,在该机制中每个bucket对应一个文件:当Reduce启动时,会根据任务的编号和所依赖的Mapper的编号从远端或者是本地取得相应的bucket作为Reduce任务的输入进行处理。

该机制的优缺点:

  • 优点:MapReduce中sort作为固定步骤,有许多任务不需要排序,hashShuffle避免不必要的排序所导致不必要的排序和内存开销,提升
    了性能。
  • 缺点:缺点:每个mapTask都会为reduceTask生成一个文件, 会生成M*R个中间文件。数据量越来越多时,产生的文件量是不可控的,严重制约了Spark的性能及扩展能力。

3.1.2 Sort Shuffle

使用历程

Spark1.1版本的时候引入,Spark1.2版本之后,默认使用Sort Shuffle,Spark1.4版本引入钨丝机制

产生原因:

Hash Shuffle采用了文件合并机制后,中间结果文件依旧依赖Reduce的Task个数,文件数仍不可控,其缓存所占用的内存也是一笔不小的开销。为了解决这个问题Spark引入了Shuffle写操作机制。
其处理流程如下图:

Sort Shuffle处理流程图

在该机制中,需要先判断Shuffle MapTask输出结果在Map端是否需要合并(Combine),如果需要合并,则外部排序中进行聚合并排序;如果不需要,则外部排序中不;进行聚合和排序,例如sortByKey操作在Reduce端会进行聚合并排序。确认外部排序方式后,在外部排序中将使用PartitionedAppendOnlyMap来存放数据,当排序中的Map占用的内存已经.超越了使用的阈值,则将Map中的内容溢写到磁盘中,每一次溢写产生一个不同的文件。当所有数据处理完毕后,在外部排序中有可能一部分计算结果在内存中,另一部分计算结果溢写到一或多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里。

该机制的优缺点:

  • 优点:mapTask不会为每个reduceTask生成一个单独的文件,而是全部写到一个数据文件中,同时生成一一个索引文件,reduceTask可以通过索引文件获取相关数据。
  • 缺点:强制要求数据在map端进行排序,导致大量CPU开销。

3.2 Shuffle 读操作

有Shuffle的写操作,自然也就要说一说Shuffle的读操作。相对于Shuffle的写操作,其读操作还是要简单一点的,虽然Shuffle的写操作有不同方式,但是Spark对此采用了相同的读取方式,直接将读取的数据放在哈希列表中方便后续的处理。

其读操作流程如下图:

Shuffle读操作流程图

该流程的实现如下:

  • ①由Executor 的MapOutputTracker发送获取结果状态消息给Driver端的MapOutputTrackerMaster
  • ②然后请求获取上游Shuffle输出结果对应的MapStatus(在该MapStatus存放了结果数据的位置信息);
  • ③得到上游Shuffle结果的位置信息后,对这些位置进行筛选,判断当前运行的数据是从本地还是从远程节点获取。
  • ④如果是本地获取,直接调用BlockManager的getBlockData 方法,在读取数据的时候会根据写入方式采取不同ShuffleBlockResolver读取;如果是在远程节点上,需要通过Netty网络方式读取数据,在远程读取的过程中使用多线程的方式进行读取。

4 Shuffle怎么用?

关于Shuffle的使用,体现在Spark 计算过程中,出现跨节点进行数据分发的数据聚合的场景,本文以最简单的wordcount进行举例。

4.1 需求说明

统计在给定的文本文件中输出每一个单词出现的总次数

4.2 代码实现

1 编写Mapper类

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//map阶段
/*
* KEYIN 输入数据的key
* VALUEIN 输入数据的value
* KEYOUT 输出数据的key的类型
* VALUEOUT 输出数据的value类型
*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
        //获取1行
        String line = value.toString();
        //切割单词
        String[] words = line.split(" ");
        //循环写出
        for (String word : words) {
        k.set(word);
        context.write(k, v);
        }
    }
}

2 编写Reducer类

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
        int sum = 0;
        //累加求和
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        //写出
        context.write(key, v);
    }
}

3 编写Driver类

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "自己的输入路径", "自己的输出路径" };
        Configuration conf = new Configuration();
        
        //获取Job对象
        Job job = Job.getInstance(conf);
        
        //设置jar存储位置
        job.setJarByClass(WordcountDriver.class);
        
        //关联Map和Reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        
        //设置Mapper阶段输出数据key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        //设置最终数据输出的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // job.setCombinerClass(WordcountReducer.class);
        
        //设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //提交Job
        // job.submit();
        boolean result = job.waitForCompletion(true);
        System.out.println(result? 0 : 1);
    }
}

4.3 执行过程图

下图是上述代码的流程图,其中体现了Shuffle的使用过程。

上述代码的流程图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/116995.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

优化器核心技术—Join Reorder

Join Reorder 的简介 Join Reorder 是开务数据库 SQL 优化器中的核心优化算法&#xff0c;开务数据库优化器包括 RBO 和 CBO 两部分&#xff0c;负责计划优化&#xff0c;提升 SQL 执行性能。Join Reorder 能够保证在复杂查询执行的场景下&#xff0c;枚举合法的执行路径&…

分布式系统稳定性建设指南

来源&#xff1a; 中国信息通信研究院 系统稳定性能建设是一个系统化工程&#xff0c;需要硬件软件&#xff0c;需要从企业工程建设的全环节进行设计和实施&#xff0c;充分利用以混沌工程、全链路压测为代表的分布式稳定性保障技术&#xff0c;建设保障能力&#xff0c;改造运…

域名系统 DNS(计算机网络-应用层)

目录 互联网的域名结构 顶级域名 TLD(Top Level Domain) 域名服务器 域名系统 DNS 域名解析的过程 域名服务器的四种类型 本地域名服务器 DNS 协议 DNS缓存 DNS提供的其它服务 互联网的域名结构 域名系统 DNS (Domain Name System)&#xff0c;实现主机名&#xff08;域…

(三)HTTPTomcatServlet

一、HTTP HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff0c;规定了浏览器和服务器之间数据传输的规则。 HTTP协议特点&#xff1a; &#xff08;1&#xff09;基于TCP协议&#xff1a;面向连接&#xff0c;安全 &#xff08;2&#xff09;基于请求-响应模型的…

漫画 | JavaScript杀死了编程......

上班路上&#xff0c;张大胖突然想到一个点子。晚上&#xff0c;张大胖开工&#xff0c;决定把好点子给实现了。JavaScript开发效率贼高&#xff0c;项目迅速完成。项目开源发布&#xff0c;获得了不少用户&#xff0c;收获了不少star&#xff0c;张大胖非常满意。转眼间&#…

【GlobalMapper精品教程】039:GM面状数据符号化设置案例教程

GM面状数据符号化设置案例教程。 文章目录 一、使用基于分类或自定义样式的默认样式二、对所有要素使用相同样式三、基于属性/名称值应用样式四、随机指定颜色给要素一、使用基于分类或自定义样式的默认样式 该样式类似于其他GIS软件的单一样式,即为数据加载的默认样式。加载…

Stable Diffusion攻略集(Stable Diffusion官方文档、kaggle notebook、webui资源帖)

文章目录第一部分一、 Features&#xff1a;Textual Inversion&#xff08;文本反转&#xff09;1.1 Textual Inversion 简介1.1.1 摘要1.1.2 算法1.1.3 模型效果1.2 Textual Inversion of webai1.2.1 预训练embedding用于图片生成1.2.2 训练embedding1.2.3 Finding embeddings…

【Redis】知识体系结构构建以及常见考题汇总

【Redis】知识体系结构构建以及常见考题汇总1、思维导图2、Redis体系结构&#xff1a;原理、部署2.1、Redis实现原理2.1.1、Redis中数据结构2.1.2、不同对象的数据类型&#xff08;基本类型、特殊类型&#xff09;2.2、Redis部署2.2.1、基本配置&#xff08;缓存的更新、删除和…

【Linux】基础命令

目录 1.ls指令 2.pwd指令 3.cd指令 4.touch指令 5.mkdir指令 6.rmdir指令 && rm 指令 7.man指令 8.cp指令 9.mv指令 10.cat指令 11.less指令 12. head和tail 13.date指令 14.grep指令 15.bc指令 16.重要的几个热键 1.ls指令 语法&#xff1a;ls 选项 …

Taro React组件开发(2) —— RuiEditor 富文本编辑器【兼容H5和微信小程序】

1. 富文本编辑器需求分析 需要实现图片上传显示,上传使用Taro的 chooseImage 和 uploadFile,完成图片的上传!!!文字的居左、居中、居右展示,使用格式化方法 format!!!文字的加粗、倾斜、下划线,使用格式化方法 format!!!2. 富文本编辑 获取提示文本 placeholder;…

【C++】string类常用函数接口

在使用库函数中的string类时&#xff0c;需要包含头文件#include 。 1.构造函数和拷贝构造 string s1;string s2("hello world");string s3(s2);下面通过VS调试的监视窗口看一下初始化之后的内容&#xff1a; 还有一种构造函数&#xff0c;是拷贝一个字符串的一部分…

Kubernetes 1.26 新功能 Pod 调度就绪特性解析

Kubernetes 1.26 新功能 Pod 调度就绪特性解析 Kubernetes 1.26 引入了 Pod 的一个新特性&#xff1a;scheduling gates。在 Kubernetes 中&#xff0c;调度门是告诉调度程序何时准备好考虑调度 Pod 的 keys。 它解决了什么问题&#xff1f; 当一个 Pod 创建时&#xff0c;调…

从发现SQL注入到ssh连接

前言&#xff1a; 某天&#xff0c;同事扔了一个教育站点过来&#xff0c;里面的url看起来像有sql注入。正好最近手痒痒&#xff0c;就直接开始。 一、发现时间盲注和源码 后面发现他发的url是不存在SQL注入的&#xff0c;但是我在其他地方发现了SQL盲注。然后改站点本身也可…

C++/Java调用C++动态链接库————附带示例和详细讲解

文章目录0 准备1 C调用动态链接库2 Java调用C动态链接库3 运行0 准备 在CMake中&#xff0c;使用如下的方法把代码编译成动态/静态链接库&#xff1a; # 设置项目名 project(getMatInfo)# 设置c版本 set(CMAKE_CXX_STANDARD 11)# 如果不填写SHARE&#xff0c;默认为静态链接…

Spring Security开发实践

Spring Security 是 Spring 家族中用于提供认证、授权和攻击防护功能的一套安全框架。它也是 Spring 应用在安全框架方面的公认标准。 Spring Security 安全框架适合为 Spring Boot 项目提供安全保护&#xff0c;所以如果您是个 Spring Boot 项目的开发人员&#xff0c;且正在寻…

Python 将视频按照时间维度剪切 | Python工具

目录 前言 环境依赖 代码 总结 前言 本文提供将视频按照时间维度进行剪切的工具方法&#xff0c;一如既往的实用主义。 环境依赖 ffmpeg环境安装&#xff0c;可以参考我的另一篇文章&#xff1a;windows ffmpeg安装部署_阿良的博客-CSDN博客 本文主要使用到的不是ffmpeg&a…

基于Vue.js+Node问卷调查系统的设计与实现

作者主页&#xff1a;Designer 小郑 作者简介&#xff1a;Java全栈软件工程师一枚&#xff0c;来自浙江宁波&#xff0c;负责开发管理公司OA项目&#xff0c;专注软件前后端开发&#xff08;Vue、SpringBoot和微信小程序&#xff09;、系统定制、远程技术指导。CSDN学院、蓝桥云…

ORM框架

ORM框架可以做的两件事&#xff1a; 创建、修改、删除数据库中的表&#xff08;不用写SQL语句)。【无法创建数据库】操作表中的数据&#xff08;不用写SQL语句&#xff09; 1.安装第三方模块&#xff1a; pip3.9 install mysqlclient 2.创建数据库&#xff1a; 启动Mysql服务…

20221226编译Toybrick的TB-RK3588X开发板的Android12系统2-SDK预处理

20221226编译Toybrick的TB-RK3588X开发板的Android12系统2-SDK预处理 2022/12/26 16:40 结论&#xff1a; 1、风火轮技术团队的技术支持力度欠佳&#xff01; 淘宝客服只能处理发货问题&#xff0c;发发SDK还可以&#xff0c;技术问题只能找联系方式 联系手机&#xff1a;18926…

一起从零开始学VUE(16)生命周期与组合式API

文章目录生命周期自定义hook函数toRef其他组合APIshallowReactive与shallowRefreadonly与shallowReadonllytoRaw 与 markRawcustomRefprovide与inject响应式数据的判断生命周期 除了直接写对应的钩子函数外&#xff0c;Vue3.0也提供了composition API形式的钩子函数&#xff0c…