用一个例子告诉你 怎样使用Spark中RDD的算子

news2025/1/20 19:17:32

目录

1. 前言

1.1 操作分类

1.2 语法知识

2. transformations

2.1 map

2.2  mapPartitions

2.3 flatMap

2.4 glom

2.5 groupBy

2.6 filter 

2.7 sample

2.8 distinct

2.9 coalesce

2.10 repartition

2.11 sortBy

2.12 partitionBy

2.13 reduceByKey

2.14 groupByKey

2.15 aggregateByKey

2.16 foldByKey

2.17 combineByKeyWithClassTag

2.18 combineByKey

2.19 sortByKey

2.20 cogroup

2.21 join、leftOuterJoin、rightOuterJoin、fullOuterJoin

2.22 intersection、union、subtract、zip

3. actions

3.1 reduce

3.2 collect

3.3  count

3.4 first

3.5 take

3.6  takeOrdered

3.7 aggregate

3.8 fold

3.9 countByKey

3.10  saveAsTextFile 、saveAsObjectFile 、 saveAsSequenceFile

3.11 foreach


1. 前言

我们可以将RDD想象成一张分布式的表,表中的数据以分区的形式分布在不同的计算节点上

对表操作称之为算子,可以用SQL的思想来理解这些操作

1.1 操作分类

在spark中,RDD支持两种类型的操作
        1.transformations(转换算子)
        功能:
                  从现有的RDD中通过某种转换规则,创建的新RDD
        特点:
                 所有的转换操作都是懒加载,并不会立即进行转换操作
                 只有当驱动程序需要计算结果时,才会触发转换行为

        2.actions(行动算子)
        功能:
                 将各个计算节点上的结果数据,返回给驱动程序(客户端)

通常,我们也会将RDD的操作称之为算子,也就是人们常说 转换算子、行动算子

 官方API链接:   https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html


1.2 语法知识

        这块知识需要对scala基本语法有些了解,才会对API调用有更好的理解

        什么是Lambda表达式? 传送门

        什么是函数柯里化?传送门

        什么是隐式转换及隐式函数?传送门


2. transformations

2.1 map

功能:  返回一个新的RDD,对父RDD的每个元素按照f函数进行转换

可以看我之前写的例子 : 传送门

重点关注:

        分区内元素依次被指定Lambda表达式执行(串行),多个分区间并发执行(并行)

        如果指定的Lambda表达式中存在消耗时间的逻辑(如 数据库连接、IO等)

        请选择使用mapPartitions


2.2  mapPartitions

功能: 返回一个新的RDD,对父RDD每个分区按照f函数进行转换

可以看我之前写的例子 : 传送门

重点关注: 1.map和mapPartitions的区别

                2.mapPartitions的使用风险和使用场景

                3.Lambda表达式每次会处理整个分区的数,小心内存溢出哦😯


2.3 flatMap

功能:  返回一个新的RDD,将父RDD中每个元素转换成集合,再将集合打散

可以看我之前写的例子 : 传送门


2.4 glom

功能: 返回一个新的RDD,将父RDD每个分区中的所有元素封装到数组中去

可以看我之前写的例子 : 传送门

重点关注:

         1. 用于将父RDD中每个分区的数据打包成数组

         2. 不会触发shuffle操作哦        


2.5 groupBy

功能: 返回一个新的RDD,对父RDD所有元素按照f函数的结果进行分组

可以看我之前写的例子 : 传送门

实现1: 使用默认分区器,分区数=父RDD分区数

实现2: 指定分区数,默认使用Hash分区器

 实现3: 使用指定分区器

 重点关注:  

             1.这个操作会触发shuffle操作,当数据分布不均时,可能会造成某个分区数据量过大
                而导致内存溢出哦😯,也就是常说的数据倾斜         

             2.如果分组的目的是为了做 聚合操作,建议使用 reduceByKey、aggregateByKey 
                效率会高很多(这些算子,会map端先做一次聚合操作,来减少IO的数据量) 


2.6 filter 

功能: 返回一个新的RDD,对父RDD的所有元素按照 f(x) = true 进行过滤

可以看我之前写的例子 : 传送门


2.7 sample

功能: 返回一个新的RDD,对父RDD做抽样查询

可以看我之前写的例子 : 传送门


2.8 distinct

功能: 返回一个新的RDD,对父RDD元素去重

可以看我之前写的例子 : 传送门

实现1: 不指定分区个数

实现2: 指定分区个数

重点关注:  

              1. 会触发shuffle操作,会先在map端对数据去重后,再在reduce端去重


2.9 coalesce

功能: 返回一个新的RDD,增加或减少父RDD的分区个数(合并分区时,可以选择不shuffle)

可以看我之前写的例子 : 传送门

 重点关注:

        shuffle = true时,会触发shuffle操作,小心数据倾斜哦😱

        合并分区时,建议使用 coalesce且shuffle=false   


2.10 repartition

功能: 返回一个新的RDD,增加或减少父RDD的分区个数(必触发shuffle)

可以看我之前写的例子 : 传送门 

重点关注:

        一定会触发shuffle操作,如果是减少分区,建议使用  coalesce


2.11 sortBy

功能: 返回一个新的RDD,对父RDD根据f函数的结果排序

可以看我之前写的例子 : 传送门

 重点关注:

        会触发shuflle操作,小心数据倾斜哦😱


2.12 partitionBy

功能:  返回一个新的RDD,按照指定分区器对父RDD重新分区

可以看我之前写的例子 :  传送门

注意事项:

        1.当父RDD数据分布不均时,可以使用此方法将数据打散


2.13 reduceByKey

功能: 返回一个新的RDD,根据指定的聚合规则对父RDD  按照key做聚合

         分区内(map端)、分区间(reduce端)聚合逻辑相同,且没有初始值参与聚合

可以看我之前写的例子 : 传送门

实现1:

实现2:

 实现3:使用默认分区器,分区数和父RDD相同

重点关注:  

        1. 这个方法 会先在每个map端本地做一次聚合,合并完后再发送到reduce端聚合

        2. 此方法会触发shuffle操作,小心数据倾斜哦😟!!!


 2.14 groupByKey

功能: 返回一个新的RDD,按照key对父RDD做分组

可以看我之前写的例子 : 传送门

实现1:

 实现2:

 重点关注:  

             1. 目前实现方式,会先将每个key的所有键值对读取到内存中,如果一个key的值过多时
                 就会导致OutOfMemoryError错误,使用前一定要评估数据量和内存资源

             2.这个操作会触发shuffle操作,当数据分布不均时,可能会造成某个分区数据量过大
                而导致内存溢出哦😯,也就是常说的数据倾斜         

             3.如果分组的目的是为了做 聚合操作,那么可以使用 reduceByKey、aggregateByKey 
                效率会高很多(这些算子,会map端先做一次聚合操作,来减少IO的数据量) 


2.15 aggregateByKey

功能: 返回一个新的RDD,根据指定的聚合规则 对父RDD 按照key做聚合

         分区内(map端)、分区间(reduce端)聚合逻辑可以不相同,且有初始值参与聚合

可以看我之前写的例子 :  传送门

方式1:

 方式2:

方式3:

重要关注:

           1. zeroValue值 会参与分区聚合计算和分区间聚合计算

           2. 此方法会触发shuffle操作,小心数据倾斜哦😟!!!


2.16 foldByKey

功能:  返回一个新的RDD,根据指定的聚合规则 对父RDD 按照key做聚合

         分区内(map端)、分区间(reduce端)聚合逻辑相同,且有初始值参与聚合

可以看我之前写的例子 :  传送门 

方式1:

方式2:

 方式3:

重要关注:

           1. zeroValue值 会参与分区聚合计算和分区间聚合计算

           2. 此方法会触发shuffle操作,小心数据倾斜哦😟!!!


2.17 combineByKeyWithClassTag

功能:  返回一个新的RDD,根据指定的聚合规则 对父RDD 按照key做聚合

         分区内(map端)、分区间(reduce端)聚合逻辑可以不相同,且有初始值参与聚合

         并且可以转换value的数据类型

可以看我之前写的例子 :  传送门

实现1:实现2: 实现3:

重点关注:

          1. 这是RDD聚合操作中最通用的方法,其他聚合函数都是对它的封装

          2. RDD分区数量和数据分布直接会影响聚合操作的效率,使用时注意数据分布哦


2.18 combineByKey

功能:  返回一个新的RDD,根据指定的聚合规则 对父RDD 按照key做聚合

         分区内(map端)、分区间(reduce端)聚合逻辑可以不相同,没有初始值参与聚合

         并且可以转换value的数据类型

可以看我之前写的例子 :  传送门

方式1:方式2:重点关注:

        1. 查看源码,是对combineByKeyWithClassTag的封装


2.19 sortByKey

功能:  返回一个新的RDD,元素值为  父RDD 根据key排序的结果

可以看我之前写的例子 : 传送门

重点关注:

        1. key的数据类型 必须实现 Ordered 接口(特质)

        2. key类型为tuple时,无法使用该方法排序

        3. 存在shuffle过程,小心数据倾斜哦


2.20 cogroup

功能:  返回一个新的RDD,元素值为 多个RDD下相同key下 各自value值的迭代器

可以看我之前写的例子 : 传送门

实现1:实现2:

 实现3:

注意事项:

        1. 多个RDD关联最通用的方法

        2. 会触发shuffle操作,小心数据倾斜哦


2.21 join、leftOuterJoin、rightOuterJoin、fullOuterJoin

功能:  返回一个新的RDD,元素值为 多个RDD下相同key下 的各自value值

可以看我之前写的例子 :  传送门 

join:leftOuterJoin:rightOuterJoin:fullOuterJoin:重点关注:

        1. 发下没有,都是通过封装 cogroup + flatMapValues 来实现的

        2. 会触发shuffle操作,小心数据倾斜哦


2.22 intersection、union、subtract、zip

功能:  返回一个新的RDD,元素值为 两个RDD求交集、并集、差集 的结果

可以看我之前写的例子 :  传送门

intersection:返回两个RDD的交集,结果将不包含任何重复元素  (内部会触发shuffle过程)

union:返回多个RDD的并集,结果会有重复元素 (内部不会触发shuffle过程)

subtract:返回两个RDD的差集 (内部会触发shuffle过程)

zip:返回两个RDD按元素顺序对应的二元组 (内部不会触发shuffle过程)


3. actions

3.1 reduce

功能: 根据指定的计算规则,对RDD所有的元素依次做运算,并返回计算结果给驱动程序(Driver)

可以看我之前写的例子 : 传送门

注意事项:

        1.先在每个分区内做聚合操作(Map端),再对各个分区的结果做聚合操作

           如果操作不满足结合律和交换律时(如减法、除法), 当分区个数不同时,计算结果也会不同


3.2 collect

功能:返回给Driver端一个数组,数组内容为RDD所有的元素

可以看我之前写的例子 :  传送门

注意事项:

        1. 当RDD元素过多时,小心Driver端内存溢出哦


3.3  count

功能:返回RDD元素个数给Driver端

可以看我之前写的例子 : 传送门


3.4 first

功能:返回RDD第一个元素的值 给Driver端

可以看我之前写的例子 : 传送门


3.5 take

功能: 返回给Driver端一个数组,数组内容为RDD的前n项元素

可以看我之前写的例子 : 传送门

注意事项:

        1. 当返回元素过多时,小心Driver端内存溢出哦

        2. 如果在Nothing或Null的RDD上调用此方法将引发异常 


3.6  takeOrdered

功能: 返回给Driver端一个数组,数组内容为RDD排序后的前n项元素

可以看我之前写的例子 : 传送门

 注意事项:

        1. 当返回元素过多时,小心Driver端内存溢出哦


3.7 aggregate

功能:对RDD做聚合操作,并将聚合的结果返回给Driver端

可以看我之前写的例子 : 传送门

注意事项:

        1.zeroValue会参与分区内聚合运算和分区间聚合运算

           通常会将它设置成一个中立元素(列表连接Nil 计数时为0)


3.8 fold

功能:对RDD做聚合操作,并将聚合的结果返回给Driver端

可以看我之前写的例子 : 传送门


3.9 countByKey

 功能:计算RDD中每个key下的value的个数,并将结果返回给Driver端

可以看我之前写的例子 : 传送门

注意事项:

        1. 当返回结果集过大时,小心Driver端内存溢出哦


3.10  saveAsTextFile 、saveAsObjectFile 、 saveAsSequenceFile

可以看我之前写的例子:传送门

saveAsTextFile:

功能:将RDD以文本文件的格式保存到指定路径

实现1:

实现2:


saveAsObjectFile

功能:将RDD以序列化对象的格式保存到指定路径


 saveAsSequenceFile 

功能:将RDD以Hadoop SequenceFile的格式保存到指定路径


3.11 foreach

功能:将指定的Lambda表达式,应用在RDD的每个元素上

可以看我之前写的例子:传送门

重点关注:

        1. 分区内按元素顺序依次执行Lambda表达式,分区间是并行的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/378061.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从软件的角度看待PCI和PCIE(一)

1.最容易访问的设备是什么? 是内存! 要读写内存,知道它的地址就可以了,不需要什么驱动程序; volatile unsigned int *p 0xffff8811; unsigned int val; *p val; val *p;只有内存能这样简单、方便的使用吗&#xf…

python中的序列——笔记

一、介绍 ABC语言时一个致力于为初学者设计编程环境的长达十年的研究项目。 Python也从ABC那里继承了用统一的风格去处理序列数据这一特点。不管是哪种数据结构,字符串、列表、字节序列、数组、XML元素,抑或是数据库查询结果,它们都共用一套…

SD卡损坏了?储存卡恢复数据就靠这3个方法

作为一种方便的储存设备,SD卡在我们的日常生活中使用非常广泛。但是,有时候我们可能会遇到SD卡损坏的情况,这时候里面存储的数据就会受到影响。SD卡里面保存着我们很多重要的数据,有些还是工作必须要使用的。 如果您遇到了这种情…

百度CTO王海峰:深度学习平台+大模型,夯实产业智能化基座

2月27日,中国人工智能学会首届智能融合产业论坛在成都顺利举办。本届论坛由中国人工智能学会(CAAI)主办,中国人工智能学会智能融合专委会、百度公司、深度学习技术及应用国家工程研究中心和电子科技大学联合承办。中国工程院多名院…

低代码开发平台选型必看指南

低代码开发是近年来逐渐兴起的一种新型软件开发方式。它通过封装常见的软件开发流程和代码,使得非专业的开发者也能够轻松创建复杂的应用程序。这种开发方式已经受到了许多企业的青睐,成为提高生产效率、降低开发成本的一种有效途径。 低代码开发的核心…

TryHackMe-The Great Escape(Docker)

The Great Escape 我们的开发人员创建了一个很棒的新网站。你能冲出沙盒吗? 端口扫描 循例 nmap Web信息收集 robots.txt: /exif-util是文件上传点,但是绕过之后貌似没啥用 在robots.txt当中披露了可能存在.bak.txt,现在我们已知的文件就是…

JavaScript基础语法入门

一. JS简介 JavaScript , 简称JS, JS最初只是为了进行前端页面开发, 但随这后来JS越来越火之后, JS就被赋予了更多的功能, 可以用来开发桌面程序, 手机App, 服务器端的程序等… JS是一种动态类型, 弱类型的脚本语言, 通过解释器运行, 主要在客户端和浏览器上运行, 比如Chrome…

Zeppelin安装

1、下载Zeppelin 下载地址:Download 2.解压 [rootguo147 install]# tar -zxvf zeppelin-0.10.0-bin-all.tgz -C ../soft/ //修改文件名 [rootguo147 soft]# mv zeppelin-0.10.0-bin-all/ zeppelin 3.配置 //进入conf 目录 [rootguo147 conf]# pwd /opt/soft/zepp…

C语言——自定义类型

前言 在之前我们粗略的介绍了一下结构体,但是自定义类型可不仅仅只有结构体类型,今天我们就来介绍自定义类型。 一. 结构体 1.1 结构体的声明 结构体就是将一些值集合在一个结构体变量中,并将这些值称为成员变量,结构体的成员…

Yolov5-Python系列(一)—— 基础入门(yolov5安装、简单使用)

一、资源准备 推荐使用Anconda环境:通过Anaconda则可以通过创造新的虚拟环境解决资源包(python库)之间冲突问题。 (一)Anconda安装:https://www.anaconda.com/download (二)Yolov5 下…

论文《PointTAD》

模型的输出集合(Tns,Tne,Cn),Tns是第n个预测动作开始时间,Tne是第n个预测动作结束时间,Cn是第n个预测动作的类别。 模型有三个输入:1.RGB帧 2.可学习的query points 3.query vect…

佛科院计算机软件技术基础——线性表

一、基础知识了解:结构体的理解:我们知道整型是由1位符号位和15位数值位组成,而就可以把结构体理解为我们定义的数据类型,如:typedef struct {int data[2]; //存储顺序表中的元素int len; …

Python识别二维码的两种方法(cv2)

在学习Python处理二维码的过程中,我们看到的大多是“用python生成酷炫二维码”、“用Python制作动图二维码”之类的文章。而关于使用Python批量识别二维码的教程,并不多见。所以今天我会给大家分享两种批量识别二维码的Python技巧!pyzbar PI…

【架构师】零基础到精通——服务与网关

博客昵称:架构师Cool 最喜欢的座右铭:一以贯之的努力,不得懈怠的人生。 作者简介:一名Coder,软件设计师/鸿蒙高级工程师认证,在备战高级架构师/系统分析师,欢迎关注小弟! 博主小留言…

IV测试系统3A太阳能模拟器在光伏中应用

一、概述IV测试系统3A太阳能模拟器应具备光束准直、光斑均匀、辐照稳定、且与太阳光谱匹配的特点,使用户可足不出户的完成需要太阳光照条件的测试。科迎法电气提供多规格高品质的太阳模拟器,可适用于单晶硅、多晶硅、非晶硅、染料敏化、有机、钙钛矿等各…

织梦TXT批量导入TAG标签并自动匹配相关文章插件

织梦TXT批量导入TAG标签并自动匹配相关文章插件是一种非常有用的插件,它可以帮助网站管理员快速地将TAG标签添加到文章中,并自动匹配相关文章。 以下是该织梦TXT批量导入TAG标签插件的几个优点: 1、提高网站的SEO效果:TAG标签是搜…

如何利用ReconPal将自然语言处理技术应用于信息安全

关于ReconPal 网络侦查一直是网络安全研究以及渗透测试活动中最重要的阶段之一,而这一阶段看起来很容易,但往往需要很大的努力和很强的技术才能做好来。首先,我们需要使用正确的工具、正确的查询/语法以及正确的操作,并将所有信息…

服务拆分及远程调用

目录 服务拆分 服务拆分注意事项 服务间调用 步骤一:注册RestTemplate 步骤二:修改业务层代码 总结: 提供者和消费者 思考 服务调用关系 服务拆分 服务拆分注意事项 单一职责:不同微服务,不要重复开发相同业…

备战英语6级——记录复习进度

开始记录—— 学习:如何记录笔记? 1:首先我认为:电脑打字比较适合我! 2:先记笔记,再“填笔记”! 记笔记就是一个框架,记录一个大概的东西。后面需要在笔记中&#xff0…

WEB前端性能(页面+接口)

WEB前端性能(页面接口)前端性能渲染过程Blocked时间Connect时间Send时间Waiting时间TTFBReceive时间响应时间OS相关指标idleiowaitussyswapmemory前端性能渲染过程 Blocked时间 是浏览器查看本地有没有缓存的资源,不会与服务器进行交互&…