Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析

news2024/11/24 0:53:31

序言

最近因为工作需要在阅读flink checkpoint处理机制,学习的过程中记录下来,并分享给大家。也算是学习并记录。

目前公司使用的flink版本为1.11。因此以下的分析都是基于1.11版本来的。

在分享前可以简单对flink checkpoint机制做一个大致的了解。

Flink checkpoint 机制介绍

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

[1] 引用:Flink Checkpoint原理解析 - 知乎

代码分析

Flink checkpoint 的触发是通过CheckpointCoordinator 的定时线程完后。

	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
		return timer.scheduleAtFixedRate(
			new ScheduledTrigger(),
			initDelay, baseInterval, TimeUnit.MILLISECONDS);
	}

之后通过snapshotTaskState RPC的调用来实现触发checkpoint的

代码中遍历executions 来触发checkpoint,那么executions是什么东西呢?

Flink 代码中维护了一个叫tasksToTrigger的数组。

这个地方向前追溯,可以一直到jobgrap的生成。从名字和代码就可以看出,这个里面存的是没有inputchannel的节点,source节点没有inputchannel,所以回答上面的问题,executions 中是source节点,也就是做checkpoint 时 checkpointcoordinate 会给source节点发送rpc。

通过一个很长亮度的调用,最后到了SubtaskCheckpointCoordinatorImpl 中的

public void checkpointState(
			CheckpointMetaData metadata,
			CheckpointOptions options,
			CheckpointMetricsBuilder metrics,
			OperatorChain<?, ?> operatorChain,
			Supplier<Boolean> isCanceled) throws Exception {

		checkNotNull(options);
		checkNotNull(metrics);

		// All of the following steps happen as an atomic step from the perspective of barriers and
		// records/watermarks/timers/callbacks.
		// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
		// checkpoint alignments

		if (lastCheckpointId >= metadata.getCheckpointId()) {
			LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
			channelStateWriter.abort(
				metadata.getCheckpointId(),
				new CancellationException("checkpoint aborted via notification"),
				true);
			checkAndClearAbortedStatus(metadata.getCheckpointId());
			return;
		}

		// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
		lastCheckpointId = metadata.getCheckpointId();
		if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
			// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
			operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
			LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
			return;
		}

        // if checkpoint has been previously unaligned, but was forced to be aligned (pointwise
        // connection), revert it here so that it can jump over output data
        if (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {
            options = options.withUnalignedSupported();
            initInputsCheckpoint(metadata.getCheckpointId(), options);
        }

		// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
		//           The pre-barrier work should be nothing or minimal in the common case.
		operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

		// Step (2): Send the checkpoint barrier downstream
        LOG.debug(
                "Task {} broadcastEvent at {}, triggerTime {}, passed time {}",
                taskName,
                System.currentTimeMillis(),
                metadata.getTimestamp(),
                System.currentTimeMillis() - metadata.getTimestamp());
        CheckpointBarrier checkpointBarrier =
                new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);
        operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

        // Step (3): Register alignment timer to timeout aligned barrier to unaligned barrier
        registerAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);

        // Step (4): Prepare to spill the in-flight buffers for input and output
        if (options.needsChannelState()) {
			// output data already written while broadcasting event
			channelStateWriter.finishOutput(metadata.getCheckpointId());
		}

        // Step (5): Take the state snapshot. This should be largely asynchronous, to not impact
        // progress of the
		// streaming topology

		Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
		try {
			if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
				finishAndReportAsync(snapshotFutures, metadata, metrics, options);
			} else {
				cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
			}
		} catch (Exception ex) {
			cleanup(snapshotFutures, metadata, metrics, ex);
			throw ex;
		}
	}

代码中可以看到构造了CheckpointBarrier, source将barrier当成数据广播给下游的所有节点。使用的方法就是operatorChain.brodacastEvent()。这里就回到最开始提到的异步屏障快照算法。

下游收到了barrier,如何进行快照处理的?flink同时有多种类型的checkpoint,他们分别的处理时机是啥,后面我会进一步进行代码分析。

CheckpointBarrier checkpointBarrier =
                new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);
        operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1635063.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt:Qt框架的初步认识和基本使用

文章目录 Qt是什么Qt的优点Qt开发环境的基本使用对象树其他控件输入框按钮 本篇总结的是对于Qt框架的基本认识 Qt是什么 Qt框架是一个跨平台的C图形用户界面应用程序框架&#xff0c;框架是一群大佬发明出来的&#xff0c;帮助新手使用的一个内容&#xff0c;如果没有框架的存…

KUKA机器人如何给IO信号或寄存器添加中文注释信息?

KUKA机器人如何给IO信号或寄存器添加中文注释信息? 如下图所示,首先,我们需要登录专家以上用户权限(默认密码KUKA), 如下图所示,点击“投入运行”—“网络配置”, 如下图所示,此时机器人的IP地址为192.168.1.10, 如下图所示,用一根网线连接机器人控制柜到笔记…

使用ClassFinal实现springboot项目jar包加密

&#x1f604; 19年之后由于某些原因断更了三年&#xff0c;23年重新扬帆起航&#xff0c;推出更多优质博文&#xff0c;希望大家多多支持&#xff5e; &#x1f337; 古之立大事者&#xff0c;不惟有超世之才&#xff0c;亦必有坚忍不拔之志 &#x1f390; 个人CSND主页——Mi…

高级IO|从封装epoll服务器到实现reactor服务器|Part2

项目复习&#xff1a;从封装epoll_server到实现reactor服务器(part2) 项目复习&#xff1a;从封装epoll_server到实现reactor服务器(part2) 基本结构搭建好为什么上面我们写的epoll的recv是不正确的&#xff1f;sock要封装了&#xff0c;要维护缓冲区封装epoll(1)继续先写tcp_…

【算法刷题 | 贪心算法07】4.29(用最少数量的箭引爆气球、无重叠区间)

文章目录 12.用最少数量的箭引爆气球12.1题目12.2解法&#xff1a;贪心12.2.1贪心思路12.2.2代码实现 13.无重叠区间13.1题目13.2解法&#xff1a;贪心13.2.1贪心思路13.2.2代码实现 12.用最少数量的箭引爆气球 12.1题目 有一些球形气球贴在一堵用 XY 平面表示的墙面上。墙面…

js之JSON

json 是一种轻量级的数据交换格式。 json 就是一种在各个编程语言中流通的数据格式&#xff0c;负责不同编程语言中的数据传递和交互。 let data {name:张三,age:18}; console.log(data); // 对象 let str JSON.stringify(data); console.log(str); // json 数据 l…

3D模型在线查看利器,支持多种模型格式!

作为3D设计师&#xff0c;你是否曾遇到过这样的烦恼&#xff1a; 客户想看设计好的3D模型作品&#xff0c;但是客户身边没电脑&#xff0c;或者电脑没有3D查看器&#xff0c;又不会使用三维软件&#xff0c;从而无法及时查看模型。 还有就是&#xff0c;自己累积了很多3D模型作…

网易云怎么改IP地址到其他城市

在数字音乐的时代&#xff0c;网易云音乐以其丰富的音乐库和个性化的推荐算法赢得了众多用户的喜爱。然而&#xff0c;有些用户可能会遇到一个问题&#xff1a;自己的IP地址显示的是家乡或当前所在的城市&#xff0c;但自己希望显示的是其他城市。那么&#xff0c;网易云音乐是…

解决TIVA飞控玄学类问题的通解,用魔法打败魔法

问题&#xff1a;我遭遇了玄学问题&#xff0c;出现飞机在起降过程中&#xff0c;位置晃动&#xff0c;突然出现的&#xff0c;昨天还好好的&#xff0c;位置地点都没换&#xff0c;今天中午测试了5、6次每次都这样&#xff0c;现在茫然无措&#xff0c;小哥救我&#xff1f; 这…

手写 轮播效果

此处只做了手动点击的效果,未处理自动轮播,基于vue2书写 , 逻辑: 点击左边的图标,进行上一个处理,若此时在第一项,则return,否则将当前所在数据-1;点击右边的图标,进行下一个处理,若此时在最后一项,则return,否则将所在数据1;当单独点击某数据时,若当前就是点击项,则return,否…

与 Apollo 共创生态:探索智能驾驶新时代

前言 随着百度Apollo的七周年大会在北京车展前夕成功举办&#xff0c;我们迎来了一场关于智能汽车未来的思想盛宴。在这次主题为“破晓•拥抱智变时刻”的盛会上&#xff0c;百度Apollo发布了一系列令人振奋的智能驾驶产品&#xff0c;从领航辅助驾驶到智能座舱&#xff0c;再到…

[C++][算法基础]区间覆盖(贪心 + 区间问题4)

给定 &#x1d441; 个闭区间 [&#x1d44e;&#x1d456;,&#x1d44f;&#x1d456;] 以及一个线段区间 [&#x1d460;,&#x1d461;]&#xff0c;请你选择尽量少的区间&#xff0c;将指定线段区间完全覆盖。 输出最少区间数&#xff0c;如果无法完全覆盖则输出 −1。 …

界面组件DevExpress中文教程 - 如何在Node.js应用中创建报表?

DevExpress Reporting是.NET Framework下功能完善的报表平台&#xff0c;它附带了易于使用的Visual Studio报表设计器和丰富的报表控件集&#xff0c;包括数据透视表、图表&#xff0c;因此您可以构建无与伦比、信息清晰的报表。 获取DevExpress Reporting最新正式版下载(Q技术…

电商日志项目(一)

电商日志项目 一、项目体系架构设计1. 项目系统架构2. 项目数据流程二、环境搭建1. NginxLog文件服务1.1. 上传,解压1.2. 编译安装1.3. 启动验证2. Flume-ng2.1. 上传解压2.2. 修改配置文件2.3. 修改环境变量2.4. 验证3. Sqoop3.1. 上传解压3.2. 配置环境变量3.3. 修改配置文件…

「玻尔曾孙」领衔!超辐射原子,重塑全球精准测时——

超辐射原子能够帮助我们以前所未有的精度测量时间。在哥本哈根大学最近的一项研究中&#xff0c;研究人员开发了一种新的测量时间间隔&#xff08;秒&#xff09;的方法&#xff0c;这种方法克服了目前最先进原子钟面临的一些限制。 这一成就有望在多个领域产生深远影响&#x…

el-date-picker 禁用时分秒选择(包括禁用下拉框展示)

2024.04.26今天我学习了对el-date-picker进行禁用时分秒&#xff0c; 在使用el-date-picker组件的时候&#xff0c;我们有可能遇到需要把时分秒的时间固定&#xff0c;然后并且不能让他修改&#xff1a; 1714120999296 比如右上角的这个时间&#xff0c;我们要给它固定是‘08:…

Flask模版详解

Flask模版详解 概述Jinja2模板引擎渲染模版的步骤变量控制结构自定义错误页面链接静态文件 概述 模板是一个包含响应文本的文件&#xff0c;其中包含用占位变量表示的动态部分&#xff0c;其具体值只在请求的上下文中才能知道。使用真实值替换变量&#xff0c;再返回最终得到的…

Android4.4真机移植过程笔记(三)

如果文章字体看得不是很清楚&#xff0c;大家可以下载pdf文档查看&#xff0c;文档已上传&#xff5e;oo&#xff5e; 7、安装加密APK 需要修改文件如下&#xff1a; 相对Android4.2改动还是蛮大的&#xff0c;有些文件连路径都变了: //Android4.2 1、frameworks/native/libs…

如何运用结构化思维来规划个人发展

结构化思维不仅在工作中非常有用&#xff0c;在日常生活中同样可以发挥巨大作用。无论是解决家庭琐事、规划个人发展&#xff0c;还是做出重要决策&#xff0c;结构化思维都能帮助我们更有条理地思考和行动。 一、解决生活中的问题 生活中总会遇到各种各样的问题&#xff0…

Unity+Shader入门精要-1. 入门shader

今天开始正式整合学习的shader内容。 Simple Shader 主要介绍了大概的shader格式。 Shader "Unity Sgaders Book/Chapter 5/Simple Shader" //shader名 {Properties{//声明color类型的属性_Color("Color Tint", Color) (1.0,1.0,1.0,1.0)}SubShader{Pa…