RDD持久化原理和共享变量

news2024/11/23 13:05:45

(一) RDD持久化原理

Spark中有一个非常重要的功能就是可以对RDD进行持久化。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition数据持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存中缓存的partition数据。
这样的话,针对一个RDD反复执行多个操作的场景,就只需要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。

巧妙使用RDD持久化,在某些场景下,对spark应用程序的性能有很大提升。
特别是对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。

要持久化一个RDD,只需要调用它的cache()或者persist()方法就可以了。
在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition数据丢失了,那么Spark会自动通过其源RDD,使用transformation算子重新计算该partition的数据。

区别
cache()和persist()的区别在于:
cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,也就是调用persist(MEMORY_ONLY),将数据持久化到内存中。
如果需要从内存中清除缓存,那么可以使用unpersist()方法。

(二)RDD持久化策略

策略 介绍
MEMORY_ONLY         以非序列化的方式持久化在JVM内存中
MEMORY_AND_DISK     同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中
MEMORY_ONLY_SER     同MEMORY_ONLY,但是会序列化
MEMORY_AND_DISK_SER 同MEMORY_AND_DSK,但是会序列化
DISK_ONLY           以非序列化的方式完全存储到磁盘上
MEMORY_ONLY_2、MEMORY_AND_DISK_2 等 尾部加了2的持久化级别,表示会将持久化数据复制

补充说明:

  1. MEMORY_ONLY:以非序列化的Java对象的方式持久化在JVM内存中。
  2. MEMORY_AND_DISK:当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取,不需要重新计算
  3. MEMORY_ONLY_SER:同MEMORY_ONLY,但是会使用Java的序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是在使用的时候需要进行反序列化,因此会增加CPU开销。
  4. MEMORY_AND_DISK_SER:同MEMORY_AND_DSK。但是会使用序列化方式持久化Java对象。
  5. DISK_ONLY:使用非序列化Java对象的方式持久化,完全存储到磁盘上。
  6. MEMORY_ONLY_2、MEMORY_AND_DISK_2等:如果是尾部加了2的持久化级别,表示会将持久化数据复制一份,保存到其它节点,从而在数据丢失时,不需要重新计算,只需要使用备份数据即可。

(三)如何选择持久化策略

Spark提供了多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。
策略

  1. 优先使用MEMORY_ONLY,纯内存速度最快,而且没有序列化不需要消耗CPU进行反序列化操作,
    缺点就是比较耗内存
  2. MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快,只是在使用的时候需要消耗CPU进行反序列化。

注意
如果需要进行数据的快速失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了
能不使用DISK相关的策略,就不要使用,因为有的时候,从磁盘读取数据,还不如重新计算一次。

代码实例

object PersistRddScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("PersistRddScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.textFile("D:\\hello_10000000.dat").cache()
var start_time = System.currentTimeMillis()
var count = dataRDD.count()
println(count)
start_time = System.currentTimeMillis()
count = dataRDD.count()
println(count)
end_time = System.currentTimeMillis()
println("第二次耗时:"+(end_time-start_time))
sc.stop()
}
}

在没有添加cache之前,每一次都耗时很长, 加上cache之后,第二次计算耗时就很少了

(四)共享变量的工作原理

Spark为此提供了两种共享变量

  1. 一种是Broadcast Variable(广播变量)
    Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,而不会为每个task都拷贝一份副本,因此其最大的作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗通过调用SparkContext的broadcast()方法,针对某个变量创建广播变量
    注意:广播变量,是只读的, 然后在算子函数内,使用到广播变量时,每个节点只会拷贝一份副本。可以使用广播变量的value()方法获取值。
    在这里插入图片描述
object BroadcastOpScala {
def main(args: Array[String]): Unit = {
conf.setAppName("BroadcastOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val varable = 2
//dataRDD.map(_ * varable)
//1:定义广播变量
val varableBroadcast = sc.broadcast(varable)
//2:使用广播变量,调用其value方法
dataRDD.map(_ * varableBroadcast.value).foreach(println(_))
sc.stop()
}
}
  1. 另一种是Accumulator(累加变量)
    Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。
    正常情况下在Spark的任务中,由于一个算子可能会产生多个task并行执行,所以在这个算子内部执行的聚合计算都是局部的,想要实现多个task进行全局聚合计算,此时需要使用到Accumulator这个共享的累加变量。
    注意:Accumulator只提供了累加的功能。在task只能对Accumulator进行累加操作,不能读取它的值。
    只有在Driver进程中才可以读取Accumulator的值。
object AccumulatorOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("AccumulatorOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//这种写法是错误的,因为foreach代码是在worker节点上执行的
// var total = 0和println("total:"+total)是在Driver进程中执行的
//所以无法实现累加操作
//并且foreach算子可能会在多个task中执行,这样foreach内部实现的累加也不是最终全局
/*var total = 0
dataRDD.foreach(num=>total += num)
println("total:"+total)*/
//所以此时想要实现累加操作就需要使用累加变量了
//1:定义累加变量
val sumAccumulator = sc.longAccumulator
//2:使用累加变量
dataRDD.foreach(num=>sumAccumulator.add(num))
//注意:只能在Driver进程中获取累加变量的结果
println(sumAccumulator.value)
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384325.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

互联网工程师 1480 道 Java 面试题及答案整理 ( 2023 年 整理版)

最近很多粉丝朋友私信我说:熬过了去年的寒冬却没熬过现在的内卷;打开 Boss 直拒一排已读不回,回的基本都是外包,薪资还给的不高,对技术水平要求也远超从前;感觉 Java 一个初中级岗位有上千人同时竞争&#…

安卓逆向_4 --- 定位关键Smali、monitor使用、log插桩、栈追踪、methodprofiling(方法分析)

1、快速定位关键 smali 代码 1.分析流程 搜索特征字符串 搜索关键api 通过方法名来判断方法的功能 2.快速定位关键代码 反编译 APK 程序,AndroidManifest.xml > 包名/系统版本/组件 程序的主 activity(程序入口界面) 每个…

Allegro如何画半圆形的线操作指导

Allegro如何画半圆形的线操作指导 在用Allegro设计PCB的时候,在某些应用场合会需要画半圆形,如下图 如何画半圆形,具体操作如下 点击Add点击Arc w/Radius

WebRTC QoS方法之Pacer实现

本文将解读WebRTC中Pacer算法的实现。WebRTC有两套Pacer算法:TaskQueuePacedSender、PacedSender。本文仅介绍PacedSender的实现。(文章中引用的WebRTC代码基于master,commit:3f412945f05ce1ac372a7dad77d85498d23deaae源码分析&a…

算法练习(八)区域搜索

一、腐烂的橘子 1、题目描述: 在给定的 m x n 网格 grid 中,每个单元格可以有以下三个值之一: 值 0 代表空单元格; 值 1 代表新鲜橘子; 值 2 代表腐烂的橘子。 每分钟,腐烂的橘子 周围 4 个方向上相邻 的…

从零开始:学习使用 Hugo 构建自己的静态网站

1、什么是 Hugo 1.1、简介 Hugo 是一个由 Go 语言编写的静态网站生成器。它可以帮助用户快速构建高性能的静态网站,特别是博客、文档和个人网站等。与其他静态网站生成器相比,Hugo 的特点是速度快、易于使用、可扩展性强等。Hugo 使用简单的 Markdown …

【项目】游戏-我在万科转生成了一只狗

文章目录学习unity一些基操..位置坐标系父子关系常用工具导入游戏模型资源商店创建地形为地形化妆--纹理绘制脚本组件脚本的生命周期脚本执行顺序标签和图层的作用向量的运算和意义欧拉角和四元数-常用C#预制体-类与对象Debug的使用C#物体属性使用游戏时间使用-C#计时器的设置路…

无需手动编码的XGBoost中的分类特征

无需手动编码的XGBoost中的分类特征 XGBoost 是一种基于梯度提升的基于决策树的集成机器学习算法。 然而,直到最近,它还没有原生支持分类数据。 在将分类特征用于训练或推理之前,必须对其进行手动编码。 在序数类别的情况下,例如…

视觉SLAM十四讲ch4 李群和李代数笔记

视觉SLAM十四讲ch4 李群和李代数视觉SLAM十四讲ch4 李群和李代数李群和李代数基础指数映射与对数映射李代数求导与扰动模型视觉SLAM十四讲ch4 李群和李代数 李群和李代数基础 可以将SO3看成旋转矩阵集合,SE3看成变换矩阵集合 李代数是6个自由度的向量空间…

qsort函数的应用以及模拟实现

前言 🎈个人主页:🎈 :✨✨✨初阶牛✨✨✨ 🐻推荐专栏: 🍔🍟🌯 c语言进阶 🔑个人信条: 🌵知行合一 🍉本篇简介:>:介绍库函数qsort函数的模拟实现和应用 金句分享: ✨追…

Docker之部署Mysql

通过docker对Mysql进行部署。 如果没有部署过docker,看我之前写的目录拉取镜像运行容器开放端口拉取镜像 前往dockerHub官网地址,搜索mysql。 找到要拉取的镜像版本,在tag下找到版本。 拉取mysql镜像,不指定版本数&#xff0c…

04 Android基础--RelativeLayout

04 Android基础--RelativeLayout什么是RelativeLayout?RelativeLayout的常见用法:什么是RelativeLayout? 相对布局(RelativeLayout)是一种根据父容器和兄弟控件作为参照来确定控件位置的布局方式。 根据父容器定位 在相…

JavaWeb--RequestResponse

Request&Response1 Request和Response的概述2 Request对象2.1 Request继承体系2.2 Request获取请求数据2.2.1 获取请求行数据2.2.2 获取请求头数据2.2.3 获取请求体数据2.2.4 小结2.2.5 获取请求参数的通用方式2.3 IDEA快速创建Servlet2.4 请求参数中文乱码问题2.4.1 POST请…

智能家居Homekit系列一智能通断开关

智能通断器,也叫开关模块,可以非常方便地接入家中原有开关、插座、灯具、电器的线路中,通过手机App或者语音即可控制电路通断,轻松实现原有家居设备的智能化改造。 随着智能家居概念的普及,越来越多的人想将自己的家改…

WebRTC系列分享 | WebRTC视频QoS全局技术栈

概述目前总结出WebRTC用于提升QoS的方法有:NACK、FEC、SVC、JitterBuffer、IDR Request、Pacer、Sender Side BWE、Probe、VFR(动态帧率调整策略)、AVSync(音视频同步)、动态分辨率调整。这几种方法在WebRTC架构分布如…

上海亚商投顾:沪指窄幅震荡 ChatGPT概念再度走高

上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。市场情绪沪指今日窄幅震荡,创业板指低开低走,午后跌幅扩大至1%,宁德时代一度跌近4%。6G概…

【架构师】零基础到精通——微服务治理

博客昵称:架构师Cool 最喜欢的座右铭:一以贯之的努力,不得懈怠的人生。 作者简介:一名Coder,软件设计师/鸿蒙高级工程师认证,在备战高级架构师/系统分析师,欢迎关注小弟! 博主小留言…

【java基础】一篇文章彻底搞懂lambda表达式

文章目录lambda表达式是什么lambda表达式的语法函数式接口初次使用深入理解方法引用 :: 用法快速入门不同形式的::情况1 object::instanceMethod情况2 Class::instanceMethod情况3 Class::staticMethod对于 :: 的一些示例及其注意事项构造器引用变量作用域使用外部变量定义内部…

web自动化 基于python+Selenium+PHP+Ftp实现的轻量级web自动化测试框架

1、 开发环境 win7 64 PyCharm 4.0.5 setuptools-29.0.1.zip 下载地址:setuptools-29.0.1.zip_免费高速下载|百度网盘-分享无限制 官方下载地址:setuptools PyPI python 3.3.2 mysql-connector-python-2.1.4-py3.3-win64 下载地址:mysq…

企业 Active Directory 自助服务

您的企业是否正在寻找一个全面的 Active Directory 自助服务解决方案,使用户能够在没有帮助台帮助的情况下满足自己的 Active Directory 需求?ADSelfService Plus 了解您的安全问题,并提供基于审批的自助服务工作流功能,使管理员能…