RDD算子介绍(二)

news2025/1/13 15:50:36

1. coalesce

用于缩减分区,减少分区个数,减少任务调度成本。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
val newRDD = rdd.coalesce(2)
newRDD.saveAsTextFile("output")

分区数可以减少,但是减少后的分区里的数据分布并不一定是均匀分布的,比如以下场景:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD = rdd.coalesce(2)
newRDD.saveAsTextFile("output")

结果现实1和2在一个分区,3 4 5 6四个数在第二个分区。因为coalesce算子默认不会打乱分区的数据进行重新组合的。原来1和2,3和4,5和6分别在三个分区里,如果缩减分区之后1 2 3在一个分区,4 5 6在一个分区,意味着将原来的3和4所在的分区里的数据打乱重新组合了。所以缩减分区后,应该将5和6所在的分区里的数据移到其他分区中去,即3 4 5 6最终在一个分区了。

coalesce算子可能会导致数据倾斜。如果想要数据均衡,需要进行shuffle处理,coalesce算子第二个参数就表示是否shuffle处理,默认是false,改为true即可,但是数据不一定有规律,这就是shuffle的效果。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD = rdd.coalesce(2, true)
newRDD.saveAsTextFile("output")

结果显示1 4 5在一个分区,2 3 6在一个分区。

2. repartition

coalesce算子也可以增加分区,但是第二个参数须为true,但用得更多的是repartition算子。 

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val newRDD = rdd.repartition(3)
newRDD.saveAsTextFile("output")

repartition算子源码中还是调用了coalesce算子(第二个参数为true)。

3. sortBy

见名知义,就是排序。

val rdd : RDD[Int] = sc.makeRDD(List(1, 4, 2, 3, 6, 5), 2)
val newRDD = rdd.sortBy(num=>num)
newRDD.saveAsTextFile("output")

数据重新排序,但是分区数不变,存在shuffle过程。 默认是升序排序,第二个参数传false,表示降序排序。

4. intersection、union、subtract、zip

val rdd1 : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 : RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))

val rdd3 = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))

val rdd4 = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))

val rdd5 = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))

val rdd6 : RDD[(Int, Int)]= rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))

 

 注意事项:

1)交集不去重

2)如果两个rdd的数据类型不同,不能做交集、并集、差集操作,但拉链可以

3)拉链操作的两个rdd的分区数需要一致,且分区中的数据数量也要一致

5. partitionBy

只有数据类型为key-value类型的rdd,才有partitionBy操作。partitionBy本身不是RDD的方法,是通过隐式转化得到的PairRDDFunctions的方法。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD = rdd.map((_, 1))

val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
newRDD.saveAsTextFile("output")

这里默认使用HashPartitioner,即按照key的哈希值对分区数取模得到分区号,后续会介绍自定义分区器。

 6. reduceByKey

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Int)] = rdd.reduceByKey((x:Int, y:Int) => {x + y})
newRDD.collect().foreach(println)

reduceByKey将相同key的值进行聚合,具体来说是两两聚合。 但是上例中"b"只有一个,是不会做两两聚合计算的。

7. groupByKey

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Iterable[Int])] = rdd.groupByKey()
newRDD.collect().foreach(println)

groupByKey根据相同key的值进行分组,形成一个可迭代的集合。这与groupBy类似,但是区别是groupBy的可迭代集合不是原有value的集合,而是原来每个元素(即tuple)的集合:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val newRDD : RDD[(String,Iterable[(String, Int)])] = rdd.groupBy(_._1)

reduceByKey和groupByKey的区别:

1) reduceByKey相比于groupByKey不仅做了分组,还做了聚合计算

2)groupByKey会将数据打乱重新组合,即存在shuffle操作。既然存在shuffle操作,如果后续还有map等转换操作,原来一个分区的数据处理完之后还需要等待其他分区的数据处理完,因为shuffle后的分区的数据可能不止来源于原来的一个分区。这种等待可能很耗时,并且占用大量内存,因此需要进行落盘操作。简而言之,shuffle操作必须有落盘处理,不能在内存中进行数据等待,否则可能会导致内存溢出,因此性能也不高

3)reduceByKey也会有shuffle,也会有落盘操作,但是在落盘之前,会对原来每个分区内的数据事先进行分组并聚合计算(预聚合,combine)。这样落盘的数据量少了,磁盘IO也少了,性能也提高了

8. aggregateByKey

reduceByKey会进行预聚合,这是分区内的聚合,然后shuffle操作打乱数据,进行分区间的聚合,此时分区内和分区间的聚合规则是一样的。如果分区内和分区间的聚合计算规则不一样,那就要使用aggregateByKey算子。例如,分区内求最大值,分区间求和:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
val newRDD : RDD[(String,Int)] = rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)
newRDD.collect().foreach(println)

aggregateByKey有两个参数列表,第一个参数列表表示初始值(aggregateByKey的最终计算结果与这个初始值类型是相同的),用于和第一个key的value进行分区内计算,第二个参数列表的两个参数分别表示分区内和分区间的计算规则。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val newRDD : RDD[(String,Int)] = rdd.aggregateByKey(5)((x, y) => math.max(x, y), (x, y) => x + y)
newRDD.collect().foreach(println)

 

计算相同key的value的平均值:

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)

val newRDD : RDD[(String,(Int, Int))] = rdd.aggregateByKey((0, 0))((t, v) => (t._1 + v, t._2 + 1), (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))

val result = newRDD.mapValue {
    case (val, num) => {
        val / num
    }
}

result.collect().foreach(println)

9. foldByKey

如果分区内和分区间的计算规则一样,可以使用foldByKey算子。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val newRDD : RDD[(String,Int)] = rdd.foldByKey(0)(_+_)
newRDD.collect().foreach(println)

 10. combineByKey

aggreagteByKey的初始值在一些场景其实很难确定,但如果初始值是相同key的第一个value或者其适当转换,就更为合理。

val rdd : RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)

val newRDD : RDD[(String,(Int, Int))] = rdd.combineByKey(v => (v, 1))((t : (Int, Int), v) => (t._1 + v, t._2 + 1), (t1 : (Int, Int), t2 : (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2))

val result = newRDD.mapValue {
    case (val, num) => {
        val / num
    }
}

result.collect().foreach(println)

 

wordCount的多种实现方式(假设已经得到所有的(单词, 1)的tuple):

rdd.reduceByKey(_+_)
rdd.aggregateByKey(0)(_+_, _+_)
rdd.foldByKey(0)(_+_)
rdd.comnbineByKey(v=>v)((x:Int, y:Int)=>x+y, (x:Int, y:Int)=>x+y)

观察源码,发现他们底层调用的都是combineByKey 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1507282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Keepalived+LVS构建高可用集群

目录 一、Keepalive基础介绍 1. Keepalive与VRRP 2. VRRP相关技术 3. 工作原理 4. 模块 5. 架构 6. 安装 7. Keepalived 相关文件 7.1 配置组成 7.2 全局配置 7.3 VRRP实例配置(lvs调度器) 7.4 虚拟服务器与真实服务器配置 二、Keepalived…

【数据库】软件测试之MySQL数据库面试总结

有表如下: Student 学生表 SC 成绩表 Course 课程表 Teacher 老师表 每个学生可以学习多门课程,每一个课程都有得分,每一门课程都有老师来教,一个老师可以教多个学生 1、查询姓‘朱’的学生名单 select * from Student whe…

010Editor汉化版+下载+注册码+模板bug

项目场景: 这天我想使用我的不知名的一个破解版本的010Edit来查看一个EXE程序,并想使用模板功能,但是发现没有该模板还无法下载最新模板 问题描述 010Edit联网后需要注册码: 010 Editor 激活码生成器 使用方法 参照教程使用0…

面试题:分布式锁用了 Redis 的什么数据结构

在使用 Redis 实现分布式锁时,通常使用 Redis 的字符串(String)。Redis 的字符串是最基本的数据类型,一个键对应一个值,它能够存储任何形式的字符串,包括二进制数据。字符串类型的值最多可以是 512MB。 Re…

浅谈Maven

Maven能为我们解决什么问题 1:添加第三方jar包 按照最原始的做法,我们是手动复制jar包到项目WEB-INF/lib下,每个项目都会有一份,造成大量重复文件。而Maven将jar包放在本地仓库中统一管理,需要jar包只需要用坐标的方式…

使用Canvas绘制一个自适应长度的折线图

要求x轴根据数据长度自适应 y轴根据数据最大值取长度值 <template><div ref"cvsContainer" class"cvs-container"><canvas ref"cvs" class"canvas"></canvas></div> </template><script set…

188基于matlab的AR模型参数估计

基于matlab的AR模型参数估计&#xff0c;burg法和ule-Walker法估计信号&#xff0c;并输出估计误差。程序已调通&#xff0c;可直接运行。 188 AR模型参数估计 burg法和ule-Walker法 (xiaohongshu.com)

离散数学例题——6.树和特殊图

树 树的证明 森林 同构树非同构树 生成树 有向树 二叉树遍历 哈夫曼树 特殊图 欧拉图&#xff08;一次边&#xff09; Fleury算法求欧拉回路 欧拉通路&#xff08;一笔画&#xff09; 哈密顿图&#xff08;一次点&#xff09; 哈密顿图的充分条件 哈密顿图必要条件 二部图 二部…

人工智能OCR领域安全应用措施

引言 编写目的 随着新一轮科技革命和产业变革的深入发展&#xff0c;5G、大数据、云计算、深度学习等新技术日益成为推动社会进步的核心动力。人工智能&#xff08;AI&#xff09;作为这些新技术的集大成者&#xff0c;正迅速成为新型基础设施建设的战略性支柱&#xff0c;其广…

【详识C语言】自定义类型之三:联合

本章重点 联合 联合类型的定义 联合的特点 联合大小的计算 联合&#xff08;共用体&#xff09; 联合类型的定义 联合也是一种特殊的自定义类型 这种类型定义的变量也包含一系列的成员&#xff0c;特征是这些成员公用同一块空间&#xff08;所以联合也叫共用体&#xff09;…

AI 辅助研发趋势

引言 随着人工智能技术的持续发展与突破&#xff0c;AI辅助研发正成为科技界和工业界瞩目的焦点。从医药研发到汽车设计&#xff0c;从软件开发到材料科学&#xff0c;AI正逐渐渗透到研发的各个环节&#xff0c;变革着传统的研发模式。在这一背景下&#xff0c;AI辅助研发不仅…

Linux学习——锁

目录 ​编辑 一&#xff0c;锁的概念 二&#xff0c;锁的操作 1&#xff0c;锁类型 pthread_mutex_t 2&#xff0c;初始化锁 3&#xff0c;上锁 4&#xff0c;解锁 5&#xff0c;销毁锁 三&#xff0c;线程安全问题演示 四&#xff0c;锁的原理 五&#xff0c;死锁 …

每日OJ题_牛客HJ73 计算日期到天数转换(IO型OJ)

目录 牛客HJ73 计算日期到天数转换 解析代码 牛客HJ73 计算日期到天数转换 计算日期到天数转换_牛客题霸_牛客网 解析代码 #include <iostream> using namespace std; int main() {int year 0, month 0, day 0, sum 0;cin >> year >> month >>…

【SpringBoot框架篇】36.整合Tess4J搭建提供图片文字识别的Web服务

文章目录 简介文件下载引入依赖main函数中使用基于Springboot搭建OCR Web服务配置traineddata路径枚举用到的语种类型定义接口响应的json数据格式封装OCR服务引擎编写web提供服务的接口启动服务并且测试html demo扩展 项目配套代码 简介 Tess4J是一个基于Tesseract OCR引擎的J…

[Java安全入门]三.URLDNS链

一.前言 在初步学习java的序列化和反序列化之后&#xff0c;这里学习java反序列化漏洞的一个利用链&#xff0c;也是比较基础的一条链。 由于URLDNS不需要依赖第三方的包&#xff0c;同时不限制jdk的版本&#xff0c;所以通常用于检测反序列化的点。 二.代码展开分析 构造链 …

Learn OpenGL 03 着色器

GLSL 着色器的开头总是要声明版本&#xff0c;接着是输入和输出变量、uniform和main函数。每个着色器的入口点都是main函数&#xff0c;在这个函数中我们处理所有的输入变量&#xff0c;并将结果输出到输出变量中。 一个典型的着色器有下面的结构&#xff1a; #version vers…

[java入门到精通] 10 常用API , 正则表达式 , Collection集合

今日目标 BigInteger类BigDecimal类Arrays类包装类String类的常用方法正则表达式Collection集合 1 BigInteger类 1.1 概述 概述 : java.math.BigInteger类是一个引用数据类型 , 可以用于计算一些大的整数 , 当超出基本数据类型数据范围的整数运算时就可以使用BigInteger了。…

Arduino Uno使用Mind+实现图形化编程

文章目录&#xff1a; 一&#xff1a;软件下载安装 1.下载安装 1.1 开发软件 2.辅助软件 2.主控板 二&#xff1a;基础 1.LED 2.传感器 3.智能小车 三&#xff1a;学习资源 一&#xff1a;软件下载安装 1.下载安装 1.1 开发软件 Arduino IDE代码编程软件&#…

集合和数组的相关操作

目录 1.数组转集合(引用类型数组) 2.数组转集合(基础类型数组) 3.集合转数组 4.集合之间是否相交 5.获取两个集合的交集 6.集合转为字符串 1.数组转集合(引用类型数组) (1)Arrays.asList 示例&#xff1a; String[] colArr new String[6];colArr[0] "1";co…

Fastgithub

上Github太慢、打不开怎么办&#xff1f; 选择之一是Fastgithub工具&#xff0c;同时支持win, linux, mac。 1. 工作原理 从公共dns服务器拿到github的大量ip数据&#xff0c;检测哪些ip可用&#xff0c;哪些ip访问速度最佳&#xff0c;然后编写一个本地版的dns服务&#xff0…