Spark-ShuffleManager

news2025/1/22 19:56:06

一、上下文

《Spark-Task启动流程》中我们讲到了ShuffleMapTask中会对这个Stage的结果进行磁盘的写入,并且从SparkEnv中得到了ShuffleManager,且调用了它的getWriter方法并在这个Stage的入口处(也就是RDD的迭代器数据源处)调用了它的getReader,下面我们来详细分析下ShuffleManager在整个Task中的形态。

二、ShuffleManager

Shuffle System 的可插拔接口。根据spark.shuffle.manager设置,位于SparkEnv中,在driver 和每个executor 上创建ShuffleManager。driver 来注册一个Shuffle,executor(或在driver中本地运行的任务)可以使用它来进行Shuffle Reader和Shuffle Writer

private[spark] trait ShuffleManager {

  //注册洗牌,并获得一个句柄,以便将其传递给任务
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  //为给定分区找一个写入器。被 executor 上的Task调用执行
  //一个Stage最后写入时调用
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  //获取一个reduce分区范围(startPartition到endPartition-1,包括端点)的读取器,
  //以从map输出范围(startMapIndex到endMapIndex-1,包括两端)读取
  //被reduce端的Executor上的Task调用
  //一个Stage开始计算时调用
  final def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
    getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)
  }

}

SortShuffleManager

它是ShuffleManager的唯一子类

在基于排序的Shuffle中,传入记录根据其目标分区ID进行排序,然后写入单个map输出文件,reducer获取此文件的连续区域,以便读取其在map输出中的部分。如果map输出数据太大而无法放入内存,则可以将输出的排序子集溢写到磁盘,并将磁盘上的文件合并以生成最终输出文件。

基于排序的Shuffle有两种不同的写入路径来生成对应的map输出文件:

1、序列化排序:当满足以下三个条件时使用

  • Shuffle依赖关系未指定Map侧组合。
  • Shuffle序列化程序支持重新定位序列化值(目前KryoSerialer和Spark SQL的自定义序列化程序支持此功能)
  • 洗牌产生的输出分区小于或等于16777216(2的24次方)个。

2、非序列化排序:用于处理所有其他情况

序列化排序模式

在序列化排序模式下,传入记录在传递给ShuffleWriter后立即被序列化,并在排序过程中以序列化形式缓冲。此写入路径实现了几个优化:

1、它的排序操作基于序列化的二进制数据,而不是Java对象,这减少了内存消耗和GC开销。此优化要求记录序列化器具有某些属性,以允许序列化记录重新排序,而不需要反序列化。有关更多详细信息,请参阅SPARK-4550,其中首次提出并实施了此优化。

2、它使用一个专门的缓存高效排序器([[ShuffleExternalSorter]])对压缩记录指针和分区ID的数组进行排序。通过在排序数组中每条记录只使用8个字节的空间,这可以将更多的数组放入缓存中。

3、溢写合并过程对属于同一分区的序列化记录块进行操作,在合并过程中不需要对记录进行反序列化。

4、当溢写压缩编解码器支持压缩数据的连接时,溢写合并只是将序列化和压缩的溢写分区连接起来,以产生最终的输出分区。这允许使用高效的数据复制方法,如NIO的“transferTo”,并避免了在合并过程中分配解压缩或复制缓冲区的需要。

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {

  //注册Shuffle
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      //如果有少于spark.shuffle.sort.bypassMergeThreshold 默认值200 的分区,并且我们不需要map侧聚合,那么直接编写numPartitions文件,并在最后将它们连接起来。这避免了两次进行序列化和反序列化以将溢写的文件合并在一起,这在正常的代码路径中会发生。缺点是一次打开多个文件,因此分配给缓冲区的内存更多。
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      //否则,请尝试以序列化形式缓冲映射输出,因为这样更有效:
      new SerializedShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // 如果上面两种情况都不满足,走它:缓冲区数据将以反序列化的形式输出:
      new BaseShuffleHandle(shuffleId, dependency)
    }
  }

  //获取一个reduce分区范围(startPartition到endPartition-1,包括端点)的读取器,
  //以从map输出范围(startMapIndex到endMapIndex-1,包括两端)读取。如果endMapIndex=Int.MaxValue,则实际endMapIndex将更改为“getMapSizesByExecutorId”中Shuffle的总map输出长度。
  override def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
    
    val (blocksByAddress, canEnableBatchFetch) =
      //如果启用了Push-based shuffle 且 rdd不是Barrier 那么可以直接构建迭代器,就不用拉取了
      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
        val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
        (res.iter, res.enableBatchFetch)
      } else {
        //从mapOutputTracker处获取需要拉取数据的地址
        val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
        (address, true)
      }
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
      shouldBatchFetch =
        canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))
  }

 //每个分区会获取一个ShuffleWriter
 override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
    val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
      handle.shuffleId, _ => new OpenHashSet[Long](16))
    mapTaskIds.synchronized { mapTaskIds.add(mapId) }
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf,
          metrics,
          shuffleExecutorComponents)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          bypassMergeSortHandle,
          mapId,
          env.conf,
          metrics,
          shuffleExecutorComponents)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
    }
  }


}

在SortShuffleManager中的getReader()中我们可以看到,只有一种ShuffleReader,即:BlockStoreShuffleReader,但其因为ShuffleDependency的不同也会返回不同的Iterator

getWriter()中看到,根据ShuffleHandle可以分为三种ShuffleWriter,即:

unsafeShuffleHandle  -> UnsafeShuffleWriter

bypassMergeSortHandle -> BypassMergeSortShuffleWriter

BaseShuffleHandle -> SortShuffleWriter

三、谁注册的ShuffleManager

ShuffleDependency中注册的,并得到了一个ShuffleHandle

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] with Logging {

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)
}

那么什么时候会调用RDD的getDependencies?

我们回想下之前几篇博客的内容《Spark-SparkSubmit详细过程》、《Spark-driver和executor启动过程》、《Spark-Job启动、Stage划分》是不是在Stage划分时调用了这个方法来判断是否要创建一个ShuffleMapStage。因此在划分Stage时就确定了ShuffleHandle,换言之也就确定了这个Stage最后的结果要选用哪个ShuffleWriter。而ShuffleWriter又是Spark计算中一个大的瓶颈,因此调节ShuffledRDD的ShuffleDependency就成了调优的必要且重要渠道。后面展开分析ShuffleWriter时再具体讲

哪里用到了ShuffleHandle

1、getReader调用时

getReader是在一个Stage中读取上游Stage时调用的也就是ShuffleRDD中的compute()

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
    SparkEnv.get.shuffleManager.getReader(
      dep.shuffleHandle, split.index, split.index + 1, context, metrics)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

}

2、getWriter调用时

getWriter是在一个Stage结束并将数据溢写磁盘时调用的也就是ShuffleWriteProcessor中的write()

private[spark] class ShuffleWriteProcessor extends Serializable with Logging {

def write(
      rdd: RDD[_],
      dep: ShuffleDependency[_, _, _],
      mapId: Long,
      context: TaskContext,
      partition: Partition): MapStatus = {
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](
        dep.shuffleHandle,
        mapId,
        context,
        createMetricsReporter(context))
      writer.write(
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  }

}

四、Shuffle场景描述

通过以上的梳理我们大致画下ShuffleManager在计算Task中的场景

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2103595.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

万界星空科技云MES系统:提升生产效率与质量

万界星空科技云MES系统是一款基于云计算技术的智能制造执行系统&#xff0c;它集成了生产计划、生产管理、质量管理、设备管理、仓储管理等多个核心功能模块&#xff0c;旨在为企业提供高效、灵活、安全的生产管理解决方案。以下是对万界星空科技云MES系统功能和应用范围的详细…

地理围栏,打造智能生活新边界

随着智能手机和其他移动设备的普及&#xff0c;用户对基于位置获取个性化服务的需求大幅增加&#xff0c;例如用户进入商圈范围并停留操作一段时间后&#xff0c;智能触发向用户推送该商圈吃、喝、玩、乐的优惠活动消息&#xff1b;又如当用户到达非常驻地的机场时触发围栏&…

2024-MongoDB中国用户大会

周五下午7个小时高铁从深圳赶到上海&#xff0c;周六一天大会&#xff0c;周天飞回深圳。特种兵行动参加“2024-MongoDB中国用户大会”。&#xff0c;缓了两天终于把素材整理出来了。 这也是首次参加MongoDB相关专题会议&#xff0c;MongoDB出现在我接触的大多数项目中&#xf…

什么是法定计量校准?法定计量工作中常会有哪些问题?

什么是法定计量校准&#xff1f; 法制计量校准是以掌握计量和计量的基本内容&#xff0c;依法规划、监督、管理计量为基础&#xff0c;通过政府、企业、个人等多种参与方式和对计量违法行为的监督、处罚。 法制计量校准的目的是为了提高计量工作水平&#xff0c;促进国民经济、…

Type-C接口 未来发展趋势

随着科技的进步&#xff0c;Type-C接口已经逐渐代替了传统的USB接口&#xff0c;Type-C接口的普及将会给我们生活带来很多的便利。 Typez-C接口主要包括正反可插&#xff0c;传输速率高&#xff0c;支持快充协议&#xff0c;安全性高&#xff0c;体积小巧&#xff0c;支持高速…

集团企业主数据管理项目实施步骤及要点

为打破数据孤岛&#xff0c;提升数据治理水平&#xff0c;某省级投资集团率先构建了主数据平台&#xff0c;并成功实现了财务系统、合同系统、人力资源系统及OA门户系统等多系统的无缝对接。通过主数据平台提供的标准化接口&#xff0c;这些关键业务系统能够高效获取所需的主数…

车辆违停智能监测摄像头

车辆违停是城市交通管理中常见的问题&#xff0c;给道路交通秩序和行车安全带来了一定的影响。为了有效监测和处置车辆违停行为&#xff0c;智能监测摄像头被广泛应用于城市道路和停车场等场所。这种摄像头结合了图像识别技术和人工智能算法&#xff0c;能够实时监测并识别违停…

3600关成语填字APP游戏ACCESS\EXCEL数据库

成语类的APP游戏在最近一两年内非常的火爆&#xff0c;其主要原因是几乎所有中国人都能够冲个几十上百关&#xff0c;学习和趣味共享。看图猜成语类的数据之前已经弄到过很多&#xff0c;今天这份成语填字的倒是头一份。 该数据做成的APP效果如下&#xff1a; 数据以\符号分隔…

QGIS编译好后 启动报错qgis.app_app.dll not find lqgis.envfor correct environment paths

报错&#xff1a;qgis.app_app.dll not find lqgis.envfor correct environment paths 一&#xff0c;把 qgis.exe 修改成qgis-bin.exe 二&#xff0c;拷贝osggeo4w下面的qgis-bin.env 文件到当前执行目录

24最新『ComfyUI』入门到入坟全套教程!!看到就是赚到!赶紧收藏!

前言 本文简介 Stable Diffusion WebUI 应该是大多数人第一次接触 SD 绘画的工具&#xff0c;这款工具简单易上手&#xff0c;但操作流程相对固定。如果你想拥有更自由的工作流&#xff0c;可以试试 ComfyUI。而且很多新的模型和功能在刚出现时 ComfyUI 的支持度都比较高&…

IDEA 更新后打开Java项目无法Run

问题 IDEA新建了Java项目&#xff0c;然后更新IDEA之后&#xff0c;没有勾选任何删除配置&#xff0c;但是在新版本打开项目时无法使用Run。 分析 首先这不是Edit Configurations能解决的问题&#xff0c;根因也不是。 打开Project Structure发现除了Name以外的配置都是不可用…

【Python入门】教你安装2024最新的Python,最新版全面教程!!!

2024安装Python的详细教程 一、准备工作 确定Python版本&#xff1a; 访问Python官网&#xff08;Welcome to Python.org&#xff09;&#xff0c;查看最新的稳定版本。在撰写本文时&#xff08;2024年9月&#xff09;&#xff0c;Python 3.x 是当前的主要版本&#xff0c;其…

linux 硬件 arm架构 汇编语言

1.cortex 1. Cortex-A 低功耗 消费类 ARM Cortex-A 系列处理器是一种广泛应用于 移动设备、嵌入式系统和物联网的高效能处理器&#xff0c;因其低功耗和高性能的特点而受到青睐。 2. Cortex-R 实时性 Cortex-R处理器针对高性能实时应用&#xff0c;例如硬盘控制器&#xff08;或…

在 Cloud TPU 上训练 DLRM 和 DCN (TF 2.x)

本教程介绍如何训练 DLRM 和 DCN v2 排名模型&#xff0c; 用于预测点击率 (CTR) 等任务。查看以下语言版本的备注 设置以运行 DLRM 或 DCN 模型&#xff0c;了解如何设置参数 来训练 DLRM 或 DCN v2 排名模型。 模型输入是数值特征和分类特征&#xff0c;输出是标量 &#xf…

【HuggingFace Transformers】LlamaRotaryEmbedding源码解析

LlamaRotaryEmbedding源码解析 1. LlamaRotaryEmbedding类 介绍2. 逆频率向量3. LlamaRotaryEmbedding类 源码解析3.1 transformers v4.44.2版3.2 transformers v4.41.1版 1. LlamaRotaryEmbedding类 介绍 在LLaMa模型中&#xff0c;LlamaRotaryEmbedding类实现了Rotary Posit…

Elasticsearch 向量数据库本地部署 及操作方法

elasticsearch是个分布式向量数据库&#xff0c;支持多种查找模式。此外还拥有 Metadata、Filtering、Hybrid Search、Delete、Store Documents、Async等能力。本文仅是记录本地测试途中遇到的问题。 一&#xff0c;环境部署 下载软件 首先去官网&#xff0c;选择适合平台下…

Kafka-设计原理

ControllerLeader - PartitionRebalance消息发布机制HW与LEO日志分段 Controller Kafka核心总控制器Controller&#xff1a;在Kafka集群中会有一个或者多个broker&#xff0c;其中有一个broker会被选举为控制器&#xff08;Kafka Controller&#xff09;&#xff0c;它负责管理…

Hyper-v 安装 centOS

一.Hyper-v安装 1. 右键此电脑&#xff0c;点击属性&#xff0c;查看自己的window版本 如果是专业版或者企业版&#xff0c;则无需额外操作&#xff0c;如果是家庭版&#xff0c;则需要先运行一个脚本来进行安装。 参考这一篇&#xff1a;window10 家庭版如何开启Hyper-v-CSDN…

FPGA开发:初识FPGA

FPGA是什么&#xff1f; FPGA的全称是现场可编程门阵列&#xff08;Field Programmable Gate Array&#xff09;&#xff0c;一种以数字电路为主的集成芯片&#xff0c;属于可编程逻辑器件PLD的一种。简单来说&#xff0c;就是能用代码编程&#xff0c;直接修改FPGA芯片中数字…

OceanBase 关于 place_group_by HINT的使用

PLACE_GROUP_BY Hint 表示在多表关联时&#xff0c;如果满足单表查询后直接进行group by 的情形下&#xff0c;在跟其它表进行关联统计&#xff0c;减少表内部联接。 NO_PLACE_GROUP_BY Hint 表示在多表关联时&#xff0c;在关联后才对结果进行group by。 使用place_group_by …