22、Flink 背压下的 Checkpoint处理

news2025/1/13 12:02:03
1.概述

通常,对齐 Checkpoint 的时长主要受 Checkpointing 过程中的同步和异步两个部分的影响;但当 Flink 作业正运行在严重的背压下时,Checkpoint 端到端延迟的主要影响因子将会是传递 Checkpoint Barrier 到 所有的算子/子任务的时间;可以通过高 alignment time and start delay metrics 观察到,解决方案如下:

  • 解决背压问题。优化 Flink 作业,调整 Flink 或 JVM 参数,通过扩容。
  • 减少 Flink 作业中缓冲在 In-flight 数据的数据量。
  • 启用非对齐 Checkpoints。

上述选项并不是互斥的,可以组合使用。

2.缓冲区 Debloating

Flink 1.14 引入了缓冲区 Debloating 机制,用于自动控制在 Flink 算子/子任务之间缓冲的 In-flight 数据的数据量;可以通过将属性taskmanager.network.memory.buffer-debloat.enabled设置为true来启用。

此特性对对齐和非对齐 Checkpoint 都生效,并且在这两种情况下都能缩短 Checkpointing 的时间,不过 Debloating 的效果对于 对齐 Checkpoint 更明显;当在非对齐 Checkpoint 情况下使用缓冲区 Debloating 时,好处是 Checkpoint 大小会更小,并且恢复时间更快 (需要保存 和恢复的 In-flight 数据更少)。

3.非对齐 Checkpoints
a)概述

从 Flink 1.11开始,Checkpoint 可以是非对齐的;Unaligned checkpoints 包含 In-flight 数据(例如,存储在缓冲区中的数据)作为 Checkpoint State的一部分,允许 Checkpoint Barrier 跨越这些缓冲区,使 Checkpoint 时长变得与当前吞吐量无关,因为 Checkpoint Barrier 实际上已经不再嵌入到数据流当中。

如果 Checkpointing 由于背压导致周期非常的长,应该使用非对齐 Checkpoint,这样 Checkpointing 时间基本上就与 端到端延迟无关;但是非对齐 Checkpointing 会增加状态存储的 I/O,因此当状态存储的 I/O 是 整个 Checkpointing 过程当中真正的瓶颈时,不应当使用非对齐 Checkpointing。

启用非对齐 Checkpoint

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用非对齐 Checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();

或者在 flink-conf.yml 配置文件中增加配置

execution.checkpointing.unaligned: true
b)对齐 Checkpoint 超时

在启用非对齐 Checkpoint 后,依然可以通过编程的方式指定对齐 Checkpoint 的超时时间

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));

或是在 flink-conf.yml 配置文件中配置:

execution.checkpointing.aligned-checkpoint-timeout: 30 s

在启动时,每个 Checkpoint 仍然是 aligned checkpoint,但是当全局 Checkpoint 持续时间超过 aligned-checkpoint-timeout 时, 如果 aligned checkpoint 还没完成,那么 Checkpoint 将会转换为 Unaligned Checkpoint。

c)限制

并发 Checkpoint

Flink 当前并不支持并发的非对齐 Checkpoint;Savepoint 也不能与非对齐 Checkpoint 同时发生,因此它们将会花费更长的时间。

与 Watermark 的相互影响

非对齐 Checkpoint 在恢复的过程中改变了关于 Watermark 的一个隐式保证;目前,Flink 确保 Watermark 作为恢复的第一步, 而不是将最近的 Watermark 存放在 Operator 中,以方便扩缩容。

在非对齐 Checkpoint 中,当恢复时,Flink 会在恢复 In-flight 数据后再生成 Watermark,如果 Pipeline 中使用了对每条记录都应用最新的 Watermark 的算子将会相对于使用对齐 Checkpoint 产生不同的结果;如果 Operator 依赖于最新的 Watermark 始终可用,解决办法是将 Watermark 存放在 OperatorState 中;此时 Watermark 应该使用单键 group 存放在 UnionState 以方便扩缩容。

与长时间运行的记录处理交互

尽管有未对齐的检查点,但 barrier 能够越过队列中的其它记录;如果当前记录需要大量时间进行处理,则此 barrier 的处理仍可能延迟,例如在窗口操作中同时触发多个定时器时。

当系统处理单个输入记录时被阻止,在等待多个网络缓冲区可用时,可能会出现 Flink 不能中断对单个输入记录的处理,未对齐的检查点必须等待当前处理的记录被完全处理;

原因要么是由于不适合单个网络缓冲区的大型记录的串行化,要么是在 flatMap 操作中,为一个输入记录生成许多输出记录;此时背压可以阻止未对齐的检查点,直到处理单个输入记录所需的所有网络缓冲区可用;当单个记录的处理需要一段时间时,也可能发生在其它情况下,导致检查点的时间可能高于预期。

某些数据分布模式不是检查点式的

有一部分包含属性的的连接无法与 Channel 中的数据一样保存在 Checkpoint 中;为了保留这些特性并且确保没有状态冲突或 非预期的行为,非对齐 Checkpoint 对于这些类型的连接是禁用的,所有其他的交换仍然执行非对齐 Checkpoint。

点对点连接

目前没有任何对于点对点连接中有关数据有序性的强保证;由于数据已经被前置的 Source 或是 KeyBy 相同的方式隐式组织,一些用户会依靠这种特性在提供的有序性保证的同时将计算敏感型的任务划分为更小的块。

只要并行度不变,非对齐 Checkpoint(UC) 将会保留这些特性,但是如果加上 UC 的伸缩容,这些特性将会被改变。

针对如下任务

image-20240509104253843

如果想将并行度从 p=2 扩容到 p=3,那么需要根据 KeyGroup 将 KeyBy 的 Channel 中的数据划分到3个 Channel 中去;通过使用 Operator 的 KeyGroup 范围和确定记录属于某个 Key(group) 的方法可以实现;但对于 Forward 的 Channel,没有 KeyContext,Forward Channel 里也没有任何记录被分配了任何 KeyGroup;也无法计算它,因为无法保证 Key 仍然存在。

广播 Connections

广播 Connection 无法保证所有 Channel 中的记录都以相同的速率被消费,可能导致某些 Task 已经应用了与特定广播事件对应的状态变更,而其他任务则没有。

image-20240509104545856

广播分区通常用于实现广播状态,它应该跨所有 Operator 都相同;Flink 实现广播状态,通过仅 Checkpointing 有状态算子的 SubTask 0 中状态的单份副本。

在恢复时,将该份副本发往所有的 Operator,可能导致某个算子将很快从它的 Checkpointed Channel 消费数据并将修改应用于记录来获得状态。

4.故障排除

in-flight 中的数据损坏。

注意:以下描述的操作是最后采取的手段,因为它们将会导致数据的丢失。

为了防止 In-flight 数据损坏,或者由于其他原因导致作业应该在没有 In-flight 数据的情况下恢复,可以使用 recover-without-channel-state.checkpoint-id 属性。

该属性需要指定一个 Checkpoint Id,对它来说 In-flight 中的数据将会被忽略;除非已经持久化的 In-flight 数据内部的损坏导致无法恢复的情况,否则不要设置该属性。

只有在重新部署作业后该属性才会生效,只有启用 externalized checkpoint 时,此操作才有意义。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1667054.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

乡村振兴与农村基础设施建设:加大农村基础设施建设投入,提升农村公共服务水平,改善农民生产生活条件,构建宜居宜业的美丽乡村

一、引言 乡村振兴是我国现代化进程中的重要战略,而农村基础设施建设则是乡村振兴的基石。随着城市化进程的加快,农村基础设施建设滞后的问题日益凸显,成为制约乡村发展的瓶颈。因此,加大农村基础设施建设投入,提升农…

Docker需要代理下载镜像

systemctl status docker查看docker的状态和配置文件是/usr/lib/systemd/system/docker.service vi /usr/lib/systemd/system/docker.service, 增加如下配置项 [Service] Environment"HTTP_PROXYhttp://proxy.example.com:8080" "HTTPS_PROXYhttp:…

SpringBoot基于微信小程序的星座配对(源码)

博主介绍:✌程序员徐师兄、10年大厂程序员经历。全网粉丝12W、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅&#x1f447…

Springboot+logback 详细配置

一、添加依赖 这里使用springboot3.0.2 依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency><dependency><groupId>org.projectlombok</grou…

Java面试之分布式篇

分布式锁的实现方案 &#xff08;1&#xff09;用数据库实现分布式锁比较简单&#xff0c;就是创建一张锁表&#xff0c;数据库对字段作唯一性约束。加锁的时候&#xff0c;在锁表中增加一条记录即可&#xff1b;释放锁的时候删除锁记录就行。如果有并发请求同时提交到数据库&…

springboot2.x集成Elasticsearch7.7.0

一、前言 elasticsearch安装就不做过多介绍了&#xff0c;网上一搜一大堆&#xff1b;最需要注意的就是Elasticsearch与spring版本&#xff0c;防止版本不兼容导致的后续的一系列问题。我这里springbootspring-data-elasticsearch&#xff0c;他们的版本对照关系可以参照sprin…

手写Windows文件路径获取小工具

手写Windows文件路径获取小工具 目的 给Windows右键增加功能&#xff0c;右键任何文件&#xff08;夹&#xff09;显示复制文件路径的扩展。 效果展示 实现思路 右键调用&#xff0c;自身会把文件路径传递给被调用文件&#xff0c;被调用文件内只需将路径参数复制到剪贴板即…

【江科大STM32学习笔记】新建工程

1.建立工程文件夹&#xff0c;Keil中新建工程&#xff0c;选择型号 2.工程文件夹里建立Start、Library、User等文件夹&#xff0c;复制固件库里面的文件到工程文件夹 为添加工程文件准备&#xff0c;建文件夹是因为文件比较多需要分类管理&#xff0c;需要用到的文件一定要复…

day09-常用API异常

1.时间日期类 1.1 Date类&#xff08;应用&#xff09; 计算机中时间原点 1970年1月1日 00:00:00 时间换算单位 1秒 1000毫秒 Date类概述 Date 代表了一个特定的时间&#xff0c;精确到毫秒 Date类构造方法 方法名说明public Date()分配一个 Date对象&#xff0c;并初始化…

【动态规划】子序列问题I|最长递增子序列|摆动序列|最长递增子序列的个数|最长数对链

一、最长递增子序列 300. 最长递增子序列 算法原理&#xff1a; &#x1f4a1;细节&#xff1a; 1.注意子序列和子数组的区别&#xff1a; (1)子序列&#xff1a;要求顺序是固定的&#xff08;要求没那么高&#xff0c;所以子序列就多一些&#xff09; (2)子数组&#xff1a;要…

Python从0到POC编写-魔法方法

name __name__ 是系统定义的内部函数&#xff0c; 它的作用是识别模块。 通常我们看到这样一句话&#xff1a; if __name__ __main____name__ 的值有两种情况&#xff0c;那么挨个来说下。 如果模块是被直接执行的 &#xff0c;那么 __name__ 的值 为 __main__ 例如&…

搭建 IIS + asp +access 网站

搭建 IIS asp access 网站 一、什么是 asp二、asp 的组成三、asp 说明四、什么是access五、搭建环境六、问题一七、问题二八、网站展示九、IIS 页面展示十、IIS 功能展示 欢迎关注公总号【云边小网安】 一、什么是 asp asp&#xff1a;即 Active Server Pages&#xff0c;是…

PullTube for Mac:视频下载,一键搞定

还在为找不到想看的视频而烦恼吗&#xff1f;PullTube for Mac&#xff0c;让您的视频下载之旅变得更加轻松&#xff01;支持从多个主流视频网站下载视频&#xff0c;提供多种格式和质量选项&#xff0c;满足您的不同需求。简单易用的界面设计&#xff0c;让您轻松上手&#xf…

【北京迅为】《iTOP-3588从零搭建ubuntu环境手册》-第6章 安装Samba

RK3588是一款低功耗、高性能的处理器&#xff0c;适用于基于arm的PC和Edge计算设备、个人移动互联网设备等数字多媒体应用&#xff0c;RK3588支持8K视频编解码&#xff0c;内置GPU可以完全兼容OpenGLES 1.1、2.0和3.2。RK3588引入了新一代完全基于硬件的最大4800万像素ISP&…

MongoDB安装及接入springboot

环境&#xff1a;windows、jdk8、springboot2 1.MongoDB概述 MongoDB是一个开源、高性能、无模式&#xff08;模式自由&#xff09;的文档&#xff08;Bson&#xff09;型数据库&#xff1b;其特点如下&#xff1a; 模式自由 ---- 不需要提前创建表 直接放数据就可以 支持高并…

TriCore:Interrupt

今天简单总结下 TriCore 的中断路由模块。 名词缩写 缩写全程说明IRInterrupt Router SRService Request 包括&#xff1a; 1. External Resource 2. Internal Resource 3.SW&#xff08;Software&#xff09; SPService Privoder 包括&#xff1a; 1. CPU 2. DMA SRNServic…

Unity2D 模拟手柄实现玩家移动

1&#xff0c;创建控制器UI 2&#xff0c;挂载脚本 3&#xff0c;脚本编写 基本要素 [Tooltip("玩家游戏体")]public Rigidbody2D player;[Tooltip("玩家速度")]public float speed 1f;[Tooltip("玩家动画")]public Animator animator;public …

本地运行.net项目

有时候需要我们自己做一个.net的课设项目&#xff0c;但是我们有了代码后却不知道怎么运行。我们0基础来学习一下如何运行一个.net项目 1.安装visual studio 2022 不用安装老版本&#xff0c;新版就可以。安装好了2022版本&#xff0c;这是一个支持web的IDE&#xff0c;我们可…

具有CMOS输出,高速响应特点的新型汽车级晶振SG2520CAA

爱普生推出的汽车级晶振SG2520CAA。SG2520CAA是一款CMOS输出的&#xff0c;具有高响应速度的2520封装汽车级晶振&#xff0c;具有低电流消耗&#xff0c;1.6 V至3.63 V的宽工作电压&#xff0c;以及-40C至85C的宽工作温度范围&#xff0c;此外还可提供高达125C的工作温度。符合…

vue3使用setup模式的store报错

** setup store模式 $reset方法报错 ** 顾名思义就是 使用store 使用的是setup 语法模式 不能执行$reset 方法 解决方式&#xff1a; // main.ts import { createPinia } from pinia const pinia createPinia() pinia.use(({ store }) > {const initialState JSON.pars…