目录
一、pyspark介绍
二、PySpark安装
三、RDD编程
1、创建RDD
2、常用Action操作
①collect
②take
③takeSample
④first
⑤count
⑥reduce
⑦foreach
⑧countByKey
⑨saveAsTextFile
3、常用Transformation操作
①map
②filter
③flatMap
④sample
⑤distinct
⑥subtract
⑦union
⑧intersection
⑨cartesian
⑩sortBy
⑪zip
⑫zipWithIndex
4、常用Transformation操作(键值对)
①reduceByKey
②groupByKey
③sortByKey
④join / leftOuterJoin / rightOuterJoin
⑤cogroup
⑥subtractByKey
⑦foldByKey
5、分区操作
①glom
②HashPartitioner
③mapPartitions / mapPartitionsWithIndex
④coalesce
⑤repartition
⑥partitionBy
6、缓存操作
①cache
②persist
③checkpoint
7、共享变量
①broadcast
②accumulator
四、综合应用
1、求平均数
2、统计出现最多的数
五、总结
一、pyspark介绍
Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。
PySpark与Spark-Scala的对比:
1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。
2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。
4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。
5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。
总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。
前置知识:
1、熟悉Spark RDD原理,了解RDD常用算子
2、具有Python编码能力,熟悉Python中numpy, pandas库的基本用法。
3、了解机器学习算法原理,如逻辑回归、决策树等等
4、需要安装:JDK、Anaconda
二、PySpark安装
首先安装spark,本文使用的安装文件为:spark-3.2.1-bin-hadoop3.2。即Spark版本为3.2.1,Hadoop可不安装,对本文后续代码运行无影响。
百度云链接如下:
链接:https://pan.baidu.com/s/1GmPZBoBtSZWJtPHqm-DhwA?pwd=bcm5
提取码:bcm5
将下载的安装文件解压到指定目录即可,例如我的目录:D:\bigdata\spark-3.2.1-bin-hadoop3.2
配置系统变量:
此电脑-右键点击“属性”-高级系统设置-环境变量-系统变量
#系统变量新建
SPARK_HOME D:\bigdata\spark-3.2.1-bin-hadoop3.2 #换成你的解压目录
PYSPARK_DEIVER_PYTHON_OPTS notebook
PYSPARK_DEIVER_PYTHON ipython
PYTHONPATH %SPARK_HOME%\python\lib\py4j;%SPARK_HOME%\python\lib\pyspark
#path添加
%SPARK_HOME%\bin
修改配置文件:
在解压路径目录conf下,复制文件spark-env.sh.template,修改文件名为spark-env.sh。
修改配置文件spark-env.sh,在文件末尾添加以下代码:
#D:\Anaconda3换成你的anaconda安装目录
export PYSPARK_PYTHON=D:\Anaconda3
export PYSPARK_DRIVER_PYTHON=D:\Anaconda3
export PYSPARK_SUBMIT_ARGS='--master local[*]'
#local[*] 是利用所有的资源
以上步骤完成,spark已经安装完成。
接下来在Anaconda创建虚拟环境,安装相关python库。需要注意,Python安装的pyspark版本必须与前面安装的spark版本一致。
#创建虚拟环境
conda create -n spark python=3.8
#进入虚拟环境
conda activate spark
#安装相关包
pip install pyspark==3.2.1 findspark pyhive notebook pandas
三、RDD编程
第一步,初始化Spark环境,创建一个Spark应用程序:
import findspark
import pyspark
from pyspark import SparkContext, SparkConf
findspark.init()
conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)
以上代码中,首先创建一个SparkConf对象,用于配置Spark应用程序。通过setAppName设置应用程序的名称为"test",通过setMaster设置运行模式为本地模式,使用4个本地线程。
随后,创建一个SparkContext对象,它是与Spark集群通信的主要入口点。一旦SparkContext被创建,就可以使用它来执行各种分布式计算任务。
1、创建RDD
RDD(Resilient Distributed Dataset):是 Spark 中的核心数据结构,代表分布在集群节点上的不可变、弹性(可容错)、可并行计算的数据集。RDD 可以分为多个分区,每个分区可以在集群中的不同节点上进行并行处理。
①用textFile方法加载本地或者集群文件系统中的数据
#从本地文件系统中加载数据
file = "./data/test.txt"
rdd = sc.textFile(file,3)
#从集群文件系统中加载数据
file = "hdfs://localhost:9000/user/data/test.txt"
#也可以省去hdfs://localhost:9000
rdd = sc.textFile(file,3)
②用parallelize方法将Driver中的数据结构并行化成RDD。
rdd = sc.parallelize(range(1,11),2)
2、常用Action操作
其主要特点如下:
触发计算:Action操作是Spark计算的触发点,当调用 Action操作时,Spark将执行整个RDD的计算流程,并生成最终结果。这与Transformation操作的惰性计算形成对比。
输出结果:Action操作生成非惰性结果,即它们会立即执行计算并返回实际的结果。可以将计算结果返回到本地驱动程序,也可以将结果写入外部存储系统(如 HDFS、数据库等),另外还可以将结果缓存到驱动程序或本地内存中(但对于大型数据集来说可能会导致内存问题)。
①collect
collect()是一个action操作,用于从RDD中收集所有元素到Driver节点,形成一个本地的数据集(数组)。
rdd = sc.parallelize(range(1,11),2)
rdd.collect()
运行结果:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
注意:collect()操作将整个RDD中的数据收集到Driver节点的内存中,在大规模数据集上执行该操作可能导致内存不足或性能问题。
因此,collect()操作适用于小规模的结果集,用于调试和查看数据。对于大规模数据集,更常见的做法是使用转换操作和行动操作组合来实现分布式计算,最后将结果写到外部存储或以其他方式处理。
②take
rdd.take(n)用于获取RDD中的前n个元素。不同于collect(),take(n)只取前n个元素,因此在处理大规模数据时更为高效。
rdd = sc.parallelize(range(1,11),2)
rdd.take(4)
输出结果:
[1, 2, 3, 4]
③takeSample
rdd.takeSample(withReplacement, num, seed=None)用于从RDD中获取指定数量的随机样本。
参数说明:
withReplacement:布尔值,表示是否允许采样时元素的重复抽取。如果为 True,则允许重复抽取;如果为 False,则不允许。
num:要获取的样本数量。
seed:可选的种子值,用于控制随机数生成。如果提供了相同的种子值,多次调用takeSample将产生相同的样本。
rdd = sc.parallelize(range(1,11),2)
rdd.takeSample(False,5,0)
输出结果:
[8, 9, 2, 6, 4]
④first
first()用于获取RDD中的第一个元素,对于大型数据集的性能较好。
rdd = sc.parallelize(range(1,11),2)
rdd.first()
输出结果:
1
⑤count
count()用于获取RDD中元素的总数量,以了解数据规模。
rdd = sc.parallelize(range(1,11),2)
rdd.count()
输出结果:
10
⑥reduce
reduce()用于对RDD中的元素进行规约操作,两两结合进行某种操作后继续与下一个元素结合,直到规约成一个最终的结果。reduce()通常用于执行可以并行化的可交换和可结合的操作,例如对数字进行加法或求和。这样的操作可以在每个分区上并行执行,然后合并结果。
#计算0+1+2+3+4+5+6+7+8+9
rdd = sc.parallelize(range(10),5)
rdd.reduce(lambda x,y:x+y)
输出结果:
45
⑦foreach
rdd.foreach()用于对RDD中的每个元素应用指定的函数,与map不同,foreach 是一个行动操作,它会在每个分区上并行地对每个元素执行给定的函数。
rdd = sc.parallelize(range(10),5)
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)
输出结果:
45
在以上代码中,通过sc.accumulator()创建了一个累加器,并初始化其值为0。然后使用rdd.foreach()对RDD中的每个元素执行匿名函数,该函数将元素的值累加到累加器中。由于累加器是在分布式环境中共享的,因此每个节点上的累加器都能够更新。
⑧countByKey
rdd.countByKey()用于统计 (key, value) 对的RDD中每个key的出现次数。
pairRdd = sc.parallelize([("hello",1),("world",4),("hello",9),("something",16)])
pairRdd.countByKey()
输出结果:
defaultdict(int, {'hello': 2, 'world': 1, 'something': 1})
⑨saveAsTextFile
saveAsTextFile()用于将RDD的内容保存为文本文件,即将分布式数据集的结果写入到本地文件系统或分布式文件系统(如HDFS)中。
#saveAsTextFile保存rdd成text文件到本地
text_file = "./test/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()
3、常用Transformation操作
Transformation操作是对RDD进行变换的操作,它们不会立即执行,而是构建了一个表示要在RDD上执行的操作的执行计划。Transformation操作是为了支持分布式计算而设计的。它们在整个集群上并行运行,并利用RDD的不可变性和分区的概念来实现高效的分布式处理。通过构建逻辑执行计划,Spark可以优化计算并在整个集群上分布计算任务,以提高性能。
其主要特点如下:
惰性计算:当应用一个Transformation操作时,Spark只是记录了该操作的存在,并没有实际执行计算。实际的计算将会在Action操作触发时进行。
生成新的RDD:由于RDD一旦创建就不能被修改,所以Transformation操作通常生成一个新的RDD,而不是修改原始的RDD。
窄/宽依赖:Transformation操作可以分为窄依赖和宽依赖。
窄依赖指每个父分区中的数据仅依赖于该父分区的数据,例如map操作。
宽依赖指某个父分区的数据可能依赖于多个父分区的数据,例如groupByKey和reduceByKey操作,这会导致数据的重新分区,因此可能引起数据的Shuffle。
①map
rdd.map()用于对RDD中的每个元素应用一个指定的函数,并返回一个包含应用函数后结果的RDD。rdd.map()接受一个函数作为参数(可以是lambda匿名函数),该函数将被应用到RDD中的每个元素。
rdd = sc.parallelize(range(10),3)
rdd.map(lambda x:x**2).collect()
输出结果:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
②filter
rdd.filter()用于对RDD中的元素进行过滤,并返回新RDD,其中包含满足指定条件的原始RDD中的元素。
rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()
输出结果:
[6, 7, 8, 9]
③flatMap
rdd.flatMap()即在rdd.map()的基础上将所有的结果扁平化为一个新的RDD。
rdd = sc.parallelize(["hello world","hello China"])
print(rdd.map(lambda x:x.split(" ")).collect())
print(rdd.flatMap(lambda x:x.split(" ")).collect())
输出结果:
[['hello', 'world'], ['hello', 'China']]
['hello', 'world', 'hello', 'China']
④sample
rdd.sample()用于从RDD每个分区按照比例随机抽样一部分元素,生成新的RDD。
参数如下:
withReplacement:是否放回抽样。true-有放回,false-无放回
fraction:期望样本的大小作为RDD大小的一部分。fraction范围在[0,1],即表示选择每个元素的概率。fraction大于1时,即表示选择每个元素的期望次数。
seed:随机数生成器的种子。
rdd = sc.parallelize(range(10),2)
#每个元素被抽到的概率为0.5,但输出的元素不一定是5个
print(rdd.sample(withReplacement=False, fraction=0.5,seed=0).collect())
#每个元素被抽到的期望次数是2,但输出的元素不一定是20个
print(rdd.sample(withReplacement=True, fraction=2,seed=0).collect())
输出结果:
[1, 4, 5, 7]
[0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 4, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 8, 9]
⑤distinct
rdd.distinct()对原始RDD进行去重。
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()
输出结果:
[4, 1, 5, 2, 3]
⑥subtract
rdd1.subtract(rdd2)用于计算两个RDD的差集,返回在rdd1中但不在rdd2中的元素。
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.subtract(rdd2).collect()
输出结果:
[1, 2, 3]
⑦union
rdd1.union(rdd2)用于计算两个RDD的并集,需要注意返回结果中可能带有重复元素
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.union(rdd2).collect()
输出结果:
[1, 2, 3, 4, 5, 4, 5, 6, 7, 8]
⑧intersection
rdd1.intersection(rdd2)用于计算两个RDD的交集。
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.intersection(rdd2).collect()
输出结果:
[4, 5]
⑨cartesian
rdd1.cartesian(rdd2)用于计算两个RDD的笛卡尔积。
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 'c'])
rdd1.cartesian(rdd2).collect()
输出结果:
[(1, 'a'),
(1, 'b'),
(1, 'c'),
(2, 'a'),
(2, 'b'),
(2, 'c'),
(3, 'a'),
(3, 'b'),
(3, 'c')]
⑩sortBy
rdd.sortBy()用于对RDD中的元素按照指定的排序键进行排序。
参数如下:
rdd.sortBy(keyfunc, ascending=True, numPartitions=None)
keyfunc用于从 RDD 的每个元素中提取用于排序的键,可以是lambda匿名函数。
ascending表示排序的顺序。 True为升序,False为降序。
numPartitions表示返回结果RDD的分区数。
data = [(1, 'apple'), (3, 'orange'), (2, 'banana'), (4, 'grape')]
rdd = sc.parallelize(data)
rdd.sortBy(lambda x: x[0], ascending=True,numPartitions=2).collect()
输出结果:
[(1, 'apple'), (2, 'banana'), (3, 'orange'), (4, 'grape')]
⑪zip
rdd1.zip(rdd2)按照拉链方式将两个RDD中的元素一对一地合并成元组,效果类似python的zip函数,需要两个RDD具有相同的分区,每个分区元素数量相同。
rdd1 = sc.parallelize([1, 2, 3, 4, 5],2)
rdd2 = sc.parallelize(['a', 'b', 'c', 'd', 'e'],2)
rdd1.zip(rdd2).collect()
输出结果:
[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
⑫zipWithIndex
rdd.zipWithIndex()用于将RDD中的每个元素与其在 RDD 中的索引位置一对一地合并成元组。
rdd = sc.parallelize([10, 20, 30, 40, 50])
rdd.zipWithIndex().collect()
输出结果:
[(10, 0), (20, 1), (30, 2), (40, 3), (50, 4)]
4、常用Transformation操作(键值对)
PairRDD中的元素是键值对,Spark提供了针对键值对的一系列转换和操作,使得对数据进行分组、聚合和排序等操作更加方便。
①reduceByKey
rdd.reduceByKey()用于对RDD进行聚合的Transformation操作,将具有相同键的所有值根据提供的聚合函数进行合并。
参数如下:
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)
func: 聚合函数,接受两个参数,用于将相同键的值进行合并。
numPartitions: 可选参数,用于指定返回结果RDD的分区数。
partitionFunc: 可选参数,用于指定分区函数,默认为哈希分区函数。
data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
rdd.reduceByKey(lambda x, y: x + y).collect()
输出结果:
[('y', 6), ('x', 6), ('z', 4)]
②groupByKey
rdd.groupByKey()用于对PairRDD进行分组的Transformation操作,将相同键的所有值放在一个迭代器中并返回新RDD,适用于不涉及值的聚合操作,只需按键进行分组的情况。不过groupByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。另外,生成的结果是以键值对形式的迭代器,存在大量数据时可能导致内存溢出。
data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
print(rdd.groupByKey().collect())
for key, values in rdd.groupByKey().collect():
print(f"{key}: {list(values)}")
输出结果:
[('y', <pyspark.resultiterable.ResultIterable object at 0x00000259F5816C10>), ('x', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C790>), ('z', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C490>)]
y: [5, 1]
x: [3, 2, 1]
z: [4]
③sortByKey
rdd.sortByKey()用于对PairRDD进行按键排序的Transformation操作,按照键进行排序,并返回一个新RDD。sortByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。在数据量庞大时,可能会影响性能,因为需要将数据在不同分区间移动以进行排序。
参数如下:
ascending:表示排序的顺序。True为升序(默认),False为降序。
data = [(3, 'x'), (5, 'y'), (2, 'z'), (4, 's')]
rdd = sc.parallelize(data)
rdd.sortByKey().collect()
输出结果:
[(2, 'z'), (3, 'x'), (4, 's'), (5, 'y')]
④join / leftOuterJoin / rightOuterJoin
rdd1.join(rdd2)用于对两个PairRDD进行连接的Transformation操作,根据键将两个 PairRDD 中的元素进行连接。类似SQL中的inner join。
rdd1.leftOuterJoin(rdd2)、rdd1.rightOuterJoin(rdd2)分别是左关联、右关联。如果另一侧PairRDD 中没有匹配的键,则对应位置的值为None。
data1 = [('Tom', 18), ('Jerry', 19), ('Alice', 17)]
data2 = [('Tom', 'male'), ('Bob', 'male'), ('Alice', 'female')]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
print(rdd1.join(rdd2).collect())
print(rdd1.leftOuterJoin(rdd2).collect())
print(rdd1.rightOuterJoin(rdd2).collect())
输出结果:
[('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Jerry', (19, None)), ('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Tom', (18, 'male')), ('Bob', (None, 'male')), ('Alice', (17, 'female'))]
⑤cogroup
rdd1.cogroup(rdd2)用于对两个PairRDD进行先后两次分组连接的Transformation操作,相当于对rdd1、rdd2分别进行goupByKey,再对两个结果进行groupByKey。
rdd1 = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
rdd2 = sc.parallelize([("a", 3), ("b", 4)])
[(x, tuple(map(list, y))) for x, y in sorted(list(rdd1.cogroup(rdd2).collect()))]
输出结果:
[('a', ([1, 2], [3])), ('b', ([3], [4]))]
⑥subtractByKey
rdd1.subtractByKey(rdd2)用于对两个PairRDD求差集的Transformation操作,即返回在rdd1中而不在rdd2中的元素。
rdd1 = sc.parallelize([("x", 1), ("y", 2), ("z", 3)])
rdd2 = sc.parallelize([("x", 3), ("y", 4)])
rdd1.subtractByKey(rdd2).collect()
输出结果:
[('z', 3)]
⑦foldByKey
foldByKey的操作和reduceByKey类似,但是foldByKey可以提供一个初始值
data = [('x', 1), ('x', 2), ('x', 3), ('y', 1), ('y', 2), ('z', 1)]
rdd = sc.parallelize(data)
print(rdd.foldByKey(0, lambda x, y: x + y).collect())
print(rdd.foldByKey(1, lambda x, y: x + y).collect())
输出结果:
[('y', 3), ('x', 6), ('z', 1)]
[('y', 5), ('x', 8), ('z', 2)]
5、分区操作
RDD分区操作主要分为调整分区与转换分区操作。
调整分区操作用于调整已有RDD的分区结构,不改变数据的物理位置,仅影响分区元数据,通常性能开销较小。例如:分区数调整,即调整现有RDD的分区数,但不移动数据。
转换分区操作改变了RDD的分区结构,通常是在数据上执行Transformation操作,产生一个新的RDD,其分区数可能发生变化。例如通过指定的分区数或者使用一些具体的分区算法,重新组织数据分区。在数据的重新组织过程中可能涉及跨分区的数据移动,通常伴随着性能开销。
①glom
rdd.glom()用于将每个分区的数据转换为一个数组,是Transformation操作。适用于需要对每个分区进行整体操作的场景。
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
rdd.glom().collect()
输出结果:
[[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
②HashPartitioner
rdd.HashPartitioner()用于对PairRDD进行哈希分区,即根据键的哈希值将数据划分到不同的分区中。
data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.partitionBy(2)
for partition, data in enumerate(rdd2.glom().collect()):
print(f"Partition {partition}: {data}")
输出结果:
Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]
③mapPartitions / mapPartitionsWithIndex
rdd.mapPartitions()是Transformation操作,用于对RDD的每个分区执行一个自定义映射函数,该函数可以处理分区内的所有元素,而不是一次仅处理一个元素。mapPartitions能够减少通信开销,因为映射操作是在每个分区内进行的,适用于需要对整个分区进行批量操作的场景,而不适用于需要考虑跨分区元素之间关系的场景。
rdd.mapPartitionsWithIndex()类似于mapPartitions,但提供了分区索引信息,允许更细粒度的控制。
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
print(rdd.mapPartitions(lambda x:(i * 2 for i in x)).collect())
print(rdd.mapPartitionsWithIndex(lambda index,x:((index, i*2) for i in x)).collect())
输出结果:
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[(0, 2), (0, 4), (0, 6), (1, 8), (1, 10), (1, 12), (2, 14), (2, 16), (2, 18), (2, 20)]
④coalesce
coalesce()用于减少分区数的Transformation操作,可以尽量避免数据迁移,提升效率。
参数如下:
coalesce(numPartitions, shuffle=False)
numPartitions:新的分区数。
shuffle:是否进行数据洗牌,默认为False。当设置为 True 时,将触发数据洗牌操作,否则只是简单地减小分区数。
rdd = sc.parallelize(range(20),10)
print(rdd.glom().collect())
print(rdd.coalesce(2,shuffle=False).glom().collect())
print(rdd.coalesce(2,shuffle=True).glom().collect())
输出结果:
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15], [16, 17], [18, 19]]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
[[0, 1, 4, 5, 6, 7, 12, 13, 16, 17], [2, 3, 8, 9, 10, 11, 14, 15, 18, 19]]
⑤repartition
rdd.repartition()用于重新分区的Transformation操作,可以增加或减少分区数,通过shuffle来重新组织数据。允许动态调整RDD的分区数,可在数据分布不均匀时提高计算性能。
对比:coalesce在已有分区基础上尽量减少数据shuffle,而repartition会创建新分区并且使用full shuffle。
rdd = sc.parallelize(range(25),25)
print(rdd.glom().collect())
print(rdd.repartition(5).glom().collect())
print(rdd.coalesce(5).glom().collect())
输出结果:
[[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [10], [11], [12], [13], [14], [15], [16], [17], [18], [19], [20], [21], [22], [23], [24]]
[[6, 8, 11, 15, 20, 21], [0, 9, 16, 18, 24], [2, 3, 7, 14, 19, 22], [1], [4, 5, 10, 12, 13, 17, 23]]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19], [20, 21, 22, 23, 24]]
⑥partitionBy
rdd.partitionBy()用于对PairRDD重新分区,是Transformation操作,可以根据指定的分区器对键值对数据进行重新分区,以更好地控制数据的分布。
data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
pair_rdd = sc.parallelize(data)
hash_partitioned_rdd = pair_rdd.partitionBy(2)
for partition, data in enumerate(hash_partitioned_rdd.glom().collect()):
print(f"Partition {partition}: {data}")
输出结果:
Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]
6、缓存操作
如果多个任务在计算过程中共享同一个RDD作为中间数据,通过对其进行缓存,将其存储在内存中,可以显著加快计算速度。但是对RDD的缓存并不会立即生效,而是在该RDD第一次被计算出来时才会进行缓存。在不再需要某个RDD时,可以使用unpersist来释放缓存,而这个操作是立即执行的。这样可以有效地管理内存资源,避免不必要的缓存。
另一方面,缓存在提高计算速度的同时,并不会切断RDD的血缘依赖关系。因为缓存的数据可能存在某些分区的节点发生故障的情况,例如内存溢出或者节点损坏。在这种情况下,可以根据血缘关系重新计算受影响分区的数据,确保计算的正确性。
如果需要切断血缘关系,可以使用checkpoint来设置检查点,将RDD保存到磁盘中。与缓存类似,对RDD进行checkpoint同样不会立即生效,而是在该RDD第一次被计算出来时才会保存成检查点。通常,checkpoint适用于一些计算代价非常高昂的中间结果,或者在重复计算结果不可保证完全一致的情况下(例如使用zipWithIndex算子)。
对RDD进行缓存是优化Spark计算性能的有效手段,但需要根据具体情况灵活运用,以确保计算的准确性和效率。
①cache
rdd.cache()用于将RDD的计算结果缓存到内存中,以便在后续操作中重用,可以显著提高迭代算法等需要多次使用同一数据集的性能。rdd.cache使用存储级别MEMORY_ONLY,意味着如果内存不足,Spark可能会根据缓存数据的大小和可用内存的情况进行动态调整,例如将一部分或全部缓存的数据移除,以腾出内存供其他操作使用。
a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a
print(mean_a)
输出结果:
4999.5
②persist
rdd.persist()用于将RDD中间结果缓存到内存或磁盘中,以便在后续操作中重用。与cache不同,persist允许用户指定不同的存储级别,以更灵活地管理缓存。存储级别即不同的数据缓存的位置和策略,可以是MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK(默认)等。
rdd.persist()写入磁盘的文件是临时文件,应用执行完成后就会被删除,可以使用rdd.unpersist()立即释放缓存。
from pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a
a.unpersist()
print(mean_a)
输出结果:
4999.5
③checkpoint
rdd.checkpoint()用于将RDD中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,可以中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重新执行程序,减少开销。需要注意的是,checkpoint操作并不会马上被执行,而是在执行Action操作时才被触发。另外,checkpoint路径保存的文件是永久存在的,不会随着应用的结束而被删除。
sc.setCheckpointDir("./data/checkpoint/")
rdd = sc.parallelize(["a","b","c","d"],2)
rdd_idx = rdd.zipWithIndex()
rdd_idx.checkpoint()
rdd_idx.take(3)
输出结果:
[('a', 0), ('b', 1), ('c', 2)]
7、共享变量
共享变量主要用于在分布式计算中实现在任务之间共享数据,以提高性能和降低网络开销。广播变量(broadcast variables)和累加器(accumulators)是两个重要的分布式计算工具,用于在集群上共享数据和累积结果。
①broadcast
当需要在所有工作节点之间共享较小的只读数据集时,使用广播变量可以避免将该数据集多次传输到各个节点。这可以有效减少网络开销,提高性能。典型的应用场景包括在所有节点上使用相同的配置参数、字典或者映射表等。并且可以避免任务间重复传输,如果一个RDD需要在多个任务中使用,而且这个RDD的数据较小,使用广播变量可以避免在不同任务之间多次传输相同的数据。
#广播变量 broadcast 不可变,在所有节点可读
broads = sc.broadcast(100)
rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())
print(broads.value)
输出结果:
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100
②accumulator
累加器主要用于执行在分布式任务中的“添加”和“合并”操作,通常用于聚合和计数等操作。
例如,可以用累加器来计算在整个集群上发生的某个特定事件的总次数;计算所有节点上某个变量的总和或平均值。
#累加器 只能在Driver上可读,在其它节点只能进行累加
total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)
rdd.foreach(lambda x:total.add(x))
total.value
输出结果:
45
四、综合应用
1、求平均数
题目:计算1000个随机数的平均数。
import numpy as np
import findspark
import pyspark
from pyspark import SparkContext, SparkConf
findspark.init()
conf = SparkConf().setAppName("test1").setMaster("local[4]")
sc = SparkContext(conf=conf)
np.random.seed(6)
data = np.random.randint(0, 100, size=1000)
rdd = sc.parallelize(data,2)
print("前10个数:",rdd.take(10))
data_sum = rdd.reduce(lambda x,y:x+y)
data_count = rdd.count()
print("平均数:",data_sum/data_count)
sc.stop()
运行结果:
前10个数: [10, 73, 99, 84, 79, 80, 62, 25, 1, 75]
平均数: 50.227
2、统计出现最多的数
题目:计算1000个随机数中出现最多的数。
import numpy as np
import findspark
import pyspark
from pyspark import SparkContext, SparkConf
findspark.init()
conf = SparkConf().setAppName("test1").setMaster("local[4]")
sc = SparkContext(conf=conf)
np.random.seed(6)
data = np.random.randint(0, 100, size=1000)
rdd1 = sc.parallelize(data,2)
rdd2 = rdd1.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
max_count = rdd2.map(lambda x:x[1]).reduce(lambda x,y: x if x>=y else y)
max_value = rdd2.filter(lambda x:x[1]==max_count).map(lambda x:x[0]).collect()[0]
print("出现最多的数:",max_value," 出现次数:",max_count)
sc.stop()
运行结果:
出现最多的数: 63 出现次数: 18
五、总结
总的来说,PySpark适合初学者入门学习,由于python门槛不高,易于掌握,可以通过PySpark了解Spark的运行机制以及RDD算子的使用。但如果是需要几百台服务器才能运行的任务场景, PySpark的UDF(User Defined Functions)的性能差距肯定比不过Spark-Scala。
至于选什么语言,取决于业务需求。如果是处理简单的数据清洗聚合,且数据量非常大,用Scala会有性能优势,可以节约计算资源。如果需要处理较为复杂的算法模型,依赖于各种第三方包,那么使用Python会更好。