Spark学习(5)-Spark Core之RDD

news2025/1/12 4:32:13

1 RDD详解

1.1 为什么需要RDD

分布式计算需要:

  • 分区控制
  • Shuffle控制
  • 数据存储\序列化\发送
  • 数据计算API
  • 等一系列功能
    这些功能, 不能简单的通过Python内置的本地集合对象(如 List\ 字典等)去完成。我们在分布式框架中, 需要有一个统一的数据抽象对象, 来实现上述分布式计算所需功能。这个抽象对象, 就是RDD。

1.2 什么是RDD?

1.2.1 RDD含义

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

  • Dataset:一个数据集合,用于存放数据的。
  • Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
  • Resilient:RDD中的数据可以存储在内存中或者磁盘中

在这里插入图片描述

1.2.2 RDD定义

  • RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合
  • 所有的运算以及操作都建立在 RDD 数据结构的基础之上。
  • 可以认为RDD是分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类Abstract Class和泛型Generic Type。

在这里插入图片描述

1.3 RDD的5大特性

1.3.1 RDD是有分区的(必有)

RDD的分区是RDD数据存储的最小单位。一份RDD的数据,本质上分隔成了多个分区。
在这里插入图片描述
在这里插入图片描述

1.3.2 RDD的方法会作用到其所有的分区上(必有)

1.3.3 RDD之前是有依赖关系的(必有)

1.3.4 Key-Value型的RDD可以有分区器(选有)

  • 默认分区器:Hash分区规则,可以手动设置一个分区器(rdd.partitionBy的方法来设置)。
  • 这个特性是可能的,因为不是所有的RDD都是Key-Value型。
  • Key-Value RDD:RDD中存储的是二元元组,这个就是Key-Value型RDD。
  • 二元元组:只有两个元素的元组,比如:(“hadoop”,6)。

1.3.5 RDD的分区规划会尽量靠近数据所在的服务器(选有)

在初始化RDD(读取数据的时候)规划的时候,分区会尽量规划到存储数据所在的服务器上。
因为这样可以走本地读取,避免网络读取。
本地读取:Executor所在的服务器,同样是一个DataNode,同时这个DataNode上有它要读的数据,所以可以直接读取到机器硬盘即可,无需走网络传输。
总结:
Spark会在确保并行计算能力的前提下,尽可能确保本地读取。

1.4 WordCount中RDD

2 RDD 编程入门

2.1 程序入口 SparkContext对象

Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言),只有构建出SparkContext, 基于它才能执行后续的API调用和计算。
本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来。

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # 0. 初始化执行环境 构建SparkContext对象
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    src = SparkContext(conf=conf)

2.2 RDD的创建

RDD的创建主要有2种方式:

  • 通过并行化集合创建 ( 本地对象 转 分布式RDD )
  • 读取外部数据源 ( 读取文件 )

2.2.1 并行化创建

概念:并行化创建,是指:将本地集合转成分布式RDD

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :01_RDD_create_parallelize.py
@Date      :2022/11/18 6:28
@Author    :wuk
并行化集合,将本地集合转成分布式对象RDD
"""
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # 0. 初始化执行环境 构建SparkContext对象
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    src = SparkContext(conf=conf)

    # 演示通过并行化集合的方式去创建RDD, 本地集合 -> 分布式对象(RDD)
    rdd = src.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
    # parallelize方法, 没有给定 分区数, 默认分区数是多少?  根据CPU核心来定
    print(f"默认分区数量:{rdd.getNumPartitions()}")
    rdd = src.parallelize([1, 2, 3], 3)
    print("分区数: ", rdd.getNumPartitions())

    # collect方法, 是将RDD(分布式对象)中每个分区的数据, 都发送到Driver中, 形成一个Python List对象
    # collect: 分布式 转 -> 本地集合
    print("rdd的内容是: ", rdd.collect())

parallelize:
参数1:集合对象,比如list
参数2:分区数量

2.2.2 获取RDD分区数

rdd.getNumPartitions()

2.2.3 读取文件创建

读取单个文件
# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :02_RDD_create_textFile.py
@Date      :2022/11/18 6:28
@Author    :wuk
读取本地文件
"""
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    # 构建SparkContext对象
    conf = SparkConf().setAppName("text").setMaster("local[*]")
    context = SparkContext(conf=conf)

    # 通过textFile API 读取数据
    # 读取本地文件数据
    file = context.textFile("./data/words.txt")
    print(f"默认分区数量:{file.getNumPartitions()}")
    print(f"内容是{file.collect()}")

    # 加最小分区数参数的测试
    text_file = context.textFile("./data/words.txt", 100)
    print(f"默认分区数量:{text_file.getNumPartitions()}")
    print(f"内容是{text_file.collect()}")

    # 读取HDFS文件数据测试
    context_text_file = context.textFile("hdfs://master:8020/input/word.txt")
    print(context_text_file.getNumPartitions())
    print(context_text_file.collect())

textFile:
参数1:必填,读取文件路径,可以是本地文件,也可以是HDFS路径。
参数2:选填,表示最小分区数量(一般不会去设置)

读取一堆文件
# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :03_RDD_create_wholeTextFile.py
@Date      :2022/11/18 6:28
@Author    :wuk
wholeTextFiles 读取文件夹下的所有文件内容
"""
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    context = SparkContext(conf=conf)
    files_rdd = context.wholeTextFiles("./data")
    print(files_rdd.collect())
    print(files_rdd.map(lambda x: x[1]).collect())

注意:该API适用于少量分区读取数据,是小文件专用。

2.3 算子

  • 方法/函数:本地对象的API,叫做函数/方法.
  • 算子:分布式集合对象的API,称为算子。
  • 算子分类:Transformation算子(转换算子),Action算子(动作算子)。

注意:

  • 对于这两类的算子,转换算子相当于在构建执行计划,action是一个执行让这个执行计划开始工作。
  • 如果没有action,转换算子之间的迭代计划,就是一个没有通电的流水线,只有action的到来,这个数据处理的流水线才开始工作。

在这里插入图片描述
在这里插入图片描述

2.4 常用Transformation算子

定义:RDD的算子,返回值仍旧是一个RDD,称之为转换算子。
特性:这类算子是lazy懒加载的,如果没有action算子,转换算子是不工作的。

map

功能:将RDD数据一条条处理,处理逻辑基于map算子中接收的处理函数,返回新的RDD.

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :04_RDD_operators_map.py
@Date      :2022/11/18 6:29
@Author    :wuk
map的使用
"""
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf= SparkConf().setAppName("test").setMaster("local[*]")
    context = SparkContext(conf=conf)
    rdd1 = context.parallelize([1, 2, 3, 4, 5, 6], 3)
    print(rdd1.getNumPartitions())
    print(rdd1.map(lambda x: x * 10).collect())

在这里插入图片描述
语法如下:
在这里插入图片描述

flatMap

功能:对rdd执行map操作,然后进行解嵌套操作。
解除嵌套:
在这里插入图片描述

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :05_RDD_operators_flatMap.py
@Date      :2022/11/19 14:21
@Author    :wuk
@Description  : flatMap的使用
"""
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    context = SparkContext(conf=conf)
    rdd = context.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "hadoop flink spark"])
    # 得到所有的单词, 组成RDD, flatMap的传入参数 和map一致, 就是给map逻辑用的, 解除嵌套无需逻辑(传参)
    print(rdd.map(lambda x: x.split(" ")).collect())
    print(rdd.flatMap(lambda x: x.split(" ")).collect())

在这里插入图片描述

reduceByKey

功能:针对KV型RDD,自动按照key进行分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合。
用法:
在这里插入图片描述
注意:reduceByKey中接收的函数,只负责聚合,不理会分组,分组是自动by key来分组的。

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :06_RDD_operators_reduceByKey.py
@Date      :2022/11/19 14:32
@Author    :wuk
@Description  : reduceByKey的使用
"""
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    context = SparkContext(conf=conf)
    rdd = context.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])
    # reduceByKey 对相同key 的数据执行聚合相加
    print(rdd.reduceByKey(lambda a, b: a + b).collect())

在这里插入图片描述
聚合逻辑:
在这里插入图片描述

groupBy

功能:将RDD的数据进行分组
语法:
在这里插入图片描述

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :08_RDD_operators_groupBy.py
@Date      :2022/11/19 14:33
@Author    :wuk
@Description  : group by的使用
"""
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 2), ('b', 3)])
    # 通过groupBy对数据进行分组
    # groupBy传入的函数的 意思是: 通过这个函数, 确定按照谁来分组(返回谁即可)
    # 分组规则 和SQL是一致的, 也就是相同的在一个组(Hash分组)
    print(rdd.groupBy(lambda t: t[0])
          .map(lambda t: (t[0], list(t[1])))
          .collect())

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
    print(rdd.groupBy(lambda t: "even" if (t % 2 == 0) else "odd")
          .map(lambda t: (t[0], list(t[1])))
          .collect())

在这里插入图片描述

filter

功能:过滤想要的数据进行保留
语法:
在这里插入图片描述

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :09_RDD_operators_filter.py
@Date      :2022/11/19 14:33
@Author    :wuk
@Description  : filter的使用
"""
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
    print(rdd.filter(lambda x: x % 2 == 0).collect())

在这里插入图片描述

distinct

功能:对Rdd数据进行去重,返回新RDD。
语法:
在这里插入图片描述

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :10_RDD_operators_distinct.py
@Date      :2022/11/19 14:33
@Author    :wuk
@Description  : distinct的使用
"""
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 1, 1, 2, 2, 2, 3, 3, 3])
    # distinct 进行RDD数据去重操作
    print(rdd.distinct().collect())

    rdd2 = sc.parallelize([('a', 1), ('a', 1), ('a', 3), ('b', 3)])
    print(rdd2.distinct().collect())

在这里插入图片描述

union

功能:两个RDD合并成一个RDD返回
注意:
只合并,不去重,不同类型的依旧可以混合。

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :11_RDD_operators_union.py
@Date      :2022/11/19 14:34
@Author    :wuk
@Description  : union的使用
"""
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd1 = sc.parallelize([1, 1, 3, 3])
    rdd2 = sc.parallelize(["a", "b", "a"])

    rdd3 = rdd1.union(rdd2)
    print(rdd3.collect())

join,leftOuterJoin,rightOuterJoin

功能:对于两个RDD执行join操作,实现内外连接。
注意:join只适合于二元元组。

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd1 = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu")])
    rdd2 = sc.parallelize([(1001, "销售部"), (1002, "科技部")])

    # 通过join算子来进行rdd之间的关联
    # 对于join算子来说 关联条件 按照二元元组的key来进行关联
    print(rdd1.join(rdd2).collect())

    # 左外连接, 右外连接 可以更换一下rdd的顺序 或者调用rightOuterJoin即可
    print(rdd1.leftOuterJoin(rdd2).collect())
    print(rdd1.rightOuterJoin(rdd2).collect())

intersection

功能:求两个RDD的交集。

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd1 = sc.parallelize([('a', 1), ('a', 3)])
    rdd2 = sc.parallelize([('a', 1), ('b', 3)])

    # 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDD
    print(rdd1.intersection(rdd2).collect())

在这里插入图片描述

glom

功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行。

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
    # 将数据进行分区
    print(rdd.glom().collect())

在这里插入图片描述

groupByKey

功能:针对kv型RDD,自动按照key分组

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])

    rdd2 = rdd.groupByKey()
    print(rdd2.collect())
    print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())

在这里插入图片描述

sortBy

功能:对RDD数据进行排序,基于你指定的排序依据。
语法:
在这里插入图片描述

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([('c', 3), ('f', 1), ('b', 11), ('c', 3), ('a', 1), ('c', 5), ('e', 1), ('n', 9), ('a', 1)], 3)

    # 使用sortBy对rdd执行排序

    # 按照value 数字进行排序
    # 参数1函数, 表示的是 ,  告知Spark 按照数据的哪个列进行排序
    # 参数2: True表示升序 False表示降序
    # 参数3: 排序的分区数
    """注意: 如果要全局有序, 排序分区数请设置为1"""
    print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1).collect())

    # 按照key来进行排序
    print(rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=1).collect())

在这里插入图片描述

sortByKey

功能:针对kv型的rdd,根据key进行排序。
语法:
在这里插入图片描述

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),
                          ('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),
                          ('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)

    print(rdd.sortByKey(ascending=True, numPartitions=3, keyfunc=lambda key: str(key).lower()).collect())

在这里插入图片描述

将案例提交到yarn集群中运行

方式1:在pyCharm中直接执行
在这里插入图片描述
如果在PyCharm中直接提交yarn,依赖了其他的python文件,可以通过设置属性来指定依赖的代码
在这里插入图片描述

方式2:在服务器上通过spark-submit提交到集群运行
在这里插入图片描述

2.5 常用Action算子

定义:返回值不是RDD的都是动作算子。

countByKey

功能:统计key出现的次数,一般适用于KV型RDD

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.textFile("./data/words.txt")
    rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))

    # 通过countByKey来对key进行计数, 这是一个Action算子
    result = rdd2.countByKey()

    print(result)
    print(type(result))

collect

功能:将RDD各个分区的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
在这里插入图片描述

reduce

功能:对RDD数据按照传入的逻辑进行聚合。
语法:
在这里插入图片描述
在这里插入图片描述

fold

功能:和reduce一样,接收传入逻辑进行聚合,聚合是带有初始值的。
在这里插入图片描述

first

功能:取出RDD第一个元素
用法:
在这里插入图片描述

take

功能:取出前N个元素,组成List返给你
用法:
在这里插入图片描述

top

功能:对RDD数据集进行降序排序,取前N个
用法:
在这里插入图片描述

count

功能:计算RDD有多少条数据,返回值是一个数字。
用法:
在这里插入图片描述

takeSample

功能:随机抽样RDD的数据
用法:
在这里插入图片描述

takeOrdered

功能:对RDD进行排序取前N个
用法:
在这里插入图片描述

foreach

功能:对RDD每一个元素,执行你提供的逻辑操作,和map一致,不过没有返回值。
用法:
在这里插入图片描述

saveAsTextFile

功能:将RDD的数据写入文本文件中,支持本地写出,HDFS等文件系统。
在这里插入图片描述
注意点:

  1. foreach,saveAsTextFile这两个算子是分区(Executor)执行的,跳过Driver。
  2. 反之,其余的Action算子都会将结果发送至Driver.

2.6 分区操作算子(Transformation & Action)

mapPartitions

foreachPartition

partitionBy

repartition,coalesce

对RDD的分区执行重新分区(仅数量)

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)

    # repartition 修改分区
    print(rdd.repartition(1).getNumPartitions())

    print(rdd.repartition(5).getNumPartitions())

    # coalesce 修改分区
    print(rdd.coalesce(1).getNumPartitions())

    print(rdd.coalesce(5, shuffle=True).getNumPartitions())

2.7 面试题: groupByKey和reduceByKey的区别

  • 在功能上区别:
    groupByKey仅仅有分组的功能而已。
    reduceByKey除了有Bykey的分组功能外,还有reduce聚合功能,是一个分组+聚合一体化的算子。
  • 性能上的区别:
    reduceByKey的性能是远大于groupByKey+聚合逻辑(Shuffle),原因是reduceByKey在分组前已经做了预聚合,那么在Shuffle分组节点,被Shuffle的数据可以极大的减少,如下图所示:
    groupByKey+聚合逻辑的执行流程,:
    在这里插入图片描述
    reduceByKey的执行流程如下:
    在这里插入图片描述
    如图:reduceByKey由于自带聚合逻辑,所以可以完成:
    1. 先在分区内做预聚合
    2. 再走分组流程(shuffle)
    3. 分组后再做最终聚合

3 RDD 持久化

3.1 RDD的数据是过程数据

rdd之间进行迭代计算,当执行开启以后,新的rdd生成,老的就会消失,rdd的数据是过程数据,只在处理的过程中存在,一旦处理完成了,数据就不存在了
该特性可以最大化利用资源,老旧rdd没有用了,就从内存中清理,给后续的计算腾出空间。
如图所示:
在这里插入图片描述
如上图,rdd3被两次使用,第一次使用后,其实rdd3就已经不存在了,第二次使用的时候,只能基于rdd的血缘关系,从rdd1重新执行,构建出rdd3。

3.2 RDD的缓存

3.2.1 缓存

针对上述rdd的过程数据问题,肯定需要优化,优化就是不要让rdd3消失,所以提供了rdd缓存技术,可以将指定的rdd数据保留在硬盘或者内存中。
缓存API如下:
在这里插入图片描述

3.2.2 缓存特点

  • 缓存可以将过程RDD数据,持久化保存到内存或者硬盘上。
  • 但是这个保存在设定上认为是不安全的,有丢失的风险,所以缓存有一个特点就是,保留RDD的血缘关系,一旦发生丢失,就可以基于血缘关系,重新计算这个RDD的数据

3.2.3 缓存是如何保存

RDD是将自己的分区数据,每个分区自行将其数据保存在所在的Executor内存和硬盘上,这就是分散存储。

# coding:utf8
import time

from pyspark import SparkConf, SparkContext, StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd1 = sc.textFile("./data/words.txt")
    rdd2 = rdd1.flatMap(lambda x: x.split(" "))
    rdd3 = rdd2.map(lambda x: (x, 1))

    rdd3.cache()
    # 缓存到磁盘和内存中
    print(rdd3.persist().getStorageLevel())

    rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
    print(rdd4.collect())

    rdd5 = rdd3.groupByKey()
    rdd6 = rdd5.mapValues(lambda x: sum(x))
    print(rdd6.collect())
    # 清理缓存
    rdd3.unpersist()

3.3 RDD的checkPoint

checkPoint技术,也是将RDD数据保存下来,但是它只支持硬盘存储,并且被设定是安全的,不会保留血缘关系
checkPoint存储RDD数据,是集中收集各个分区数据进行存储,而缓存是分散存储

缓存和checkPoint的对比:

  • checkPoint不管分区数量是多少,风险是一样的 ,缓存分区越多,风险越高。
  • checkPoint支持写入HDFS,缓存不行,HDFS是高可靠存储,checkPoint被认为是安全的。
  • checkPoint不支持内存,缓存可以,缓存如果写入内存,性能比checkPoint好一些。
  • checkPoint被设计是安全的,所以不保留血缘关系,而缓存被设定是不安全的,所以保留。

API实现如下:
在这里插入图片描述
注意:
checkPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用checkPoint比较合适,或者数据量很大,使用checkPoint比较合适。
如果数据量比较小,或者RDD重新计算速度比较快,用checkPoint没有必要,直接用缓存即可。

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 1. 告知spark, 开启CheckPoint功能
    sc.setCheckpointDir("hdfs://master:8020/output/ckp")
    rdd1 = sc.textFile("./data/words.txt")
    rdd2 = rdd1.flatMap(lambda x: x.split(" "))
    rdd3 = rdd2.map(lambda x: (x, 1))

    # 调用checkpoint API 保存数据即可
    rdd3.checkpoint()

    rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
    print(rdd4.collect())

    rdd5 = rdd3.groupByKey()
    rdd6 = rdd5.mapValues(lambda x: sum(x))
    print(rdd6.collect())

    rdd3.unpersist()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/24779.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JavaScript面向对象:类的继承

继承 现实中的继承:子承父业,比如我们都继承了父亲的姓。 程序中的继承:子类可以继承父类的一些属性和方法。 语法: class Father{ // 父类 } class Son extends Father { // 子类继承父类 } 实例: cla…

手把手教你开通小程序流量主

手把手教你开通小程序流量主 开通条件是累计独立访客不低于 1000。也就是1000级以上,其实这个不难。 接下来以防火安全知识专项学习与竞答为例,写一篇开通流量主、创建广告和代码嵌入的图文教程。 功能介绍 广告展示位置灵活控制,接入简单…

操作系统复习【面试】

操作系统复习【面试】前言推荐操作系统复习第一章 操作系统引论 11.3 操作系统的基本特性 141.3.1 并发1.3.2 共享1.3.3 虚拟1.3.4 异步1.4 操作系统的主要功能 171.4.1 处理机管理功能1.4.2 存储器管理功能1.4.3 设备管理功能1.4.4 文件管理功能1.4.5 操作系统和用户之间的接口…

目标管理利器OKR-给被各大APP抢占使用时长的你

今天聊聊好用的时间和目标管理利器OKR,给被各大APP抢占使用时长的你。 1、海龟的秘密 一个游泳健将,他发现自己竟然游不过一只海龟,这让他疲惫不堪,又失望,又难堪。 然后他又去不断观察,终于发现了海龟游…

数据结构之队列

文章目录前言一、队列二、队列应该如何实现顺序表or链表扩展了解三、队列的实现1.队列的声明2.接口(声明)3.接口的实现创建一个新的节点判断队列为空队头元素入队出队销毁队列注意:4.主函数(测试)四、相关习题总结前言…

【C++基础】友元

友元 定义:类的特点是私有成员无法在作用域外访问,而友元函数是特权函数,允许访问私有成员。 语法:在函数或类前加friend。 例子:在message中,published每个人都可访问,secret只有自己可以访问…

开放经济中的货币-中国视角下的宏观经济

开放经济中的货币 – 潘登同学的宏观经济学笔记 文章目录开放经济中的货币 -- 潘登同学的宏观经济学笔记汇率:复习外汇冲销下的可能三角中国的811汇改国际货币体系的现在与未来当前国际货币体系存在三个主要问题体系具有内生不稳定性美元的中心地位带来了不平等非对…

[附源码]java毕业设计校园博客系统

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

树莓派快速上手-远程调试图形界面

0 简述 前面的文章里介绍过通过ssh在局域网或者远程访问树莓派,一般而言,对于非图形界面的开发仅通过ssh命令行交互就能够完成的,但是要开发图形界面展示或交互的应用时,光命令行交互的方式就远远不够了。这篇文章将针对树莓派这…

Conformer测试问题

https://github.com/pengzhiliang/Conformer 抽空测试了conformer,训练起来很简单,但是会遇到一个问题: Loss is nan, stopping training 我用的默认配置,不知道为什么会有这个问题,知道的来探讨下。 1.数据准备 我…

华为机试 - 最长连续子序列

目录 题目描述 输入描述 输出描述 用例 题目解析 算法源码 题目描述 有N个正整数组成的一个序列。给定整数sum,求长度最长的连续子序列,使他们的和等于sum,返回此子序列的长度, 如果没有满足要求的序列,返回-1…

【Hack The Box】Linux练习-- Mirai

HTB 学习笔记 【Hack The Box】Linux练习-- Mirai 🔥系列专栏:Hack The Box 🎉欢迎关注🔎点赞👍收藏⭐️留言📝 📆首发时间:🌴2022年11月17日🌴 &#x1f36…

元宇宙技术在几年后质变,迎来体验终端世界

京东集团高级副总裁、京东探索研究院院长陶大程京东集团高级副总裁、京东探索研究院院长陶大程是京东“产业元宇宙”的提出者和构建者。他谈到,希望通过构建元宇宙供应链降低实体经济参与数字经济的门槛,帮助实体经济完成数实融合的商业转型,…

设计模式复习题

1.选择题 1.在观察者模式中,表述错误的是(C )C.观察者可以改变被观察者的状态,再由被观察者通知所有观察者依据被观察者的状态进行。 2.对于违反里式代换原则的两个类,可以采用的候选解决方案错误的是: ( D )D.以上方…

斗鱼发布Q3财报:连续三个季度收入下滑,市值年初至今缩水五成

11月21日,斗鱼(NASDAQ:DOYU)发布2022年第三季度财务报告。 财报显示,斗鱼2022年第三季度的营收为17.98亿元,同比减少23.4%;净亏损660万元,2021年同期为亏损1.435亿元;调整后净利润为…

面向对象编程·上

面向对象编程上1.包1.1导入包中的类1.2静态导入1.3将类放到包中1.4包的访问权限控制 - [只能在当前包当中使用]1.5常见的系统包2.继承2.1背景2.2语法规则2.2.1super[不能出现在静态方法当中]作用总结 父类对象的引用2.3protected 关键字2.4更复杂的继承关系2.5final 关键字2.6组…

电容笔和触控笔有什么区别?值得入手电容笔品牌推荐

电容笔与传统的触控笔最大的不同之处是,电容笔具有良好的防误触和倾斜压感,能有效地降低书写过程中的麻烦。如果我们想要 IPAD和电容笔一起很好地使用,而且我们就没有过多的预算的话。那么,这款平替电容笔,就是最好的选…

看懂这篇文章-你就懂了数据库死锁产生的场景和解决方法

一、什么是死锁 加锁(Locking)是数据库在并发访问时保证数据一致性和完整性的主要机制。任何事务都需要获得相应对象上的锁才能访问数据,读取数据的事务通常只需要获得读锁(共享锁),修改数据的事务需要获得…

京东神灯文档:JVM参数GC线程数ParallelGCThreads合理性设置

目录 1. ParallelGCThreads参数含义 2. ParallelGCThreads参数设置 3. ParallelGCThreads参数实验 4. ParallelGCThreads扫描结果 5. ParallelGCThreads修改建议 1. ParallelGCThreads参数含义 在讲这个参数之前,先谈谈JVM垃圾回收(GC)算法的两个优化标的&…

ERP (SAP) Integrator Delphi Edition

ERP (SAP) Integrator Delphi Edition ERP(SAP)Integrator支持RFC和SAP服务,并允许开发人员轻松编写桌面、服务器和移动应用程序。它提供了一种连接到SAP R/3和NetWeaver系统的简单方法,并使应用程序能够进行远程功能调用,以便向这些远程功能…