Spark09: Spark之checkpoint

news2025/1/10 3:17:45

一、checkpoint概述

checkpoint,是Spark提供的一个比较高级的功能。有时候,我们的Spark任务,比较复杂,从初始化RDD开始,到最后整个任务完成,有比较多的步骤,比如超过10个transformation算子。而且,整个任务运行的时间也特别长,比如通常要运行1~2个小时。在这种情况下,就比较适合使用checkpoint功能了。因为对于特别复杂的Spark任务,有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation算子,又要使用到该RDD时,就会发现数据丢失了,此时如果没有进行容错处理的话,那么就需要再重新计算一次数据了。所以针对这种Spark Job,如果我们担心某些关键的,在后面会反复使用的RDD,因为节点故障导致数据
丢失,那么可以针对该RDD启动checkpoint机制,实现容错和高可用

如何使用checkpoint?

(1)首先要调用SparkContext的setCheckpointDir()方法,设置一个容错的文件系统的目录,比如HDFS;然后,对RDD调用checkpoint()方法。
(2)最后,在RDD所在的job运行结束之后,会启动一个单独的job,将checkpoint设置过的RDD的数据写入之
前设置的文件系统中。

二、RDD的checkpoint流程

1:SparkContext设置checkpoint目录,用于存放checkpoint的数据;对RDD调用checkpoint方法,然后它就会被RDDCheckpointData对象进行管理,此时这个RDD的checkpoint状态会被设置为Initialized。

2:待RDD所在的job运行结束,会调用job中最后一个RDD的doCheckpoint方法,该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD,并将其checkpoint状态从Initialized设置为
CheckpointingInProgress
3:启动一个单独的job,来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作,也就是将其数据写入checkpoint目录
4:将RDD数据写入checkpoint目录之后,会将RDD状态改变为Checkpointed;并且还会改变RDD的血缘关系,即会清除掉RDD所有依赖的RDD;最后还会设置其父RDD为新创建的CheckpointRDD

三、checkpoint与持久化的区别

(1)lineage是否发生改变。

lineage(血缘关系)说的就是RDD之间的依赖关系,持久化只是将数据保存在内存中或者本地磁盘文件中,RDD的lineage(血缘关系)是不变的;Checkpoint执行之后,RDD就没有依赖的RDD了,也就是它的lineage改变了。

(2)丢失数据的可能性。

持久化的数据丢失的可能性较大,如果采用 persist 把数据存在内存中的话,虽然速度最快但是也是最不可靠的,就算放在磁盘上也不是完全可靠的,因为磁盘也会损坏。Checkpoint的数据通常是保存在高可用文件系统中(HDFS),丢失的可能性很低

建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)
为什么呢

因为默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,那么这个时候,本来Spark任务已经执行结束了,但是由于中间的RDD没有持久化,在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话,就需要重新计算这个RDD的数据,再将其checkpoint到外部存储系统中。如果对需要checkpoint的rdd进行了基于磁盘的持久化,那么后面进行checkpoint操作时,就会直接从磁盘上读取rdd的数据了,就不需要重新再计算一次了,这样效率就高了。那在这能不能使用基于内存的持久化呢?当然是可以的,不过没那个必要。

四、checkpoint的使用

1. scala代码

package com.sanqian.scala

import org.apache.spark.api.java.StorageLevels
import org.apache.spark.{SparkConf, SparkContext}


object CheckPointScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("CheckPointScala")
    val sc = new SparkContext(conf)

    if (args.length == 0) {
      System.exit(100)
    }
    val outoutPath = args(0)
    // 1.设置checkpoint目录\
    sc.setCheckpointDir("hdfs://bigdata01:9000/chk001")

    val dataRDD = sc.textFile("hdfs://bigdata01:9000/hadoop")
    dataRDD.persist(StorageLevels.DISK_ONLY)
    // 2.对RDD执行checkpoint操作
    dataRDD.checkpoint()

    dataRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(outoutPath)

    sc.stop()
  }
}

2. Java代码

package com.sanqian.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class CheckPointJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("CheckPointJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        if (args.length == 0){
            System.exit(100);
        }
        String outputPath = args[0];
        // 1.设置checkpoint目录
        sc.setCheckpointDir("hdfs://bigdata01:9000/chk001");
        JavaRDD<String> rdd = sc.textFile("hdfs://bigdata01:9000/hadoop");
        // 2.对RDD执行checkpoint操作
        rdd.checkpoint();
        
        rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).saveAsTextFile(outputPath);

        sc.stop();
    }
}

3. 打包代码

(1)将pom.xml中的spark-core的依赖设置为provided,然后编译打包

    <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>

(2)D:\ProgramData\IdeaProjects\db_spark>mvn clean package -DskipTests

 (3)将打包的jar包上传到bigdata04的/data/soft/sparkjars目录,创建一个新的spark-submit脚本

spark-submit \
--class $1 \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
db_spark-1.0-SNAPSHOT.jar \
$2

 (4)提交任务:

 sh lwx_run.sh com.sanqian.scala.CheckPointScala /out-chk003

执行成功之后可以到 setCheckpointDir 指定的目录中查看一下,可以看到目录中会生成对应的文件保存rdd中的数据,只不过生成的文件不是普通文本文件,直接查看文件中的内容显示为乱码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/341382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Python,Opencv检测图像,视频中的猫

使用Python&#xff0c;Opencv检测图像&#xff0c;视频中的猫&#x1f431; 这篇博客将介绍如何使用Python&#xff0c;OpenCV库附带的默认Haar级联检测器来检测图像中的猫。同样的技术也可以应用于视频流。这些哈尔级联由约瑟夫豪斯&#xff08;Joseph Howse&#xff09;训练…

Ubuntu最新版本(Ubuntu22.04LTS)安装Tftp服务及其使用教程

目录 一、概述 二、在Ubuntu安装Tftp服务器  &#x1f356;2.1 安装tftp服务端&#xff08;tftpd-hpa&#xff09;  &#x1f356;2.2 配置&#xff0c;修改/etc/default/tftpd-hpa  &#x1f356;2.3 创建tftp服务的下载目录  &#x1f356;2.4 重启tftp服务器 三、在Ubun…

C++高级篇学习笔记

文章目录 前言 本文记录C一些面试难点问题剖析。 1. 左右值和右值引用的作用 左值&#xff1a;可以在左边&#xff0c;表达式结束后依然存在的持久对象&#xff0c;一般有名字&#xff0c;可以取地址。 提示&#xff1a; 前置自加/自减 可以做左值&#xff1b; 右值在右边&a…

java08-面向对象3

一&#xff1a;static 关键字&#xff1a;静态的 1.可以用来修饰的结构:主要用来修饰类的内部结构 属性、方法、代码块、内部类 2. static 修饰属性&#xff1a;静态变量&#xff08;或类变量&#xff09; 2.1 属性&#xff0c;是否使用static修饰&#xff0c;又分为静态属…

应对新的挑战!ChatGPT将如何改变多域作战?

​公众号博主推送内容&#xff0c;未经许可&#xff0c;不得转载或者引用。 原文&#xff1a;Exploring the Possibilities of ChatGPT in Rugged Military AI Applications 《ChatGPT&#xff1a;利用最先进的技术支撑多域作战》 ChatGPT是一款基于GPT-3大型自然语言模型的…

Spring Security in Action 第六章 一个小型的安全网络应用程序

本专栏将从基础开始&#xff0c;循序渐进&#xff0c;以实战为线索&#xff0c;逐步深入SpringSecurity相关知识相关知识&#xff0c;打造完整的SpringSecurity学习步骤&#xff0c;提升工程化编码能力和思维能力&#xff0c;写出高质量代码。希望大家都能够从中有所收获&#…

Leetcode.1138 字母板上的路径

题目链接 Leetcode.1138 字母板上的路径 Rating &#xff1a; 1411 题目描述 我们从一块字母板上的位置 (0, 0)出发&#xff0c;该坐标对应的字符为 board[0][0]。 在本题里&#xff0c;字母板为board ["abcde", "fghij", "klmno", "pqr…

day01查询 排序 数据处理函数 分组

文章目录1、什么是数据库&#xff1f;什么是数据库管理系统&#xff1f;什么是SQL&#xff1f;他们之间的关系是什么&#xff1f;2、安装MySQL数据库管理系统。3、MySQL数据库的完美卸载&#xff01;4、看一下计算机上的服务&#xff0c;找一找MySQL的服务在哪里&#xff1f;5、…

autox.js在vscode(win7)与雷神模拟器上的开发环境配置

目录 下载autox.js 安装autox.js&#xff1f; 在电脑上搭建autox.js开发环境 安装vscode 安装autox.js插件 雷神模拟器连接vscode 设置雷神模拟器IP 设置autox.js应用IP地址等 下载autox.js 大体来说&#xff0c;就是一个运行在Android平台上的JavaScript 运行环境 和…

计算机软考好不好考?

软考看你备考哪一科&#xff1f;对软考证书的需求量怎么样&#xff1f;对自己工作就业是否有帮助&#xff1f;从而来体现软考的意义~ 软考是什么&#xff1f; 软考全称是计算机技术与软件专业技术资格考试&#xff0c;通俗来说就是职称考试&#xff0c;也可以说是技术水平认定…

嵌入式Linux系统开发笔记(十六)

根文件系统rootfs启动验证测试 接下来我们使用测试一下前面创建好的根文件系统 rootfs&#xff0c;测试方法使用 NFS 挂载。 6.1 检查是否在Ubuntu主机中安装和开启了NFS服务 &#xff08;特别注意&#xff1a;nfs 配置文件/etc/exports中添加的路径一定要与实际使用的绝对路…

Elasticsearch:如何在提高跨索引搜索相关性的同时返回更多相关的文档

在 Elasticsearch 的搜索中&#xff0c;经常遇到的情况是&#xff0c;我们创建一个 data view 或者 index pattern 跨多个索引&#xff0c;这样我们可以对它们进行统一的搜索。我们有遇到这样的情况&#xff1a;完全匹配的文档的分数反而低于部分匹配的文档&#xff0c;这是为什…

Synchronized和Lock的区别

在分布式开发中&#xff0c;锁是控制线程安全的重要方式。Java提供了两种锁机制synchronized 和 Lock。 1、特性区别 Synchronized是Java内置的线程同步关键字&#xff1b; Lock是JUC包下面的一个接口&#xff0c;它有很多实现类&#xff0c;比如ReentrantLock就是它的一个实…

内存优化 · 基础论 · 初识 Android 内存优化

【小木箱成长营】内存优化系列文章&#xff1a; 内存优化 工具论 常见的 Android 内存优化工具和框架 内存优化 方法论 揭开内存优化神秘面纱 内存优化 实战论 内存优化实践与应用 Tips: 关注微信公众号小木箱成长营&#xff0c;回复"内存优化"可免费获得内存优…

Linux驱动开发(二)

一、驱动流程 驱动需要以下几个步骤才能完成对硬件的访问和操作&#xff1a; 模块加载函数 module_init注册主次设备号 <应用程序通过设备号找到设备>驱动设备文件 <应用程序访问驱动的方式> 1、手动创建 &#xff08;mknod&#xff09;2、程序自动创建file_oper…

Synchronized 原理

基本特点(只考虑 JDK 1.8): 1. 开始时是乐观锁, 如果锁冲突频繁, 就转换为悲观锁.2. 开始是轻量级锁实现, 如果锁被持有的时间较长, 就转换成重量级锁.3. 实现轻量级锁的时候大概率用到的自旋锁策略4. 是一种不公平锁5. 是一种可重入锁6. 不是读写锁 加锁工作过程 JVM 将 s…

【Kafka】【三】安装Kafka服务器

Kafka基本知识 Kafka介绍 Kafka是最初由Linkedin公司开发&#xff0c;是⼀个分布式、⽀持分区的&#xff08;partition&#xff09;、多副本的 &#xff08;replica&#xff09;&#xff0c;基于zookeeper协调的分布式消息系统&#xff0c;它的最⼤的特性就是可以实时的处理 …

蓝牙安全(AES-CCM)

目录 AES-CCM CCM规范加密过程 CCM规范解密认证过程 formatting函数 counter generation函数 蓝牙AES-CCM加密流程 参考文献 AES-CCM Advanced Encryption Standard-Counter with Cipher Block Chaining-Message Authentication Code 自蓝牙4.1起蓝牙的加密算法开始采…

RabbitMQ-其他问题

一、幂等性问题&#xff1a;消费者在消费MQ中的消息时&#xff0c;MQ已把消息发送给消费者&#xff0c;消费者在给MQ返回ACK时网络中断&#xff0c;故MQ未收到确认消息&#xff0c;该消息会重新发送给其他消费者&#xff0c;或者在网络重连后再次发送给消费者&#xff0c;但实际…

第三章虚拟机的克隆,快照,迁移删除

1.虚拟机的克隆 如果你已经安装了一台linux操作系统&#xff0c;你还想再更多的&#xff0c;没有必要再重新安装&#xff0c;你只需要克 隆就可以&#xff0c;看演示。 方式1&#xff0c;直接拷贝一份安装好的虚拟机文件,再用虚拟机打开这个文件方式2&#xff0c;使用vmware的…