RDD相关知识
RDD介绍
RDD
是Spark
的核心抽象,即 弹性分布式数据集(residenta distributed dataset
)。代表一个不可变,可分区,里面元素可并行计算的集合。其具有数据流模型的特点:自动容错,位置感知性调度和可伸缩性。 在Spark
中,对数据的所有操作不外乎创建RDD
、转化已有RDD
以及调用 RDD
操作进行求值。
RDD结构图
RDD具有五大特性
-
一组分片(
Partition
),即数据集的基本组成单位(RDD
是由一系列的partition
组成的)。将数据加载为RDD
时,一般会遵循数据的本地性(一般一个HDFS
里的block
会加载为一个partition
)。 -
RDD
之间的依赖关系。依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD
都有依赖。为了容错(重算,cache
,checkpoint
),也就是说在内存中的RDD
操作时出错或丢失会进行重算。 -
由一个函数计算每一个分片。
Spark
中的RDD
的计算是以分片为单位的,每个RDD
都会实现compute
函数以达到这个目的。compute
函数会对迭代器进行复合,不需要保存每次计算的结果。 -
(可选)如果
RDD
里面存的数据是key-value
形式,则可以传递一个自定义的Partitioner
进行重新分区。 -
(可选)
RDD
提供一系列最佳的计算位置,即数据的本地性。
RDD之间的依赖关系
RDD
之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。
窄依赖:父RDD
和子RDD
partition
之间的关系是一对一的。或者父RDD
一个partition
只对应一个子RDD
的partition
情况下的父RDD
和子RDD
partition
关系是多对一的,也可以理解为没有触发shuffle
。
宽依赖:父RDD
与子RDD
partition
之间的关系是一对多。 父RDD
的一个分区的数据去到子RDD
的不同分区里面。也可以理解为触发了shuffle
。
特别说明:对于join
操作有两种情况,如果join
操作的使用每个partition
仅仅和已知的Partition
进行join
,此时的join
操作就是窄依赖;其他情况的join
操作就是宽依赖。
RDD创建
-
从
Hadoop
文件系统(或与Hadoop
兼容的其他持久化存储系统,如Hive
、Cassandra
、HBase
)输入(例如HDFS
)创建。 -
通过集合进行创建。
算子
算子可以分为Transformation
转换算子和Action
行动算子。 RDD
是懒执行的,如果没有行动操作出现,所有的转换操作都不会执行。
RDD
直观图,如下:
RDD 的 五大特性
-
一组分片(
Partition
),即数据集的基本组成单位。对于RDD
来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD
时指定RDD
的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core
的数目。 -
一个计算每个分区的函数。
Spark
中RDD
的计算是以分片为单位的,每个RDD
都会实现compute
函数以达到这个目的。compute
函数会对迭代器进行复合,不需要保存每次计算的结果。 -
RDD
之间的依赖关系。RDD
的每次转换都会生成一个新的RDD
,所以RDD
之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark
可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD
的所有分区进行重新计算。 -
一个
Partitioner
,即RDD
的分片函数。当前Spark
中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner
,另外一个是基于范围的RangePartitioner
。只有对于于key-value
的RDD
,才会有Partitioner
,非key-value
的RDD
的Parititioner
的值是None
。Partitioner
函数不但决定了RDD
本身的分片数量,也决定了parent RDD Shuffle
输出时的分片数量。 -
一个列表,存储存取每个
Partition
的优先位置(preferred location
)。对于一个HDFS
文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
相关API介绍
SparkContext
创建;
sc = SparkContext("local", "Simple App")
说明:"local"
是指让Spark
程序本地运行,"Simple App"
是指Spark
程序的名称,这个名称可以任意(为了直观明了的查看,最好设置有意义的名称)。
- 集合并行化创建
RDD
;
data = [1,2,3,4]
rdd = sc.parallelize(data)
collect
算子:在驱动程序中将数据集的所有元素作为数组返回(注意数据集不能过大);
rdd.collect()
- 停止
SparkContext
。
sc.stop()
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个1到8的列表List
data = [1, 2, 3, 4, 5, 6, 7, 8]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
result = rdd.collect()
# 5.打印 rdd 的内容
print(result)
# 6.停止 SparkContext
sc.stop()
#********** End **********#
读取外部数据集创建RDD
编写读取本地文件创建Spark RDD
的程序。
相关知识
为了完成本关任务,你需要掌握:1.如何读取本地文件系统中的文件来创建Spark RDD
。
textFile 介绍
PySpark
可以从Hadoop
支持的任何存储源创建分布式数据集,包括本地文件系统,HDFS
,Cassandra
,HBase
,Amazon S3
等。Spark
支持文本文件,SequenceFiles
和任何其他Hadoop InputFormat
。
文本文件RDD
可以使用创建SparkContex
的textFile
方法。此方法需要一个 URI
的文件(本地路径的机器上,或一个hdfs://,s3a://
等 URI),并读取其作为行的集合。这是一个示例调用:
distFile = sc.textFile("data.txt")
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == '__main__': #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 文本文件 RDD 可以使用创建 SparkContext 的t extFile 方法。 #此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI), #并读取其作为行的集合 # 2.读取本地文件,URI为:/root/wordcount.txt rdd = sc.textFile("/root/wordcount.txt") # 3.使用 rdd.collect() 收集 rdd 的内容。 #rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容 result = rdd.collect() # 4.打印 rdd 的内容 print(result) # 5.停止 SparkContext sc.stop() #********** End **********#
map
算子
本关任务:使用Spark
的 map
算子按照相关需求完成转换操作。
相关知识
为了完成本关任务,你需要掌握:如何使用map
算子。
map
将原来RDD
的每个数据项通过map
中的用户自定义函数 f
映射转变为一个新的元素。
图中每个方框表示一个RDD
分区,左侧的分区经过自定义函数 f:T->U
映射为右侧的新 RDD
分区。但是,实际只有等到 Action
算子触发后,这个 f
函数才会和其他函数在一个 Stage
中对数据进行运算。
map 案例
-
sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) rdd_map = rdd.map(lambda x: x * 2) print(rdd_map.collect())
输出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
说明:rdd1
的元素( 1 , 2 , 3 , 4 , 5 , 6
)经过 map
算子( x -> x*2
)转换成了 rdd2
( 2 , 4 , 6 , 8 , 10
)。
编程要求
请仔细阅读右侧代码,根据方法内的提示,在Begin - End
区域内进行代码补充,具体任务如下:
需求:使用 map
算子,将rdd
的数据 (1, 2, 3, 4, 5)
按照下面的规则进行转换操作,规则如下:
- 偶数转换成该数的平方;
- 奇数转换成该数的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个1到5的列表List
data = [1, 2, 3, 4, 5]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
需求:
偶数转换成该数的平方
奇数转换成该数的立方
"""
# 5.使用 map 算子完成以上需求
rdd_map = rdd.map(lambda x: x * x if x % 2 == 0 else x * x * x)
# 6.使用rdd.collect() 收集完成 map 转换的元素
print(rdd_map.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
mapPartitions
算子
mapPartitions
mapPartitions
函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭 代器对整个分区的元素进行操作。
图中每个方框表示一个RDD
分区,左侧的分区经过自定义函数 f:T->U
映射为右侧的新RDD
分区。
mapPartitions 与 map
map
:遍历算子,可以遍历RDD
中每一个元素,遍历的单位是每条记录。
mapPartitions
:遍历算子,可以改变RDD
格式,会提高RDD
并行度,遍历单位是Partition
,也就是在遍历之前它会将一个Partition
的数据加载到内存中。
那么问题来了,用上面的两个算子遍历一个RDD
谁的效率高? mapPartitions
算子效率高。
mapPartitions 案例
-
def f(iterator): list = [] for x in iterator: list.append(x*2) return list if __name__ == "__main__": sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) partitions = rdd.mapPartitions(f) print(partitions.collect())
输出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
mapPartitions()
:传入的参数是rdd
的 iterator
(元素迭代器),返回也是一个iterator
(迭代器)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
#********** Begin **********#
def f(iterator):
list = []
for x in iterator:
list.append((x, len(x)))
return list
#********** End **********#
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
data = ["dog", "salmon", "salmon", "rat", "elephant"]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
需求:
将字符串与该字符串的长度组合成一个元组,例如:
dog --> (dog,3)
salmon --> (salmon,6)
"""
# 5.使用 mapPartitions 算子完成以上需求
partitions = rdd.mapPartitions(f)
# 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
print(partitions.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
filter
算子。
filter
filter
函数功能是对元素进行过滤,对每个元素应用f
函数,返 回值为 true
的元素在RDD
中保留,返回值为false
的元素将被过滤掉。内部实现相当于生成。
FilteredRDD(this,sc.clean(f))
下面代码为函数的本质实现:
-
def filter(self, f): """ Return a new RDD containing only the elements that satisfy a predicate. >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() [2, 4] """ def func(iterator): return filter(fail_on_stopiteration(f), iterator) return self.mapPartitions(func, True)
上图中每个方框代表一个 RDD
分区, T
可以是任意的类型。通过用户自定义的过滤函数 f
,对每个数据项操作,将满足条件、返回结果为 true
的数据项保留。例如,过滤掉 V2
和 V3
保留了 V1
,为区分命名为 V’1
。
filter 案例
-
sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) rdd_filter = rdd.filter(lambda x: x>2) print(rdd_filter.collect())
输出:
[1, 2, 3, 4, 5, 6]
[3, 4, 5, 6]
说明:rdd1( [ 1 , 2 , 3 , 4 , 5 , 6 ] )
经过 filter
算子转换成 rdd2( [ 3 ,4 , 5 , 6 ] )
。
使用 filter
算子,将 rdd
中的数据 (1, 2, 3, 4, 5, 6, 7, 8)
按照以下规则进行过滤,规则如下:
- 过滤掉
rdd
中的所有奇数。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个1到8的列表List
data = [1, 2, 3, 4, 5, 6, 7, 8]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
需求:
过滤掉rdd中的奇数
"""
# 5.使用 filter 算子完成以上需求
rdd_filter = rdd.filter(lambda x: x % 2 == 0)
# 6.使用rdd.collect() 收集完成 filter 转换的元素
print(rdd_filter.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
flatMap
算子
flatMap
将原来RDD
中的每个元素通过函数f
转换为新的元素,并将生成的RDD
中每个集合的元素合并为一个集合,内部创建:
FlatMappedRDD(this,sc.clean(f))
上图表示RDD
的一个分区,进行flatMap
函数操作,flatMap
中传入的函数为f:T->U
,T
和U
可以是任意的数据类型。将分区中的数据通过用户自定义函数f
转换为新的数据。外部大方框可以认为是一个RDD
分区,小方框代表一个集合。V1
、V2
、V3
在一个集合作为RDD
的一个数据项,可能存储为数组或其他容器,转换为V’1
、V’2
、V’3
后,将原来的数组或容器结合拆散,拆散的数据形成RDD
中的数据项。
flatMap 案例
sc = SparkContext("local", "Simple App")
data = [["m"], ["a", "n"]]
rdd = sc.parallelize(data)
print(rdd.collect())
flat_map = rdd.flatMap(lambda x: x)
print(flat_map.collect())
输出:
[['m'], ['a', 'n']]
['m', 'a', 'n']
flatMap
:将两个集合转换成一个集合
需求:使用 flatMap
算子,将rdd
的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9])
按照下面的规则进行转换操作,规则如下:
- 合并
RDD
的元素,例如:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
from pyspark import SparkContext if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List list = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(list) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) """ 使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下: 需求: 合并RDD的元素,例如: ([1,2,3],[4,5,6]) --> (1,2,3,4,5,6) ([2,3],[4,5],[6]) --> (1,2,3,4,5,6) """ # 5.使用 filter 算子完成以上需求 flat_map = rdd.flatMap(lambda x: x) # 6.使用rdd.collect() 收集完成 filter 转换的元素 print(flat_map.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#
distinct
算子distinct
distinct
将RDD
中的元素进行去重操作。上图中的每个方框代表一个
RDD
分区,通过distinct
函数,将数据去重。 例如,重复数据V1
、V1
去重后只保留一份V1
。distinct 案例
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.collect()) distinct = rdd.distinct()
-
输出
['python', 'python', 'python', 'java', 'java'] ['python', 'java']
print(distinct.collect())
sortByKey
算子sortByKey
-
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x): if numPartitions is None: numPartitions = self._defaultReducePartitions() memory = self._memory_limit() serializer = self._jrdd_deserializer def sortPartition(iterator): sort = ExternalSorter(memory * 0.9, serializer).sorted return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending))) if numPartitions == 1: if self.getNumPartitions() > 1: self = self.coalesce(1) return self.mapPartitions(sortPartition, True) # first compute the boundary of each part via sampling: we want to partition # the key-space into bins such that the bins have roughly the same # number of (key, value) pairs falling into them rddSize = self.count() if not rddSize: return self # empty RDD maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner f\fraction = min(maxSampleSize / max(rddSize, 1), 1.0) samples = self.sample(False, f\fraction, 1).map(lambda kv: kv[0]).collect() samples = sorted(samples, key=keyfunc) # we have numPartitions many parts but one of the them has # an implicit boundary bounds = [samples[int(len(samples) * (i + 1) / numPartitions)] for i in range(0, numPartitions - 1)] def rangePartitioner(k): p = bisect.bisect_left(bounds, keyfunc(k)) if ascending: return p else: return numPartitions - 1 - p return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
-
说明:
ascending
参数是指排序(升序还是降序),默认是升序。numPartitions
参数是重新分区,默认与上一个RDD
保持一致。keyfunc
参数是排序规则。sortByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("c",1),("b",1)]
rdd = sc.parallelize(data)
key = rdd.sortByKey()
print(key.collect())
-
输出:
-
[('a', 1), ('a', 2), ('b', 1), ('c', 1)]
需求:使用 sortBy
算子,将 rdd
中的数据进行排序(升序)。
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[('B',1),('A',2),('C',3)]的列表List
List = [('B',1),('A',2),('C',3)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(List)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
需求:
元素排序,例如:
[(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]
"""
# 5.使用 sortByKey 算子完成以上需求
key = rdd.sortByKey()
# 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
print(key.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
mapValues
算子
mapValues
mapValues
:针对(Key, Value)
型数据中的 Value
进行 Map
操作,而不对 Key
进行处理。
上图中的方框代表 RDD
分区。 a=>a+2
代表对 (V1,1)
这样的 Key Value
数据对,数据只对 Value
中的 1
进行加 2
操作,返回结果为 3
。
mapValues 案例
-
sc = SparkContext("local", "Simple App") data = [("a",1),("a",2),("b",1)] rdd = sc.parallelize(data) values = rdd.mapValues(lambda x: x + 2) print(values.collect())
输出:
-
[('a', 3), ('a', 4), ('b', 3)]
需求:使用mapValues
算子,将rdd
的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)
按照下面的规则进行转换操作,规则如下:
- 偶数转换成该数的平方
- 奇数转换成该数的立方
from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List List = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(List) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下: 需求: 元素(key,value)的value进行以下操作: 偶数转换成该数的平方 奇数转换成该数的立方 """ # 5.使用 mapValues 算子完成以上需求 values = rdd.mapValues(lambda x: x + 2) # 6.使用rdd.collect() 收集完成 mapValues 转换的元素 print(values.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#
reduceByKey
算子reduceByKey
reduceByKey
算子,只是两个值合并成一个值,比如叠加。函数实现
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
上图中的方框代表 RDD
分区。通过自定义函数 (A,B) => (A + B)
,将相同 key
的数据 (V1,2)
和 (V1,1)
的 value
做加法运算,结果为( V1,3)
。
reduceByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:x+y).collect())
输出:
[('a', 3), ('b', 1)]
需求:使用 reduceByKey
算子,将 rdd(key-value类型)
中的数据进行值累加。
例如:
("soma",4), ("soma",1), ("soma",2) -> ("soma",7)
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
List = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(List)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
需求:
元素(key-value)的value累加操作,例如:
(1,1),(1,1),(1,2) --> (1,4)
(1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)
"""
# 5.使用 reduceByKey 算子完成以上需求
reduce = rdd.reduceByKey(lambda x,y:x+y)
# 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
print(reduce.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
Action
的常用算子
count
count()
:返回 RDD
的元素个数。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.count())
输出:
5
first
first()
:返回 RDD
的第一个元素(类似于take(1)
)。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.first())
输出:
python
take
take(n)
:返回一个由数据集的前 n
个元素组成的数组。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.take(2))
输出:
['python', 'python']
reduce
reduce()
:通过func
函数聚集 RDD
中的所有元素,该函数应该是可交换的和关联的,以便可以并行正确计算。
示例:
-
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.reduce(lambda x,y:x+y))
输出:
4
collect
collect()
:在驱动程序中,以数组的形式返回数据集的所有元素。
示例:
-
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.collect())
输出:
[1,1,1,1]
具体任务如下:
需求1:使用 count
算子,统计下 rdd
中元素的个数;
需求2:使用 first
算子,获取 rdd
首个元素;
需求3:使用 take
算子,获取 rdd
前三个元素;
需求4:使用 reduce
算子,进行累加操作;
需求5:使用 collect
算子,收集所有元素。
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
List = [1, 3, 5, 7, 9, 8, 6, 4, 2]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(List)
# 4.收集rdd的所有元素并print输出
print(rdd.collect())
# 5.统计rdd的元素个数并print输出
print(rdd.count())
# 6.获取rdd的第一个元素并print输出
print(rdd.first())
# 7.获取rdd的前3个元素并print输出
print(rdd.take(3))
# 8.聚合rdd的所有元素并print输出
print(rdd.reduce(lambda x,y:x+y))
# 9.停止 SparkContext
sc.stop()
# ********** End **********#