目录
Actions
.take(...) 方法
.collect(...) 方法
.reduce(...) 方法
.count(...) 方法
.saveAsTextFile(...) 方法
.foreach(...) 方法
Actions
与转换不同,动作是在数据集上执行计划任务的;一旦您完成了数据的转换,就可以执行您的转换。这可能不包含任何转换(例如,.take(n) 即使您没有对 RDD 进行任何转换,也会从 RDD 返回 n 条记录)或执行整个转换链。
.take(...) 方法
这可能是最有用的(并且使用频率高,如 .map(...) 方法)。与 .collect(...) 相比,此方法更受青睐,因为它只从单个数据分区返回前 n 行,而 .collect(...) 返回整个 RDD。当您处理大型数据集时,这一点尤其重要:
data_first = data_from_file_conv.take(1)
如果您想要一些随机记录,可以使用 .takeSample(...) 代替,它接受三个参数:第一个是采样是否应该有放回,第二个指定要返回的记录数,第三个是伪随机数生成器的种子:
data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)
.collect(...) 方法
此方法将 RDD 的所有元素返回到驱动程序。正如我们刚刚提供了对此的警告,我们在这里不再重复。
.reduce(...) 方法
.reduce(...) 方法使用指定的方法减少 RDD 的元素。
您可以使用它来对 RDD 的元素求和:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
这将产生 15 的总和。
我们首先使用 .map(...) 转换创建 rdd1 的所有值的列表,然后使用 .reduce(...) 方法处理结果。reduce(...) 方法在每个分区上运行求和方法(这里用 lambda 表示),并将总和返回到驱动节点,在那里进行最终聚合。
.reduceByKey(...) 方法的工作原理与 .reduce(...) 方法类似,但它是按关键点进行归约的:
data_key = sc.parallelize(
[('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),
('d', 3)],4)
data_key.reduceByKey(lambda x, y: x + y).collect()
前面的代码产生了以下结果:
.count(...) 方法
.count(...) 方法统计 RDD 中的元素数量。使用以下代码:
data_reduce.count()
这段代码将产生 6,即 data_reduce RDD 中的确切元素数量。
.count(...) 方法与以下方法产生相同的结果,但它不需要将整个数据集移动到驱动程序:
len(data_reduce.collect()) # WRONG -- DON'T DO THIS!
如果您的数据集是键值对形式,您可以使用 .countByKey() 方法来获取不同键的数量。运行以下代码:
data_reduce.countByKey()
这段代码将产生以下输出:
.saveAsTextFile(...) 方法
顾名思义,.saveAsTextFile(...) 方法将 RDD 保存为文本文件:
每个分区对应一个单独的文件:
data_key.saveAsTextFile(
'/Users/drabast/Documents/PySpark_Data/data_key.txt')
要重新读取它,您需要将其解析回字符串,因为所有行都被当作字符串处理:
def parseInput(row):
import re
pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
row_split = pattern.split(row)
return (row_split[1], int(row_split[2]))
data_key_reread = sc \
.textFile(
'/Users/drabast/Documents/PySpark_Data/data_key.txt') \
.map(parseInput)
data_key_reread.collect()
读取的键列表与我们最初拥有的匹配:
.foreach(...) 方法
这是一种对 RDD 的每个元素以迭代方式应用相同函数的方法;与 .map(...) 相反,.foreach(...) 方法以逐个的方式将定义的函数应用于每个记录。当您想要将数据保存到不受 PySpark 原生支持的数据库时,它很有用。
这里,我们将使用它在命令行界面(而不是 Jupyter Notebook)打印存储在 data_key RDD 中的所有记录:
def f(x):
print(x)
data_key.foreach(f)
如果您现在转到命令行界面,您应该会看到所有记录都被打印出来。注意,每次的顺序很可能不同。