Flink系列知识之:Checkpoint原理
在介绍checkpoint的执行流程之前,需要先明白Flink中状态的存储机制,因为状态对于检查点的持续备份至关重要。
State Backends分类
下图显示了Flink中三个内置的状态存储种类。MemoryStateBackend和FsStateBackend在运行时存储在Java堆中。FsStateBackend仅在执行检查点时才以文件的形式持久地将数据保存到远程存储。RocksDBStateBackend使用RocksDB(一种LSM数据库,结合了内存和磁盘)来存储状态。
当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。
下面是执行HeapKeyedStateBackend的方法:
- 支持异步检查点(默认):存储格式为CopyOnWriteStateMap。
- 仅支持同步检查点:存储格式为NestedStateMap。
当在MemoryStateBackend中使用HeapKeyedStateBackend时,默认情况下,基于检查点的数据序列化的最大数据量为5mb。
对于RocksDBKeyedStateBackend,每个状态都存储在一个单独的列族中。keyGroup、Key和Namespace被序列化并以键的形式存储在数据库中。
对于RocksDBKeyedStateBackend,每个状态都存储在一个单独的列族中。keyGroup、Key和Namespace被序列化并以键的形式存储在数据库中。
checkpoint执行流程
Flink容错机制的核心部分是绘制分布式数据流和算子状态的一致快照。这些快照充当一致的检查点,系统可以在发生故障时退回到这些检查点。它受到用于分布式快照的标准Chandy-Lamport算法的启发,并专门针对Flink的执行模型进行了定制。
自Flink 1.11以来,检查点可以在对齐或不对齐的情况下进行。在本节中,我们首先描述对齐的检查点。
Checkpoint barrier
Flink分布式快照的一个核心元素是stream barrier。这些barrier会被注入到数据流中,并作为数据流的一部分与记录一起流动。当 Flink 作业设置了检查点时,Flink 会在数据流中插入这些特殊记录,以确保在特定点上所有算子的状态都被一致地保存。barrier永远不会超过记录,它们严格地按顺序流动。barrier将数据流中的记录分隔为进入当前快照的记录集和进入下一个快照的记录集,相当于将连续的数据流切分为多个有限序列,对应多个 Checkpoint 周期。每个barrier都携带着包含了在它前面的记录的快照的ID。barrier不会中断数据流,因此非常轻巧。来自不同快照的多个barrier可以同时在数据流中,这意味着多种快照可能并发发生。整个过程是由 Flink 的执行引擎在运行时负责处理的,通过协调不同操作符之间的信号和状态来实现数据流中的 checkpoint barrier 插入。
Stream barrier首先会被注入到source流的并行数据流中。快照n的barrier被注入的点(我们称之为Sn)是source源流中快照所能覆盖的数据的位置。例如,在Apache Kafka中,这个位置将是分区中拉取数据的偏移量。这个插入点Sn会被报告给检查点协调器(Flink的JobManager)。
当中间操作符从其所有输入流接收到快照n的barrier时,它会开始执行快照,并将状态写入到State backend中,然后会将快照n的barrier继续向下游流动,发送到其所有传出流中。一旦sink操作符(流DAG的末端)从其所有输入流接收到barrier n,它就向检查点协调器确认快照n。在所有sink算子都确认了快照之后,就认为快照已经完成。
一旦快照n完成,作业就不会再向source算子请求Sn之前的记录,因为此时这些记录已经完整地流过了整个DAG数据拓扑。
checkpoint alignment
Checkpoint alignment 机制是 Apache Flink 中用于确保分布式检查点一致性的一种机制。对于接收多个输入流的算子需要在快照barrier上对齐输入流。如下图所示:
- 一旦算子从某个输入流通道中接收到快照barrier n,它就不能处理来自该流的任何一条记录(阻塞),直到它从其他所有输入流通道中都接收到barrier n。因为如果不阻塞的话,算子状态将会混合属于快照n的记录和属于快照n+1的记录。
- 在对齐的过程中,算子只会继续处理来自未出现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输入队列,直至在队列满后被阻塞。
- 当从最后一个输入流通道中接收到barrier n时,算子开始执行快照,异步地将状态写入到State Backend中,然后将barrier n继续向下游所有输出通道流动。
比起其他分布式快照,该算法的优势在于辅以 Copy-On-Write 技术的情况下不需要 “Stop The World” 影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。
需要注意的是,对于具有多个输入流的操作符算子,以及在shuffle后接收多个上游子任务输出流的操作符算子,都需要对齐。
checkpoint执行流程
上面介绍完checkpoint的相关原理后,本节尝试逐步解释执行检查点的过程。如下图所示,左侧为checkpoint coordinator,中间为Flink job(由两个源节点和一个汇聚节点组成),右侧为persistent storage(大多数场景下由HDFS提供)。
Step 1) Checkpoint coordinator触发checkpoint执行信号到所有输入流操作符算子中。
Step 2) 源节点向下游广播一个checkpoint barrier。该checkpoint barrier是Chandy-Lamport
分布式快照算法的核心。下游任务只有在接收到所有输入流通道的barrier后才执行checkpoint操作。
Step 3) 源操作符算子完成state状态备份后,向checkpoint coordinator(检查点协调器)发送备份数据地址,即状态句柄。同时,barrier继续流向下游。
这里分为同步和异步(如果开启的话)两个阶段:
同步阶段:task执行状态快照,并写入外部存储系统(根据状态后端的选择不同有所区别)执行快照的过程:
- 对 state 做深拷贝
- 将写操作封装在异步的 FutureTask 中,FutureTask 的作用包括:1)打开输入流;2)写入状态的元数据信息;3)写入状态;4)关闭输入流
⠀异步阶段:
- 执行同步阶段创建的 FutureTask
- 向 Checkpoint Coordinator 发送 ACK 响应
Step 4) 当下游sink节点接收到上游两个输入通道的barrier后,开始执行本地快照。下图演示了执行RocksDB增量检查点的过程。RocksDB将全部数据刷新到磁盘,如红色三角形所示。然后,Flink为未上传的文件实现持久备份,如紫色三角形所示。
Step 5) 在执行完sink操作符算子的检查点之后,sink操作符算子将状态句柄(state handle)返回给checkpoint coordinator检查点协调器。
Step 6) 当接收到所有任务算子的状态句柄(state handle)后,checkpoint coordinator确认全局的checkpoint已经完成,然后将checkpoint元文件备份到持久化存储中。
Unaligned Checkpoint
上述对齐的chekcpoint基于Chandy-Lamport算法实现了分布式系统下的数据一致性快照。通过上面的原理可以看出,该方案在操作符算子具有多个输入流通道时,需要阻塞地等待所有输入通道的barrier都到达后才会开始执行快照。这在大多数情况下是没有问题的,但当某个输入流通道比其他输入流通道的数据流动更慢时,比如出现了反压、数据倾斜问题。会导致快照的完成时间变长甚至超时。其次,这种方案来说,Barrier对齐的过程本身就可能成为一个反压的源头,影响上游算法的效率,而这在某些情况下是不必要的。
为了解决这个问题,Flink在1.11版本中引入了Unaligned Checkpoint的特性。其基本思想是,只要输入通道中的的数据能成为操作符算子状态的一部分,那么checkpoint barrier就可以超越所有输入/输出通道中的数据。
Checkpointing can also be performed unaligned. The basic idea is that checkpoints can overtake all in-flight data as long as the in-flight data becomes part of the operator state.
如何来理解呢?
在上面对齐的checkpoint的原理介绍中可以发现,快照只包含了操作符算子的状态,而不关心输入/输出通道的数据记录。这是因为barrier对齐的checkpoint将本地快照延迟至所有barrier到达,也就是说当执行快照时,属于当前checkpoint周期内的数据记录都已经对该算子状态产生了影响,因而不必关心输入队列的剩余数据,同时输出队列又携带着barrier继续流向下一个算子的输入队列,因而输出队列的数据也不必关心,从而巧妙地避免了对算子输入/输出队列的状态进行快照。
但实际上,这和Chandy-Lamport算法是有一定出入的。举个例子,假设我们对两个数据流进行 equal-join,输出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的 Checkpoint 周期):
- 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 前面。
- 图 b: 算子分别读取 Channel 一个元素,输出 2。随后接收到 Channel 1 的 Barrier,停止处理 Channel 1 后续的数据,只处理 Channel 2 的数据。
- 图 c: 算子再消费 2 个自 Channel 2 的元素,接收到 Barrier,开始本地快照并输出 Barrier。
对于相同的情况,Chandy-Lamport 算法的状态变化如下:
- 图 a: 同上。
- 图 b: 算子分别处理两个 Channel 一个元素,输出结果 2。此后接收到 Channel 1 的 Barrier,算子开始本地快照记录自己的状态,并输出 Barrier。
- 图 c: 算子继续正常处理两个 Channel 的输入,输出 9。特别的地方是 Channel 2 后续元素会被保存下来,直到 Channel 2 的 Barrier 出现(即 Channel 2 的 9 和 7)。保存的数据会作为 Channel 的状态成为快照的一部分。
两者的差异主要可以总结为两点:
- 快照的触发是在接收到第一个 Barrier 时还是在接收到最后一个 Barrier 时。
- 是否需要阻塞已经接收到 Barrier 的 Channel 的计算。
从这两点来看,新的 Unaligned Checkpoint 将快照的触发改为第一个 Barrier 且取消阻塞 Channel 的计算,算法上与 Chandy-Lamport 基本一致,同时在实现细节方面结合 Flink 的定位做了几个改进。
首先,不同于 Chandy-Lamport 模型的只需要考虑算子输入 Channel 的状态,Flink 的算子有输入和输出两种 Channel,在快照时两者的状态都需要被考虑。
其次,无论在 Chandy-Lamport 还是 Flink Aligned Checkpoint 算法中,Barrier 都必须遵循其在数据流中的位置,算子需要等待 Barrier 被实际处理才开始快照。而 Unaligned Checkpoint 改变了这个设定,允许算子优先摄入并优先输出 Barrier。 如此一来,第一个到达 Barrier 会在算子的缓存数据队列(包括输入 Channel 和输出 Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入 Channel 在其 Barrier 之前的数据会被写入快照中(图中绿色部分)。
上图描述了算子是如何处理非对齐的checkpoint barriers的:
- 当输入队列中接收到第一个chekcpoint barrier时,算子即开始执行相应处理。
- 它会立即将该barrier跳过前面的输入队列,并将其插入到输出队列的尾部。
- 算子在执行快照时,会把所有标记了跳过的数据记录(图中绿色部分),并将其一并写入到算子状态中。
此时,算子只需短暂停止处理输入队列以标记缓冲区、转发barrier并创建其状态的快照。
这样的主要好处是,如果本身算子的处理就是瓶颈,Chandy-Lamport 的 Barrier 仍会被阻塞(因为Chandy-Lamport仍然要等到第一个barrier到达算子时才开始触发快照执行,如果算子的处理本身比较慢,数据的流动仍然会很慢),但 Unaligned Checkpoint 则可以在 Barrier 进入输入 Channel 就马上开始快照。 这可以从很大程度上加快 Barrier 流经整个 DAG 的速度,从而降低 Checkpoint 整体时长。
回到之前的例子,用 Unaligned Checkpoint 来实现,状态变化如下:
- 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 前面。输出 Channel 已存在结果数据 1。
- 图 b: 算子优先处理输入 Channel 1 的 Barrier,开始本地快照记录自己的状态,并将 Barrier 插到输出 Channel 末端。
- 图 c: 算子继续正常处理两个 Channel 的输入,输出 2、9。同时算子会将 Barrier 越过的数据(即输入 Channel 1 的 2 和输出 Channel 的 1)写入 Checkpoint,并将输入 Channel 2 后续早于 Barrier 的数据(即 2、9、7)持续写入 Checkpoint。
比起Aligned Checkpoint中不同Checkpoint周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint进行快照和输出Barrier时,部分本属于当前Checkpoint的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前Checkpoint的输出数据却落到Barrier之后(因此未反映到下游算子的状态中)。这也正是Unaligned的含义:不同Checkpoint周期的数据没有对齐,包括不同输入Channel之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从Checkpoint恢复时,不对齐的数据并不能由Source端重放的数据计算得出,同时也没有反应到算子状态中,但因为它们会被Checkpoint恢复到对应Channel中,所以依然能够提供只计算一次的准确结果。
Unaligned Checkpoint方案确保barrier可以尽可能快地在数据流中移动。它特别适合至少有一个缓慢移动的数据输入队列的应用,其对齐时间可能达到几个小时。但是,由于它增加了额外的I/O压力,所以当应用写入State Backend的I/O本身就是瓶颈时,非对齐Checkpoint方案并不会有明显帮助。
Exactly Once vs. At Least Once
为了实现EXACTLY ONCE的语义,Flink使用了输入缓存队列来缓存在对齐过程中队列中传入的数据。同时,我们经过上面Checkpoint原理介绍也能清晰地知道,使用对齐的方式来执行快照是能够实现EXACTLY ONCE的语义的。
需要注意的是,这里的EXACTLY ONCE语义并不意味着每个事件将被精确地处理一次,而是意味着每个事件只会影响Flink算子状态一次。同时,EXACTLY ONCE语义并不能实现端到端的数据EXACTLY ONCE,如果需要实现端到端的EXACTLY ONCE语义,需要sink算子能够实现写入的幂等和事务性。
通常,在checkpoint过程中额外的对齐时间延迟大约是几毫秒,但也可能会有一些异常值的延迟明显增加的情况。对于所有记录都需要超低延迟(几毫秒)的应用程序,Flink有一个开关,可以在检查点期间跳过对齐步骤。此时,当算子接收到每个输入队列的checkpoint barrier时不会阻塞,会继续处理barrier之后的数据记录。这就可能会导致本属于下一个checkpoint周期的数据记录影响了当前checkpoint周期的算子状态,从而导致恢复时数据重复消费的情况,因此,这种模式下只能保证At Least Once语义。
Checkpoint Exactly Once和At Least Once语义配置:
// 启用 Checkpoint 每 5 秒 一次,模式为 EXACTLY_ONCE
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 启用 Checkpoint 每 5 秒 一次,模式为 At Least Once
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
另外,Aligned过程只发生在具有多个输入队列(连接)的算子以及具有多个输出队列的算子(比如在重新分区/shuffle之后)。正因为如此,只有单并行度的操作算子(map(), flatMap(), filter(),…)的数据流实际上及时被设置为At Least Once语义,也能实现Exactly once语义(实际上就是单输入流的算子不需要barrier对齐)。
参考
Flink 1.11 Unaligned Checkpoint 解析
Stateful Stream Processing
Flink Checkpoints Principles and Practices: Flink Advanced Tutorials