Spark大数据处理讲课笔记---RDD容错机制

news2025/1/12 4:04:21

零、本讲学习目标

  1. 了解RDD容错机制
  2. 理解RDD检查点机制的特点与用处
  3. 理解共享变量的类别、特点与使用

一、RDD容错机制

  • 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式设置检查点(checkpoint)方式

(一)血统方式

  • 根据RDD之间依赖关系对丢失数据的RDD进行数据恢复。若丢失数据的子RDD进行窄依赖运算,则只需要把丢失数据的父RDD的对应分区进行重新计算,不依赖其他节点,并且在计算过程中不存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD所有分区都要进行从头到尾计算,计算过程中存在冗余计算。

(二)设置检查点方式

  • 本质是将RDD写入磁盘存储。当RDD进行宽依赖运算时,只要在中间阶段设置一个检查点进行容错,即Spark中的sparkContext调用setCheckpoint()方法,设置容错文件系统目录作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行持久化存储,若后面有节点宕机导致分区数据丢失,则以从做检查点的RDD开始重新计算,不需要从头到尾的计算,从而减少开销。

二、RDD检查点

(一)RDD检查点机制

  • RDD的检查点机制(Checkpoint)相当于对RDD数据进行快照,可以将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS。当机器发生故障导致内存或磁盘中的RDD数据丢失时,可以快速从快照中对指定的RDD进行恢复,而不需要根据RDD的依赖关系从头进行计算,大大提高了计算效率。

(二)与RDD持久化的区别

  • cache()或者persist()是将数据存储于机器本地的内存或磁盘,当机器发生故障时无法进行数据恢复,而检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证了数据的可靠性。
  • 在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空,而检查点存储的数据不会受影响,将永久存在,除非手动将其移除。因此,检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用。

(三)RDD检查点案例演示

  • net.cl.rdd包里创建day06子包,然后在子包里创建CheckPointDemo对象
package net.cl.rdd.day06

import org.apache.spark.{SparkConf, SparkContext}

object CheckPointDemo {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("CheckPointDemo") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)

    // 设置检查点数据存储路径(目录会自动创建的)
    sc.setCheckpointDir("hdfs://master:9000/spark-ck")
    // 基于集合创建RDD
    val rdd = sc.makeRDD(List(1, 1, 2, 3, 5, 8, 13))
    // 过滤大于或等于5的数据,生成新的RDD
    val rdd1 = rdd.filter(_ >= 5)
    // 将rdd1持久化到内存
    rdd1.cache(); // 相当于无参persist()方法
    // 将rdd1标记为检查点
    rdd1.checkpoint();

    // 第一次行动算子 - 采集数据,将标记为检查点的RDD数据存储到指定位置
    val result = rdd1.collect.mkString(", ")
    println("rdd1的元素:" + result)
    // 第二次行动算子 - 计算个数,直接从缓存里读取rdd1的数据,不用从头计算
    val count = rdd1.count
    println("rdd1的个数:" + count)

    // 停止Spark容器
    sc.stop()
  }
}
  • 上述代码使用checkpoint()方法将RDD标记为检查点(只是标记,遇到行动算子才会执行)。在第一次行动计算时,被标记为检查点的RDD的数据将以文件的形式保存在setCheckpointDir()方法指定的文件系统目录中,并且该RDD的所有父RDD依赖关系将被移除,因为下一次对该RDD计算时将直接从文件系统中读取数据,而不需要根据依赖关系重新计算。
  • Spark建议,在将RDD标记为检查点之前,最好将RDD持久化到内存,因为Spark会单独启动一个任务将标记为检查点的RDD的数据写入文件系统,如果RDD的数据已经持久化到了内存,将直接从内存中读取数据,然后进行写入,提高数据写入效率,否则需要重复计算一遍RDD的数据。

  • 运行程序,在控制台查看结果

 

  • 利用Hadoop WebUI查看HDFS检查点目录

 

  • 查看红色框里的目录

 

  • 查看rdd-1目录

 

  • 因为执行了sc.stop()语句,关闭了Spark容器,缓存的数据就被清除了,当然也无法访问Spark的存储数据。

 

三、共享变量

  • 通常情况下,Spark应用程序运行的时候,Spark算子(例如map(func)或filter(func))中的函数func会被发送到远程的多个Worker节点上执行,如果一个算子中使用了某个外部变量,该变量就会复制到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立。当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销。因此,Spark提供了两种共享变量:广播变量和累加器。

(一)广播变量

  • 广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中,而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据。因此,广播变量是只读的
  • 准备工作:在/home目录里创建data.txt

 

  • 上传到HDFS的/park目录

 

1、默认情况下变量的传递

  • map()算子传入的函数中使用外部变量arr

 

val arr = Array(1, 2, 3, 4, 5)
val lines = sc.textFile("hdfs://master:9000/park/data.txt")
val result = lines.map((_, arr))
result.collect
  • 上述代码中,传递给map()算子的函数(_, arr)会被发送到Executor端执行,而变量arr将发送到Worker节点所有Task任务中。变量arr传递的流程如下图所示。

 

  • 假设变量arr存储的数据量大小有100MB,则每一个Task任务都需要维护100MB的副本,若某一个Executor中启动了3个Task任务,则该Executor将消耗300MB内存。

2、使用广播变量时变量的传递

  • 广播变量其实是对普通变量的封装,在分布式函数中可以通过Broadcast对象的value方法访问广播变量的值

 

  • 使用广播变量将数组arr传递给map()算子

 

val arr = Array(1, 2, 3, 4, 5)
val broadcastVar = sc.broadcast(arr) // 定义广播变量
val lines = sc.textFile("hdfs://master:9000/park/data.txt")
val result = lines.map((_, broadcastVar)) // 算子携带广播变量
result.collect

  • 上述代码使用broadcast()方法向集群发送(广播)了一个只读变量,该方法只发送一次,并返回一个广播变量broadcastVar,该变量是一个org.apache.spark.broadcast.Broadcast对象。Broadcast对象是只读的,缓存在集群的每个Worker节点中。使用广播变量进行变量传递的流程如下图所示。

 

  • Worker节点的每个Task任务共享唯一的一份广播变量,大大减少了网络传输和内存开销。
  • 通过遍历算子输出arr结果

 

  • 通过双重循环输出result的数据

 

(二)累加器

1、累加器功能

  • 累加器提供了将Worker节点的值聚合到Driver的功能,可以用于实现计数和求和。

2、不使用累加器

  • 对一个整型数组求和

 

  • 上述代码由于sum变量在Driver中定义,而累加操作sum = sum + x会发送到Executor中执行,因此输出结果不正确。

3、使用累加器

  • 对一个整型数组求和

 

val myacc = sc.longAccumulator("acc") // 在Driver里声明累加器
val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5)) // 创建RDD
rdd.foreach(x => myacc.add(x)) // 在Executor里向累加器添加值
println("sum = " + myacc.value) // 在Driver里输出累加结果
  • 上述代码通过调用SparkContext对象的longAccumulator ()方法创建了一个Long类型的累加器,默认初始值为0。也可以使用doubleAccumulator()方法创建Double类型的累加器。
  • 累加器只能在Driver端定义,在Executor端更新。Executor端不能读取累加器的值,需要在Driver端使用value属性读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/561248.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

全国大学生数据统计与分析竞赛2021年【研究生组】-B题:“互联网+教育”用户消费行为分析预测模型(附获奖论文和python代码实现)

目录 摘要 1 问题重述 2 问题分析 3 符号说明 4 模型建立与求解 4.1 问题一 4.1.1 数据预处理 4.1.2 处理结果 4.2 问题二 4.2.1 城市分布情况 4.2.2 用户登录情况 4.3 问题三 4.3.1 模型建立 4.3.2 模型求解 4.3.3 模型优化 4.4 问题四 4.4.1 模型建立 4.4.…

Windows 编译 OpenCV 头疼 ? 已编译好的,你要不要吧

一、使用官方编译好的 【Qt】opencv源码&官方编译好的opencv在windows下使用的区别_外来务工人员徐某的博客-CSDN博客 官方替我们编译好了,可以直接拿来用,但是看到下面这两个文件夹就知道,官方是用msvc编译器编译的,所以还是…

2天搞定-从零开始搞-量化交易-Python 【案例A股量化交易】第一节

搭建windows电脑开发环境 一,下载并搭建python 环境 1:python 安装过程教程:https://blog.csdn.net/weixin_44727274/article/details/126017386 2:python 下载地址官网:https://www.python.org/downloads/windows/ (过程较慢耐心等待,多版本选择) 3:python 本人放…

chatgpt赋能Python-python_noj

Python NOJ - 一款适合Python学习者的在线编程环境 Python NOJ是一款在线的Python编程环境,其全称为Python Online Judge,是一款适合Python学习者使用的编程工具。接下来,我们将介绍其主要特点和优势,并探讨其与其他在线编程环境…

chatgpt赋能Python-python_nmpy

Python NumPy:提高数据科学和数学计算的效率 在数据科学和数学计算领域,Python一直是最受欢迎的语言之一。NumPy是一个优秀的Python库,它通过提供一个强大的多维数组对象和与之相关的各种函数,极大地提高了Python在数据科学和数学…

2022下半年上午题

2022下半年上午题 b b d a c d 在做加法前先用补码表示 c a d c a c b b 专利权需要申请,题目中没说公司申请了专利 c c 前向传播取大值 d 反向传播求关键路径 b b b d a c 先在前驱图中把信号量定义下去 然后定义p,v操作 然后直接看图 1:从p1出来…

Spark大数据处理讲课笔记-- 理解RDD依赖

零、本讲学习目标 理解RDD的窄依赖理解RDD的宽依赖了解两种依赖的区别 一、RDD依赖 在Spark中,对RDD的每一次转化操作都会生成一个新的RDD,由于RDD的懒加载特性,新的RDD会依赖原有RDD,因此RDD之间存在类似流水线的前后依赖关系…

CANFDCAN协议对比 - 基础介绍_02

目录 四、CAN和CANFD区别 1、保留位 2、FDF-FD格式 五、高速传输机制 1、位速率切换 (Bit Rate Switch) 2、波特率5MBit/s 3、BRS和CRC界定符之间采用更高的波特率 六、CANFD数据场 1、经典CAN中DLC:9种可能的长度 2、CANFD中DLC:16种可能的长…

ChatGPT你真的玩明白了?来试试国内免费版的ChatGPT吧!

文章目录 一、什么是ChatGPT二、ChatGPT的作用三、免费ChatGPT的使用四、写在最后 一、什么是ChatGPT ChatGPT全称为Chat Generative Pre-trained Transformer,Chat是聊天的意思,GPT是生成型预训练变换模型,可以翻译为聊天生成预训练转换器或…

抖音seo源码开发部署

抖音seo账号矩阵源码系统搭建,​ 抖音获客系统,抖音SEO优化系统源码开发,思路分享,分享一些开发的思路...... 账号矩阵霸屏系统源代码账号矩阵系统建设部署,短视频seo账号矩阵框架分析,开发语言为后台框架语言PHP pyt…

chatgpt赋能Python-python_nonetype报错

Python NoneType报错:原因、解决方法和预防措施 Python 是一种面向对象的高级编程语言,用于快速编写脚本和应用程序。但是,当我们在编写 Python 代码时,可能会遇到 NoneType 报错;这是一种类型错误,它发生…

接口自动化测试工具SoapUI下载安装以及简单使用教程

前言 SoapUI是Webservice开发的必备工具。SoapUI是一个开源测试工具,通过Soap/HTTP来检查、调用、实现Web Service的功能,而且还能对Webservice做性能方面的测试。SoapUI会根据WSDL的格式生成左边的列表树,双击Request1就能看到Soap请求报文的内容。 一…

笔记--大数据--大数据概念

大数据:指无法在一定时间范围内用常规软件工具进行捕捉、管理和 处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化 能力的海量、高增长率和多样化的信息资产。 按顺序给出数据存储单位:bit、Byte、KB、MB、GB、TB…

笔记--大数据--Hadoop--01--基础概念

Hadoop是什么 Hadoop是一个分布式系统基础架构 主要解决海量数据的存储和分析计算问题 Hadoop优势–4高 高可靠性:Hadoop底层维护多个数据版本,单个计算元素或存储故障也不会导致数据丢失 高扩展性:在集群中分配任务数据,可以方便…

springWEB搭建

SpringWEB就是spring框架里得一个模块 springMVC介绍 在之前的后端三大架构: Controller: 控制层, 包含了servlet, 对数据的接收, 处理, 响应 Model: 数据模型, dao, model VIew: 视图, jsp, 用于将数据添加到html中进行响应 工作流程: 主要是控制层接收到响应之后, 调取dao层将…

CodeForces.1806A .平面移动.[判断可达范围][找步数规律]

题目描述: 题目解读: 给定移动规则以及起始点,终点;分析终点是否可达,可达则输出最小步数。 解题思路: 首先要判定是否可达。画图可知,对于题目给定的移动规则,只能到达起始点(a,b…

行业常识_交换机

文章目录 一、前言二、交换机2.1 什么是交换机?2.2 交换机的作用是什么?2.3 交换机的应用2.4 交换机分类2.5 交换机功能2.6 交换机的带宽 三、总结 一、前言 项目中经常会用到交换机。 交换机有多个网口。 你可以用一根网线,网线一端插入交换…

【学习日记2023.5.23】 之 店铺营业状态模块完善

文章目录 5. 店铺营业状态设置5.1 需求分析和设计产品原型 5.2 代码开发5.2.1 设置营业状态5.2.2 管理端查询营业状态5.2.3 用户端查询营业状态 5.3 功能测试5.3.1 接口文档测试5.3.2 接口分组展示5.3.3 前后端联调测试5.4 代码提交 5. 店铺营业状态设置 5.1 需求分析和设计 产…

chatgpt赋能Python-python_os_remove

Python os.remove(): 删除文件 什么是 Python os.remove()? Python os.remove() 函数是 Python 中用于删除文件的标准库函数之一。 它使用以下语法: os.remove(file)这里的 file 参数是要删除的文件的路径及文件名。 Python os.remove()的工作原理 …

Linux——SNAT与DNAT的应用

一、SNAT的介绍 1,SNAT概述 SNAT(SNAT)一般指源地址转换 源地址转换是内网地址向外访问时,发起访问的内网ip地址转换为指定的ip地址(可指定具体的服务以及相应的端口或端口范围),这可以使内网中…