【Spark计算引擎----第二篇(RDD):一篇文章带你清楚什么是RDD?RDD的概念,RDD的特性,怎么创建一个RDD,RDD的算子】

news2024/12/25 1:51:00

前言:
💞💞大家好,我是书生♡,本阶段和大家一起分享和探索大数据技术Spark—RDD,本篇文章主要讲述了:RDD的概念,RDD的特性,怎么创建一个RDD,RDD的算子等等。欢迎大家一起探索讨论!!!
💞💞代码是你的画笔,创新是你的画布,用它们绘出属于你的精彩世界,不断挑战,无限可能!

个人主页⭐: 书生♡
gitee主页🙋‍♂:闲客
专栏主页💞:大数据开发
博客领域💥:大数据开发,java编程,前端,算法,Python
写作风格💞:超前知识点,干货,思路讲解,通俗易懂
支持博主💖:关注⭐,点赞、收藏⭐、留言💬

目录

  • 1. RDD的介绍
    • 1.1 什么是RDD
    • 1.2 RDD的特性
  • 2. RDD的创建
    • 2.1 RDD的创建方式
    • 2.2 创建RDD
      • 2.2.1 创建RDD--可迭代数据
      • 2.2.2 创建RDD---文件/目录
      • 2.2.3 设置分区
      • 2.2.4 小文件数据读取转化为RDD
  • 3. RDD算子
    • 3.1 RDD算子的介绍和分类
    • 3.2 转换算子--transformation
      • 3.2.1 常见的转换算子
      • 3.2.2 map算子
      • 3.2.3 flatMap() 算子
      • 3.2.3 filter算子
      • 3.2.4 distinct算子
      • 3.2.5 groupBy&mapValues算子
      • 3.2.6 K-V格式RDD算子
      • 3.2. 7 关联算子
    • 3.3 action算子

1. RDD的介绍

1.1 什么是RDD

RDD是一种弹性分布式数据集合是spark中最基本的数据类型,它提供了容错能力和并行处理的能力。

  • RDD(Resilient Distributed Dataset)弹性分布式数据集合,是Spark中最基本的数据抽象结构,代表一个不可变(只读)、可分区、里面的元素可并行计算的集合。
    • 是Spark中的一种数据类型,管理spark的内存数据。类似于python中的列表类型 [1,2,3,4]
    • spark中还有dataframe,dataset数据类型
    • RDD同时还提供各种计算方法(map/reduce/filter/groupBy/flatMap)
  • 弹性
    • 可以对海量数据根据需求分成多份(分区),每一份数据会由对应的task线程执行计算
  • 分布式
    • 利用集群中多台机器资源进行计算
  • 数据集合
    • 规定数据形式 类似Python中的列表 []

RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。

在这里插入图片描述

1.2 RDD的特性

RDD五大特性:分区只读依赖缓存checkpoint

  • 分区
    • 可以将计算的海量数据分成多份,需要分成多少个分区可以通过方法指定
    • 每个分区都可以对应一个task线程执行计算
  • 只读
    • rdd中的数据不能直接修改,一旦修改当前的rdd一定会生成一个新的RDD
    • rdd本身存储的数据只能读取
  • 依赖(血统)
    • rdd之间是有依赖/一个因果关系的
    • 新的rdd是通过旧的rdd计算得到的
  • 缓存
    • 容错机制
    • 将RDD进行临时持久化操作
    • spark应用程序结束后, 缓存的RDD会被清空
    • 缓存的RDD优先存储在内存中(内存满了才会存储在磁盘中)
  • checkpoint
    • 容错机制
    • checkpoint可以将数据存储在分布式存储系统中,比如hdfs,实现永久存储
    • spark应用程序结束后, checkpoint的RDD不会被清空

2. RDD的创建

2.1 RDD的创建方式

在Apache Spark中,创建Resilient Distributed Dataset (RDD) 的方式主要有以下几种:

  • 从本地集合创建(可迭代的数据集合): 适用于小型数据集或测试场景。
  • 从文件系统创建: 适用于大型数据集和生产环境。
  • 从其他RDD转换创建: 适用于数据处理和转换。
  • 从RDD序列化创建: 适用于从持久化或序列化的RDD数据恢复。
  • 从网络流创建: 适用于实时数据处理。
  • 从外部数据源创建: 适用于从多种数据源读取数据。
  1. 从本地集合创建:

    • 方法: parallelize()
    • 描述: 将本地Python集合(如列表、元组等)转换为RDD。
    • 示例:
      data = [1, 2, 3, 4, 5]
      rdd = sc.parallelize(data)
      
  2. 从文件系统创建:

    • 方法: textFile(), wholeTextFiles(), binaryFiles(), binaryRecords(), newAPIHadoopFile(), hadoopRDD()
    • 描述: 从HDFS、本地文件系统或任何支持的文件系统中读取数据并创建RDD。
    • 示例:
      rdd = sc.textFile("hdfs://node1:9000/data/input.txt")
      
  3. 从其他RDD转换创建:

    • 方法: 使用RDD转换操作(如 map(), filter(), flatMap(), union(), join() 等)
    • 描述: 通过对现有的RDD执行转换操作来创建新的RDD。
    • 示例:
      rdd1 = sc.parallelize([1, 2, 3, 4, 5])
      rdd2 = rdd1.map(lambda x: x * 2)
      
  4. 从RDD序列化创建:

    • 方法: 使用序列化或反序列化的工具,如picklejoblib
    • 描述: 将序列化的RDD数据反序列化为新的RDD。
    • 示例:
      import joblib
      serialized_rdd = joblib.load("serialized_rdd.pkl")
      rdd = sc.parallelize(serialized_rdd)
      
  5. 从网络流创建:

    • 方法: 使用DStream API(在Spark Streaming中)
    • 描述: 从网络流中读取实时数据并创建RDD。
    • 示例:
      ssc = StreamingContext(sc, batchDuration=1)
      dstream = ssc.socketTextStream("localhost", 9999)
      rdd = dstream.dstream_to_rdd()
      
  6. 从外部数据源创建:

    • 方法: 使用DataFrameReaderSparkSession读取支持的外部数据源(如CSV、JSON、Parquet等)。
    • 描述: 虽然主要创建DataFrame或Dataset,但可以通过转换为RDD。
    • 示例:
      df = spark.read.format("csv").option("header", "true").load("file.csv")
      rdd = df.rdd
      

2.2 创建RDD

我们这里使用从本地创建(通过可迭代数据集合)创建以及通过文件和目录进行创建

将需要计算的数据转为rdd数据,就可以利用spark的内存计算方法进行分布式计算操作,这些计算方法就是由rdd提供的

  • rdd数据的转化方法是由SparkContext提供的,所以需要先生成SparkContext对象,

  • SparkContext称为Spark的入口类,通过类可以生成对象,调用对象方法将数据转为rdd数据

2.2.1 创建RDD–可迭代数据

可迭代数据:“可迭代数据”指的是可以使用迭代器(iterator)遍历的数据结构。
包括但是不仅限于:列表,字符串,元组,字典等等

# python中容器类型可以转换成rdd对象
# 导入模块
from pyspark import SparkContext

# todo: 1-创建sc对象
# sc对象是spark core入口类的对象
sc = SparkContext()

# todo: 2-创建RDD对象
# str类型
str1 = 'pyspark'
# 数值类型 -> 不能转换成RDD
int1 = 10
float1 = 3.14
# bool类型 -> 不能转换成RDD
bool1 = True
# list类型
list1 = [1, 2, 3, 4, 5, 6]
# tuple类型
tuple1 = (1, 2, 3, 4)
# dict类型
dict1 = {'name': '小明', 'age': 18, 'height': 180}
# set类型
set1 = {'a', 'b', 'c', 'd'}
# 借助sc对象的parallelize方法创建RDD对象
# 字符串中的每个字符就是rdd中的每个元素
# rdd1 = sc.parallelize(c=str1)
# 列表中的每个元素就是rdd中的每个元素
# rdd1 = sc.parallelize(c=list1)
# 元组中的每个元素就是rdd中的每个元素
# rdd1 = sc.parallelize(c=tuple1)
# 将字典的key值转换成rdd
# rdd1 = sc.parallelize(c=dict1)
# rdd1 = sc.parallelize(c=dict1.values())
# 集合中的每个元素就是rdd中的每个元素
rdd1 = sc.parallelize(c=set1)

# todo: 3-收集RDD结果
# 借助rdd对象的collect方法(算子)进行收集
result = rdd1.collect()
print(result)

2.2.2 创建RDD—文件/目录

我们要先创建一个目录里面有文件存储
在这里插入图片描述

从文件中的数据创建RDD我们使用textfile这个函数
默认是读取HDFS中的文件/目录 hdfs://node1:8020/path
读取本地磁盘文件/目录 file:///path

注意:文件中我们的每一行数据都是RDD元素

from pyspark import SparkContext


# 创建sc对象
sc = SparkContext()

# 读取文件数据转换成RDD
"""
8020: HDFS的服务端程序端口号
9870: 3版本HDFS的客户端程序端口号  2版本50070
默认是读取HDFS中的文件/目录 hdfs://node1:8020/path
读取本地磁盘文件/目录 file:///path
"""
# 读取hdfs文件数据, 一行数据对应rdd中一个元素
rdd1 = sc.textFile('/data/words.txt')
result = rdd1.collect()
# print(result)

# 读取hdfs目录文件数据, 读取目录下的所有文件数据, 一行数据对应rdd中一个元素
rdd2 = sc.textFile(name='hdfs://node1:8020/data')
print(type(rdd2))
result = rdd2.collect()
print(result)

# 读取本地磁盘文件数据, 一行数据对应rdd中一个元素
rdd3 = sc.textFile('file:///root/data/employees.json')
result = rdd3.collect()
print(f"rdd3的数据是:{result}")
# 读取本地磁盘目录文件数据
rdd4 = sc.textFile('file:///root/data')
result = rdd4.collect()
print(f"rdd4的数据是:{result}")

2.2.3 设置分区

RDD有一个特性就是弹性的也就是可以设置多个分区

glom 方法

  • glom 方法主要用于将一个分区中的所有元素组合成一个列表。这对于需要获取每个分区中所有元素的场景非常有用。

  • 作用
    合并分区中的元素: glom 将一个分区中的所有元素合并成一个列表,每个分区对应一个列表。
    便于调试和测试: 通常用于调试或测试目的,帮助理解数据是如何分布的。

我们设置分区的时候,使用glom()这个方法,默认的分区数是2个,不能小于2个

# 生成SparkContext类对象
sc = SparkContext()

# python转化时指定分区数
data = [1,2,3,4,5,6]
# numSlices指定分区数
rdd = sc.parallelize(data, numSlices=8)
# glom()按照分区查看数据
res = rdd.glom().collect()
print(res)
print("***********************")
rdd1 = sc.parallelize(data, numSlices=3)
# glom()按照分区查看数据
res1 = rdd1.glom().collect()
print(res1)
print("***********************")
rdd2 = sc.parallelize(data, numSlices=6)
# glom()按照分区查看数据
res2 = rdd2.glom().collect()
print(res2)

在这里插入图片描述

读取文件数据转换成rdd
minPartitions:设置rdd的最小分区数, 实际分区数有可能比设置的多一个分区

文件数据指定分区数
"""
1B=1字节, 1K=1024B=1024字节
单个文件分区规则:
文件字节数/分区数 = 分区字节数...余数
余数/分区字节数 = 结果值  结果值 大于10% 会多创建

目录下多个文件分区规则:
所有文件字节数/分区数 = 分区字节数...余数
每个文件字节数/分区字节数 = 分区数...余数
余数/分区字节数 = 结果值  结果值 大于10% 会多创建
"""

2.2.4 小文件数据读取转化为RDD

在一个目录下,有多个文件,如果文件的大小不够一个块的大小,一个文件就对应一个分区,文件超过一个块,那就一个block(128M)块对应一个分区。

textFile读取多个小文件数据时, 最少一个文件对应一个分区, 一个分区由一个task线程执行 HDFS中一个块数据是128M大小,
如果块数据不满128M, 造成资源浪费 小文件越多,task线程越多,线程需要资源进行执行, 造成资源浪费


使用wholeTextFiles方法可以解决

该方法会现将读取到的数据合并在一起,然后重新进行分区

我们在合并文件的时候,最小分区必须大于等于2
注意:如果设置的最小分区数不能吧文件

wholeTextFiles

  • 方法用于读取文件系统中的文件,并将每个文件的内容作为一个值返回,文件路径作为键。这种方法适用于读取较小的文件,尤其是当您希望将每个文件视为一个单独的RDD分区时。

  • 作用
    读取文件: 从文件系统(如HDFS、S3等)中读取文件,并将每个文件作为一个键值对返回。
    键值对: 键是文件的路径,值是文件的内容。
    适合小文件: 适用于读取大量较小的文件,每个文件作为一个独立的单元处理

# rdd的分区数指定
from pyspark import SparkContext

# 生成SparkContext类对象
sc = SparkContext()

# 文件数据指定分区数  ,读取目录下的多个小文件
rdd  = sc.textFile('/data')
# glom()按照分区查看数据
res = rdd.glom().collect()
# 每个小文件的数据会单独存放一个分区
# 一个分区会对应一个task执行计算
# 当目录下小文件数据较多时,会产生很多task。task较多时会抢占计算资源影响计算速度
# 10条数据文件100个  1万条数据文件10个
# 将小文件合并 一个分区数据 1000条数据在一个分区,对应一个task线程
print(res)
print(len(res))

# wholeTextFiles读取目录中的多个小文件数据
rdd2= sc.wholeTextFiles('/data')
res = rdd2.glom().collect()
print(res)
print(len(res))

3. RDD算子

3.1 RDD算子的介绍和分类

定义:将数据转化为RDD之后,就需要进行RDD的计算,RDD提供了计算方法RDD的方法又称为rdd算子

RDD算子的分类:

  • 转换算子-----transformation
  • 执行算子 ----action (也被称为任务算子)
  • transformation
    • 转化算子 对RDD数据进行转化计算得到新的RDD,定义了一个线程任务
  • action
    • 执行算子 触发计算任务,让计算任务进行执行,得到结果
    • 触发线程执行

3.2 转换算子–transformation

转化算子---- 对RDD数据进行转化计算得到新的RDD,定义了一个线程任务

  • 每一个专转换算子都有两种方式,一种是自定义函数,一种是lambda表示式函数

3.2.1 常见的转换算子

  • map(func):

作用: 接受一个函数func,将rdd中的每个元素经过函数处理, 将函数返回值保存到新的rdd中
注意点: map算子不会改变经过函数处理后的新rdd的数据结构
示例: rdd.map(lambda x: x *2)

  • flatMap(func):

作用: 接受一个函数func,将rdd中的每个元素经过函数处理, 将函数返回值拆分后保存到新的rdd中
注意点: flatMap算子会改变经过函数处理后的新rdd的数据结构, 降一维操作
tips: 自定义函数返回值一定是一个可迭代对象(python容器), 可迭代对象才能进行拆分
示例: rdd.flatMap(lambda x: [x, x + 1])

  • filter(func):

作用: 接受一个函数func,rdd中的每个元素经过函数的条件判断后, 返回True对应的元素保存到新rdd中(保留True对应的元素)
示例:rdd.filter(lambda x: x > 10)

  • union(otherDataset):

作用: 返回一个包含两个RDD中所有元素的新RDD。 示例: rdd.union(another_rdd)

  • intersection(otherDataset):

作用: 返回一个包含两个RDD中共有元素的新RDD。 示例: rdd.intersection(another_rdd)

  • distinct([numTasks]]):

作用: 返回一个去除了重复元素的新RDD。 示例: rdd.distinct()

  • groupBy(func):

作用: 根据提供的函数func对RDD中的元素进行分组。
对rdd中的元素经过自定义函数分组, 函数的返回值作为分组的key值, 相同key值对应的value值(rdd元素)放到一起
groupBy返回k-v格式的rdd
[(key, (value1, value2,…)),(key, (value1, value2, …)), …]
[[key, value],[key, value], …]
[((key1,key2), value), ((key1,key2),value)]
tips:自定义函数有几个返回值, 就有几组数据
示例: rdd.groupBy(lambda x: x % 2)

  • mapValues(func)

rdd2=rdd1.mapValues(f=自定义函数)
对k-v格式rdd中的value值经过自定义函数处理, 将函数的返回值保存到新的rdd中

  • groupByKey(func)

对kv格式的rdd根据key值进行分组, 将相同key值对应value值合并到一起, 返回一个新的kv格式rdd

  • sortBy(keyfunc=自定义函数,ascending=True/False)

对kv格式的rdd根据自定义函数的值进行排序操作, 返回排序后的新的kv格式rdd

  • reduceByKey(func, [numTasks]):

作用: 对键相同的元素应用func函数进行分组聚合。 示例: rdd.reduceByKey(lambda x, y: x + y)

  • join(otherDataset, [numTasks]):

作用: 对两个RDD进行内连接。 示例: rdd.join(another_rdd)

  • cogroup(otherDataset, [numTasks]):

作用: 对两个RDD进行分组操作。 示例: rdd.cogroup(another_rdd)

  • sortByKey([ascending=True], [numTasks]):

作用: 按键排序。 示例: rdd.sortByKey()

  • sample(withReplacement, fraction, [seed]):

作用: 从RDD中随机采样元素。 示例: rdd.sample(True, 0.1)

  • repartition(numPartitions):

作用: 重新分区RDD,通常用于改变RDD的分区数量。 示例: rdd.repartition(10)

  • coalesce(numPartitions, [shuffle]):

作用: 减少RDD的分区数量。 示例: rdd.coalesce(5)

3.2.2 map算子

rdd = rdd.map(f=自定义函数名)
rdd = rdd.map(f=lambda 参数名:参数计算逻辑)

将rdd中的每个元素经过函数处理, 将函数返回值保存到新的rdd中

注意点: map算子不会改变经过函数处理后的新rdd的数据结构,原本是什么结构,转换完还是什么结构

"""
rdd2 = rdd1.map(f=自定义函数名)
rdd2 = rdd1.map(f=lambda 参数名:参数计算逻辑)
将rdd1中的每个元素经过函数处理, 返回rdd2
注意点: map算子不会改变经过函数处理后的rdd的数据结构
"""

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

rdd1 = sc.parallelize(c=[1, 2, 3, 4])


# 将rdd1中的每个元素放到列表中 -> [[1],[2],[3],[4]]
def func1(x):
	print(f"x的值是:{x}")
	return [x]


# 传递自定义函数
rdd2 = rdd1.map(f=func1)
# collect()是action算子
print(rdd2.collect())
# x的值是:3
# x的值是:4
# x的值是:1
# x的值是:2
# [[1], [2], [3], [4]]
# 传递匿名函数
# x-> x是什么数据, 是什么类型, 是什么结构
# x-> 1,2,3,4 整数类型
rdd3 = rdd1.map(f=lambda x: [x])
print(rdd3.collect())

# 需求: rdd1->['spark', 'hive', 'hadoop', 'flink', 'spark', 'hive'], 将rdd1转换成新rdd-> [('spark', 1), ('hive', 1)]
rdd1 = sc.parallelize(c=['spark', 'hive', 'hadoop', 'flink', 'spark', 'hive'])

def func2(x):
	return (x, 1)
rdd2 = rdd1.map(f=func2)
print(rdd2.collect())

rdd4 = rdd1.map(f=lambda x: (x,1))
print(rdd4.collect())


3.2.3 flatMap() 算子

将rdd1中的每个元素经过函数处理, 将函数返回值拆分后保存到新的rdd2中
注意点: flatMap算子会改变经过函数处理后的新rdd的数据结构, 降一维操作

什么叫做降维操作:
简单点来说就是将嵌套的类型变为不嵌套的类型
例如 【【1,2,3】,【4,5,6】】这就是一个二维的
【1,2,3,4,5,6】这就是一个一维的
从【【1,2,3】,【4,5,6】】—》【1,2,3,4,5,6】这个过程就叫做降维操作,faltMap就是现将嵌套的数据,变为单一的类型数据

"""
rdd2 = rdd1.flatMap(f=自定义函数名)
rdd2 = rdd1.flatMap(f=lambda 参数名:参数计算逻辑)
将rdd1中的每个元素经过函数处理, 将函数返回值拆分后保存到新的rdd2中
注意点: flatMap算子会改变经过函数处理后的新rdd的数据结构, 降一维操作
应用场景: 处理嵌套的rdd -> [[1,2,3], [4,5,6]] [(1,2,3),(4,5,6)]
tips: 自定义函数返回值一定是一个可迭代对象(python容器), 可迭代对象才能进行拆分
"""
from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

rdd1 = sc.parallelize(c=[[1,2,3],[4,5,6]])
rdd2 = rdd1.flatMap(lambda x:x)
print(rdd2.collect())
print("****************")

rdd1 = sc.parallelize(c=[[1,2,3],[4,5,6]])
rdd2 = rdd1.flatMap(lambda x:x+[10])
print(rdd2.collect())
print("****************************")
# 需求: rdd1->['spark,hive,spark,hadoop', 'hive,python,java', 'flink,spark,hadoop'], 将rdd1转换成rdd2->['spark','hive','spark','hadoop',...]
rdd3 = sc.parallelize(c=['spark,hive,spark,hadoop', 'hive,python,java', 'flink,spark,hadoop'])

def fun4(x):
    return  x.split(',')

rdd5= rdd3.flatMap(fun4)
print(rdd5.collect())
rdd6 = rdd5.map(lambda x:(x,1))
print(rdd6.collect())

3.2.3 filter算子

rdd中的每个元素经过函数的条件判断后, 返回True对应的元素保存到新rdd中(保留True对应的元素) 应用场景: 数据清洗

"""
rdd2=rdd1.filter(lambda 参数: 参数条件判断表达式)
rdd1中的每个元素经过函数的条件判断后, 返回True对应的元素保存到新rdd2中(保留True对应的元素)
应用场景: 数据清洗
"""
from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

 # 需求: 获取rdd1中性别为男的人员的信息, 获取rdd1中年龄大于20的人员信息
rdd1 = sc.parallelize(c=[['小明', '男', 18],
						 ['小红', '女', 16],
						 ['翠花', '女', 35],
						 ['隔壁老王', '男', 40]])

rdd2=rdd1.filter(lambda x:x[1]=='男')
rdd3=rdd1.filter(lambda x:x[2]>20)
rdd4= rdd1.filter(lambda x:x[1]=='男' and x[2]>20)
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())

3.2.4 distinct算子

  • rdd.distinct()

对rdd中的元素进行去重操作, 返回一个新的rdd 应用场景: 数据清洗

"""
rdd2=rdd1.distinct()
对rdd1中的元素进行去重操作, 返回一个新的rdd2
应用场景: 数据清洗
"""

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

rdd1 = sc.parallelize(c=[1, 2, 3, 2, 5, 6, 3, 7, 1])

rdd2 = rdd1.distinct()
print(rdd2.collect())
print('----------------------')
rdd3 = sc.parallelize(c=[[1, 2, 3], [2, 5, 6], [1, 2, 3]])  # 发生报错
rdd4 = rdd3.distinct()
print(rdd4.collect())

注意:我们的去重只能对一维的数据进行去重,多维的数据去重会发生报错

3.2.5 groupBy&mapValues算子

我们的 groupBy&mapValues算子通常是在一起使用的:

  • rdd.groupBy(f=自定义函数名)

对rdd中的元素经过自定义函数分组, 函数的返回值作为分组的key值, 相同key值对应的value值(rdd元素)放到一起

groupBy返回k-v格式的rdd
[(key, (value1, value2,…)),(key, (value1, value2, …)), …]
[[key, value],[key, value], …]
[((key1,key2), value), ((key1,key2),value)]
tips:自定义函数有几个返回值, 就有几组数据

  • rdd.mapValues(f=自定义函数)

对k-v格式rdd中的value值经过自定义函数处理, 将函数的返回值保存到新的rdd中

我们的groupBy通常是用于对于非k-v结构的数据进行分组的

如果我们单独执行groupBy算子,我们的value值是内存的地址

sc = SparkContext()
rdd1 = sc.parallelize(['a', 'b', 'c', 'a'])

# hash(x) -> 对字母进行hash计算, 返回整数
# hash(x) % 2 -> 返回余数 0,1
rdd2 = rdd1.groupBy(f=lambda x: hash(x) % 2)
print(rdd2.collect())

在这里插入图片描述
如果我们想要value值是具体的值的话需要使用mapVlaues算子

"""
rdd2=rdd1.groupBy(f=自定义函数名)
对rdd1中的元素经过自定义函数分组, 函数的返回值作为分组的key值, 相同key值对应的value值(rdd元素)放到一起
groupBy返回k-v格式的rdd
[(key, (value1, value2,...)),(key, (value1, value2, ...)), ...]
[[key, value],[key, value], ...]
[((key1,key2), value), ((key1,key2),value)]
tips:自定义函数有几个返回值, 就有几组数据

rdd2=rdd1.mapValues(f=自定义函数)
对k-v格式rdd中的value值经过自定义函数处理, 将函数的返回值保存到新的rdd2中
"""
from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize(['a', 'b', 'c', 'a'])

# hash(x) -> 对字母进行hash计算, 返回整数
# hash(x) % 2 -> 返回余数 0,1
rdd2 = rdd1.groupBy(f=lambda x: hash(x) % 2)
print(rdd2.collect())


# 对k-v格式rdd中的value值进行转换
# list(x) -> 将x转换成列表类型, 获取内存地址中的数据
def func(x):
	print(x)


print(rdd2.mapValues(func).collect())
# x->k-v中的value值
rdd3 = rdd2.mapValues(f=lambda x: list(x))
print(rdd3.collect())
# 需求: 根据rdd1中的性别值进行分组
rdd1 = sc.parallelize(c=[['小明', '男', 18],
						 ['小红', '女', 16],
						 ['翠花', '女', 35],
						 ['隔壁老王', '男', 40]])
rdd2 = rdd1.groupBy(lambda x:x[1])
rdd3= rdd2.mapValues(lambda x: list(x))
print(rdd3.collect())
print("------------------------------------------")
rdd2 = rdd1.groupBy(lambda x: hash(x[1])%2)
rdd3= rdd2.mapValues(lambda x: list(x))
print(rdd3.collect())

在这里插入图片描述

3.2.6 K-V格式RDD算子

  • rdd.groupByKey()

对kv格式的rdd根据key值进行分组, 将相同key值对应value值合并到一起, 返回一个新的kv格式rdd

  • rdd.reduceByKey(func=自定义函数)

对kv格式的rd根据key值进行分组, 将相同key值对应的value值经过自定义函数进行聚合操作, 返回一个聚合后的新的kv格式rdd

  • rdd.sortByKey(ascending=True/False)

对kv格式的rdd根据key值进行排序操作, 返回排序后的新的kv格式rdd

  • rdd.sortBy(keyfunc=自定义函数,ascending=True/False)

对kv格式的rdd根据自定义函数的值进行排序操作, 返回排序后的新的kv格式rdd

"""
rdd2=rdd1.groupByKey()
对kv格式的rdd1根据key值进行分组, 将相同key值对应value值合并到一起, 返回一个新的kv格式rdd2

rdd2=rdd1.reduceByKey(func=自定义函数)
对kv格式的rdd1根据key值进行分组, 将相同key值对应的value值经过自定义函数进行聚合操作, 返回一个聚合后的新的kv格式rdd2

rdd2=rdd1.sortByKey(ascending=True/False)
对kv格式的rdd1根据key值进行排序操作, 返回排序后的新的kv格式rdd2

rdd2=rdd1.sortBy(keyfunc=自定义函数,ascending=True/False)
对kv格式的rdd1根据自定义函数的值进行排序操作, 返回排序后的新的kv格式rdd
`ascending: 排序方式, 默认升序True, 降序False`
"""

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

# 需求: 词频统计 rdd1->['spark,hive,spark,hadoop', 'hive,python,java', 'flink,spark,hadoop']
# 统计出每个单词出现的次数, 根据次数进行降序排序, 返回新rdd2->[('spark', 5), ('hive', 3), ('hadoop', 2), ...]
rdd1 = sc.parallelize(c=['spark,hive,spark,hadoop', 'hive,python,java', 'flink,spark,hadoop'])
rdd2 = rdd1.flatMap(lambda x: x.split(','))
print(f'rdd2的结果是:{rdd2.collect()}')
rdd3 = rdd2.map(lambda x: (x, 1))
print(f'rdd3的结果是:{rdd3.collect()}')
rdd4 = rdd3.reduceByKey(lambda x, y: x + y)
print(f'rdd4的结果是:{rdd4.collect()}')
rdd5= rdd4.sortBy(keyfunc=lambda x: x[1],ascending=False)
print(f'rdd5的结果是:{rdd5.collect()}')
print("-----------------------------------------")

# # 创建kv格式rdd对象
rdd1 = sc.parallelize(c=[('a', 1), ('b', 2), ('a', 2), ('c', 3), ('b', 4)])
### groupByKey()分组操作
res = rdd1.groupByKey().mapValues(lambda x: list(x))
print(f'res的结果是:{res.collect()}')
### reduceByKey()分组聚合操作
res1 = rdd1.reduceByKey(lambda x,y:x+y)
print(f'res1的结果是:{res1.collect()}')
### 自定义值排序
res2 = res1.sortBy(keyfunc=lambda x: x[1],ascending=False)
print(f'res2的结果是:{res2.collect()}')
res3 = res1.sortBy(keyfunc=lambda x: x[1],ascending=True)
print(f'res2的结果是:{res3.collect()}')
### key排序
res2 = res1.sortByKey(ascending=False)
print(f'res2的结果是:{res2.collect()}')
res3 = res1.sortByKey(ascending=True)
print(f'res2的结果是:{res3.collect()}')

3.2. 7 关联算子

关联算子就是跟我们mysql中的关联条件是一样的,只不过是表达方式不同,但是作用是完全相同的

  • rdd.join(other=rdd2)

内连接: 交集, 合并相同key值的value值

  • rdd.leftOuterJoin(other=rdd2)

左连接: 保留左rdd中所有的value数据, 右rdd的key关联上的value保留, 关联不上的用None填充

  • rdd.rightOuterJoin(other=rdd2)

右连接: 保留右rdd中所有的value数据, 左rdd的key关联上的value保留, 关联不上的用None填充

  • rdd.fullOuterJoin(other=rdd2)

全连接(满外连接): 保留左右rdd中所有的value数据, 关联不上的用None填充

"""
rdd3=rdd1.join(other=rdd2)
内连接: 交集, 合并相同key值的value值

rdd3=rdd1.leftOuterJoin(other=rdd2)
左连接: 保留左rdd中所有的value数据, 右rdd的key关联上的value保留, 关联不上的用None填充

rdd3=rdd1.rightOuterJoin(other=rdd2)
右连接: 保留右rdd中所有的value数据, 左rdd的key关联上的value保留, 关联不上的用None填充

rdd3=rdd1.fullOuterJoin(other=rdd2)
全连接(满外连接): 保留左右rdd中所有的value数据, 关联不上的用None填充
"""

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

# 创建kv格式rdd对象
rdd1 = sc.parallelize([("a", 1),
					   ("b", 4)])
rdd2 = sc.parallelize([("a", 2),
					   ("a", 3),
					   ('c', 5)])


## 内连接
rdd3 = rdd1.join(other=rdd2)
print(rdd3.collect())

## 左连接
rdd4= rdd1.leftOuterJoin(other=rdd2)
print(rdd4.collect())

## 右连接
rdd5= rdd1.rightOuterJoin(other=rdd2)
print(rdd5.collect())


## 全连接
rdd6 = rdd1.fullOuterJoin(other=rdd2)
print(rdd6.collect())

3.3 action算子

  • collect()

    取出所有值,大量数据慎用collect(), 容易造成内存溢出

    • rdd.collect()
  • collectAsMap()

    将rdd的k-v格式数据转换成字典格式 例如:[(‘男’,45), (‘女’,7)] -> {‘男’:45, ‘女’:7}

  • reduce()

    非k-v类型数据的相关计算

    • rdd.reduce(lambda 参数1,参数2:两个参数计算)
  • count()

    统计rdd元素个数

    • rdd.count()
  • take()

    取出指定数量值

    • rdd.take(数量)
  • saveAsTextFile()

    将结果rdd保存到hdfs的目录中, 保存的目录是不存在的, 会自动创建

    • rdd.saveAsTextFile(path)
- collect()  

  > 取出所有值,大量数据慎用collect(), 容易造成内存溢出

  - rdd.collect()

- collectAsMap()

  > 将rdd的k-v格式数据转换成字典格式 例如:[('男',45), ('女',7)] -> {'男':45, '女':7}

- reduce() 

  > 非k-v类型数据的相关计算

  - rdd.reduce(lambda 参数1,参数2:两个参数计算)

- count() 

  > 统计rdd元素个数

  - rdd.count()

- take()  

  > 取出指定数量值

  - rdd.take(数量)

- saveAsTextFile()

  > 将结果rdd保存到hdfs的目录中, 保存的目录是不存在的, 会自动创建

  - rdd.saveAsTextFile(path)
"""
action算子: 触发计算任务, 得到最终结果(不再是rdd)
rdd.collect(): 收集rdd中所有的元素, 使用前考虑rdd的数据量问题(数量大容易造成内存溢出)
rdd.take(num=): 收集指定num数量的rdd元素
rdd.takeOrdered(num=, key=): 进行升序后收集指定num数量的rdd元素
rdd.reduce(f=自定义函数): 对非k-v格式rdd进行聚合操作
rdd.count()/sum()/mean()/max()/min(): 聚合操作
rdd.countByValue(): 统计非k-v格式rdd中值出现的次数, 返回k-v格式字典(k->rdd元素值, v->次数)
rdd.saveAsTextFile(path=):
将rdd结果保存到文件中, 可以是本地磁盘路径(file://)也可以是HDFS路径(hdfs://ip:8020)
注意点: 路径是一个不存在的目录路径
rdd.collectAsMap(): 将k-v格式的rdd保存到字典中 {k1:v1, k2:v2, ...}
rdd.countByKey(): 统计k-v格式rdd中key值出现的次数, 返回k-v格式字典(k->rdd中的key值, v->次数)
"""

# 导入SparkContext类
from pyspark import SparkContext

# 创建sc对象
sc = SparkContext(appName='action_demo')

# 创建一个并行化的RDD
rdd1 = sc.parallelize([1, 4, 3, 3, 7, 7, 2, 6])

# 收集rdd中所有的元素
print(rdd1.collect())

# 收集指定num数量的rdd元素
print(rdd1.take(num=3))

# 进行升序后收集指定num数量的rdd元素
print(rdd1.takeOrdered(num=3, key=lambda x: -x))

# 对非k-v格式rdd进行聚合操作
print(rdd1.reduce(lambda x, y: x + y))

# 聚合操作
print(rdd1.count())
print(rdd1.max())
print(rdd1.mean())
print(rdd1.min())
print(rdd1.stdev())

# 统计非k-v格式rdd中值出现的次数, 返回k-v格式字典(k->rdd元素值, v->次数)
print(rdd1.countByValue())

# 将rdd结果保存到文件中
rdd1.saveAsTextFile(path='/data/date')

# 创建一个并行化的k-v格式RDD
rdd2 = sc.parallelize([('a', 10), ('b', 20), ('c', 30)])

# 将k-v格式的rdd保存到字典中
print(rdd2.collectAsMap())

# 统计k-v格式rdd中key值出现的次数, 返回k-v格式字典
print(rdd2.countByKey())

在这里插入图片描述

💕💕在这篇文章中,我们深入探讨了RDD的相关使用,希望能为读者带来启发和收获。
💖💖感谢大家的阅读,如果您有任何疑问或建议,欢迎在评论区留言交流。同时,也请大家关注我的后续文章,一起探索更多知识领域。
愿我们共同进步,不断提升自我。💞💞💞

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1972284.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第三十一天 chrome调试工具

打开调试工具 页面空白处右击 检查 或者F12 使用调试工具 ctrl滚轮改变代码大小 左边是html 右边是css css可以直接改动数值左右箭头或者直接输入 查看颜色 ctrl0 复原浏览器大小 点击元素右侧出现样式引入 没有的话 说明类名或者样式引用错误 这里的.new-left是存在的 如果类…

OpenStack;异构算力网络架构;算力服务与交易技术;服务编排与调度技术

目录 OpenStack 一、OpenStack概述 二、OpenStack的主要组件及功能 三、OpenStack的架构 四、OpenStack的应用场景 异构算力网络架构 算力服务与交易技术 服务编排与调度技术 OpenStack 是一个开源的云计算管理平台项目,由NASA(美国国家航空航天局)和Rackspace合作…

数学建模评价类模型—层次分析法(无数据情况下)

文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据总结 前言 本文将讲解解决评价类问题的第一种模型层次分析法(AHP法),首先我们会具体讲解评价类问题解答的具体流程再对AHP方法进行讲解 一、评价类问题概述 评价…

DB管理客户端navicat、DBever、DbVisualizer数据库连接信息迁移

DB管理客户端Navicat、DBever、DbVisualizer数据库连接信息迁移 第三方数据库连接工具为了确保数据库信息安全通常对保存的数据库连接密码进行加密,填入后想再拿到原文就不可能了,有时交接给别人或者换电脑时可以通过连接数据导出的方式来解决。 Navic…

echarts 极坐标柱状图 如何定义柱子颜色

目录 echarts 极坐标柱状图 如何定义柱子颜色问题描述方式一 在 series 数组中定义颜色方式二 通过 colorBy 和 color 属性配合使用 echarts 极坐标柱状图 如何定义柱子颜色 本文将分享在使用 echarts 的 极坐标柱状图 时,如何自定义柱子的颜色。问题本身并不难解决…

URL中的中文编码与解码

URL在传输时只能使用ACSII码表示,且ASCII码只有128位,无法存储汉字等字符,因此对于这些非ASCII码字符需要进行编码处理,以保证URL的完整性 Python中urllib.parse模块提供了两个方法quote和unquote可用于URL中的中文编码与解码 以…

全网最强Docker教程 | 万字长文爆肝Docker教程

Docker 官方文档地址:https://www.docker.com/get-started 中文参考手册:https://docker_practice.gitee.io/zh-cn/ 1.什么是 Docker 1.1 官方定义 最新官网首页 # 1.官方介绍 - We have a complete container solution for you - no matter who you are and where you ar…

攻防世界-MISC-心仪的公司-wireshark流量分析

心仪的公司 下载后发现是wireshark文件,打开: 源Ip为192.168.1.111 筛选指令: tcp contains"shell" && ip.src192.168.1.111 筛选http流同样能得到flag:

Python兼职接单全攻略:掌握技能,拓宽收入渠道

引言 随着Python在数据处理、Web开发、自动化办公、爬虫技术等多个领域的广泛应用,越来越多的人开始利用Python技能进行兼职接单,以此拓宽收入渠道。本文将详细介绍Python兼职接单的注意事项、所需技能水平、常见单子类型、接单途径及平台,帮…

一文读懂企业数字化涉及的四种架构:业务架构、应用架构、技术架构、数据架构

在当今数字化转型的时代,企业面临着前所未有的挑战与机遇。为了应对这些变化,构建一套高效、灵活且可扩展的企业级架构变得尤为重要。本文将详细介绍 业务架构、应用架构、技术架构 和 数据架构,并结合实际案例进行阐述,帮助读者更…

【MySQL】最左前缀匹配原则

目录 准备库表 结果集在索引列中的查询 1. explain select a,b,c from t where a1; 2. explain select a,b,c from t where a1 AND b2; 3. explain select a,b,c from t where a1 order by b; 4. explain select a,b,c from t where a1 order by d; 5. explain select a…

css 数字平铺布局

效果图 <!DOCTYPE html> <html> <head><meta charset"utf-8"><title>活动中心</title><meta name"viewport" content"maximum-scale1.0,minimum-scale1.0,user-scalable0,widthdevice-width,initial-scale1.0…

团队Bug管理:7个顶级工具帮手

本文将分享2024年值得关注的7款Bug管理工具&#xff1a;PingCode、Worktile、Tower、禅道、Coding、Bugzilla、Jira。 在面对日益复杂的软件开发项目时&#xff0c;管理Bug成为了一个让许多团队头疼的问题。选择一个合适的Bug管理工具不仅可以提高团队的效率&#xff0c;还能显…

Apache IoTDB 论文入选数据库国际顶会 ICDE 2024

近日&#xff0c;Apache IoTDB 的研究成果论文&#xff1a;On Tuning Raft for IoT Workload in Apache IoTDB&#xff08;《在 Apache IoTDB 中针对物联网工作负载调整 Raft 共识协议》&#xff09;被数据库领域国际顶级学术会议 The 40th IEEE International Conference on D…

Netty 必知必会(六)—— 粘包拆包问题

tcp粘包、半包怎么解决的&#xff08;LineBased和LengthBased,我是用的是LineBased&#xff09;为什么要使用LineBased&#xff0c;怎么分割的&#xff08;/r/n&#xff0c;当时没有考虑太多&#xff0c;觉得这个比较简单&#xff09;Netty解决粘包的几种方式Netty 拆包粘包的实…

SD-WAN组网加速ZOOM视频会议

随着远程办公和在线教育的普及&#xff0c;视频会议已成为人们日常沟通的重要工具。然而&#xff0c;网络不稳定、延迟高和带宽不足等问题常常影响ZOOM视频会议的体验。为了有效解决这些问题&#xff0c;SD-WAN组网开始被应用于ZOOM视频会议加速。 那么&#xff0c;SD-WAN具体是…

西门子DNC 程序传输

西门子的 840DSL 828D Siemens ONE DNC程序传输大概可以有几种方式实现&#xff1a; 1.FTP方式 缺点&#xff1a;每台设备都需要开通授权 优点&#xff1a;设置简单 2.共享文件夹 缺点&#xff1a;如果上位机联网容易中病毒 优点&#xff1a;免费 3.直接传送程序文件到NCU 缺…

图书管理系统初实现

目录 实现过程&#xff1a; 运行结果&#xff1a; 从三个模块来实现图书管理系统&#xff1a;书本、用户、实现的功能 实现过程&#xff1a; 首先在Book包下定义一个book类&#xff0c;包含书名、作者、价格、类型、是否借出成员变量。 这些成员变量都是私有的&#xff0c;…

算法第十六天:leetcode349.两个数组的交集

一、两个数组的交集的题目描述与链接 349.两个数组的交集如下表所示&#xff0c;您可以直接复制下面网址进入力扣学习&#xff0c;在观看下面的内容之前您一定要先做一遍哦&#xff0c;以便让你印象更加深刻&#xff01; https://leetcode.cn/problems/intersection-of-two-a…

Unity强化工程 之 音效

本文仅作笔记学习和分享&#xff0c;不用做任何商业用途 本文包括但不限于unity官方手册&#xff0c;unity唐老狮等教程知识&#xff0c;如有不足还请斧正 首先&#xff0c;音频这块组件较少&#xff0c;但是内容很重要&#xff0c;因为对于任何一款非特殊面向人群的游戏来说&a…