Spark---RDD持久化

news2024/9/30 1:30:29

文章目录

  • 1.RDD持久化
      • 1.1 RDD Cache 缓存
      • 1.2 RDD CheckPoint 检查点
      • 1.3 缓存和检查点区别

1.RDD持久化

在Spark中,持久化是将RDD存储在内存中,以便在多次计算之间重复使用。这可以显著减少不必要的计算,提高Spark应用程序的性能。
在这里插入图片描述

    val lines = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")
    //执行扁平化操作
    //扁平化就是将多个集合打散为一个集合
    val words = lines.flatMap((a: String) => a.split(" "))

    //对每个单词进行改造(hello,1)
    val wordMap = words.map(word => {
      println("@@@@@@@@@@@@@@@@@")
      (word, 1)
    })


    //reduceByKey要使用wordMap
    val wordToCount=wordMap.reduceByKey((t1,t2)=>t1+t2)
    wordToCount.collect().foreach(println)

    //groupByKey要使用wordMap
    val wordToGroup = wordMap.groupByKey()
    wordToGroup.collect().foreach(println)

在这里插入图片描述

如上述代码,reduceByKey和groupByKey都要是用wordMap的结果,由于RDD中是不存储数据的,在reduceByKey使用完成之后,groupByKey想要再次使用的时候,需要查找血缘关系,从头开始一步一步的执行,如果数据量大的情况下,会造成很大的成本上的浪费。

1.1 RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。 但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

可以通过设置persist来将数据存储到内存或者磁盘中

    // 数据缓存。
    wordMap.cache()
    // 可以更改存储级别
    //wordMap.persist(StorageLevel.MEMORY_AND_DISK_2)

在这里插入图片描述
可以看出将中间结果放入缓存中后,第二次使用中间结果的时候,将不会从头再执行一遍,而是直接从缓存中读取数据,

存储级别:

object StorageLevel {
 val NONE = new StorageLevel(false, false, false, false)
 val DISK_ONLY = new StorageLevel(true, false, false, false)
 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
 val MEMORY_ONLY = new StorageLevel(false, true, false, true)
 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
 val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
 val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
 val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

在这里插入图片描述
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist 或 cache。

1.2 RDD CheckPoint 检查点

检查点其实就是通过将 RDD 中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

检查点的数据需要进行落盘,当作业执行结束后,不会被删除。检查点数据一般都是存储在hdfs上。

    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")//配置文件
    val context = new SparkContext(wordCount)//读取配置文件

    // 设置检查点路径
    //这里选择将数据落盘在本地
    context.setCheckpointDir("./checkpoint1")

    //执行业务操作
    val lines = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")
    //执行扁平化操作
    //扁平化就是将多个集合打散为一个集合
    val words = lines.flatMap((a: String) => a.split(" "))

    //对每个单词进行改造(hello,1)
    val wordMap = words.map(word => {
      println("@@@@@@@@@@@@@@@@@")
      (word, 1)
    })

    // 数据缓存。
    wordMap.cache()

    // 数据检查点:针对 wordToOneRdd 做检查点计算
    wordMap.checkpoint()


    //reduceByKey要使用wordMap
    val wordToCount=wordMap.reduceByKey((t1,t2)=>t1+t2)
    wordToCount.collect().foreach(println)

    //groupByKey要使用wordMap
    val wordToGroup = wordMap.groupByKey()
    wordToGroup.collect().foreach(println)



    //关闭连接
    context.stop()

1.3 缓存和检查点区别

1)Cache 缓存只是将数据保存起来,不切断血缘依赖,即Cache相当于增加一个依赖关系。Checkpoint 检查点切断血缘依赖。
2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存
储在 HDFS 等容错、高可用的文件系统,可靠性高。
3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存
中读取数据即可,否则需要再从头计算一次 RDD。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1378943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MATLAB - 四旋翼飞行器动力学方程

系列文章目录 前言 本例演示了如何使用 Symbolic Math Toolbox™(符号数学工具箱)推导四旋翼飞行器的连续时间非线性模型。具体来说,本例讨论了 getQuadrotorDynamicsAndJacobian 脚本,该脚本可生成四旋翼状态函数及其雅各布函数…

Hive基础知识(十):Hive导入数据的五种方式

1. 向表中装载数据(Load) 1)语法 hive> load data [local] inpath 数据的 path[overwrite] into table student [partition (partcol1val1,…)]; (1)load data:表示加载数据 (2)local:表示…

蓝桥杯练习题(五)

📑前言 本文主要是【算法】——蓝桥杯练习题(五)的文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是听风与他🥇 ☁️博客首页:CSDN主页听风与他 …

UE4工程升级UE5教程及注意事项

原文链接:https://mp.weixin.qq.com/s/vSVu0VsNub0J62Nz7vM6cA虚幻引擎5迁移指南 | 虚幻引擎5.3文档 (unrealengine.com) 官方教程应该是从英文直接翻译过来的,过多词汇没修改,本篇重新整理修改一下,供各位参考。 本教程介绍&…

基于JAVA的数据可视化的智慧河南大屏 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 数据模块 A4.2 数据模块 B4.3 数据模块 C4.4 数据模块 D4.5 数据模块 E 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的数据可视化的智慧河南大屏,包含了GDP、…

分裂联邦学习论文-混合联邦分裂学习GAN驱动的预测性多目标优化

论文标题:《Predictive GAN-Powered Multi-Objective Optimization for Hybrid Federated Split Learning》 期刊:IEEE Transactions on Communications, 2023 一、论文介绍 背景:联邦学习作为一种多设备协同训练的边缘智能算法&#xff0…

IDEA—初始化配置

注:以下红框圈的部分,均为已设置好的 外观与行为 编辑器 高级设置 按两次 shift 弹出提示问题解决

OpenCV-19图像的仿射变换

放射变换是图像旋转,缩放,平移的总称,具体的做法是通过一个矩阵和原图片坐标进行计算,得到新的坐标,完成变换,所以关键就是这个矩阵。 一、仿射变换之图像平移 使用API------warpAffine(src &…

Nightingale 夜莺监控系统 - 监控篇(2)

Author:rab 官方文档:https://flashcat.cloud/docs/content/flashcat-monitor/categraf/3-configuration/ 目录 前言一、Categraf 配置文件二、Input 插件配置文件2.1 插件说明2.2 通用配置2.2.1 配置采集频率 interval2.2.2 配置采集实例 instances2.2…

C#编程-在线程中使用同步

在线程中使用同步 在线程应用程序中,线程需要相互共享数据。但是,应用程序应该确保一个线程不更改另一个线程使用的数据。考虑有两个线程的场景。一个线程从文件读取工资,另一个线程尝试更新工资。当两个线程同时工作时,数据就会受损。下图显示了两个线程同时访问一个文件…

【JAVA】concurrentHashMap和HashTable有什么区别

🍎个人博客:个人主页 🏆个人专栏:JAVA ⛳️ 功不唐捐,玉汝于成 目录 前言 正文 同步性质: 性能: 允许空键值(Allow Nulls): 迭代器(Iter…

Flask+ Dependency-injecter+pytest 写测试类

最近在使用这几个在做项目,因为第一次用这个,所以不免有些问题。总结下踩的坑 1.测试类位置 首先测试类约定会放在tests里面,不然有可能发生引入包的问题,会报错某些包找不到。 2. 测试类依赖注入 这里我就用的真实的数据库操作…

[AutoSar]BSW_OS 01 Autosar OS入门(一)

目录 关键词平台说明一、Autosar OS 的位置二、Autosar OS 与OSEK三、TASK 关键词 嵌入式、C语言、autosar、OS、BSW 平台说明 项目ValueOSautosar OSautosar厂商vector芯片厂商TI编程语言C,C编译器HighTec (GCC) 一、Autosar OS 的位置 如在[AutoSar]基础部分 a…

如何使用统计鸟网站统计分析网站流量来源?

统计鸟官网地址:https://www.tongjiniao.com/ 站长必备!网站数据统计,流量监测平台 提供网站数据统计分析、搜索关键词、流量访问来源等服务 深入分析用户点击习惯,为智能化运营网站提供更好的用户体验 目录 一、注册账号信息 二…

电位器的基本知识

一、电位器简介 电位器是一种可调的电子元件。它是由一个电阻体和一个转动或滑动系统组成。当电阻体的两个固定触电之间外加一个电压时,通过转动或滑动系统改变触点在电阻体上的位置,在动触点与固定触点之间便可得到一个与动触点位置成一定关系的电压。…

DFT中的SCAN、BIST、ATPG基本概念

DFT中的SCAN、BIST、ATPG基本概念 SCAN 定义 扫描路径法是一种针对时序电路芯片的DFT方案,目标是在不影响正常功能的情况下来能够提高可控性和可观测性。 原理 原理是将时序电路可以模型化为一个组合电路网络和带触发器(Flip-Flop,简称FF)的时序电路…

【开源】基于JAVA的数据可视化的智慧河南大屏

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 数据模块 A4.2 数据模块 B4.3 数据模块 C4.4 数据模块 D4.5 数据模块 E 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的数据可视化的智慧河南大屏,包含了GDP、…

蚁群算法(ACO)解决旅行商(TSP)问题的python实现

TSP问题 旅行商问题(Travelling Salesman Problem, 简记TSP,亦称货郎担问题):设有n个城市和距离矩阵D [dij],其中dij表示城市i到城市j的距离,i, j 1, 2 … n,则问题是要找出遍访每个城市恰好一次的一条回…

c#多线程中使用SemaphoreSlim

SemaphoreSlim是一个用于同步和限制并发访问的类,和它类似的还有Semaphore,只是SemaphoreSlim更加的轻量、高效、好用。今天说说它,以及如何使用,在什么时候去使用,使用它将会带来什么优势。 代码的业务是&#xff1a…

InseRF: 文字驱动的神经3D场景中的生成对象插入

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…