Spark RDD的分区与依赖关系

news2025/2/21 20:21:39

Spark RDD的分区与依赖关系

RDD分区

RDD,Resiliennt Distributed Datasets,弹性式分布式数据集,是由若干个分区构成的,那么这每一个分区中的数据又是如何产生的呢?这就是RDD分区策略所要解决的问题,下面我们就一道来学习RDD分区相关。

RDD数据分区

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。

分区的决定,就是在宽依赖的过程中才有,窄依赖因为是一对一,分区确定的,所以不需要指定分区操作。

1)Partitioner:在Spark中涉及RDD的分区策略的抽象类为Partitioner,其继承体系如图-27所示,有两个核心的子类实现,一个HashPartitioner,一个RangePartitioner。

图-27 spark Partitioner继承体系

Spark中数据分区的主要工具类(数据分区类),主要用于Spark底层RDD的数据重分布的情况中,主要方法两个,如图-28所示:

图-28 Partitioner抽象类

2)HashPartitioner:Spark中非常重要的一个分区器,也是默认分区器,默认用于90%以上的RDD相关API上。

功能:依据RDD中key值的hashCode的值将数据取模后得到该key值对应的下一个RDD的分区id值,支持key值为null的情况,当key为null的时候,返回0;该分区器基本上适合所有RDD数据类型的数据进行分区操作;但是需要注意的是,由于JAVA中数组的hashCode是基于数组对象本身的,不是基于数组内容的,所以如果RDD的key是数组类型,那么可能导致数据内容一致的数据key没法分配到同一个RDD分区中,这个时候最好自定义数据分区器,采用数组内容进行分区或者将数组的内容转换为集合。HashPartitioner代码如下:

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
//加载数据
val rdd = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
//通过Hash分区
val result: RDD[(Int, Int)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
//获取分区方式
println(result.partitioner)
//获取分区数
println(result.getNumPartitions)
}

RDD自定义分区

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合我们的需求,这时候我们就可以自定义分区策略。

要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。

1)numPartitions: Int:返回创建出来的分区数。

2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

3)equals():Java判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。

案例一:模拟实现HashPartitioner。

class CustomerPartitoner(numPartiton:Int) extends Partitioner{

  // 返回分区的总数

  override def numPartitions: Int = numPartiton

  // 根据传入的Key返回分区的索引

  override def getPartition(key: Any): Int = {

key.hashCode()%numparts

  }

}

object CustomerPartitoner {

  def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf()

.setAppName("CustomerPartitoner").setMaster("local[*]")

val sc = new SparkContext(sparkConf)

//zipWithIndex该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

val rdd = sc.parallelize(0 to 10,1).zipWithIndex()

val func = (index:Int,iter:Iterator[(Int,Long)]) =>{

  iter.map(x => "[partID:"+index + ", value:"+x+"]")

}

val r = rdd.mapPartitionsWithIndex(func).collect()

for (i <- r){

  println(i)

}

val rdd2 = rdd.partitionBy(new CustomerPartitoner(5))

val r1 = rdd2.mapPartitionsWithIndex(func).collect()

println("----------------------------------------")

for (i <- r1){

  println(i)

}

println("----------------------------------------")

sc.stop()

  }

}

总结:

1)分区主要面对KV结构数据,Spark内部提供了两个比较重要的分区器,Hash分区器和Range分区器。

2)hash分区主要通过key的hashcode来对分区数求余,hash分区可能会导致数据倾斜问题,Range分区是通过水塘抽样的算法来将数据均匀的分配到各个分区中。

3)自定义分区主要通过继承partitioner抽象类来实现,必须要实现两个方法:numPartitions 和 getPartition(key: Any)。

RDD依赖关系

RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

依赖关系

图-29是源码中的一张图,可以发现一个问题Dependency(依赖)的意思可以发现ShuffleDependency是其子类(即宽依赖),NarrowDependency是其子类(即窄依赖)。

图-29 Dependency体系

1)宽窄依赖:所谓窄依赖,指的是子RDD一个分区中的数据,来自于上游RDD中一个分区。所谓宽依赖,指的是子RDD一个分区中的数据,来自于上游RDD所有的分区。

宽窄依赖关系示例如图-30所示:

图-30 宽窄依赖示例图

2)血统Lineage:RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。关于linage说明示意图如图-31所示:

图-31 lineage示例图

3)DAG有向无环图:如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图。有向图中一个点经过两种路线到达另一个点未必形成环,因此有向无环图未必能转化成树,但任何有向树均为有向无环图。通俗的来说就是有方向,没有回流的图可以称为有向无环图,示意图如图-32所示。

图-32 有像无环图

4)RDD任务的切分:对于RDD的任务切分,可以很形象的如图-33所示。

图-33 RDD任务的切分

并行度:程序同一时间执行作业的线程个数。

原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,如图-34所示。

图-34 RDD stage的切分

对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的重要依据。Stage阶段计算过程如图所示-35所示。

图-35 RDD stage阶段计算过程

任务生成和提交的四个阶段

Spark任务生产和提交的四个步骤可以归纳如下:

1)构建DAG:用户提交的job将首先被转换成一系列RDD并通过RDD之间的依赖关系构建DAG,然后将DAG提交到调度系统。

DAG描述多个RDD的转换过程,任务执行时,可以按照DAG的描述,执行真正的计算。

DAG是有边界的:开始(通过sparkcontext创建的RDD),结束(触发action,调用runjob就是一个完整的DAG形成了,一旦触发action,就形成了一个完整的DAG)。

一个RDD描述了数据计算过程中的一个环节,而一个DAG包含多个RDD,描述了数据计算过程中的所有环节。

一个spark application可以包含多个DAG,取决于具体有多少个action。

2)DAGScheduler:将DAG切分stage(切分依据是shuffle),将stage中生成的task以taskset的形式发送给TaskScheduler

为什么要切分stage?

一个是复杂业务逻辑(将多台机器上具有相同属性的数据聚合到一台机器上:shuffle)。

如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,下一个阶段的计算依赖上一个阶段的数据。

在同一个stage中,会有多个算子,可以合并到一起,我们很难称其为pipeline(流水线,严格按照流程、顺序执行)。

3)TaskScheduler:调度task(根据资源情况将task调度到Executors).

4)Executors:接收task,然后将task交给线程池执行。

具体可以简化为如图-38所示。

图-38 spark任务生成和提交图

排序

TopN

topN就是上述sortBy/sortByKey之后执行action操作take(N),或者直接takeOrderd(N),建议使用后者,效率高于前者。具体操作省略。

二次排序

所谓二次排序,指的是排序字段不唯一,有多个,共同排序,仍然使用上面的数据,对学生的身高和年龄一次排序。

object SecondSortOps {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName(s"${SecondSortOps.getClass.getSimpleName}")

.setMaster("local[2]")

val sc = new SparkContext(conf)

//sortByKey 数据类型为k-v,且是按照key进行排序

val personRDD:RDD[Person] = sc.parallelize(List(

Person(1, "吴轩宇", 19, 168),

Person(2, "彭国宏", 18, 175),

Person(3, "随国强", 18, 176),

Person(4, "闫  磊", 20, 180),

Person(5, "王静轶", 18, 168)

))

personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))

sc.stop()

}

}

case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {

//对学生的身高和年龄依次排序

override def compare(that: Person) = {

var ret = this.height.compareTo(that.height)

if(ret == 0) {

ret = this.age.compareTo(that.age)

}

ret

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1639521.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Luminar开始为沃尔沃生产下一代激光雷达传感器

在自动驾驶技术的浪潮中&#xff0c;激光雷达&#xff08;LiDAR&#xff09;传感器以其高精度和强大的环境感知能力&#xff0c;逐渐成为了该领域的技术之星。Luminar&#xff08;路安达&#xff09;公司作为自动驾驶技术的领军企业&#xff0c;近日宣布已开始为沃尔沃汽车生产…

智能家居|基于SprinBoot+vue的智能家居系统(源码+数据库+文档)

智能家居目录 基于SprinBootvue的智能家居系统 一、前言 二、系统设计 三、系统功能设计 1管理员&#xff1a;个人中心管理功能的详细实现 2管理员&#xff1a;用户信息管理功能的详细实现 3管理员&#xff1a;家具管理功能的详细实现 4管理员&#xff1a;任务管理功能…

Golang | Leetcode Golang题解之第63题不同路径II

题目&#xff1a; 题解&#xff1a; func uniquePathsWithObstacles(obstacleGrid [][]int) int {n, m : len(obstacleGrid), len(obstacleGrid[0])f : make([]int, m)if obstacleGrid[0][0] 0 {f[0] 1}for i : 0; i < n; i {for j : 0; j < m; j {if obstacleGrid[i]…

leetCode65. 有效数字

leetCode65. 有效数字 题目思路 代码 class Solution { public:bool isNumber(string s) {int l 0, r s.size() - 1;// 1.忽略前后的空格while(l < r && s[l] ) l;while(l < r && s[r] ) r--;if(l > r) return false;s s.substr(l,r - l 1)…

Docker——生产案例(如何修改Docker部署服务的端口映射)

目录 前言 1. 测试环境中新建Apache服务 2.停止容器和Docker服务 3.修改容器配置 4.重启Docker服务并访问测试 前言 由于接替原工作人员的工作之后&#xff0c;上级需要修改Docker部署Apache服务的端口映射&#xff0c;将89端口修改为99端口&#xff0c;那我们该如何修改呢…

GitHub Copilot 简单使用

因为公司安全原因&#xff0c;并不允许在工作中使用GitHub Copilot&#xff0c;所以&#xff0c;一直没怎么使用。最近因为有一些其它任务&#xff0c;所以&#xff0c;试用了一下&#xff0c;感觉还是很不错的。&#xff08;主要是C和Python编程&#xff09; 一&#xff1a;常…

不固定属性分组的减轻校准偏差以改善医学成像分析中的公平性

文章目录 Mitigating Calibration Bias Without Fixed Attribute Grouping for Improved Fairness in Medical Imaging Analysis摘要方法实验结果 Mitigating Calibration Bias Without Fixed Attribute Grouping for Improved Fairness in Medical Imaging Analysis 摘要 这…

预训练模型介绍

一、什么是GPT GPT 是由人工智能研究实验室 OpenAI 在2022年11月30日发布的全新聊天机器人模型, 一款人工智能技术驱动的自然语言处理工具 它能够通过学习和理解人类的语言来进行对话, 还能根据聊天的上下文进行互动,能完成撰写邮件、视频脚本、文案、翻译、代码等任务 二、 为…

分享几个嵌入式操作系统

文章目录 1. 概述2. 常见的RTOS2.1 μC/OS2.2 FreeRTOS2.3 RT-Thread2.4 其它系统1. 概述 最近在查阅一些物联网系统相关的知识,因此查到了实时操作系统(Real Time Operating System,简称RTOS)。我最早接触的RTOS是μC/OS,当时是为了学习 操作系统相关的知识,后来工作也接…

LeetCode-网络延迟时间(Dijkstra算法)

每日一题 今天刷到一道有关的图的题&#xff0c;需要求单源最短路径&#xff0c;因此使用Dijkstra算法。 题目要求 有 n 个网络节点&#xff0c;标记为 1 到 n。 给你一个列表 times&#xff0c;表示信号经过 有向 边的传递时间。 times[i] (ui, vi, wi)&#xff0c;其中 …

GPT-1

GPT 系列是 OpenAI 的一系列预训练模型&#xff0c;GPT 的全称是 Generative Pre-Trained Transformer&#xff0c;顾名思义&#xff0c;GPT 的目标是通过 Transformer&#xff0c;使用预训练技术得到通用的语言模型。目前已经公布论文的有 GPT-1、GPT-2、GPT-3。 最近非常火的…

java之web笔记

1.Servlet技术 1.1 JavaWeb概述 在Sun的Java Servlet规范中&#xff0c;对Java Web应用作了这样定义:“JavaWeb应用由一组Servlet、HTML页、类、以及其它可以被绑定的资源构成。它可以在各种供应商提供的实现Servlet规范的Servlet容器中运行。 Java Web应用中可以包含如下内容…

B+树详解与实现

B树详解与实现 一、引言二、B树的定义三、B树的插入四、B树的删除五、B树的查找效率六、B树与B树的区别和联系 一、引言 B树是一种树数据结构&#xff0c;通常用于数据库和操作系统的文件系统中。它的特点是能够保持数据稳定有序&#xff0c;其插入与修改拥有较稳定的对数时间…

hive表基本语法

hive表基本语法 青少年是一个美好而又是一去不可再得的时期 是将来一切光明和幸福的开端 目录 hive表基本语法 1.ROW FORMAT用法 2.LOCATION用法 3.EXTERNAL用法 &#xff08;外部表&#xff09; 4.STORED AS 用法&#xff1a;设置数据存储格式 5.TBLPROPERTIES 用法 6.P…

liceo靶机复现

liceo-hackmyvm 靶机地址&#xff1a;https://hackmyvm.eu/machines/machine.php?vmLiceo 本机环境&#xff1a;NAT模式下&#xff0c;使用VirtualBox 信息收集&#xff1a; 首先局域网内探测靶机IP 发现IP为10.0.2.4 开启nmap扫描一下看看开了什么端口 扫描期间看一下web页…

随便聊一下 显控科技 控制屏 通过 RS485 接口 上位机 通讯 说明

系统搭建&#xff1a; 1、自己研发的一个小系统&#xff08;采集信号&#xff0c;将采集的信号数字化&#xff09;通过COM口&#xff0c;连接显控屏 COM3 口采用 485 协议送到显控屏&#xff08;显控科技&#xff09;的显示屏展示出来&#xff09;。 2、显控屏 将 展示的数据…

Neomodel 快速上手 构建neo4j 知识图谱

介绍 python 创建neo4j 数据通常使用py2neo&#xff0c;但是这个包 官方声明已经停止更新&#xff0c;根据neo4j网站推荐使用neomodel neomodel 使用起来很想django 中的orm&#xff0c;如果有django基础的上手很简单&#xff0c;而且neomodel 支持 neo4j 5.X版本更新维护的也…

使用 FFmpeg 推拉 RTSP 流媒体

实时流传输协议 RTSP&#xff08;Real-Time Streaming Protocol&#xff09;是 TCP/IP 协议体系中的一个应用层协议&#xff0c;由哥伦比亚大学、网景和 RealNetworks 公司提交的 IETF RFC 标准。该协议定义了一对多应用程序如何有效地通过 IP 网络传送多媒体数据。RTSP 在体系…

全栈开发之路——前端篇(3)setup和响应式数据

全栈开发一条龙——前端篇 第一篇&#xff1a;框架确定、ide设置与项目创建 第二篇&#xff1a;介绍项目文件意义、组件结构与导入以及setup的引入。 本文为该系列的第三篇&#xff0c;主要讲述Vue核心的setup语法&#xff0c;同时讲解再使用了setup后如何设置响应式数据。 辅助…

基于php+mysql+html超市商品管理系统(含论文)

博主介绍&#xff1a; 大家好&#xff0c;本人精通Java、Python、Php、C#、C、C编程语言&#xff0c;同时也熟练掌握微信小程序、Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我有丰富的成品Java、Python、C#毕设项目经验&#xff0c;能够为学生提供各类…