基于 Log 的通用增量 Checkpoint 在美团的进展

news2025/1/16 8:08:32

摘要:本文整理自美团计算引擎工程师王非凡,在 Flink Forward Asia 2022 核心技术专场的分享。本篇内容主要分为四个部分:

    1. Log based Checkpoint 基本原理介绍

    2. 美团应用场景及测试效果

    3. Changelog Restore 性能优化

    4. Changelog 存储选型探索

Tips:点击「阅读原文」查看原文视频&演讲 ppt

01

Log based Checkpoint 基本原理介绍

1.1 普通 Checkpoint 制作方式

7ec9ffc66ecd433879821c3ca12c6075.jpeg

Log based Checkpoint 又被称为通用的增量 Checkpoint,那就要先看一下在此之前的增量 Checkpoint 是怎么制作的,以及存在什么样的问题。这里主要是指 RocksDB 的增量 Checkpoint。为了简化描述,后面部分也都以 RocksDBStateBackend Checkpoint 为例。

如上图所示,RocksDBStateBackend Checkpoint 在制作时直接触发底层的 RocksDB 快照。在此过程中 RocksDB 需要将内存中的 MemTable Flush 到磁盘上,形成 L0 层的 SST 文件,以保证 SST 文件中保存了全量一致的状态数据。

2b2b8b17e87ccdd332ec4ebaced6c185.jpeg

RocksDBStateBackend 的增量 Checkpoint 就是在制作新的 Checkpoint 时不再重复上传上次 Checkpoint 已经传过的 SST 文件,通过减少重复上传文件来提高制作速度。但由于每次仍需要 RocksDB 将 MemTable 刷盘,导致高频制作 Checkpoint 时会存在一些问题。

首先是 Memtable 频繁刷盘会导致 L0 文件产生的过快,进而导致频繁的 Compaction。其次是 Checkpoint 制作时,所有 RocksDB 实例同步执行快照会导致 CPU 使用率、磁盘 IO 和网络流量的尖刺。

7ca70c49ac133aa5b166c6110e0e73a8.jpeg

上图为 Log based Checkpoint 的制作方式。为了支持该功能,Flink 引入了 ChangelogStateBackend 组件。这个组件在代理状态写操作的过程中,通过背后的 Changelog Writer 组件将 State Change 数据写入到 Changelog Storage 中。在 Checkpoint 制作时,只需要 Changelog Writer 保证 Flush State Change 即可。而底层 RocksDB 的快照被称为物化,并将以较低的频率异步执行。

我们结合例子来具体看下 Checkpoint 是如何制作的,上图中也描述了随时间推进发生的事件:

  • T0 时刻作业启动,随之开始处理数据。数据处理过程中不断地有 State Change 发生。T1 时刻 Checkpoint-1 触发,此时从 T0 到 T1 所有 State Change 就是 Checkpoint-1 中的所有状态数据,也就是图中的 Change-Set-1。

  • T2 时刻触发了一次物化,即异步制作 RocksDB 快照并上传,我们简称为 m1。同时在 T2 时刻将在 State Changelog 中记录同步点,使得 T1 到 T2 之间的 State Change 构成 Change-Set-2。

  • T3 时刻触发了 Checkpoint-2,由于此时 m1 还没有完成,Checkpoint-2 仍然只由 Changelog 构成,即包含了 Change-Set-1、Change-Set-2 和 Change-Set-3。在 Checkpoint-2 制作完成之后,m1 也制作完成了。

  • T4 时刻触发了 Checkpoint-3,这时候可以发现存在一个最近完成的 m1,Checkpoint-3 可以直接由 m1 及其之后的 Change-Set-3 和 Change-Set-4 构成。

  • 通过这种方式,Flink 的 Checkpoint 与底层的状态存储快照进行了解耦,使得 Flink Checkpoint 能够以较高的频率和速度执行。


1.2 Log based Checkpoint 优势

优势有四点:

  • 第一,可以带来更轻量的 Recovery。Checkpoint 的间隔越短,Recovery 时需要回溯的数据就越少,作业恢复的速度也就会越快。

  • 第二,可以减少事务化 Sink 的端到端延迟。事务化 Sink 在 Checkpoint 完成时进行 commit,更快的 Checkpoint 意味着可以进行更频繁的 commit。

  • 第三,更可预测的 Checkpoint 间隔。不需要等 DB Flush,compaction 的影响也更小,Checkpoint 制作时长仅取决于需要持久化到 Durable Storage 上的 Changelog 数据的多少。

  • 第四,更友好的资源使用。比较重的 DB 快照操作被分散执行,可以避免 CPU 使用率、磁盘 IO 和网卡流量 随 Checkpoint 制作而产生的尖刺。

02

美团应用场景和测试效果

3788fd9e7714b29192275dad64d8465c.jpeg

我们想要尝试使用 Log based Checkpoint 支持的业务需求,是流量数据天级回溯不超过千分之五。这种类型作业的特点是数据量大、作业规模大、单作业的规模有 5000 slot 左右。当前大部分作业能够稳定运行的 Checkpoint 间隔为十分钟,一天内 Failover 一次,回溯的数据量就会打破千分之五以内的要求。

针对这种需求,我们最初有两种解决思路:一个是减少数据回溯的时间,另一个是减少回溯的 Kafka 分区数。减少回溯数据时间可以通过缩短 Checkpoint 间隔来实现。这方面有 Flink 社区的 Flip-158 即我们上面介绍的 Log based Checkpoint 来支持。而减少回溯分区数可以通过只重启故障的节点来实现,这方面有 Flink 社区的 Flip-135 即 Approximate Task Local Recovery 来提供支持。

考虑到 Approximate Task Local Recovery 的方案会导致数据丢失,而这是我们的业务所不能接受的,因此我们选择了 Log based Checkpoint。由于当时该功能还是实验室阶段,因此我们首先对其进行了测试验证。

测试表明,使用 Log based Checkpoint 可以将 Checkpoint 的稳定制作间隔从十分钟下降到十秒,完全能够满足流量业务对天级数据回溯不超过千分之五的要求。

dfc2f7e8c8debb94bdea3aebcd61897e.jpeg

同时如上图三张资源使用率的监控所示,也验证了 Log based Checkpoint 能够优化资源使用。图中蓝线是使用传统的 Checkpoint 方式,黄线是使用了 Log based Checkpoint。具体的:CPU 使用率的峰值下降了 40%,磁盘 IO 使用率峰值下降了 29%,网卡出流量峰值下降了 61%。基本消除了传统 Checkpoint 制作过程中的资源使用尖刺。

41229dd08af1da2cd5e36e84c628f6cf.jpeg

但同时测试也暴露出 Flink 社区基于 DFS 实现的 Changelog 存储的一些问题。

首先是 Changelog Restore 重复下载文件问题。为了避免产生过多小文件,同一 TaskManager 内的 State Changelog 会尽量聚合到同一个文件中。而在 Restore 时这些文件会被同一个 TM 内的 operator 重复下载,导致 Restore 性能差。

其次是即使经过 TM 粒度的聚合,小文件问题仍然严重,HDFS NN 压力巨大。以我们一个 4800 并发的作业为例,默认配置下会产生 130 万左右个 Changelog 文件,单个作业带来的 NN 请求高达 18000 次/秒左右。

最后是 Changelog 文件写延迟太高,影响 Checkpoint 制作速度。还是以我们的 4800 并发作业为例,写 Changelog 文件 p99 延迟在 3 秒左右,最大延迟甚至达到 2 分钟,导致 Checkpoint 的制作时间非常不稳定。

针对这些问题,我们分别进行了分析和解决,这将会在后面的篇幅展开。

03

Changelog Restore 性能优化

3.1 Restore 时 Changelog 重复下载的问题

fd713b6cadd7a73d489394f3c2135bf7.jpeg

Flink 社区基于 DFS 的 Changelog Storage 在实现 Changelog 上传时,为了减少小文件问题,选择在同一 TM 内对所有 Operator Changelog 进行聚合和压缩,然后再上传到 DFS 上的文件中。而发生 Restore 时每个 Operator 实例的 Changelog Reader 都需要重复地读一次 Changelog 文件,造成严重的读放大,进而导致 Restore 的速度过慢。

减少同一个 TM 内的 slot 数可以减轻这个问题,但是又会加重 DFS 小文件的问题。

a585dcdf7ee2bac72ae1d907d5bfa7a5.jpeg

其实每个 Operator 实例的 Changelog 数据在文件中是连续的一段,并且 Checkpoint 元数据中记录了 Offset,为什么不能直接 Seek 到相应的 Offset 后再开始读操作呢,这样就可以做到每个 Operator 只读属于自己的部分,不是么?

问题就在于,为了减少 Changelog 文件的体积,Changelog 文件是经过压缩后再上传到 DFS 的。压缩导致 Offset 对压缩后的文件失效了,只能从头开始解压缩后再 Seek 到相应的 Offset 上。这样就导致了同一个 Changelog 文件的反复下载和解压缩。

af5f55c88693835e3a751b3db0c4f2f8.jpeg

针对这个问题,我们提出在 TM 上增加一个 Changelog File Cache 组件,代理 Changelog 文件的下载。Cache 组件会在需要的时候将 Changelog 文件下载并直接解压缩后存储到本地,当 Changelog Reader 发起请求时,可以直接在本地缓存的文件上 Seek 到相应的 Offset 后读取。

这样在 Restore 的过程中,每个 Changelog 文件都只需要下载和解压各一次。Changelog File Cache 组件会在内部对每个本地缓存文件记录引用计数,通过引用计数和可配置的 TTL 来清理本地缓存。这个优化已经在 Flink-1.16 上发布了。

04

Changelog 存储选型探索

7af9a90fd0185a49dfb95af7158ff98c.jpeg

上图总结了在美团业务场景下 State Changelog 数据对存储的需求。基本的功能性需求是不丢、不重和保序,以保证数据的正确性。同时在性能方面需要满足较低的写入延迟,以保证 Checkpoint 的快速的完成。最后结合美团的 Flink 作业现状,还需要能够支撑百万级的并发写入。

针对以上的各项需求,我们对候选的存储进行了对比。首先我们将存储分为两类,一类是批式上传的 HDFS 和 S3,这一类是当前实现已经支持的;另一类是流式上传,例如 BookKeeper、Kafka 和 Pulsar 等。具体细节见如下表格:

cc19f10bf509b083fd2a838b11d630a2.jpeg

补充说明下 BookKeeper,由于在设计之初就考虑到为 WAL Log 服务,从功能保障、延迟和并发规模上来看,都比较适合用来存储 Flink 的 State Changelog。而 HDFS 和 S3 这种 DFS 类的存储,延迟和并发支持能力均不太满足需求。Kafka 由于面向分区的设计,在分区数过多的情况下表现不佳。Puslar 在能力上是满足需求的,但是相对于 BookKeeper 来看没有明显优势,又比 BookKeeper 多了 1 层 proxy 的开销。因此我们初步选择使用 BookKeeper 作为新的 Changelog 存储。

3ebd09ee4eb9848eecdb39061a8d8c7e.jpeg

在实现 BookKeeper Changelog Storage 之前,我们先来看一下 Changelog 涉及的组件和他们的作用。如上图所示,Changelog 中组织了三个组件,分别是:

  • 负责代理状态读写的 ChangelogBackend。

  • 负责 Changelog 读写的 Changelog Storage。

  • 负责协调以上两个组件的物化 Manager。

我们要实现的就是 BookKeeper 的 Changelog Storage,其中包含 Writer 和 Reader 两个主要组件。

6606261896b3143a7ed5f6de329491ec.jpeg

结合 Checkpoint 的制作流程来看一下 Changelog Writer 的角色:

  • Coordinator 触发 Checkpoint 之后,Changelog Writer 需要将已经传给自己的 State Change 数据都 Flush 到 BookKeeper 上。然后向 Coordinator ACK 一个 BookKeeper Changelog State Handle。这个 Handle 中会包含 BookKeeper 的地址、数据所在的 Ledger ID 和 Offset 等信息。

  • Coordinator 在收到 ACK 之后,将 Metadata 写入 Durable Storage 上,Checkpoint 就制作完成了。

4757187d89a19e5186e7182e87d3af7c.jpeg

再来看一下 BookKeeper Changelog Writer 对三个核心方法的实现:

首先是 Append 方法,用于向 Changelog Writer 传递 State Change。考虑到要减少到 BookKeeper 的请求次数,我们会在 Writer 中去攒 Batch,Batch 满之后再将 Batch 中的数据发送给 BookKeeper,并递增 Sequence Number。nextSequence 方法用来获取物化的同步点,因此需要直接将 Batch 发送给 BookKeeper,同时也要递增 Sequence Number。

Persist 的方法用于在 Checkpoint 制作时,保证 Changelog 数据被持久化到 BookKeeper 上。因此除了需要将 Batch 发送给 BookKeeper 并递增 Sequence Number 之外,由于前面的操作考虑到优化 BookKeeper 写入性能而开启了 Deferred Sink,没有要求 BookKeeper 刷盘,Persist 时还需要调用 Force 方法强制 BookKeeper 将前面收到的 entry 刷盘。最后,Persist 方法还需要收集并整理 Ledger ID 和相关的 Offset 组装成 BookKeeper Changelog State Handle 返回给 Checkpoint Coordinator。

e0c8414da10dda808a4cf0e8029e8f24.jpeg

BookKeeper Changelog State Handle 中记录了存储在 BookKeeper 上的 Changelog 数据的元信息:一部分是 BookKeeper 的地址、数据摘要类型和数据加密密钥,另一部分包含了 Ledger ID 和相关 Offset。

如上图所示,阴影部分表示了此次 Checkpoint 需要包含的 State Change,覆盖了两个 Ledger,因此需要分别记录两个 Ledger ID、Start Offset 和 End Offset。

3a56411aefe040ca579cc53189df9fad.jpeg

介绍完 Changelog 的写操作,再结合 Checkpoint Restore 的流程来看下读操作。

首先 Checkpoint Coordinator 会从 Durable Storage 上读取 Metadata 并解析出其中的 BookKeeper Changelog State Handle,然后将这些 Handle 分发给不同的 BookKeeper Changelog Reader,由 Reader 负责发起读请求从 BookKeeper 中读取数据并 Apply 到 State Table 上。

3358fc928534331675f4cda15365a83f.jpeg

如何实现 BookKeeper Changelog State Handle 在 Checkpoint Metadata 中的序列化和反序列化呢?

由于目前所有的 State Handle 类型的序列化和反序列化都是硬编码的,非常不便于实现第三方的 Changelog Storage,我们当前的方案是增加一个 Customer Keyed State Handle 的接口,允许 Keyed State Handle 自己实现序列化和反序列化方法。

Customer Keyed State Handle 在序列化时会首先将使用的序列化器的类名写入到 Metadata。再使用序列化器将 Handle 写入 Metadata。反序列化时,首先从 Metadata 中读取出序列化器类名,再根据类名使用 ClassLoader Load 出序列化器,最后使用序列化器读出 Customer Handle。

d9cc02859e328b6df6dbb3a27210d408.jpeg

上图是 BookKeeper Changelog Storage 的配置项。除 BookKeeper 的地址外分为三类,分别用于控制 Ledger 的副本配置、Ledger 的滚动配置和批量上传配置,并且都提供了默认值。

需要说明一下,由于当前的 Shared State Registry 不支持 BookKeeper Changelog State Handle 这类并非基于 Stream State Handle 的实现,因此我们暂时通过 TTL+外部服务的方式去清理 Ledger。

782e964556e5006dbc1a37cab5a3798f.jpeg

我们在 State/Checkpoint 方向上的未来规划如下:

  • 继续完善 BookKeeper Changelog Storage,补充相关的指标,实现引擎内部的 Ledger 清理,并完成 Benchmark 测试和性能的分析,掌握能力边界。

  • 继续推动 Changelog 功能的落地,推动 POC 业务线上落地 Changelog 功能,在事务化 Sink 场景推广 Changelog 功能。

往期精选

5782800bb53ceb85c1e8d182058d0a39.png

49fafedbc0d619506ef871a6d7f8ade9.png

18064501db001a3f41d29838d0b86f07.jpeg

4a16118fd50257b19f8b8f587b676c16.jpeg

944f513a6a911981652b864a70484113.jpeg


▼ 活动推荐▼

7577392e8ab2b7a0f2eb7c90a0a391b7.png

▼ 关注「Apache Flink」,获取更多技术干货 ▼

84663e89b17a03fd0909955859575438.png

 aea726c97e1755b41d39bdb3cf352486.gif  点击「阅读原文」,查看原文视频&演讲 PPT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/557481.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

49天精通Java,第35天,Java初始化、构造器、对象创建的过程

目录 一、初始化与清理二、用构造器保证初始化1、无参构造器2、有参构造器 三、成员初始化1、显式初始化2、隐式初始化3、初始化块 四、对象创建的过程 大家好,我是哪吒。 🏆本文收录于,49天精通Java从入门到就业。 全网最细Java零基础手把…

微服务开发系列 第三篇:OpenFeign

总概 A、技术栈 开发语言:Java 1.8数据库:MySQL、Redis、MongoDB、Elasticsearch微服务框架:Spring Cloud Alibaba微服务网关:Spring Cloud Gateway服务注册和配置中心:Nacos分布式事务:Seata链路追踪框架…

多维时序 | MATLAB实现GA-GRU遗传算法优化门控循环单元多变量时间序列预测

多维时序 | MATLAB实现GA-GRU遗传算法优化门控循环单元多变量时间序列预测 目录 多维时序 | MATLAB实现GA-GRU遗传算法优化门控循环单元多变量时间序列预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 多维时序 | MATLAB实现GA-GRU遗传算法优化门控循环单元多变量时间…

OSPF 邻居关系 附实验

实验目的 通过对 OSPF 邻居关系建立进行相关的实验,从中学习到如何调整 OSPF Hello 包间隔,Dead 间隔,以及影响 OSPF 邻居建立的因素,例如 RID,MTU,OSPF 认证。并且将总结 OSPF在不同网络类型上的不同行为…

FPGA纯verilog实现CameraLink视频接收和发送,附带工程源码和技术支持

目录 1、前言2、CameraLink协议基础3、目前我已有的CameraLink收发工程4、设计方案5、CameraLink解码模块详解6、CameraLink编码模块详解7、vivado工程详解8、上板调试验证9、福利:工程代码的获取 1、前言 FPGA实现CameraLink视频编解码目前有两种方案:…

C++基本介绍

文章目录 🥭1.C基本介绍🧂1.1 C是什么🧂1.2 C发展史 🍒2. C的优势🥔2.1 语言的使用广泛度🥔2.2 C的应用领域 🫒3. C学习计划 🥭1.C基本介绍 🧂1.1 C是什么 C是一种通用…

用streamlit,几行代码就可以拥有漂亮图表!

大家注意:因为微信最近又改了推送机制,经常有小伙伴说错过了之前被删的文章,比如前阵子冒着风险写的爬虫,再比如一些限时福利,错过了就是错过了。 所以建议大家加个星标,就能第一时间收到推送。&#x1f44…

FastAPI 和 fastapi-amis-admin:强大而可扩展的后台管理解决方案!

大家好,我是安果! 我们都知道 Django 生态丰富,功能强大,适用于中、大型项目,并且自带了后台管理系统;而 FastAPI 更适用于构建高性能的 API,后台管理系统需要另外开发 本篇文章将介绍 FastAPI…

完犊子!原单位的离职证明丢了,下周要入职了,用AI做一个行不行?

弄丢了离职证明怎么办? 一位网友哀叹: 完犊子!原单位的离职证明丢了,下周要入职了,现在怎么办?用AI做一个行不行? 有相同经历的网友安慰他,离职证明没了没事,新公司会要求…

打卡智能中国(五):博士都去哪儿了?

《打卡智能中国》系列更新了几期,有读者表示,很爱看这类接地气的真实故事,也有读者反映,不是电工,就是文员、农民、治沙人,人工智能不是高精尖学科吗?那些学历很高的博士都去哪儿了?…

用数据讲故事:十大统计学/机器学习魔法指数

统计学和机器学习为数据分析提供理论基础,入门时我看过很多统计学相关书籍,复杂的公式和推导过程让我一度陷入迷茫。对于数据科学/分析师来说,如何使用统计学知识并应用到我们的分析场景中更为重要。本文主要基于数据分析工作中的实际应用场景…

1000本!计算机经典书籍分享

闲话少说,列表如下。 编程语言类书籍 包含:Java、C、C、Python、Go等语言 Java电子书大全https://www.yingyanshe.cn/5275.htmlC电子书大全https://www.yingyanshe.cn/5284.htmlC语言类电子书https://www.yingyanshe.cn/5293.htmlC#电子书https://www…

chatgpt赋能Python-python_fg

Python FG: 优秀的Python工程师一定要知道的资源 如果你是一名Python工程师,那么你一定会喜欢Python FG资源。Python FG是一个在线平台,提供海量的Python API文档、教程、实例、以及与Python有关的各种工具和资源,让Python工程师轻松学习和使…

盘点!Instruction Tuning 时代的大模型

作者 | Kevin吴嘉文 整理 | NewBeeNLP 公众号 https://zhuanlan.zhihu.com/p/616830127 Alpaca,ChatGLM 等模型的效果可以接受,下文总结部分笔记,为训练自定义小型化(7B)模型提供点知识储备。包括模型论文 LaMDA, Mup…

纯净版Win10系统重装教程(超详细)

本博客详细讲解纯净版Win10系统重装,步骤齐全,小白可实操。 纯净版Win10系统重装教程 系统安装前准备下载安装工具更新重装 制作U盘为启动盘 重装Win10进入Boot模式选择系统版本分区系统安装中新系统配置 系统安装前准备 ➢ 准备8G或8G以上的空U盘。&a…

POSTGRESQL 10个使用POSTGRESQL 需要避免的错误 (译)

开头还是介绍一下群,如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题,有需求都可以加群群内有各大数据库行业大咖,CTO,可以解决你的问题。加群请联系 liuaustin3 ,在新加的朋友会分到2群(共…

EOS网络基金会大战Block.One

微信公众号修改了推送规则,请各位亲爱的读者给刘教链公众号添加星标🌟,以便及时收到每日最新文章推送! 星标🌟添加方法:【1】点击标题下方“刘教链 刘教链”的第二个“刘教链”,打开公众号主页&…

chatgpt赋能Python-python_field

Python在Field上的应用 Python作为一门高级编程语言,在众多领域中扮演着应用广泛、易于学习、使用简便、速度出色的角色。在本文中,我们将重点关注Python在Field上的应用。 Field是什么? Field指的是“领域”,包括科学、工程、…

活动报名|分布式人工智能:可扩展性、效率和泛化性

2023年05月25日(星期四)14:00-15:30,智源社区「智源Live 第42期」线上活动将在线举办,「阅读原文」报名即可参加。 活动主题:分布式人工智能:可扩展性、效率和泛化性 安波 安波是新加坡南洋理工大学校长委员…

chatgpt赋能Python-python_for_end

Python for End: 介绍 Python是一种高级编程语言,由Guido van Rossum创建于1989年,并在1991年正式发布。Python是一种多范式编程语言,可以用于面向对象、函数式和过程式编程。它拥有简单易懂的语法以及扩展性强的库,从而使得开发…