Spark大数据处理讲课笔记3.6 RDD容错机制

news2024/11/23 16:32:49

文章目录

  • 零、本讲学习目标
  • 一、RDD容错机制
    • (一)血统方式
    • (二)设置检查点方式
  • 二、RDD检查点
    • (一)RDD检查点机制
    • (二)与RDD持久化的区别
    • (三)RDD检查点案例演示
  • 三、共享变量
    • (一)广播变量
      • 1、默认情况下变量的传递
      • 2、使用广播变量时变量的传递
    • (二)累加器
      • 1、累加器功能
      • 2、不使用累加器
      • 3、使用累加器

零、本讲学习目标

  1. 了解RDD容错机制
  2. 理解RDD检查点机制的特点与用处
  3. 理解共享变量的类别、特点与使用

一、RDD容错机制

  • 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式设置检查点(checkpoint)方式

(一)血统方式

  • 根据RDD之间依赖关系对丢失数据的RDD进行数据恢复。若丢失数据的子RDD进行窄依赖运算,则只需要把丢失数据的父RDD的对应分区进行重新计算,不依赖其他节点,并且在计算过程中不存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD所有分区都要进行从头到尾计算,计算过程中存在冗余计算。

(二)设置检查点方式

  • 本质是将RDD写入磁盘存储。当RDD进行宽依赖运算时,只要在中间阶段设置一个检查点进行容错,即Spark中的sparkContext调用setCheckpoint()方法,设置容错文件系统目录作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行持久化存储,若后面有节点宕机导致分区数据丢失,则以从做检查点的RDD开始重新计算,不需要从头到尾的计算,从而减少开销。

二、RDD检查点

(一)RDD检查点机制

  • RDD的检查点机制(Checkpoint)相当于对RDD数据进行快照,可以将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS。当机器发生故障导致内存或磁盘中的RDD数据丢失时,可以快速从快照中对指定的RDD进行恢复,而不需要根据RDD的依赖关系从头进行计算,大大提高了计算效率。

(二)与RDD持久化的区别

  • cache()或者persist()是将数据存储于机器本地的内存或磁盘,当机器发生故障时无法进行数据恢复,而检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证了数据的可靠性。
  • 在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空,而检查点存储的数据不会受影响,将永久存在,除非手动将其移除。因此,检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用。

(三)RDD检查点案例演示

  • net.huawei.rdd包里创建CheckpointDemo对象
    在这里插入图片描述
    在这里插入图片描述
package net.huawei.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 功能:RDD检查点演示
  * 作者:华卫
  * 日期:2022年04月16日
  */
object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    // 设置系统属性(本地运行必须设置,否则无权访问HDFS)
    System.setProperty("HADOOP_USER_NAME", "root")
    // 创建SparkConf对象
    val conf = new SparkConf()
    // 设置应用程序名称,可在Spark WebUI里显示
    conf.setAppName("Spark-CheckpointDemo")
    // 设置集群Master节点访问地址
    conf.setMaster("local[2]")
    // 设置测试内存
    conf.set("spark.testing.memory", "2147480000")
    // 基于SparkConf对象创建SparkContext对象,该对象是提交Spark应用程序的入口
    val sc = new SparkContext(conf)

    // 设置检查点数据存储路径
    sc.setCheckpointDir("hdfs://master:9000/spark-ck")
    // 创建模拟数据RDD
    val rdd: RDD[Int] = sc.parallelize(List(1, 1, 2, 3, 5, 8, 13))
    // 过滤结果
    val resultRDD = rdd.filter(_ >= 5)
    // 持久化RDD到内存中
    resultRDD.cache()
    // 将resultRDD标记为检查点
    resultRDD.checkpoint()

    // 第一次行动算子计算时,将把标记为检查点的RDD数据存储到文件系统指定路径中
    val result: String = resultRDD.collect().mkString(", ")
    println(result)
    // 第二次行动算子计算时,将直接从文件系统读取resultRDD数据,而不需要从头计算
    val count = resultRDD.count()
    println(count)

    // 停止Spark容器
    sc.stop()
  }
}
  • 上述代码使用checkpoint()方法将RDD标记为检查点(只是标记,遇到行动算子才会执行)。在第一次行动计算时,被标记为检查点的RDD的数据将以文件的形式保存在setCheckpointDir()方法指定的文件系统目录中,并且该RDD的所有父RDD依赖关系将被移除,因为下一次对该RDD计算时将直接从文件系统中读取数据,而不需要根据依赖关系重新计算。
  • Spark建议,在将RDD标记为检查点之前,最好将RDD持久化到内存,因为Spark会单独启动一个任务将标记为检查点的RDD的数据写入文件系统,如果RDD的数据已经持久化到了内存,将直接从内存中读取数据,然后进行写入,提高数据写入效率,否则需要重复计算一遍RDD的数据。
  • 创建检查点保存数据的目录
    在这里插入图片描述
  • 运行程序,在控制台查看结果
    在这里插入图片描述
  • 查看HDFS检查点目录,执行命令:hdfs dfs -ls -R /spark-ck
    在这里插入图片描述

三、共享变量

  • 通常情况下,Spark应用程序运行的时候,Spark算子(例如map(func)或filter(func))中的函数func会被发送到远程的多个Worker节点上执行,如果一个算子中使用了某个外部变量,该变量就会复制到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立。当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销。因此,Spark提供了两种共享变量:广播变量和累加器。

(一)广播变量

  • 广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中,而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据。因此,广播变量是只读的。

1、默认情况下变量的传递

  • map()算子传入的函数中使用外部变量arr
    在这里插入图片描述
scala> val arr = Array(1, 2, 3, 4, 5)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, arr))
scala> result.collect()
  • 上述代码中,传递给map()算子的函数(_, arr)会被发送到Executor端执行,而变量arr将发送到Worker节点所有Task任务中。变量arr传递的流程如下图所示。
    在这里插入图片描述
  • 假设变量arr存储的数据量大小有100MB,则每一个Task任务都需要维护100MB的副本,若某一个Executor中启动了3个Task任务,则该Executor将消耗300MB内存。

2、使用广播变量时变量的传递

  • 广播变量其实是对普通变量的封装,在分布式函数中可以通过Broadcast对象的value方法访问广播变量的值
    在这里插入图片描述
  • 使用广播变量将数组arr传递给map()算子
    在这里插入图片描述
scala> val arr = Array(1, 2, 3, 4, 5)
scala> val broadcastVar = sc.broadcast(arr)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, broadcastVar))
scala> result.collect()
  • 上述代码使用broadcast()方法向集群发送(广播)了一个只读变量,该方法只发送一次,并返回一个广播变量broadcastVar,该变量是一个org.apache.spark.broadcast.Broadcast对象。Broadcast对象是只读的,缓存在集群的每个Worker节点中。使用广播变量进行变量传递的流程如下图所示。
    在这里插入图片描述
  • Worker节点的每个Task任务共享唯一的一份广播变量,大大减少了网络传输和内存开销。
  • 输出result的数据
    在这里插入图片描述

(二)累加器

1、累加器功能

  • 累加器提供了将Worker节点的值聚合到Driver的功能,可以用于实现计数和求和。

2、不使用累加器

  • 对一个整型数组求和
    在这里插入图片描述
  • 上述代码由于sum变量在Driver中定义,而累加操作sum = sum + x会发送到Executor中执行,因此输出结果不正确。

3、使用累加器

  • 对一个整型数组求和
    在这里插入图片描述
scala> val myacc = sc.longAccumulator("My Accumulator") // 声明累加器
scala> val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5))
scala> rdd.foreach(x => myacc.add(x)) // 向累加器添加值
scala> println("sum = " + myacc.value) // 在Driver端输出结果
  • 上述代码通过调用SparkContext对象的longAccumulator ()方法创建了一个Long类型的累加器,默认初始值为0。也可以使用doubleAccumulator()方法创建Double类型的累加器。
  • 累加器只能在Driver端定义,在Executor端更新。Executor端不能读取累加器的值,需要在Driver端使用value属性读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/489124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Matlab 多项式拟合

一、线性 1、多项式 corrcoef函数 corrcoef函数用来计算矩阵相关系数。 (1)、corrcoef(x):若x为一个矩阵,返回的则是一个相关系数矩阵。 (2)、corrcoef(x,y):计算列向量x、y的相关系数,要求x、y具有相等的元素个数。如果x、y是矩…

2023河南土著双非硕士——毕业季秋招春招就业经验分享(仅限于在河南找工作,毕业想留河南)

作为一名河南土生土长的人,本硕皆就读于河南某双非一本,是一个实打实的河南土著,河南作为互联网就业的贫困环境,相较于CSDN博客上那么多动不动就腾讯、阿里、字节等大厂的就业经验分享,我更想分享一下我在河南省内找工…

百度文心一言正式亮相,数说故事受邀成为首批内测企业

3月16日下午,百度在北京召开新闻发布会,正式推出基于百度新一代大语言模型的生成式AI产品——文心一言,百度创始人、董事长兼首席执行官李彦宏现场展示了文心一言在文学创作、商业文案创作、数理推算、中文理解、多模态生成五个使用场景中的综…

卖期权的时候,我们在卖什么?

一直在思考一个问题,卖期权到底是怎么回事?卖实值期权、平值期权、虚值期权背后的本质有什么区别?卖近期的和远期的期权背后的本质又是什么?我们用沪深300指数期权来研究一下。 我们先从数据上来直观感受一下。上面这个表格是2020-12-09日这…

基于R语言的贝叶斯时空数据模型实践技术

时间-空间数据(以下简称“时空数据”)是最重要的观测数据形式之一,很多科学研究的数据都以时空数据的形式得以呈现,而科学研究目的可以归结为挖掘时空数据中的规律。另一方面,贝叶斯统计学作为与传统统计学…

【OpenCV】 2D-2D:对极几何算法原理

2D-2D匹配: 对极几何 SLAM十四讲笔记1 1.1 对极几何數學模型 考虑从两张图像上观测到了同一个3D点,如图所示**。**我们希望可以求解相机两个时刻的运动 R , t R,t R,t。 假设我们要求取两帧图像 I 1 , I 2 I_1,I_2 I1​,I2​之间的运动,设第一帧到第二帧的运动为…

MiniGPT-4部署过程

文章目录 项目背景部署过程环境配置与文件准备部署推理报错1报错2 项目背景 2023年4月19日,开源项目MiniGPT-4发布,该项目是由KAUST(沙特阿卜杜拉国王科技大学),是几位博士开发的。 项目地址:https://gith…

Spark大数据处理讲课笔记3.4 理解RDD依赖

文章目录 零、本讲学习目标一、RDD依赖二、窄依赖(一)map()与filter()算子(二)union()算子(三)join()算子 三、宽依赖(一)groupBy()算子(二)join()算子&#…

字符设备驱动

字符设备就是按字节流进行读写的设备,读写数据分先后顺序,如点灯,IIC,SPI,LCD等都是字符设备,这些设备的驱动就叫字符设备驱动。 include/linux/fs.h中 file_operations 结构体为内核驱动操作函数集合&…

如何关闭tomcat?tomcat端口号被占用怎么办

我tomcat一跑就报被占用怎么办?我没开tomcat呀!! 这种情况一般是你上一次打开tomcat没有关tomcat服务就关闭了变成软件(如强行关闭正在运行tomcat的idea),这样你在开tomcat就会显示端口号占用了&#xff0…

API 渗透测试从入门到精通系列文章(上)

导语:这是关于使用 Postman 进行渗透测试系列文章的第一部分。 这是关于使用 Postman 进行渗透测试系列文章的第一部分。我原本计划只发布一篇文章,但最后发现内容太多了,如果不把它分成几个部分的话,很可能会让读者不知所措。 所…

SMOKE Single-Stage Monocular 3D Object Detection via Keypoint Estimation 论文学习

论文地址:SMOKE: Single-Stage Monocular 3D Object Detection via Keypoint Estimation Github 地址:https://github.com/open-mmlab/mmdetection3d/tree/main/configs/smoke 1. 解决了什么问题? 预测物体的 3D 朝向角和平移距离对于自动驾…

hive之入门配置

学习hive之路就此开启啦,让我们共同努力 目录 Hive网站: Hive的安装部署: 启动并使用Hive: 安装Mysql: 安装Mysql依赖包: 启动Mysql: 查看密码: 登录root: 密码错误报错: 元数据库配置…

信创国产中间件概览

信创国产中间件概览 中间件国内中间件市场份额第一梯队仍然是IBM> 和Oracle,市场份额合计51%。第二梯队为五大国产厂商,包括东方通、普元信息、宝兰德、中创中间件、金蝶天燕,市场份额合计15%。东方通应用服务器TongWeb对标 开源&#xf…

人脸检测和行人检测3:Android实现人脸检测和行人检测检测(含源码,可实时检测)

人脸检测和行人检测3:Android实现人脸检测和行人检测检测(含源码,可实时检测) 目录 人脸检测和行人检测3:Android实现人脸检测和行人检测(含源码,可实时检测) 1. 前言 2. 人脸检测和行人检测数据集说明 3. 基于YOLOv5的人脸检…

Databend 开源周报第 91 期

Databend 是一款现代云数仓。专为弹性和高效设计,为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务:https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展,遇到更贴近你心意的 Databend 。 新数据类型&…

【Robot Framework】RF关键字大全

收录工作当中最常用的Robot Framework关键字 内容较多,可以CtrlF快速搜索自己想要的 1. RF循环使用(FOR循环) {list1} create list LOG TXT INI INF C CPP JAVA JS CSS LRC H ASM S ASP FOR ${file_type} IN {list1} log 构造请求参数 ${t…

第二十二章 解释器模式

文章目录 前言一、解释器模式基本介绍解释器模式的原理类图 二、通过解释器模式来实现四则运算完整代码抽象表达式类 Expression变量表达式类 VarExpression抽象运算符号解析器 SymbolExpression加法解释器 AddExpression减法解释器 SubExpression计算器类 CalculatorClint 测试…

【C++】仅需一文速通继承

文章目录 1.继承的概念及定义继承的概念继承的定义定义格式:继承关系和访问限定符继承基类成员访问方式的变化 2.基类和派生类对象赋值转换3.继承中的作用域4.派生类的默认成员函数题目:设计出一个类A,让这个类不能被继承(继承了也没用) 5.继承与友元6.继承与静态成员7.复杂的菱…

VK Cup 2017 - Round 1 A - Bear and Friendship Condition(并查集维护大小 + dfs 遍历图统计边数)

题目大意: 给你一些n个点m条边,如果三个点(a,b,c)是合法的,当且仅当 a-b,b-c,c-a都有一条边,问你这个图是否合法,如果有一个或两个点视为合法 思路 考虑什么图才是个合法图:除了点…