目录
一.回顾
二.数据计算
map算子
演示
flatMap算子
演示
reduceByKey算子
演示
练习案例1
需求
解决步骤
完整代码
filter算子
演示
distinct算子
演示
sortBy算子
演示
练习案例2
解决步骤
完整代码
三.总结
一.回顾
1.RDD对象是什么?为什么要使用它?
RDD对象称之为分布式弹性数据集,是PySpark中数据计算的载体,它可以:
- 提供数据存储
- 提供数据计算的各类方法
- 数据计算的方法,返回值依旧是RDD (RDD迭代计算)
后续对数据进行各类计算,都是基于RDD对象进行
2.如何输入数据到Spark(即得到RDD对象)
- 通过SparkContext的parallelize成员方法,将Python数据容器转换为RDD对象
- 通过SparkContext的textFile成员方法,读取文本文件得到RDD对象
二.数据计算
PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?自然是依赖,RDD对象内置丰富的:成员方法(算子)
介绍几种常见的成员方法(算子)如下:
- map方法
- flatmap方法
- reduceByKey方法
- filter方法
- distinct方法
- sortBy方法
map算子
功能: map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
语法:
演示
这样运行会报错,这是因为 Spark 中支持环境变量,设置一个环境变量明确告诉他,python在哪就可以了。
如上图,告诉spark运行时,在哪找到python解释器就行,
打开设置你就可以看到你的python解释器的路径,然后导入os包,设置环境就行
代码如下
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
rdd=sc.parallelize([1,2,3,4,5])
def func(data):
return data*10
rdd2=rdd.map(func)
print(rdd2.collect())
这里的函数定义,我们可以用匿名函数(lambda)更简洁
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
rdd=sc.parallelize([1,2,3,4,5])
# def func(data):
# return data*10
rdd2=rdd.map(lambda x:x*10)
print(rdd2.collect())
各算子之间还可以进行链式调用
flatMap算子
功能:对rdd执行map操作,然后进行解除嵌套操作.
解除嵌套:
演示代码
演示
我们先用map试试看是什么结果
我们可以看到没有解除嵌套
再用flatMap试试
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD
rdd=sc.parallelize(["asdf 3rwe dff","sdf 3er gwet","q3w dg xgwe"])
#需求:把RDD数据里面的一个个单词提取出来
rdd2=rdd.flatMap(lambda x:x.split(" "))
print(rdd2.collect())
结果是
reduceByKey算子
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。
用法
演示代码
注意: reduceByKey中接收的函数,只负责聚合,不理会分组
分组是自动by key来分组的.
reduceBeKey中的聚合逻辑是:
演示
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD
rdd=sc.parallelize([("男",99),("男",88),("女",78),("女",100)])
#需求:分别求出男女的成绩之和
rdd2=rdd.reduceByKey(lambda x,y:x+y)
print(rdd2.collect())
结果是
练习案例1
需求
读取文件,统计文件中各单词出现的次数
演示代码
解决步骤
1.用textFile读取文本文件
2.用flatMap把读取到的单词都一一提取出来
3.用map将所有单词都转换为二元元组,单词为key,value设置为1
4.用reduceByKey进行分组并求和
这样就完成了需求
完整代码
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD读取数据
rdd=sc.textFile("D:/hello.txt")
#取出所有单词
rdd2=rdd.flatMap(lambda x:x.split(" "))
#将所有单词都转换为二元元组,单词为key,value设置为1
word_with_one_rdd=rdd2.map(lambda x:(x,1))
#分组并求和
result_rdd=word_with_one_rdd.reduceByKey(lambda x,y:x+y)
print(result_rdd.collect())
filter算子
功能:过滤想要的数据进行保留
语法:
演示代码
演示
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("text_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#创建RDD对象
rdd=sc.parallelize([1,2,3,4,5,6,7,8])
rdd2=rdd.filter(lambda x:x%2==0)
print(rdd2.collect())
结果是
distinct算子
功能:对RDD数据进行去重,返回新RDD
语法:rdd.distinct()无需传参
演示代码
演示
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("text_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#创建RDD对象
rdd=sc.parallelize([1,1,1,2,3,3,3,5,4,6,6,6])
rdd2=rdd.distinct()
print(rdd2.collect())
结果是
sortBy算子
功能:对RDD数据进行排序,基于你指定的排序依据
语法:
演示
就用上面那个练习,把输出的单词个数进行排序
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD读取数据
rdd=sc.textFile("D:/hello.txt")
#取出所有单词
rdd2=rdd.flatMap(lambda x:x.split(" "))
#将所有单词都转换为二元元组,单词为key,value设置为1
word_with_one_rdd=rdd2.map(lambda x:(x,1))
#分组并求和
result_rdd=word_with_one_rdd.reduceByKey(lambda x,y:x+y)
#对结果进行排序,降序输出
final_rdd=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(final_rdd.collect())
结果是
练习案例2
需求:复制以上内容到文件中,使用Spark读取文件进行计算:
- 各个城市销售额排名,从大到小
- 全部城市,有哪些商品类别在售卖
- 北京市有哪些商品类别在售卖
这文件里面的json数据,每一条数据都有‘|’这样一个分隔,所以到时候要用split先分开,再把json数据转为python中的字典
解决步骤
1.先用split取出一个个json字符串
2.把取出来的json字符串转为字典
3.需求1:城市销售额排名。做二元元组(城市,销售额),然后分组聚合,排序
4.需求2:全部城市有哪些商品在售卖。先用map把每条数据的“category”提取出来,再用distinct去重
5.需求3:北京市有哪些商品在售卖。先用filter过滤出北京的所有数据,再用map得到北京中的所有“category”再用distinct去重
完整代码
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
import json
#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD读取数据
rdd=sc.textFile("D:/orders.txt")
#需求1:城市销售额排名
#取出一个个json字符串
json_str_rdd=rdd.flatMap(lambda x:x.split("|"))
# 把json转换为字典
dict_rdd=json_str_rdd.map(lambda x:json.loads(x))
#取出城市、销售额作为二元元组(城市,销售额)
city_with_money_rdd=dict_rdd.map(lambda x:(x["areaName"],int(x["money"])))
#按城市分组,并把销售额加起来
city_result_rdd=city_with_money_rdd.reduceByKey(lambda x,y:x+y)
#按销售额聚合结果排序
reslut1_rdd=city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("需求1的结果是:",reslut1_rdd.collect())
#需求2:全部城市有哪些商品在售卖
category_rdd=dict_rdd.map(lambda x:x["category"]).distinct()
print("需求2的结果是:",category_rdd.collect())
#需求3:北京市有那些商品在售卖
#先选出北京市的数据
beijing_data_rdd=dict_rdd.filter(lambda x:x["areaName"]=="北京")
#取出全部商品
result3_rdd=beijing_data_rdd.map(lambda x:x["category"]).distinct()
print("需求3的结果是:",result3_rdd.collect())
三.总结
1. map算子(成员方法)
- 接受一个处理函数,可用lambda表达式快速编写
- 对RDD内的元素逐个处理,并返回一个新的RDD
2.链式调用
对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子。
3.flatMap算子
- 计算逻辑和map一样
- 可以比map多出解除一层嵌套的功能
4.reduceByKey算子
- 接受一个处理函数,对数据进行两两计算
5.filter算子
- 接受一个处理函数,可用lambda快速编写
- 函数对RDD数据逐个处理,得到True的保留至返回值的RDD中
6.sortBy算子
- 接收一个处理函数,可用lambda快速编写
- 函数表示用来决定排序的依据
- 可以控制升序或降序
- 全局排序需要设置分区数为1
7.distinct算子
- 完成对RDD内数据的去重操作