RDD缓存机制及持久化技术

news2025/1/11 22:45:04

文章目录

    • RDD缓存
      • RDD缓存API介绍
      • RDD缓存代码演示示例
      • RDD缓存执行原理
    • RDD CheckPoint
      • CheckPoint代码演示示例
      • CheckPoint与Cache对比

RDD缓存

  • RDD之间进行Transformation计算,当执行开启之后,就会有新的RDD生成,而之前老的RDD就会消失,所以RDD是过程数据,只在处理过程中存在,一旦处理完成,就会消失。这样的特性就是可以最大化利用资源,内存得到释放能腾出更多的空间以便后续的使用
  • 以上场景也会出现一个问题,就是如果一个程序中,运行了两个job。而我们在执行第一个之后,由于RDD的消失,在执行第二个任务时,又得重头再去执行,这样就显得有些麻烦了。
  • 对于以上场景Spark提供了RDD缓存的API,我们可以通过调用API来讲指定的RDD保留在内存或者磁盘中。

RDD缓存API介绍

# 缓存到内存中
rdd.cache()
# 仅内存缓存
rdd.persist(StorageLevel.MEMORY_ONLY)
# 仅内存缓存,2副本
rdd.persist(StorageLevel.MEMORY_ONLY_2)
# 仅缓存到磁盘
rdd.persist(StorageLevel.DISK_ONLY)
# 仅缓存到磁盘,2副本
rdd.persist(StorageLevel.DISK_ONLY_2)
# 仅缓存到磁盘,3副本
rdd.persist(StorageLevel.DISK_ONLY_3)
# 先放内存不够再放磁盘
rdd.persist(StorageLevel.MEMORY_AND_DISK)
# 先放内存,不够再放磁盘,2副本
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
# 堆外内存(系统内存)
rdd.persist(StorageLevel.OFF_HEAP)
# 主动清理缓存
rdd.unpersist()

RDD缓存代码演示示例

  • 以WordCount为例,不加缓存
import time
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf  = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)

    rdd1 = sc.textFile("../Data/input/words.txt")
    rdd2 = rdd1.flatMap(lambda line : line.split(" "))
    rdd3 = rdd2.map(lambda x : (x, 1))
    rdd4 = rdd3.reduceByKey(lambda a,b : a + b)
    print(rdd4.collect())

    rdd5 = rdd3.groupByKey()
    rdd6 = rdd5.mapValues(lambda x : sum(x))
    print(rdd6.collect())

    time.sleep(10000)
  • 在spark4040界面的DAG图中我们可以看出第二个job的DAG图又是从textFile开始又执行了一次,如下图
    在这里插入图片描述
  • 加入缓存之后的代码:
在这里插入代码片
# coding: utf8
import time
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
if __name__ == '__main__':
    conf  = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)

    rdd1 = sc.textFile("../Data/input/words.txt")
    rdd2 = rdd1.flatMap(lambda line : line.split(" "))
    rdd3 = rdd2.map(lambda x : (x, 1))
    rdd3.cache()
    rdd3.persist(StorageLevel.MEMORY_ONLY) 
    rdd4 = rdd3.reduceByKey(lambda a,b : a + b)
    print(rdd4.collect())

    rdd5 = rdd3.groupByKey()
    rdd6 = rdd5.mapValues(lambda x : sum(x))
    print(rdd6.collect())

    rdd3.unpersist()
    time.sleep(10000)
  • 在spark4040界面的DAG图中我们可以看出第二个job的DAG图又是从绿色小点之后开始执行,证明已经添加了缓存机制。
    在这里插入图片描述

RDD缓存执行原理

  • 以三分区的RDD为例,如下图所示,根据调用的API来设定
    在这里插入图片描述
  • RDD将自己分区的数据按照每一个分区自行保留在其所在Executor服务器的内存和硬盘上,这就是RDD的缓存分散存储缓存一定要保留被缓存RDD前置的血缘关系

RDD CheckPoint

  • CheckPoint技术也是将RDD的数据保存起来,但是它仅支持硬盘存储。在设计的角度上讲,CheckPoint是安全的,但是并不保留血缘关系。
  • CheckPoint保存数据原理图,以三分区保存到HDFS为例:
    在这里插入图片描述
  • CheckPoint存储RDD数据是集中收集各个分区的数据进行存储,也叫集中存储。

CheckPoint代码演示示例

  • API:
# 设置路径既可以是local模式下的本地文件系统,又可以选用HDFS
sc.setCheckpointDir()
# 直接调用CheckPoint算子
rdd.checkpoint()
# coding: utf8
import time
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf  = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)

    # 开启CheckPoint功能
    sc.setCheckpointDir("hdfs://node1:8020/Test/CheckPoint")

    rdd1 = sc.textFile("../Data/input/words.txt")
    rdd2 = rdd1.flatMap(lambda line : line.split(" "))
    rdd3 = rdd2.map(lambda x : (x, 1))

    # 调用CheckPoint API
    rdd3.checkpoint()
    rdd4 = rdd3.reduceByKey(lambda a,b : a + b)
    print(rdd4.collect())

    rdd5 = rdd3.groupByKey()
    rdd6 = rdd5.mapValues(lambda x : sum(x))
    print(rdd6.collect())
    
    time.sleep(10000)
  • 在spark4040界面的DAG图中我们可以看出,开头是CheckPoint,表示数据从CheckPoint读取了,也能看出CheckPoint不保留血缘关系。
    在这里插入图片描述
  • CheckPoint是一种重量级的使用,当RDD重新计算成本很高的情况下可以使用CheckPoint,如果是小数据量并且对RDD重新计算无可厚非的情况,直接使用cache最好。
  • 不管是CheckPoint还是cache都是Acition类型,如果想要这两个api工作后续必须添加Action算子。

CheckPoint与Cache对比

  • CheckPoint不管分区数量是多少,所承担的风险是一样的;而缓存分区越多,风险越高。
  • CheckPoint支持写入HDFS,缓存则不行,HDFS属于高可靠存储,所以CheckPoint安全性比缓存高。
  • CheckPoint不支持内存,缓存则可以,性能上缓存要比CheckPoint好。
  • CheckPoint因为安全,所以其不保留血缘关系,缓存反之。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/24907.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

js逆向基础篇-某音乐网站-xx音乐

提示!本文章仅供学习交流,严禁用于任何商业和非法用途,如有侵权,可联系本文作者删除! 网站链接:aHR0cHM6Ly9tdXNpYy4xNjMuY29tLyMvc2VhcmNoL20vP3M9JUU1JUE0JUE5JUU0JUI4JThCJnR5cGU9MQ== 案例分析: 搜索歌曲名称,找到列表接口,如上图能看到列表数据的,之后看下传参,…

249 h221 最大岛屿面积

方式1 错误的动态规划 递归公式为 if (matrix[i][j]‘1’&&matrix[i-1][j-1]‘1’){ int edge(int) Math.pow(dp[i][j],0.5); // 边长 int addCount addCount(matrix, i, j, edge); dp[i][j]dp[i-1][j-1]addCount; maxMath.max(max,dp[i][j]); } 只根据 dp[i-1][j-1]…

Dev C++开发环境的配置及使用

标题Dev C开发环境的配置及使用 本文引用自作者编写的下述图书; 本文允许以个人学习、教学等目的引用、讲授或转载,但需要注明原作者"海洋饼干叔 叔";本文不允许以纸质及电子出版为目的进行抄摘或改编。 1.《Python编程基础及应用》&#xff0…

免费题库接口

免费题库接口 本平台优点: 多题库查题、独立后台、响应速度快、全网平台可查、功能最全! 1.想要给自己的公众号获得查题接口,只需要两步! 2.题库: 查题校园题库:查题校园题库后台(点击跳转&a…

[SUCTF 2019]Pythonginx

源码: app.route(/getUrl, methods[GET, POST]) def getUrl():url request.args.get("url")host parse.urlparse(url).hostnameif host suctf.cc:return "我扌 your problem? 111"parts list(urlsplit(url))host parts[1]if host suctf…

[论文评析]Densely Connected Convolutional Networks,CVPR,2017

Densely Connected Convolutional Networks, 文章信息背景与动机DenseNetDense blockDenseNetDenseNet的集中经典配置总结文章信息 题目:Densely Connected Convolutional Networks, 发表:CVPR,2017 作者:Gao Huang, …

【TWVRP】遗传算法求解带时间窗的含充电站车辆路径规划问题【含Matlab源码 1177期】

⛄一、VRP简介 1 VRP基本原理 车辆路径规划问题(Vehicle Routing Problem,VRP)是运筹学里重要的研究问题之一。VRP关注有一个供货商与K个销售点的路径规划的情况,可以简述为:对一系列发货点和收货点,组织调用一定的车辆&#xff…

【密码学篇】虚拟专用网技术原理与应用(商密)

【密码学篇】虚拟专用网技术原理与应用(商密) VPN技术不是洪水猛兽,其普遍应用于网络通信安全和网络接入控制,可通过服务器、硬件、软件等多种方式实现。—【蘇小沐】 文章目录【密码学篇】虚拟专用网技术原理与应用(…

JAVA多线程并发(一):线程的创建

JAVA多线程并发——创建线程 第一章:线程的创建与实现 文章目录JAVA多线程并发——创建线程一、继承Thread类二、实现runnable接口三、简单匿名内部类写法四、实现Callable接口五、线程池一、继承Thread类 代码示例: public class ExtendThread {publ…

SPARKSQL3.0-Unresolved[Parsed]阶段源码剖析

一、前言 上两节介绍了Antlr4的简单使用以及spark中如何构建SessionState,如果没有看过建议先了解上两节的使用,否则看本节会比较吃力 [SPARKSQL3.0-Antlr4由浅入深&SparkSQL语法解析] [SPARKSQL3.0-SessionState构建源码剖析] 那么在Unresolved…

MySql查询的生命周期和性能优化思路

目录 前言 1. 为什么查询性能差 2. 一次查询的生命周期 2.1 客户端与服务端通信 2.2 查询缓存 2.3 解析器 2.4 预处理器 2.5 优化器 2.6 查询引擎 2.7 存储引擎 3. 查询性能优化的思路 4.总结 前言 一说到mysql的查询性能优化,相信很多人能说出来很多的技…

AT32F407/437使用FreeRTOS并实现ping客户端

示例目的 基于以太网络,实现ping客户端已检测网络联机。 支持型号 AT32F407xx AT32F437xx 主要使用外设 EMAC GPIO USART 快速使用方法硬件资源 1) 指示灯LED2/LED3 2) USART1(PA9/PA10) 3) AT-START-F407/ AT-START-F437实验板 4) 以太网连接线软件资源 1) SourceC…

sql函数coalesce和parse_url

学习函数系列: coalesce coalesce函数可以用来排除null值。 coalesce(a, b,c,d) 参数的个数没有限制 返回第一个参数中非null的值。 select help coalesce\G; [ 1. row ] name | COALESCE description | Syntax: COALESCE(value,…) Returns the first non-NUL…

15-JavaSE基础巩固练习:多态、接口、抽象类的综合练习

多态的综合练习 1、需求 狗类 属性:年龄,颜色行为: eat(String something):表示吃东西lookHome():看家 猫类 属性:年龄,颜色行为: eat(String something):吃东西catch…

5G工业互联阶段二:5G产线工控网

5G深入核心生产环节的第二个阶段,主要是实现产线内部通信5G化。以工控5G化为主,并综合考虑数采、安全通信等。大致示意如下: 工艺部件工控通信5G化: 如上图所述,以产线主PLC为中心,大致分为主PLC到产线内机…

Spark 3.0 - 5.ML Pipeline 实战之电影影评情感分析

目录 一.引言 二.Stage1 - 数据准备 1.数据样式 2.读取数据 3.平均得分与 Top 5 4.训练集、测试集划分 三.Stage-2 - Comment 分词 1.Tokenizer 🙅🏻‍♀️ 2.JieBa 分词 🙆🏻‍♀️ 2.1 Jieba 分词示例 2.2 自定义 Jie…

系统设计 system design 干货笔记

参考大佬的博客 https://www.lecloud.net/post/9246290032/scalability-for-dummies-part-3-cache 参考的github https://github.com/donnemartin/system-design-primer#step-2-review-the-scalability-article scalability 1 Clone 每台服务器都包含完全相同的代码库&#…

SOLIDWORKS 2023 3D Creator 云端结构设计新功能

3DEXPERIENCE平台更新版本已经与大家见面,今天微辰三维与大家分享3D Creator 云端结构设计新功能,让我们先一起来看看视频—— SOLIDWORKS 2023 3D 云端结构设计新功能点击观看3D Creator 云端结构设计新功能 如今,我们的设计生产工作不仅要面…

Linux进阶-Makefile

make工具:找出修改过的文件,根据依赖关系,找出受影响的相关文件,最后按照规则单独编译这些文件。 Makefile文件:记录依赖关系和编译规则。 Makefile本质:无论多么复杂的语法,都是为了更好地解决…

m认知无线电网络中频谱感知的按需路由算法matlab仿真

目录 1.算法概述 2.仿真效果预览 3.MATLAB部分代码预览 4.完整MATLAB程序 1.算法概述 使用无线电用户的频率范围在 9kHz 到 275GHz[3],由于无线通信环境中的干扰、信道衰落和无线电收发设备自身属性等的影响,大部分无线电设备只能工作在 50GHz 以下。…