深入理解 Spark(三)SparkTask 执行与 shuffle 详解

news2025/4/23 7:43:50

SparkTask 的分发部署与启动流程分析

Spark Action 算子触发 job 提交

Spark 当中 Stage 切分源码详解

在这里插入图片描述

Task 的提交与执行

在这里插入图片描述

SparkShuffle 机制详解

MapReduceShuffle 全流程深度剖析

在这里插入图片描述
MapReduce 全流程执行过程中参与工作的组件以及他们的执行先后顺序:InputFormat => RecordReader => Mapper => Partitioner => Sorter => Combiner => Fetch => Group => Reducer => OutputFormat => RecordWriter

Partitioner 的常见规则

在这里插入图片描述

MapReduce Shuffle 为什么要排序?

  1. ReduceTask 的输入数据来自于上游 MapTask 的输出,并且普遍情况下,每个 MapTask 都会为每个 ReduceTask 提供部分输入。
  2. ReduceTask 的执行逻辑是:将 ReduceTask 的输入数据,按照标记分组之后,每次针对一组数据执行一次逻辑计算。
  3. 那么如果不排序,则每次为了得到待计算的一组数据,都需要完整扫描这个输入数据集一次,如果该 ReduceTask 的输入数据集的标记特比多,则需要扫描这个数据集成千上万次,这效率是非常低下的。
  4. 所以解决方案,就是给 ReduceTask 的输入数据集进行按照标记排序,然后 ReduceTask 在执行逻辑计算的时候只需要按照顺序扫描一次就能完成所有数据组的逻辑计算
  5. 既然 ReduceTask 的输入数据是需要排序的,而且 ReduceTask 的输入数据是来自于上游的MapTask 的,那为什么不让 MapTask 先给结果数据排序,最终 ReduceTask 拉取到所有数据再来做归并呢?利用分布式思想,本来该一个 ReduceTask 完成排序的,结果该排序的压力分散到多个上游 MapTask 之中,进一步提高效率。
  6. 所以 MapTask 的输出就不能直接写磁盘了,如果直接写磁盘,就只能最后做一次磁盘扫描将数据读取到内存再完成排序之后溢写到磁盘。这样做未免效率低,所以好方式是:MapTask 输出的时候,先将数据输出到内存,当然内存不可能是无限大,总有装不下的时候,所以当一定内存装满的时候,就对这部分输出数据进行排序,再刷写到磁盘,释放内存。
  7. 但是这样做,又发现一个新问题:当内存装满了之后,要先排序,然后溢写到内存,那这样是不是就阻塞了 MapTask 的继续输出呢?是的。所以又提供了优化方案:这个固定内存 100M 当写满 80% 的时候,就对这 80% 的数据执行排序后溢写到磁盘,这样子剩下的 20% 区域就可以继续接收 MapTask 的输出了。
  8. 最后再补充一点:当 MapTask 的输出做了排序之后,也可以让标记相同的待计算数据提前做预汇总,降低 Shuffle 中网络数据传输量,节省带宽。

MapReduce Shuffle 执行排序的局限性:

  1. 如果最终需要计算得到的结果集并不需要排序,这个排序则是多此一举。
  2. 多次溢写形成的多个临时磁盘文件需要做合并,这会导致磁盘 IO 的负担很重,这也是 MapReduce 效率低但是稳定的重要原因。

MapReduce Shuffle 为什么要文件合并?

  1. 由于 MapTask 输出数据的时候,是先写入 100M 的内存区间中,每次装满 80% 则执行一次溢写形成一个磁盘临时文件,这样必定会导致 MapTask 的输出磁盘文件会特别多,给文件系统带来负担。
  2. 如果不合并,那么 ReduceTask 过来拉取 MapTask 的输出数据的时候,需要打开很多的文件句柄,进一步增加负担。
  3. 每个 MapTask 输出的单个文件是有序的,但是不代表该 MapTask 输出的所有结果都是有序的,所以还需要做文件的合并来保证 MapTask 的输出有序。

Spark 当中的 Shuffle 机制介绍

大多数 Spark 作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。理解 Spark 的 shuffle 的工作原理,有助于对 spark application 进行调优,减少资源消耗,提升生产效率。
在 Spark 的源码中,负责 shuffle 过程的执行、计算和处理的组件主要就是 ShuffleManager,也即shuffle 管理器。

  • Spark-1.2 版本以前:默认实现是:HashShuffleManager
  • Spark-1.2 版本以后:默认实现是:SortShuffleManager
  • Spark-3.x 版本以后:彻底移除了:HashShuffleManager,只留下 SortShuffleManager

HashShuffleManager 的缺点是 shuffle 过程中会产生大量的临时结果文件,SortShuffleManager 的改进是让每个 Task 只产生一个结果文件(多个临时文件会合并到一个文件中),下游的 Task 过来拉取对应分区数据的时候,只需要去根据索引按需获取即可。

spark shuffle 概念介绍

Spark Job 依赖图:
在这里插入图片描述
将对应的 RDD 标注上去后:
在这里插入图片描述
其中的 shuffle 过程:
在这里插入图片描述
前一个 Stage 的 ShuffleMapTask 进行 Shuffle Write, 把数据存储在 BlockManager 上面, 并且把数据位置元信息上报到 Driver 的 MapOutTrack 组件中, 下一个 Stage 根据数据位置元信息, 进行 Shuffle Read, 拉取上个 Stage 的输出数据。

HashShuffle 过程详解

普通的 Hash Shuffle 机制

在这里插入图片描述
上图中,

  1. buffer 起到的是缓存作用,缓存能够加速写磁盘,提高计算的效率,buffer 的默认大小 32k。
  2. 分区器:根据 hash/numRedcue 取模决定数据由几个 Reduce 处理,也决定了写入几个 buffer 中。
  3. block file:磁盘小文件,从图中我们可以知道磁盘小文件的个数计算公式:block_file_cnt = M * R 。 M 为 map task 的数量,R 为 Reduce 的数量,一般 Reduce 的数量等于 buffer 的数量,都是由分区器决定的。
  4. Shuffle 阶段在磁盘上会产生海量的小文件,建立通信和拉取数据的次数变多,此时会产生大量耗时低效的 IO 操作 (因为产生过多的小文件)
  5. 大量耗时低效的 IO 操作 ,导致写磁盘时的对象过多,读磁盘时候的对象也过多,这些对象存储在堆内存中,会导致堆内存不足,相应会导致频繁的 GC,GC 会导致 OOM。由于内存中需要保存海量文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。

合并机制的 Hash shuffle

合并机制就是复用 buffer 缓冲区,开启合并机制的配置是 spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。
在这里插入图片描述
此时 block_file = Core * R ,Core 为 CPU 的核数,R 为 Reduce 的数量,但是如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。

SortShuffle 过程详解

Sort shuffle 的普通机制

在这里插入图片描述
这个机制的好处:

  1. 小文件明显变少了,一个 task 只生成一个 file 文件
  2. file 文件整体有序,加上索引文件的辅助,查找变快,虽然排序浪费一些性能,但是查找变快很多

ByPass 模式的 SortShuffle 机制

bypass 机制运行条件是 shuffle map task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数(默认值 200)的值,且不是聚合类的 shuffle 算子(比如 reduceByKey)。
在这里插入图片描述

补充

  • hash-based shuffle 中,排序发生在 reduce 阶段
  • sort-based shuffle 中,排序发生在 shuffle 阶段

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1382595.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

山西电力市场日前价格预测【2024-01-15】

日前价格预测 预测说明: 如上图所示,预测明日(2024-01-15)山西电力市场全天平均日前电价为399.10元/MWh。其中,最高日前电价为583.33元/MWh,预计出现在18:15。最低日前电价为275.09元/MWh,预计…

分享从零开始学习网络设备配置--任务4.4 使用动态路由OSPFv3实现网络连通

任务描述 由于RIPng不适用于复杂的网络,考虑到公司的未来发展,需要不断扩大网络规模。某公司在企业网络升级时,选择 OSPFv3路由协议实现网络连通,降低网络拓扑变化引发的人工维护工作量并加快网络收敛的速度。 公司内部的所有设…

leetcode 2114. 句子中的最多单词数

题目: 一个 句子 由一些 单词 以及它们之间的单个空格组成,句子的开头和结尾不会有多余空格。 给你一个字符串数组 sentences ,其中 sentences[i] 表示单个 句子 。 请你返回单个句子里 单词的最多数目 。 解题方法: 1.遍历列表…

vivado IP使用

使用IP源 注意:有关IP的更多信息,包括添加、打包、模拟和升级IP,请参阅VivadoDesign Suite用户指南:使用IP(UG896)进行设计。在Vivado IDE中,您可以在RTL项目中添加和管理以下类型的IP核心&…

一分钟找到所有的中文核心期刊

1.进入中国知网找到出版物检索 2.在出版来源导航这里选择期刊导航 3.右边拉到底选择核心期刊导航 4.选择自己专业的期刊即可

SpringMVC 学习博客记录

文章目录 Servlet请求转发和请求包含RequestDispatcher HandlerInterceptor组件实际运用场景 HandlerMapping&RequestMappingInfo(HandlerMapping)HandlerExecutionChainHandlerAdapter源码学习知识点博客记录 Servlet请求转发和请求包含 RequestDispatcher Request#getR…

微服务技术要点

一、服务注册到nacos 1.下载nacos,修改nacos启动模式为单机模式,另外需要在环境变量配置JAVA_HOME,否则启动不起来。 2.启动类加注解EnableDiscoveryClient 3.application.yml配置nacos地址 spring:cloud:nacos:discovery:server-addr: 127.0.0.1:884…

JDK-JVM

JVM JDKJDK内部体系结构:JVM 与 跨平台JVM在程序运行过程中的运行细节,内存分配 和 流转模型。JVM结构体系1. 虚拟机栈2. 线程栈2.1. 栈帧2.2. 数据结构栈 与 线程栈 的关系:2.3.栈帧的内部结构:2.4 方法中的数据 在栈帧中的流转过…

MyBatis第三课

目录 回顾 #和$区别 #(预编译SQL)和$(即时SQL,它是进行的字符串拼接)的区别,其中之一就是预编译SQL和即时SQL的区别 原因: 回顾 两者的共同点 MaBits可以看作是Java程序和Mysql的沟通桥梁&…

网页设计与网站建设作业html+css+js,一个简易的游戏官网网页

一个简易的游戏网页 浏览器查看 目录结构 部分代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport&…

【c++】利用嵌套map创建多层树结构

通常树的深度都大于1&#xff0c;即树有多层&#xff0c;而树结构又可以用c的map容器来实现&#xff0c;所以&#xff0c;本文给出了一种多层树结构的实现思路&#xff0c;同时也给出了相应的c代码。 整体思路概述 首先定义一个节点类Node类&#xff0c;要包括children&#x…

EI论文复现:考虑多能互补的综合能源系统/虚拟电厂/微电网优化运行程序代码!

本程序参考EI论文《基于多能互补的热电联供型微网优化运行》&#xff0c;文章通过储能设备解耦热电联系&#xff0c;建立基于多能互补的综合能源系统/虚拟电厂/微电网优化运行模型。模型包含系统供给侧的多能互补协调与需求侧的综合能源响应两个方面&#xff0c;使供给侧通过能…

基于springboot时间管理系统源码和论文

在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括时间管理系统的网络应用&#xff0c;在外国时间管理系统已经是很普遍的方式&#xff0c;不过国内的管理系统可能还处于起步阶段。时间管理系统具有时间管理功能的选择。时间管…

XTuner 大模型单卡低成本微调实战

文章目录 配置环境微调部署与测试自定义微调 XTuner 大模型单卡低成本微调 原理可查看 XTuner 大模型单卡低成本微调原理 配置环境 创建一个名为xtuner&#xff0c;python3.10版本虚拟环境 conda create --name xtuner0.1.9 python3.10 -y创建一个xtuner019文件夹&#xff0c…

WebGL在虚拟现实(VR)的应用

WebGL在虚拟现实&#xff08;VR&#xff09;领域的应用日益增多&#xff0c;它为在Web浏览器中创建交互式的虚拟现实体验提供了强大的支持。以下是一些WebGL在VR领域的应用示例&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&am…

Word插件-大珩助手-手写电子签名

手写签名 支持鼠标写&#xff0c;支持触摸屏写&#xff0c;点击画笔按钮切换橡皮擦&#xff0c;支持清空画板重写&#xff0c;点击在word中插入签名&#xff0c;可插入背景透明的签字图 素材库-保存签名 将写好的签字图复制粘贴到素材库中&#xff0c;以便永久使用&#xff…

Visual Studio Code1.67版本已正式发布,新增Rust指南

Visual Studio Code1.67版本已正式发布&#xff0c;该版本包含大量增强生产力的更新项&#xff1a; 资源管理器文件嵌套 通过这次更新&#xff0c;用于浏览和管理文件和文件夹的Visual Studio Code的资源管理器工具现在支持基于名称嵌套相关文件。 资源管理器现在支持根据文…

【Linux操作】国产Linux服务管理操作

【Linux操作】国产Linux服务管理操作 前言SAMBA配置服务器端1. 安装相关包2. 配置/etc/samba/smb.conf&#xff0c;在此文件末尾添加如下内容&#xff0c;并保存退出。3. 创建/home/share并更改权限4. 启动samba服务 客户端• Windows客户端• 麒麟客户端 Telnet1、telnet语法2…

机器学习中的线性回归

线性回归 概念 利用 回归方程(函数) 对 一个或多个自变量(特征值)和因变量(目标值)之间 关系进行建模的一种分析方式。 分类 一元线性回归&#xff1a;y wx b 目标值只与一个因变量有关系 多元线性回归&#xff1a; y w_1x_1 w_2x_2 w_3x_3 … b 目标值只与多个…

Linux Ubuntu搭建我的世界Minecraft服务器实现好友远程联机MC游戏

文章目录 前言1. 安装JAVA2. MCSManager安装3.局域网访问MCSM4.创建我的世界服务器5.局域网联机测试6.安装cpolar内网穿透7. 配置公网访问地址8.远程联机测试9. 配置固定远程联机端口地址9.1 保留一个固定tcp地址9.2 配置固定公网TCP地址9.3 使用固定公网地址远程联机 前言 Li…