Spark RDD持久化

news2024/12/24 11:38:42

RDD Cache缓存

        RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

 1)代码实现

object cache01 {

    def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
        val lineRdd: RDD[String] = sc.textFile("input1")

        //3.1.业务逻辑
        val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

        val wordToOneRdd: RDD[(String, Int)] = wordRdd.map {
            word => {
                println("************")
                (word, 1)
            }
        }

        //3.5 cache操作会增加血缘关系,不改变原有的血缘关系
        println(wordToOneRdd.toDebugString)

        //3.4 数据缓存。
        wordToOneRdd.cache()
        //3.6 可以更改存储级别
        // wordToOneRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

        //3.2 触发执行逻辑
        wordToOneRdd.collect()

        println("-----------------")
        println(wordToOneRdd.toDebugString)

        //3.3 再次触发执行逻辑
        wordToOneRdd.collect()

        //4.关闭连接
        sc.stop()
    }
}

2)源码解析

mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

        注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。

        缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

3)自带缓存算子

        Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

object cache02 {

    def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
        val lineRdd: RDD[String] = sc.textFile("input1")

        //3.1.业务逻辑
        val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

        val wordToOneRdd: RDD[(String, Int)] = wordRdd.map {
            word => {
                println("************")
                (word, 1)
            }
        }
        
        // 采用reduceByKey,自带缓存
        val wordByKeyRDD: RDD[(String, Int)] = wordToOneRdd.reduceByKey(_+_)

        //3.5 cache操作会增加血缘关系,不改变原有的血缘关系
        println(wordByKeyRDD.toDebugString)

        //3.4 数据缓存。
        //wordByKeyRDD.cache()

        //3.2 触发执行逻辑
        wordByKeyRDD.collect()

        println("-----------------")
        println(wordByKeyRDD.toDebugString)

        //3.3 再次触发执行逻辑
        wordByKeyRDD.collect()

        //4.关闭连接
        sc.stop()
    }
}

        访问http://localhost:4040/jobs/页面,查看第一个和第二个job的DAG图。说明:增加缓存后血缘依赖关系仍然有,但是,第二个job取的数据是从缓存中取的。

RDD CheckPoint检查点 

1)检查点:是通过将RDD中间结果写入磁盘。

2)为什么要做检查点?

        由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统

4)检查点数据存储格式为:二进制的文件

5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。

6)检查点触发时间:对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍

7)设置检查点步骤

(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")

(2)调用检查点方法:wordToOneRdd.checkpoint()

8)代码实现

 

object checkpoint01 {

    def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        // 需要设置路径,否则抛异常:Checkpoint directory has not been set in the SparkContext
        sc.setCheckpointDir("./checkpoint1")

        //3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
        val lineRdd: RDD[String] = sc.textFile("input1")

        //3.1.业务逻辑
        val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

        val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
            word => {
                (word, System.currentTimeMillis())
            }
        }

        //3.5 增加缓存,避免再重新跑一个job做checkpoint
//        wordToOneRdd.cache()

        //3.4 数据检查点:针对wordToOneRdd做检查点计算
        wordToOneRdd.checkpoint()

        //3.2 触发执行逻辑
        wordToOneRdd.collect().foreach(println)
        // 会立即启动一个新的job来专门的做checkpoint运算

        //3.3 再次触发执行逻辑
        wordToOneRdd.collect().foreach(println)
        wordToOneRdd.collect().foreach(println)

        Thread.sleep(10000000)

        //4.关闭连接
        sc.stop()
    }
}

9)执行结果

        访问http://localhost:4040/jobs/页面,查看4个job的DAG图。其中第2个图是checkpoint的job运行DAG图。第3、4张图说明,检查点切断了血缘依赖关系。

(1)只增加checkpoint,没有增加Cache缓存打印

第1个job执行完,触发了checkpoint,第2个job运行checkpoint,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。

(hadoop,1577960215526)

。。。。。。

(hello,1577960215526)

(hadoop,1577960215609)

。。。。。。

(hello,1577960215609)

(hadoop,1577960215609)

。。。。。。

(hello,1577960215609)

(2)增加checkpoint,也增加Cache缓存打印

第1个job执行完,数据就保存到Cache里面了,第2个job运行checkpoint,直接读取Cache里面的数据,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。

(hadoop,1577960642223)

。。。。。。

(hello,1577960642225)

(hadoop,1577960642223)

。。。。。。

(hello,1577960642225)

(hadoop,1577960642223)

。。。。。。

(hello,1577960642225)

 

 缓存和检查点区别

1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

4)如果使用完了缓存,可以通过unpersist()方法释放缓存

检查点存储到HDFS集群

如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。

object checkpoint02 {

    def main(args: Array[String]): Unit = {

        // 设置访问HDFS集群的用户名
        System.setProperty("HADOOP_USER_NAME","atguigu")

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        // 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径
        sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")

        //3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
        val lineRdd: RDD[String] = sc.textFile("input1")

        //3.1.业务逻辑
        val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

        val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
            word => {
                (word, System.currentTimeMillis())
            }
        }

        //3.4 增加缓存,避免再重新跑一个job做checkpoint
        wordToOneRdd.cache()

        //3.3 数据检查点:针对wordToOneRdd做检查点计算
        wordToOneRdd.checkpoint()

        //3.2 触发执行逻辑
        wordToOneRdd.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380655.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

杂记——14.git在idea上的使用及其实际开发介绍

这篇文章我们来讲一下git在idea上的使用,以及在实际开发过程中各个分支的使用及其具体的流程 目录 1.git在idea上的使用 1.1 idea上的git提交 1.2 idea上的分支切换 2.git在实际运用时的分支及其流程 2.1分支介绍 2.2具体流程 3.小结 1.git在idea上的使用 …

GraalVM-云原生时代的JVM(Java)

文章目录一、GraalVM是什么?二、GraalVM有哪些特点?2.1、高性能2.2、多语言支持2.3、互操作性2.4、安全性三、GraalVM的应用效果3.1、提高性能3.2、简化开发3.3、降低成本3.4、节省资源3.5、支持云环境四、使用GraalVM编译springboot应用程序4.1、下载并…

网络应用之表单提交

表单提交学习目标能够知道表单的提交方式能够知道表单中action属性的作用1. 表单属性设置<form>标签 表示表单标签&#xff0c;定义整体的表单区域action属性 设置表单数据提交地址method属性 设置表单提交的方式&#xff0c;一般有“GET”方式和“POST”方式, 不区分大小…

嵌入式之ubuntu终端操作与shell常用命令详解

目录 文件和目录列表 基本列表功能 显示列表长度 过滤输出列表 浏览文件系统 Linux 文件系统 遍历目录 处理文件 创建文件 复制文件 制表键自动补全 重命名文件 删除文件 处理目录 创建目录 删除目录 ​编辑其他常用命令与操作 Uname命令 clear命令 返回上一级命令 显…

Netty学习(一):Netty概述

一、原生NIO存在的问题 NIO 的类库和API繁杂&#xff0c;使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。需要具备其他的额外技能:要熟悉Java 多线程编程&#xff0c;因为NIO编程涉及到Reactor 模式&#xff0c;你必须对多线程和网络编程…

buu刷题(第一周)

目录 [DDCTF 2019]homebrew event loop action:trigger_event%23;action:buy;5%23action:get_flag; [CISCN2019 华东南赛区]Web4 [RootersCTF2019]babyWeb [GWCTF 2019]mypassword [NESTCTF 2019]Love Math 2 [BSidesCF 2019]Pick Tac Toe [RootersCTF2019]ImgXweb [SW…

2023 年网络安全漏洞的主要原因

​  网络安全漏洞已经并将继续成为企业面临的主要问题。因此&#xff0c;对于企业领导者来说&#xff0c;了解这些违规行为的原因至关重要&#xff0c;这样他们才能更好地保护他们的数据。 在这篇博文中&#xff0c;我们将概述 2023 年比较普遍的网络安全漏洞的主要原因。 云…

OpenCV4.x图像处理实例-道路车辆检测(基于背景消减法)

通过背景消减进行道路车辆检测 文章目录 通过背景消减进行道路车辆检测1、车辆检测思路介绍2、BackgroundSubtractorMOG23、车辆检测实现在本文中,将介绍如何使用简单但有效的背景-前景减法方法执行车辆检测等任务。本文将使用 OpenCV 中使用背景-前景减法和轮廓检测,以及如何…

这篇教你搞定Android内存优化分析总结

一、内存优化概念1.1 为什么要做内存优化&#xff1f;内存优化一直是一个很重要但却缺乏关注的点&#xff0c;内存作为程序运行最重要的资源之一&#xff0c;需要运行过程中做到合理的资源分配与回收&#xff0c;不合理的内存占用轻则使得用户应用程序运行卡顿、ANR、黑屏&…

QT之OpenGL帧缓冲

QT之OpenGL帧缓冲1. 概述1.1 帧缓冲的创建与删除1.2 帧缓冲的数据来源1.2.1 数据源与帧缓冲的关系1.2.2 纹理Attachment1.2.3 渲染缓冲对象Attachment1.2.4 两者的区别1.2.5 关于两者的使用场景2. Demo3. 后期处理4. 参考1. 概述 OpenGL管线的最终渲染目的地被称作帧缓冲(fram…

【代码随想录训练营】【Day28】第七章|回溯算法|93.复原IP地址|78.子集|90.子集II

复原IP地址 题目详细&#xff1a;LeetCode.93 这道题与上一道练习题分割回文字符串十分详细&#xff0c;一样是涉及到分割字符串、判断字符串、递归与回溯的问题&#xff0c;所以这道题要解决的难点在于&#xff1a; 如何分割IP地址字符串如何判断分割的IP地址是否合法递归的…

Kafka基本概念

什么是Kafka Kafka是一个消息系统。它可以集中收集生产者的消息&#xff0c;并由消费者按需获取。在Kafka中&#xff0c;也将消息称为日志(log)。 一个系统&#xff0c;若仅有一类或者少量的消息&#xff0c;可直接进行发送和接收。 随着业务量日益复杂&#xff0c;消息的种类…

2.单例模式

基本概念 单例模式&#xff1a;保证一个类只有一个实例&#xff0c;并提供一个访问该实例的全局访问点 常见应用场景 读取配置文件的类一般设计为单例模式网站计数器应用程序的日志应用&#xff0c;因为共享日志文件一直处于打开状态&#xff0c;只能有一个实例去操作Spring…

新C++(11):unordered_map\set的封装

"假如可以让音乐停下来"一、unordered_map\unordered_set简介在C98中&#xff0c;STL底层提供了以红黑树封装的关联式容器map\set&#xff0c;其查询效率可以达到LogN(以2为底)。而在C11中&#xff0c;STL又提供了unordered(无序)容器&#xff0c;其使用方式与map\se…

企业对不同形态CRM系统价格需求不同

很多企业在选型时关心CRM客户管理系统的价格&#xff0c;有人对CRM的价格完全没有概念&#xff0c;也有的人先问价格再看其他。CRM价格在系统选型中到底有多重要&#xff1f;不同类型CRM系统的价格是否有所不同&#xff1f; CRM的不同产品形态也会影响价格 通常情况下&#x…

十五、MyBatis使用PageHelper

1.limit分页 limit分页原理 mysql的limit后面两个数字&#xff1a; 第一个数字&#xff1a;startIndex&#xff08;起始下标。下标从0开始。&#xff09; 第二个数字&#xff1a;pageSize&#xff08;每页显示的记录条数&#xff09; 假设已知页码pageNum&#xff0c;还有每页…

移动端笔记

目录 一、移动端基础 二、视口 三、二倍图/多倍图 四、移动端开发 &#xff08;一&#xff09;开发选择 &#xff08;二&#xff09;常见布局 &#xff08;三&#xff09;移动端技术解决方案 五、移动WEB开发之flex布局 六、移动WEB开发之rem适配布局 #END&#xff08…

嘀嗒出行再闯IPO:千军万马我无懈

羽扇纶巾笑谈间&#xff0c;千军万马我无懈。 在激烈竞争中再度冲刺港交所IPO的嘀嗒出行&#xff0c;闪露出一丝歌词里的气魄。交通运输部下属网约车监管信息交互系统的数据显示&#xff0c;截至2023年1月31日&#xff0c;全国共有300家网约车平台公司取得网约车平台经营许可。…

如何使用COM-Hunter检测持久化COM劫持漏洞

关于COM-Hunter COM- Hunter是一款针对持久化COM劫持漏洞的安全检测工具&#xff0c;该工具基于C#语言开发&#xff0c;可以帮助广大研究人员通过持久化COM劫持技术来检测目标应用程序的安全性。 关于COM劫持 微软在Windows 3.11中引入了(Component Object Model, COM)&…

2月第4周榜单丨飞瓜数据B站UP主排行榜(哔哩哔哩平台)发布!

飞瓜轻数发布2023年2月20日-2月26日飞瓜数据UP主排行榜&#xff08;B站平台&#xff09;&#xff0c;通过充电数、涨粉数、成长指数三个维度来体现UP主账号成长的情况&#xff0c;为用户提供B站号综合价值的数据参考&#xff0c;根据UP主成长情况用户能够快速找到运营能力强的B…