title: Flink系列
三、Flink Checkpoint 容错机制原理概述
Flink 提供了 Exactly once 特性,是依赖于带有 barrier 的分布式快照 + 可部分重发的数据源功能实现的。而分布式快照中,就保存了 operator 的状态信息。
Flink 的失败恢复依赖于 检查点机制 + 可部分重发的数据源。
- 一、检查点机制:Checkpoint 定期触发,产生快照,快照中记录了:
-
当前检查点开始时数据源(例如 Kafka)中消息的 offset。
-
记录了所有有状态的 Operator 当前的状态信息(例如 sum 中的数值)。(Application/Job Operator / / Task)
-
通俗的解释:每隔一段时间,就给这个 job 中的所有 Task 的state 做一次持久化,所以所有 Task State 的持久化成功了,则意味着这个 job 在这个时刻的 checkpoint 就成功了, 也就意味着给这个 job 在当前时刻做了一次 checkpoint
- 二、可部分重发的数据源:Flink 选择最近成功完成的检查点,然后系统重放整个分布式的数据流,然后给予每个 Operator 他们在该检查点快照中的状态。数据源被设置为从合适位置开始重新读取流。例如在 Apache Kafka 中,那意味着告诉消费者从偏移量 offset 开始重新消费。
- 通俗的解释:我从你哪儿拉取一部分数据执行消费,数据拉取成功了,但是消费失败。 我要重来一次。那到底从那个地方继续呢?
(1)如果数据源具备数据重放功能:那么没有消费处理成功的数据,就再拉取处理一次
(2)如果数据源不具备数据重放功能: 失败之后就再也拿不到之前拉取的数据了。数据丢失了
分布式消息系统:kafak rocketmq 都是具备数据重放能力的组件。
探讨一个问题:现在让你实现 Flink 的检查点机制的 功能,该怎么做?
图零 整体的:CheckPoint简单设计
图一:当任务失败的时候
当任务运行失败的时候:
拍摄快照的时候,有两种结果是可以接受的:
如果标记放在 7 的后面: offset =7, sum_odd = 16, sum_even = 12
如果标记放在 6 的后面: offset =6, sum_odd = 9, sum_even = 12
如果标记放在 5 的后面: offset =5, sum_odd = 9, sum_even = 6
如果直接把上图中的每个 Task 的状态直接保存,那么就是不合理的(有些数据,上游已经处理了,但是下游没有被处理):
offset =7, sum_odd = 9, sum_even = 12 XXXXXXXXXXXX, 如果 job 从这个状态中执行恢复,则 offset = 7 的这条数据就丢失了,没有参与计算
就一个需求:需要保证每一条数据,都完整的通过了这条处理链路(Source —> Transform ----> Sink)
然后我们重启应用,对应的状态数据已经丢失了。
图二:重启应用
图三:CheckPoint恢复数据
Flink应用程序从checkpoint恢复数据:
图四:Flink应用程序继续运行
Flink应用程序继续运行:
四、Flink CheckPoint 算法原理深入剖析
State 是管理一个 Task 的状态,那么一个 Flink Job 在运行过程中,是由很多的 Task 分布式并行运行组成的。保管和管理一个 Task 的状态,对于一个 Task 的容错来说,非常重要,同样,保存和管理这个 Job 的所有 Task 的状态,并保持一致,也同样非常重要。这是 Flink 的 Job 容错的最终解决方案。
Flink 容错机制的核心是对数据流做连续的分布式快照(snapshots),我们把每一次 take snapshot 动作称之为 Checkpoint。Checkpoint 是 Flink 实现容错机制最核心的功能,它能够根据配置周期性地基于 Stream 中各个 Operator/Task 的状态来生成快照,从而将这些状态数据定期持久化存储下来,当 Flink 程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
具体的概念(重要的总结):Flink 的 Checkpoint 机制基于 chandy-lamport 算法,在某一个时刻,对一个 Flink Job 的所有 Task 做一个快照拍摄(逻辑上解释),并且将快照保存在 内存/磁盘 中永久保存,这样子,如果 Flink Job 重启恢复,就可以从故障前最近一次的成功快照中进行状态恢复,从而实现保证 Flink 数据流式数据的一致性。当然,为了配合 Flink 能实现状态快照,并且 job 状态恢复,必须数据源具备数据回放功能。
简单地说,Checkpoint 是一种分布式快照:在某一时刻,对某个 Flink 作业所有的 Task 做一个快照(snapshot),并且将快照保存在 memory / filesystem 等存储系统中。这样,在任务进行故障恢复的时候,就可以还原到任务故障前最近一次检查点的状态,从而保证数据的一致性。当然,为了保证 exactly-once / at-leastonce 的特性,还需要数据源支持数据回放。
实现 Checkpoint 的核心是:Stream Barrier,它和普通消息无异,Stream barrier 作为一种标记信息插入到数据流和正常数据一起流动。barriers 永远不会超过记录,数据流严格有序,barrier 将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。每个 barrier 都带有快照的 ID,并且 barrier 之前的记录都进入了该快照。 barriers 不会中断流处理,非常轻量级。 来自不同快照的多个 barrier 可以同时在流中出现,这意味着多个快照可能并发地发生。
Flink 应用程序中的消息抽象其实是:BufferOrEvent(DataStream 数据流中的每条 记录 的数据抽象对象),它包含两个方面的信息:
01、Buffer:正常的待处理的数据
02、Event:嵌入到数据流中增强引擎流处理能力的特殊消息,包含 CheckpointBarrier 和 WaterMark
03、一个 DataStream 数据流中的数据其实有多种类型: data,checkpointbarrier,watermark
Flink 的 Checkpoint Coordinator 在需要触发检查点的时候要求数据源向数据流中注入 Stream Barrier(具体实现: CheckpointBarrier(checkpointID,timestamp)),当执行 Task 的 Operator 从他所有的 InputChannel 中都收到了 Stream Barrier 则会触发当前的 Operator 的快照拍摄,并向其下游 Operator发送 Stream Barrier。当所有的 SinkOperator 都反馈完成了快照之后, Flink Checkpoint Coordinator 认为 Checkpoint 创建成功。
为了方便大家清楚理解 Checkpoint 的工作机制,在此提供了三张图:
图一:
图二:
图三:
为什么要做对齐?
目的是为了 确保 一条数据如果被一个 Operator/Task 消费,那么就一定要被所有 Operator/Task 消费。否则如果一条数据 被 上游Operator/Task 消费了,但是没有被下游 Operator/Task 消费,那么就会出现数据重复消费或者漏消费!
五、Chandy-Lamport 算法详解
Flink Checkpoint机制是 Flink 可靠性的基石,可以保证 Flink 集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。
Flink 的 Checkpoint 机制就是依靠 Chandy-Lamport 算法进行设计实现。
5.1 步骤一:任务开启
步骤一:任务开启
5.2 步骤二:JobMananger 发起 Checkpoint
步骤二:JobMananger 发起 Checkpoint
1、JobManager 中有一个专门用来负责定期触发 Checkpoint 的定时任务的工作组件: CheckpointCoordinator
2、每次当触发 checkpoint 的时候,其实就是 JobManager 发送 triggerCheckpoint 的 RPC 请求给运行了给 job 的 Souorce Operator Task 所在的从节点(看一下下图,其实就是 Source Operator Task1 和 Source Operator Task)
3、Source Operator 每个 Task 接收到 triggerCheckpoint RPC 请求的时候,其实就是往数据流中嵌入一个 CheckpintBarrier 的消息记录。另外,CheckpintBarrier 拥有全局唯一的 ID, 同一次 checkpoint 的 CheckpointBarrier 的 id 是一样的。
4、当 Source Operator Task1 接收到到 CB(CheckpointBarrier) 的时候,要把 offset = 3 给持久化起来,同样的 Source Operator Task2 也是需要做这个事情。
5.3 步骤三:Source 上报 Checkpoint
步骤三:Source 上报 Checkpoint
1、由于 Source Operator Task1 和 Source Operator Task2 都接收到了一个 CB(CheckpointBarrier), 所以会将自己的 state 状态持久化保存。
2、这两个Source Operator Task 还得向 JobManager 中的 CheckpointCoordinator 进行汇报。那么也就意味着 Source Operator Task1 和 Source Operator Task2 对于此次 Checkpoint 已经执行完了。但是对于整体来说, CheckpointCoordinator 并不会认为这次 checkpoint 成功。因为整体上需要等到所有的 Task 都接收到这个 CB(CheckpointBarrier) 之后执行 state 的checkpiont 成功。
也就是说目前有两个task执行成功了已经汇报给JobManager了,但是其他的task可能还没有汇报。所以要等所有的汇报完毕,才代表整体OK。
3、如果当前 Task 处理 CB(CheckpointBarrier)成功了,那么需要将这个 CB(CheckpointBarrier) 广播到下游 Task 。也就是说下游出现分流的情况,需要在所有的流分区中都嵌入这个CheckpointBarrier 。 看下面图中的中间的四个红颜色的CB。
5.4 步骤四:Task的数据处理
步骤四:Task的数据处理
这个 绿色的4 要不要处理,其实是用来来定的
1、对齐:需要等待所有输入流中的同一个 checkpointID 的 checkpointBarrier 都接收到,然后再做checkpoint
2、非对齐:本该应该要等待所有输入数据流上的同一个 checkpointID 的 checkpointBarrier 都到齐之后做checkppoint,但是没有等。
看下图,本来 绿色的4是属于 CB 之后,也就是说是下一次 checkpoint 本该计算的数据,但是由于没有等待,所以这个Sum Operator Task1 也完成了绿色的4的计算。 造成了重复计算
这一个步骤中的重点:当某一个 Task 没有接收到 所有的 输入流中的同一个 checkpointID 的 checkpointBarrier 的话,是不会执行 checkpoint 的,那么这样的话:先接收到 CB 的输入流中的数据,就会被缓存起来。
CB(checkpointBarrier )之前的数据,都要执行处理
CB(checkpointBarrier )之后的数据,都不执行处理
5.5 步骤五:CheckpointBarrier 对齐
步骤五:CheckpointBarrier 对齐
1、第一个输入流上的 CB 接收到了之后,然后又接收到了 4 和 6, 但只是缓存起来,不做处理。
2、第二个输入流上的 CB 到了之后,那么也就意味着 Sum Operator Task1 的某一次 checkpointID 的所有的 CB 都到了,那就可以执行 checkpoint,所谓执行checkpoint也就是将 Task 的 state 做持久化。
3、工作顺序:当对齐之后,先做 checkpoint ,然后消费缓存中的数据。
5.6 步骤六:Task处理缓存数据
步骤六:Task处理缓存数据
5.7 步骤七:Sink 上报 Checkpoint
步骤七:Sink 上报 Checkpoint
最后得到的结论:
分布式流快照算法的思路:
当触发 checkpoint 的时候,往数据流中嵌入 checkpointbarrier, 其实这个checkpointbarrier就是个标记,表示当前这个checkpointbarrier标记之前的数据需要全部处理完成,标记之后的数据,在做checkpoint 之前不要处理,缓存起来。
其实这一次 checkpoint 执行成功了也就意味着当初嵌入到 CB 之前的数据,都已经执行完了,CB之后的数据都没有执行。
六、Flink Checkpoint 源码级配置详解
Flink 默认 Checkpoint 功能是 disabled 的,想要使用的时候需要先启用,checkpoint 开启之后,checkPointMode 有两种,Exactly-once 和 At-least-once,默认的checkPointMode 是 Exactly-once,Exactly-once 对于大多数应用来说是最合适的。At-least-once 可能用在某些延迟超低的应用程序(始终延迟为几毫秒)。
// 默认 checkpoint 功能是 disabled ,想要使用的时候需要先启用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔 1000ms 进行启动一个检查点 [设置checkpoint的周期]
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为 exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 检查点必须在一分钟内完成,或者被丢弃 【checkpoint 的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同时允许多个 checkpoint, 推荐不要改,就是 1 。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 确保检查点之间有至少 500ms 间隔 【 checkpoint 最小间隔】 下面都是测试数据,仅供参考。没有生产环境参考性。生产环境中自己根据实际情况设置。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6000);
// 表示 Flink 的处理程序被 cancel 后,会保留 Checkpoint 数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
上述代码中,设置的关于 checkpoint 的参数,最终都是给 Jobmanager 中的 CheckpointCoordinator 去使用。
每个版本的参数有点不太一样,具体参照源码中的参数和解释:
// 该方法就是帮助我们去解析 checkpoint 有关所有的配置
public class CheckpointConfig{
/**
* Sets all relevant options contained in the {@link ReadableConfig} such as e.g. {@link
* ExecutionCheckpointingOptions#CHECKPOINTING_MODE}.
*
* <p>It will change the value of a setting only if a corresponding option was set in the {@code
* configuration}. If a key is not present, the current value of a field will remain untouched.
*
* @param configuration a configuration to read the values from
*/
/**
* AA 这个方法的内存的参数解析,是在所有所有的和checkpoint有关的参数解析。
* @param configuration
*/
public void configure(ReadableConfig configuration) {
configuration
//AA checkpoint 的模式 ,模式是 EXACTLY_ONCE
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_MODE)
.ifPresent(this::setCheckpointingMode);
configuration
//AA 两次checkpoint之间的间隔时间 。 通过 execution.checkpointing.interval 这个参数来进行设置。
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)
.ifPresent(i -> this.setCheckpointInterval(i.toMillis()));
configuration
//AA execution.checkpointing.timeout = 10 min
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
.ifPresent(t -> this.setCheckpointTimeout(t.toMillis()));
configuration
//AA 最多同时运行的checkpoint的数量
.getOptional(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS)
.ifPresent(this::setMaxConcurrentCheckpoints);
configuration
//AA 两次checkpoint之间的停顿时间 默认是 0
.getOptional(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS)
.ifPresent(m -> this.setMinPauseBetweenCheckpoints(m.toMillis()));
configuration
.getOptional(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER)
.ifPresent(this::setTolerableCheckpointFailureNumber);
configuration
.getOptional(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT)
.ifPresent(this::setExternalizedCheckpointCleanup);
configuration
.getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED)
.ifPresent(this::enableUnalignedCheckpoints);
configuration
.getOptional(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA)
.ifPresent(this::setCheckpointIdOfIgnoredInFlightData);
configuration
.getOptional(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT)
.ifPresent(this::setAlignedCheckpointTimeout);
configuration
.getOptional(ExecutionCheckpointingOptions.FORCE_UNALIGNED)
.ifPresent(this::setForceUnalignedCheckpoints);
configuration
.getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
.ifPresent(this::setCheckpointStorage);
}
}
默认情况下,如果设置了 Checkpoint 选项,则 Flink 只保留最近成功生成的 1 个 Checkpoint,而当 Flink 程序失败时,可以从最近的这个 Checkpoint 来进行恢复。但是,如果我们希望保留多个 Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近 4 个小时数据记录处理有问题,希望将整个状态还原到 4 小时之前 Flink 可以支持保留多个 Checkpoint,需要在 Flink 的配置文件 conf/flink-conf.yaml 中,添加如下配置,指定最多需要保存Checkpoint 的个数:
state.checkpoints.num-retained: 3
这样设置以后就查看对应的 Checkpoint 在 HDFS 上存储的文件目录
hdfs dfs -ls hdfs://hadoop10/flink/checkpoints
如果希望回退到某个 Checkpoint 点,只需要指定对应的某个 Checkpoint 路径即可实现
如果 Flink 程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个 Checkpoint 点进行恢复
bin/flink run -s hdfs://hadoop10/flink/checkpoints/xxxx/chk-xx/_metadata flink-job.jar
程序正常运行后,还会按照 Checkpoint 配置进行运行,继续生成 Checkpoint 数据。
当然恢复数据的方式还可以在自己的代码里面指定 Checkpoint 目录,这样下一次启动的时候即使代码发生了改变就自动恢复数据。
七、Flink SavePoint 企业生产实践方案
SavePoint 可以认为是用户手动触发的 checkpoint , checkpoint 是系统自动触发的一个定期执行的工作
SavePoint 是一个重量级的 Checkpoint,你可以把它当做在某个时间点程序状态全局镜像,以后程序在进行升级,或者修改并发度等情况,还能从保存的状态位继续启动恢复。可以保存数据源 offset,Operator 操作状态等信息,可以从应用在过去任意做了 SavePoint 的时刻开始继续消费。
Flink SavePoint 的使用
1、在 flink-conf.yaml 中配置 Savepoint 存储位置
不是必须设置,但是设置后,后面创建指定 Job 的 Savepoint 时,可以不用在手动执行命令时指定 Savepoint 的位置
state.savepoints.dir: hdfs://hadoop10/flink/savepoints
2、触发一个 Savepoint【直接触发或者在 cancel 的时候触发】
停止程序:bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对 on yarn 模式需要指定 -yid 参数】
3、从指定的 Savepoint 启动 job
bin/flink run -s savepointPath [runArgs]
Flink Checkpoint 和 SavePoint 的区别到底是什么,这是困扰大家的一个疑惑,整理一下:
维度 | Checkpoint | SavePoint |
---|---|---|
概念 | Flink 提供的一种自动 Task 级别容错机制 | Flink 提供的用于随时保存应用程序全局状态的镜像到 HDFS |
目的 | 用于程序自动容错,快速恢复 Task | 用于应用程序升级,集群迁移,并行度伸缩等 |
用户交互 | Flink 系统行为 | 应用程序管理员手动触发 |
状态文件保留策略 | 系统内服务自动删除,可以通过参数调整 | 会一直保留,由管理员维护 |
用户手动执行,是指向 Checkpoint 的指针,不会过期,在集群升级/代码迁移等情况下使用。
注意:为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过 uuid(String) 方法手动的给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点(savepoint)将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动的设置 ID。
声明:
文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。
By luoyepiaoxue2014
B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接