欢迎大家订阅【Python从入门到精通】专栏,一起探索Python的无限可能!
文章目录
- 前言
- 一、map算子
- 二、flatMap算子
- 三、reduceByKey算子
- 四、filter算子
- 五、distinct算子
- 六、sortBy算子
- 七、综合案例
前言
在大数据处理的时代,Apache Spark以其高效的数据处理能力和灵活的编程模型,成为了数据科学家和工程师的热门选择。PySpark作为Spark的Python接口,使得数据处理和分析更加直观和便捷。本文详细讲解了PySpark中的常用RDD算子,包括map、flatMap、reduceByKey、filter、distinct和sortBy。
本篇文章参考:黑马程序员
在 PySpark 中,所有的数据计算都是基于 RDD(弹性分布式数据集)对象进行的。RDD 提供了丰富的成员方法(算子)来执行各种数据处理操作。
一、map算子
定义:
map
算子会对RDD中的每个元素应用一个用户定义的函数,并返回一个新的 RDD。
语法:
new_rdd = rdd.map(func)
参数func
为一个函数,该函数接受单个输入参数,并返回一个输出值,其函数表示法为f:(T) → U
- f:表示这是一个函数(方法)
- T:表示传入参数的类型,可以是任意类型
- U:表示返回值的类型,可以是任意类型
- (T)-U:表示该方法接受一个参数(类型为 T),返回值的类型为 U
【注意】
import os
from pyspark import SparkConf, SparkContext
# os.environ['PYSPARK_PYTHON'] =“自己电脑Python.exe的安装路径”,用于指定Python解释器
os.environ['PYSPARK_PYTHON'] = "D:Study\Paython\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
def func(data):
return data * 10
print(rdd2.collect())
输出结果:
[10,20,30,40,50]
【分析】
rdd.map(func) 创建一个新的RDD对象rdd2,其中每个元素都会通过map
算子应用函数 func
。因此,原始 RDD 中的每个元素(1, 2, 3, 4, 5)都会依次被传入 func
函数并处理:
func(1) 产生 10
func(2) 产生 20
func(3) 产生 30
func(4) 产生 40
func(5) 产生 50
结果是新的RD 对象rdd2 ,包含的元素为 [10, 20, 30, 40, 50]。
【拓展】
链式调用:在编程中将多个方法或函数的调用串联在一起的方式。
在 PySpark 中,链式调用非常常见,通常用于对 RDD 进行一系列变换或操作。通过链式调用,开发者可以在一条语句中连续执行多个操作,不需要将每个操作的结果存储在一个中间变量中,从而提高代码的简洁性和可读性。
例如:
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10,然后都加上5
# 链式调用
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd2.collect())
输出结果:
[15, 25, 35, 45, 55]
【分析】
第一个map
算子接收一个 lambda 函数,这个函数将传入的每个元素乘以 10;第二个map
算子在第一个map
的结果上再次调用新的 lambda 函数,每个元素再加上 5。处理后的结果为:10 + 5, 20 + 5, 30 + 5, 40 + 5, 50 + 5,即 15, 25, 35, 45, 55。
二、flatMap算子
定义:
flatMap
算子将输入RDD中的每个元素映射到一个序列,然后将所有序列扁平化为一个单独的RDD。简单来说,就是对rdd执行map操作,然后进行解除嵌套操作。
语法:
new_rdd = rdd.flatMap(func)
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hi python","Hello world","Happy day"])
# 需求将RDD数据里面的单词一个个提取出来
rdd2=rdd.map(lambda x:x.split(" "))
print(rdd2.collect())
sc.stop()
输出结果:
[[“hi python”],[“Hello world”],[“Happy day”]]
【分析】
map
算子执行过程如下:
对于第一个元素 “hi python”,通过 split(" “)得到的结果是 [“hi”, “python”];
对于第二个元素 “Hello world”,通过 split(” “)得到的结果是 [“Hello”, “world”];
对于第三个元素 “Happy day”,通过 split(” ")得到的结果是 [“Happy”, “day”];
显而易见,输出的结果不满足我们的需求,我们运用flatMap
算子,将rdd2=rdd.map(lambda x:x.split(" "))
改为如下代码后
rdd2=rdd.flatmap(lambda x:x.split(" "))
输出结果:
[‘hi’, ‘python’, ‘Hello’, ‘world’, ‘Happy’, ‘day’]
flatMap
算子会将结果扁平化为单一列表,适合于需要展开嵌套结构的场景。
三、reduceByKey算子
定义:
reduceByKey
算子用于将具有相同键的值进行合并,并通过指定的聚合函数生成一个新的键值对 RDD。
语法:
new_rdd = rdd.reduceByKey(func)
参数func
是一个用于合并两个相同键的值的函数,其接收两个相同类型的参数并返回一个相同类型的值,其函数表示法为 f:(V,V)→>V
- f: 函数的名称或标识符
- (V, V):表示函数接收两个相同类型的参数
- → V:表示函数的返回值类型
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# reduceByKey算子
rdd=sc.parallelize([('男',99),('男',88),('女',99),('女',66)])
# 求男生和女生两个组的成绩之和
rdd2=rdd.reduceByKey(lambda a,b:a+b)
print(rdd2.collect())
sc.stop()
输出结果:
[(‘男’,187), (‘女’,165)]
【分析】
reduceByKey
算子根据每个不同的键调用匿名函数 lambda a, b: a + b,将其接受两个参数相加。
对于键 ‘男’:
首先处理到的值是 99,然后是 88;
使用 lambda a, b: a + b,即 99 + 88 = 187。
对于键 ‘女’:
首先处理到的值是 99,然后是 66;
使用 lambda a, b: a + b,即 99 + 66 = 165。
四、filter算子
定义:
filter
算子根据给定的布尔函数过滤RDD中的元素,返回一个只包含满足条件的元素的新RDD。
语法:
new_rdd = rdd.filter(func)
参数func
是一个函数,用于接收 RDD 中的每个元素,并返回一个布尔值(True 或 False)。
- 如果返回 True,则该元素会被保留在新 RDD 中
- 如果返回 False,则该元素会被过滤掉
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# filter算子
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 过滤RDD数据中的奇数,仅保留偶数
rdd2=rdd.filter(lambda num:num%2==0)
print(rdd2.collect())
sc.stop()
输出结果:
[2, 4]
五、distinct算子
定义:
distinct
算子对RDD数据进行去重,返回一个新的RDD。
语法:
new_rdd = rdd.distinct()
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# distinct算子
rdd = sc.parallelize([1, 2, 2, 5, 5, 6])
# 对RDD数据进行去重
rdd2=rdd.distinct()
print(rdd2.collect())
sc.stop()
输出结果:
[1, 2, 5, 6]
六、sortBy算子
定义:
sortBy
算子根据指定的键对元素进行排序。
语法:
new_rdd = rdd.sortBy(func, ascending=True, numPartitions=None)
- 参数:func:用于指定排序依据的函数
- 参数ascending:指定排序的顺序,True 表示升序排序(默认值);False 表示降序排序
- 参数numPartitions:可选参数,指定分区数
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 创建了一个包含四个元组的 RDD
rdd=sc.parallelize([('小明',99),('小红',88),('小城',99),('小李',66)])
# 使用 sortBy 方法将 RDD 按照分数(元组中的第二个元素)进行降序排序
rdd2=rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(rdd2.collect())
sc.stop()
输出结果:
[(‘小明’, 99), (‘小城’, 99), (‘小红’, 88), (‘小李’, 66)]
【注意】
如果多个元素具有相同的键(如这里的 99),sortBy
算子会保持这些元素在原始 RDD 中的相对顺序(稳定排序)。
七、综合案例
【案例一】
运用前面所有学内容学习到的内容,完成test.txt文件的读取并统计文件内各个单词的出现次数。(假设该文本文件的完整路径为:D:/test.txt)
# 1.构建执行环境入口对象
from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON']="D:/Tools/python3.10/python.exe"
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# 2.读取数据
rdd=sc.textFile("D:/test.txt")
# 3.取出全部单词
word_rdd=rdd.flatMap(lambda x:x.split(" "))
# print(word_rdd.collect())
# 输出结果:['hello', 'python', 'happy', 'day', 'hello', 'world', 'hello', 'world', 'hello', 'python', 'happy', 'day', 'happy', 'day', 'hello', 'world', 'hello', 'python', 'happy', 'happy', 'happy', 'day']
# 4.将所有单词都转换为二元元组,格式为 (word, 1),这里的 word 是单词,1 用于计数
word_with_one_rdd=word_rdd.map(lambda word:(word,1))
# print(word_with_one_rdd.collect())
# 输出结果:[('hello', 1), ('python', 1), ('happy', 1), ('day', 1), ('hello', 1), ('world', 1), ...]
# 5.分组并求和
# 使用 reduceByKey 方法按单词(key)聚合并求和
result_rdd=word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.打印输出结果
print(result_rdd)
输出结果:
[(‘hello’, 6), (‘python’, 3), (‘happy’, 6), (‘day’, 4), (‘world’, 3)]
【案例二】
复制以下JSON类型的数据到test.txt文件中(假设该文本文件的完整路径为:D:/test.txt),使用Spark读取文件并计算以下内容:
①各个城市销售额排名,从大到小排列;
②全部城市有哪些商品类别在售卖;
③北京市有哪些商品类别在售卖。
"""
{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}|{"id":2,"timestamp":"2019-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}|{"id":3,"timestamp":"2019-05-08T01:03.00Z","category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":"2019-05-08T05:01.00Z","category":"电脑","areaName":"上海","money":"1513"}|{"id":5,"timestamp":"2019-05-08T01:03.00Z","category":"家电","areaName":"北京","money":"1550"}|{"id":6,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"杭州","money":"1550"}
{"id":7,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"北京","money":"5611"}|{"id":8,"timestamp":"2019-05-08T03:01.00Z","category":"家电","areaName":"北京","money":"4410"}|{"id":9,"timestamp":"2019-05-08T01:03.00Z","category":"家具","areaName":"郑州","money":"1120"}
{"id":10,"timestamp":"2019-05-08T01:01.00Z","category":"家具","areaName":"北京","money":"6661"}|{"id":11,"timestamp":"2019-05-08T05:03.00Z","category":"家具","areaName":"杭州","money":"1230"}|{"id":12,"timestamp":"2019-05-08T01:01.00Z","category":"书籍","areaName":"北京","money":"5550"}
{"id":13,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"5550"}|{"id":14,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"1261"}|{"id":15,"timestamp":"2019-05-08T03:03.00Z","category":"电脑","areaName":"杭州","money":"6660"}
{"id":16,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"天津","money":"6660"}|{"id":17,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"9000"}|{"id":18,"timestamp":"2019-05-08T05:01.00Z","category":"书籍","areaName":"北京","money":"1230"}
{"id":19,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"杭州","money":"5551"}|{"id":20,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"2450"}
{"id":21,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"5520"}|{"id":22,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"6650"}
{"id":23,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"1240"}|{"id":24,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"天津","money":"5600"}
{"id":25,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"7801"}|{"id":26,"timestamp":"2019-05-08T01:01.00Z","category":"服饰","areaName":"北京","money":"9000"}
{"id":27,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"5600"}|{"id":28,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"8000"}|{"id":29,"timestamp":"2019-05-08T02:03.00Z","category":"服饰","areaName":"杭州","money":"7000"}
"""
# 构建执行环境入口对象
from pyspark import SparkContext,SparkConf
import os
import json
os.environ['PYSPARK_PYTHON']="D:/Tools/python3.10/python.exe"
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
# 1.1 读取数据得到 RDD
file_rdd=sc.textFile("D:/test.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd=file_rdd.flatMap(lambda x:x.split("|"))
# print(json_str_rdd.collect())
# 输出结果:['{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}', '{"id":2,"timestamp":"2019-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}', ……]
# 1.3 将一个个JSON字符串转换为字典
dict_rdd=json_str_rdd.map(lambda x:json.loads(x))
print(dict_rdd.collect())
# 输出结果:[{"id": 1, "timestamp": "2019-05-08T01:03.00Z", "category": "平板电脑", "areaName": "北京", "money": "1450"}, {"id": 2, "timestamp": "2019-05-08T01:01.00Z", "category": "手机", "areaName": "北京", "money": "1450"}, ……]
# 1.4 取出城市和销售额数据
# 将一个完整的字典转换为一个格式为(城市,销售额)的二元元组
city_with_money_rdd=dict_rdd.map(lambda x: (x["areaName"],int(x["money"])))
# print(city_with_money_rdd.collect())
# 输出结果:[('北京', 1450), ('北京', 1450), ……]
# 1.5 按城市分组按销售额聚合
city_result_rdd=city_with_money_rdd.reduceByKey(lambda a,b:a+b)
# 1.6 按销售额聚合结果排序
reslut_rdd=city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("题①的结果:",reslut_rdd.collect())
# 2.1 取出全部的商品类别并去重
category_rdd=dict_rdd.map(lambda x:x['category']).distinct()
print("题②的结果:",category_rdd.collect())
# 3.1 过滤北京市的数据
beijing_data_rdd=dict_rdd.filter(lambda x:x['areaName'=='北京'])
# 3.2 取出全部商品类别并去重
beijing_data_rdd.map(lambda x:x['category']).distinct()
print("题③的结果:",beijing_data_rdd.collect())
输出结果:
题①的结果:[(‘北京’,91556),(‘杭州’,28831),(‘天津’,12260),(‘上海’,1513),(郑州,1120)]
题②的结果:[‘平板电脑’,‘家电’,‘书籍’,‘手机’,‘电脑’,‘家具’,‘食品’,‘服饰’]
题③的结果:[‘平板电脑’,‘家电’,‘书籍’,‘手机’,‘电脑’,‘家具’,‘食品’,'服饰’]