[尚硅谷flink] 检查点笔记

news2024/10/6 19:14:00

在Flink中,有一套完整的容错机制来保证故障后的恢复,其中最重要的就是检查点。


文章目录

      • 11.1 检查点
        • 11.1.1 检查点的保存
          • 1)周期性的触发保存
          • 2)保存的时间点
          • 3)保存的具体流程
        • 11.1.2 从检查点恢复状态
        • 11.1.3 检查点算法(状态保存的具体算法)
          • 1 检查点分界线(Barrier)
          • 2 Barrier对齐的精准一次
          • 3 Barrier对齐的至少一次
          • 4 非Barrier对齐的精准一次(flink 1.11 之后出现)
        • 11.1.4 检查点配置
          • 1 启用检查点
          • 2 检查点存储
          • 3 其它高级配置
          • 4 demo代码
          • 5 **通用增量checkpoint (changelog)**
          • 6 最终检查点
        • 11.1.5 检查点总结

11.1 检查点

在流处理中,我们可以用存档读档的思路,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。
image.png
遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。
:::info
这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把checkpoint叫做“一致性检查点”。
:::

11.1.1 检查点的保存
1)周期性的触发保存

“随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以在Flink中,检查点的保存是周期性触发的,间隔时间可以进行设置。

2)保存的时间点

我们应该在所有任务(算子)都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。
这样做可以实现一个数据被所有任务(算子)完整地处理完,状态得到了保存。
:::info
如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。
:::
当然这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;kafka就是满足这些要求的一个最好的例子。

3)保存的具体流程

检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子,来详细描述一下检查点具体的保存过程。
回忆一下我们最初实现的统计词频的程序——word count。这里为了方便,我们直接从数据源读入已经分开的一个个单词,例如这里输入的是:
“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”…
我们所需要的就是每个任务都处理完“hello”之后保存自己的状态。

11.1.2 从检查点恢复状态

:::info

  1. 重启应用
  2. 读取检查点,重置状态
  3. 重置偏移量(向外部数据源重新提交偏移量,比如说Kafka)
  4. 继续处理数据
    :::
    image.png
    image.png
    image.png
    image.png
    image.png
11.1.3 检查点算法(状态保存的具体算法)

在Flink中,采用了基于Chandy-Lamport算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点。

1 检查点分界线(Barrier)

:::info
借鉴水位线的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。
barrier肯定是从source算子开始。
:::
收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;
之后的所有任务只要遇到它就开始对状态做持久化快照保存。
由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。
这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的“分界线”(Checkpoint Barrier)。
image.png

2 Barrier对齐的精准一次

watermark指示的是“之前的数据全部到齐了”,而barrier指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。
:::info
具体实现上,Flink使用了Chandy-Lamport算法的一种变体,被称为“异步分界线快照”算法。算法的核心就是两个原则:

  1. 当上游任务向多个并行下游任务发送barrier时,需要广播出去;
  2. 而当多个上游任务向同一个下游任务传递分界线时,需要在下游任务执行“分界线对齐”操作,也就是需要等到所有并行分区的barrier都到齐,才可以开始状态的保存。
    :::
    检查点保存算法具体过程为:
    image.png
    image.png
    image.png
    image.png
    (1)触发检查点:JobManager向Source发送Barrier;
    (2)Barrier发送:向下游广播发送;
    (3)Barrier对齐:下游需要收到上游所有并行度传递过来的Barrier才做自身状态的保存;
    (4)状态保存:有状态的算子将状态保存至持久化。
    (5)先处理缓存数据,然后正常继续处理
    完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。当JobManager收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。
    (补充)由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。

为了应对这种场景,Barrier对齐中提供了至少一次语义以及Flink 1.11之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据也保存进检查点。这样,当我们遇到一个分区barrier时就不需等待对齐,而是可以直接启动状态的保存了。

3 Barrier对齐的至少一次

:::info
精准一次性,在分界线对齐时,barrier 后面的数据需要等待对齐。而至少一次,可以直接进入算子进行计算。
:::

4 非Barrier对齐的精准一次(flink 1.11 之后出现)

:::info
与barrier,在分界线对齐,保存状态的时候,引入了输入缓冲区,以及输出缓冲区,当barrier到来时,不进行计算,直接传入道输出缓冲区的最后一个,然后继续接收数据进行处理,处理完毕后就是输出缓冲区排队。
所以与barrier最大的区别就是在保存状态时,连同缓冲区的数据队列状态一起保存。类似于空间换时间的思想。
:::
image.png
image.png
image.png
image.png

11.1.4 检查点配置

检查点的作用是为了故障恢复,我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能,我们可以在代码中对检查点进行各种配置。

1 启用检查点

默认情况下,Flink程序是禁用检查点的。如果想要为Flink应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);

这里需要传入一个长整型的毫秒数,表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点,默认的间隔周期为500毫秒,这种方式已经被弃用。
检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小,可以调大间隔时间;而如果希望故障重启后迅速赶上实时的数据处理,就需要将间隔时间设小一些。

2 检查点存储

检查点具体的持久化存储位置,取决于“检查点存储”的设置。默认情况下,检查点存储在JobManager的堆内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口。
具体可以通过调用检查点配置的.setCheckpointStorage()来配置,需要传入一个CheckpointStorage的实现类。Flink主要提供了两种CheckpointStorage:作业管理器的堆内存和文件系统。

// 配置存储检查点到JobManager堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

对于实际生产应用,我们一般会将CheckpointStorage配置为高可用的分布式文件系统(HDFS,S3等)。

3 其它高级配置

检查点还有很多可以配置的选项,可以通过获取检查点配置(CheckpointConfig)来进行设置。

CheckpointConfig checkpointConfig = env.getCheckpointConfig();

1)常用高级配置

  • 检查点模式(CheckpointingMode)

设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为exactly-once,而对于大多数低延迟的流处理程序,at-least-once就够用了,而且处理效率会更高。

  • 超时时间(checkpointTimeout)

用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间。

  • 最小间隔时间(minPauseBetweenCheckpoints)

用于指定在上一个检查点完成之后,检查点协调器最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,实际并发为1。

  • 最大并发检查点数量(maxConcurrentCheckpoints)

用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。

  • 开启外部持久化存储(enableExternalizedCheckpoints)

用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数ExternalizedCheckpointCleanup指定了当作业取消的时候外部的检查点该如何清理。

DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。

RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点。

  • 检查点连续失败次数(tolerableCheckpointFailureNumber)

用于指定检查点连续失败的次数,当达到这个次数,作业就失败退出。默认为0,这意味着不能容忍检查点失败,并且作业将在第一次报告检查点失败时失败。
2)开启非对齐检查点

  • 非对齐检查点(enableUnalignedCheckpoints)

不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为exctly-once,并且最大并发的检查点个数为1。

  • 对齐检查点超时时间(alignedCheckpointTimeout)

该参数只有在启用非对齐检查点的时候有效。参数默认是0,表示一开始就直接用非对齐检查点。如果设置大于0,一开始会使用对齐的检查点,当对齐时间超过该参数设定的时间,则会自动切换成非对齐检查点。

4 demo代码
public class CheckPointConfig {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

// 指定本地WEB-UI端口号
        configuration.setInteger(RestOptions.PORT, 8082);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);

        // 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 检查点配置
        // 1、启用检查点: 默认是barrier对齐的,周期为5s, 精准一次
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // 2、指定检查点的存储位置
//        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");
        checkpointConfig.setCheckpointStorage("file:///Users/xx/project/big_data/flink/flink_study/chk");
        // 3、checkpoint的超时时间: 默认10分钟
        checkpointConfig.setCheckpointTimeout(60000);
        // 4、同时运行中的checkpoint的最大数量
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔,设置了>0,并发就会变成1
        checkpointConfig.setMinPauseBetweenCheckpoints(1000);
        // 6、取消作业时,checkpoint的数据 是否保留在外部系统
        // DELETE_ON_CANCELLATION:主动cancel时,删除存在外部系统的chk-xx目录 (如果是程序突然挂掉,不会删)
        // RETAIN_ON_CANCELLATION:主动cancel时,外部系统的chk-xx目录会保存下来
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        // 7、允许 checkpoint 连续失败的次数,默认0--》表示checkpoint一失败,job就挂掉
        checkpointConfig.setTolerableCheckpointFailureNumber(10);

        // TODO 开启 非对齐检查点(barrier非对齐)
        // 开启的要求: Checkpoint模式必须是精准一次,最大并发必须设为1
        checkpointConfig.enableUnalignedCheckpoints();
        // 开启非对齐检查点才生效: 默认0,表示一开始就直接用 非对齐的检查点
        // 如果大于0, 一开始用 对齐的检查点(barrier对齐), 对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐)
        checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));


        env
            .socketTextStream("localhost", 7777)
            .flatMap(
                (String value, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            )
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(value -> value.f0)
            .sum(1)
            .print();

        env.execute();
    }
}
5 通用增量checkpoint (changelog)

了解

6 最终检查点

如果数据源是有界的,就可能出现部分Task已经处理完所有数据,变成finished状态,不继续工作。从Flink 1.14 开始,这些finished状态的Task,也可以继续执行检查点。自1.15 起默认启用此功能,并且可以通过功能标志禁用它:

Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
11.1.5 检查点总结
  • Barrier对齐: 一个Task 收到 所有上游 同一个编号的 barrier之后,才会对自己的本地状态做备份
    • 精准一次:在barrier对齐过程中,barrier后面的数据 阻塞等待(不会越过barrier)
    • 至少一次:在barrier对齐过程中,先到的barrier,其后面的数据 不阻塞 接着计算
  • 非Barrier 对齐:一个Task 收到 第一个 barrier时,就开始 执行备份,能保证 精准一次(fhink 1.11出的新算法)

先到的barrier,将 本地状态 备份,其后面的数据接着计算输出未到的barrier,其 前面的数据 接着计算输出,同时 也保存到备份中最后一个barrier到达 该Task时,这个Task的备份结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1594677.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机服务器中了rmallox勒索病毒怎么办,rmallox勒索病毒解密流程步骤

在企业的生产运营过程中网络发挥着巨大作用&#xff0c;利用网络可以拓宽市场&#xff0c;提高办公效率&#xff0c;网络为企业的生产运营提供了极大便利&#xff0c;但也为企业的数据安全带来隐患。近日&#xff0c;云天数据恢复中心接到多家企业的求助&#xff0c;企业的计算…

二维相位解包理论算法和软件【全文翻译- 菲林(Flynn)最小不连续性方法(4.5)】

4.5 菲林最小不连续性方法 在迄今为止对路径跟踪算法的讨论中,我们忽略了一种非常自然的方法,现在我们将对其进行描述。如果我们仔细观察图 4.42(a)中包裹相位数据中的条纹图案,就会发现 "条纹线 "或最亮像素和最暗像素之间的边界标志着从 0 到 2π 的过渡,它们…

Linux gcc 6

本章开始学习工具 什么是工具&#xff1f; 本质也是指令 yum 命令 小火车 sudo yum install sl&#xff08;安装sl&#xff09; sudo yum install -y sl //直接yes就不提示了 yum list //将yum源上的软件都穷举出来 yum search sl //结果不友好&#xff0c;不推荐 yum lis…

智能革命:未来人工智能创业的天地

智能革命&#xff1a;未来人工智能创业的天地 一、引言 在这个数字化迅速变革的时代&#xff0c;人工智能(AI)已经从一个边缘科学发展成为推动未来经济和社会发展的关键动力。这一技术领域的飞速进步&#xff0c;不仅影响着科技行业的每一个角落&#xff0c;更是为创业者提供了…

PTA 2813:画家问题(熄灯问题)

有一个正方形的墙&#xff0c;由NN个正方形的砖组成&#xff0c;其中一些砖是白色的&#xff0c;另外一些砖是黄色的。Bob是个画家&#xff0c;想把全部的砖都涂成黄色。但他的画笔不好使。当他用画笔涂画第(i,j)个位置的砖时&#xff0c; 位置(i−1,j)、 (i1,j)、(i,j−1)、(i…

设计模式学习笔记 - 设计模式与范式 -行为型:17.中介模式:什么时候用中介模式?什么时候用观察者模式?

概述 本章学习 23 种经典设计模式中的最后一个设计模式&#xff0c;中介模式。和之前讲过的命令模式、解释器模式类似&#xff0c;中介模式也不怎么常用&#xff0c;应用场景比较特殊、有限&#xff0c;但是&#xff0c;跟它俩不同的是&#xff0c;中介模式理解起来并不难&…

《手把手教你》系列基础篇(八十六)-java+ selenium自动化测试-框架设计基础-Log4j实现日志输出(详解教程)

1.简介 自动化测试中如何输出日志文件。任何软件&#xff0c;都会涉及到日志输出。所以&#xff0c;在测试人员报bug&#xff0c;特别是崩溃的bug&#xff0c;一般都要提供软件产品的日志文件。开发通过看日志文件&#xff0c;知道这个崩溃产生的原因&#xff0c;至少知道触发崩…

图文教程 | 2024Typora最新版免费激活使用教程(新旧版可用)

一、打开官网下载最新版Typora Typora 官网下载 安装&#xff1a; Typora中文官网&#xff1a;https://typoraio.cn/ Typora官网&#xff1a;https://typora.io/releases/all 官网长这个样子 下面这个不是官网&#xff01;&#xff01;&#xff01;&#xff01;注意&#x…

《一》Qt的概述

1.1 什么是Qt Qt是一个跨平台的C图形用户界面应用程序框架。它为应用程序开发者提供建立图形界面所需的所有功能。它是完全面向对象的&#xff0c;很容易扩展&#xff0c;并且允许真正的组件编程。 1.2 Qt的发展史 1991年 Qt最早由芬兰奇趣科技开发 1996年 进入商业领域&#x…

【Django开发】0到1美多商城项目md教程第7篇:登录,1. 互联开发者申请步骤【附代码文档】

美多商城完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;欢迎来到美多商城&#xff01;&#xff0c;项目准备。展示用户注册页面&#xff0c;创建用户模块子应用。用户注册业务实现&#xff0c;用户注册前端逻辑。图形验证码&#xff0c;图形验证码接口设…

结合 react-webcam、three.js 与 electron 实现桌面人脸动捕应用

系列文章目录 React 使用 three.js 加载 gltf 3D模型 | three.js 入门React three.js 3D模型骨骼绑定React three.js 3D模型面部表情控制React three.js 实现人脸动捕与3D模型表情同步结合 react-webcam、three.js 与 electron 实现桌面人脸动捕应用 示例项目(github)&…

Jackson 2.x 系列【19】模块 Module

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Jackson 版本 2.17.0 源码地址&#xff1a;https://gitee.com/pearl-organization/study-jaskson-demo 文章目录 1. 前言2. 核心类2.1 Module2.2 SimpleModule 3. 案例演示3.1 自定义模块3.2 注册…

ES查询和监控

es安装 参考https://blog.csdn.net/okiwilldoit/article/details/137107087 再安装kibana&#xff0c;在它的控制台里写es查询语句。 es指南 es权威指南-中文版&#xff1a; kibana用户手册-中文版&#xff1a; es中文社区 es参考手册API es客户端API es查询语句 # 查询e…

杰发科技AC7840——CAN通信简介(3)_时间戳

0. 时间戳简介 时间戳表示的是收到该CAN消息的时刻&#xff0c;通过连续多帧的时间戳&#xff0c;可以计算出CAN消息的发送周期&#xff0c;也可以用于判断CAN消息是否被持续收到。 1. 使用步骤 注意分别是发送和接收的功能&#xff1a; 2. 现象分析_接收时间戳 看下寄存器的…

鸿蒙端云一体化开发--开发云函数--适合小白体制

开发云函数 那什么是云函数&#xff1f;我们将来又怎么去使用这个云函数呢&#xff1f; 答&#xff1a;我们之前要编写一些服务端的业务逻辑代码&#xff0c;那现在&#xff0c;在这种端云一体化的开发模式下&#xff0c;我们是把服务端的业务逻辑代码&#xff0c;通过云函数来…

HackTheBox-Machines--MonitorsTwo

文章目录 0x01 信息收集0x02 CVE-2022-46169 漏洞利用0x03 权限提升0x04 提升到root权限 MonitorsTwo 测试过程 0x01 信息收集 a.端口扫描: 发现22、80端口    b.信息收集: 1.2.22 Cacti信息收集 nmap -sC -sV 10.129.186.1321.访问 10.129.186.132&#xff0c;为 1.2.22 Ca…

Java 面试宝典:你知道多少种解决 hash 冲突的方法?

大家好&#xff0c;我是大明哥&#xff0c;一个专注「死磕 Java」系列创作的硬核程序员。 本文已收录到我的技术网站&#xff1a;https://www.skjava.com。有全网最优质的系列文章、Java 全栈技术文档以及大厂完整面经 回答 在使用 hash 表时&#xff0c; hash 冲突是一个非常…

01-Three.js

引入three.js 1.script标签引入 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><title>Three.js中文网&#xff1a;http://www.webgl3d.cn/</title><!-- 引入three.js --><script src"…

恢复MySQL!是我的条件反射,PXB开源的力量...

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

【Linux】账号和权限管理

目录 一、用户账号与组账号 二、添加用户账号-useradd 三、修改用户账号的属性-usermod 四、更改用户命令-passwd 五、删除用户账号-userdel 六、添加组账号-groupadd 七、添加删除组成员-gpasswd 八、删除组账号-groupdel 九、查询账号信息-groups、id、finger、w、w…