Flink checkpoint 源码分析

news2024/12/29 9:24:40

序言

最近因为工作需要在阅读flink checkpoint处理机制,学习的过程中记录下来,并分享给大家。也算是学习并记录。

目前公司使用的flink版本为1.11。因此以下的分析都是基于1.11版本来的。

在分享前可以简单对flink checkpoint机制做一个大致的了解。

Flink checkpoint 机制介绍

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

[1] 引用:Flink Checkpoint原理解析 - 知乎

代码分析

Flink checkpoint 的触发是通过CheckpointCoordinator 的定时线程完后。

	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
		return timer.scheduleAtFixedRate(
			new ScheduledTrigger(),
			initDelay, baseInterval, TimeUnit.MILLISECONDS);
	}

之后通过snapshotTaskState RPC的调用来实现触发checkpoint的

代码中遍历executions 来触发checkpoint,那么executions是什么东西呢?

Flink 代码中维护了一个叫tasksToTrigger的数组。

这个地方向前追溯,可以一直到jobgrap的生成。从名字和代码就可以看出,这个里面存的是没有inputchannel的节点,source节点没有inputchannel,所以回答上面的问题,executions 中是source节点,也就是做checkpoint 时 checkpointcoordinate 会给source节点发送rpc。

通过一个很长亮度的调用,最后到了SubtaskCheckpointCoordinatorImpl 中的

public void checkpointState(
			CheckpointMetaData metadata,
			CheckpointOptions options,
			CheckpointMetricsBuilder metrics,
			OperatorChain<?, ?> operatorChain,
			Supplier<Boolean> isCanceled) throws Exception {

		checkNotNull(options);
		checkNotNull(metrics);

		// All of the following steps happen as an atomic step from the perspective of barriers and
		// records/watermarks/timers/callbacks.
		// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
		// checkpoint alignments

		if (lastCheckpointId >= metadata.getCheckpointId()) {
			LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
			channelStateWriter.abort(
				metadata.getCheckpointId(),
				new CancellationException("checkpoint aborted via notification"),
				true);
			checkAndClearAbortedStatus(metadata.getCheckpointId());
			return;
		}

		// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
		lastCheckpointId = metadata.getCheckpointId();
		if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
			// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
			operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
			LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
			return;
		}

        // if checkpoint has been previously unaligned, but was forced to be aligned (pointwise
        // connection), revert it here so that it can jump over output data
        if (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {
            options = options.withUnalignedSupported();
            initInputsCheckpoint(metadata.getCheckpointId(), options);
        }

		// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
		//           The pre-barrier work should be nothing or minimal in the common case.
		operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

		// Step (2): Send the checkpoint barrier downstream
        LOG.debug(
                "Task {} broadcastEvent at {}, triggerTime {}, passed time {}",
                taskName,
                System.currentTimeMillis(),
                metadata.getTimestamp(),
                System.currentTimeMillis() - metadata.getTimestamp());
        CheckpointBarrier checkpointBarrier =
                new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);
        operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

        // Step (3): Register alignment timer to timeout aligned barrier to unaligned barrier
        registerAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);

        // Step (4): Prepare to spill the in-flight buffers for input and output
        if (options.needsChannelState()) {
			// output data already written while broadcasting event
			channelStateWriter.finishOutput(metadata.getCheckpointId());
		}

        // Step (5): Take the state snapshot. This should be largely asynchronous, to not impact
        // progress of the
		// streaming topology

		Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
		try {
			if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
				finishAndReportAsync(snapshotFutures, metadata, metrics, options);
			} else {
				cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
			}
		} catch (Exception ex) {
			cleanup(snapshotFutures, metadata, metrics, ex);
			throw ex;
		}
	}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1634058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker学习笔记3:VmWare CentOS7安装与静态ip配置

文章目录 一、安装CentOS71、下载centos镜像2、安装二、设置静态ip三、xshell连接centos本专栏的docker环境是在centos7里安装,因此首先需要会安装centos虚拟机。 本篇博客介绍如何在vm虚拟机里安装centos7。 一、安装CentOS7 1、下载centos镜像 推荐清华源,下载如下版本 …

【C++】学习笔记——string_1

文章目录 四、模板初阶2. 类模板 五、STL简介1. 什么是STL2. STL的六大组件3. 如何学习STL 六、string类1. string类对象的容量操作 未完待续 四、模板初阶 2. 类模板 函数模板就是&#xff1a;模板 函数&#xff1b;类模板就是&#xff1a;模板 类。和函数模板用法基本相同…

Kafka客户端工具:Offset Explorer 使用指南

Kafka作为一个分布式流处理平台&#xff0c;在大数据处理和实时数据流应用中扮演着至关重要的角色。管理Kafka的topics及其offsets对于维护系统稳定性和数据一致性至关重要。Offset Explorer是一个强大的桌面应用程序&#xff0c;它使得管理和监控Kafka集群变得简单直观。本文将…

数组 Leetcode 704 二分查找/Leetcode 59 螺旋矩阵/Leetcode 203移除链表元素

数组 Leetcode 704 二分查找 Leetcode 704 学习记录自代码随想录 二分法模板记忆&#xff0c;数值分析中牛顿迭代法 class Solution { public:int search(vector<int>& nums, int target) {int left 0, right nums.size()-1;// 是否需要等于号&#xff0c;假设…

verilog分析task的接口设计,证明这种写法:assign {a,b,c,d} = links;

verilog分析task的接口设计&#xff0c;证明这种写法&#xff1a;assign {a,b,c,d} links; 1&#xff0c;task在状态机中的使用好处&#xff1a;2&#xff0c;RTL设计3&#xff0c;测试testbench4&#xff0c;波形分析&#xff0c;正确&#xff01; 参考文献&#xff1a; 1&am…

C++初阶学习第四弹——类与对象(中)——刨析类与对象的核心点

类与对象&#xff08;上&#xff09;&#xff1a;C初阶学习第三弹——类与对象&#xff08;上&#xff09;——初始类与对象-CSDN博客 前言&#xff1a; 在前面文章中&#xff0c;我们已经讲了类与对象的思想和类与对象的一些基本操作&#xff0c;接下来这篇文章我们将讲解以下…

NLP(10)--TFIDF优劣势及其应用Demo

前言 仅记录学习过程&#xff0c;有问题欢迎讨论 TF*IDF&#xff1a; 优势&#xff1a; 可解释性好 可以清晰地看到关键词 即使预测结果出错&#xff0c;也很容易找到原因 计算速度快 分词本身占耗时最多&#xff0c;其余为简单统计计算 对标注数据依赖小 可以使用无标注语…

超级数据查看器 app v2.0发布 欢迎下载使用

超级数据查看器 app v2.0发布 欢迎下载使用 感谢大家的支持 &#xff1a;&#xff09; 点击访问APP下载界面 跳转 腾讯应用宝 简介 超级数据查看器软件&#xff08;简称超级数据查看器&#xff09;是一个提供数据查询和数据管理的手机APP&#xff0c;能导入文本数据&…

ubuntu neo4j 下载与配置(一)

neo4j 官方下载页面 https://neo4j.com/deployment-center/#community 进入页面之后&#xff0c;往下滑 咱们在下载neo4j时&#xff0c;官方可能要咱们填写一下个人信息&#xff0c;比如&#xff1a;姓名组织结构邮箱等&#xff1a; 咱们可以观察一下&#xff0c;ne4j的下载链…

大数据学习笔记14-Hive基础2

一、数据字段类型 数据类型 &#xff1a;LanguageManual Types - Apache Hive - Apache Software Foundation 基本数据类型 数值相关类型 整数 tinyint smallint int bigint 小数 float double decimal 精度最高 日期类型 date 日期 timestamps 日期时间 字符串类型 s…

OpenAI 新推出 AI 问答搜索引擎——SearchGPT 震撼登场

您的浏览器不支持 video 标签。 OpenAI-SearchGPT 近日&#xff0c;OpenAI 曝光了自己的一款令人瞩目的 AI 问答搜索引擎——SearchGPT。这款搜索引擎带来了全新的搜索体验&#xff0c;给整个行业带来了巨大的压力。 SearchGPT 支持多种强大的功能。首先&#xff0c;它能够通过…

STM32CubeMX+MDK通过I2S接口进行音频输入输出(全双工读写一个DMA回调)续-音质问题解决总结

一、前言 之前进行了STM32CubeMXMDK通过I2S接口进行音频输入输出&#xff08;全双工读写一个DMA回调&#xff09;的研究总结&#xff1a; https://juejin.cn/post/7339016190612881408#heading-34 后续音质问题解决了&#xff0c;目前测试下来48khz的双声道使用效果很好&…

mySQL商城项目实战 (终)(全部表)(1-88张)

本章无sql语句&#xff0c;直接放转出的sql文件。 88张表结果如图! 资源在已经与文章绑定&#xff0c; 在navicat工具中&#xff0c;执行以下步骤 在新建的数据库中右键,点击【运行sql文件】&#xff0c;运行绑定的资源&#xff0c;之后您就可以在您的navicat中看到我建好的8…

Python数据分析大作业(ARIMA 自回归积分滑动平均模型) 4000+字 图文分析文档 销售价格库存分析+完整python代码

资源地址&#xff1a;Python数据分析大作业 4000字 图文分析文档 销售分析 完整python代码 完整代码分析 ​ 同时销售量后1000的sku品类占比中&#xff08;不畅销产品&#xff09;如上&#xff0c;精品类产品占比第一&#xff0c;达到66.7%&#xff0c;其次是香化类产品&#x…

在Android中,如何通过Kotlin协程处理多个API调用

在Android中&#xff0c;如何通过Kotlin协程处理多个API调用 在Android开发中&#xff0c;如何使用Kotlin协程处理多个API调用的示例呢&#xff1f;假设我们已经对Kotlin协程有了一定的了解&#xff0c;包括定义、简单用例和示例等。现在&#xff0c;让我们来看一些真实的Andr…

AD21技巧[更加便捷的DRC检查][把线框转成Keep-Out Layer板框]

AD10用了好久,之所以换到AD21并不是因为AD10功能不够强,而且因为别人用高版本设计的软件到我这里竟然打不开了,这个是我不能够接受的,所以开始使用AD21,使用之后发现AD21好多使用习惯和AD10有很大的区别! 更加便捷的DRC检查 本文摘录于&#xff1a;https://www.cnblogs.com/U…

[Android14] SystemUI的启动

1. 什么是System UI SystemUI是Android系统级应用&#xff0c;负责反馈系统及应用状态并与用户保持大量的交互。业务主要涉及的组成部分包括状态栏(Status Bar)&#xff0c;通知栏(Notification Panel)&#xff0c;锁屏(Keyguard)&#xff0c;控制中心(Quick Setting)&#xff…

基于H.264的RTP打包中的组合封包以及分片封包结构图简介及抓包分析

H.264视频流的RTP封装类型分析&#xff1a; 前言&#xff1a; NULL Hearder简介(结构如下)&#xff1a; ---------------|0|1|2|3|4|5|6|7|--------|F|NRI| Type |--------------- F&#xff1a;forbidden_zero_bit&#xff0c; 占1位&#xff0c;在 H.264 规范中规定了这…

RustGUI学习(iced)之小部件(四):如何使用单选框radio部件?

前言 本专栏是学习Rust的GUI库iced的合集&#xff0c;将介绍iced涉及的各个小部件分别介绍&#xff0c;最后会汇总为一个总的程序。 iced是RustGUI中比较强大的一个&#xff0c;目前处于发展中&#xff08;即版本可能会改变&#xff09;&#xff0c;本专栏基于版本0.12.1. 概述…

【酱浦菌-爬虫技术细节】解决学术堂爬虫翻页(下一页)问题

首先我们通过css选择器获取页码信息&#xff0c;这里的css选择器&#xff0c;选择的是含有a标签的所有li标签&#xff0c;代码如下&#xff1a; li html_web.css(div.pd_c_xslb_left_fenye ul li>a) for li in li:li_url li.css(a::attr(href)).get()li_num li.css(a::t…