Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客
Spark 的Standalone集群环境安装与测试-CSDN博客
PySpark 本地开发环境搭建与实践-CSDN博客
Spark 程序开发与提交:本地与集群模式全解析-CSDN博客
Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客
Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客
Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客
目录
一、RDD 算子分类
(一)Transformation 算子(转换算子)
(二)Action 算子(触发算子 / 行为算子)
(三)各个算子的作用,对比sql中的关键字
二、常用转换算子详细解析
(一)map 算子
(二)flatMap 算子
(三)filter 算子
三、常见触发算子详细解析
(一)count 算子
(二)foreach 算子
(三)saveAsTextFile 算子
四、其他转换算子
(一)union 算子
(二)distinct 算子
(三)分组聚合算子:groupByKey、reduceByKey
(四)排序算子:sortBy、sortByKey
sortBy
sortByKey
(五)重分区算子:repartition、coalesce
repartition
coalesce
五、其他触发算子
(一)first 算子
(二)take 算子
(三)collect 算子
(四)reduce 算子
(五)TopN 算子:top、takeOrdered
top 算子
takeOrdered 算子
六、算子的其他方面
(一)面试题:groupByKey + map 和 reduceByKey 的区别
区别
map 端的聚合(Combiner)
(二)其他 KV 类型算子
keys
values
mapValues
collectAsMap
(三)join 方面的算子
join / fullOuterJoin / leftOuterJoin / rightOuterJoin
(四)分区算子
为什么需要分区算子
mapPartitions
foreachParition
七、总结
(一)触发算子
(二)转换算子
(三)能触发shuffle过程的算子
在大数据处理领域,Spark 中的 RDD(弹性分布式数据集)是核心概念之一。RDD 算子则是对 RDD 进行操作的关键工具,它们决定了数据的处理方式和流程。深入理解 RDD 算子对于高效地使用 Spark 处理大规模数据至关重要。本文将详细介绍 RDD 的常用基础算子,包括算子的分类、功能、代码示例、常见问题以及面试相关要点。
一、RDD 算子分类
Spark为了避免资源浪费,将RDD的读取、转换设计为lazy模式 【只定义,不执行】需要等待真正使用到对应RDD的数据返回给用户时,才真正的执行所有RDD的构建和转换。
(一)Transformation 算子(转换算子)
- 特点
- 处于 lazy 模式,一般不会触发 job 的运行。这意味着只有当需要使用该算子处理后的数据时,才会真正执行相关的计算。
- 算子返回值一定是 RDD。这种设计使得可以对 RDD 进行连续的转换操作,构建复杂的数据处理管道。
- 常见的 Transformation 算子
- map:对 RDD 中的每个元素进行一对一的映射操作。
- filter:根据给定的条件过滤 RDD 中的元素。
- flatMap:对 RDD 中的每个元素进行操作,将每个元素映射为 0 个或多个新元素,并将结果扁平化。
- reduceByKey:针对键值对(KV)类型的 RDD,根据相同的 key 对 value 进行聚合计算。
- groupByKey:对 KV 类型的 RDD 按照 Key 进行分组,将相同 key 的 value 放入一个集合列表中。
- sortByKey:对 KV 类型的 RDD 按照 key 进行排序。
(二)Action 算子(触发算子 / 行为算子)
- 特点
- 一定会触发 job 的运行,这是与 Transformation 算子的重要区别。当执行 Action 算子时,Spark 会开始执行之前定义的一系列 Transformation 操作。
- 返回值一定不是 RDD。根据不同的 Action 算子,返回值类型各异,如单个元素、集合、写入文件等操作的结果。
- 常见的 Action 算子
- foreach:对 RDD 中的每个元素执行给定的函数。通常用于对 RDD 中的数据进行输出或其他副作用操作。
- first:返回 RDD 中的第一个元素。
- count:返回 RDD 中的元素个数。
- reduce:对 RDD 中的元素进行聚合操作,需要提供一个聚合函数。
- saveAsTextFile:将 RDD 中的数据保存为文本文件。
- collect:将 RDD 中的所有元素收集到驱动程序中,形成一个本地集合。但要注意,如果 RDD 数据量过大,可能会导致内存溢出。
- take:返回 RDD 中的前 n 个元素。
(三)各个算子的作用,对比sql中的关键字
类比SQL处理数据的常见功能,记住常用算子的功能、语法、场景
过滤数据:where、having => filter
处理数据:字符串函数、日期函数 => map
展开数据:explode => flatMap合并数据:union、join => union join
去重数据:distinct => distinct
分组聚合:group by + 聚合函数 => groupByKey、 reduceByKey
排序数据:order by 、sort by => sortBy、top
二、常用转换算子详细解析
(一)map 算子
map
算子对 RDD 中的每个元素进行一对一的映射。它接受一个函数作为参数,该函数应用于 RDD 中的每个元素。例如,假设我们有一个存储学生成绩的 RDD,每个元素是一个学生的成绩,我们可以使用 map
算子将每个成绩转换为等级(如 90 分及以上为 A,80 - 89 分为 B 等)。这种转换不会改变 RDD 的元素个数,只是对每个元素的值进行了修改。
功能特点
功能:对RDD中每个元素调用一次参数中的函数,并将每次调用的返回值直接放入一个新的RDD中
分类:转换算子
场景:一对一的转换,需要返回值
语法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表参数是一个函数
T:代表RDD中的每个元素
U:代表RDD中每个元素转换的结果
举例说明
需求:计算每个元素的立方
原始数据
1 2 3 4 5 6
目标结果
1 8 27 64 125 216
list01 = [1,2,3,4,5,6]
listRdd = sc.parallelize(list01)
mapRdd = listRdd.map(lambda x: math.pow(x,3))
mapRdd.foreach(lambda x: print(x))
(二)flatMap 算子
flatMap
算子在处理数据时,先对每个元素应用一个函数,这个函数返回一个可迭代对象,然后将所有这些可迭代对象扁平化。例如,在处理文本数据时,如果我们有一个 RDD,其中每个元素是一个段落,我们可以使用 flatMap
算子将每个段落拆分成单词,然后将所有单词组成一个新的 RDD。这对于后续的文本分析任务,如单词计数、词频统计等非常有用。
功能特点
功能:将两层嵌套集合中的每个元素取出,扁平化处理,放入一层
集合中返回,类似于SQL中explode函数
分类:转换算子
场景:多层集合元素展开,一个集合对应多个元素【一对多】
语法:
def flatMap(self , f : T -> Iterable[U]) -> RDD[U]Iterable :传递进来的数据,必须至少是Iterable ,这个类中要实现 __iter__
Iterator: 迭代器 __next__ 以及 __iter__判断一个对象是否为可迭代数据类型:
print(isinstance(map(str, [10, 20, 30]), Iterator)) # True
举例说明
需求:返回为一个可迭代对象
夜曲/发如雪/东风破/七里香
十年/爱情转移/你的背包
日不落/舞娘/倒带
鼓楼/成都/吉姆餐厅/无法长大
月亮之上/荷塘月色
fileRdd = sc.textFile("../datas/a.txt",2)
flatRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatRdd.foreach(lambda x: print(x))
原理:
[ 夜曲/发如雪/东风破/七里香,十年/爱情转移/你的背包 ...]
||
[ [夜曲,发如雪,东风破,七里香],[十年,爱情转移,你的背包.....]]
||
flatMap
||
[夜曲,发如雪,东风破,七里香,十年,爱情转移,你的背包]
(三)filter 算子
filter
算子根据给定的条件筛选 RDD 中的元素。条件可以是任何返回布尔值的函数。例如,在处理电商订单数据的 RDD 时,我们可以使用 filter
算子筛选出特定地区的订单,或者筛选出金额大于某个阈值的订单。这种筛选操作可以大大减少后续处理的数据量,提高处理效率。
功能特点
功能:对RDD集合中的每个元素调用一次参数中的表达式对数据进
行过滤,符合条件就保留,不符合就过滤
分类:转换算子
场景:行的过滤,类似于SQL中where或者having
def filter(self, f: T -> bool ) -> RDD[T]
举例说明
需求:
1 周杰伦 0 夜曲/发如雪/东风破/七里香
2 陈奕迅 0 十年/爱情转移/你的背包
3 1 日不落/舞娘/倒带
4 赵雷 0 鼓楼/成都/吉姆餐厅/无法长大
5 凤凰传奇 -1 月亮之上/荷塘月色
fileRdd = sc.textFile("../datas/b.txt",2)
# 这个说切割的时候有问题,数组下标越界了
# fileRdd.foreach(lambda line: print(line.split(" ")))
# filterRdd = fileRdd.filter(lambda line: line.split(" ")[2] != '-1' and len(line.split(" ")) == 4 )
#
filterRdd = fileRdd.filter(lambda line: re.split(r"\s",line)[2] != '-1' and len(re.split("\\s",line)) == 4)
filterRdd.foreach(lambda x: print(x))
三、常见触发算子详细解析
(一)count 算子
count
算子用于计算 RDD 中的元素数量。它在实际应用中非常有用,比如我们需要知道某个数据集的大小,或者在对数据进行抽样后,计算抽样数据的数量。在 Spark 内部,count
算子会触发一个 job 的执行,遍历整个 RDD 来计算元素个数。
功能特点
count算子
功能:统计RDD集合中元素的个数,返回一个int值
分类:触发算子
场景:统计RDD的数据量,计算行数
语法:
def count(self) -> int
(二)foreach 算子
foreach
算子对 RDD 中的每个元素执行指定的操作。这个操作通常是有副作用的,比如将数据写入外部存储系统、打印数据等。需要注意的是,由于 foreach
是在集群中的每个节点上执行,对于有状态的操作(如更新全局变量)需要谨慎处理,以免出现数据不一致或其他问题。
功能特点
功能:对RDD中每个元素调用一次参数中的函数,没有返回值【与map场景上区别】
分类:触发算子
场景:对RDD中的每个元素进行输出或者保存,一般用于测试打印或者保存数据到第三方系统【数据库等】
(三)saveAsTextFile 算子
saveAsTextFile
算子将 RDD 中的数据保存为文本文件。它会将 RDD 中的每个元素转换为字符串形式,并写入指定的文件路径。在保存过程中,Spark 会根据数据的分区情况将数据分布存储在多个文件中,以提高写入效率。这个算子常用于将处理后的结果保存下来,供后续分析或其他应用使用。
功能特点
功能:用于将RDD的数据保存到外部文件系统中
分类:触发算子
场景:保存RDD的计算的结果,一般用于将结果保存到HDFS
文件个数 = Task个数 = 分区个数
def saveAsTextFile(self , path ) -> None
转换算子、触发算子代码演示
import os
from pyspark import SparkContext, SparkConf
def getSongs(line):
list = line.split()
return list[-1]
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("分区的解释")
sc = SparkContext(conf=conf)
print(sc)
# 编写各种需求代码
# 使用parallelize 这个算子获取的rdd,假如指定了分区数,就按照数字分区,假如没有指定spark.default.parallelism
rdd1 = sc.parallelize([1,2,3,4,5,6],numSlices=3)
# map 算子 转换算子
mapRdd = rdd1.map(lambda x:x*3) #PipelinedRDD
print(type(mapRdd))
# mapRdd.foreach(print)
# filter算子 转换算子
filterRdd = mapRdd.filter(lambda x: x>=9)
print(type(filterRdd)) # PipelinedRDD
print(filterRdd)
filterRdd.foreach(print)
# flatMap 算子 转换算子
fileRdd = sc.textFile("../../datas/a.txt")
#fileRdd.foreach(print)
flatmapRdd = fileRdd.flatMap(lambda line: line.split("/"))
filterRdd = flatmapRdd.filter(lambda song: len(song)==4)
filterRdd.foreach(print)
fileRdd = sc.textFile("../../datas/b.txt")
# fileRdd.foreach(print)
#mapRdd = fileRdd.map(lambda line:getSongs(line))
mapRdd = fileRdd.map(getSongs)
# mapRdd.foreach(print)
mapRdd.foreach(lambda x:print(x))
mapRdd.flatMap(lambda line: line.split("/")).filter(lambda song:len(song) ==4).foreach(print)
# 常见的触发算子的用法
# count 触发算子,返回值是一个int ,不是rdd
print(mapRdd.count())
# foreach 将rdd中的每一个元素,执行foreach 中的函数,没有返回值,跟map 有点像
# foreach 没有返回值,是触发算子
# map 有返回值,不是触发算子
mapRdd.foreach(lambda line: print("~".join(line.split("/"))))
# saveAsTextFile 是触发算子
mapRdd.saveAsTextFile("../../datas/result")
# 关闭sc
sc.stop()
四、其他转换算子
(一)union 算子
union
算子用于合并两个 RDD。这两个 RDD 的类型必须相同,合并后的 RDD 包含了两个原始 RDD 中的所有元素。例如,有两个分别存储不同地区用户信息的 RDD,我们可以使用 union
算子将它们合并成一个包含所有用户信息的 RDD。
功能特点
union算子
功能:实现两个RDD中数据的合并
分类:转换算子
语法:
def union(self,other:RDD[U]) -> RDD[T/U]
(二)distinct 算子
distinct
算子用于去除 RDD 中的重复元素。它会对 RDD 中的所有元素进行去重操作。例如,在处理一个包含用户浏览记录的 RDD 时,可能存在用户多次浏览同一页面的情况,使用 distinct
算子可以去除这些重复的浏览记录。
功能特点
功能:实现对RDD元素的去重
分类:转换算子
语法:
def distinct(self) -> RDD[T]
union、distinct 算子代码演示
import os
from pyspark import SparkContext, SparkConf
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
sc = SparkContext(conf=conf)
print(sc)
# 编写各种需求代码
list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
# 将两个算子中的数据进行合并, 形成新的算子 转算算子
rdd3 = rdd1.union(rdd2)
# rdd3.foreach(print)
rdd4 = rdd3.distinct()
rdd4.foreach(print)
# 关闭sc
sc.stop()
(三)分组聚合算子:groupByKey、reduceByKey
groupByKey
- 功能:对 KV 类型的 RDD 按照 Key 进行分组,将相同 K 的 Value 放入一个集合列表中。例如,在处理日志数据时,我们可以根据用户 ID 对用户的操作记录进行分组。
- 语法:
RDD[K,V].groupByKey => RDD[K, List[V]]
。它可以指定新的 RDD 分区个数和分区规则。 -
场景:需要对数据进行分组的场景,或者说分组以后的聚合逻辑 比较复杂,不适合用reduce
- 特点:必须经过 Shuffle 过程。在处理大规模数据时,Shuffle 可能会带来较大的性能开销,需要合理设计分区等参数。
reduceByKey
- 根据 key 值对 value 进行合并计算。与
groupByKey
不同,它在 Shuffle 之前会在每个分区内先进行预聚合(类似于 MapReduce 中的 Combiner),这样可以减少网络传输和后续聚合的计算量,性能通常比groupByKey
好。因此,在可以使用reduceByKey
的情况下,尽量不使用groupByKey + map
的方式来实现分组聚合。
groupByKey、reduceByKey 算子代码演示
import os
from pyspark import SparkContext, SparkConf
def showMsg(name,age):
print(name,age)
def showMsg2(a,b,c):
print(a,b,c)
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
sc = SparkContext(conf=conf)
print(sc)
# 编写各种需求代码
list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1, 2)
rdd2 = sc.parallelize(list2, 2)
# 将两个算子中的数据进行合并, 形成新的算子 转算算子
rdd3 = rdd1.union(rdd2)
# rdd3.foreach(print)
rdd4 = rdd3.distinct()
rdd4.foreach(print)
# groupByKey 转换算子,只对 KV键值对的RDD 起作用
rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd6 = rdd5.groupByKey() # ("word",List[10,5])
rdd6.foreach(lambda x: print(x[0], *x[1]))
list3 = [10, 20, 30]
print(*list3)
showMsg2(*list3)
dict = {"name": "zhangsan", "age": 29}
showMsg(**dict)
# reduceByKey算子
rdd7 = rdd5.reduceByKey(lambda total,num: total * num)
rdd7.foreach(print)
# 关闭sc
sc.stop()
(四)排序算子:sortBy、sortByKey
sortBy
可以根据指定的函数对 RDD 中的元素进行排序。例如,按照用户年龄降序排序,我们可以定义一个根据年龄提取值的函数作为排序依据。
功能特点
功能:对RDD中的所有元素进行整体排序,可以指定排序规则
【按照谁排序,升序或者降序】
分类:转换算子
场景:适用于所有对大数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
特点:经过Shuffle,可以指定排序后新RDD的分区个数,底层只能使用RangePartitioner来实现
def sortBy(self, keyFunc:(T) -> 0, asc: bool,numPartitions) -> RDD
keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序
举例说明
需求:按照用户年龄降序排序
laoda,20,male
laoer,22,female
laoliu,28,middle
laosan,24,male
laosi,30,male
laowu,26,female
fileRdd = sc.textFile("../../datas/c.txt")
fileRdd.sortBy(lambda line:line.split(",")[1],ascending=False).foreach(print)
sortByKey
专门用于对 KV 类型的 RDD 按照 key 进行排序。这种排序方式在处理键值对数据时非常方便,比如对单词和其出现次数的 RDD 按照单词字典序排序。
功能特点
功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则
要求:只有KV类型的RDD才能调用
分类:转换算子【sortByKey会触发job的运行】
场景:适用于大数据量的KV类型的RDD按照Key排序的场景
特点:经过Shuffle,可以指定排序后新RDD的分区个数
语法:def sortByKey(self, asc, numPartitions) -> RR[Tuple[K,V]]
使用这个算子,还想得到以上的需求的结果,必须让年龄为key
sortBy、sortByKey 算子代码演示
import os
from pyspark import SparkContext, SparkConf
"""
------------------------------------------
Description :
SourceFile : _05其他转换算子
Author : 闫哥
Date : 2024/4/19
-------------------------------------------
"""
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
sc = SparkContext(conf=conf)
print(sc)
# sortBy
fileRdd = sc.textFile("../../datas/c.txt")
fileRdd.sortBy(lambda line:line.split(",")[1],ascending=False).foreach(print)
# sortByKey 对KV类型的RDD进行排序
rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
#rdd5.sortByKey(ascending=False).foreach(print)
# 假如你想根据value排序,怎么办?
rdd5.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print)
# 关闭sc
sc.stop()
(五)重分区算子:repartition、coalesce
这两个算子都涉及 Shuffle 过程。repartition
底层是 coalesce(shuffle=True),
repartition
可以将分区变大或变小,而 coalesce
默认情况下只能将分区变小,如果设置 shuffle=True
,也可以将分区变大。在处理数据倾斜等问题时,重分区算子可以帮助重新调整数据分布,提高计算效率。
repartition
底层是 coalesce(shuffle=True),
repartition
可以将分区变大或变小
功能特点
功能:调整RDD的分区个数
分类:转换算子
场景:一般用于调大分区个数,必须经过shuffle才能实现
语法:
def repartition(self,numPartitions) -> RDD[T]
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
# 没有指定分区,走默认,默认分区个数,因为是local 模式,所以跟核数有关,所以 分区数为2
rdd = sc.parallelize(list01)
print(rdd.getNumPartitions()) # 2
# repartition 是一个转换算子,必然经历shuffle过程
bigrdd = rdd.repartition(4)
print(bigrdd.getNumPartitions()) # 4
coalesce
默认情况下只能将分区变小,如果设置 shuffle=True
,也可以将分区变大
功能特点
功能:调整RDD的分区个数
分类:转换算子特点:可以选择是否经过Shuffle,默认情况下不经过shuffle
def coalesce(self, numPartitions, shuffle:bool) -> RDD[T]
# 将一个小分区,变为大分区,shuffle 必须等于True,否则分区数不发生改变
bigbigrdd = bigrdd.coalesce(8,shuffle=True) # 8
print(bigbigrdd.getNumPartitions())
smallRdd = bigbigrdd.repartition(2)
print(smallRdd.getNumPartitions()) # 8 -->2
smallRdd2 = bigbigrdd.coalesce(2)
print(smallRdd2.getNumPartitions()) # 8 --> 2
# repartition(num) = coalesce(num, shuffle=True)
五、其他触发算子
(一)first 算子
first
算子返回 RDD 中的第一个元素。它在某些情况下很有用,比如快速查看数据集的一个示例数据。需要注意的是,如果 RDD 为空,执行 first
算子会抛出异常。
功能特点
功能:返回RDD集合中的第一个元素【RDD有多个分区,返回的是第一个分区的第一个元素】
分类:触发算子
语法:def first(self) -> T
(二)take 算子
take
算子返回 RDD 中的前 n 个元素。这对于快速获取数据集的一小部分数据进行查看或初步分析非常方便。与 collect
不同,take
不会将整个 RDD 数据收集到驱动程序中,而是只获取指定数量的元素,因此更适合处理大数据集。
功能特点
功能:返回RDD集合中的前N个元素【先从第一个分区取,如果不够再从第二个分区取】
分类:触发算子
注意:take返回的结果放入Driver内存中的,take数据量不能过大
举例说明
举例: [1,2,3,4,5,6,7,8,9]
假如是三个分区:
[1,2,3]
[4,5,6]
[7,8,9]
take(4) 1 2 3 4
(三)collect 算子
collect
算子将 RDD 中的所有元素收集到驱动程序中,形成一个本地集合。虽然它可以方便地获取和处理 RDD 中的数据,但如果 RDD 数据量过大,可能会导致内存溢出。因此,在使用 collect
算子时,需要确保数据量在驱动程序的内存承受范围内。
功能特点
功能:将RDD转化成一个列表返回
分类:触发算子
这个RDD的数据一定不能过大,如果RDD数据量很大,导致Driver内存溢出理解:假如现在有三个分区,三个分区中都有数据,假如你现在想打印数据,此时打印哪个分区呢?先收集,将数据汇总在一起,再打印。
案例:在sortBy sortByKey 中,如果不收集就打印的话,此时打印的是每一个分区的结果,为了看到全局排序的结果,此时你需要先collect 再 打印就能看到结果了。
(四)reduce 算子
reduce
算子对 RDD 中的元素进行聚合操作。它接受一个二元函数作为参数,该函数用于将两个元素合并为一个新的元素。例如,计算 RDD 中所有整数的乘积,或者将字符串 RDD 中的所有字符串连接起来等。reduce
算子从 RDD 的第一个元素开始,依次将每个元素与前一个聚合结果进行合并,直到遍历完整个 RDD。
功能特点
功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
分类:触发算子
# tmp用于存储每次计算临时结果,item就是RDD中的每个元素
def reduce(self,f : (T,T) -> U) -> U
reduceByKey(lambda tmp,item: tmp+item)
举例说明
# 一般用于KV键值对的数据
等同于: select word,sum(value) from a groub by word;rdd:1 2 3 4 5 6 7 8 9 10
rdd.reduce(lambda tmp,item: tmp+item) = 55
# 一般用于正常数据
等同于: select sum(1) * from a ;
first、take、conllect、reduce算子代码演示
import os
from pyspark import SparkContext, SparkConf
def getSongs(line):
list = line.split()
return list[-1]
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("分区的解释")
sc = SparkContext(conf=conf)
print(sc)
# 编写各种需求代码
# 使用parallelize 这个算子获取的rdd,假如指定了分区数,就按照数字分区,假如没有指定spark.default.parallelism
rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],numSlices=3)
print(rdd1.first()) # 1
# take 数据量不能大,是将数据临时存储在driver进程中
print(rdd1.take(4)) # [1, 2, 3, 4]
# collect 将数据收集起来放在driver进程中
print(rdd1.collect())
print(rdd1.reduce(lambda total, num: total + num))
rdd2 = sc.parallelize([("word",10), ("word",5), ("hello",100), ("hello",20), ("laoyan",1)], numSlices=3)
# reduceBYKey 是一个 转换算子,reduce 是一个出发算子
rdd2.reduceByKey(lambda total, num: total + num).foreach(print)
# 关闭sc
sc.stop()
(五)TopN 算子:top、takeOrdered
top 算子
用于获取排好序之后的最大的几个值。例如,对于一个存储学生成绩的 RDD,我们可以使用 top
算子获取成绩最高的前几名学生的成绩。
功能特点
功能:对RDD中的所有元素降序排序,并返回前N个元素,即返回RDD中最大的前N个元数据
分类:触发算子
场景:取RDD数据中的最大的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
语法:def top(self,num) -> List[0]
举例说明
# top
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
rdd = sc.parallelize(list01)
# top 是一个触发算子,不返回rdd类型
# 为什么 有时 用foreach打印,有时用print 打印
# 对于转换算子的结果,还是rdd,对于rdd 使用foreach 1) rdd 循环打印 2) foreach 是触发算子
# 对于触发算子的结果,一般不返回rdd,而是一个正常的返回值,使用print 打印即可
print(rdd.top(3))
takeOrdered 算子
与 top
相反,它用于获取排好序之后的最小的几个值。比如,获取订单金额最小的几个订单信息。
功能特点
功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
分类:触发算子
场景:取RDD数据中的最小的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能
适合处理小数据量
语法:def takeOrdered(self,num) -> List[0]
举例说明
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
rdd = sc.parallelize(list01)
# takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
print(rdd.takeOrdered(3))
六、算子的其他方面
(一)面试题:groupByKey + map 和 reduceByKey 的区别
区别
- 计算方式:
groupByKey
只是简单地将相同 key 的 value 分组到一起,形成一个 value 的集合,然后如果要进行聚合操作,需要再使用map
算子。而reduceByKey
在 Shuffle 之前会在每个分区内先进行预聚合(类似 Combiner),减少了网络传输和后续聚合的计算量。 - 性能:
reduceByKey
的性能通常更好,尤其是在数据量较大且存在较多重复 key 的情况下。因为它减少了不必要的数据传输和聚合计算。
map 端的聚合(Combiner)
Combiner 类似于运行在 map 端的 Reduce。它在 map 阶段对本地数据进行初步聚合,然后再将聚合后的结果发送到 reduce 阶段。这样可以减少在网络传输过程中的数据量,提高整体性能。例如,在单词计数的场景中,在每个 map 任务中,可以先对本地出现的相同单词进行计数,然后再将结果传输到 reduce 任务进行最终的聚合。
(二)其他 KV 类型算子
keys
keys
算子用于获取所有的 key,返回一个只包含 KV - RDD 中 key 的新 RDD。在只需要处理键或值的情况下非常有用。例如,在统计某个数据集的键的种类或者值的分布情况时。
功能特点
功能:针对二元组KV类型的RDD,返回RDD中所有的Key,放入一个新的RDD中
分类:转换算子
语法
def keys( self: RDD[Tuple[K,V]] ) -> RDD[K]
举例说明
rdd_kv = sc.parallelize([('laoda',11),('laoer',22),('laosan',33),('laosi',44)], numSlices=2)
rdd_keys = rdd_kv.keys()
rdd_keys.foreach(lambda x: print(x)))
values
values
算子则用于获取所有 rdd 中的 value,返回一个只包含值的新 RDD。在只需要处理键或值的情况下非常有用。例如,在统计某个数据集的键的种类或者值的分布情况时。
功能特点
- 功能:针对二元组KV类型的RDD,返回RDD中所有的Value,放入一个新的RDD中
- 分类:转换算子
- 语法
def values( self: RDD[Tuple[K,V]] ) -> RDD[V]
举例说明
rdd_values = rdd_kv.values()
rdd_values.foreach(lambda x: print(x))
mapValues
mapValues
算子将所有的 value 拿到之后进行 map 转换,转换后还是元组,只是元组中的 value 发生了变化。例如,对于一个存储用户 ID 和用户年龄的 KV - RDD,我们可以使用 mapValues
算子将每个用户的年龄增加一岁。
功能特点
- 功能:针对二元组KV类型的RDD,对RDD中每个元素的Value进行map处理,结果放入一个新的RDD中
- 分类:转换算子
- 语法 def mapValues(self: RDD[Tuple[K,V]], f: (V) -> U) -> RDD[Tuple[K,U]]
举例说明
rdd_kv = sc.parallelize([('laoda',11),('laoer',22),('laosan',33),('laosi',44)], numSlices=2)
rsRdd = rdd_kv.mapValues(lambda age: age + 1)
rsRdd.foreach(lambda x:print(x))
collectAsMap
collectAsMap
算子将 KV - RDD 转换为一个本地的 Map。需要注意的是,如果 RDD 中有重复的 key,只会保留最后一个 key - value 对。这个算子在需要将键值对数据在本地以 Map 形式处理时很方便,比如配置信息的读取和处理。
功能特点
- 功能:将二元组类型的RDD转换成一个Dict字典
- 分类:触发算子
- 特点:类似于collect,将RDD中元素的结果放入Driver内存中的一个字典中,数据量必须比较小
- 语法
def collectAsMap(self: RDD[Tuple[K,V]]) -> Dict[K,V]
举例说明
dict = rdd_kv.collectAsMap()
print(type(dict))
for k,v in dict.items():
print(k,v)
KV 类型算子代码演示
import os
from pyspark import SparkContext, SparkConf
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
sc = SparkContext(conf=conf)
print(sc)
rdd_kv = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)], numSlices=2)
# values 是一个转换算子
valuesRdd = rdd_kv.values()
valuesRdd.foreach(print)
# keys 是一个转换算子,获取所有的key
rdd_kv.keys().foreach(print)
# 针对 kv类型的value 重新进行计算
rdd_kv.mapValues(lambda x:x*2).foreach(print)
# 以上三个都是 转换算子 collectAsMap 是 触发算子,返回值是一个字典
# collectAsMap 将一个KV类型的rdd ,快速变为一个 python中的字典类型
print(rdd_kv.collectAsMap()) # {'laoda': 11, 'laoer': 22, 'laosan': 33, 'laosi': 44}
# 关闭sc
sc.stop()
(三)join 方面的算子
join / fullOuterJoin / leftOuterJoin / rightOuterJoin
这些算子用于对两个 KV 类型的 RDD 进行连接操作。
join
类似于 SQL 中的内连接,只返回两个 RDD 中 key 相同的元素对。
fullOuterJoin
返回两个 RDD 中所有元素对,对于没有匹配的 key,相应的值为 None。
leftOuterJoin
以左边的 RDD 为基础,返回左边 RDD 中所有元素和右边 RDD 中匹配 key 的元素,右边没有匹配的 key 则值为 None。
rightOuterJoin
与
leftOuterJoin
相反,以右边的 RDD 为基础。这些 join 算子在处理关联数据时非常有用,比如在处理订单数据和用户信息数据时,通过用户 ID 进行连接操作。
功能特点
实现**两个KV类型**的RDD之间按照K实现关联,将两个RDD的关联结果放入一个新的RDD中
def join(self: RDD[Tuple[K,V]], otherRdd: RDD[Tuple[K,W]]) -> RDD[Tuple[K,(V,W)]]join的过程,必然引发相同key值的数据汇总在一起,引发shuffle 操作
举例说明
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)], numSlices= 2)
rdd_singer_music = sc.parallelize([("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"), ("动力火车", "当")], numSlices=2)
Join算子代码演示
import os
from pyspark import SparkContext, SparkConf
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
sc = SparkContext(conf=conf)
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],
numSlices=2)
rdd_singer_music = sc.parallelize(
[("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),
("动力火车", "当")], numSlices=2)
# join 是 转换算子 join 可以理解为内连接
joinRdd = rdd_singer_age.join(rdd_singer_music)
joinRdd.foreach(print)
print("*"*100)
leftRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music)
leftRdd.foreach(print)
print("*"*100)
rightRdd = rdd_singer_age.rightOuterJoin(rdd_singer_music)
rightRdd.foreach(print)
print("*"*100)
fullRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music)
fullRdd.foreach(print)
# join 关联的是两个kv类型的rdd
# union 关联的是单个元素的rdd
# 关闭sc
sc.stop()
(四)分区算子
为什么需要分区算子
在处理大数据集时,有些操作是针对一条数据的(如 map
和 foreach
),但当需要对整个数据集进行批量处理时,可能会出现问题。例如,有一个 RDD 读文件产生,有两个分区,每个分区有 50 万条数据,需要将 RDD 的数据进行一对一的处理转换,最后将转换好的结果写入 MySQL。如果直接使用普通的 map
和 foreach
可能会导致资源利用不合理、性能低下等问题。
mapPartitions
对每个分区的数据作为一个整体进行转换操作。它接受一个函数作为参数,该函数应用于每个分区的数据。例如,在处理数据库连接池时,可以在每个分区内获取一次数据库连接,然后对分区内的所有数据进行处理,最后释放连接,提高数据库连接的使用效率。
功能特点
- 功能:对RDD每个分区的数据进行操作,将每个分区的数据进行map转换,将转换的结果放入新的RDD中
- 分类:转换算子
def mapPartitions(self: RDD[T], f: Iterable[T] -> Iterable[U] ) -> RDD[U]
举例说明
# 使用mapPartitions:对每个分区进行处理
def map_partition(part):
rs = [i * 2 for i in part]
return rs
# 每个分区会调用一次:将这个分区的数据放入内存,性能比map更好,优化型算子,注意更容易出现内存溢出
map_part_rdd = input_rdd.mapPartitions(lambda part: map_partition(part))
foreachParition
对每个分区的数据执行指定的操作,通常用于对分区数据进行输出或写入外部存储等有副作用的操作。与 foreach
类似,但在分区级别执行,可以更好地控制资源和提高效率。
功能特点
- 功能:对RDD每个分区的数据进行操作,将整个分区的数据加载到内存进行foreach处理,没有返回值
- 分类:触发算子
def foreachParition(self: RDD[T] , f: Iterable[T] -> None) -> None
举例说明
# 使用foreachPartiion:对每个分区进行处理写入MySQL
def save_part_to_mysql(part):
# 构建MySQL连接
for i in part:
# 利用MySQL连接将结果写入MySQL
print(i)
# 将每个分区的数据直接写入MySQL,一个分区就构建一个连接
map_part_rdd.foreachPartition(lambda part: save_part_to_mysql(part))
mapPartitions、foreachParition算子代码演示
import os
from pyspark import SparkContext, SparkConf
def saveToMySQL(partData):
print("我进来了")
for i in partData:
print(i)
def a(partData2):
return (i * 2 for i in partData2)
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("分区算子")
sc = SparkContext(conf=conf)
# 构建RDD
input_rdd = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), numSlices=2)
# 触发算子 算子中的函数 没有返回值
input_rdd.foreachPartition(lambda partition: saveToMySQL(partition))
# mapPartitions 转换算子 算子中的函数,返回值必须有,并且返回值必须是集合(可迭代类型)
newRdd = input_rdd.mapPartitions(lambda partition: a(partition))
newRdd.foreach(print)
sc.stop()
七、总结
(一)触发算子
count foreach saveAsTextFile first take
collect reduce top takeOrdered
collectAsMap foreachParition max min mean sum
(二)转换算子
map flatMap filter union distinct groupByKey sortByKey sortBy reduceByKey
repartition coalesce keys values mapValues
join fullOuterJoin leftOuterJoin rightOuterJoin
mapPartitions
(三)能触发shuffle过程的算子
groupByKey sortByKey sortBy reduceByKey repartition
coalesce(根据情况) join( fullOuterJoin / leftOuterJoin / rightOuterJoin)
本文详细介绍了 RDD 的常用基础算子,包括 34 个算子的分类、功能、代码示例以及相关的注意事项。了解这些算子对于熟练使用 Spark 进行大数据处理至关重要。在实际应用中,根据数据处理的需求选择合适的算子可以提高处理效率、减少资源消耗。同时,掌握这些算子的原理和特点对于应对 Spark 相关的面试问题也非常有帮助。无论是数据的转换、触发计算、分组聚合、排序还是其他复杂的操作,RDD 算子都提供了丰富的功能来满足不同的场景需求。在使用过程中,需要注意算子的 lazy 执行模式、Shuffle 过程对性能的影响以及内存的合理使用等问题,以确保数据处理的高效性和稳定性。