前言:
💞💞大家好,我是书生♡,本阶段和大家一起分享和探索大数据技术Spark—RDD,本篇文章主要讲述了:RDD的概念,RDD的特性,怎么创建一个RDD,RDD的算子等等。欢迎大家一起探索讨论!!!
💞💞代码是你的画笔,创新是你的画布,用它们绘出属于你的精彩世界,不断挑战,无限可能!
个人主页⭐: 书生♡
gitee主页🙋♂:闲客
专栏主页💞:大数据开发
博客领域💥:大数据开发,java编程,前端,算法,Python
写作风格💞:超前知识点,干货,思路讲解,通俗易懂
支持博主💖:关注⭐,点赞、收藏⭐、留言💬
目录
- 1. RDD的介绍
- 1.1 什么是RDD
- 1.2 RDD的特性
- 2. RDD的创建
- 2.1 RDD的创建方式
- 2.2 创建RDD
- 2.2.1 创建RDD--可迭代数据
- 2.2.2 创建RDD---文件/目录
- 2.2.3 设置分区
- 2.2.4 小文件数据读取转化为RDD
- 3. RDD算子
- 3.1 RDD算子的介绍和分类
- 3.2 转换算子--transformation
- 3.2.1 常见的转换算子
- 3.2.2 map算子
- 3.2.3 flatMap() 算子
- 3.2.3 filter算子
- 3.2.4 distinct算子
- 3.2.5 groupBy&mapValues算子
- 3.2.6 K-V格式RDD算子
- 3.2. 7 关联算子
- 3.3 action算子
1. RDD的介绍
1.1 什么是RDD
RDD是一种弹性分布式数据集合是spark中最基本的数据类型,它提供了容错能力和并行处理的能力。
- RDD(Resilient Distributed Dataset)弹性分布式数据集合,是Spark中最基本的数据抽象结构,代表一个不可变(只读)、可分区、里面的元素可并行计算的集合。
- 是Spark中的一种数据类型,管理spark的内存数据。类似于python中的列表类型 [1,2,3,4]
- spark中还有dataframe,dataset数据类型
- RDD同时还提供各种计算方法(map/reduce/filter/groupBy/flatMap)
- 弹性
- 可以对海量数据根据需求分成多份(分区),每一份数据会由对应的task线程执行计算
- 分布式
- 利用集群中多台机器资源进行计算
- 数据集合
- 规定数据形式 类似Python中的列表 []
RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。
1.2 RDD的特性
RDD五大特性:分区、只读、依赖、缓存和checkpoint
- 分区
- 可以将计算的海量数据分成多份,需要分成多少个分区可以通过方法指定
- 每个分区都可以对应一个task线程执行计算
- 只读
- rdd中的数据不能直接修改,一旦修改当前的rdd一定会生成一个新的RDD
- rdd本身存储的数据只能读取
- 依赖(血统)
- rdd之间是有依赖/一个因果关系的
- 新的rdd是通过旧的rdd计算得到的
- 缓存
- 容错机制
- 将RDD进行临时持久化操作
- spark应用程序结束后, 缓存的RDD会被清空
- 缓存的RDD优先存储在内存中(内存满了才会存储在磁盘中)
- checkpoint
- 容错机制
- checkpoint可以将数据存储在分布式存储系统中,比如hdfs,实现永久存储
- spark应用程序结束后, checkpoint的RDD不会被清空
2. RDD的创建
2.1 RDD的创建方式
在Apache Spark中,创建Resilient Distributed Dataset (RDD) 的方式主要有以下几种:
- 从本地集合创建(可迭代的数据集合): 适用于小型数据集或测试场景。
- 从文件系统创建: 适用于大型数据集和生产环境。
- 从其他RDD转换创建: 适用于数据处理和转换。
- 从RDD序列化创建: 适用于从持久化或序列化的RDD数据恢复。
- 从网络流创建: 适用于实时数据处理。
- 从外部数据源创建: 适用于从多种数据源读取数据。
-
从本地集合创建:
- 方法:
parallelize()
- 描述: 将本地Python集合(如列表、元组等)转换为RDD。
- 示例:
data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data)
- 方法:
-
从文件系统创建:
- 方法:
textFile()
,wholeTextFiles()
,binaryFiles()
,binaryRecords()
,newAPIHadoopFile()
,hadoopRDD()
- 描述: 从HDFS、本地文件系统或任何支持的文件系统中读取数据并创建RDD。
- 示例:
rdd = sc.textFile("hdfs://node1:9000/data/input.txt")
- 方法:
-
从其他RDD转换创建:
- 方法: 使用RDD转换操作(如
map()
,filter()
,flatMap()
,union()
,join()
等) - 描述: 通过对现有的RDD执行转换操作来创建新的RDD。
- 示例:
rdd1 = sc.parallelize([1, 2, 3, 4, 5]) rdd2 = rdd1.map(lambda x: x * 2)
- 方法: 使用RDD转换操作(如
-
从RDD序列化创建:
- 方法: 使用序列化或反序列化的工具,如
pickle
或joblib
。 - 描述: 将序列化的RDD数据反序列化为新的RDD。
- 示例:
import joblib serialized_rdd = joblib.load("serialized_rdd.pkl") rdd = sc.parallelize(serialized_rdd)
- 方法: 使用序列化或反序列化的工具,如
-
从网络流创建:
- 方法: 使用
DStream
API(在Spark Streaming中) - 描述: 从网络流中读取实时数据并创建RDD。
- 示例:
ssc = StreamingContext(sc, batchDuration=1) dstream = ssc.socketTextStream("localhost", 9999) rdd = dstream.dstream_to_rdd()
- 方法: 使用
-
从外部数据源创建:
- 方法: 使用
DataFrameReader
或SparkSession
读取支持的外部数据源(如CSV、JSON、Parquet等)。 - 描述: 虽然主要创建DataFrame或Dataset,但可以通过转换为RDD。
- 示例:
df = spark.read.format("csv").option("header", "true").load("file.csv") rdd = df.rdd
- 方法: 使用
2.2 创建RDD
我们这里使用从本地创建(通过可迭代数据集合)创建以及通过文件和目录进行创建
将需要计算的数据转为rdd数据,就可以利用spark的内存计算方法进行分布式计算操作,这些计算方法就是由rdd提供的
-
rdd数据的转化方法是由SparkContext提供的,所以需要先生成SparkContext对象,
-
SparkContext称为Spark的入口类,通过类可以生成对象,调用对象方法将数据转为rdd数据
2.2.1 创建RDD–可迭代数据
可迭代数据:“可迭代数据”指的是可以使用迭代器(iterator)遍历的数据结构。
包括但是不仅限于:列表,字符串,元组,字典等等
# python中容器类型可以转换成rdd对象
# 导入模块
from pyspark import SparkContext
# todo: 1-创建sc对象
# sc对象是spark core入口类的对象
sc = SparkContext()
# todo: 2-创建RDD对象
# str类型
str1 = 'pyspark'
# 数值类型 -> 不能转换成RDD
int1 = 10
float1 = 3.14
# bool类型 -> 不能转换成RDD
bool1 = True
# list类型
list1 = [1, 2, 3, 4, 5, 6]
# tuple类型
tuple1 = (1, 2, 3, 4)
# dict类型
dict1 = {'name': '小明', 'age': 18, 'height': 180}
# set类型
set1 = {'a', 'b', 'c', 'd'}
# 借助sc对象的parallelize方法创建RDD对象
# 字符串中的每个字符就是rdd中的每个元素
# rdd1 = sc.parallelize(c=str1)
# 列表中的每个元素就是rdd中的每个元素
# rdd1 = sc.parallelize(c=list1)
# 元组中的每个元素就是rdd中的每个元素
# rdd1 = sc.parallelize(c=tuple1)
# 将字典的key值转换成rdd
# rdd1 = sc.parallelize(c=dict1)
# rdd1 = sc.parallelize(c=dict1.values())
# 集合中的每个元素就是rdd中的每个元素
rdd1 = sc.parallelize(c=set1)
# todo: 3-收集RDD结果
# 借助rdd对象的collect方法(算子)进行收集
result = rdd1.collect()
print(result)
2.2.2 创建RDD—文件/目录
我们要先创建一个目录里面有文件存储
从文件中的数据创建RDD我们使用textfile这个函数
默认是读取HDFS中的文件/目录 hdfs://node1:8020/path
读取本地磁盘文件/目录 file:///path
注意:文件中我们的每一行数据都是RDD元素
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
# 读取文件数据转换成RDD
"""
8020: HDFS的服务端程序端口号
9870: 3版本HDFS的客户端程序端口号 2版本50070
默认是读取HDFS中的文件/目录 hdfs://node1:8020/path
读取本地磁盘文件/目录 file:///path
"""
# 读取hdfs文件数据, 一行数据对应rdd中一个元素
rdd1 = sc.textFile('/data/words.txt')
result = rdd1.collect()
# print(result)
# 读取hdfs目录文件数据, 读取目录下的所有文件数据, 一行数据对应rdd中一个元素
rdd2 = sc.textFile(name='hdfs://node1:8020/data')
print(type(rdd2))
result = rdd2.collect()
print(result)
# 读取本地磁盘文件数据, 一行数据对应rdd中一个元素
rdd3 = sc.textFile('file:///root/data/employees.json')
result = rdd3.collect()
print(f"rdd3的数据是:{result}")
# 读取本地磁盘目录文件数据
rdd4 = sc.textFile('file:///root/data')
result = rdd4.collect()
print(f"rdd4的数据是:{result}")
2.2.3 设置分区
RDD有一个特性就是弹性的也就是可以设置多个分区
glom 方法
-
glom 方法主要用于将一个分区中的所有元素组合成一个列表。这对于需要获取每个分区中所有元素的场景非常有用。
-
作用
合并分区中的元素: glom 将一个分区中的所有元素合并成一个列表,每个分区对应一个列表。
便于调试和测试: 通常用于调试或测试目的,帮助理解数据是如何分布的。
我们设置分区的时候,使用glom()这个方法,默认的分区数是2个,不能小于2个
# 生成SparkContext类对象
sc = SparkContext()
# python转化时指定分区数
data = [1,2,3,4,5,6]
# numSlices指定分区数
rdd = sc.parallelize(data, numSlices=8)
# glom()按照分区查看数据
res = rdd.glom().collect()
print(res)
print("***********************")
rdd1 = sc.parallelize(data, numSlices=3)
# glom()按照分区查看数据
res1 = rdd1.glom().collect()
print(res1)
print("***********************")
rdd2 = sc.parallelize(data, numSlices=6)
# glom()按照分区查看数据
res2 = rdd2.glom().collect()
print(res2)
读取文件数据转换成rdd
minPartitions:设置rdd的最小分区数, 实际分区数有可能比设置的多一个分区
文件数据指定分区数
"""
1B=1字节, 1K=1024B=1024字节
单个文件分区规则:
文件字节数/分区数 = 分区字节数...余数
余数/分区字节数 = 结果值 结果值 大于10% 会多创建
目录下多个文件分区规则:
所有文件字节数/分区数 = 分区字节数...余数
每个文件字节数/分区字节数 = 分区数...余数
余数/分区字节数 = 结果值 结果值 大于10% 会多创建
"""
2.2.4 小文件数据读取转化为RDD
在一个目录下,有多个文件,如果文件的大小不够一个块的大小,一个文件就对应一个分区,文件超过一个块,那就一个block(128M)块对应一个分区。
textFile读取多个小文件数据时, 最少一个文件对应一个分区, 一个分区由一个task线程执行 HDFS中一个块数据是128M大小,
如果块数据不满128M, 造成资源浪费 小文件越多,task线程越多,线程需要资源进行执行, 造成资源浪费
使用wholeTextFiles
方法可以解决
该方法会现将读取到的数据合并在一起,然后重新进行分区
我们在合并文件的时候,最小分区必须大于等于2
注意:如果设置的最小分区数不能吧文件
wholeTextFiles
-
方法用于读取文件系统中的文件,并将每个文件的内容作为一个值返回,文件路径作为键。这种方法适用于读取较小的文件,尤其是当您希望将每个文件视为一个单独的RDD分区时。
-
作用
读取文件: 从文件系统(如HDFS、S3等)中读取文件,并将每个文件作为一个键值对返回。
键值对: 键是文件的路径,值是文件的内容。
适合小文件: 适用于读取大量较小的文件,每个文件作为一个独立的单元处理
# rdd的分区数指定
from pyspark import SparkContext
# 生成SparkContext类对象
sc = SparkContext()
# 文件数据指定分区数 ,读取目录下的多个小文件
rdd = sc.textFile('/data')
# glom()按照分区查看数据
res = rdd.glom().collect()
# 每个小文件的数据会单独存放一个分区
# 一个分区会对应一个task执行计算
# 当目录下小文件数据较多时,会产生很多task。task较多时会抢占计算资源影响计算速度
# 10条数据文件100个 1万条数据文件10个
# 将小文件合并 一个分区数据 1000条数据在一个分区,对应一个task线程
print(res)
print(len(res))
# wholeTextFiles读取目录中的多个小文件数据
rdd2= sc.wholeTextFiles('/data')
res = rdd2.glom().collect()
print(res)
print(len(res))
3. RDD算子
3.1 RDD算子的介绍和分类
定义:将数据转化为RDD之后,就需要进行RDD的计算,RDD提供了计算方法RDD的方法又称为rdd算子
RDD算子的分类:
- 转换算子-----transformation
- 执行算子 ----action (也被称为任务算子)
transformation
- 转化算子 对RDD数据进行转化计算得到新的RDD,定义了一个线程任务
action
- 执行算子 触发计算任务,让计算任务进行执行,得到结果
- 触发线程执行
3.2 转换算子–transformation
转化算子---- 对RDD数据进行转化计算得到新的RDD,定义了一个线程任务
- 每一个专转换算子都有两种方式,一种是自定义函数,一种是lambda表示式函数
3.2.1 常见的转换算子
- map(func):
作用: 接受一个函数func,将rdd中的每个元素经过函数处理, 将函数返回值保存到新的rdd中
注意点: map算子不会改变经过函数处理后的新rdd的数据结构
示例: rdd.map(lambda x: x *2)
- flatMap(func):
作用: 接受一个函数func,将rdd中的每个元素经过函数处理, 将函数返回值拆分后保存到新的rdd中
注意点: flatMap算子会改变经过函数处理后的新rdd的数据结构, 降一维操作
tips: 自定义函数返回值一定是一个可迭代对象(python容器), 可迭代对象才能进行拆分
示例: rdd.flatMap(lambda x: [x, x + 1])
- filter(func):
作用: 接受一个函数func,rdd中的每个元素经过函数的条件判断后, 返回True对应的元素保存到新rdd中(保留True对应的元素)
示例:rdd.filter(lambda x: x > 10)
- union(otherDataset):
作用: 返回一个包含两个RDD中所有元素的新RDD。 示例: rdd.union(another_rdd)
- intersection(otherDataset):
作用: 返回一个包含两个RDD中共有元素的新RDD。 示例: rdd.intersection(another_rdd)
- distinct([numTasks]]):
作用: 返回一个去除了重复元素的新RDD。 示例: rdd.distinct()
- groupBy(func):
作用: 根据提供的函数func对RDD中的元素进行分组。
对rdd中的元素经过自定义函数分组, 函数的返回值作为分组的key值, 相同key值对应的value值(rdd元素)放到一起
groupBy返回k-v格式的rdd
[(key, (value1, value2,…)),(key, (value1, value2, …)), …]
[[key, value],[key, value], …]
[((key1,key2), value), ((key1,key2),value)]
tips:自定义函数有几个返回值, 就有几组数据
示例: rdd.groupBy(lambda x: x % 2)
- mapValues(func)
rdd2=rdd1.mapValues(f=自定义函数)
对k-v格式rdd中的value值经过自定义函数处理, 将函数的返回值保存到新的rdd中
- groupByKey(func)
对kv格式的rdd根据key值进行分组, 将相同key值对应value值合并到一起, 返回一个新的kv格式rdd
- sortBy(keyfunc=自定义函数,ascending=True/False)
对kv格式的rdd根据自定义函数的值进行排序操作, 返回排序后的新的kv格式rdd
- reduceByKey(func, [numTasks]):
作用: 对键相同的元素应用func函数进行分组聚合。 示例: rdd.reduceByKey(lambda x, y: x + y)
- join(otherDataset, [numTasks]):
作用: 对两个RDD进行内连接。 示例: rdd.join(another_rdd)
- cogroup(otherDataset, [numTasks]):
作用: 对两个RDD进行分组操作。 示例: rdd.cogroup(another_rdd)
- sortByKey([ascending=True], [numTasks]):
作用: 按键排序。 示例: rdd.sortByKey()
- sample(withReplacement, fraction, [seed]):
作用: 从RDD中随机采样元素。 示例: rdd.sample(True, 0.1)
- repartition(numPartitions):
作用: 重新分区RDD,通常用于改变RDD的分区数量。 示例: rdd.repartition(10)
- coalesce(numPartitions, [shuffle]):
作用: 减少RDD的分区数量。 示例: rdd.coalesce(5)
3.2.2 map算子
rdd = rdd.map(f=自定义函数名)
rdd = rdd.map(f=lambda 参数名:参数计算逻辑)
将rdd中的每个元素经过函数处理, 将函数返回值保存到新的rdd中
注意点: map算子不会改变经过函数处理后的新rdd的数据结构,原本是什么结构,转换完还是什么结构
"""
rdd2 = rdd1.map(f=自定义函数名)
rdd2 = rdd1.map(f=lambda 参数名:参数计算逻辑)
将rdd1中的每个元素经过函数处理, 返回rdd2
注意点: map算子不会改变经过函数处理后的rdd的数据结构
"""
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize(c=[1, 2, 3, 4])
# 将rdd1中的每个元素放到列表中 -> [[1],[2],[3],[4]]
def func1(x):
print(f"x的值是:{x}")
return [x]
# 传递自定义函数
rdd2 = rdd1.map(f=func1)
# collect()是action算子
print(rdd2.collect())
# x的值是:3
# x的值是:4
# x的值是:1
# x的值是:2
# [[1], [2], [3], [4]]
# 传递匿名函数
# x-> x是什么数据, 是什么类型, 是什么结构
# x-> 1,2,3,4 整数类型
rdd3 = rdd1.map(f=lambda x: [x])
print(rdd3.collect())
# 需求: rdd1->['spark', 'hive', 'hadoop', 'flink', 'spark', 'hive'], 将rdd1转换成新rdd-> [('spark', 1), ('hive', 1)]
rdd1 = sc.parallelize(c=['spark', 'hive', 'hadoop', 'flink', 'spark', 'hive'])
def func2(x):
return (x, 1)
rdd2 = rdd1.map(f=func2)
print(rdd2.collect())
rdd4 = rdd1.map(f=lambda x: (x,1))
print(rdd4.collect())
3.2.3 flatMap() 算子
将rdd1中的每个元素经过函数处理, 将函数返回值拆分后保存到新的rdd2中
注意点: flatMap算子会改变经过函数处理后的新rdd的数据结构, 降一维操作
什么叫做降维操作:
简单点来说就是将嵌套的类型变为不嵌套的类型
例如 【【1,2,3】,【4,5,6】】这就是一个二维的
【1,2,3,4,5,6】这就是一个一维的
从【【1,2,3】,【4,5,6】】—》【1,2,3,4,5,6】这个过程就叫做降维操作,faltMap就是现将嵌套的数据,变为单一的类型数据
"""
rdd2 = rdd1.flatMap(f=自定义函数名)
rdd2 = rdd1.flatMap(f=lambda 参数名:参数计算逻辑)
将rdd1中的每个元素经过函数处理, 将函数返回值拆分后保存到新的rdd2中
注意点: flatMap算子会改变经过函数处理后的新rdd的数据结构, 降一维操作
应用场景: 处理嵌套的rdd -> [[1,2,3], [4,5,6]] [(1,2,3),(4,5,6)]
tips: 自定义函数返回值一定是一个可迭代对象(python容器), 可迭代对象才能进行拆分
"""
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize(c=[[1,2,3],[4,5,6]])
rdd2 = rdd1.flatMap(lambda x:x)
print(rdd2.collect())
print("****************")
rdd1 = sc.parallelize(c=[[1,2,3],[4,5,6]])
rdd2 = rdd1.flatMap(lambda x:x+[10])
print(rdd2.collect())
print("****************************")
# 需求: rdd1->['spark,hive,spark,hadoop', 'hive,python,java', 'flink,spark,hadoop'], 将rdd1转换成rdd2->['spark','hive','spark','hadoop',...]
rdd3 = sc.parallelize(c=['spark,hive,spark,hadoop', 'hive,python,java', 'flink,spark,hadoop'])
def fun4(x):
return x.split(',')
rdd5= rdd3.flatMap(fun4)
print(rdd5.collect())
rdd6 = rdd5.map(lambda x:(x,1))
print(rdd6.collect())
3.2.3 filter算子
rdd中的每个元素经过函数的条件判断后, 返回True对应的元素保存到新rdd中
(保留True对应的元素)
应用场景: 数据清洗
"""
rdd2=rdd1.filter(lambda 参数: 参数条件判断表达式)
rdd1中的每个元素经过函数的条件判断后, 返回True对应的元素保存到新rdd2中(保留True对应的元素)
应用场景: 数据清洗
"""
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
# 需求: 获取rdd1中性别为男的人员的信息, 获取rdd1中年龄大于20的人员信息
rdd1 = sc.parallelize(c=[['小明', '男', 18],
['小红', '女', 16],
['翠花', '女', 35],
['隔壁老王', '男', 40]])
rdd2=rdd1.filter(lambda x:x[1]=='男')
rdd3=rdd1.filter(lambda x:x[2]>20)
rdd4= rdd1.filter(lambda x:x[1]=='男' and x[2]>20)
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
3.2.4 distinct算子
- rdd.distinct()
对rdd中的元素进行去重操作, 返回一个新的rdd 应用场景: 数据清洗
"""
rdd2=rdd1.distinct()
对rdd1中的元素进行去重操作, 返回一个新的rdd2
应用场景: 数据清洗
"""
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize(c=[1, 2, 3, 2, 5, 6, 3, 7, 1])
rdd2 = rdd1.distinct()
print(rdd2.collect())
print('----------------------')
rdd3 = sc.parallelize(c=[[1, 2, 3], [2, 5, 6], [1, 2, 3]]) # 发生报错
rdd4 = rdd3.distinct()
print(rdd4.collect())
注意:我们的去重只能对一维的数据进行去重,多维的数据去重会发生报错
3.2.5 groupBy&mapValues算子
我们的 groupBy&mapValues算子通常是在一起使用的:
- rdd.groupBy(f=自定义函数名)
对rdd中的元素经过自定义函数分组, 函数的返回值作为分组的key值, 相同key值对应的value值(rdd元素)放到一起
groupBy返回k-v格式的rdd
[(key, (value1, value2,…)),(key, (value1, value2, …)), …]
[[key, value],[key, value], …]
[((key1,key2), value), ((key1,key2),value)]
tips:自定义函数有几个返回值, 就有几组数据
- rdd.mapValues(f=自定义函数)
对k-v格式rdd中的value值经过自定义函数处理, 将函数的返回值保存到新的rdd中
我们的groupBy通常是用于对于非k-v结构的数据进行分组的
如果我们单独执行groupBy算子,我们的value值是内存的地址
sc = SparkContext()
rdd1 = sc.parallelize(['a', 'b', 'c', 'a'])
# hash(x) -> 对字母进行hash计算, 返回整数
# hash(x) % 2 -> 返回余数 0,1
rdd2 = rdd1.groupBy(f=lambda x: hash(x) % 2)
print(rdd2.collect())
如果我们想要value值是具体的值的话需要使用mapVlaues算子
"""
rdd2=rdd1.groupBy(f=自定义函数名)
对rdd1中的元素经过自定义函数分组, 函数的返回值作为分组的key值, 相同key值对应的value值(rdd元素)放到一起
groupBy返回k-v格式的rdd
[(key, (value1, value2,...)),(key, (value1, value2, ...)), ...]
[[key, value],[key, value], ...]
[((key1,key2), value), ((key1,key2),value)]
tips:自定义函数有几个返回值, 就有几组数据
rdd2=rdd1.mapValues(f=自定义函数)
对k-v格式rdd中的value值经过自定义函数处理, 将函数的返回值保存到新的rdd2中
"""
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize(['a', 'b', 'c', 'a'])
# hash(x) -> 对字母进行hash计算, 返回整数
# hash(x) % 2 -> 返回余数 0,1
rdd2 = rdd1.groupBy(f=lambda x: hash(x) % 2)
print(rdd2.collect())
# 对k-v格式rdd中的value值进行转换
# list(x) -> 将x转换成列表类型, 获取内存地址中的数据
def func(x):
print(x)
print(rdd2.mapValues(func).collect())
# x->k-v中的value值
rdd3 = rdd2.mapValues(f=lambda x: list(x))
print(rdd3.collect())
# 需求: 根据rdd1中的性别值进行分组
rdd1 = sc.parallelize(c=[['小明', '男', 18],
['小红', '女', 16],
['翠花', '女', 35],
['隔壁老王', '男', 40]])
rdd2 = rdd1.groupBy(lambda x:x[1])
rdd3= rdd2.mapValues(lambda x: list(x))
print(rdd3.collect())
print("------------------------------------------")
rdd2 = rdd1.groupBy(lambda x: hash(x[1])%2)
rdd3= rdd2.mapValues(lambda x: list(x))
print(rdd3.collect())
3.2.6 K-V格式RDD算子
- rdd.groupByKey()
对kv格式的rdd根据key值进行分组, 将相同key值对应value值合并到一起, 返回一个新的kv格式rdd
- rdd.reduceByKey(func=自定义函数)
对kv格式的rd根据key值进行分组, 将相同key值对应的value值经过自定义函数进行聚合操作, 返回一个聚合后的新的kv格式rdd
- rdd.sortByKey(ascending=True/False)
对kv格式的rdd根据key值进行排序操作, 返回排序后的新的kv格式rdd
- rdd.sortBy(keyfunc=自定义函数,ascending=True/False)
对kv格式的rdd根据自定义函数的值进行排序操作, 返回排序后的新的kv格式rdd
"""
rdd2=rdd1.groupByKey()
对kv格式的rdd1根据key值进行分组, 将相同key值对应value值合并到一起, 返回一个新的kv格式rdd2
rdd2=rdd1.reduceByKey(func=自定义函数)
对kv格式的rdd1根据key值进行分组, 将相同key值对应的value值经过自定义函数进行聚合操作, 返回一个聚合后的新的kv格式rdd2
rdd2=rdd1.sortByKey(ascending=True/False)
对kv格式的rdd1根据key值进行排序操作, 返回排序后的新的kv格式rdd2
rdd2=rdd1.sortBy(keyfunc=自定义函数,ascending=True/False)
对kv格式的rdd1根据自定义函数的值进行排序操作, 返回排序后的新的kv格式rdd
`ascending: 排序方式, 默认升序True, 降序False`
"""
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
# 需求: 词频统计 rdd1->['spark,hive,spark,hadoop', 'hive,python,java', 'flink,spark,hadoop']
# 统计出每个单词出现的次数, 根据次数进行降序排序, 返回新rdd2->[('spark', 5), ('hive', 3), ('hadoop', 2), ...]
rdd1 = sc.parallelize(c=['spark,hive,spark,hadoop', 'hive,python,java', 'flink,spark,hadoop'])
rdd2 = rdd1.flatMap(lambda x: x.split(','))
print(f'rdd2的结果是:{rdd2.collect()}')
rdd3 = rdd2.map(lambda x: (x, 1))
print(f'rdd3的结果是:{rdd3.collect()}')
rdd4 = rdd3.reduceByKey(lambda x, y: x + y)
print(f'rdd4的结果是:{rdd4.collect()}')
rdd5= rdd4.sortBy(keyfunc=lambda x: x[1],ascending=False)
print(f'rdd5的结果是:{rdd5.collect()}')
print("-----------------------------------------")
# # 创建kv格式rdd对象
rdd1 = sc.parallelize(c=[('a', 1), ('b', 2), ('a', 2), ('c', 3), ('b', 4)])
### groupByKey()分组操作
res = rdd1.groupByKey().mapValues(lambda x: list(x))
print(f'res的结果是:{res.collect()}')
### reduceByKey()分组聚合操作
res1 = rdd1.reduceByKey(lambda x,y:x+y)
print(f'res1的结果是:{res1.collect()}')
### 自定义值排序
res2 = res1.sortBy(keyfunc=lambda x: x[1],ascending=False)
print(f'res2的结果是:{res2.collect()}')
res3 = res1.sortBy(keyfunc=lambda x: x[1],ascending=True)
print(f'res2的结果是:{res3.collect()}')
### key排序
res2 = res1.sortByKey(ascending=False)
print(f'res2的结果是:{res2.collect()}')
res3 = res1.sortByKey(ascending=True)
print(f'res2的结果是:{res3.collect()}')
3.2. 7 关联算子
关联算子就是跟我们mysql中的关联条件是一样的,只不过是表达方式不同,但是作用是完全相同的
- rdd.join(other=rdd2)
内连接: 交集, 合并相同key值的value值
- rdd.leftOuterJoin(other=rdd2)
左连接: 保留左rdd中所有的value数据, 右rdd的key关联上的value保留, 关联不上的用None填充
- rdd.rightOuterJoin(other=rdd2)
右连接: 保留右rdd中所有的value数据, 左rdd的key关联上的value保留, 关联不上的用None填充
- rdd.fullOuterJoin(other=rdd2)
全连接(满外连接): 保留左右rdd中所有的value数据, 关联不上的用None填充
"""
rdd3=rdd1.join(other=rdd2)
内连接: 交集, 合并相同key值的value值
rdd3=rdd1.leftOuterJoin(other=rdd2)
左连接: 保留左rdd中所有的value数据, 右rdd的key关联上的value保留, 关联不上的用None填充
rdd3=rdd1.rightOuterJoin(other=rdd2)
右连接: 保留右rdd中所有的value数据, 左rdd的key关联上的value保留, 关联不上的用None填充
rdd3=rdd1.fullOuterJoin(other=rdd2)
全连接(满外连接): 保留左右rdd中所有的value数据, 关联不上的用None填充
"""
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
# 创建kv格式rdd对象
rdd1 = sc.parallelize([("a", 1),
("b", 4)])
rdd2 = sc.parallelize([("a", 2),
("a", 3),
('c', 5)])
## 内连接
rdd3 = rdd1.join(other=rdd2)
print(rdd3.collect())
## 左连接
rdd4= rdd1.leftOuterJoin(other=rdd2)
print(rdd4.collect())
## 右连接
rdd5= rdd1.rightOuterJoin(other=rdd2)
print(rdd5.collect())
## 全连接
rdd6 = rdd1.fullOuterJoin(other=rdd2)
print(rdd6.collect())
3.3 action算子
-
collect()
取出所有值,大量数据慎用collect(), 容易造成内存溢出
- rdd.collect()
-
collectAsMap()
将rdd的k-v格式数据转换成字典格式 例如:[(‘男’,45), (‘女’,7)] -> {‘男’:45, ‘女’:7}
-
reduce()
非k-v类型数据的相关计算
- rdd.reduce(lambda 参数1,参数2:两个参数计算)
-
count()
统计rdd元素个数
- rdd.count()
-
take()
取出指定数量值
- rdd.take(数量)
-
saveAsTextFile()
将结果rdd保存到hdfs的目录中, 保存的目录是不存在的, 会自动创建
- rdd.saveAsTextFile(path)
- collect()
> 取出所有值,大量数据慎用collect(), 容易造成内存溢出
- rdd.collect()
- collectAsMap()
> 将rdd的k-v格式数据转换成字典格式 例如:[('男',45), ('女',7)] -> {'男':45, '女':7}
- reduce()
> 非k-v类型数据的相关计算
- rdd.reduce(lambda 参数1,参数2:两个参数计算)
- count()
> 统计rdd元素个数
- rdd.count()
- take()
> 取出指定数量值
- rdd.take(数量)
- saveAsTextFile()
> 将结果rdd保存到hdfs的目录中, 保存的目录是不存在的, 会自动创建
- rdd.saveAsTextFile(path)
"""
action算子: 触发计算任务, 得到最终结果(不再是rdd)
rdd.collect(): 收集rdd中所有的元素, 使用前考虑rdd的数据量问题(数量大容易造成内存溢出)
rdd.take(num=): 收集指定num数量的rdd元素
rdd.takeOrdered(num=, key=): 进行升序后收集指定num数量的rdd元素
rdd.reduce(f=自定义函数): 对非k-v格式rdd进行聚合操作
rdd.count()/sum()/mean()/max()/min(): 聚合操作
rdd.countByValue(): 统计非k-v格式rdd中值出现的次数, 返回k-v格式字典(k->rdd元素值, v->次数)
rdd.saveAsTextFile(path=):
将rdd结果保存到文件中, 可以是本地磁盘路径(file://)也可以是HDFS路径(hdfs://ip:8020)
注意点: 路径是一个不存在的目录路径
rdd.collectAsMap(): 将k-v格式的rdd保存到字典中 {k1:v1, k2:v2, ...}
rdd.countByKey(): 统计k-v格式rdd中key值出现的次数, 返回k-v格式字典(k->rdd中的key值, v->次数)
"""
# 导入SparkContext类
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext(appName='action_demo')
# 创建一个并行化的RDD
rdd1 = sc.parallelize([1, 4, 3, 3, 7, 7, 2, 6])
# 收集rdd中所有的元素
print(rdd1.collect())
# 收集指定num数量的rdd元素
print(rdd1.take(num=3))
# 进行升序后收集指定num数量的rdd元素
print(rdd1.takeOrdered(num=3, key=lambda x: -x))
# 对非k-v格式rdd进行聚合操作
print(rdd1.reduce(lambda x, y: x + y))
# 聚合操作
print(rdd1.count())
print(rdd1.max())
print(rdd1.mean())
print(rdd1.min())
print(rdd1.stdev())
# 统计非k-v格式rdd中值出现的次数, 返回k-v格式字典(k->rdd元素值, v->次数)
print(rdd1.countByValue())
# 将rdd结果保存到文件中
rdd1.saveAsTextFile(path='/data/date')
# 创建一个并行化的k-v格式RDD
rdd2 = sc.parallelize([('a', 10), ('b', 20), ('c', 30)])
# 将k-v格式的rdd保存到字典中
print(rdd2.collectAsMap())
# 统计k-v格式rdd中key值出现的次数, 返回k-v格式字典
print(rdd2.countByKey())
💕💕在这篇文章中,我们深入探讨了RDD的相关使用,希望能为读者带来启发和收获。
💖💖感谢大家的阅读,如果您有任何疑问或建议,欢迎在评论区留言交流。同时,也请大家关注我的后续文章,一起探索更多知识领域。
愿我们共同进步,不断提升自我。💞💞💞