Flink系列知识之:Checkpoint原理

news2024/11/13 6:51:36

Flink系列知识之:Checkpoint原理

在介绍checkpoint的执行流程之前,需要先明白Flink中状态的存储机制,因为状态对于检查点的持续备份至关重要。

State Backends分类

下图显示了Flink中三个内置的状态存储种类。MemoryStateBackend和FsStateBackend在运行时存储在Java堆中。FsStateBackend仅在执行检查点时才以文件的形式持久地将数据保存到远程存储。RocksDBStateBackend使用RocksDB(一种LSM数据库,结合了内存和磁盘)来存储状态。
在这里插入图片描述

当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

下面是执行HeapKeyedStateBackend的方法:

  • 支持异步检查点(默认):存储格式为CopyOnWriteStateMap。
  • 仅支持同步检查点:存储格式为NestedStateMap。
    当在MemoryStateBackend中使用HeapKeyedStateBackend时,默认情况下,基于检查点的数据序列化的最大数据量为5mb。
    对于RocksDBKeyedStateBackend,每个状态都存储在一个单独的列族中。keyGroup、Key和Namespace被序列化并以键的形式存储在数据库中。

对于RocksDBKeyedStateBackend,每个状态都存储在一个单独的列族中。keyGroup、Key和Namespace被序列化并以键的形式存储在数据库中。
在这里插入图片描述

checkpoint执行流程

Flink容错机制的核心部分是绘制分布式数据流和算子状态的一致快照。这些快照充当一致的检查点,系统可以在发生故障时退回到这些检查点。它受到用于分布式快照的标准Chandy-Lamport算法的启发,并专门针对Flink的执行模型进行了定制。

自Flink 1.11以来,检查点可以在对齐或不对齐的情况下进行。在本节中,我们首先描述对齐的检查点。

Checkpoint barrier

Flink分布式快照的一个核心元素是stream barrier。这些barrier会被注入到数据流中,并作为数据流的一部分与记录一起流动。当 Flink 作业设置了检查点时,Flink 会在数据流中插入这些特殊记录,以确保在特定点上所有算子的状态都被一致地保存。barrier永远不会超过记录,它们严格地按顺序流动。barrier将数据流中的记录分隔为进入当前快照的记录集和进入下一个快照的记录集,相当于将连续的数据流切分为多个有限序列,对应多个 Checkpoint 周期。每个barrier都携带着包含了在它前面的记录的快照的ID。barrier不会中断数据流,因此非常轻巧。来自不同快照的多个barrier可以同时在数据流中,这意味着多种快照可能并发发生。整个过程是由 Flink 的执行引擎在运行时负责处理的,通过协调不同操作符之间的信号和状态来实现数据流中的 checkpoint barrier 插入。
在这里插入图片描述

Stream barrier首先会被注入到source流的并行数据流中。快照n的barrier被注入的点(我们称之为Sn)是source源流中快照所能覆盖的数据的位置。例如,在Apache Kafka中,这个位置将是分区中拉取数据的偏移量。这个插入点Sn会被报告给检查点协调器(Flink的JobManager)。

当中间操作符从其所有输入流接收到快照n的barrier时,它会开始执行快照,并将状态写入到State backend中,然后会将快照n的barrier继续向下游流动,发送到其所有传出流中。一旦sink操作符(流DAG的末端)从其所有输入流接收到barrier n,它就向检查点协调器确认快照n。在所有sink算子都确认了快照之后,就认为快照已经完成。

一旦快照n完成,作业就不会再向source算子请求Sn之前的记录,因为此时这些记录已经完整地流过了整个DAG数据拓扑。

checkpoint alignment

Checkpoint alignment 机制是 Apache Flink 中用于确保分布式检查点一致性的一种机制。对于接收多个输入流的算子需要在快照barrier上对齐输入流。如下图所示:
在这里插入图片描述

  • 一旦算子从某个输入流通道中接收到快照barrier n,它就不能处理来自该流的任何一条记录(阻塞),直到它从其他所有输入流通道中都接收到barrier n。因为如果不阻塞的话,算子状态将会混合属于快照n的记录和属于快照n+1的记录。
  • 在对齐的过程中,算子只会继续处理来自未出现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输入队列,直至在队列满后被阻塞。
  • 当从最后一个输入流通道中接收到barrier n时,算子开始执行快照,异步地将状态写入到State Backend中,然后将barrier n继续向下游所有输出通道流动。

比起其他分布式快照,该算法的优势在于辅以 Copy-On-Write 技术的情况下不需要 “Stop The World” 影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。

需要注意的是,对于具有多个输入流的操作符算子,以及在shuffle后接收多个上游子任务输出流的操作符算子,都需要对齐。

checkpoint执行流程

上面介绍完checkpoint的相关原理后,本节尝试逐步解释执行检查点的过程。如下图所示,左侧为checkpoint coordinator,中间为Flink job(由两个源节点和一个汇聚节点组成),右侧为persistent storage(大多数场景下由HDFS提供)。

Step 1) Checkpoint coordinator触发checkpoint执行信号到所有输入流操作符算子中。
在这里插入图片描述

Step 2) 源节点向下游广播一个checkpoint barrier。该checkpoint barrier是Chandy-Lamport分布式快照算法的核心。下游任务只有在接收到所有输入流通道的barrier后才执行checkpoint操作。
在这里插入图片描述

Step 3) 源操作符算子完成state状态备份后,向checkpoint coordinator(检查点协调器)发送备份数据地址,即状态句柄。同时,barrier继续流向下游。
这里分为同步和异步(如果开启的话)两个阶段:
在这里插入图片描述

同步阶段:task执行状态快照,并写入外部存储系统(根据状态后端的选择不同有所区别)执行快照的过程:

  • 对 state 做深拷贝
  • 将写操作封装在异步的 FutureTask 中,FutureTask 的作用包括:1)打开输入流;2)写入状态的元数据信息;3)写入状态;4)关闭输入流

异步阶段

  • 执行同步阶段创建的 FutureTask
  • 向 Checkpoint Coordinator 发送 ACK 响应

Step 4) 当下游sink节点接收到上游两个输入通道的barrier后,开始执行本地快照。下图演示了执行RocksDB增量检查点的过程。RocksDB将全部数据刷新到磁盘,如红色三角形所示。然后,Flink为未上传的文件实现持久备份,如紫色三角形所示。
在这里插入图片描述

Step 5) 在执行完sink操作符算子的检查点之后,sink操作符算子将状态句柄(state handle)返回给checkpoint coordinator检查点协调器。
在这里插入图片描述

Step 6) 当接收到所有任务算子的状态句柄(state handle)后,checkpoint coordinator确认全局的checkpoint已经完成,然后将checkpoint元文件备份到持久化存储中。
在这里插入图片描述

Unaligned Checkpoint

上述对齐的chekcpoint基于Chandy-Lamport算法实现了分布式系统下的数据一致性快照。通过上面的原理可以看出,该方案在操作符算子具有多个输入流通道时,需要阻塞地等待所有输入通道的barrier都到达后才会开始执行快照。这在大多数情况下是没有问题的,但当某个输入流通道比其他输入流通道的数据流动更慢时,比如出现了反压、数据倾斜问题。会导致快照的完成时间变长甚至超时。其次,这种方案来说,Barrier对齐的过程本身就可能成为一个反压的源头,影响上游算法的效率,而这在某些情况下是不必要的。

为了解决这个问题,Flink在1.11版本中引入了Unaligned Checkpoint的特性。其基本思想是,只要输入通道中的的数据能成为操作符算子状态的一部分,那么checkpoint barrier就可以超越所有输入/输出通道中的数据。

Checkpointing can also be performed unaligned. The basic idea is that checkpoints can overtake all in-flight data as long as the in-flight data becomes part of the operator state.

如何来理解呢?

在上面对齐的checkpoint的原理介绍中可以发现,快照只包含了操作符算子的状态,而不关心输入/输出通道的数据记录。这是因为barrier对齐的checkpoint将本地快照延迟至所有barrier到达,也就是说当执行快照时,属于当前checkpoint周期内的数据记录都已经对该算子状态产生了影响,因而不必关心输入队列的剩余数据,同时输出队列又携带着barrier继续流向下一个算子的输入队列,因而输出队列的数据也不必关心,从而巧妙地避免了对算子输入/输出队列的状态进行快照。

但实际上,这和Chandy-Lamport算法是有一定出入的。举个例子,假设我们对两个数据流进行 equal-join,输出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的 Checkpoint 周期):
在这里插入图片描述

  • 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 前面。
  • 图 b: 算子分别读取 Channel 一个元素,输出 2。随后接收到 Channel 1 的 Barrier,停止处理 Channel 1 后续的数据,只处理 Channel 2 的数据。
  • 图 c: 算子再消费 2 个自 Channel 2 的元素,接收到 Barrier,开始本地快照并输出 Barrier。

对于相同的情况,Chandy-Lamport 算法的状态变化如下:
在这里插入图片描述

  • 图 a: 同上。
  • 图 b: 算子分别处理两个 Channel 一个元素,输出结果 2。此后接收到 Channel 1 的 Barrier,算子开始本地快照记录自己的状态,并输出 Barrier。
  • 图 c: 算子继续正常处理两个 Channel 的输入,输出 9。特别的地方是 Channel 2 后续元素会被保存下来,直到 Channel 2 的 Barrier 出现(即 Channel 2 的 9 和 7)。保存的数据会作为 Channel 的状态成为快照的一部分。

两者的差异主要可以总结为两点:

  1. 快照的触发是在接收到第一个 Barrier 时还是在接收到最后一个 Barrier 时。
  2. 是否需要阻塞已经接收到 Barrier 的 Channel 的计算。

从这两点来看,新的 Unaligned Checkpoint 将快照的触发改为第一个 Barrier 且取消阻塞 Channel 的计算,算法上与 Chandy-Lamport 基本一致,同时在实现细节方面结合 Flink 的定位做了几个改进。

首先,不同于 Chandy-Lamport 模型的只需要考虑算子输入 Channel 的状态,Flink 的算子有输入和输出两种 Channel,在快照时两者的状态都需要被考虑。

其次,无论在 Chandy-Lamport 还是 Flink Aligned Checkpoint 算法中,Barrier 都必须遵循其在数据流中的位置,算子需要等待 Barrier 被实际处理才开始快照。而 Unaligned Checkpoint 改变了这个设定,允许算子优先摄入并优先输出 Barrier。 如此一来,第一个到达 Barrier 会在算子的缓存数据队列(包括输入 Channel 和输出 Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入 Channel 在其 Barrier 之前的数据会被写入快照中(图中绿色部分)。

在这里插入图片描述

上图描述了算子是如何处理非对齐的checkpoint barriers的:

  • 当输入队列中接收到第一个chekcpoint barrier时,算子即开始执行相应处理。
  • 它会立即将该barrier跳过前面的输入队列,并将其插入到输出队列的尾部。
  • 算子在执行快照时,会把所有标记了跳过的数据记录(图中绿色部分),并将其一并写入到算子状态中。

此时,算子只需短暂停止处理输入队列以标记缓冲区、转发barrier并创建其状态的快照。

这样的主要好处是,如果本身算子的处理就是瓶颈,Chandy-Lamport 的 Barrier 仍会被阻塞(因为Chandy-Lamport仍然要等到第一个barrier到达算子时才开始触发快照执行,如果算子的处理本身比较慢,数据的流动仍然会很慢),但 Unaligned Checkpoint 则可以在 Barrier 进入输入 Channel 就马上开始快照。 这可以从很大程度上加快 Barrier 流经整个 DAG 的速度,从而降低 Checkpoint 整体时长。

回到之前的例子,用 Unaligned Checkpoint 来实现,状态变化如下:
在这里插入图片描述

  • 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 前面。输出 Channel 已存在结果数据 1。
  • 图 b: 算子优先处理输入 Channel 1 的 Barrier,开始本地快照记录自己的状态,并将 Barrier 插到输出 Channel 末端。
  • 图 c: 算子继续正常处理两个 Channel 的输入,输出 2、9。同时算子会将 Barrier 越过的数据(即输入 Channel 1 的 2 和输出 Channel 的 1)写入 Checkpoint,并将输入 Channel 2 后续早于 Barrier 的数据(即 2、9、7)持续写入 Checkpoint。

比起Aligned Checkpoint中不同Checkpoint周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint进行快照和输出Barrier时,部分本属于当前Checkpoint的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前Checkpoint的输出数据却落到Barrier之后(因此未反映到下游算子的状态中)。这也正是Unaligned的含义:不同Checkpoint周期的数据没有对齐,包括不同输入Channel之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从Checkpoint恢复时,不对齐的数据并不能由Source端重放的数据计算得出,同时也没有反应到算子状态中,但因为它们会被Checkpoint恢复到对应Channel中,所以依然能够提供只计算一次的准确结果。

Unaligned Checkpoint方案确保barrier可以尽可能快地在数据流中移动。它特别适合至少有一个缓慢移动的数据输入队列的应用,其对齐时间可能达到几个小时。但是,由于它增加了额外的I/O压力,所以当应用写入State Backend的I/O本身就是瓶颈时,非对齐Checkpoint方案并不会有明显帮助。

Exactly Once vs. At Least Once

为了实现EXACTLY ONCE的语义,Flink使用了输入缓存队列来缓存在对齐过程中队列中传入的数据。同时,我们经过上面Checkpoint原理介绍也能清晰地知道,使用对齐的方式来执行快照是能够实现EXACTLY ONCE的语义的。
需要注意的是,这里的EXACTLY ONCE语义并不意味着每个事件将被精确地处理一次,而是意味着每个事件只会影响Flink算子状态一次。同时,EXACTLY ONCE语义并不能实现端到端的数据EXACTLY ONCE,如果需要实现端到端的EXACTLY ONCE语义,需要sink算子能够实现写入的幂等和事务性。

通常,在checkpoint过程中额外的对齐时间延迟大约是几毫秒,但也可能会有一些异常值的延迟明显增加的情况。对于所有记录都需要超低延迟(几毫秒)的应用程序,Flink有一个开关,可以在检查点期间跳过对齐步骤。此时,当算子接收到每个输入队列的checkpoint barrier时不会阻塞,会继续处理barrier之后的数据记录。这就可能会导致本属于下一个checkpoint周期的数据记录影响了当前checkpoint周期的算子状态,从而导致恢复时数据重复消费的情况,因此,这种模式下只能保证At Least Once语义。

Checkpoint Exactly Once和At Least Once语义配置:

// 启用 Checkpoint 每 5 秒 一次,模式为 EXACTLY_ONCE
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

// 启用 Checkpoint 每 5 秒 一次,模式为 At Least Once
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);

另外,Aligned过程只发生在具有多个输入队列(连接)的算子以及具有多个输出队列的算子(比如在重新分区/shuffle之后)。正因为如此,只有单并行度的操作算子(map(), flatMap(), filter(),…)的数据流实际上及时被设置为At Least Once语义,也能实现Exactly once语义(实际上就是单输入流的算子不需要barrier对齐)。

参考

Flink 1.11 Unaligned Checkpoint 解析
Stateful Stream Processing
Flink Checkpoints Principles and Practices: Flink Advanced Tutorials

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2145991.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux设置常见开机自启动命令

本文介绍了三种开机自启的方式,重点介绍使用systemctl的方式自启动的 方式一、修改 /etc/rc.d/rc.local 文件 /etc/rc.d/rc.local 文件会在 Linux 系统各项服务都启动完毕之后再被运行。所以你想要自己的脚本在开机后被运行的话,可以将自己脚本路径加到…

Kubernetes从零到精通(12-Ingress、Gateway API)

Ingress和Gateway API都是Kubernetes中用于管理外部访问集群服务的机制,但它们有不同的设计理念和适用场景。它们的基本原理是通过配置规则,将来自外部的网络流量路由到Kubernetes集群内部的服务上。 Ingress/Gateway API和Service Ingress/Gateway API…

边缘计算智能网关的功能应用与优势-天拓四方

在物联网的世界中,数以亿计的设备不断产生、传输和处理数据。然而,传统的云计算架构在面对这些实时性要求高、数据量庞大的物联网数据时,常常面临着网络延迟、带宽限制和安全风险等问题。这时,边缘计算智能网关作为一种新兴的技术…

图书馆座位预约系统小程序的设计

管理员账户功能包括:系统首页,个人中心,用户管理,图书馆管理,座位信息管理,预约选座管理,签到信息管理,系统管理 微信端账号功能包括:系统首页,论坛&#xf…

【Harmony】轮播图特效,持续更新中。。。。

效果预览 swiper官网例子 Swiper 高度可变化 两边等长露出,跟随手指滑动 Swiper 指示器导航点位于 Swiper 下方 一、官网 例子 参考代码: // xxx.ets class MyDataSource implements IDataSource {private list: number[] []constructor(list: nu…

python:Django与Celery配合实现定时任务

Celery是一个基于python开发的分布式任务队列,而做python WEB开发最为流行的框架莫属Django,但是Django的请求处理过程都是同步的无法实现异步任务,若要实现异步任务处理需要通过其他方式(前端的一般解决方案是ajax操作&#xff0…

监控网线和电话线水晶头

监控网线 1、网络摄像机网线接口的线序与B类网线的对应关系(表格从左到右代表线序1-8) 表格解读: (1)请先查看摄像机网线对应的颜色,确定是第一种还是第二种摄像机类型 (2)确定好…

计算机网络基础 - 应用层(3)

计算机网络基础 应用层P2P 应用P2P 体系结构的扩展性BitTorrent 协议torrenl 洪流BitTorrent 运行的过程 P2P文件共享应用非结构化 P2PDHT 结构化 P2P(了解) 视频流和内容分发网视频流化服务HTTP 流和 DASH内容分发网 CDN面临挑战CDN 概述CDN 操作过程集…

MFC获取网页的html文本

使用 CInternetSession 类和 CHttpFile 类&#xff1b; 在stdafx.h中加入 #include <afxinet.h> &#xff1b; 基本的代码如下&#xff0c; void CMFCApplication3Dlg::OnBnClickedButton1() {// TODO: 在此添加控件通知处理程序代码try{CInternetSession session;CH…

4.事件组

事件组的本质:一个整数 里面的每一个bit,表示一类事件 任务A:可以等待这个整数的"bitx,bity,bitz....."都被设置为1. 这就是"AND"的关系 也可以等待这个整数的"bitx bity bitz..."任意一个被设置为1. 事件组有一个特别的地方在于: 1.假设任…

【QML 基础】QML ——描述性脚本语言,用于用户界面的编写

文章目录 1. QML 定义 1. QML 定义 &#x1f427; QML全称为Qt Meta-Object Language&#xff0c;QML是一种描述性的脚本语言&#xff0c;文件格式以.qml结尾。支持javascript形式的编程控制。QML是Qt推出的Qt Quick技术当中的一部分&#xff0c;Qt Quick是 Qt5中用户界面的涵…

React框架搭建,看这一篇就够了,看完你会感谢我

传统搭建框架的方式 在2024年以前&#xff0c;我们构建框架基本上采用官方脚手架&#xff0c;但是官方脚手架其实大概率都不符合我们的项目要求&#xff0c;搭建完了以后往往需要再继续集成一些第三方的包。这时候又会碰到一些版本冲突&#xff0c;配置教程等&#xff0c;往往…

C++入门基础知识九

1.string类对象的容量操作 函数名称功能说明size返回字符串有效长度length返回字符串有效长度capacity返回总空间大小empty检测字符串是否为空&#xff0c;为空返回true&#xff0c;否则falseclear清空有效字符reserve为字符预留空间number大小空间resize将有效字符改为n个&am…

Qt窗口——QToolBar

文章目录 工具栏创建工具栏设置toolTip工具栏配合菜单栏工具栏浮动状态 工具栏 QToolBar工具栏是应用程序中集成各种功能实现快捷键使用的一个区域。 可以有多个&#xff0c;也可以没有。 创建工具栏 #include "mainwindow.h" #include "ui_mainwindow.h&qu…

【Python报错已解决】ModuleNotFoundError: No module named ‘sklearn‘

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

App Fiddler抓包配置

1. 概述 Android Fiddler是一个非常强大的抓包工具&#xff0c;可以用来捕获并分析Android设备上的网络请求和响应。在实现"android fiddler 抓包https"的过程中&#xff0c;我们需要进行以下步骤&#xff1a; 安装Fiddler并配置代理 配置Android设备的网络代理 在Fi…

机器人的动力学——牛顿欧拉,拉格朗日,凯恩

机器人的动力学推导方法有很多&#xff0c;常用得有牛顿&#xff0c;拉格朗日&#xff0c;凯恩等方法&#xff0c;接下来&#xff0c;简单说说他们之间的使用。注&#xff1a;这里不考虑怎么来的&#xff0c;只说怎么应用。 参考1&#xff1a;4-14动力学分析方法-牛顿—欧拉方…

linux下的日志编写

1、日志初始化创建 2、日志写入 3、日志关闭 log.c #include "log.h"static log_t LOG;//初始化日志文件&#xff0c;在当前目录创建日志文件 int log_init(char *pdirname) {time_t t;struct tm *ptm NULL;char filepath[64] {0};int ret 0;time(&t);ptm …

MySQL_表的基本操作

课 程 推 荐我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448;入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448;虚 拟 环 境 搭 建 &#xff1a;&#x1…

Contact Form 7最新5.9.8版错误修复方案

最近有多位用户反应Contact Form 7最新5.9.8版的管理页面有错误如下图所示 具体错误文件的路径为wp-content\plugins\contact-form-7\admin\includes\welcome-panel.php on line 153 找到welcome-panel.php这个文件编辑它&#xff0c;将如下图选中的部分删除 删除以后&#xf…