Spark RDD 持久化(CheckPoint 检查点)

news2025/1/11 18:38:55

RDD Cache 缓存
RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存
在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算
子时,该 RDD 将会被缓存在计算节点的内存中

// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存。
wordToOneRdd.cache()
// 可以更改存储级别
// mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

存储级别

object StorageLevel {
 val NONE = new StorageLevel(false, false, false, false)
 val DISK_ONLY = new StorageLevel(true, false, false, false)
 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
 val MEMORY_ONLY = new StorageLevel(false, true, false, true)
 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
 val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
 val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
 val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

在这里插入图片描述
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或cache

RDD CheckPoint 检查点
所谓的检查点其实就是通过将 RDD 中间结果写入磁盘由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件
val lineRdd: RDD[String] = sc.textFile("input/test1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
 word => {
 (word, System.currentTimeMillis())
 }
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)

缓存和检查点区别
1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存
储在 HDFS 等容错、高可用的文件系统,可靠性高。
3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存
中读取数据即可,否则需要再从头计算一次 RDD。

示例1:
数据:user_test.csv

name,number
lisi,123
wangwu,456
zhangsan,789

RDD CheckPoint 检查点 测试脚本:

package SparkTest.SparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object DSLTest {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("DSLTest").master("local[*]").getOrCreate()
    import  session.implicits._
    import org.apache.spark.sql.functions._
    // 设置检查点路径
    val sc = session.sparkContext
    sc.setCheckpointDir("file:///F:/JavaTest/SparkDemo/checkpoint")

    // 创建一个 RDD,读取指定位置文件
    val lineRdd: RDD[String] = sc.textFile("file:///F:/JavaTest/SparkDemo/data/user_test.csv")
    
    // 业务逻辑
    val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
    val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
      word => {
        (word, System.currentTimeMillis())
      }
    }
    
    // 增加缓存,避免再重新跑一个 job 做 checkpoint
    wordToOneRdd.cache()
    // 数据检查点:针对 wordToOneRdd 做检查点计算
    wordToOneRdd.checkpoint()
    // 触发执行逻辑
    wordToOneRdd.collect().foreach(println)

    session.close()

  }
}
//结果:
(name,number,1682664424265)
(lisi,123,1682664424265)
(wangwu,456,1682664424265)
(zhangsan,789,1682664424265)
F:/JavaTest/SparkDemo/checkpoint下多出文件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/482383.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

debian11快速 ceph集群17.2.6(Quincy版)

由于网友跟我讲Pacific版快到期了,所以出一个Quincy版的部署文档 配置一下源 echo "deb http://mirrors.163.com/ceph/debian-quincy/ bullseye main" > /etc/apt/sources.list.d/ceph.list还是像以前一样使用docker或者podman 安装工具cephadm ce…

java获取文件名后缀方法

Java是一种应用广泛的编程语言,可以通过多种方式来实现对文件的操作。如文件名后缀、文件扩展名等。今天我们来看下 Java是如何获取文件名后缀的吧! 1.打开一个空文件,将其复制到一个新的文件夹中。 2.新建一个类,在里面定义方法&…

mysql 数据库备份

目录 数据库备份的方式 一、备份整个 $datadir 二、用mysqldump备份 备份某个库 只备份某个库下某个表 有很多库时候,一次性备份所有的库 一次指定备份某几个库 只备份表结构,不要里面数据 数据库还原的方式 1、在对应数据库下source还原 2…

学系统集成项目管理工程师(中项)系列16a_风险管理(上)

1. 风险的定义 1.1. 损失的不确定性 1.1.1. 狭义 1.2. 带来损失的可能性,也指可能获利的机会 1.2.1. 广义 1.3. 风险是一种不确定的事件或条件,一旦发生,就会产生积极或消极的影响 2. 性质划分 2.1. 纯粹风险 2.1.1. 只有损失可能性而…

IntelliJ IDEA修改背景颜色大全(护眼绿等)设置注释颜色

一.IDEA默认有3种背景颜色 路径为File->settings->Editor->Color Scheme可以设置软件默认颜色,旁边的小齿轮添加颜色名字 二.IDEA扩展颜色(护眼绿) 第一种方法: IDEA设置一张背景图片,路径:File->Setti…

C#,生信软件实践(02)——欧洲分子生物学实验室(EMBL格式文件)转为核酸序列或多肽序列(FASTA格式文件)的源代码

>生信老白写的基础代码.fasta MAYBENOANYUSAGE 1 EMBL 1.1 EMBL组织 欧洲分子生物学实验室EMBL(European Molecular Biology Laboratory)1974年由欧洲14个国家加上亚洲的以色列共同发起建立,现在由欧洲30个成员国政府支持组成&#xf…

【ARMv8 编程】A64 内存访问指令——内存存储指令

在内存加载一节中实际上已经使用了内存存储指令了&#xff0c;内存存储指令将寄存器的值存储到内存中。 同样&#xff0c;Store 指令的一般形式如下&#xff1a; STR Rn, <addr> 还有 unscaled-offset 偏移形式&#xff0c;例如 STUR<type>。 程序员通常不需要明…

Python多元线性回归预测模型实验完整版

多元线性回归预测模型 实验目的 通过多元线性回归预测模型&#xff0c;掌握预测模型的建立和应用方法&#xff0c;了解线性回归模型的基本原理 实验内容 多元线性回归预测模型 实验步骤和过程 (1)第一步&#xff1a;学习多元线性回归预测模型相关知识。 一元线性回归模型…

高级数据结构专题

1.树状数组 设计二分&#xff0c;二叉树&#xff0c;位运算&#xff0c;前缀和等思想 lowbit x & -x 功能&#xff1a;找到x的二进制数的最后一个1 1.1 树状数组模板 def lowbit(x):return x &-x def add (x,d):while(x < n) :tree[x] dxlowbit(x) def sum(x):an…

零基础学会 Java,这是你需要按照学习的步骤,加油,新加入的你

学习 Java 需要遵循一定的步骤&#xff0c;首先需要学习计算机基础知识&#xff0c;例如算法、数据结构、计算机组成原理等。如果没有相关背景知识&#xff0c;可以参加计算机相关课程进行学习。其次是学习编程基础知识&#xff0c;例如控制流、变量、函数等&#xff0c;你可以…

详解c++---list模拟实现

目录标题 list的准备工作构造函数push_backlist迭代器beginendinserteraseclearconst迭代器list迭代器区间构造swap现代拷贝构造函数现代赋值重载sizeempty->重载 list的准备工作 首先我们知道list是一个带头双向链表&#xff0c;他的数据就会存储在一个一个的节点里面&…

Python基础合集 练习22 (错误与异常处理语句2)

‘’’ try: 语句块 except: 语句块2 else ‘’’ class Mobe1(): def init(self) -> None: pass def mob1(self):while True:try:num int(input(请输入一个数: ))result 50 / numprint(result)print(50/{0}{1}.format(num, result))except (ZeroDivisionError, ValueEr…

时代浪潮已经袭来 AI人工智能频频爆火 ChatGPT改变行业未来

目录 1 人工智能的发展 1.1人工智能发展历程 1.1.1 人工智能的起源 1.1.2 人工智能发展的起起伏伏 1.1.3 人工智能多元化 2 什么是ChatGPT 2.1 ChatGPT的主要功能 2.2ChatGPT对企业的多种优势 2.3 不必担心ChatGPT带来的焦虑 3 人工智能对行业未来的影响 3.1 人工智…

UG NX二次开发(C++)-建模-根据UFun创建的Tag_t转换为Body

文章目录 1、前言2、用UF_MODL_create_block1创建一个块3、将Tag_t转换为Body出现错误3、解决方法4、结论 1、前言 经常采用UG NX二次开发&#xff08;NXOpen C#&#xff09;&#xff08;UG NX二次开发&#xff08;C#&#xff09;专栏&#xff09;&#xff0c;在用UFun创建一个…

【Git 入门教程】第十节、Git的常见问题

Git是一个强大的版本控制系统&#xff0c;它可以帮助开发者管理和协调代码库。然而&#xff0c;初学者使用Git时可能会遇到一些问题。本文将列举一些常见的问题&#xff0c;并提供相应的解决方案。 1. Git无法识别文件权限 在使用Git时&#xff0c;有时候你可能会遇到类似于“…

异构无线传感器网络路由算法研究(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 ​无线传感器网络(Wireless Sensor Networks, WSN)是一种新型的融合传感器、计算机、通信等多学科的信息获取和处理技术的网络,…

Ubuntu下跑通 nnUNet v2

网上关于nnUNet运行的教程大部分是针对nnUNet v1的。但由于nnUNet v2已经推出&#xff0c;而且相对于v1有了很大的更新。所以个人只能啃nnUNet的英文文档参考在Windows上实现nnU-Net v2的环境配置_netv2_无聊的程序猿的博客-CSDN博客 实现了代码的复现。 1.System requirement…

树与图的存储-邻接表与邻接矩阵-深度广度遍历

全部代码 全部代码在github acwing 上 正在更新 https://github.com/stolendance/acwing 图论 欢迎star与fork 树与图的存储 无论是树 还是无向图 都可以看成有向图 有向图可以采用邻接矩阵与邻接表进行存储 邻接矩阵 邻接矩阵 采用矩阵存储,graph[i][j] 代表i到j边的权重…

Python学习15:恺撒密码 B(python123)

描述 恺撒密码是古罗马凯撒大帝用来对军事情报进行加解密的算法&#xff0c;它采用了替换方法对信息中的每一个英文字符循环替换为字母表序列中该字符后面的第三个字符&#xff0c;即&#xff0c;字母表的对应关系如下&#xff1a;‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪‬‪…

Markdown编辑器快捷方式

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…