Spark RDD sortBy算子什么情况会触发shuffle

news2024/11/16 21:32:25

在 Spark 的 RDD 中,sortBy 是一个排序算子,虽然它在某些场景下可能看起来是分区内排序,但实际上在需要全局排序时会触发 Shuffle。这里我们分析其底层逻辑,结合源码和原理来解释为什么会有 Shuffle 的发生。


1. 为什么 sortBy 会触发 Shuffle?

关键点 1:全局有序性要求

sortBy 并非单纯的分区内排序。它的目标是按照用户指定的键对整个 RDD 的数据进行排序,这种操作需要保证全局顺序。为实现这一点,必须:

  • 对数据进行 重新分区(Repartition),确保每个分区中的数据按照全局范围内的排序键正确分布;
  • 每个分区内部再完成排序。

这些步骤不可避免地引入了 Shuffle,因为数据需要从一个分区转移到另一个分区以保证全局有序性。


关键点 2:底层调用 repartitionAndSortWithinPartitions

sortBy 的底层实现会调用 repartitionAndSortWithinPartitions 方法:

this.keyBy(f).repartitionAndSortWithinPartitions(
  new RangePartitioner(numPartitions, this, ascending))(ordInverse).values
  1. keyBy(f)

    • 将数据转化为 (key, value) 格式,key 是排序的关键字,value 是原始数据。
  2. RangePartitioner

    • 使用 RangePartitioner 将数据根据排序键重新分区(这一步需要 Shuffle)。
  3. repartitionAndSortWithinPartitions

    • 先 Shuffle 数据以保证每个分区内的 key 是按范围划分的;
    • 然后对每个分区内的数据进行排序。
Shuffle 的触发
  • 当目标分区数量与当前分区数量不一致时(用户指定分区数或默认分区数),会触发 Shuffle;
  • 即使目标分区数一致,只要需要保证全局有序,也需要重新分布数据来确保各分区内数据按键范围划分。

2. Shuffle 的作用

  • 全局排序:分区间重新分布数据,确保所有分区的排序键范围是连续的。
  • 负载均衡:通过 RangePartitioner 分布数据,避免某些分区过大或过小的问题。
  • 分区内排序:确保每个分区内部数据按键排序。

3. 源码分析

repartitionAndSortWithinPartitions 的核心逻辑如下:

def repartitionAndSortWithinPartitions(
    partitioner: Partitioner)(
    implicit ord: Ordering[K]): RDD[(K, V)] = withScope {
  val shuffled = new ShuffledRDD[K, V, V](this, partitioner)
  shuffled.setKeyOrdering(ord)
  new MapPartitionsRDD(shuffled, (context, pid, iter) => {
    val sorter = new ExternalSorter[K, V, V](context, Some(partitioner), Some(ord))
    sorter.insertAll(iter)
    context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    sorter.iterator
  })
}
  1. ShuffledRDD

    • 触发 Shuffle,将数据根据分区器重新分布。
  2. ExternalSorter

    • 对每个分区内的数据进行排序(如果数据超出内存,会使用磁盘作为临时存储)。

4. 举例说明 Shuffle 的发生

sortBy 的行为取决于传递的参数。为了实现分区内排序,你需要明确控制 sortBy 的参数设置。如果不显式指定目标分区数(numPartitions 参数),sortBy 默认不会触发 Shuffle,因此只会在分区内排序。

例子 1:带 Shuffle 的全局排序

显式传递 numPartitions 参数,并设置目标分区数。此时会触发数据的重新分区,确保全局顺序:

val rdd = sc.parallelize(Seq(5, 2, 4, 3, 1), numSlices = 2)
val sortedRdd = rdd.sortBy(x => x, ascending = true, numPartitions = 3)// 指定目标分区数
println(sortedRdd.collect().mkString(", "))
  • 初始数据分区
    分区 1:[5, 2],分区 2:[4, 3, 1]
  • 重新分区和排序后
    分区 1:[1, 2],分区 2:[3, 4],分区 3:[5]
  • Shuffle 触发原因
    数据必须重新分布,确保分区键范围([1-2], [3-4], [5])。
  • 特点
    触发 Shuffle 操作,数据按照 RangePartitioner 进行分区。
    每个分区内局部排序后,实现全局排序。
例子 2:分区内排序(无 Shuffle)

直接使用 sortBy 而不传递 numPartitions 参数:

val rdd = sc.parallelize(Seq(5, 2, 4, 3, 1), numSlices = 2) // 两个分区
val sorted = rdd.sortBy(x => x) // 未指定 numPartitions,默认分区数不变
// 如果只需要分区内排序,mapPartitions 提供了无 Shuffle 的选择。
// val sortedRdd = rdd.mapPartitions(partition => partition.toList.sorted.iterator)
sorted.collect().foreach(println)
  • 初始数据分区
    分区 1:[5, 2],分区 2:[4, 3, 1]
  • 排序后
    分区 1:[2, 5],分区 2:[1, 3, 4]
  • 无 Shuffle 原因
    数据仅在分区内排序,分区间顺序无全局保证。

5. 总结

  • sortBy 在需要全局排序时触发 Shuffle,这是为了重新分区以确保分区范围和分区内排序。
  • 如果只需要分区内排序,mapPartitions 提供了无 Shuffle 的选择。

注意事项

  • 全局排序带来的 Shuffle 会显著增加网络传输和计算成本。
  • 如无必要,尽量避免全局排序,优先考虑局部排序或 Top-N 算法以优化性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2241761.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

T6识别好莱坞明星

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 导入基础的包 from tensorflow import keras from tensorflow.keras import layers,models import os, PIL, pathlib import matplotlib.pyplot as pl…

Odoo :一款免费开源的日化行业ERP管理系统

文 / 开源智造Odoo亚太金牌服务 概述 构建以 IPD 体系作为核心的产品创新研发管控体系,增进企业跨部门业务协同的效率,支撑研发管控、智慧供应链、智能制造以及全渠道营销等行业的场景化,构筑行业的研产供销财一体化管理平台。 行业的最新…

48.第二阶段x86游戏实战2-鼠标点击call

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 本次游戏没法给 内容参考于:微尘网络安全 本人写的内容纯属胡编乱造,全都是合成造假,仅仅只是为了娱乐,请不要…

Vue 学习随笔系列十五 -- 数组遍历方法

数组遍历方法 文章目录 数组遍历方法1. for 循环2. forEach (不会修改数组本身)3. map (不修改数组本身)4. some(不修改数组本身)5. every(不修改数组本身)6. filter(不修改数组本身)7. find(不修改数组本身)8. findIndex拓展 9. reduce(累加)拓展 1. fo…

FreeRTOS的列表与列表项

目录 1.为什么要学列表? 2.什么是列表和列表项? 2.1 列表 2.2列表项 2.3,迷你列表项 3.列表与列表项的初始化 3.1 列表初始化 3.2列表项初始化 4.列表项的“增删查”(插入、删除、遍历) 4.1列表项的插入 4.1.1…

数字IC后端教程之Innovus hold violation几大典型问题

今天小编给大家分享下数字IC后端实现Physical Implementation过程中经常遇到的几个hold violation问题。每个问题都是小编自己在公司实际项目中遇到的。 数字后端实现静态时序分析STA Timing Signoff之min period violation Q1: 在Innouvs postCTS时序优化的log中我们经常会看…

VS2022编译32位OpenCV

使用环境 Visual Studio 2022 OpenCV: 4.7.0 cmake: 3.30.2一、使用CMake工具生成vs2022的openCV工程解决方案 打开cmake,选择opencv的源代码目录,创建一个文件夹,作为VS工程文件的生成目录 点击configure构建项目,弹出构建设置…

企业生产环境-麒麟V10(ARM架构)操作系统部署Zookeeper单节点集群版

前言:ZooKeeper是一个分布式协调服务,它为分布式应用提供一致性服务,是Apache Hadoop的子项目。它被设计为易于编程,同时具有高性能和高可靠性。ZooKeeper提供了一个简单的接口和一些基本的文件系统操作,使得开发者能够…

vue3 中直接使用 JSX ( lang=“tsx“ 的用法)

1. 安装依赖 npm i vitejs/plugin-vue-jsx2. 添加配置 vite.config.ts 中 import vueJsx from vitejs/plugin-vue-jsxplugins 中添加 vueJsx()3. 页面使用 <!-- 注意 lang 的值为 tsx --> <script setup lang"tsx"> const isDark ref(false)// 此处…

深度学习服务器租赁AutoDL

1. 根据需要选择租用的显卡 算力市场 1.1 显卡选择 1.2 环境配置 2. 服务器使用 2.1 上传文件 2.2 调试环境 2.3 跑代码 python train.py && /usr/bin/shutdown # && /usr/bin/shutdown表示代码成功运行结束后&#xff0c;自动关机3. 省钱绝招 省钱绝招 …

IDEA部署AI代写插件

前言 Hello大家好&#xff0c;当下是AI盛行的时代&#xff0c;好多好多东西在AI大模型的趋势下都变得非常的简单。 比如之前想画一幅风景画得先去采风&#xff0c;然后写实什么的&#xff0c;现在你只需描述出你想要的效果AI就能够根据你的描述在几分钟之内画出一幅你想要的风景…

【大数据技术基础 | 实验十】Hive实验:部署Hive

文章目录 一、实验目的二、实验要求三、实验原理四、实验环境五、实验内容和步骤&#xff08;一&#xff09;安装部署&#xff08;二&#xff09;配置HDFS&#xff08;三&#xff09;启动Hive 六、实验结果&#xff08;一&#xff09;启动结果&#xff08;二&#xff09;Hive基…

Flume1.9.0自定义Sink组件将数据发送至Mysql

需求 1、将Flume采集到的日志数据也同步保存到MySQL中一份&#xff0c;但是Flume目前不支持直接向MySQL中写数据&#xff0c;所以需要用到自定义Sink&#xff0c;自定义一个MysqlSink。 2、日志数据默认在Linux本地的/data/log/user.log日志文件中&#xff0c;使用Flume采集到…

Onlyoffice配置一 JWT認證

案例 使用官網給c# MVC的例子&#xff0c;主要在版本7.2之後&#xff0c;默認加入JWT認證&#xff0c;docker版本尚且可以在创建的时候使用默认的指令避开&#xff0c;但是在exe版本&#xff0c;即使配置为false&#xff0c;重启之后也会默认开启。 简单说一下如何配置 配置J…

ZeroSSL HTTPS SSL证书ACMESSL申请3个月证书

目录 一、引言 二、准备工作 三、申请 SSL 证书 四、证书选型 五、ssl重要性 一、引言 目前免费 Lets Encrypt、ZeroSSL、BuyPass、Google Public CA SSL 证书&#xff0c;一般免费3-6个月。从申请难易程度分析&#xff0c;zerossl申请相对快速和简单&#xff0c;亲测速度非…

MySql 日期周处理方式

MySql 日期周处理方式 最近在做数仓相关工作&#xff0c;最近遇到 几个问题&#xff0c; 1、计算指定日期是一年中的第几周&#xff0c;周一为周的第一天 2、计算周的开始时间&#xff0c;结束时间 3、计算周对应的年 比如 2023-01-01 WEEKOFYEAR(2023-01-01) 是2022年的52周&…

STM32 BootLoader 刷新项目 (十) Flash擦除-命令0x56

STM32 BootLoader 刷新项目 (十) Flash擦除-命令0x56 1. STM32F407 BootLoader 中的 Flash 擦除功能详解 在嵌入式系统中&#xff0c;BootLoader 的设计是非常关键的部分&#xff0c;它负责引导主程序的启动、升级以及安全管理。而在 STM32F407 等 MCU 上实现 BootLoader&…

【Homework】【5】Learning resources for DQ Robotics in MATLAB

Lesson 5 代码-TwoDofPlanarRobot.m 表示一个 2 自由度平面机器人。该类包含构造函数、计算正向运动学模型的函数、计算平移雅可比矩阵的函数&#xff0c;以及在二维空间中绘制机器人的函数。 classdef TwoDofPlanarRobot%TwoDofPlanarRobot - 表示一个 2 自由度平面机器人类…

Uniapp 引入 Android aar 包 和 Android 离线打包

需求&#xff1a; 原生安卓 apk 要求嵌入到 uniapp 中&#xff0c;并通过 uniapp 前端调起 app 的相关组件。 下面手把手教你&#xff0c;从 apk 到 aar&#xff0c;以及打包冲突到如何运行&#xff0c;期间我所遇到的问题都会 一 一 进行说明&#xff0c;相关版本以我文章内为…

你可以通过以下步骤找到并打开 **Visual Studio 开发者命令提示符**:

你可以通过以下步骤找到并打开 Visual Studio 开发者命令提示符&#xff1a; 1. 通过开始菜单查找 打开 开始菜单&#xff08;点击屏幕左下角的 Windows 图标&#xff09;。在搜索框中输入 Developer Command Prompt。你应该看到以下几种选项&#xff08;具体取决于你的 Visu…