基于 Log 的通用增量 Checkpoint​

news2024/11/25 14:32:20

摘要:本文整理自 Apache Flink Contributor 俞航翔 9 月 24 日在 Apache Flink Meetup 的分享。主要内容包括:

  1. Checkpoint 性能优化之路

  2. 解析 Changelog

  3. 一览 State/Checkpoint 优化

Tips:点击「阅读原文」查看原文视频&演讲 ppt

          

01

Checkpoint 性能优化之路

4a4eb2018a64e39ef5db78bc52308f76.png

Flink 作为一个 Stateful 计算引擎,State 是其非常重要的概念,它支持 Stateful 算子通过 State 记录多个 Events 之间的信息,并在 Checkpoint 时做状态持久化,存储全局一致性快照,在恢复时通过 Resume Checkpoint 以及 Replay 来实现不同语义的一致性保障。

容错是长期运行的流式系统非常重要的部分,而 Checkpoint 的主要目的就是解决 Failover 问题。基于此目标,它的生命周期完全由 Flink 管理。因此对于不同的 StateBackend 可以用特定的原生格式进行存储,利用 StateBackend 的内部机制,比如 Incremental Checkpoint 做进一步优化等。

基于以上机制, Checkpoint 目前的设计目标就是更轻量以及更快速的恢复。

f7d29b8d137a11ffebeca84b8120ba3d.png

那 Flink 在 Chekpoint 性能上做了哪些优化呢?

最早期,在支持轻量级异步 Snapshot 算法后, Flink Checkpoint 的性能往前迈了一大步。在该机制下,Flink 将  Barrier 作为特殊 Record 在 Graph 中流动,同时将耗时较大的文件上传等工作放到异步过程中进行,极大降低了对主流程的影响。

在 1.0 版本中,Flink 开始支持 RocksDB StateBackend ,这对大状态下的存储提供了很好的支持。1.3 版本 Flink实现了基于 RocksDB Incremental Checkpoint 的机制,进一步提升了Checkpoint 在异步阶段的性能。1.11 版本 Flink 引入了 Unaligned Checkpoint,并在 1.13 版本达到了 Production-ready 状态,对于 Barrier 对齐有瓶颈的作业,这个技术让作业在反压比较严重的情况下依然可以做出 Checkpoint 。1.14 版本引入的 Buffer Debloating 可以通过动态调整 Network Buffer 大小来加速 Barrier 流动,进一步加速 Aligned Checkpoint 完成,减少 Unaligned Checkpoint 存储的数据量。1.15 和 1.16 中 Flink 引入了 Changelog StateBackend ,它通过更通用的 Incremental Checkpoint 机制进一步提升了 Checkpoint 的异步性能。

7a30be1544e538d8dd270c433e221d1f.png

我们可以通过这张图来看一下这些技术在作业执行链路中的作用。

当 Checkpoint 触发时, Barrier 会随着 Graph 流动。当打开 Buffer Debloating 后,Flink 会通过计算吞吐等方式动态调整 Network Buffer 的大小来加速 Barrier 传递。当 Barrier 到达 Stateful 节点时,如果是 Aligned Checkpoint ,则算子会等待 Barrier 对齐再触发后续的 Checkpoint 行为;如果是 Unaligned Checkpoint ,则会直接将 Barrier 传递给后续算子,同时触发 StateBackend 上的 Checkpoint,并将 Buffer 中的内容存储到 HDFS ,不会阻塞,如图中间的实线所示。

在 StateBackend 内部触发 Checkpoint 时,基于异步 Checkpoint 算法,异步部分会进行文件上传,如图中实线所示。在开启了 RocksDB Incremental Checkpoint 时,会做增量文件上传,如上图中,只有更新后的 S2、S4 需要上传到远端。

Changelog StateBackend 的核心机制如上图最下方虚线所示,会将原先的 StateBackend 异步上传和 Changelog 部分进行解耦,引入了额外独立上传到 HDFS 的过程,这个过程将稳定、持续地发生在作业运行过程中。

b2b29b390e69d8b81c6ec757aec5afca.png

我们可以通过 Checkpoint Metrics 相关信息来直观感受下这些技术的作用。

Metrics 上关于 Size 的关键指标有 Checkpointed DataSize 和 Full Checkpoint DataSize 两个,是 1.15 版本在 Flink UI 中透出的指标,我们可以通过这两个指标直接看出增量 Checkpoint 的性能以及存储在远端的空间大小。

剩余的指标主要是关于耗时的,StartDelay 指标过大通常是因为作业反压,一般需要先排查作业逻辑。此外,StartDelay 过大时,如果作业逻辑允许,可开启 Unaligned Checkpoint 和 Buffer Debloating 以加速 Checkpoint 完成。如果 Aligned Duration 指标比较长,可以考虑开启 Unaligned Checkpoint 。

同步 Duration 和异步 Duration 是整个 Checkpoint 是我们通常最关注的两个部分,而异步部分的耗时通常是最常见的瓶颈点。异步部分的优化可以通过开启 Incremental Checkpoint 以减少异步上传量,或者通过开启 Changelog StateBackend 来进一步缩短异步耗时。

02

解析 Changelog

0c8f28a2563fc6af8a2da3c17d74c69d.png

Changelog 的核心目标如下:

  1. 更稳定的 Checkpoint:通过解耦 Compaction 和 Checkpoint 过程,使 Checkpoint 更稳定,大幅减少 Checkpoint Duration 突增的情况,还可进一步减少 CPU 抖动,使网络带宽变得更平稳。

    在大规模、大状态作业上经常会出现 CPU 随着 Checkpoint 周期性抖动,进而影响作业和集群稳定性的情况。Changelog 通过解耦 Checkpoint 触发 Compaction 的过程,可以使 CPU 变得更平稳。另外,在异步过程中,Compaction 导致的大量文件同时上传有时会将网络带宽打满, 而 Changelog 是能够缓解该状况的。

  2. 更快速的 Checkpoint:Checkpoint 期间会上传相对固定的增量,可以达到秒级完成 Checkpoint 的目标。

  3. 更小的端到端延迟:Flink 中实现端到端的 Exactly-once 语义主要依赖于 Checkpoint 的完成时间。Checkpoint 完成越快,Transactional sink 可以提交得更频繁,保证更好的数据新鲜度。后续可与 Table Store 结合,保证 Table Store 上的数据更新鲜。

  4. 更少的数据回追:通过设置更小的 Checkpoint Interval 加速 Failover 过程,可以减少数据回追。

    虽然目前 Changelog 的机制下,Restore 时在 TM 上会有额外的 Replay 时间开销,但总体来看,耗费的时间还是相对减少的。

92c7e07419fd2bcae00288ab81f7ff21.png

那 RocksDB Incremental Checkpoint 为什么做不到快速且稳定呢?

我们先看下 RocksDB 的访问机制:当一条 Record 写到 RocksDB 时,首先会写到 Memtable ,数据量达到 Memtable 阈值后会 Memtable 变为 Immutable Memtable;当数据量再达到整个 Memory 所有 Memtable 的阈值后,会 Flush 到磁盘,形成 SST Files 。L0 的 SST files 之间是有 Overlap 的 。Flink 默认使用 RocksDB 的 Level Compaction 机制 ,因此在 L0 达到阈值后,会继续触发 Level Compaction,与 L1 进行 Compaction ,进一步可能触发后续 Level Compaction。

我们再来看一下 Checkpoint 同步阶段和异步阶段做了些什么。在同步过程中,Checkpoint 首先会触发 Memtable 强制 Flush,这一过程可能会触发后面级联的 Level Compaction,该步骤可能导致大量文件需要重新上传。同时,同步过程中会做 Local Checkpoint ,这是 RocksDB 本地的 Checkpoint 机制,对 Rocksdb 而言其实就是 Hard Link 一些 SST Files,是相对轻量的。异步过程会将这些 SST Files 上传,同时写入 Meta 信息。

我们可以看到两个重要部分:

  1. 数据量达到阈值,或者 cp 的同步阶段,是会触发 Memtable Flush,进一步触发级联 Level Compation,进一步导致大量文件需要重新上传的

  2. 在大规模作业中,每次 Checkpoint 可能都会因为某一个 Subtask 异步时间过长而需要重新上传很多文件。端到端 Duration 会因为 Compaction 机制而变得很长。

fdc6deea882a4bc81661ae1f933caa05.jpeg

那么 Changelog 做了什么改进呢?

在介绍 Changelog 的机制之前,我先介绍下几个内部的术语

State Table 是本地状态数据读写结构,比如 RocksDB。我们更倾向于将 Changelog 理解为 StateBackend 上的功能增强,已有的 StateBackend(HashmapStateBackend/RocksDBStateBackend,或者自定义的一种StateBackend)均可以打开该功能 。而且我们在 1.16 中实现了 Changelog 开到关和关到开的兼容性,用户可以非常方便地在存量作业中使用。

Materialization 是 State Table 持久化的过程,可以理解为 RocksDBStateBackend 或 HashmapStateBackend 做 Checkpoint 的过程。目前会定时触发,完成一次成功的 Materialization 后会 Truncate 掉 Changelog ,即做 Changelog 的清理。

DSTL 是 Changelog 的存储组件。Changelog 的写入需要提供持久化、低延迟、一致性及并发支持。目前基于 DFS 实现了 DSTL,后续我们将继续探索其他实现方式。

527282425c2883855ab3193314d5093d.png

Changelog 的机制很像 WAL 的机制。

如图所示,图中下面部分为 State Table ,上面为 Changelog 的存储部分即 DSTL 。

首先,在状态写入时,会同时写到 State Table 和 DSTL,如果 State Table 是 Rocksdb,那么它的后续流程就像我们刚才提到的一样,包括写 Memtable,Flush,触发 Compaction 等等过程。DSTL 这个部分会以操作日志的方式追加写入 DSTL,我们也支持了不同 State 类型的各种操作的写入。

其中 DSTL 会有一套完整的定时持久化机制持久化到远端存储中,所有 Changelog 将会在运行过程中连续上传,同时在 Checkpoint 上传较小的增量。

State Table 会定时进行 Materialization,在完成一次完整的 Materialization 后将会对 Changelog 进行 Truncate,清理掉失效的 Changelog,然后新的 Checkpoint 将以这个 Materialization 为基准继续持续上传增量。

我们按读写流程、Checkpoint 流程、Restore 流程再总结下这个过程。

在状态写入时,会双写到 State Table 和 Dstl,读取时会从 State Table 中读取,即读写链路上只是多了个双写,且 Dstl 的部分是 append-only 的写入,会非常轻量级。

在 Checkpoint 时,依赖于定时 Materilize State Table,以及定期 Persist Changelog,需要上传的增量会非常小,在上传小增量后只需要把 Materialization 部分的 Handle 和 Changelog 部分的 Handle 组合返回给 jm 即可。同时我们也会依赖于 Truncate 机制去及时清理无效 Changelog。

在 Restore 时,我们拿到了 State Table 的 Handle 和 Changelog 的 Handle,State Table 部分会按之前的方式进行 Retsore,比如是 Rocksdb,那么在没开启 Local Recovery 时,会先下载远端 SST Files,再 Rebuild。Changelog 部分再没有开启 Local Recovery 时,会先下载远端 State Change,然后重新 Apply 到 State Table,Rebuild Changelog 部分。

b9e4f92ab8c46988b7be4c9d773de355.jpeg

Changelog 开启后,Checkpoint 文件上传过程会变得非常平滑。此前,只有在 Checkpoint 时才会上传文件;而现在有了 Materialization 以及定期做 Changelog 增量上传,实际做 Checkpoint 时需要上传的增量变得非常小。

9678c2607be1b6f77256a3149f56659e.png

上图为相关常用参数的含义和使用方法。

307ceddc52b77c12787a38e54f1ebe00.png

Changelog 能够使 Checkpoint 更快速以及更稳定,但是会存在三个额外开销:

  • 额外的存储空间。Truncate 之前,State Changelog 会一直占用额外的存储空间。

  • 额外的恢复开销。Restore 过程需要额外 Apply 以及额外下载,因此也需要额外的恢复,恢复过程会占用耗时。

  • 额外的性能开销。State Changelog 会做定时上传,存在一定的性能开销。

558c870c0cece8cccd026f5bcbadaeac.jpeg

我们基于 RocksDB incremental 与 Changelog 做了 Benchmark 。Changelog 下使用的 State Table 为 RocksDB,开启 Incremental ,使用 OSS 作为存储介质;将 Checkpoint Interval 设置为 1 秒,对 RocksDB 而言意味着尽可能快地执行 Checkpoint ;将 Materialization Interval 设置为 3 分钟,Source Rate 设置为 10k /s,该速率对于两者而言都是比较日常的流量。

f651abd38ae79b50b95bd5602903404c.png

结果显示,RocksDB 侧 Duration 不稳定,时少时多。Checkpoint Datasize 存在周期性特征,每隔 4 个会增加,这是由于 Checkpoint 期间会 Flush Memtable 触发 Compaction 导致,且 RocksDB 配置 L0 的阈值为 4。而 Changelog 部分执行很稳定。

3fe9d36febd80441429d3db401732dd8.png

上图展示了 Checkpoint Duration 情况,我们截取了 P99 的数据做了两组参数,主要针对不同单并发 State Size 。结果显示,CP 端到端的延时,Changelog 可以在 1s 以内完成,RocksDB 延迟约 17 秒。

a23ffe58584081a932daca3084ab003b.png

大 State 在 1GB 单并发场景下,空间开销约为 1.2-1.5 倍。在内存中会更多一些,因为 RocksDB 在内存中数据会变得更紧致。

实际测试中,Sliding Window 场景下的空间消耗会更明显,可能会超出两倍。因为 Sliding Window 时,每个 Windows 之间 State 不共享,会存储多份。另外,Sliding Window 在 Checkpoint 期间会不断触发 Purge 操作,Sliding Size 设置越小,Purge 越多,RocksDB 相对能更好地合并 Push 操作。另外,因为 Changelog 使用了操作日志方式存储,Truncate 比较慢,部分全量会放得很大。

针对于 Sliding Window 的优化也一直在讨论中,比如支持 Changelog 之间的 Merge 方式。目前机制下,是否启用 Changelog 是空间放大与 Checkpoint 稳定性和更快速度之间的取舍。目前云上的空间相对廉价,因此在大多数情况下,牺牲空间换取性能和稳定性的方案是可以接受的。

28ed6d7671db7d8f149e80b580323015.png

将 Local Recovery 关闭后,RocksDB 与 Changelog 之间存在 9 秒的时间差。结合 Local Recovery 功能将 download 部分的差距抹掉,最终时间差距为 3 秒左右。在极限 TPS 上,Changelog 会有 10% 的损耗。测试时将整个 Benchmark 打满,打到反压后测试其极限状态。后续我们将针对 DSTL 上传部分做优化。

值得注意的是,这里测试的是极限情况。日常情况下,两者的 TPS 性能相差不多,甚至 Changelog 更优。因为在 同样的 Interval 设置下, RocksDB Compaction 会变得更频繁。

cac8740025135ea31758efbbdfd51843.jpeg

未来,我们的优化方向主要包含以下三个:

  • 第一,减少空间消耗,以及优化极限 TPS 场景。

  • 第二,结合 Failover 2.0,使 Checkpoint 做得更快、使 Recovery 变得更快以及实现 Reactive Mode 功能。因为在 Reactive Mode 下,增加了资源后会依赖于该机制做 Failover 后重启。如果能够将 Checkpoint 做得更快,状态恢复也会变得更快。

  • 第三,让 Table Store 获得更好的数据新鲜度。


03

一览 State/Checkpoint 优化

95188609346cac4291b40a8c0e2b6b24.png

1.16 版本在 State 和 Checkpoint 方面也做了不少优化。

可用性方面,做了针对于 RocksDB 监控和可用性的提升,导出了 Database Level 监控。同时也提高了 Unaligned Checkpoint 和 Aligned  Checkpoint 之间的切换可用性。

性能方面,基于 DeleteRange 将 RocksDB Rescale 性能提升 2-10 倍,基于 Overdraft Buffer 提升了 Checkpoint 性能。

■ 后面在 Flink Forward Asia 2022 核心技术专场分享的文章中,会有更加完整的展示!

Q&A

Q:RocksDB 与 Changelog 两种存储,相当于在 HDFS 上存了两份,文件系统容量 1.2 是如何计算得出的?

A:按 Full Checkpoint Size 计算得出。

Q:在 HDFS上的容量应该相当于两倍?

A:不一定是两倍。做完一次 Materialization 后,Changelog 部分的增量是会变化的。HDFS 上也会做定时清理,避免膨胀。

Q:HDFS 有两份,在恢复时也下载了两份数据,需要做数据版本合并吗?

A:是的。RocksDB 先恢复,然后将 Changelog Apply ,类似于合并操作。

Q:Changelog 数据应该已经比较准确,再做合并操作是否冗余?

A:Changelog 机制是基于 State Table 上做增量,因此作业恢复时还是需要全量数据。

往期精选

7095b4348233f20ccc63cc712a63df1c.png

3b120cb85eb4864b08a39c97f56b6274.jpeg

ef3f7fb902db3662b9aa7a15a0aa81f6.jpeg

a4ca718977544cc1ec1d08b70c608dde.jpeg

e6af87acb7cc9e7bc30fbc1c3246a62c.jpeg

▼ 关注「Apache Flink」,获取更多技术干货 ▼

08fce6872055e8b5deb81700384e0916.png

 1a21307874bc8701832722aab196d118.gif  点击「阅读原文」,查看原文视频&演讲 PPT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/143887.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

nodejs+npm的安装与配置

下载与安装下载点击_Node.js中文网_根据自身系统进行下载即可(建议下载10版本的,尽量不要下最新版本的)配置nodejs相关配置简介:Node.js是一个基于Chrome V8引擎的JavaScript运行环境,让JavaScript 运行在服务端的开发…

如何理解「异或(XOR)」运算在计算机科学中的重要性

前言 在学逻辑学的时候,基本的逻辑运算是非、与、或,且并没有得到特别的强调,而且事实上异或可以由这三个逻辑运算符表出。可是在计算机领域,异或似乎处于与非、与、或并列的关系,例如 C 语言的位运算符中就有专门的异…

pmp考试需要注意哪些?

注意的事情还是蛮多的,所以这里总结一个关于考试注意的点 先来分享一些总结的考试经验: 1、规划好时间:答题时间、填答题卡时间、检查时间,一般来说一道题答题时间和填答题卡的时间最好在一分钟左右,个别的题不能超过…

8.0、Linux-账号管理学习

8.0、Linux-账号管理学习 账号管理 - 简介 Linux 系统是一个多用户多任务的分时操作系统,任何一个要使用系统的用户,都必须首先向系统的管理员申请一个账号,然后以这个账号的身份进入系统; 用户的账号一方面可以帮助系统管理员对使…

第一次设计产品logo需要注意的5点细节

对于每一个公司企业/产品来说,logo不仅是公司品牌的代表,也是影响公司运营和宣传的关键因素。为了充分发挥公司标志的作用,在企业成立之初就要把企业标志的设计作为一项重要工作来抓,以保证标志最终发挥积极作用。为了使标志设计符…

useEffect和useLayoutEffect的区别

使用方式 这两个函数的使用方式其实非常简单,他们都接受一个函数一个数组,只有在数组里面的值改变的情况下才会再次执行 effect。 差异 useEffect 是异步执行的,而useLayoutEffect是同步执行的。useEffect 的执行时机是浏览器完成渲染之后&…

修复被删除的数据库表

1.问题来源 有一天领导让我对比生产数据库表和测试数据库表,要确保表结构,字段类型一致。于是我导出测试环境数据库表的DDL,在导出表的时候有blob和clob的表报错,于是我就想把它给剔除再导出,就这样数据库表被我删掉了…

c++调python踩坑日志

目录 import_array();报错 矩阵互相转换 #include numpy相关vs2019配置 import_array();报错 参考:https://blog.csdn.net/weixin_40232401/article/details/106944336#:~:text%E5%9C%A8,import_array%20%28%29%E5%87%BA%E7%8E%B0%E6%8A%A5%E9%94%99%EF%BC%8C%E6…

renix如何查看时延和抖动和丢包——网络测试仪实操

目录 查看时延和抖动​ 一、预约测试资源 ​ 二、新建流​ 三、查看时延和抖动​ 查看丢包​ 一、预约端口​ 二、创建Raw流​ 三、如何查看流量的实时丢包个数和丢包比例​ 查看时延和抖动​ 一、预约测试资源 ​ 打开Renix软件,连接机箱, 预约端口​ 二…

课题-基于安卓androidstudio的团购app

一、课题介绍 客户端: 1:注册登录:用户使用注册的账号密码进行登录; 2:查看商品:用户可以查看发布的商品信息; 3:分类查看:用户可以通过分类的查看商品信息;…

MySQL索引的数据结构

索引的数据结构 本专栏学习内容来自尚硅谷宋红康老师的视频 有兴趣的小伙伴可以点击视频地址观看 1. 为什么要使用索引? 索引是存储引擎用于快速找到数据记录的一种数据结构,就好比去图书馆找书,或者新华字典里找字,相当于一个目…

SQL用法详解

1.SQL语言是什么?有什么作用?SQL:结构化查询语言,用于操作数据库,通用于绝大多数的数据库软件2.SQL的特征大小写不敏感需以;号结尾支持单行、多行注释3操作数据库的SQL语言基于功能可以划分为4类:数据定义:DDL ( Data Definition Language)&#xff1a…

校验、异常处理

前端校验完后,后端需要再做一次校验 JSR303 定义了数据校验的标准 使用步骤 为Bean标识注解,并自定义错误提示 import javax.validation.constraints自定义规则:一个小写或大写字母 Email、Future、NotBlank、Size 等 不推荐使用NotEmp…

Linux下命令(2)

Linux下命令(2) 1. 解压缩命令 Linux 下最常用的打包程序是 tar 命令, 使用 tar 打出来的包我们常称为 tar 包, tar包文件的命令通常都是以.tar 结尾的,生成 tar 包后,就可以用其它的程序来进行压缩了。   功能: ta…

Python程序的构成

1.开始学习图形化程序设计 >>> import turtle #导入turtle模块 >>> turtle.showturtle() #显示箭头 >>> turtle.write("文字") #写字符串 >>> turtle.forward(300) #前进300像素 >>> turtle.c…

基于C++的AGV机器人无线控制实现

AGV系统概述 AGV原理 AGV行走控制系统由控制面板、导向传感器、方向电位器、状态指示灯、避障传感器、光电控制信号传感器、驱动单元、导引磁条、电源组成。 AGV的导引(Guidance)是指根据AGV导向传感器(Navigation)所得到的位置…

Blender如何打开IFC数据?

Blender如何打开IFC数据安装blenderbimIFC介绍下载和安装BlenderBIM插件Blender打开IFC数据对于一个外行人,当我想查看IFC数据的呈现形式时,但是我又没有Revit软件,那么我想到了Blender,网上查了只需要安装BlenderBIM插件&#xf…

表单验证[用户名、邮箱、密码、重复密码]

<!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>表单验证</title> <link rel"stylesheet" href"form.css"> <!-- 引入样式 --> &l…

C++语法3——if switch break continue的定义及用法

接上节 循环语句 这一节写的是判断语句 if else语句 基本语法&#xff1a; 第一种&#xff1a; if(bool(布尔变量)) {如果bool值为真执行的语句; } else {如果bool值为假执行的语句; }如果布尔表达式为 true&#xff0c;则执行 if 块内的代码。如果布尔表达式为 false&#x…

2023北京/上海/广州/深圳物联网产品经理班招生简章

NPDP产品经理国际资格认证是国际公认的唯一的新产品开发专业认证&#xff0c;集理论、方法与实践为一体的全方位的知识体系&#xff0c;为公司组织层级进行规划、决策、执行提供良好的方法体系支撑。 我们针对互联网时代的个人、互联网企业、与传统企业推出一系列学习。 课程从…