Spark RDD容错机制

news2025/1/4 19:44:14

文章目录

  • 一、RDD容错机制
    • (一)血统方式
    • (二)设置检查点方式
  • 二、RDD检查点
    • (一)RDD检查点机制
    • (二)与RDD持久化的区别
    • (三)RDD检查点案例演示
  • 三、共享变量
    • (一)广播变量
    • 1、默认情况下变量的传递
    • 2、使用广播变量时变量的传递
    • (二)累加器
    • 1、累加器功能
    • 2、不使用累加器
    • 3、使用累加器


一、RDD容错机制

当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式和设置检查点(checkpoint)方式。

(一)血统方式

根据RDD之间依赖关系对丢失数据的RDD进行数据恢复。若丢失数据的子RDD进行窄依赖运算,则只需要把丢失数据的父RDD的对应分区进行重新计算,不依赖其他节点,并且在计算过程中不存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD所有分区都要进行从头到尾计算,计算过程中存在冗余计算。

(二)设置检查点方式

本质是将RDD写入磁盘存储。当RDD进行宽依赖运算时,只要在中间阶段设置一个检查点进行容错,即Spark中的sparkContext调用setCheckpoint()方法,设置容错文件系统目录作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行持久化存储,若后面有节点宕机导致分区数据丢失,则以从做检查点的RDD开始重新计算,不需要从头到尾的计算,从而减少开销。

二、RDD检查点

(一)RDD检查点机制

RDD的检查点机制(Checkpoint)相当于对RDD数据进行快照,可以将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS。当机器发生故障导致内存或磁盘中的RDD数据丢失时,可以快速从快照中对指定的RDD进行恢复,而不需要根据RDD的依赖关系从头进行计算,大大提高了计算效率。

(二)与RDD持久化的区别

cache()或者persist()是将数据存储于机器本地的内存或磁盘,当机器发生故障时无法进行数据恢复,而检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证了数据的可靠性。

在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空,而检查点存储的数据不会受影响,将永久存在,除非手动将其移除。因此,检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用。

(三)RDD检查点案例演示

在net.army.rdd包里创建day06子包,然后在子包里创建CheckPointDemo对象
在这里插入图片描述

package net.army.rdd.day06

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 作者:梁辰兴
 * 日期:2023/6/6
 * 功能:检查点演示
 */
object CheckPointDemo {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("CheckPointDemo") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)

    // 设置检查点数据存储路径(目录会自动创建的)
    sc.setCheckpointDir("hdfs://master:9000/spark-ck")
    // 基于集合创建RDD
    val rdd = sc.makeRDD(List(1, 1, 2, 3, 5, 8, 13))
    // 过滤大于或等于5的数据,生成新的RDD
    val rdd1 = rdd.filter(_ >= 5)
    // 将rdd1持久化到内存
    rdd1.cache(); // 相当于无参persist()方法
    // 将rdd1标记为检查点
    rdd1.checkpoint();

    // 第一次行动算子 - 采集数据,将标记为检查点的RDD数据存储到指定位置
    val result = rdd1.collect.mkString(", ")
    println("rdd1的元素:" + result)
    // 第二次行动算子 - 计算个数,直接从缓存里读取rdd1的数据,不用从头计算
    val count = rdd1.count
    println("rdd1的个数:" + count)

    // 停止Spark容器
    sc.stop()
  }
}

上述代码使用checkpoint()方法将RDD标记为检查点(只是标记,遇到行动算子才会执行)。在第一次行动计算时,被标记为检查点的RDD的数据将以文件的形式保存在setCheckpointDir()方法指定的文件系统目录中,并且该RDD的所有父RDD依赖关系将被移除,因为下一次对该RDD计算时将直接从文件系统中读取数据,而不需要根据依赖关系重新计算。

Spark建议,在将RDD标记为检查点之前,最好将RDD持久化到内存,因为Spark会单独启动一个任务将标记为检查点的RDD的数据写入文件系统,如果RDD的数据已经持久化到了内存,将直接从内存中读取数据,然后进行写入,提高数据写入效率,否则需要重复计算一遍RDD的数据。

运行程序,在控制台查看结果
在这里插入图片描述
利用Hadoop WebUI查看HDFS检查点目录
在这里插入图片描述
查看红色框里的目录
在这里插入图片描述
查看rdd-1目录
在这里插入图片描述
因为执行了sc.stop()语句,关闭了Spark容器,缓存的数据就被清除了,当然也无法访问Spark的存储数据。

三、共享变量

通常情况下,Spark应用程序运行的时候,Spark算子(例如map(func)或filter(func))中的函数func会被发送到远程的多个Worker节点上执行,如果一个算子中使用了某个外部变量,该变量就会复制到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立。当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销。因此,Spark提供了两种共享变量:广播变量和累加器。

(一)广播变量

广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中,而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据。因此,广播变量是只读的。

准备工作:在/home目录里创建data.txt
在这里插入图片描述
上传到HDFS的/park目录
在这里插入图片描述

1、默认情况下变量的传递

集群方式启动spark shell
在这里插入图片描述

map()算子传入的函数中使用外部变量arr
在这里插入图片描述

val arr = Array(1, 2, 3, 4, 5)
val lines = sc.textFile("hdfs://master:9000/park/data.txt")
val result = lines.map((_, arr))
result.collect

上述代码中,传递给map()算子的函数(_, arr)会被发送到Executor端执行,而变量arr将发送到Worker节点的所有Task任务中。变量arr传递的流程如下图所示。
在这里插入图片描述
假设变量arr存储的数据量大小有100MB,则每一个Task任务都需要维护100MB的副本,若某一个Executor中启动了3个Task任务,则该Executor将消耗300MB内存。

2、使用广播变量时变量的传递

广播变量其实是对普通变量的封装,在分布式函数中可以通过Broadcast对象的value方法访问广播变量的值
在这里插入图片描述

val arr = Array(1,2,3,4,5)
val broadcastVar = sc.broadcast(arr) // 定义广播变量
broadcastVar.value

使用广播变量将数组arr传递给map()算子
在这里插入图片描述

val arr = Array(1, 2, 3, 4, 5)
val broadcastVar = sc.broadcast(arr) // 定义广播变量
val lines = sc.textFile("hdfs://master:9000/park/data.txt")
val result = lines.map((_, broadcastVar)) // 算子携带广播变量
result.collect

上述代码使用broadcast()方法向集群发送(广播)了一个只读变量,该方法只发送一次,并返回一个广播变量broadcastVar,该变量是一个org.apache.spark.broadcast.Broadcast对象。Broadcast对象是只读的,缓存在集群的每个Worker节点中。
在这里插入图片描述Worker节点的每个Task任务共享唯一的一份广播变量,大大减少了网络传输和内存开销。

通过遍历算子输出arr结果
在这里插入图片描述

val arr = Array(1,2,3,4,5)
val broadcastVar = sc.broadcast(arr) // 定义广播变量
val lines = sc.textFile("hdfs://master:9000/park/data.txt")
val result = lines.map((_, broadcastVar)) // 算子携带广播变量
result.collect.foreach(tuple => {
  print(tuple._1 + ": ")
  tuple._2.value.foreach(x => print(x + " "))
  println()
})

通过双重循环输出result的数据
在这里插入图片描述

for (tuple <- result.collect) {
  print(tuple._1 + ": ")
  for (x <- tuple._2.value)
    print(x + " ")
  println()
}

(二)累加器

1、累加器功能

累加器提供了将Worker节点的值聚合到Driver的功能,可以用于实现计数和求和。

2、不使用累加器

对一个整型数组求和
在这里插入图片描述

上述代码由于sum变量在Driver中定义,而累加操作sum = sum + x会发送到Executor中执行,因此输出结果不正确。

3、使用累加器

对一个整型数组求和
在这里插入图片描述

val myacc = sc.longAccumulator("acc") // 在Driver里声明累加器
val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5)) // 创建RDD
rdd.foreach(x => myacc.add(x)) // 在Executor里向累加器添加值
println("sum = " + myacc.value) // 在Driver里输出累加结果

上述代码通过调用SparkContext对象的longAccumulator ()方法创建了一个Long类型的累加器,默认初始值为0。也可以使用doubleAccumulator()方法创建Double类型的累加器。

累加器只能在Driver端定义,在Executor端更新。Executor端不能读取累加器的值,需要在Driver端使用value属性读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/614790.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql数据库出现Too many connections以及磁盘满了的查看方式

Too many connections问题 这问题是数据库连接数太多了导致的&#xff0c; 两个排查方向 1、当用户数量大的时候 先查看最大连接数show variables like ‘%max_connections%’; 这里的最大连接数就是2000&#xff0c;够用了&#xff0c;一般500-1000就够了&#xff0c;内存多…

【干货分享】3D模型可视化、格式转换引擎和Parasolid如何集成?

​今天分享一个示例项目&#xff0c;该示例项目使用HOOPS链轮将HOOPS Exchange和Siemens Parasolid实施到HOOPS Visualize中。 HOOPS中文网http://techsoft3d.evget.com/↓ 点击下方视频查看详情 ↓ HOOPS Visualize - Exchange和Parasolid集成视频 正如您在上面的视频中看到…

小白必看:零基础入门网络安全

1、什么是网络安全&#xff1f; 官方的回答&#xff1a;指网络系统的硬件、软件及其系统中的数据受到保护&#xff0c;不因偶然的或者恶意的原因而遭受到破坏、更改、泄露&#xff0c;系统连续可靠正常地运行&#xff0c;网络服务不中断。 具有保密性、完整性、可用性、可控性…

chatgpt赋能python:Python如何分成两栏写入Word文档

Python如何分成两栏写入Word文档 在进行文本排版时&#xff0c;有些时候我们需要将文字分成两栏来排版&#xff0c;这样可以让文章更加美观&#xff0c;易读。 本文将介绍一种使用Python将文本分成两栏写入Word文档的方法。在介绍具体实现方法之前&#xff0c;我们先来了解一…

【SLAM】ROS平台下三种自主探索算法总结

目录 前言 一、frontier_exploration 二、explorate_lite 三、rrt_exploration 总结 前言 探索是指当机器人处于一个完全未知或部分已知环境中&#xff0c;通过一定的方法&#xff0c;在合理的时间内&#xff0c;尽可能多的获得周围环境的完整信息和自身的精确定位&#…

自动化测试支持

自动化测试支持 自动化测试是现代软件开发中不可或缺的一环。它可以帮助开发团队快速、精确地检测软件中的缺陷&#xff0c;提高软件质量和开发效率。 自动化测试可以在代码变更频繁、测试用例数庞大时&#xff0c;显著地减少测试时间和工作量。相对于手动测试&#xff0c;自动…

集权设施攻防兵法:实战攻防之堡垒机篇

一、黑客视角下的堡垒机 堡垒机是一种网络安全设备&#xff0c;用于保护和管理企业内部网络与外部网络之间的访问。它作为一种中间节点&#xff0c;提供安全的访问控制和审计功能&#xff0c;用于保护内部网络免受未经授权的访问和攻击。堡垒机通常被用作跳板服务器&#xff0…

计算机网络实验:RIP路由协议配置

目录 前言实验目的实验内容相关知识点实验设备实验过程总结 前言 计算机网络是指由多台计算机通过通信设备和通信线路互联起来&#xff0c;实现信息交换的系统。计算机网络中的路由器是一种专用的网络设备&#xff0c;它负责根据目的地址选择最佳的传输路径&#xff0c;将数据…

容器(第二篇)docker网络

Docker 网络实现原理&#xff1a; Docker使用Linux桥接&#xff0c;在宿主机虚拟一个Docker容器网桥(docker0)&#xff0c;Docker启动一个容器时会根据Docker网桥的网段分配给容器一个IP地址&#xff0c;称为Container-IP&#xff0c;同时Docker网桥是每个容器的默认网关。因为…

6月6日汇报

1. 张量CP分解 三阶张量的CP分解是将其分解为三个矩阵。例如&#xff1a;一个三阶张量 &#xff0c;则CP分解可以写为 其中&#xff0c; 表示向量外积&#xff0c; 。下图为三阶张量的CP分解&#xff1a; 将上面的CP分解展开&#xff0c;也可以写为&#xff1a; 假设有一个三维…

LS1028/LS1043/LS1046+FPGA+TSN多路时间敏感性网络智能工业网关方案

随着 物联网、大数据、人工智能等技术的快速发展与应用&#xff0c;给传统的云计算模式带来了巨大的挑战&#xff0c;这也催生出了计算模式的变革&#xff0c; 边缘计算由此诞生。 所谓边缘计算&#xff0c;是指在靠近物或数据源头的一侧&#xff0c;采用网络、计算、存储、应用…

From Java To Kotlin 2:Kotlin 类型系统与泛型终于懂了

上期主要分享了 From Java To Kotlin 1 &#xff1a;空安全、扩展、函数、Lambda。 这是 From Java to Kotlin 第二期。 带来 表达式思维、子类型化、类型系统、泛型。 From Java to Kotlin 关键在于 思维的转变。 表达式思维 Kotlin 中大部分语句是表达式。 表达式思维是一…

Vue.js 中的数据请求是什么?如何进行数据请求?

Vue.js 中的数据请求是什么&#xff1f;如何进行数据请求&#xff1f; Vue.js 是一款流行的前端框架&#xff0c;它提供了许多方便的工具和 API&#xff0c;用于构建交互式的用户界面。其中&#xff0c;数据请求是 Vue.js 中重要的一部分&#xff0c;它可以让我们从服务器获取…

通过python封装商品ID采集1688商品详情数据,1688商品详情接口,1688API接口

1688是阿里巴巴集团旗下的B2B电商平台&#xff0c;提供海量的商品和服务。通过1688的API接口可以获取到商品的详细数据&#xff0c;并进行采集和分析。 1688的商品详情接口包括以下信息&#xff1a; 商品名称商品图片商品价格商品库存商品属性商品描述商品评价商品销量商品SK…

什么蓝牙耳机通话效果好,介绍几款不错的骨传导耳机

骨传导耳机是一种新型的耳机&#xff0c;相比于传统的耳机&#xff0c;骨传导耳机听歌时不需要将耳朵堵上&#xff0c;不会因为长时间佩戴而对听力造成损害。它不需要入耳也能听到声音&#xff0c;在户外运动时能够及时听到环境音&#xff0c;避免安全隐患。现在在骨传导市面上…

从零开始学习JavaScript:轻松掌握编程语言的核心技能⑤

从零开始学习JavaScript&#xff1a;轻松掌握编程语言的核心技能⑤ 1. JavaScript 函数定义2. JavaScript 函数参数2.1 函数显式参数(Parameters)与隐式参数(Arguments)2.1.1 显式参数&#xff08;Parameters&#xff09;2.1.2 隐式参数&#xff08;Arguments&#xff09; 2.2 …

HVV的艺术系列 之 上线的艺术

上线的艺术 很多时候&#xff0c;拿下的机器情况复杂多样。判断其出网性应该是首要工作。 01 到底该不该上线 承认的是&#xff0c;MSF和CS都是及其出色的后渗透工具。但是面对这种复杂多样的环境&#xff0c;上不上线是个我们要去认真考虑的问题&#xff0c;CS和MSF究竟能给我…

报表自动生成软件有哪些?热门报表自动生成软件推荐

随着数字化时代的到来&#xff0c;数据分析和处理变得越来越重要。在商业领域中&#xff0c;每个公司都需要制作各种类型的报表&#xff0c;以了解他们的运营情况、市场趋势和其他有关业务的信息。但是&#xff0c;手动创建这些报表是非常耗时且容易出错的。因此&#xff0c;报…

Vue3+Three.js+antvG2实战项目 智慧城市(五)

前言 在网上找了很久都没有找到使用Three.js开发智慧城市的免费文章或者免费视频,自己花了一点时间做了一个纯前端的智慧城市项目。 技术栈都是最新的:vue3vitetypeScriptThreeantv G2 源码分享 源码 模型,天空图盒子链接分享(不想下载源码可以只用下这个)提取码1234 20230424_…

报表开发工具Stimulsoft Report新增“用户参数”新功能,来看如何使用?

Stimulsoft Reports 是一款报告编写器&#xff0c;主要用于在桌面和Web上从头开始创建任何复杂的报告。可以在大多数平台上轻松实现部署&#xff0c;如ASP.NET, WinForms, .NET Core, JavaScript, WPF, Angular, Blazor, PHP, Java等&#xff0c;在你的应用程序中嵌入报告设计器…