(一)PySpark3:安装教程及RDD编程(非常详细)

news2024/9/22 13:25:14

目录

一、pyspark介绍

二、PySpark安装

三、RDD编程

1、创建RDD

2、常用Action操作

①collect

②take

③takeSample

④first

⑤count

⑥reduce

⑦foreach

⑧countByKey

⑨saveAsTextFile

3、常用Transformation操作

①map

②filter

③flatMap

④sample

⑤distinct

⑥subtract

⑦union

⑧intersection

⑨cartesian

⑩sortBy

⑪zip

⑫zipWithIndex

4、常用Transformation操作(键值对)

①reduceByKey

②groupByKey

③sortByKey

④join / leftOuterJoin / rightOuterJoin

⑤cogroup

⑥subtractByKey

⑦foldByKey

5、分区操作

①glom

②HashPartitioner

③mapPartitions / mapPartitionsWithIndex

④coalesce

⑤repartition

⑥partitionBy

6、缓存操作

①cache

②persist

③checkpoint

7、共享变量

①broadcast

②accumulator

四、综合应用

1、求平均数

2、统计出现最多的数

五、总结


一、pyspark介绍

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。


PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

前置知识:

1、熟悉Spark RDD原理,了解RDD常用算子
2、具有Python编码能力,熟悉Python中numpy, pandas库的基本用法。
3、了解机器学习算法原理,如逻辑回归、决策树等等
4、需要安装:JDK、Anaconda

二、PySpark安装

首先安装spark,本文使用的安装文件为:spark-3.2.1-bin-hadoop3.2。即Spark版本为3.2.1,Hadoop可不安装,对本文后续代码运行无影响。

百度云链接如下:

链接:https://pan.baidu.com/s/1GmPZBoBtSZWJtPHqm-DhwA?pwd=bcm5 
提取码:bcm5

将下载的安装文件解压到指定目录即可,例如我的目录:D:\bigdata\spark-3.2.1-bin-hadoop3.2

配置系统变量:

此电脑-右键点击“属性”-高级系统设置-环境变量-系统变量

#系统变量新建
SPARK_HOME D:\bigdata\spark-3.2.1-bin-hadoop3.2 #换成你的解压目录
PYSPARK_DEIVER_PYTHON_OPTS notebook
PYSPARK_DEIVER_PYTHON ipython
PYTHONPATH %SPARK_HOME%\python\lib\py4j;%SPARK_HOME%\python\lib\pyspark

#path添加
%SPARK_HOME%\bin

修改配置文件:
在解压路径目录conf下,复制文件spark-env.sh.template,修改文件名为spark-env.sh

修改配置文件spark-env.sh,在文件末尾添加以下代码:

#D:\Anaconda3换成你的anaconda安装目录
export PYSPARK_PYTHON=D:\Anaconda3
export PYSPARK_DRIVER_PYTHON=D:\Anaconda3
export PYSPARK_SUBMIT_ARGS='--master local[*]'
#local[*]  是利用所有的资源

以上步骤完成,spark已经安装完成。

接下来在Anaconda创建虚拟环境,安装相关python库。需要注意,Python安装的pyspark版本必须与前面安装的spark版本一致。

#创建虚拟环境
conda create -n spark python=3.8

#进入虚拟环境
conda activate spark

#安装相关包
pip install pyspark==3.2.1 findspark pyhive notebook pandas

三、RDD编程

第一步,初始化Spark环境,创建一个Spark应用程序:

import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()

conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)

以上代码中,首先创建一个SparkConf对象,用于配置Spark应用程序。通过setAppName设置应用程序的名称为"test",通过setMaster设置运行模式为本地模式,使用4个本地线程。

随后,创建一个SparkContext对象,它是与Spark集群通信的主要入口点。一旦SparkContext被创建,就可以使用它来执行各种分布式计算任务。

1、创建RDD

RDD(Resilient Distributed Dataset):是 Spark 中的核心数据结构,代表分布在集群节点上的不可变、弹性(可容错)、可并行计算的数据集。RDD 可以分为多个分区,每个分区可以在集群中的不同节点上进行并行处理。

①用textFile方法加载本地或者集群文件系统中的数据

#从本地文件系统中加载数据
file = "./data/test.txt"
rdd = sc.textFile(file,3)

#从集群文件系统中加载数据
file = "hdfs://localhost:9000/user/data/test.txt"
#也可以省去hdfs://localhost:9000
rdd = sc.textFile(file,3)

②用parallelize方法将Driver中的数据结构并行化成RDD。

rdd = sc.parallelize(range(1,11),2)

2、常用Action操作

其主要特点如下:

触发计算:Action操作是Spark计算的触发点,当调用 Action操作时,Spark将执行整个RDD的计算流程,并生成最终结果。这与Transformation操作的惰性计算形成对比。

输出结果:Action操作生成非惰性结果,即它们会立即执行计算并返回实际的结果。可以将计算结果返回到本地驱动程序,也可以将结果写入外部存储系统(如 HDFS、数据库等),另外还可以将结果缓存到驱动程序或本地内存中(但对于大型数据集来说可能会导致内存问题)。

①collect

collect()是一个action操作,用于从RDD中收集所有元素到Driver节点,形成一个本地的数据集(数组)。

rdd = sc.parallelize(range(1,11),2)
rdd.collect()

运行结果:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

注意:collect()操作将整个RDD中的数据收集到Driver节点的内存中,在大规模数据集上执行该操作可能导致内存不足或性能问题。
因此,collect()操作适用于小规模的结果集,用于调试和查看数据。对于大规模数据集,更常见的做法是使用转换操作和行动操作组合来实现分布式计算,最后将结果写到外部存储或以其他方式处理。

②take

rdd.take(n)用于获取RDD中的前n个元素。不同于collect(),take(n)只取前n个元素,因此在处理大规模数据时更为高效。

rdd = sc.parallelize(range(1,11),2)
rdd.take(4)

输出结果:

[1, 2, 3, 4]
takeSample

rdd.takeSample(withReplacement, num, seed=None)用于从RDD中获取指定数量的随机样本。

参数说明:
withReplacement:布尔值,表示是否允许采样时元素的重复抽取。如果为 True,则允许重复抽取;如果为 False,则不允许。
num:要获取的样本数量。
seed:可选的种子值,用于控制随机数生成。如果提供了相同的种子值,多次调用takeSample将产生相同的样本。

rdd = sc.parallelize(range(1,11),2)
rdd.takeSample(False,5,0)

输出结果:

[8, 9, 2, 6, 4]
first

first()用于获取RDD中的第一个元素,对于大型数据集的性能较好。

rdd = sc.parallelize(range(1,11),2)
rdd.first()

输出结果:

1
⑤count

count()用于获取RDD中元素的总数量,以了解数据规模。

rdd = sc.parallelize(range(1,11),2)
rdd.count()

输出结果:

10
⑥reduce

reduce()用于对RDD中的元素进行规约操作,两两结合进行某种操作后继续与下一个元素结合,直到规约成一个最终的结果。reduce()通常用于执行可以并行化的可交换和可结合的操作,例如对数字进行加法或求和。这样的操作可以在每个分区上并行执行,然后合并结果。

#计算0+1+2+3+4+5+6+7+8+9
rdd = sc.parallelize(range(10),5) 
rdd.reduce(lambda x,y:x+y)

输出结果:

45
⑦foreach

rdd.foreach()用于对RDD中的每个元素应用指定的函数,与map不同,foreach 是一个行动操作,它会在每个分区上并行地对每个元素执行给定的函数。

rdd = sc.parallelize(range(10),5) 
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)

输出结果:

45

在以上代码中,通过sc.accumulator()创建了一个累加器,并初始化其值为0。然后使用rdd.foreach()对RDD中的每个元素执行匿名函数,该函数将元素的值累加到累加器中。由于累加器是在分布式环境中共享的,因此每个节点上的累加器都能够更新。

countByKey

rdd.countByKey()用于统计 (key, value) 对的RDD中每个key的出现次数。

pairRdd = sc.parallelize([("hello",1),("world",4),("hello",9),("something",16)]) 
pairRdd.countByKey()

输出结果:

defaultdict(int, {'hello': 2, 'world': 1, 'something': 1})
saveAsTextFile

saveAsTextFile()用于将RDD的内容保存为文本文件,即将分布式数据集的结果写入到本地文件系统或分布式文件系统(如HDFS)中。

#saveAsTextFile保存rdd成text文件到本地
text_file = "./test/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()

3、常用Transformation操作

Transformation操作是对RDD进行变换的操作,它们不会立即执行,而是构建了一个表示要在RDD上执行的操作的执行计划。Transformation操作是为了支持分布式计算而设计的。它们在整个集群上并行运行,并利用RDD的不可变性和分区的概念来实现高效的分布式处理。通过构建逻辑执行计划,Spark可以优化计算并在整个集群上分布计算任务,以提高性能。

其主要特点如下:

惰性计算:当应用一个Transformation操作时,Spark只是记录了该操作的存在,并没有实际执行计算。实际的计算将会在Action操作触发时进行。

生成新的RDD:由于RDD一旦创建就不能被修改,所以Transformation操作通常生成一个新的RDD,而不是修改原始的RDD。

窄/宽依赖:Transformation操作可以分为窄依赖和宽依赖。
窄依赖指每个父分区中的数据仅依赖于该父分区的数据,例如map操作。
宽依赖指某个父分区的数据可能依赖于多个父分区的数据,例如groupByKey和reduceByKey操作,这会导致数据的重新分区,因此可能引起数据的Shuffle。

①map

rdd.map()用于对RDD中的每个元素应用一个指定的函数,并返回一个包含应用函数后结果的RDD。rdd.map()接受一个函数作为参数(可以是lambda匿名函数),该函数将被应用到RDD中的每个元素。

rdd = sc.parallelize(range(10),3)
rdd.map(lambda x:x**2).collect()

输出结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
②filter

rdd.filter()用于对RDD中的元素进行过滤,并返回新RDD,其中包含满足指定条件的原始RDD中的元素。

rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()

输出结果:

[6, 7, 8, 9]
flatMap

rdd.flatMap()即在rdd.map()的基础上将所有的结果扁平化为一个新的RDD。

rdd = sc.parallelize(["hello world","hello China"])
print(rdd.map(lambda x:x.split(" ")).collect())
print(rdd.flatMap(lambda x:x.split(" ")).collect())

输出结果:

[['hello', 'world'], ['hello', 'China']]
['hello', 'world', 'hello', 'China']
sample

rdd.sample()用于从RDD每个分区按照比例随机抽样一部分元素,生成新的RDD。

参数如下:

withReplacement:是否放回抽样。true-有放回,false-无放回
fraction:期望样本的大小作为RDD大小的一部分。fraction范围在[0,1],即表示选择每个元素的概率。fraction大于1时,即表示选择每个元素的期望次数。
seed:随机数生成器的种子。

rdd = sc.parallelize(range(10),2)

#每个元素被抽到的概率为0.5,但输出的元素不一定是5个
print(rdd.sample(withReplacement=False, fraction=0.5,seed=0).collect())

#每个元素被抽到的期望次数是2,但输出的元素不一定是20个
print(rdd.sample(withReplacement=True, fraction=2,seed=0).collect())

输出结果:

[1, 4, 5, 7]
[0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 4, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 8, 9]

⑤distinct

rdd.distinct()对原始RDD进行去重。

rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()

输出结果:

[4, 1, 5, 2, 3]
⑥subtract

rdd1.subtract(rdd2)用于计算两个RDD的差集,返回在rdd1中但不在rdd2中的元素。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.subtract(rdd2).collect()

输出结果:

[1, 2, 3]
union

rdd1.union(rdd2)用于计算两个RDD的并集,需要注意返回结果中可能带有重复元素

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.union(rdd2).collect()

输出结果:

[1, 2, 3, 4, 5, 4, 5, 6, 7, 8]
intersection

rdd1.intersection(rdd2)用于计算两个RDD的交集。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.intersection(rdd2).collect()

输出结果:

[4, 5]
cartesian

rdd1.cartesian(rdd2)用于计算两个RDD的笛卡尔积。

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 'c'])
rdd1.cartesian(rdd2).collect()

输出结果:

[(1, 'a'),
 (1, 'b'),
 (1, 'c'),
 (2, 'a'),
 (2, 'b'),
 (2, 'c'),
 (3, 'a'),
 (3, 'b'),
 (3, 'c')]
sortBy

rdd.sortBy()用于对RDD中的元素按照指定的排序键进行排序。

参数如下:

rdd.sortBy(keyfunc, ascending=True, numPartitions=None)
keyfunc用于从 RDD 的每个元素中提取用于排序的键,可以是lambda匿名函数。
ascending表示排序的顺序。 True为升序,False为降序。
numPartitions表示返回结果RDD的分区数。

data = [(1, 'apple'), (3, 'orange'), (2, 'banana'), (4, 'grape')]
rdd = sc.parallelize(data)
rdd.sortBy(lambda x: x[0], ascending=True,numPartitions=2).collect()

输出结果:

[(1, 'apple'), (2, 'banana'), (3, 'orange'), (4, 'grape')]
⑪zip

rdd1.zip(rdd2)按照拉链方式将两个RDD中的元素一对一地合并成元组,效果类似python的zip函数,需要两个RDD具有相同的分区,每个分区元素数量相同。

rdd1 = sc.parallelize([1, 2, 3, 4, 5],2)
rdd2 = sc.parallelize(['a', 'b', 'c', 'd', 'e'],2)
rdd1.zip(rdd2).collect()

输出结果:

[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
zipWithIndex

rdd.zipWithIndex()用于将RDD中的每个元素与其在 RDD 中的索引位置一对一地合并成元组。

rdd = sc.parallelize([10, 20, 30, 40, 50])
rdd.zipWithIndex().collect()

输出结果:

[(10, 0), (20, 1), (30, 2), (40, 3), (50, 4)]

4、常用Transformation操作(键值对)

PairRDD中的元素是键值对,Spark提供了针对键值对的一系列转换和操作,使得对数据进行分组、聚合和排序等操作更加方便。

reduceByKey

rdd.reduceByKey()用于对RDD进行聚合的Transformation操作,将具有相同键的所有值根据提供的聚合函数进行合并。

参数如下:
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)
func: 聚合函数,接受两个参数,用于将相同键的值进行合并。
numPartitions: 可选参数,用于指定返回结果RDD的分区数。
partitionFunc: 可选参数,用于指定分区函数,默认为哈希分区函数。

data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
rdd.reduceByKey(lambda x, y: x + y).collect()

输出结果:

[('y', 6), ('x', 6), ('z', 4)]
groupByKey

rdd.groupByKey()用于对PairRDD进行分组的Transformation操作,将相同键的所有值放在一个迭代器中并返回新RDD,适用于不涉及值的聚合操作,只需按键进行分组的情况。不过groupByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。另外,生成的结果是以键值对形式的迭代器,存在大量数据时可能导致内存溢出。

data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
print(rdd.groupByKey().collect())

for key, values in rdd.groupByKey().collect():
    print(f"{key}: {list(values)}")

输出结果:

[('y', <pyspark.resultiterable.ResultIterable object at 0x00000259F5816C10>), ('x', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C790>), ('z', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C490>)]
y: [5, 1]
x: [3, 2, 1]
z: [4]
sortByKey

rdd.sortByKey()用于对PairRDD进行按键排序的Transformation操作,按照键进行排序,并返回一个新RDD。sortByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。在数据量庞大时,可能会影响性能,因为需要将数据在不同分区间移动以进行排序。

参数如下:
ascending:表示排序的顺序。True为升序(默认),False为降序。

data = [(3, 'x'), (5, 'y'), (2, 'z'), (4, 's')]
rdd = sc.parallelize(data)
rdd.sortByKey().collect()

输出结果:

[(2, 'z'), (3, 'x'), (4, 's'), (5, 'y')]
join / leftOuterJoin / rightOuterJoin

rdd1.join(rdd2)用于对两个PairRDD进行连接的Transformation操作,根据键将两个 PairRDD 中的元素进行连接。类似SQL中的inner join。

rdd1.leftOuterJoin(rdd2)、rdd1.rightOuterJoin(rdd2)分别是左关联、右关联。如果另一侧PairRDD 中没有匹配的键,则对应位置的值为None。

data1 = [('Tom', 18), ('Jerry', 19), ('Alice', 17)]
data2 = [('Tom', 'male'), ('Bob', 'male'), ('Alice', 'female')]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
print(rdd1.join(rdd2).collect())
print(rdd1.leftOuterJoin(rdd2).collect())
print(rdd1.rightOuterJoin(rdd2).collect())

输出结果:

[('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Jerry', (19, None)), ('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Tom', (18, 'male')), ('Bob', (None, 'male')), ('Alice', (17, 'female'))]
cogroup

rdd1.cogroup(rdd2)用于对两个PairRDD进行先后两次分组连接的Transformation操作,相当于对rdd1、rdd2分别进行goupByKey,再对两个结果进行groupByKey。

rdd1 = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
rdd2 = sc.parallelize([("a", 3), ("b", 4)])
[(x, tuple(map(list, y))) for x, y in sorted(list(rdd1.cogroup(rdd2).collect()))]

输出结果:

[('a', ([1, 2], [3])), ('b', ([3], [4]))]
subtractByKey

rdd1.subtractByKey(rdd2)用于对两个PairRDD求差集的Transformation操作,即返回在rdd1中而不在rdd2中的元素。

rdd1 = sc.parallelize([("x", 1), ("y", 2), ("z", 3)])
rdd2 = sc.parallelize([("x", 3), ("y", 4)])
rdd1.subtractByKey(rdd2).collect()

输出结果:

[('z', 3)]
foldByKey

foldByKey的操作和reduceByKey类似,但是foldByKey可以提供一个初始值

data = [('x', 1), ('x', 2), ('x', 3),  ('y', 1), ('y', 2), ('z', 1)]
rdd = sc.parallelize(data)
print(rdd.foldByKey(0, lambda x, y: x + y).collect())
print(rdd.foldByKey(1, lambda x, y: x + y).collect())

输出结果:

[('y', 3), ('x', 6), ('z', 1)]
[('y', 5), ('x', 8), ('z', 2)]

5、分区操作

RDD分区操作主要分为调整分区与转换分区操作。

调整分区操作用于调整已有RDD的分区结构,不改变数据的物理位置,仅影响分区元数据,通常性能开销较小。例如:分区数调整,即调整现有RDD的分区数,但不移动数据。

转换分区操作改变了RDD的分区结构,通常是在数据上执行Transformation操作,产生一个新的RDD,其分区数可能发生变化。例如通过指定的分区数或者使用一些具体的分区算法,重新组织数据分区。在数据的重新组织过程中可能涉及跨分区的数据移动,通常伴随着性能开销。

①glom

rdd.glom()用于将每个分区的数据转换为一个数组,是Transformation操作。适用于需要对每个分区进行整体操作的场景。

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
rdd.glom().collect()

输出结果:

[[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
②HashPartitioner

rdd.HashPartitioner()用于对PairRDD进行哈希分区,即根据键的哈希值将数据划分到不同的分区中。

data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.partitionBy(2)

for partition, data in enumerate(rdd2.glom().collect()):
    print(f"Partition {partition}: {data}")

输出结果:

Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]
mapPartitions / mapPartitionsWithIndex

rdd.mapPartitions()是Transformation操作,用于对RDD的每个分区执行一个自定义映射函数,该函数可以处理分区内的所有元素,而不是一次仅处理一个元素。mapPartitions能够减少通信开销,因为映射操作是在每个分区内进行的,适用于需要对整个分区进行批量操作的场景,而不适用于需要考虑跨分区元素之间关系的场景。

rdd.mapPartitionsWithIndex()类似于mapPartitions,但提供了分区索引信息,允许更细粒度的控制。

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
print(rdd.mapPartitions(lambda x:(i * 2 for i in x)).collect())
print(rdd.mapPartitionsWithIndex(lambda index,x:((index, i*2) for i in x)).collect())

输出结果:

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[(0, 2), (0, 4), (0, 6), (1, 8), (1, 10), (1, 12), (2, 14), (2, 16), (2, 18), (2, 20)]
④coalesce

coalesce()用于减少分区数的Transformation操作,可以尽量避免数据迁移,提升效率。

参数如下:

coalesce(numPartitions, shuffle=False)
numPartitions:新的分区数。
shuffle:是否进行数据洗牌,默认为False。当设置为 True 时,将触发数据洗牌操作,否则只是简单地减小分区数。

rdd = sc.parallelize(range(20),10)
print(rdd.glom().collect())
print(rdd.coalesce(2,shuffle=False).glom().collect())
print(rdd.coalesce(2,shuffle=True).glom().collect())

输出结果:

[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15], [16, 17], [18, 19]]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
[[0, 1, 4, 5, 6, 7, 12, 13, 16, 17], [2, 3, 8, 9, 10, 11, 14, 15, 18, 19]]
⑤repartition

rdd.repartition()用于重新分区的Transformation操作,可以增加或减少分区数,通过shuffle来重新组织数据。允许动态调整RDD的分区数,可在数据分布不均匀时提高计算性能。

对比:coalesce在已有分区基础上尽量减少数据shuffle,而repartition会创建新分区并且使用full shuffle。

rdd = sc.parallelize(range(25),25)
print(rdd.glom().collect())
print(rdd.repartition(5).glom().collect())
print(rdd.coalesce(5).glom().collect())

输出结果:

[[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [10], [11], [12], [13], [14], [15], [16], [17], [18], [19], [20], [21], [22], [23], [24]]
[[6, 8, 11, 15, 20, 21], [0, 9, 16, 18, 24], [2, 3, 7, 14, 19, 22], [1], [4, 5, 10, 12, 13, 17, 23]]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19], [20, 21, 22, 23, 24]]
⑥partitionBy

rdd.partitionBy()用于对PairRDD重新分区,是Transformation操作,可以根据指定的分区器对键值对数据进行重新分区,以更好地控制数据的分布。

data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
pair_rdd = sc.parallelize(data)
hash_partitioned_rdd = pair_rdd.partitionBy(2)
for partition, data in enumerate(hash_partitioned_rdd.glom().collect()):
    print(f"Partition {partition}: {data}")

输出结果:

Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]

6、缓存操作

如果多个任务在计算过程中共享同一个RDD作为中间数据,通过对其进行缓存,将其存储在内存中,可以显著加快计算速度。但是对RDD的缓存并不会立即生效,而是在该RDD第一次被计算出来时才会进行缓存。在不再需要某个RDD时,可以使用unpersist来释放缓存,而这个操作是立即执行的。这样可以有效地管理内存资源,避免不必要的缓存。
另一方面,缓存在提高计算速度的同时,并不会切断RDD的血缘依赖关系。因为缓存的数据可能存在某些分区的节点发生故障的情况,例如内存溢出或者节点损坏。在这种情况下,可以根据血缘关系重新计算受影响分区的数据,确保计算的正确性。
如果需要切断血缘关系,可以使用checkpoint来设置检查点,将RDD保存到磁盘中。与缓存类似,对RDD进行checkpoint同样不会立即生效,而是在该RDD第一次被计算出来时才会保存成检查点。通常,checkpoint适用于一些计算代价非常高昂的中间结果,或者在重复计算结果不可保证完全一致的情况下(例如使用zipWithIndex算子)。
对RDD进行缓存是优化Spark计算性能的有效手段,但需要根据具体情况灵活运用,以确保计算的准确性和效率。

①cache

rdd.cache()用于将RDD的计算结果缓存到内存中,以便在后续操作中重用,可以显著提高迭代算法等需要多次使用同一数据集的性能。rdd.cache使用存储级别MEMORY_ONLY,意味着如果内存不足,Spark可能会根据缓存数据的大小和可用内存的情况进行动态调整,例如将一部分或全部缓存的数据移除,以腾出内存供其他操作使用。

a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

print(mean_a)

输出结果:

4999.5
②persist

rdd.persist()用于将RDD中间结果缓存到内存或磁盘中,以便在后续操作中重用。与cache不同,persist允许用户指定不同的存储级别,以更灵活地管理缓存。存储级别即不同的数据缓存的位置和策略,可以是MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK(默认)等。

rdd.persist()写入磁盘的文件是临时文件,应用执行完成后就会被删除,可以使用rdd.unpersist()立即释放缓存。

from  pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

a.unpersist() 
print(mean_a)

输出结果:

4999.5
③checkpoint

rdd.checkpoint()用于将RDD中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,可以中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重新执行程序,减少开销。需要注意的是,checkpoint操作并不会马上被执行,而是在执行Action操作时才被触发。另外,checkpoint路径保存的文件是永久存在的,不会随着应用的结束而被删除。

sc.setCheckpointDir("./data/checkpoint/")
rdd = sc.parallelize(["a","b","c","d"],2)
rdd_idx = rdd.zipWithIndex() 
rdd_idx.checkpoint() 
rdd_idx.take(3)

输出结果:

[('a', 0), ('b', 1), ('c', 2)]

7、共享变量

共享变量主要用于在分布式计算中实现在任务之间共享数据,以提高性能和降低网络开销。广播变量(broadcast variables)和累加器(accumulators)是两个重要的分布式计算工具,用于在集群上共享数据和累积结果。

①broadcast

当需要在所有工作节点之间共享较小的只读数据集时,使用广播变量可以避免将该数据集多次传输到各个节点。这可以有效减少网络开销,提高性能。典型的应用场景包括在所有节点上使用相同的配置参数、字典或者映射表等。并且可以避免任务间重复传输,如果一个RDD需要在多个任务中使用,而且这个RDD的数据较小,使用广播变量可以避免在不同任务之间多次传输相同的数据。

#广播变量 broadcast 不可变,在所有节点可读
broads = sc.broadcast(100)
rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())
print(broads.value)

输出结果:

[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100
②accumulator

累加器主要用于执行在分布式任务中的“添加”和“合并”操作,通常用于聚合和计数等操作。
例如,可以用累加器来计算在整个集群上发生的某个特定事件的总次数;计算所有节点上某个变量的总和或平均值。

#累加器 只能在Driver上可读,在其它节点只能进行累加
total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)
rdd.foreach(lambda x:total.add(x))
total.value

输出结果:

45

四、综合应用

1、求平均数

题目:计算1000个随机数的平均数。

import numpy as np
import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()

conf = SparkConf().setAppName("test1").setMaster("local[4]")
sc = SparkContext(conf=conf)

np.random.seed(6)
data = np.random.randint(0, 100, size=1000)
rdd = sc.parallelize(data,2)
print("前10个数:",rdd.take(10))
data_sum = rdd.reduce(lambda x,y:x+y)
data_count = rdd.count()
print("平均数:",data_sum/data_count)

sc.stop()

运行结果:

前10个数: [10, 73, 99, 84, 79, 80, 62, 25, 1, 75]
平均数: 50.227

2、统计出现最多的数

题目:计算1000个随机数中出现最多的数。

import numpy as np
import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()

conf = SparkConf().setAppName("test1").setMaster("local[4]")
sc = SparkContext(conf=conf)
np.random.seed(6)
data = np.random.randint(0, 100, size=1000)
rdd1 = sc.parallelize(data,2)
rdd2 = rdd1.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
max_count = rdd2.map(lambda x:x[1]).reduce(lambda x,y: x if x>=y else y)
max_value = rdd2.filter(lambda x:x[1]==max_count).map(lambda x:x[0]).collect()[0]
print("出现最多的数:",max_value," 出现次数:",max_count)

sc.stop()

运行结果:

出现最多的数: 63  出现次数: 18

五、总结

总的来说,PySpark适合初学者入门学习,由于python门槛不高,易于掌握,可以通过PySpark了解Spark的运行机制以及RDD算子的使用。但如果是需要几百台服务器才能运行的任务场景, PySpark的UDF(User Defined Functions)的性能差距肯定比不过Spark-Scala。

至于选什么语言,取决于业务需求。如果是处理简单的数据清洗聚合,且数据量非常大,用Scala会有性能优势,可以节约计算资源。如果需要处理较为复杂的算法模型,依赖于各种第三方包,那么使用Python会更好。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1421475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Matlab plot绘图的 title 语法

x 0:1:10; >> y x.^2 -10*x15; >> plot(x,y) >> title(x_y, interpreter, none) title 里面的 x_y , y不会被当作下标。

数据结构--堆排序(超详细!)

一、前言 堆排序与Top K问题是堆的两大应用&#xff0c;在我们日常也有很广泛的用处 我们已经上面已经说过了堆&#xff0c;这次来说堆的其中一个应用---堆排序。 二、堆排序 堆排序优势在哪里&#xff1f;有什么恐怖之处吗&#xff1f; 重点&#xff1a;拿一个举例&…

你ping一下,服务器累成狗--第二篇

你ping一下&#xff0c;服务器累成狗-目录篇文章浏览阅读1.7k次&#xff0c;点赞65次&#xff0c;收藏20次。我们的电脑怎么干活的https://blog.csdn.net/u010187815/article/details/135796967 你ping一下&#xff0c;服务器累成狗--第一篇文章浏览阅读62次&#xff0c;点赞6…

记录 | ubuntu nm命令的基本使用

什么是nm命令 nm命令是linux下针对某些特定文件的分析工具&#xff0c;能够列出库文件&#xff08;.a、.lib&#xff09;、目标文件&#xff08;*.o&#xff09;、可执行文件的符号表。 nm命令的常用参数 -A 或 -o 或 --print-file-name&#xff1a;打印出每个符号属于的文件…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之DataPanel组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之DataPanel组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、DataPanel组件 数据面板组件&#xff0c;用于将多个数据占比情况使用占比图进…

1. 两数之和(力扣LeetCode)

文章目录 1. 两数之和题目描述哈希表&#xff1a;map二分查找暴力&#xff1a;双重for循环 1. 两数之和 题目描述 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可…

永磁同步电机速度环闭环控制

文章目录 1、速度环分析2、电机参数3、PI计算4、模型仿真4.1 模型总览4.2 实际转速与参考转速对比4.3 转矩波形4.4 相电流采样波形 模型下载地址&#xff1a; 链接: 速度闭环模型&#xff08;速度电流双闭环&#xff09; 1、速度环分析 2、电机参数 Udc24 V Rs0.6 LdLq1.4e-3…

Apache POI 处理excel文件 记录用法

Apache POI 写excel public static void write() throws IOException {//再内存中创建了一个Excel文件XSSFWorkbook excel new XSSFWorkbook();//创建一个sheet页XSSFSheet sheet excel.createSheet("info");//这里创建行对象,这里的rownum 是从0开始的,类似于数…

C++进阶--继承

概念 继承&#xff0c;允许一个类&#xff08;称为子类或派生类&#xff09;从另一个类&#xff08;称为父类或基类&#xff09;继承属性和方法。 继承的主要目的是实现代码的重用和构建类之间的层次关系。通过继承&#xff0c;子类可以获得父类的特性&#xff0c;包括数据成员…

qt-C++笔记之QStringList、QList<QString>、QString、QChar、QList<QChar>区别

qt-C笔记之QStringList、QList、QString、QChar、QList区别 —— 杭州 2024-01-30 凌晨0:27 参考博文&#xff1a;qt-C笔记之QStringList code review! 文章目录 qt-C笔记之QStringList、QList<QString>、QString、QChar、QList<QChar>区别1.Qt的字符容器类1.QSt…

维护管理Harbor,docker容器的重启策略

维护管理Harbor 通过HarborWeb创建项目 在 Harbor 仓库中&#xff0c;任何镜像在被 push 到 regsitry 之前都必须有一个自己所属的项目。 单击“项目”&#xff0c;填写项目名称&#xff0c;项目级别若设置为"私有"&#xff0c;则不勾选。如果设置为公共仓库&#…

【个人博客搭建】Hexo安装部署

目录 一、本地构建Hexo (一) 安装前提 1. Node.js 2. Git 3. Hexo (二) 初始化Hexo 1. 初始化博客目录 2. 配置网站基本信息 (三) 主题配置 1. 选择主题 2. 下载主题 (四) 本地启动Hexo 1. 生成静态文件 2. 启动服务 二、部署 (一) 部署到Github Pages 1. 新建…

线性代数------矩阵的运算和逆矩阵

矩阵VS行列式 矩阵是一个数表&#xff0c;而行列式是一个具体的数&#xff1b; 矩阵是使用大写字母表示&#xff0c;行列式是使用类似绝对值的两个竖杠&#xff1b; 矩阵的行数可以不等于列数&#xff0c;但是行列式的行数等于列数&#xff1b; 1.矩阵的数乘就是矩阵的每个…

4D毫米波雷达分类和工程实现

4D毫米波目标检测信息丰富&#xff0c;可获得目标3维位置信息、径向速度vr和rcs等&#xff0c;能够对目标准确分类。 4D毫米波和激光做好时空同步&#xff0c;可以用激光目标给4D毫米波做标注&#xff0c;提升标注效率。 1 激光用做4D毫米波分类真值 128线激光推理的结果作为4…

如何从视频中提取高清图片?可以这样截取

如何从视频中提取高清图片&#xff1f;从视频中提取高清图片可以方便我们制作各种用途所需的素材&#xff0c;如海报、社交媒体配图等。此外&#xff0c;高清图片的细节和色彩也更丰富&#xff0c;可以更好地满足我们的视觉需求。从视频中提取高清图片是一项需要技巧的任务&…

windows上使用anconda安装tensorrt环境

windows上使用anconda安装tensorrt环境 1 安装tensorrt1.1 下载最新的稳定的tensorrt 8.6.1(tensorrt对应的cuda、cudnn等版本是参考链接4)1.2 将tensorrt添加到环境变量1.3 安装tensorrt依赖1.4 安装Pycuda1.5 安装pytorch 2 测试2.1 测试TensorRT 样例(这个测试主要来源于参考…

InsideCli、OutsideCli-电源管理(23国赛真题)

2023全国职业院校技能大赛网络系统管理赛项–模块B&#xff1a;服务部署&#xff08;WindowServer2022&#xff09; 文章目录 题目配置步骤验证 题目 设置电源配置&#xff0c;以便客户端在通电的情况下&#xff0c;永不进入睡眠。 配置步骤 验证

小猪o2o生活通系统更新到了v24.1版本了php文件开源了提供VUE了但是车牌识别功能你真得会用吗

一.车牌识别设置项 车牌识别设置项总开关&#xff1a;系统后台-社区管理-社区配置-车牌识别配置。 平台需要开启车牌识别功能&#xff0c;其次平台可以选择车牌识别功能是由平台配置还是小区自己配置有需要提供代码的可以Q我昵称注明&#xff1a;CSDN网友。如果是平台自己配置&…

2024年火爆《幻兽帕鲁》可以macos系统运行吗?

幻兽帕鲁已经爆了&#xff0c;你和朋友们都是在哪个平台一起玩的&#xff1f; 这款有些类似宝可梦的游戏&#xff0c;已经以野火燎原之势席卷互联网&#xff0c;并且势必会持续一段时间&#xff0c;你可别说你不知道。 《幻兽帕鲁》目前能在哪些平台上运行&#xff1f; 这款由…

linux 下gdal库(python)

之前在windows下安装gdal&#xff0c;先要下安装包再安装。这次在linux上安装&#xff0c;试了一下pip install gdal&#xff0c;不可以。想着linux应该一样&#xff0c;结果一搜网上教程一堆&#xff0c;乱七八糟的。 搞了一个小时 最后发现一句话就可以&#xff01;&#xf…