【Flink状态管理五】Checkpoint的设计与实现

news2025/3/1 18:31:54

文章目录

  • 1. Checkpoint的整体设计
  • 2. Checkpoint创建源码解析
    • 2.1. DefaultExecutionGraphBuilder.buildGraph
    • 2.2. ExecutionGraph.enableCheckpointing

由于系统原因导致Flink作业无法正常运行的情况非常多,且很多时候都是无法避免的。对于Flink集群来讲,能够快速从异常状态中恢复,同时保证处理数据的正确性和一致性非常重要。Flink主要借助Checkpoint的方式保障整个系统状态数据的一致性,也就是基于ABS算法实现轻量级快照服务。

本节我们详细了解Checkpoint的设计与实现。

 

1. Checkpoint的整体设计

Checkpoint的执行过程分为三个阶段:启动、执行以及确认完成。其中Checkpoint的启动过程由JobManager管理节点中的CheckpointCoordinator组件控制,该组件会周期性地向数据源节点发送执行Checkpoint的请求,执行频率取决于用户配置的CheckpointInterval参数。

执行过程:

  1. 在JobManager管理节点通过CheckpointCoordinator组件向每个数据源节点发送Checkpoint执行请求,此时数据源节点中的算子会将消费数据对应的Position发送到JobManager管理节点中。
  2. JobManager节点会存储Checkpoint元数据,用于记录每次执行Checkpoint操作过程中算子的元数据信息,例如在FlinkKafkaConsumer中会记录消费Kafka主题的偏移量,用于确认从Kafka主题中读取数据的位置。
  3. 在数据源节点执行完Checkpoint操作后,继续向下游节点发送CheckpointBarrier事件,下游算子通过对齐Barrier事件,触发该算子的Checkpoint操作。
    当下游的map算子接收到数据源节点的Checkpoint
    Barrier事件后,首先对当前算子的数据进行处理,并等待其他上游数据源节点的Barrier事件到达。该过程就是Checkpoint
    Barrier对齐,目的是确保属于同一Checkpoint的数据能够全部到达当前节点。

在这里插入图片描述

Barrier事件的作用就是切分不同Checkpoint批次的数据。

  • 当map算子接收到所有上游的Barrier事件后,就会触发当前算子的Checkpoint操作,并将状态数据快照到指定的外部持久化介质中,该操作主要借助状态后端存储实现。

  • 当状态数据执行完毕后,继续将Barrier事件发送至下游的算子,进行后续算子的Checkpoint操作。

  • 另外,在map算子中执行完Checkpoint操作后,也会向JobManager管理节点发送Ack消息,确认当前算子的Checkpoint操作正常执行。此时Checkpoint数据会存储该算子对应的状态数据,如果StateBackend为MemoryStateBackend,则主要会将状态数据存储在JobManager的堆内存中

sink节点的ack

像map算子节点一样,当Barrier事件到达sink类型的节点后,sink节点也会进行Barrier对齐操作,确认上游节点的数据全部接入。然后对接入的数据进行处理,将结果输出到外部系统中。完成以上步骤后,sink节点会向JobManager管理节点发送Ack确认消息,确认当前Checkpoint中的状态数据都正常进行了持久化操作。(之后呢?当任务结束之后,cp会消失还是?)

 

2. Checkpoint创建源码解析

通过调用StreamExecutionEnvironment.enableCheckpointing(),开启Checkpoint。
此时Checkpoint的配置会被存储在StreamGraph中,然后将StreamGraph中的CheckpointConfig转换为JobCheckpointingSettings数据结构存储在JobGraph对象中,并伴随JobGraph提交到集群运行。启动JobMaster服务后,JobMaster调度和执行Checkpoint操作。

2.1. DefaultExecutionGraphBuilder.buildGraph

如下代码,通过JobGraph构建ExecutionGraph的过程中,获取JobGraph中存储的JobCheckpointingSettings配置,然后创建ExecutionGraph。

1)根据snapshotSettings配置获取triggerVertices、ackVertices以及confirmVertices节点集合,并转换为对应的ExecutionJobVertex集合。

  • 其中triggerVertices集合存储了所有SourceOperator节点,这些节点通过CheckpointCoordinator主动触发Checkpoint操作。
  • ackVertices和confirmVertices集合存储了StreamGraph中的全部节点,代表所有节点都需要返回Ack确认信息并确认Checkpoint执行成功。

2)创建CompletedCheckpointStore组件,用于存储Checkpoint过程中的元数据。

  • 当对作业进行恢复操作时会在CompletedCheckpointStore中检索最新完成的Checkpoint元数据信息,然后基于元数据信息恢复Checkpoint中存储的状态数据。CompletedCheckpointStore有两种实现,分别为StandaloneCompletedCheckpointStore和ZooKeeperCompletedCheckpointStore。
  • 在CompletedCheckpointStore中通过maxNumberOfCheckpointsToRetain参数配置以及结合checkpointIdCounter计数器保证只会存储固定数量的CompletedCheckpoint。

3)创建CheckpointStatsTracker实例
用于监控和追踪Checkpoint执行和更新的情况,包括Checkpoint执行的统计信息以及执行状况,WebUI中显示的Checkpoint监控数据主要来自CheckpointStatsTracker。

4)创建StateBackend,从UserClassLoader中反序列化出应用指定的StateBackend并设定为applicationConfiguredBackend。

5)初始化用户自定义的Checkpoint Hook函数

6)最终调用executionGraph.enableCheckpointing()方法,在作业的执行和调度过程中开启Checkpoint。

// 配置状态数据checkpointing
// 从jobGraph中获取JobCheckpointingSettings
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
//如果snapshotSettings不为空,则开启checkpoint功能
if (snapshotSettings != null) {
   List<ExecutionJobVertex> triggerVertices =
         idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
   List<ExecutionJobVertex> ackVertices =
         idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
   List<ExecutionJobVertex> confirmVertices =
         idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
   //创建CompletedCheckpointStore
   CompletedCheckpointStore completedCheckpoints;
   CheckpointIDCounter checkpointIdCounter;
   try {
      int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
          CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
      if (maxNumberOfCheckpointsToRetain <= 0) {
         maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_
            CHECKPOINTS.defaultValue();
      }
      // 通过recoveryFactory创建CheckpointStore
      completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, 
         maxNumberOfCheckpointsToRetain, classLoader);   
      // 通过recoveryFactory创建CheckpointIDCounter
      checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
   }
   catch (Exception e) {
      throw new JobExecutionException(jobId, "Failed to initialize high-
         availability checkpoint handler", e);
   }
   // 获取checkpoints最长的记录次数
   int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
   // 创建CheckpointStatsTracker实例
   CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
         historySize,
         ackVertices,
         snapshotSettings.getCheckpointCoordinatorConfiguration(),
         metrics);
   // 从application中获取StateBackend
   final StateBackend applicationConfiguredBackend;
   final SerializedValue<StateBackend> serializedAppConfigured = 
      snapshotSettings.getDefaultStateBackend();
   if (serializedAppConfigured == null) {
      applicationConfiguredBackend = null;
   }
   else {
      try {
         applicationConfiguredBackend = serializedAppConfigured.
            deserializeValue(classLoader);
      } catch (IOException | ClassNotFoundException e) {
         throw new JobExecutionException(jobId,
            "Could not deserialize application-defined state backend.", e);
      }
   }
   // 获取最终的rootBackend
   final StateBackend rootBackend;
   try {
      rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
         applicationConfiguredBackend, jobManagerConfig, classLoader, log);
   }
   catch (IllegalConfigurationException | IOException | 
      DynamicCodeLoadingException e) {
         throw new JobExecutionException(jobId, 
            "Could not instantiate configured state backend", e);
   }
   // 初始化用户自定义的checkpoint Hooks函数
   final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = 
      snapshotSettings.getMasterHooks();
   final List<MasterTriggerRestoreHook<?>> hooks;
   // 如果serializedHooks为空,则hooks为空
   if (serializedHooks == null) {
      hooks = Collections.emptyList();
   }
   else {
   // 加载MasterTriggerRestoreHook
      final MasterTriggerRestoreHook.Factory[] hookFactories;
      try {
         hookFactories = serializedHooks.deserializeValue(classLoader);
      }
      catch (IOException | ClassNotFoundException e) {
         throw new JobExecutionException(jobId, 
            "Could not instantiate user-defined checkpoint hooks", e);
      }
      // 设定ClassLoader为UserClassLoader
      final Thread thread = Thread.currentThread();
      final ClassLoader originalClassLoader = thread.getContextClassLoader();
      thread.setContextClassLoader(classLoader);
      // 创建hooks函数
      try {
         hooks = new ArrayList<>(hookFactories.length);
         for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
            hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
         }
      }
      // 将thread的ContextClassLoader设定为originalClassLoader
      finally {
         thread.setContextClassLoader(originalClassLoader);
      }
   }
   // 获取CheckpointCoordinatorConfiguration
   final CheckpointCoordinatorConfiguration chkConfig = 
      snapshotSettings.getCheckpointCoordinatorConfiguration();
   // 开启executionGraph中的Checkpoint功能
   executionGraph.enableCheckpointing(
      chkConfig,
      triggerVertices,
      ackVertices,
      confirmVertices,
      hooks,
      checkpointIdCounter,
      completedCheckpoints,
      rootBackend,
      checkpointStatsTracker);
}

 

2.2. ExecutionGraph.enableCheckpointing

继续看ExecutionGraph.enableCheckpointing()方法的实现,包含如下逻辑。

  1. 将tasksToTrigger、tasksToWaitFor以及tasksToCommitTo三个ExecutionJobVertex集合转换为ExecutionVertex[]数组,每个ExecutionVertex代表ExecutionJobVertex中的一个SubTask节点。
  2. 容错管理:创建CheckpointFailureManager,用于Checkpoint执行过程中的容错管理,包含failJob和failJobDueToTaskFailure两个处理方法。
  3. 定时调度和执行:创建checkpointCoordinatorTimer,用于Checkpoint异步线程的定时调度和执行
  4. 协调和管理作业中的Checkpoint:创建CheckpointCoordinator组件,通过CheckpointCoordinator协调和管理作业中的Checkpoint,同时收集各Task节点中Checkpoint的执行状况等信息。
  5. Hook:将Master Hook注册到CheckpointCoordinator中,实现用户自定义Hook代码的调用。
  6. 控制CheckpointCoordinator的启停:将JobStatusListener的实现类CheckpointCoordinatorDeActivator注册到JobManager中,此时系统会根据作业的运行状态控制CheckpointCoordinator的启停,当作业的状态为Running时会触发启动CheckpointCoordinator组件。
public void enableCheckpointing(
      CheckpointCoordinatorConfiguration chkConfig,
      List<ExecutionJobVertex> verticesToTrigger,
      List<ExecutionJobVertex> verticesToWaitFor,
      List<ExecutionJobVertex> verticesToCommitTo,
      List<MasterTriggerRestoreHook<?>> masterHooks,
      CheckpointIDCounter checkpointIDCounter,
      CompletedCheckpointStore checkpointStore,
      StateBackend checkpointStateBackend,
      CheckpointStatsTracker statsTracker) {
   checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
   checkState(checkpointCoordinator == null, "checkpointing already enabled");
   ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
   ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
   ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
   checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
   // 创建CheckpointFailureManager
   CheckpointFailureManager failureManager = new CheckpointFailureManager(
      chkConfig.getTolerableCheckpointFailureNumber(),
      new CheckpointFailureManager.FailJobCallback() {
         @Override
         public void failJob(Throwable cause) {
            getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
         }
         @Override
         public void failJobDueToTaskFailure(Throwable cause, 
                                             ExecutionAttemptID failingTask) {
            getJobMasterMainThreadExecutor()
               .execute(()  -> failGlobalIfExecutionIsStillRunning(cause, 
                  failingTask));
         }
      }
   );
   // 创建checkpointCoordinatorTimer
   checkState(checkpointCoordinatorTimer == null);
   checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
      new DispatcherThreadFactory(
         Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
   // 创建checkpointCoordinator
   checkpointCoordinator = new CheckpointCoordinator(
      jobInformation.getJobId(),
      chkConfig,
      tasksToTrigger,
      tasksToWaitFor,
      tasksToCommitTo,
      checkpointIDCounter,
      checkpointStore,
      checkpointStateBackend,
      ioExecutor,
      new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
      SharedStateRegistry.DEFAULT_FACTORY,
      failureManager);
   // 向checkpoint Coordinator中注册master Hooks
   for (MasterTriggerRestoreHook<?> hook : masterHooks) {
      if (!checkpointCoordinator.addMasterHook(hook)) {
         LOG.warn("Trying to register multiple checkpoint hooks with the name: {}",
                  hook.getIdentifier());
      }
   }
   //向checkpointCoordinator中设定checkpointStatsTracker
   checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
     // 注册JobStatusListener,用于自动启动CheckpointCoordinator
   if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
      registerJobStatusListener(checkpointCoordinator.
         createActivatorDeactivator());
   }
   this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
}

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1459648.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

构造函数,原型,实例,类的关系整理

视频来源js原型链、构造函数和类_哔哩哔哩_bilibili 如视频所说&#xff0c;构造函数的prototype指向原型&#xff0c;实例化的对象的__proto__指向原型&#xff0c;原型通过constructor指向构造函数&#xff0c;正如class里面的constructor方法就相当于Person构造函数一样&am…

阿里云 OSS

阿里云对象存储服务&#xff08;Object Storage Service&#xff0c;简称 OSS&#xff09; OSS 为 Object Storage Service&#xff0c;即对象存储服务。是阿里云提供的海量、安全、低成本、高可靠的云存储服务。 OSS 具有与平台无关的 RESTful API 接口&#xff0c;可以在任…

网关服务gateway注册Consul时报错Consul service ids must not be empty

网关服务gateway启动时&#xff0c;初始化Consul相关配置时报错。 Consul service ids must not be empty, must start with a letter, end with a letter or digit, and have as interior characters only letters, digits, and hyphen: cbda-server-gateway:10.111.236.142:…

VSCode使用Remote-SSH连接服务器时报错:启动服务器失败问题

VSCode使用Remote-SSH连接服务器时报错&#xff1a;启动服务器失败问题 问题描述解决方法引用 问题描述 第一天上班&#xff0c;回来发现又不能使用VScode连不上服务器了&#xff0c;在「输出」栏出现了一直报 Waiting for server log… 的情况&#xff01;本来以为是普通的连接…

【毕业设计推荐】基于MATLAB的水果分级系统设计与实现

一、课题介绍 现在商业行为中&#xff0c;在水果出厂前都需要进行质量检测&#xff0c;需要将不同等级的水果进行分级包装&#xff0c;以保证商业利益最大化。可是传统方法都是依靠人工进行检测&#xff0c;效率低下&#xff0c;主观成分大&#xff0c;并不能很好客观地评价出货…

c++:蓝桥杯中的基础算法1(枚举,双指针)

枚举 基础概念&#xff1a; 枚举&#xff08;Enum&#xff09;是一种用户定义的数据类型&#xff0c;用于定义一个有限集合的命名常量。在C中&#xff0c;枚举类型可以通过关键字enum来定义。 下面是一个简单的枚举类型的定义示例&#xff1a; #include <iostream>enum…

Kalman滤波器的原理及Matlab代码实例

Kalman滤波是一种用于估计系统状态的优秀滤波方法&#xff0c;特别适用于具有噪声的测量数据的情况。它的主要应用包括导航、目标跟踪、信号处理、机器人技术等领域。Kalman滤波器通过融合系统模型和实际测量数据&#xff0c;提供对系统状态的最优估计。 Kalman滤波器的原理基…

第3.1章:StarRocks数据导入——Insert into 同步模式

一、概述 在StarRocks中&#xff0c;insert的语法和mysql等数据库的语法类似&#xff0c;并且每次insert into操作都是一次完整的导入事务。 主要的 insertInto 命令包含以下两种&#xff1a; insert into tbl select ...insert into tbl (col1, col2, ...) values (1, 2, ...…

学习鸿蒙基础(5)

一、honmonyos的page路由界面的路径 新建了一个page,然后删除了。运行模拟器的时候报错了。提示找不到这个界面。原来是在路由界面没有删除这个page。新手刚接触找了半天才找到这个路由。在resources/base/profile/main_pages.json 这个和微信小程序好类似呀。 吐槽&#xf…

Python学习-流程图、分支与循环(branch and loop)

十、流程图 1、流程图&#xff08;Flowchart&#xff09; 流程图是一种用于表示算法或代码流程的框图组合&#xff0c;它以不同类型的框框代表不同种类的程序步骤&#xff0c;每两个步骤之间以箭头连接起来。 好处&#xff1a; 1&#xff09;代码的指导文档 2&#xff09;有助…

电脑恢复删除数据的原理和方法

在恢复数据的时候&#xff0c;很多人都会问&#xff0c;为什么删除的数据还能恢复&#xff1f;本篇和大家一起了解下硬盘上数据的存储方式&#xff0c;文件被删除的时候具体发生了什么&#xff0c;帮助大家理解数据恢复的基本原理。最后还会分享一个好用的数据恢复工具并附上图…

Reactive到Spring WebFlux的来龙去脉

感谢下述博客作者提供的干货。本文只是做一个整理&#xff0c;归纳&#xff0c;以供自己或者他人学习之用。 一文弄懂 Spring WebFlux 的来龙去脉 - 知乎概述本文将通过对 Reactive 以及相关概念的解释引出 Spring-WebFlux&#xff0c;并通过一些示例向读者解释 基于 Spring-W…

C# 使用RestSharp封装一个常用的http请求方法

Nuget安装RestSharp版本&#xff0c;不同版本之间的区别有很大&#xff0c;当前这个写法基于以下版本 public class APIHelper{private readonly string baseUrl ConfigurationManager.AppSettings["connectionString"].ToString(); /// <summary>/// http请…

ELK入门(二)- springboot整合ES

springboot整合elasticsearch 引用依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http…

基于情感分析的网上图书推荐系统

项目&#xff1a;基于情感分析的网上图书推荐系统 摘 要 基于网络爬虫的数据可视化服务系统是一种能自动从网络上收集信息的工具&#xff0c;可根据用户的需求定向采集特定数据信息的工具&#xff0c;本项目通过研究爬取网上商品评论信息实现商品评论的情感分析系统功能。对于…

Android13 针对low memory killer内存调优

引入概念 在旧版本的安卓系统中&#xff0c;当触发lmk&#xff08;low memory killer&#xff09;的时候一般认为就是内存不足导致&#xff0c;但是随着安卓版本的增加lmk的判断标准已经不仅仅是内存剩余大小&#xff0c;io&#xff0c;cpu同样会做评判&#xff0c;从而保证设备…

Java之获取Nginx代理之后的客户端IP

Java之获取Nginx代理之后的客户端IP Nginx代理接口之后&#xff0c;后台获取的IP地址都是127.0.0.1&#xff0c;解决办法是需要配置Nginx搭配后台获取的方法&#xff0c;获得设备的真实地址。我们想要获取的就是nginx代理日志中的这个IP nginx配置 首先在nginx代理的对应lo…

【NI-DAQmx入门】构建应用程序案例2(经典界面配置、流盘)(建议大家学习)

此范例展示了DAQ常规的一个简单界面设计案例&#xff0c;仅是学习使用。 范例包含以下LabVIEW编程常用知识&#xff1a;UI设计、窗口缩放、子面板、启动画面、自定义控件、选项卡控件、表格、对话框&#xff0c;光标、状态更新、运行时菜单等等。 支持界面跳转配置DAQ通道&…

Android Studio自定义region模板

问题 有些文件&#xff0c;AS自带的Surround With不提示region&#xff0c;于是就可以自定义模板进行region 设置模板 菜单 Preferences | Editor | Live Templates 检查是否生效 1.选中代码 2.快捷键 cmd opt T 3.选择刚才自定义的模板

ArcGIS中查看栅格影像最大值最小值的位置

如果只是想大概获取栅格影像中最大值最小值的位置进行查看&#xff0c;可以不用编写程序获取具体的行列信息&#xff0c;只需要利用分类工具即可。 假设有一幅灰度影像数据&#xff0c;如下图所示。 想要查看最大值2116的大概位置在哪里&#xff0c;可以右击选择图层属性&…