Flink checkpoint 源码分析- Checkpoint snapshot 处理流程

news2025/1/16 15:02:24

背景

在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客

最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 

现在我们接着跟踪相应代码,观察算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。

代码分析

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。

processBarrier方法实现上就可以看出,flink barrier的处理分成两种。

在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。

通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。

org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler

这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。

对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理过程。

总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。

At least once 下 barrier是如何处理的

at least once 下对于barrier的处理是在以下的方法中实现的。

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier

public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {
		final long barrierId = receivedBarrier.getId();

		// fast path for single channel trackers
		if (totalNumberOfInputChannels == 1) {
			markAlignmentStartAndEnd(receivedBarrier.getTimestamp());
			notifyCheckpoint(receivedBarrier);
			return;
		}

		// general path for multiple input channels
		if (LOG.isDebugEnabled()) {
			LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);
		}

		// find the checkpoint barrier in the queue of pending barriers
		CheckpointBarrierCount barrierCount = null;
		int pos = 0;

		for (CheckpointBarrierCount next : pendingCheckpoints) {
			if (next.checkpointId == barrierId) {
				barrierCount = next;
				break;
			}
			pos++;
		}

		if (barrierCount != null) {
			// add one to the count to that barrier and check for completion
			int numBarriersNew = barrierCount.incrementBarrierCount();
			if (numBarriersNew == totalNumberOfInputChannels) {
				// checkpoint can be triggered (or is aborted and all barriers have been seen)
				// first, remove this checkpoint and all all prior pending
				// checkpoints (which are now subsumed)
				for (int i = 0; i <= pos; i++) {
					pendingCheckpoints.pollFirst();
				}

				// notify the listener
				if (!barrierCount.isAborted()) {
					if (LOG.isDebugEnabled()) {
						LOG.debug("Received all barriers for checkpoint {}", barrierId);
					}
					markAlignmentEnd();
					notifyCheckpoint(receivedBarrier);
				}
			}
		}
		else {
			// first barrier for that checkpoint ID
			// add it only if it is newer than the latest checkpoint.
			// if it is not newer than the latest checkpoint ID, then there cannot be a
			// successful checkpoint for that ID anyways
			if (barrierId > latestPendingCheckpointID) {
				markAlignmentStart(receivedBarrier.getTimestamp());
				latestPendingCheckpointID = barrierId;
				pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

				// make sure we do not track too many checkpoints
				if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
					pendingCheckpoints.pollFirst();
				}
			}
		}
	}

如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.

在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。

实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。

这里面当收取到第一个barrier,会将这个barrier信息存在一个队列中。

每当收到一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候把之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。

总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。

Exactly once checkpoint是如何处理的

首先看这一段的代码

@Override
	public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
		long barrierId = barrier.getId();
		LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);

        if (currentCheckpointId > barrierId
                || (currentCheckpointId == barrierId && !isCheckpointPending())) {
            if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
                inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);
            }
			return;
		}

        checkNewCheckpoint(barrier);
        checkState(currentCheckpointId == barrierId);

        if (numBarriersReceived++ == 0) {
            if (getNumOpenChannels() == 1) {
                markAlignmentStartAndEnd(barrier.getTimestamp());
            } else {
                markAlignmentStart(barrier.getTimestamp());
            }
		}

        // we must mark alignment end before calling currentState.barrierReceived which might
        // trigger a checkpoint with unfinished future for alignment duration
        if (numBarriersReceived == numOpenChannels) {
            if (getNumOpenChannels() > 1) {
                markAlignmentEnd();
            }
        }

        try {
            currentState = currentState.barrierReceived(context, channelInfo, barrier);
        } catch (CheckpointException e) {
            abortInternal(barrier.getId(), e);
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }

        if (numBarriersReceived == numOpenChannels) {
			numBarriersReceived = 0;
			lastCancelledOrCompletedCheckpointId = currentCheckpointId;
			LOG.debug(
				"{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);
			resetAlignmentTimer();
			allBarriersReceivedFuture.complete(null);
			}
		}

这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。

这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。

这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.

如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。

这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint,所以这个时候是不会block住信道的。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived

org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived

最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。

总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。

unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。

在这一篇文章我们主要介绍了ssubtask级别的snapshot的,后面再进一步介绍一下整体流程,就可以结束相关介绍了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1655770.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

element ui的确认提示框按钮样式修改

修改确认提示框的默认按钮样式&#xff0c;使用css强制修改 例&#xff1a; js代码&#xff1a; deleteUser(params){this.$confirm("您确定要删除吗&#xff1f;此操作无法撤销并且将永久删除所有数据。", "提示", { type: "warning", cancel…

西奥CHT-01软胶囊硬度计:开启硬度测试新纪元,引领制药行业品质升级

西奥CHT-01软胶囊硬度计&#xff1a;开启硬度测试新纪元&#xff0c;引领制药行业品质升级 在追求品质卓越的制药行业中&#xff0c;软胶囊硬度测试是确保药品质量与安全的关键环节。为了满足行业对高精度、高效率硬度测试的需求&#xff0c;西奥科技推出了CHT-01软胶囊硬度计…

【XR806开发板试用】使用FDCM操作Flash记录开机次数

一、寻找系统分配的自定义用户数据地址 &#xff08;1&#xff09;XR806的Flash布局 如图1所示&#xff0c;FLASH的布局有两种&#xff1a; 1、没有开启OTA模式&#xff1b;Image1PaddingSysinfo 2、开启OTA模式&#xff1b;Image1PaddingSysinfoOTA area Image2 Padding 如图…

一维数组 和 关键字 sizeof

数组的概念 “ 数组 ”我们可以理解成一组相同类型元素的集合 &#xff08;1&#xff09;其中可以是单个或是多个元素&#xff0c;可以是0&#xff0c;但元素个数不能为0 &#xff08;2&#xff09;一个数组中存放的元素必须是同类型的&#xff0c;比如一组整型&#xff0c;…

ruoyi-nbcio 基于flowable规则的多重并发网关的任意跳转

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://218.75.87.38:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a; h…

【密评】 | 商用密码应用安全性评估从业人员考核题库(6/58)

根据Kerckhoffs原则&#xff0c;密码系统的安全性主要依赖于&#xff08;&#xff09;。 A. 密钥 B. 加密算法 C. 解密算法 D. 通信双方 2000年10月&#xff0c;美国NIST宣布&#xff08;&#xff09;算法作为新的高级加密标准AES。 A. Rijndael B. RC6 C. SERPENT D. Twofish…

去除快捷方式的箭头图标

文章目录 取消箭头显示恢复箭头显示结果展示 添加快捷方式之后&#xff0c;会有箭头图标&#xff0c;部分场景下看着较为难受&#xff1a; 可以通过如下方式取消/显示箭头&#xff1a; 取消箭头显示 新建一个.bat文件&#xff0c;内部加入如下命令&#xff1a; reg add "…

Python-VBA函数之旅-reversed函数

目录 一、reversed函数的常见应用场景 二、reversed函数使用注意事项 三、如何用好reversed函数&#xff1f; 1、reversed函数&#xff1a; 1-1、Python&#xff1a; 1-2、VBA&#xff1a; 2、推荐阅读&#xff1a; 个人主页&#xff1a; https://blog.csdn.net/ygb_10…

Day2 | Java基础 | 2 数据类型

Day1 | Java基础 | 2 数据类型 基础版staticstatic的用法static修饰内部类static修饰方法static修饰变量static修饰代码块 深入分析static小结 问题回答版参数传递形参和实参的区别是什么&#xff1f;Java是值传递还是引用传递&#xff1f;值传递和引用传递的区别是什么&#x…

计算机发展史故事【6】

电脑群英谱 本世纪三、四十年代&#xff0c;是计算机发展史里最重大的收获季节。群英荟萃&#xff0c;逐鹿中原&#xff0c;鹿究竟死于谁手&#xff0c;并不是没有争议的。除了马克1 号与埃历阿克&#xff0c;还有一大批科学家为计算机的诞生作出过巨大的贡献&#xff0c;他们…

怎么做知识付费课程呢_揭秘知识付费课程系统的搭建之路

在信息爆炸的时代&#xff0c;知识的价值日益凸显。而知识付费&#xff0c;作为一种新型的学习方式&#xff0c;正逐渐走进大众的生活。那么&#xff0c;你是否想过自己也能搭建一套知识付费课程系统&#xff0c;分享你的智慧&#xff0c;实现知识的价值最大化呢&#xff1f;接…

2024年学浪视频怎么录屏

由于学浪最新版PC学生版客户端已经有防止录屏&#xff0c;而且录屏效率太慢&#xff0c;本文将介绍你一种高效率的工具&#xff0c;小浪助手.exe&#xff0c;它可以很轻松的将你的学浪视频下载下来 学浪下载工具我已经打包好了&#xff0c;有需要的自己下载一下 注意&#xf…

C++:多态-虚函数

C 中的多态性是面向对象编程中的一个重要概念&#xff0c;它允许在运行时选择不同的函数实现&#xff0c;以适应不同类型的对象。 多态的种类 编译时多态性&#xff08;Compile-time Polymorphism&#xff09;&#xff1a;也称为静态多态性或早期绑定&#xff0c;指在编译时确…

容联云孔淼:大模型落地与全域营销中台建设

近日&#xff0c;由金科创新社主办的2024区域性商业银行数智化转型研讨会顺利召开&#xff0c; 容联云产业数字云事业群副总经理、诸葛智能创始人孔淼受邀出席&#xff0c;并分享数智化转型实践经验。 他分享了容联云两大核心产品&#xff0c;“大模型应用容犀Copilot”在金融营…

SpringBoot Actuator未授权访问漏洞的解决方法

1. 介绍 Spring Boot Actuator 是一个用于监控和管理 Spring Boot 应用程序的功能模块。它提供了一系列生产就绪的功能&#xff0c;帮助你了解应用程序的运行状况&#xff0c;以及在运行时对应用程序进行调整。Actuator 使用了 Spring MVC 来暴露各种 HTTP 或 JMX 端点&#x…

嘴尚绝卤味:传承经典,缔造美食新风尚

卤味&#xff0c;作为中国传统美食的代表之一&#xff0c;历经千年的传承与发展&#xff0c;早已成为无数食客餐桌上的宠儿。而在这个美食盛行的时代&#xff0c;嘴尚绝卤味凭借其独特的口感和精湛的工艺&#xff0c;成为卤味市场中的佼佼者&#xff0c;引领着卤味文化的新潮流…

Linux(openEuler、CentOS8)常用的IP修改方式(文本配置工具nmtui+配置文件+nmcli命令)

----本实验环境为openEuler系统<以server方式安装>&#xff08;CentOS类似&#xff0c;可参考本文&#xff09;---- 一、知识点 &#xff08;一&#xff09;文本配置工具nmtui(openEuler已预装) nmtui&#xff08;NetworkManager Text User Interface&#xff09;是一…

【系统架构师】-案例篇-UML用例图

1、概述 用于表示系统功能需求&#xff0c;以及应用程序与用户或者与其他应用程序之间的交互关系。 2、组成 参与者&#xff08;Actors&#xff09;&#xff1a;与系统交互的用户或其他系统。用一个人形图标表示。用例&#xff08;Use Cases&#xff09;&#xff1a;系统需要…

大家都是怎么写毕业论文的? 推荐4个AI工具

写作这件事一直让我们从小学时期就开始头痛&#xff0c;初高中时期800字的作文让我们焦头烂额&#xff0c;一篇作文里用尽了口水话&#xff0c;拼拼凑凑才勉强完成。 大学时期以为可以轻松顺利毕业&#xff0c;结果毕业前的最后一道坎拦住我们的是毕业论文&#xff0c;这玩意不…

c++ 入门2

五. 函数重载 函数重载&#xff1a;是函数的一种特殊情况&#xff0c;C允许在同一作用域中声明几个功能类似的同名函数&#xff0c;这 些同名函数的形参列表(参数个数 或 类型 或 类型顺序)不同&#xff0c;常用来处理实现功能类似数据类型 不同的问题。 1、参数类型不同 #inc…