Spark - RDD 的分区和Shuffle

news2025/1/9 16:38:50

一、RDD 的分区

前面在学习 MapReduces 的时候就提到分区,在RDD中同样也存在分区的概念,本质上都是为了提高并行度,从而提高执行的效率,那在 Spark 中的分区该怎么设置呢?

首先分区不是越多越好,太多意味着任务数太多,调度任务也会耗时从而导致总体耗时增多,分区数太少的话,会导致一些节点分配不到任务,而某个分区数据量又大导致数据倾斜问题。

因此官方推荐的分区数是:partitionNum = (executor-cores * num-executor) * (2~3)

在 Spark 中可以通过创建 RDD 时指定分区的数量,比如:

var rdd = sc.textFile("D:/test/input", 5)

也可以通过 repartition 算子,动态调整分区的数量:

rdd = rdd.repartition(8)

或者使用 coalesce 算子修改分区数:

rdd = rdd.coalesce(numPartitions = 2, shuffle = false)

repartition 算子本质上就是 coalesce(numPartitions, shuffle = true)

如果 shuffle 参数指定为 false,运行计划中不会有 ShuffledRDD,也就没有 shuffled 过程,如果是增大分区,此时是一种宽依赖,如果 shuffle 参数指定为 false ,可以发现分区数不会发生变化,比如:

var rdd = sc.parallelize(1 to 100, 6)
println(rdd.getNumPartitions)

rdd = rdd.coalesce(numPartitions = 8, shuffle = false)
println(rdd.getNumPartitions)

此时分区数无法增大:

在这里插入图片描述

分区规则:

在 Spark 中的默认分区规则有两种,分别是RangePartitioner(范围分区),HashPartitioner(Hash分区),同样也支持自定义分区。

HashPartitioner 只作用于Key-Value类型的RDD,根据 key 的 hashCode 值和分区数求余,确定具体那个分区。

RangePartitioner 将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

自定义分区的话需要继承 Partitioner ,并在 getPartition 中接收数据给出具体分区数,如:

object PartitionerTest {

  case class MyPartition(numPartition: Int) extends Partitioner {
    //分区数
    override def numPartitions: Int = {
      numPartition
    }
    //具体分区
    override def getPartition(key: Any): Int = {
      val v = key.toString.toInt
      if (v < 3) {
        0
      } else if (v >= 3 && v < 5) {
        1
      } else {
        2
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 2, 3), 2)
    println(rdd.getNumPartitions)

    val rdd1 = rdd.map((_, 1)).partitionBy(MyPartition(3))
    println(rdd1.getNumPartitions)
  }
}

二、RDD 的 Shuffle

分区的主要作用是用来实现并行计算,但是往往在进行数据处理的时候,例如 reduceByKey 等聚合操作时, 需要把 Key 相同的 Value 拉取到一起进行计算, 这个时候有可能这些 Key 相同的 Value 会坐落于不同的分区,为了让不同分区相同 Key 的数据都在 reduceByKey 的同一个 reduce 中处理,需要执行一个 all-to-all 的操作,在不同的分区之间拷贝数据, 必须跨分区聚集相同 Key 的所有数据,这个过程即 Shuffle

Spark 中的 ShuffleHash base shuffleSort base shuffle 以及 tungsten-sort shuflle ,默认使用的是 Sort base shuffleHash base shuffle 已经过时废弃。

Hash base shuffle:

在这里插入图片描述

大致的原理是分桶, 假设 Reducer 的个数为 R, 那么每个 MapperR 个桶,按照 KeyHash 将数据映射到不同的桶中, Reduce 找到每一个 Mapper 中对应自己的桶拉取数据。

假设 Mapper 的个数为 M, 整个集群的文件数量是 M * R, 如果有 1000MapperReducer,则会生成 1000000 个文件, 这个量是非常巨大的。

Sort base shuffle:
在这里插入图片描述
对于 Sort base shuffleMap 侧将数据全部放入一个叫做 AppendOnlyMap 的组件中,同时可以在这个特殊的数据结构中做聚合操作,然后通过一个类似于 MergeSort 的排序算法 TimSortAppendOnlyMap 底层的 Array 排序,先按照 Partition ID 排序, 后按照 KeyHashCode 排序,最终每个 Map Task 生成一个 输出文件,Reduce Task 来拉取自己对应的数据,可以大幅度减少所产生的中间文件,从而能够更好的应对大吞吐量的场景,在 Spark 1.2 以后, 已经默认采用这种方式。

tungsten-sort:

与sort类似,tungsten-sort使用了堆外内存管理机制,内存使用效率更高。

修改默认的 Sort base shuffletungsten-sort

conf.set("spark.shuffle.manager","tungsten-sort");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/55446.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Android]Logcat调试

Android采用Log(android.util.log)工具打印日志&#xff0c;它将各类日志划分为五个等级。 Log.e 打印错误信息 Log.w 打印警告信息 Log.i 打印一般信息 Log.d 打印调试信息 Log.v 打印冗余信息 不同等级的日志信息&#xff0c;在日志栏中会以不同颜色和等级(E、W、…

(附源码)ssm医院挂号系统 毕业设计 250858

医院挂号系统的设计与实现 摘 要 信息化社会内需要与之针对性的信息获取途径&#xff0c;但是途径的扩展基本上为人们所努力的方向&#xff0c;由于站在的角度存在偏差&#xff0c;人们经常能够获得不同类型信息&#xff0c;这也是技术最为难以攻克的课题。针对医院排队挂号等问…

深入理解 Python 描述符

学习 Python 这么久了&#xff0c;说起 Python 的优雅之处&#xff0c;能让我脱口而出的&#xff0c; Descriptor&#xff08;描述符&#xff09;特性可以排得上号。 描述符 是Python 语言独有的特性&#xff0c;它不仅在应用层使用&#xff0c;在语言语法糖的实现上也有使用到…

【java基础系列】16- Java中怎么处理异常?

Java的异常处理 1、异常的概念 概念&#xff1a;程序在运行过程中出现的不正常现象。出现异常不处理将终止程序运行。异常处理的必要性&#xff1a;任何程序都可以存在大量的未知问题、错误&#xff1b;如果不对这些问题进行正确处理&#xff0c;则可能导致程序的中断&#x…

[附源码]Python计算机毕业设计SSM开放性实验室网上预约管理(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

分布式和可再生系统建模(simulink)

目录 1 可再生/微电网概述 1.1 分布式和可再生系统建模和仿真 1.2 解决工作流程 1.3 能源管理系统监控设计 1.4 验证控件的测试网格规范和IEEE标准 1.5 部署算法和代码生成 1.6 网格集成研究控制器的实时测试 2 微电网案例 2.1 简介 2.2 在 Simulink 中实现微电网…

【JavaEE】初识操作系统,吃透进程

✨哈喽&#xff0c;进来的小伙伴们&#xff0c;你们好耶&#xff01; &#x1f6f0;️&#x1f6f0;️系列专栏:【JavaEE】 ✈️✈️本篇内容:初始操作系统&#xff0c;进程的概念 &#x1f680;&#x1f680;代码存放仓库gitee&#xff1a;JavaEE初阶代码存放&#xff01; ⛵⛵…

关于AM437x Linux+qt开发笔记(2)

第一部分,触摸屏 命令 lsinput (有些系统不移植)查看系统input实践 命令ox -d /dev/input/event1 或hexdump -d /dev/input/event1 (从上图看,event0没有接的触屏) ,点击屏幕如下 命令 ls /dev/input -al , 查看input设备的触摸屏软连接。 命令cat /etc/udev/ru…

编码与测试

文章目录一、编码1、概念2、如何选择程序设计语言3、程序设计风格&#xff08;1&#xff09;源程序文档化(2) 数据说明&#xff08;3&#xff09;语句构造&#xff08;4&#xff09;输入输出&#xff08;5&#xff09;程序效率编码时提高程序运行效率的主要规则二、软件测试基础…

【爬坑之路一】windows系统下更新升级node版本【亲测有效】

前言 一定要看到最后&#xff01;&#xff01;&#xff01; 项目开发中&#xff0c;需要升级 node 版本。本着不想卸载 node 再重新安装的原则&#xff0c;因为node 的环境配置以及各种相关配置有些繁琐&#xff0c;所以就想着使用 命令的方式进行升级。 在网上找了一些升级 n…

C#详解:程序域、程序集、模块、Type、反射

总结&#xff1a; ">>>":代表包含 进程>>>应用程序域AppDomain>>>程序集Assembly>>>模块Module>>>类型Type>>>成员&#xff08;方法、属性等&#xff09; 1、程序集Assembly 如图&#xff0c;假设一个解决方…

ARM 自己动手安装交叉编译工具链

一、Windows中装软件的特点 Windows中装软件使用安装包&#xff0c;安装包解压后有2种情况&#xff1a; 一种是一个安装文件&#xff08;.exe .msi&#xff09;&#xff0c;双击进行安装&#xff0c;下一步直到安装完毕。安装完毕后会在桌面上生成快捷方式&#xff0c;我们平…

(附源码)ssm招聘网站 毕业设计 250858

SSM招聘网站 摘 要 招聘网站采用B/S结构、java开发语言、以及Mysql数据库等技术。系统主要分为管理员、用户、企业三部分&#xff0c;管理员管理主要功能包括&#xff1a;首页、站点管理&#xff08;轮播图、公告栏&#xff09;用户管理&#xff08;管理员、应聘用户、企业用户…

重点| 系统集成项目管理工程师考前50个知识点(2)

本文章总结了系统集成项目管理工程师考试背记50个知识点&#xff01;&#xff01;&#xff01; 帮助大家更好的复习&#xff0c;希望能对大家有所帮助 比较长&#xff0c;放了部分&#xff0c;需要可私信&#xff01;&#xff01; 11、项目目标包括成果性目标和约束性目标。项…

直播倒计时,PyTorch Conference 2022 今晚开启

内容一览&#xff1a;PyTorch Conference 2022 即将在美国南部城市新奥尔良举办。本次会议将带来技术讲座、项目演示及最佳案例分享。 本文首发自微信公众号&#xff1a;PyTorch 开发者社区 关键词&#xff1a;PyTorch 深度学习 机器学习 PyTorch Conference 2022 今晚开启 自…

360crawlergo结合xray被动扫描

360crawlergo结合xray被动扫描 360crawlergo结合xray被动扫描安装配置 360crawlergo结合xray被动扫描 安装 Xray https://github.com/chaitin/xraycrawlergo_x_XRAY https://github.com/timwhitez/crawlergo_x_XRAYcrawlergo https://github.com/0Kee-Team/crawlergo 更多的…

高等数值计算方法学习笔记第7章【非线性方程组求根】

高等数值计算方法学习笔记第7章【非线性方程组求根】一、方程求根与二分法&#xff08;第五次课&#xff09;二、不动点迭代法及其收敛性1.不动点迭代与不动点迭代法&#xff08;一个例题&#xff09;2.不动点的存在性与迭代法的收敛性&#xff08;两个定理&#xff0c;两例题&…

计算机网络——常考的面试题

什么是TCP/IP&#xff1f; TCP建立连接为什么要三次握手&#xff1f;断开连接为什么要四次挥手&#xff1f; SSL/TSL握手过程&#xff1f; 1、网络分层模型 OSI&#xff1a;全称叫Open System Interconnection (开放式系统互联)&#xff0c;是国际标准化组织ISO制定的理论模…

【软件测试】面试老约不到?软件测试简历项目经验怎么写?论项目经验的重要性......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 随着就业竞争越来越…

(五)共享模型之管程【wait notify 】

一、wait notify 1. 小故事 - 为什么需要 wait 2. 原理之 wait / notify &#xff08;1&#xff09;Owner 线程发现条件不满足&#xff0c;调用 wait 方法&#xff0c;即可进入 WaitSet 变为 WAITING 状态。 &#xff08;2&#xff09;BLOCKED 和WAITING 的线程都处于阻塞状…