Spark大数据处理讲课笔记--- RDD持久化机制

news2025/1/12 1:48:39

零、本讲学习目标

  1. 理解RDD持久化的必要性
  2. 了解RDD的存储级别
  3. 学会如何查看RDD缓存

一、RDD持久化

(一)引入持久化的必要性

  • Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。
  • Spark中重要的功能之一是可以将某个RDD中的数据保存到内存或者磁盘中,每次需要对这个RDD进行算子操作时,可以直接从内存或磁盘中取出该RDD的持久化数据,而不需要从头计算才能得到这个RDD。

(二)案例演示持久化操作

1、RDD的依赖关系图

  • 读取文件,进行一系列操作,有多个RDD,如下图所示。

 

2、不采用持久化操作

  • 在上图中,对RDD3进行了两次算子操作,分别生成了RDD4和RDD5。若RDD3没有持久化保存,则每次对RDD3进行操作时都需要从textFile()开始计算,将文件数据转化为RDD1,再转化为RDD2,最终才得到RDD3。

  • 查看要操作的HDFS文件

 

  • 以集群模式启动Spark Shell

 

  • 按照图示进行操作,得RDD4和RDD5

 

  • 查看RDD4内容,会从RDD1到RDD2到RDD3到RDD4跑一趟

 

  • 显示RDD5内容,也会从RDD1到RDD2到RDD3到RDD5跑一趟

 

3、采用持久化操作

  • 可以在RDD上使用persist()或cache()方法来标记要持久化的RDD(cache()方法实际上底层调用的是persist()方法)。在第一次行动操作时将对数据进行计算,并缓存在节点的内存中。Spark的缓存是容错的:如果缓存的RDD的任何分区丢失,Spark就会按照该RDD原来的转换过程自动重新计算并缓存。
  • 计算到RDD3时,标记持久化

 

  • 计算RDD4,就是基于RDD3缓存的数据开始计算,不用从头到尾跑一趟

 

  • 计算RDD5,就是基于RDD3缓存的数据开始计算,不用从头到尾跑一趟

 

二、存储级别

(一)持久化方法的参数

  • 利用RDD的persist()方法实现持久化,向persist()方法中传入一个StorageLevel对象指定存储级别。每个持久化的RDD都可以使用不同的存储级别存储,默认的存储级别是StorageLevel.MEMORY_ONLY

(二)Spark RDD存储级别表

  • Spark RDD有七种存储级别

 

  • 在Spark的Shuffle操作(例如reduceByKey()中,即使用户没有使用persist()方法,也会自动保存一些中间数据。这样做是为了避免在节点洗牌的过程中失败时重新计算整个输入。如果想多次使用某个RDD,那么强烈建议在该RDD上调用persist()方法。

(三)如何选择存储级别

  • 选择原则:权衡内存使用率和CPU效率
  • 如果RDD存储在内存中不会发生溢出,那么优先使用默认存储级别(MEMORY_ONLY),该级别会最大程度发挥CPU的性能,使在RDD上的操作以最快的速度运行。
  • 如果RDD存储在内存中会发生溢出,那么使用MEMORY_ONLY_SER并选择一个快速序列化库将对象序列化,以节省空间,访问速度仍然相当快。
  • 除非计算RDD的代价非常大,或者该RDD过滤了大量数据,否则不要将溢出的数据写入磁盘,因为重新计算分区的速度可能与从磁盘读取分区一样快。
  • 如果希望在服务器出故障时能够快速恢复,那么可以使用多副本存储级别MEMORY_ONLY_2或MEMORY_AND_DISK_2。该存储级别在数据丢失后允许在RDD上继续运行任务,而不必等待重新计算丢失的分区。其他存储级别在发生数据丢失后,需要重新计算丢失的分区。

(四)persist()与cache()的查看

  • 查看两个方法的源码
/**                                                                                           
 * 在第一次行动操作时持久化RDD,并设置存储级别,当RDD从来没有设置过存储级别时才能使用该方法                                           
 */                                                                                          
def persist(newLevel: StorageLevel): this.type = {                                            
  if (isLocallyCheckpointed) {                                                                
    // 如果之前已将该RDD设置为localCheckpoint,就覆盖之前的存储级别                                                
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)     
  } else {                                                                                    
    persist(newLevel, allowOverride = false)                                                  
  }                                                                                           
}                                                                                             
/**                                                                                           
  * 持久化RDD,使用默认存储级别(MEMORY_ONLY)                                                              
  */                                                                                          
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)                                  
                                                                                              
/**                                                                                           
  * 持久化RDD,使用默认存储级别(MEMORY_ONLY)                                                              
  */                                                                                          
def cache(): this.type = persist()                                                            
  • 从上述代码可以看出,cache()方法调用了无参的persist()方法,两者的默认存储级别都为MEMORY_ONLY,但cache()方法不可更改存储级别,而persist()方法可以通过参数自定义存储级别

(五)案例演示设置存储级别

  • net.cl.rdd根包里创建day05子包,然后在子包里创建SetStorageLevel对象
package net.cl.rdd.day05

import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object SetStorageLevel {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("SetStorageLevel") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)

    // 去除Spark运行信息
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress", "false")
    Logger.getRootLogger().setLevel(Level.OFF)

    // 读取HDFS文件,得到rdd
    val rdd = sc.textFile("hdfs://master:9000/park/words.txt")
    // 将rdd标记为持久化,采用默认存储级别 - StorageLevel.MEMORY_ONLY
    rdd.persist() // 无参持久化方法

    // 对rdd做扁平化映射,得到rdd1
    val rdd1 = rdd.flatMap(_.split(" "))
    // 将rdd1持久化到磁盘
    rdd1.persist(StorageLevel.DISK_ONLY)

    // 将rdd1映射成二元组,得到rdd2
    val rdd2 = rdd1.map((_, 1))
    // 将rdd2持久化到内存,溢出的数据持久化到磁盘
    rdd2.persist(StorageLevel.MEMORY_AND_DISK)

    // 第一次行动算子,对标记为持久化的RDD进行不同级别的持久化操作
    println("元素个数:" + rdd2.count)

    // 第二次行动算子,直接利用rdd2的持久化数据进行操作,无须从头进行计算
    rdd2.collect.foreach(println)
  }
}
  • 运行程序,查看结果

 

三、利用Spark WebUI查看缓存

  • 最好重启Spark Shell

 

(一)创建RDD并标记为持久化

  • 执行命令:val rdd = sc.parallelize(List(56, 67, 32, 89, 90, 66, 100))

 

(二)Spark WebUI查看RDD存储信息

  • 浏览器中访问Spark Shell的WebUI http://master:4040/storage/查看RDD存储信息,可以看到存储信息为空

 

  • 执行命令:rdd.collect,收集RDD数据

 

  • 刷新WebUI,发现出现了一个ParallelCollectionRDD的存储信息,该RDD的存储级别为MEMORY,持久化的分区为8,完全存储于内存中。

 

  • 单击ParallelCollectionRDD超链接,可以查看该RDD的详细存储信息

 

  • 上述操作说明,调用RDD的persist()方法只是将该RDD标记为持久化,当执行行动操作时才会对标记为持久化的RDD进行持久化操作。

  • 执行以下命令,创建rdd2,并将rdd2持久化到磁盘

 

  • 刷新上述WebUI,发现多了一个MapPartitionsRDD的存储信息,该RDD的存储级别为DISK,持久化的分区为8,完全存储于磁盘中。

 

(三)将RDD从缓存中删除

  • Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用的方式从缓存中删除旧的分区数据。如果希望手动删除RDD,而不是等待该RDD被Spark自动从缓存中删除,那么可以使用RDD的unpersist()方法。

  • 执行命令:rdd.unpersist(),将rdd(ParallelCollectionRDD)从缓存中删除

 

  • 刷新上述WebUI,发现只剩下了MapPartitionsRDDParallelCollectionRDD已被移除。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/561254.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SpringBoot的家乡特色推荐系统的设计与实现

背景 设计一个家乡特色推荐系统,通过这个系统能够满足家乡特色文章的管理功能。系统的主要功能包括首页,个人中心,用户管理,文章分类管理,文章分享管理,系统管理等。 管理员可以根据系统给定的账号进行登…

堪比ChatGPT,Claude注册和使用教程

新建了一个网站 https://ai.weoknow.com/ 每天给大家更新可用的国内可用chatGPT资源 Claude简介 Claude是一款人工智能聊天机器人。主要有以下特征: 使用自己的模型与训练方法,而不是基于GPT-3等开源框架。模型采用Transformer编码器与解码器的结构,并使用对话上下文的双向…

Spark大数据处理讲课笔记---RDD容错机制

零、本讲学习目标 了解RDD容错机制理解RDD检查点机制的特点与用处理解共享变量的类别、特点与使用 一、RDD容错机制 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式&#xff0c…

全国大学生数据统计与分析竞赛2021年【研究生组】-B题:“互联网+教育”用户消费行为分析预测模型(附获奖论文和python代码实现)

目录 摘要 1 问题重述 2 问题分析 3 符号说明 4 模型建立与求解 4.1 问题一 4.1.1 数据预处理 4.1.2 处理结果 4.2 问题二 4.2.1 城市分布情况 4.2.2 用户登录情况 4.3 问题三 4.3.1 模型建立 4.3.2 模型求解 4.3.3 模型优化 4.4 问题四 4.4.1 模型建立 4.4.…

Windows 编译 OpenCV 头疼 ? 已编译好的,你要不要吧

一、使用官方编译好的 【Qt】opencv源码&官方编译好的opencv在windows下使用的区别_外来务工人员徐某的博客-CSDN博客 官方替我们编译好了,可以直接拿来用,但是看到下面这两个文件夹就知道,官方是用msvc编译器编译的,所以还是…

2天搞定-从零开始搞-量化交易-Python 【案例A股量化交易】第一节

搭建windows电脑开发环境 一,下载并搭建python 环境 1:python 安装过程教程:https://blog.csdn.net/weixin_44727274/article/details/126017386 2:python 下载地址官网:https://www.python.org/downloads/windows/ (过程较慢耐心等待,多版本选择) 3:python 本人放…

chatgpt赋能Python-python_noj

Python NOJ - 一款适合Python学习者的在线编程环境 Python NOJ是一款在线的Python编程环境,其全称为Python Online Judge,是一款适合Python学习者使用的编程工具。接下来,我们将介绍其主要特点和优势,并探讨其与其他在线编程环境…

chatgpt赋能Python-python_nmpy

Python NumPy:提高数据科学和数学计算的效率 在数据科学和数学计算领域,Python一直是最受欢迎的语言之一。NumPy是一个优秀的Python库,它通过提供一个强大的多维数组对象和与之相关的各种函数,极大地提高了Python在数据科学和数学…

2022下半年上午题

2022下半年上午题 b b d a c d 在做加法前先用补码表示 c a d c a c b b 专利权需要申请,题目中没说公司申请了专利 c c 前向传播取大值 d 反向传播求关键路径 b b b d a c 先在前驱图中把信号量定义下去 然后定义p,v操作 然后直接看图 1:从p1出来…

Spark大数据处理讲课笔记-- 理解RDD依赖

零、本讲学习目标 理解RDD的窄依赖理解RDD的宽依赖了解两种依赖的区别 一、RDD依赖 在Spark中,对RDD的每一次转化操作都会生成一个新的RDD,由于RDD的懒加载特性,新的RDD会依赖原有RDD,因此RDD之间存在类似流水线的前后依赖关系…

CANFDCAN协议对比 - 基础介绍_02

目录 四、CAN和CANFD区别 1、保留位 2、FDF-FD格式 五、高速传输机制 1、位速率切换 (Bit Rate Switch) 2、波特率5MBit/s 3、BRS和CRC界定符之间采用更高的波特率 六、CANFD数据场 1、经典CAN中DLC:9种可能的长度 2、CANFD中DLC:16种可能的长…

ChatGPT你真的玩明白了?来试试国内免费版的ChatGPT吧!

文章目录 一、什么是ChatGPT二、ChatGPT的作用三、免费ChatGPT的使用四、写在最后 一、什么是ChatGPT ChatGPT全称为Chat Generative Pre-trained Transformer,Chat是聊天的意思,GPT是生成型预训练变换模型,可以翻译为聊天生成预训练转换器或…

抖音seo源码开发部署

抖音seo账号矩阵源码系统搭建,​ 抖音获客系统,抖音SEO优化系统源码开发,思路分享,分享一些开发的思路...... 账号矩阵霸屏系统源代码账号矩阵系统建设部署,短视频seo账号矩阵框架分析,开发语言为后台框架语言PHP pyt…

chatgpt赋能Python-python_nonetype报错

Python NoneType报错:原因、解决方法和预防措施 Python 是一种面向对象的高级编程语言,用于快速编写脚本和应用程序。但是,当我们在编写 Python 代码时,可能会遇到 NoneType 报错;这是一种类型错误,它发生…

接口自动化测试工具SoapUI下载安装以及简单使用教程

前言 SoapUI是Webservice开发的必备工具。SoapUI是一个开源测试工具,通过Soap/HTTP来检查、调用、实现Web Service的功能,而且还能对Webservice做性能方面的测试。SoapUI会根据WSDL的格式生成左边的列表树,双击Request1就能看到Soap请求报文的内容。 一…

笔记--大数据--大数据概念

大数据:指无法在一定时间范围内用常规软件工具进行捕捉、管理和 处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化 能力的海量、高增长率和多样化的信息资产。 按顺序给出数据存储单位:bit、Byte、KB、MB、GB、TB…

笔记--大数据--Hadoop--01--基础概念

Hadoop是什么 Hadoop是一个分布式系统基础架构 主要解决海量数据的存储和分析计算问题 Hadoop优势–4高 高可靠性:Hadoop底层维护多个数据版本,单个计算元素或存储故障也不会导致数据丢失 高扩展性:在集群中分配任务数据,可以方便…

springWEB搭建

SpringWEB就是spring框架里得一个模块 springMVC介绍 在之前的后端三大架构: Controller: 控制层, 包含了servlet, 对数据的接收, 处理, 响应 Model: 数据模型, dao, model VIew: 视图, jsp, 用于将数据添加到html中进行响应 工作流程: 主要是控制层接收到响应之后, 调取dao层将…

CodeForces.1806A .平面移动.[判断可达范围][找步数规律]

题目描述: 题目解读: 给定移动规则以及起始点,终点;分析终点是否可达,可达则输出最小步数。 解题思路: 首先要判定是否可达。画图可知,对于题目给定的移动规则,只能到达起始点(a,b…

行业常识_交换机

文章目录 一、前言二、交换机2.1 什么是交换机?2.2 交换机的作用是什么?2.3 交换机的应用2.4 交换机分类2.5 交换机功能2.6 交换机的带宽 三、总结 一、前言 项目中经常会用到交换机。 交换机有多个网口。 你可以用一根网线,网线一端插入交换…