RDD定义
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
Resilient:RDD中的数据可以存储在内存中或者磁盘中。
Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
Dataset:一个数据集合,用于存放数据的。
RDD的五大特性
RDD 数据结构内部有五大特性:
• RDD有分区列表(分区是RDD存储和计算的最小单位)。
• 作用在每个分区之上的计算函数。
• RDD之间存在依赖关系(血缘关系)。
• KV型的RDD可以有分区器(Partitioner)。
• RDD的分区规划会尽量靠近数据所在的服务器(数据本地性)。
前三个特性每个RDD都具备, 后两个特性是可选的。
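下面是一个最小的示意代码(沿用后文 local[*] 的本地环境写法), 通过RDD的API观察其中几个特性: 分区列表、KV型RDD的分区器以及RDD之间的血缘依赖。
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)], 3)
    # 特性1: RDD有分区列表, 可以查看分区数
    print("分区数: ", rdd.getNumPartitions())
    # 特性4(可选): KV型RDD可以有分区器, 经过reduceByKey等按key分区的操作后会带上分区器
    print("分区器: ", rdd.reduceByKey(lambda a, b: a + b).partitioner)
    # 特性3: RDD之间的依赖关系(血缘), 可以通过toDebugString查看
    print(rdd.map(lambda x: x).toDebugString())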
RDD 编程入门
程序执行入口 SparkContext对象
Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言)。只有构建出SparkContext, 基于它才能执行后续的API调用和计算。本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来。
RDD的创建
RDD的创建主要有2种方式:
• 通过并行化集合创建 ( 本地对象 转 分布式RDD )
• 读取外部数据源 ( 读取文件 )
# coding:utf8
# 导入Spark的相关包
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
# 0. 初始化执行环境 构建SparkContext对象
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 演示通过并行化集合的方式去创建RDD, 本地集合 -> 分布式对象(RDD)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
# parallelize方法, 没有给定 分区数, 默认分区数是多少? 根据CPU核心来定
print("默认分区数: ", rdd.getNumPartitions())
rdd = sc.parallelize([1, 2, 3], 3)
print("分区数: ", rdd.getNumPartitions())
# collect方法, 是将RDD(分布式对象)中每个分区的数据, 都发送到Driver中, 形成一个Python List对象
# collect: 分布式 转 -> 本地集合
print("rdd的内容是: ", rdd.collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
# 构建SparkContext对象
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 通过textFile API 读取数据
# 读取本地文件数据
file_rdd1 = sc.textFile("../data/input/words.txt")
print("默认读取分区数: ", file_rdd1.getNumPartitions())
print("file_rdd1 内容:", file_rdd1.collect())
# 加最小分区数参数的测试
file_rdd2 = sc.textFile("../data/input/words.txt", 3)
# 最小分区数是参考值, Spark有自己的判断, 你给的太大Spark不会理会
file_rdd3 = sc.textFile("../data/input/words.txt", 100)
print("file_rdd2 分区数:", file_rdd2.getNumPartitions())
print("file_rdd3 分区数:", file_rdd3.getNumPartitions())
# 读取HDFS文件数据测试
hdfs_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
print("hdfs_rdd 内容:", hdfs_rdd.collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 读取小文件文件夹
rdd= sc.wholeTextFiles("../data/input/tiny_files")
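# wholeTextFiles 返回的是(文件路径, 文件内容)形式的KV型RDD, 所以下面取x[1]拿到文件内容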
print(rdd.map(lambda x:x[1]).collect())
RDD 算子
常用 Transformation 算子
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
# 定义方法, 作为算子的传入函数体
def add(data):
return data * 10
print(rdd.map(add).collect())
# 更简单的方式 是定义lambda表达式来写匿名函数
print(rdd.map(lambda data: data * 10).collect())
"""
对于算子的接受函数来说, 两种方法都可以
lambda表达式 适用于 一行代码就搞定的函数体, 如果是多行, 需要定义独立的方法.
"""
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "hadoop flink spark"])
# 得到所有的单词, 组成RDD. flatMap的传入函数和map一致: 先对每条数据执行map逻辑, 再自动解除一层嵌套(解除嵌套本身不需要额外参数)
rdd2 = rdd.flatMap(lambda line: line.split(" "))
print(rdd2.collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
# 通过Filter算子, 只保留奇数(函数返回True的元素会被保留)
result = rdd.filter(lambda x: x % 2 == 1)
print(result.collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 1, 1, 2, 2, 2, 3, 3, 3])
# distinct 进行RDD数据去重操作
print(rdd.distinct().collect())
rdd2 = sc.parallelize([('a', 1), ('a', 1), ('a', 3)])
print(rdd2.distinct().collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1, 1, 3, 3])
rdd2 = sc.parallelize(["a", "b", "a"])
rdd3 = rdd1.union(rdd2)
print(rdd3.collect())
"""
1. 可以看到 union算子是不会去重的
2. RDD的类型不同也是可以合并的.
"""
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([ (1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu") ])
rdd2 = sc.parallelize([ (1001, "销售部"), (1002, "科技部")])
# 通过join算子来进行rdd之间的关联
# 对于join算子来说 关联条件 按照二元元组的key来进行关联
print(rdd1.join(rdd2).collect())
# 左外连接用leftOuterJoin; 右外连接可以调用rightOuterJoin, 或者交换两个rdd的顺序再用leftOuterJoin
print(rdd1.leftOuterJoin(rdd2).collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([('a', 1), ('b', 1)])
rdd2 = sc.parallelize([('a', 1), ('c', 3)])
# 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDD
rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)
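# glom 按分区将数据加上嵌套(每个分区变成一个list), flatMap再把嵌套解除还原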
print(rdd.glom().flatMap(lambda x: x).collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
rdd2 = rdd.groupByKey()
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])
# reduceByKey 对相同key 的数据执行聚合相加
print(rdd.reduceByKey(lambda a, b: a + b).collect())
groupByKey和reduceByKey的区别
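reduceByKey 会在 shuffle 之前先在每个分区内做预聚合(类似 MapReduce 的 combiner), 被 shuffle 的数据量更小, 性能通常明显优于 groupByKey; 而 groupByKey 只分组不聚合, 会把相同 key 的全部数据原样 shuffle 到一起。如果分组之后还要做聚合, 应优先使用 reduceByKey。下面是一个最小的对比示例(沿用前文的环境构建方式):
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1)])
    # groupByKey: 先分组, 再在分组结果上求和, 全量数据参与shuffle
    print(rdd.groupByKey().map(lambda x: (x[0], sum(x[1]))).collect())
    # reduceByKey: 分区内先预聚合再shuffle, 结果相同但shuffle的数据量更小
    print(rdd.reduceByKey(lambda a, b: a + b).collect())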
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('c', 3), ('f', 1), ('b', 11), ('c', 3), ('a', 1), ('c', 5), ('e', 1), ('n', 9), ('a', 1)], 3)
# 使用sortBy对rdd执行排序
# 按照value 数字进行排序
# 参数1: 函数, 告知Spark按照数据的哪个列(字段)进行排序
# 参数2: True表示升序 False表示降序
# 参数3: 排序的分区数
"""注意: 如果要全局有序, 排序分区数请设置为1"""
print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1).collect())
# 按照key来进行排序
print(rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=1).collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),
('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),
('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)
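# keyfunc: 排序前对key做处理, 这里统一转成小写, 让排序忽略大小写(输出中的key保持原样)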
print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
mapPartitions 和 map 的区别:
处理方式:map逐个处理元素,而mapPartitions处理整个分区。
性能:mapPartitions的性能通常优于map,因为它减少了在每个元素上执行操作的开销。当你需要进行一次性操作时,例如建立数据库连接或初始化资源,使用mapPartitions可以显著提高性能。
内存:尽管mapPartitions在性能上有优势,但需要注意的是,mapPartitions在处理大量数据时可能会导致内存不足,因为它需要在每个分区上一次性处理所有数据。
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
def process(iter):
result = list()
for it in iter:
result.append(it * 10)
return result
print(rdd.mapPartitions(process).collect())
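针对上面提到的"建立连接/初始化资源"的场景, 下面是一个示意性写法(其中的"连接"仅用print模拟, 并非真实的数据库API), 体现每个分区只做一次初始化:
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
    def process_with_init(iter):
        # 每个分区只执行一次"初始化"(这里用print模拟建立连接)
        print("模拟: 建立一次连接")
        for it in iter:
            yield it * 10
        # 分区数据处理完毕, 这里可以统一释放资源(模拟关闭连接)
    print(rdd.mapPartitions(process_with_init).collect())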
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('hadoop', 1), ('spark', 1), ('hello', 1), ('flink', 1), ('hadoop', 1), ('spark', 1)])
# 使用partitionBy 自定义 分区
def process(k):
if 'hadoop' == k or 'hello' == k: return 0
if 'spark' == k: return 1
return 2
print(rdd.partitionBy(3, process).glom().collect())
输出(glom后可以看到3个分区中的数据分布):
[[('hadoop', 1), ('hello', 1), ('hadoop', 1)], [('spark', 1), ('spark', 1)], [('flink', 1)]]
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
# repartition 修改分区
print(rdd.repartition(1).getNumPartitions())
print(rdd.repartition(5).getNumPartitions())
# coalesce 修改分区, 默认 shuffle=False, 只能减少分区
print(rdd.coalesce(1).getNumPartitions())
# 指定 shuffle=True 后才可以增加分区
print(rdd.coalesce(5, shuffle=True).getNumPartitions())
{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}|{"id":2,"timestamp":"2019-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}|{"id":3,"timestamp":"2019-
05-08T01:03.00Z","category":" 手机 ","areaName":" 北京 ","money":"8412"}{"id":4,"timestamp":"2019-05-08T05:01.00Z","category":" 电脑 ","areaName":" 上海 ","money":"1513"}|{"id":5,"timestamp":"2019-05-08T01:03.00Z","category":" 家电 ","areaName":" 北京 ","money":"1550"}|{"id":6,"timestamp":"2019-05-08T01:01.00Z","category":" 电脑 ","areaName":" 杭州 ","money":"1550"}{"id":7,"timestamp":"2019-05-08T01:03.00Z","category":" 电脑 ","areaName":" 北京 ","money":"5611"}|{"id":8,"timestamp":"2019-05-08T03:01.00Z","category":" 家电 ","areaName":" 北京 ","money":"4410"}|{"id":9,"timestamp":"2019-05-08T01:03.00Z","category":" 家具 ","areaName":" 郑州 ","money":"1120"}{"id":10,"timestamp":"2019-05-08T01:01.00Z","category":" 家具 ","areaName":" 北京 ","money":"6661"}|{"id":11,"timestamp":"2019-05-08T05:03.00Z","category":" 家具 ","areaName":" 杭州 ","money":"1230"}|{"id":12,"timestamp":"2019-05-08T01:01.00Z","category":" 书籍 ","areaName":" 北京 ","money":"5550"}{"id":13,"timestamp":"2019-05-08T01:03.00Z","category":" 书籍 ","areaName":" 北京 ","money":"5550"}|{"id":14,"timestamp":"2019-05-08T01:01.00Z","category":" 电脑 ","areaName":" 北京 ","money":"1261"}|{"id":15,"timestamp":"2019-05-08T03:03.00Z","category":" 电脑 ","areaName":" 杭州 ","money":"6660"}{"id":16,"timestamp":"2019-05-08T01:01.00Z","category":" 电脑 ","areaName":" 天津 ","money":"6660"}|{"id":17,"timestamp":"2019-05-08T01:03.00Z","category":" 书籍 ","areaName":" 北京 ","money":"9000"}|{"id":18,"timestamp":"2019-05-08T05:01.00Z","category":" 书籍 ","areaName":" 北京 ","money":"1230"}{"id":19,"timestamp":"2019-05-08T01:03.00Z","category":" 电脑 ","areaName":" 杭州 ","money":"5551"}|{"id":20,"timestamp":"2019-05-08T01:01.00Z","category":" 电脑 ","areaName":" 北京 ","money":"2450"}{"id":21,"timestamp":"2019-05-08T01:03.00Z","category":" 食品 ","areaName":" 北京 ","money":"5520"}|{"id":22,"timestamp":"2019-05-08T01:01.00Z","category":" 食品 ","areaName":" 北京 ","money":"6650"}{"id":23,"timestamp":"2019-05-08T01:03.00Z","category":" 服饰 ","areaName":" 杭州 ","money":"1240"}|{"id":24,"timestamp":"2019-05-08T01:01.00Z","category":" 食品 ","areaName":" 天津 ","money":"5600"}{"id":25,"timestamp":"2019-05-08T01:03.00Z","category":" 食品 ","areaName":" 北京 ","money":"7801"}|{"id":26,"timestamp":"2019-05-08T01:01.00Z","category":" 服饰 ","areaName":" 北京 ","money":"9000"}{"id":27,"timestamp":"2019-05-08T01:03.00Z","category":" 服饰 ","areaName":" 杭州 ","money":"5600"}|{"id":28,"timestamp":"2019-05-08T01:01.00Z","category":" 食品 ","areaName":" 北京 ","money":"8000"}|{"id":29,"timestamp":"2019-05-08T02:03.00Z","category":" 服饰 ","areaName":" 杭州 ","money":"7000"}
# coding:utf8
from pyspark import SparkConf, SparkContext
import json
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 读取数据文件
file_rdd = sc.textFile("../data/input/order.text")
# 进行rdd数据的split 按照|符号进行, 得到一个个的json数据
jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))
# 通过Python 内置的json库, 完成json字符串到字典对象的转换
dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))
# 过滤数据, 只保留北京的数据
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == "北京")
# 组合北京 和 商品类型形成新的字符串
category_rdd = beijing_rdd.map(lambda x: x['areaName'] + "_" + x['category'])
# 对结果集进行去重操作
result_rdd = category_rdd.distinct()
# 输出
print(result_rdd.collect())
# coding:utf8
from pyspark import SparkConf, SparkContext
from defs_19 import city_with_category
import json
import os
os.environ['HADOOP_CONF_DIR'] = "/export/server/hadoop/etc/hadoop"
if __name__ == '__main__':
# 提交 到yarn集群, master 设置为yarn
conf = SparkConf().setAppName("test-yarn-1").setMaster("yarn")
# 如果提交到集群运行, 除了主代码以外, 还依赖了其它的代码文件
# 需要设置一个参数, 来告知spark ,还有依赖文件要同步上传到集群中
# 参数叫做: spark.submit.pyFiles
# 参数的值可以是 单个.py文件, 也可以是.zip压缩包(有多个依赖文件的时候可以用zip压缩后上传)
conf.set("spark.submit.pyFiles", "defs_19.py")
sc = SparkContext(conf=conf)
# 在集群中运行, 我们需要用HDFS路径了. 不能用本地路径
file_rdd = sc.textFile("hdfs://node1:8020/input/order.text")
# 进行rdd数据的split 按照|符号进行, 得到一个个的json数据
jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))
# 通过Python 内置的json库, 完成json字符串到字典对象的转换
dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))
# 过滤数据, 只保留北京的数据
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == "北京")
# 组合北京 和 商品类型形成新的字符串
category_rdd = beijing_rdd.map(city_with_category)
# 对结果集进行去重操作
result_rdd = category_rdd.distinct()
# 输出
print(result_rdd.collect())
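上面代码依赖的 defs_19.py 文件内容如下(通过 spark.submit.pyFiles 同步上传到集群):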
def city_with_category(data):
return data['areaName'] + "_" + data['category']
常用 Action 算子
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.textFile("../data/input/words.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
# 通过countByKey来对key进行计数, 这是一个Action算子
result = rdd2.countByKey()
print(result)
print(type(result))
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a + b))
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
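# fold和reduce类似, 但聚合带有初始值10: 3个分区各自聚合时加一次10, 分区间汇总时再加一次10
# 1~9求和为45, 最终结果为 45 + 3*10 + 10 = 85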
print(rdd.fold(10, lambda a, b: a + b))
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 3, 5, 3, 1, 3, 2, 6, 7, 8, 6], 1)
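# takeSample(参数1: 是否允许重复抽样(是否放回), 参数2: 抽样的个数, 参数3: 随机数种子)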
print(rdd.takeSample(False, 5, 1))
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)
print(rdd.takeOrdered(3))
# 传入key函数对数值取相反数再排序, 相当于按降序取最大的3个元素
print(rdd.takeOrdered(3, lambda x: -x))
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)
result = rdd.foreach(lambda x: print(x * 10))
# 这个result是None,没有返回值
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
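# saveAsTextFile由各个Executor直接写出文件(每个分区写一个文件), 数据不经过Driver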
rdd.saveAsTextFile("hdfs://node1:8020/output/out1")
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
def process(iter):
result = list()
for it in iter:
result.append(it * 10)
print(result)
rdd.foreachPartition(process)
Transformation 和 Action的区别?
转换算子的返回值100%是RDD, 而Action算子的返回值100%不是RDD。
转换算子是懒加载的, 只有遇到Action才会执行. Action就是转换算子处理链条的开关。
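一个最小的演示(沿用前文的环境构建方式): 转换算子只记录计算逻辑, 直到遇到Action才真正触发执行。
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3])
    # map是转换算子, 这一行只构建了执行计划, 并不会真正计算
    rdd2 = rdd.map(lambda x: x * 10)
    # collect是Action算子, 执行到这里整条转换链才会被触发计算
    print(rdd2.collect())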
哪两个Action算子的结果不经过Driver, 直接输出?
foreach 和 saveAsTextFile 直接由Executor执行后输出,不会将结果发送到Driver上去。
对于分区操作有什么要注意的地方?
尽量不要增加分区: 增加分区通常需要shuffle, 可能破坏内存迭代的计算管道。