Spark基础学习笔记----RDD检查点与共享变量

news2025/1/12 7:01:25

零、本讲学习目标

  1. 了解RDD容错机制
  2. 理解RDD检查点机制的特点与用处
  3. 理解共享变量的类别、特点与使用

一、RDD容错机制

  • 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式设置检查点(checkpoint)方式

(一)血统方式

  • 根据RDD之间依赖关系对丢失数据的RDD进行数据恢复。若丢失数据的子RDD进行窄依赖运算,则只需要把丢失数据的父RDD的对应分区进行重新计算,不依赖其他节点,并且在计算过程中不存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD所有分区都要进行从头到尾计算,计算过程中存在冗余计算。

(二)设置检查点方式

  • 本质是将RDD写入磁盘存储。当RDD进行宽依赖运算时,只要在中间阶段设置一个检查点进行容错,即Spark中的sparkContext调用setCheckpoint()方法,设置容错文件系统目录作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行持久化存储,若后面有节点宕机导致分区数据丢失,则以从做检查点的RDD开始重新计算,不需要从头到尾的计算,从而减少开销。

二、RDD检查点

(一)RDD检查点机制

  • RDD的检查点机制(Checkpoint)相当于对RDD数据进行快照,可以将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS。当机器发生故障导致内存或磁盘中的RDD数据丢失时,可以快速从快照中对指定的RDD进行恢复,而不需要根据RDD的依赖关系从头进行计算,大大提高了计算效率。

(二)与RDD持久化的区别

  • cache()或者persist()是将数据存储于机器本地的内存或磁盘,当机器发生故障时无法进行数据恢复,而检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证了数据的可靠性。
  • 在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空,而检查点存储的数据不会受影响,将永久存在,除非手动将其移除。因此,检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用。

(三)RDD检查点案例演示

  • net.cl.rdd包里创建CheckpointDemo对象
package net.cl.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    // 设置系统属性(本地运行必须设置,否则无权访问HDFS)
    System.setProperty("HADOOP_USER_NAME", "root")
    // 创建SparkConf对象
    val conf = new SparkConf()
    // 设置应用程序名称,可在Spark WebUI里显示
    conf.setAppName("Spark-CheckpointDemo")
    // 设置集群Master节点访问地址
    conf.setMaster("local[2]")
    // 设置测试内存
    conf.set("spark.testing.memory", "2147480000")
    // 基于SparkConf对象创建SparkContext对象,该对象是提交Spark应用程序的入口
    val sc = new SparkContext(conf)

    // 设置检查点数据存储路径
    sc.setCheckpointDir("hdfs://master:9000/spark-ck")
    // 创建模拟数据RDD
    val rdd: RDD[Int] = sc.parallelize(List(1, 1, 2, 3, 5, 8, 13))
    // 过滤结果
    val resultRDD = rdd.filter(_ >= 5)
    // 持久化RDD到内存中
    resultRDD.cache()
    // 将resultRDD标记为检查点
    resultRDD.checkpoint()

    // 第一次行动算子计算时,将把标记为检查点的RDD数据存储到文件系统指定路径中
    val result: String = resultRDD.collect().mkString(", ")
    println(result)
    // 第二次行动算子计算时,将直接从文件系统读取resultRDD数据,而不需要从头计算
    val count = resultRDD.count()
    println(count)

    // 停止Spark容器
    sc.stop()
  }
}
  • 上述代码使用checkpoint()方法将RDD标记为检查点(只是标记,遇到行动算子才会执行)。在第一次行动计算时,被标记为检查点的RDD的数据将以文件的形式保存在setCheckpointDir()方法指定的文件系统目录中,并且该RDD的所有父RDD依赖关系将被移除,因为下一次对该RDD计算时将直接从文件系统中读取数据,而不需要根据依赖关系重新计算。
  • Spark建议,在将RDD标记为检查点之前,最好将RDD持久化到内存,因为Spark会单独启动一个任务将标记为检查点的RDD的数据写入文件系统,如果RDD的数据已经持久化到了内存,将直接从内存中读取数据,然后进行写入,提高数据写入效率,否则需要重复计算一遍RDD的数据。
  • 创建检查点保存数据的目录

 

  • 运行程序,在控制台查看结果

 

  • 查看HDFS检查点目录,执行命令:hdfs dfs -ls -R /spark-ck

 

三、共享变量

  • 通常情况下,Spark应用程序运行的时候,Spark算子(例如map(func)或filter(func))中的函数func会被发送到远程的多个Worker节点上执行,如果一个算子中使用了某个外部变量,该变量就会复制到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立。当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销。因此,Spark提供了两种共享变量:广播变量和累加器。

(一)广播变量

  • 广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中,而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据。因此,广播变量是只读的。

1、默认情况下变量的传递

  • map()算子传入的函数中使用外部变量arr

scala> val arr = Array(1, 2, 3, 4, 5)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, arr))
scala> result.collect()

 

  • 上述代码中,传递给map()算子的函数(_, arr)会被发送到Executor端执行,而变量arr将发送到Worker节点所有Task任务中。变量arr传递的流程如下图所示

  • 假设变量arr存储的数据量大小有100MB,则每一个Task任务都需要维护100MB的副本,若某一个Executor中启动了3个Task任务,则该Executor将消耗300MB内存。

2、使用广播变量时变量的传递

  • 广播变量其实是对普通变量的封装,在分布式函数中可以通过Broadcast对象的value方法访问广播变量的值

 

  • 使用广播变量将数组arr传递给map()算子

scala> val arr = Array(1, 2, 3, 4, 5)
scala> val broadcastVar = sc.broadcast(arr)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, broadcastVar))
scala> result.collect()

 

  • 上述代码使用broadcast()方法向集群发送(广播)了一个只读变量,该方法只发送一次,并返回一个广播变量broadcastVar,该变量是一个org.apache.spark.broadcast.Broadcast对象。Broadcast对象是只读的,缓存在集群的每个Worker节点中。使用广播变量进行变量传递的流程如下图所示。

 

  • Worker节点的每个Task任务共享唯一的一份广播变量,大大减少了网络传输和内存开销。
  • 输出result的数据

 

(二)累加器

1、累加器功能

  • 累加器提供了将Worker节点的值聚合到Driver的功能,可以用于实现计数和求和。

2、不使用累加器

  • 对一个整型数组求和

 

  • 上述代码由于sum变量在Driver中定义,而累加操作sum = sum + x会发送到Executor中执行,因此输出结果不正确。

3、使用累加器

  • 对一个整型数组求和

 

scala> val myacc = sc.longAccumulator("My Accumulator") // 声明累加器
scala> val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5))
scala> rdd.foreach(x => myacc.add(x)) // 向累加器添加值
scala> println("sum = " + myacc.value) // 在Driver端输出结果
  • 上述代码通过调用SparkContext对象的longAccumulator ()方法创建了一个Long类型的累加器,默认初始值为0。也可以使用doubleAccumulator()方法创建Double类型的累加器。
  • 累加器只能在Driver端定义,在Executor端更新。Executor端不能读取累加器的值,需要在Driver端使用value属性读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/561191.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux防火墙之firewalld基础

一、firewalld概述 firewalld防火墙是Centos7系统默认的防火墙管理工具,取代了之前的iptables防火墙,也是工作在网络层,属于包过滤防火墙。 firewalld和iptables都是用来管理防火墙的工具(属于用户态)来定义防火墙的…

数据结构 之 二叉搜索树 结构

二叉树搜索树的基本属性: 如图所示:二叉搜索树有四个最基本的属性:指向节点的根(root),节点中的键(key)、左指针(right)、右指针(right&#xff…

DELL PowerVault MD3600f存储维修 控制器更换 电池更换

MD3600f 系列存储阵列介绍 MD3600f 系列是采用 2U 机架固定的外部独立磁盘冗余阵列 (RAID) 存储阵列,可容纳多达 12 个 3.5 英寸或 24 个 2.5 英寸的 6.0-Gbps 串行连接SCSI (SAS) 磁盘。 MD3600f 系列存储阵列可以使用 MD1200 系列扩展机柜以菊花链式连接&#xff…

解决:在 Router 中父级未引入单文件组件而且 children 中的单文件组件不能在页面展示的问题

1、问题展示: 其一、问题描述: 在 router 中父级未引入单文件组件,而只是写了其它配置,但在其 children 中写了配置且引入了单文件组件而未能在页面中展示; 其二、代码: // 某一块的静态路由管理 {path…

2021下半年

2021下半年 a d a c b 阶码是纯整数,尾数是纯小数 对于阶码: 对于尾数: 选b c c a c b c b 归属于受委托方 a b c a 前向传播 反向传播,求关键路径 b b 关键路径上的活动松弛时间为0 c 中缀式:需…

chatgpt赋能Python-python_namedtuple

Python中的namedtuple 在Python中,namedtuple是一个方便且易于使用的数据结构,可以有效地处理元组数据。 它是Python标准库collections中一个实用的类,可以创建一个具有命名属性的元组,类似于一个简单的类对象。namedtuple的属性…

Linux防火墙iptables(下)

一、通用匹配 1,协议匹配 2,地址匹配 3,接口匹配 二、隐含匹配 1.端口匹配 2,TCP标志位匹配 3,ICMP类型匹配 ICMP类型可以是字符串、数字代码 ICMP类型含义Echo-Request (代码为8)表示请求…

设计模式之规约模式

设计模式之规约模式 引言规约模式案例改造 参考 引言 规约模式的英文是Specification Pattern,Specification直译过来是要求、技术说明、明确的意思。光看名字估计大家都是一脸懵逼,根本不知道这个设计模式大概会是一个什么样子。这也是设计模式的一个通…

33.Mybatis-Plus

一、Mybatis-Plus。 (1)简介。 (2)快速开始_准备工作。 对于Mybatis整合MP有常常有三种用法,分别是MybatisMP、SpringMybatisMP、Spring BootMybatisMP。 (2.1)创建数据库以及表。 1.创建数…

chatgpt赋能Python-python_lanbda函数

Python Lambda函数:快速、灵活的编程利器 Python是当前最流行的编程语言之一,而在Python中,Lambda函数是一项十分强大的功能,它可以帮助开发者在编写代码时更快地完成任务,提高代码的灵活性和可读性。本文将介绍Pytho…

【Linux】Linux小程序(进度条)、git命令行的使用及gdb的使用

😁作者:日出等日落 🔔专栏:Linux 辛勤的蜜蜂永没有时间悲哀。 ——布莱克 目录 \r和\n的区别: 进度条小程序 git 命令行 Linux调试器:…

Linux-模拟一个简单的shell

什么是shell外壳?就是操作系统给我们的一个命令行解释器,在Linux系统中,它的shell叫做bash。 那么bash本质是什么呢? 本质就是一个文件,一个进程。 万物皆文件 每个操作系统的shell都是很复杂的,想要…

chatgpt赋能Python-python_o_n_

Python O(n)的介绍 Python是世界上最流行的编程语言之一,因为其简单易学的语法,强大的功能和广泛的使用领域。对于程序员来说,时间复杂度是非常重要的一个概念。它用来描述一个算法在处理输入数据时所需的时间和空间资源。 在计算机科学的算…

CSS中块级元素,行内块元素,行内元素的特点

CSS自学笔记 目录 一、什么是元素显示模式 二、CSS的元素显示模式 1.块元素 2.行内元素 3.行内块元素 前言 网页的标签非常多,在不同地方会用到不同类型的标签,了解他们的特点可以更好的布局我们的网页。 HTML 元素一般分为块元素和行内元素两种类型…

220v转15v芯片-220v转15v用什么芯片?

FET开关,具有高效率和稳定性。 Q:为什么需要将220v转换为15v? A:在家庭电器和电子设备中,很多电路需要低电压直流电源供电。而家庭供电一般为220v交流电,需要经过转换才能得到所需的低电压直流电源。 Q&…

错题记录—哪个类用到了解决哈希冲突的开放定址法,MYSQL实现主从复制的日志是哪种,Java对象的初始化方式有

解决哈希冲突(四种方法): 1、开放定址法:我们在遇到哈希冲突时,去寻找一个新的空闲的哈希地址。 (1)线性探测法 (2)平方探测法(二次探测) 2、再哈…

题解2023.5.23(欧拉筛)

C.Hossamand Trainees 欧拉筛,预处理先筛出质数&#xff0c;分解质因数对于出现两次及以上的输出yes 我们需要筛出根号(1e9)以内的所有质数&#xff0c;根据质数定理&#xff0c;大约有4e^3个质数&#xff0c; 时间复杂度分析&#xff1a;le5*4e34e8 #include<bits/stdc.…

Python 3.10.11 liunx系统安装

官网下载 https://www.python.org/downloads/source/ 将tar包上传服务器安装 安装基础功能软件 yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel 解压安装 tar -z…

chatgpt赋能Python-python_noob

Python for Beginners: An Introduction to the World’s Most Popular Programming Language Python is a high-level programming language that has become one of the most popular and widely used languages in the world. It’s simple, easy to read, and has a vast …

chatgpt赋能Python-python_pecan

Python Pecan: 构建Web应用程序的高效框架 Python是一种简单易学、功能强大的编程语言&#xff0c;非常适合Web应用程序的开发。而Pecan则是一个基于Python的高效框架&#xff0c;可以简化Web应用程序的开发过程。本文将介绍Python Pecan框架的优点、使用方法和性能表现。 什…