Checkpoint 执行机制原理解析

news2024/11/18 23:33:53

在介绍Checkpoint的执行机制前,我们需要了解一下state的存储,因为stateCheckpoint进行持久化备份的主要角色。Checkpoint作为Flink最基础也是最关键的容错机制,Checkpoint快照机制很好地保证了Flink应用从异常状态恢复后的数据准确性。同时 Checkpoint相关的metrics(指标)也是诊断Flink应用健康状态最为重要的指标,成功且耗时较短的Checkpoint表明作业运行状况良好,没有异常或反压。然而,由于Checkpoint与反压的耦合,反压反过来也会作用于Checkpoint,导致Checkpoint的种种问题。Flink1.11引入Unaligned(未对齐)Checkpoint来解耦Checkpoint机制与反压机制,优化高反压情况下的Checkpoint表现。

Statebackend 的分类

下图阐释了目前Flink内置的三类state backend,其中MemoryStateBackendFsStateBackend在运行时都是存储在java heap中的,只有在执行Checkpoint时,FsStateBackend才会将数据以文件格式持久化到远程存储上。 而RocksDBStateBackend则借用了 RocksDB(内存磁盘混合的LSM DB)对state进行存储。
[点击并拖拽以移动] ​
对于在这里插入图片描述
HeapKeyedStateBackend,有两种实现:
【1】支持异步Checkpoint(默认): 存储格式CopyOnWriteStateMap
【2】仅支持同步Checkpoint 存储格式NestedStateMap

特别在MemoryStateBackend内使用HeapKeyedStateBackend时,Checkpoint序列化数据阶段默认有最大5 MB数据的限制。对于 RocksDBKeyedStateBackend,每个state都存储在一个单独的column family内,其中keyGroupKeyNamespace进行序列化存储在 DB作为key
[点击并拖拽以移动] ​

Checkpoint 执行机制详解

Checkpoint的执行流程逐步拆解进行讲解,下图左侧是Checkpoint Coordinator,是整个Checkpoint的发起者,中间是由两个 source,一个sink组成的Flink作业,最右侧的是持久化存储,在大部分用户场景中对应HDFS
[点击并拖拽以移动] ​

【1】Checkpoint Coordinator向所有source节点触发trigger Checkpoint
【2】source节点向下游广播barrier(分界线),这个barrier就是实现Chandy-Lamport分布式快照算法的核心,下游的task只有收到所有inputbarrier才会执行相应的Checkpoint

Chandy-Lamport算法将分布式系统抽象成DAG(暂时不考虑有闭环的图),节点表示进程,边表示两个进程间通信的管道。分布式快照的目的是记录下整个系统的状态,即可以分为节点的状态(进程的状态)和边的状态(信道的状态,即传输中的数据)。因为系统状态是由输入的消息序列驱动变化的,我们可以将输入的消息序列分为多个较短的子序列,图的每个节点或边先后处理完某个子序列后,都会进入同一个稳定的全局统状态。利用这个特性,系统的进程和信道在子序列的边界点分别进行本地快照,即使各部分的快照时间点不同,最终也可以组合成一个有意义的全局快照。
[点击并拖拽以移动] ​

从实现上看,Flink通过在DAG数据源定时向数据流注入名为Barrier的特殊元素,将连续的数据流切分为多个有限序列,对应多个 Checkpoint周期。每当接收到Barrier,算子进行本地的Checkpoint快照,并在完成后异步上传本地快照,同时将Barrier以广播方式发送至下游。当某个Checkpoint的所有Barrier到达DAG末端且所有算子完成快照,则标志着全局快照的成功。

[点击并拖拽以移动] ​

【3】当task完成state备份后,会将备份数据的地址state handle通知给Checkpoint coordinator
[点击并拖拽以移动] ​

【4】下游的sink节点收集齐上游两个inputbarrier之后,会执行本地快照,这里特地展示了RocksDB incremental(增量) Checkpoint的流程,首先RocksDB会全量刷数据到磁盘上(红色大三角表示),然后Flink框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。
[点击并拖拽以移动] ​

【5】同样的,sink节点在完成自己的Checkpoint之后,会将state handle返回通知Checkpoint Coordinator
[点击并拖拽以移动] ​

【6】最后,当Checkpoint coordinator收集齐所有taskstate handle,就认为这一次的Checkpoint全局完成了,向持久化存储中再备份一个Checkpoint meta文件。
[点击并拖拽以移动] ​

Checkpoint 的 EXACTLY_ONCE 语义

EXACTLY ONCE语义: 在有多个输入Channel的时候,为了数据准确性,算子会等待所有流的Barrier都到达之后才会开始本地的快照,这种机制被称为Barrier对齐。在对齐的过程中,算子只会继续处理的来自未出现Barrier Channel的数据,而其余Channel的数据会被写入输入队列(Flink通过一个input buffer将在对齐阶段收到的数据缓存起来),直至在队列满后被阻塞。当所有Barrier到达后(对齐),算子进行本地快照,输出 Barrier 到下游并恢复正常处理。
比起其他分布式快照,该算法的优势在于辅以Copy-On-Write技术的情况下不需要Stop The World影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。

AT LEAST ONCE语义: 无需缓存收集到的数据,会对后续直接处理,所以导致restore(恢复)时,数据可能会被多次处理。下图是官网文档里面就Checkpoint align的示意图:
[点击并拖拽以移动] ​

需要特别注意的是,FlinkCheckpoint机制只能保证Flink的计算过程可以做到EXACTLY ONCE,端到端的EXACTLY ONCE需要 sourcesink支持。

Checkpoint 与反压的耦合

目前的Checkpoint算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的Barrier对齐反而会加剧作业的反压,甚至导致作业的不稳定。

首先, Chandy-Lamport分布式快照的结束依赖于Marker的流动,而反压则会限制Marker的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致Checkpoint的时间点落后于实际数据流较多。这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果Checkpoint连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的Lag更大,通常带来更大的反压,形成一个恶性循环。

其次,Barrier对齐本身可能成为一个反压的源头,影响上游算子的效率,而这在某些情况下是不必要的。比如典型的情况是一个的作业读取多个Source,分别进行不同的聚合计算,然后将计算完的结果分别写入不同的Sink。通常来说,这些不同的Sink会复用公共的算子以减少重复计算,但并不希望不同Source间相互影响。
[点击并拖拽以移动] ​

假设一个作业要分别统计AB两个业务线的以天为粒度指标,同时还需要统计所有业务线以周为单位的指标,拓扑如上图所示。如果B业务线某天的业务量突涨,使得Checkpoint Barrier有延迟,那么会导致公用的Window Aggregate进行Barrier对齐,进而阻塞业务AFlatMap,最终令业务A的计算也出现延迟。
当然这种情况可以通过拆分作业等方式优化,但难免引入更多开发维护成本,而且更重要的是这本来就符合Flink用户常规的开发思路,应该在框架内尽量减小出现用户意料之外的行为的可能性。

Unaligned Checkpoint

为了解决这个问题,Flink1.11版本引入了Unaligned Checkpoint的特性。要理解Unaligned Checkpoint的原理,首先需要了解 Chandy-Lamport论文中对于Marker处理规则的描述:自行百度翻译

Marker-Sending Rule for a Process p. For each channel c, incident on, and
directed away from p:
p sends one marker along c after p records its state and before p sends further messages
along c.
    Marker-Receiving Rule for a Process q. On receiving a marker along a channel
C:
if q has not recorded its state then
    begin q records its state;
          q records the state c as the empty sequence
    end
else q records the state of c as the sequence of messages received along c after q’s state
was recorded and before q received the marker along c.

其中关键是if q has not recorded its state,也就是接收到Marker时算子是否已经进行过本地快照。一直以来FlinkAligned Checkpoint通过Barrier对齐,将本地快照延迟至所有Barrier到达,因而这个条件是永真的,从而巧妙地避免了对算子输入队列的状态进行快照,但代价是比较不可控的 Checkpoint时长和吞吐量的降低 。实际上这和Chandy-Lamport算法是有一定出入的。举个例子,假设我们对两个数据流进行equal-join,输出匹配上的元素。按照Flink Aligned Checkpoint的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的Checkpoint周期):
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。
图 b: 算子分别读取Channel一个元素,输出2。随后接收到Channel 1Barrier,停止处理Channel 1后续的数据,只处理 Channel 2的数据。
图 c: 算子再消费2个自Channel 2的元素,接收到Barrier,开始本地快照并输出Barrier

对于相同的情况,Chandy-Lamport算法的状态变化如下:
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。
图 b: 算子分别处理两个Channel一个元素,输出结果2。此后接收到Channel 1Barrier,算子开始本地快照记录自己的状态,并输出Barrier
图 c: 算子继续正常处理两个Channel的输入,输出9。特别的地方是Channel 2后续元素会被保存下来,直到Channel 2Barrier出现(即Channel 297)。保存的数据会作为Channel的状态成为快照的一部分。

两者的差异主要可以总结为两点:
快照的触发是在接收到第一个Barrier时还是在接收到最后一个Barrier时。
是否需要阻塞已经接收到BarrierChannel的计算。

从这两点来看,新的 Unaligned Checkpoint将快照的触发改为第一个Barrier且取消阻塞Channel的计算 ,算法上与Chandy-Lamport基本一致,同时在实现细节方面结合Flink的定位做了几个改进。
首先,不同于 Chandy-Lamport模型的只需要考虑算子输入Channel的状态,Flink的算子有输入和输出两种Channel ,在快照时两者的状态都需要被考虑。其次,无论在Chandy-Lamport还是Flink Aligned Checkpoint算法中,Barrier都必须遵循其在数据流中的位置,算子需要等待Barrier被实际处理才开始快照。而Unaligned Checkpoint改变了这个设定,允许算子优先摄入并优先输出Barrier。如此一来,第一个到达Barrier会在算子的缓存数据队列(包括输入Channel和输出Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入Channel在其Barrier之前的数据会被写入快照中。
[点击并拖拽以移动] ​

这样的主要好处是,如果本身算子的处理就是瓶颈Chandy-LamportBarrier仍会被阻塞,但Unaligned Checkpoint则可以在 Barrier进入输入Channel就马上开始快照。这可以从很大程度上加快Barrier流经整个DAG的速度,从而降低Checkpoint整体时长。回到之前的例子,用Unaligned Checkpoint来实现,状态变化如下:
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。输出 Channel已存在结果数据1
图 b: 算子优先处理输入Channel 1Barrier,开始本地快照记录自己的状态,并将Barrier插到输出Channel末端。
图 c: 算子继续正常处理两个Channel的输入,输出29。同时算子会将Barrier越过的数据(即输入Channel 12和输出 Channel1)写入Checkpoint,并将输入Channel 2后续早于Barrier的数据(即 297)持续写入Checkpoint

比起Aligned Checkpoint中不同Checkpoint周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint进行快照和输出Barrier时,部分本属于当前Checkpoint的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前Checkpoint的输出数据却落到Barrier之后(因此未反映到下游算子的状态中)。

这也正是 Unaligned的含义: 不同Checkpoint周期的数据没有对齐,包括不同输入Channel之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从Checkpoint恢复时,不对齐的数据并不能由Source端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被Checkpoint恢复到对应Channel中,所以依然能提供只计算一次的准确结果。

当然,Unaligned Checkpoint并不是百分百优于Aligned Checkpoint,它会带来的已知问题就有:
【1】由于要持久化缓存数据,State Size会有比较大的增长,磁盘负载会加重。
【2】随着State Size增长,作业恢复时间可能增长,运维管理难度增加。

目前看来,Unaligned Checkpoint更适合容易产生高反压同时又比较重要的复杂作业。对于像数据ETL同步等简单作业,更轻量级的 Aligned Checkpoint显然是更好的选择。

总结:Flink 1.11Unaligned Checkpoint主要解决在高反压情况下作业难以完成Checkpoint的问题,同时它以磁盘资源为代价,避免了Checkpoint可能带来的阻塞,有利于提升Flink的资源利用率。随着流计算的普及,未来的Flink应用大概会越来越复杂,在未来经过实战打磨完善后Unaligned Checkpoint很有可能会取代Aligned Checkpoint成为Flink的默认Checkpoint策略。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1338290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

golang连接mysql的第一个程序(最新)

不想多说话,先把golang和mysql连接好,环境配置好,然后直接上代码就行了,代码可直接运行。 package mainimport ("database/sql""fmt"_ "github.com/go-sql-driver/mysql" )func main() {// MySQL …

nvm 的安装及使用 (Node版本管理器)

目录 1、nvm 介绍 2、nvm安装 3、nvm 使用 4、node官网可以查看node和npm对应版本 5、nvm安装指定版本node 6、安装cli脚手架 1、nvm 介绍 NVM 全称 node.js version management ,专门针对 node 版本进行管理的工具,通过它可以安装和切换不同版本的…

【SpringCloud笔记】(11)消息驱动之Stream

Stream 技术背景 底层不同模块可能使用不同的消息中间件,这就导致技术的切换,微服务的维护及开发变得麻烦起来 概述 官网: https://spring.io/projects/spring-cloud-stream#overview https://cloud.spring.io/spring-cloud-static/spring…

在Go语言中处理HTTP文件上传

大家好,我是你们可爱又迷人的编程小助手,今天要带你们一起探讨在Go语言中如何处理HTTP文件上传,让我们把这场技术之旅变得轻松有趣吧! 首先,想象一下这个场景:你是一个网站的开发者,用户们急切…

pycharm 工具栏不见了

新版pycharm后, 菜单栏和工具栏不见了 目录 我发现的解决方法: 其他旧版的解决方法: 我发现的解决方法: 其他旧版的解决方法: 另外,一些使用pycharm的新手可能会由于不熟悉软件的功能而误操作&#xff…

文件夹共享(普通共享和高级共享的区别)防火墙设置(包括了jdk安装和Tomcat)

文章目录 一、共享文件1.1为什么需要配置文件夹共享功能?1.2配置文件共享功能1.3高级共享和普通共享的区别: 二、防火墙设置2.1先要在虚拟机上安装JDK和Tomcat供外部访问。2.2设置防火墙: 一、共享文件 1.1为什么需要配置文件夹共享功能&…

springboot对接WebSocket实现消息推送

1.修改pom文件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency> 2.增加配置WebSocketConfig.java import org.springframework.context.annotation.Bean…

接口测试工具——ApiFox使用初体验 postman导出和ApiFox导入

目录 ApiFox使用初体验初步使用从postman导出到apifox导入 IDEA简单测试Postman测试工具post请求 接口测试工具swaggerKnife4j1.引入依赖2.配置3.常用注解4.接口测试 JMeter什么是JMeter?JMeter安装配置1.官网下载2.下载后解压3.汉语设置 JMeter的使用方法1.新建线程组2.设置参…

智能监控平台/视频共享融合系统EasyCVR海康设备国标GB28181接入流程

TSINGSEE青犀视频监控汇聚平台EasyCVR可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力&…

基于springboot的数码论坛系统设计与实现

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 摘 要 网络的广泛应用给…

laravel api资源的问题记录

resource 转换层 可以帮助我们转换一些字段的结果&#xff0c;类似前端的filter。 可以使用比如对象或者模型的形式来处理&#xff0c;但使用sql查询会导致n1的问题。如图&#xff1a; 层次嵌套很多&#xff0c;而且很深&#xff0c;这样虽然开发方便了&#xff0c;但是维护就…

Mysql(5日志备份恢复)

一.日志管理 MySQL 的日志默认保存位置为 /usr/local/mysql/data 先看下mysql的日志文件有无&#xff1a; 修改配置文件添加&#xff1a;错误日志&#xff0c;用来记录当MySQL启动、停止或运行时发生的错误信息&#xff0c;默认已开启 修改配置文件添加&#xff1a;通用查…

PWR 电源控制-stm32入门

这一节我们来学习 STM32 的 PWR 电源控制。 其中&#xff0c;我们重点学习的主要就是 3 种低功耗模式&#xff1a;睡眠模式、停机模式和待机模式。 低功耗模式的目的呢&#xff1f;简单明了&#xff0c;就是省电&#xff0c;这对于一些使用电池供电&#xff0c;又需要长时间待…

腾讯云4核8G服务器轻量和CVM标准型S5对比

腾讯云4核8G服务器优惠价格表&#xff0c;云服务器CVM标准型S5实例4核8G配置价格15个月1437.3元&#xff0c;5年6490.44元&#xff0c;轻量应用服务器4核8G12M带宽一年446元、529元15个月&#xff0c;阿腾云atengyun.com分享腾讯云4核8G服务器详细配置、优惠价格及限制条件&…

nodejs微信小程序+python+PHP基于Android自习室管理系统的设计与实现-计算机毕业设计推荐

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

基于ssm的电影评论系统论文

摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c;在计算机上安装电影评论系统软件来发挥其高效地信息处理的作用&#xff0c…

电商低价 窜货问题怎么解决

渠道中最常见的问题一般都是围绕价格的&#xff0c;低价、窜货、假货问题尤为明显&#xff0c;经销商为了利益&#xff0c;总会进行价格的突破&#xff0c;窜货也是为了获得更多流量利润&#xff0c;也会变相低价&#xff0c;所以治理好低价、窜货问题&#xff0c;就是在管控渠…

【LeetCode刷题笔记(13-1)】【Python】【回文数】【反转整数】【简单】

文章目录 引言回文数题目描述提示 题意分析解决方案1&#xff1a;【反转字符串】解决方案2&#xff1a;【反转整数】题外话结束语 9. 回文数 引言 编写通过所有测试案例的代码并不简单&#xff0c;通常需要深思熟虑和理性分析。虽然这些代码能够通过所有的测试案例&#xff0…

[PyTorch][chapter 8][李宏毅深度学习][DNN 训练技巧]

前言&#xff1a; DNN 是神经网络的里面基础核心模型之一.这里面结合DNN 介绍一下如何解决 深度学习里面过拟合,欠拟合问题 目录&#xff1a; DNN 训练常见问题 过拟合处理 欠拟合处理 keras 项目 一 DNN 训练常见问题 我们在深度学习网络训练的时候经常会遇到下面…

Android原生实现分段选择

六年前写的一个控件&#xff0c;一直没有时间总结&#xff0c;趁年底不怎么忙&#xff0c;整理一下之前写过的组件。供大家一起参考学习。废话不多说&#xff0c;先上图。 一、效果图 实现思路使用的是radioGroup加radiobutton组合方式。原理就是通过修改RadioButton 的backgr…