【Flink入门修炼】2-3 Flink Checkpoint 原理机制

news2024/10/7 8:29:40

如果让你来做一个有状态流式应用的故障恢复,你会如何来做呢?
单机和多机会遇到什么不同的问题?
Flink Checkpoint 是做什么用的?原理是什么?

一、什么是 Checkpoint?

Checkpoint 是对当前运行状态的完整记录。程序重启后能从 Checkpoint 中恢复出输入数据读取到哪了,各个算子原来的状态是什么,并继续运行程序。
即用于 Flink 的故障恢复。
这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复。

二、如何实现 Checkpoint 功能?

如果让你来设计,对于流式应用如何做到故障恢复?
我们从最简单的单机单线程看起。

一)单机情况

同步执行,每次只处理一条数据

image.png

很简单,这种情况下,整个流程一次只处理一条数据。

  • 数据到 Write 阶段结束,各个算子记录一次各自状态信息(如读取的 offset、中间算子的状态)
  • 遇到故障需要恢复的时候,从上一次保存的状态开始执行
  • 当然为了降低记录带来的开销,可以攒一批之后再记录。
同时处理多条数据

每个计算节点还是只处理一条数据,但该节点空闲就可以处理下一条数据。
image.png

如果还按照一个数据 Write 阶段结束开始保存状态,就会出现问题:

  • 前面节点的状态,在处理下一个数据时被改过了
  • 从此时保存的记录恢复,前面的节点会出现重复处理的问题
  • 此时被称为 - 确保数据不丢(At Least Once)

一种解决方式:

  • 在输入数据中,定期插入一个 barrier
  • 各算子遇到 barrier 就开始做状态保留,并且不再接收新数据的计算。
  • 当前算子状态保留后,将 barrier 传递给下一个算子,并重复上面的步骤。
  • 当 barrier 传递到最后一个算子,并完成状态保留后,本次状态保留完成。

这样,各个节点保存的都是相同数据节点时的状态。
故障恢复时,能做到不重复处理数据,也就是精确一次(Exactly-once)。
image.png

但这里,你可能会发现一个问题:

  • 数据已经写出了怎么办?在两个保存点之间,已经把结果写到外部了,重启后不是又把部分数据再写了一次?

这里实际是**「程序内部精确一次」「端到端精确一次」**。
那么如何做到「端到端精确一次」?

  • 方案一:最后一个 sink 算子不直接向外部写出,等到 barrier 来了,才把这一批数据批量写出去
  • 方案二:两阶段提交。需要 sink 端支持(如 kafka)。
    • 方式类似于 MySQL 的事务。
    • sink 端正常向外部写出,不过输出端处于 pre-commit 状态,这些数据还不可读取
    • 当 sink 端等到 barrier 时,将输出端数据变为 committed,下游输出端的数据才正式可读

不过以上方法为了做到端到端精确一次,会带来数据延迟问题。(因为要等 Checkpoint 做完,数据才实际可读)。

解决数据延迟有一种方案:

  • 方案:幂等写入。同样一条数据,无论写入多少次对输出端看来都是一样的。(比如按照主键重复写这一条数据,并且数据本身没变化)

二)重要概念介绍

一致性级别

前面的例子中,我们提到了部分一致性级别,这里我们总结下。在流处理中,一致性可以分为 3 个级别:

  • at-most-once(最多一次): 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
  • at-least-once (至少一次): 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
  • exactly-once (精确一次): 这指的是系统保证在发生故障后得到的计数结果与正确值一致。恰好处理一次是最严格的保证,也是最难实现的。

按区间分:

  • 程序(Flink)内部精确一次
  • 端到端精确一次
Checkpoint 中保留的是什么信息?

🤔 如果是你来设计,checkpoint 都需要保留哪些信息,才能让程序恢复执行?
【这里说的就是 state
考虑一个开发需求:单词计数。
从 kafka 中读数据,处理逻辑是将输入数据拆分成单词,有一个 map 记录各个单词的数量,最后输出。

  • 从输入流中,拆分单词
  • 将统计的结果放到内存中一个 Map 集合,单词做为 key,对应的数量做为 value

想要恢复的时候还能接着上次的状态来,要么就需要几个信息:

  • 处理到哪条数据了
  • 中间状态是啥
  • 数据写出到哪条了

以及,上述信息应是针对同一条数据的。否则状态就乱了。
那么可以得到,保留的信息是:

source中间算子sink
已输入的数据(offset)[<hello, 5>, <world, 10>, …]写出到第几条了

三)多机多进程

随着业务的发展,单机已经不能满足需求了,开始并行分布式的处理。
读取、处理、写出,也不再是一个进程从头到尾干完,会拆分到多个机器上执行。也不再等待一条数据处理完,才处理下一条。
image.png

多机多线程,问题就开始变得复杂起来:

  • 如何确保状态拥有精确一次的容错保证?
  • 如何在分布式场景下,替多个拥有本地状态的算子产生一个全域一致的快照?
  • 对于流合并,合并节点会受到多个 barrier 如何处理?
  • 如何在不中断运算的前提下产生快照?

🤔 先思考下,如果还用单线程中 barrier 的方式来处理。会遇到什么问题,该如何解决?

处理流程

我们还是在数据流中插入 barrier。

  • 到达第一个 source 节点和之前的没区别,source 节点开始保存状态(offset)

image.png

  • 接下来,source 将 barrier 拆分为两个,分别发往下游的算子

image.png

  • 下游算子收到 barrier,开始记录状态

image.png

  • 关键是最后的 operator#2,它会收到多个 barrier
    • barrier 的初始目的是,收到 barrier 表示前面的数据都处理完了,要开始保存状态了
    • 两个绿色的节点(operator#1)分别发送 barrier,代表两个 barrier 之前处理过的数据,实际都是第一个蓝色节点(source)barrier 之前的数据。
    • 那么最后的橙色节点(operator#2),理应收到所有由绿色节点(operator#1)发送的 barrier,才代表数据已经收全了,可以开始保存状态。【叫做 barrier 对齐】

image.png

对于多分支合并的情况,在等待所有 barrier 到齐的过程中:

  • 先收到 barrier 的分支,还会有数据不断流入
  • 为了能做到精确一次(Exactly-once),就不能处理这些数据,需要先缓存起来,否则这个节点的状态就不对了
  • 上面一条反过来说,如果不等,直接处理,那么就是**至少一次(At Least Once)**的效果。(想想在故障恢复的时候,是不是就会重复计算了)

如何在不中断运算的前提下产生快照?
前面做快照,我们假设的是节点收到 barrier 后,就不再接收新数据,把当前节点状态保存后,再接收新数据,然后把 barrier 再向后传递。
那,是否必须这样串行来呢?

  • 卡住新数据,保存当前状态,这里必须串行,不串行状态就乱了
  • 但是,向后发送 barrier 可以同时做,不影响当前节点的保存

那,后面节点保存完了,前面节点还没保存完怎么办?

  • 没关系,一次 checkpoint 成功,需要等待所有节点都成功才行,保存的先后顺序无所谓

三、Flink Checkpoint 配置

程序中如何开启 checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启 checkpoint,并设置间隔 ms
env.enableCheckpointing(1000);
// 模式 Exactly-Once、At-Least-Once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两个 checkpoint 之间最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同时执行的 checkpoint 数量(比如上一个还没执行完,下一个已经触发开始了)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 当用户取消了作业后,是否保留远程存储上的Checkpoint数据
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

checkpoint 存储

Flink 开箱即用地提供了两种 Checkpoint 存储类型:

  • JobManagerCheckpointStorage
    • 将 Checkpoint 快照存储在 JobManager 的堆内存中
  • FileSystemCheckpointStorage
    • 放到 HDFS 或本地磁盘中

四、小结

本节介绍了 Flink Checkpoint 故障恢复机制。从单机单线程,到多机多线程一步步分析如何实现状态保存和故障恢复。
同时对一致性级别进行了探讨,对程序内部和端到端一致性的实现方式给出了可行的方案。
后续会对 Checkpoint 程序内部实现原理进行剖析。


参考文章:
Flink Checkpoint 深入理解-CSDN博客
漫谈 Flink - Why Checkpoint - Ying
Flink之Checkpoint机制-阿里云开发者社区 (图不错)
Flink 状态一致性、端到端的精确一次(ecactly-once)保证 - 掘金
硬核!八张图搞懂 Flink 端到端精准一次处理语义 Exactly-once(深入原理,建议收藏)-腾讯云开发者社区-腾讯云

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1624868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习day3

一、距离度量 1.欧氏距离 2.曼哈顿距离 3.切比雪夫距离 4.闵可夫斯基距离 二、特征与处理 1.数据归一化 数据归一化是一种将数据按比例缩放&#xff0c;使之落入一个小的特定区间的过程。 代码实战 运行结果 2.数据标准化 数据标准化是将数据按照其均值和标准差进行缩放的过…

语义分割模型——浅谈U-net相关理论

目录 1 U-net简介 1.1 U-net是什么 1.2 U-net的创新点及优势 2 U-net改进思路 2.1 编码器优化 2.2 跳跃连接优化 2.3 解码器优化 2.4 其他优化方式 2.5 注意事项 1 U-net简介 1.1 U-net是什么 Ronneberger等人于2015年基于FCN&#xff08;全卷积神经网络&#xff09…

【一般排查思路】针对银河麒麟高级服务器操作系统磁盘空间已满

1. 本身磁盘空间已满 有时候我们会看到服务器上有提示“设备上没有空间”&#xff0c;如图1。 图 1 如果是磁盘本身空间已满&#xff0c;我们可以借助du工具来排查&#xff0c;比如首先cd / 切换到根目录&#xff0c;然后 du -sh * | sort -rh | head -n 3查看空间占用最大的…

基于Springboot的在线动漫信息平台

基于SpringbootVue的在线动漫信息平台的设计与实现 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringbootMybatis工具&#xff1a;IDEA、Maven、Navicat 系统展示 用户登录 首页 热门动漫 文章专栏 会员分享 论坛信息 动漫资讯 后台登录 动漫分类管…

【Redis】Redis 非关系型数据库 安装、配置、使用(全集)

目录 Redis 第一章 1、什么是redis 2、安装redis 1-7 8 3、redis使用 第二章 1、redis的使用 1、使用方式 2、使用Java代码使用redis 3、优化连接redis 2、五种数据类型 常用命令 string hash list set zset 不同数据类型存、取、遍历的方法 3、redis在项目…

C++ | Leetcode C++题解之第49题字母异位词分组

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<vector<string>> groupAnagrams(vector<string>& strs) {// 自定义对 array<int, 26> 类型的哈希函数auto arrayHash [fn hash<int>{}] (const array<int, 26>&…

d12(121-125)-勇敢开始Java,咖啡拯救人生

目录 JDK8前的Date SimpleDateFormat 解析字符串时间成为日期对象 秒杀 Calendar JDK8之后的时间 LocalDate LocalTime LocalDateTime LocalDate 获取日期对象中的信息 修改某信息 把某信息加/减多少 获取指定时间的LocalDime对象 判断两日期对象 是否相等 在前还是…

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之六 简单进行人脸训练与识别

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之六 简单进行人脸训练与识别 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之六 简单进行人脸训练与识别 一、简单介绍 二、简单进行人脸训练与识别 1、LBPH…

基于Spring Boot的考研资讯平台设计与实现

基于Spring Boot的考研资讯平台设计与实现 开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/idea 系统部分展示 系统功能界面图&#xff0c;在系统首页可以查看首页、考…

公司服务器中的kafka消息中间件挂了,我是如何修复的?

今天的公司的system系统服务在运行过程中&#xff0c;提示连接不上kafuka的消息中间件。但是负责kafka的同事已经离职了&#xff0c;询问公司开发也不知道如何处理&#xff0c;我是如何重启kafka消息中间件使system系统服务正常运行&#xff1f; 查看kafka的安装位置 在下面的…

【UE5】蓝图通信方式

目录 1、直接通信 2、getAllActorsOfClass 3、getAllActorsOfClassWithTag 4、通过射线检测 5、接口 6、事件分发器 7、SpawnActor 8、调用控制台命令 9、关卡蓝图中直接调用 创建两个Actor蓝图 1、直接通信 场景中 2、getAllActorsOfClass 3、getAllActorsOfClassWit…

编写一个Java类 输入手机号码,验证其是否合法的完整实例

每个人的手机号码都是不一样的&#xff0c;那我们该如何保证用户输入的是合法的手机号码呢&#xff1f;这就需要我们在代码中对这个手机号进行验证&#xff0c;不能随便输入11位数字就行了。这时&#xff0c;就需要对用户传递过来的字符串参数进行校验。 下面我们介绍使用Java…

【Java数据结构】初步认识ArrayList与顺序表

前言~&#x1f973;&#x1f389;&#x1f389;&#x1f389; hellohello~&#xff0c;大家好&#x1f495;&#x1f495;&#xff0c;这里是E绵绵呀✋✋ &#xff0c;如果觉得这篇文章还不错的话还请点赞❤️❤️收藏&#x1f49e; &#x1f49e; 关注&#x1f4a5;&#x…

从零开始安装 stable diffusion webui v1.9.3 (windows10)

从零开始安装 stable diffusion webui v1.9.3 (windows10) CUDA 安装 CUDA 12.1 | https://developer.nvidia.com/cuda-toolkit-archive CUDNN 8.x | https://developer.nvidia.com/rdp/cudnn-archive 安装路径 F:/CUDA/v12.1 安装git git官网 | https://git-scm.com/ 安…

html显示PDF并兼容IE浏览器的解决方案

方案一、vue-pdf插件 缺点&#xff1a;IE11显示空白&#xff0c;编译后的Edge测试环境可以正常线上&#xff0c;打到线上报错&#xff0c;谷歌和百分浏览器显示完美 1、vue 只显示核心代码&#xff0c;需要安装vue-pdf插件 <vue-pdf :src"ivcPdfUrl"></v…

决策树分析及其在项目管理中的应用

决策树分析是一种分类学习方法&#xff0c;其主要用于解决分类和回归问题。在决策树中&#xff0c;每个内部节点表示一个属性上的测试&#xff0c;每个分支代表一个属性输出&#xff0c;而每个叶节点则代表类或类分布。通过从根节点到内部节点的路径&#xff0c;可以构建一系列…

commvault学习(6):备份oracle(包括oracle的安装)

1.环境 CS、MA&#xff1a;一台windows server2012 客户端&#xff1a;2台安装了oracle11g的windows server2008 1.1 windows server2008安装oracle11g &#xff08;1&#xff09;右击安装包内的setup&#xff0c;以管理员方式运行 &#xff08;2&#xff09;取消勾选接收安…

PFA容量瓶耐受强酸强碱进口特氟龙材质定容瓶

PFA容量瓶&#xff0c;也叫特氟龙容量瓶&#xff0c;是用于配制标准浓度溶液的实验室器皿&#xff0c;是有着细长颈、梨形肚的耐强腐蚀平底塑料瓶&#xff0c;颈上有标线&#xff0c;可直接配置标准溶液和准确稀释溶液以及制备样品溶液。 因其有着不易碎、材质纯净、化学稳定性…

用docker方式安装openGauss数据库的事项记录

文章目录 &#xff08;一&#xff09;背景&#xff08;二&#xff09;安装&#xff08;2.1&#xff09;安装docker&#xff08;2.2&#xff09;安装openGauss &#xff08;三&#xff09;运行&#xff08;3.1&#xff09;运行openGauss镜像&#xff08;3.2&#xff09;连接open…

docker安装【zookeeper】【kafka】【provectuslabs/kafka-ui】记录

目录 1.安装zookeeper:3.9.2-jre-172.安装kafka:3.7.03.安装provectuslabs/kafka-ui &#xff08;选做&#xff09;新环境没有jdk&#xff0c;安装jdk-17.0.10备用 mkdir -p /export/{data,apps,logs,conf,downloads}cd /export/downloadscurl -OLk https://download.oracle.…