Flink CDC MongoDB 联合实时数仓的探索实践

news2024/11/23 21:41:25

摘要:本文整理自 XTransfer 技术专家, Flink CDC Maintainer 孙家宝,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分:

  1. MongoDB 在实时数仓的探索

  2. MongoDB CDC Connector 的实现原理和使用实践

  3. FLIP-262 MongoDB Connector 的功能预览

  4. 总结和展望

点击查看原文视频 & 演讲PPT

一、MongoDB 在实时数仓的探索

MongoDB 是一款非关系型的文档数据库,支持大规模的数据存储和灵活的存储结构,在 XTransfer 内部有着比较大规模的应用。

另外,XTransfer 在实时数仓方面也有着积极的探索,除了目前比较流行的基于湖技术的构建实时数仓的方式外,Flink 和 MongoDB 也有着构建轻量级实时数仓的潜力。

1.1 MongoDB 简介

1

MongoDB 是一种面向文档的非关系型数据库,支持半结构化的数据存储。它还是一种分布式的数据库,提供副本集和分片级两种集群部署模式,具有高可用和水平扩展的能力,适合大规模的数据存储。另外,MongoDB 在 3.0 版本之后还引入了 Change Streams 特性,支持并简化了数据库的变更订阅。

1.2 常见的实时架构选型

2

  • Flink 和 Kafka 纯实时链路的实时数仓。

优势包括,数据新鲜度高;数据写入较快;Kafka 的周边组件生态较好。

缺陷包括,中间结果不可查。Kafka 是线性存储,记录了数据的每一次变更,因此如果要得到最新的镜像值,需要遍历所有在 Kafka 中的记录,因此也无法进行比较灵活快速的 OLAP 查询,对于排查问题方面也比较困难;Kafka 的冷热分离还有待实现,不能充分利用一些廉价存储;这套架构一般需要额外维护两套流批架构,对部署开发运维成本会较高。

3

  • 基于湖存储的实时数仓架构。

目前比较流行的数据湖 Iceberg、Hudi,同时支持了批式读取和流式读取的能力,可以通过 Flink 实现流批一体的计算能力,其次,湖存储在存储上会充分考虑如何利用廉价存储,相对于 Kafka 具有更低的存储成本。

但基于湖存储的实时数仓也有一些缺点,包括部署成本较高,例如需要额外部署一些 OLAP 查询引擎。其次,对于数据权限也需要额外的组件来支持。

4

  • 基于 MongoDB 的实时数仓。

MongoDB 本身支持大规模数据集存储,也支持灵活的数据格式;MongoDB 的部署成本低,组件依赖少,并且有完整的权限控制。相比于其他的实时数仓架构,Flink 和 MongoDB 也有着构建轻量级实时数仓的潜力。这种模式要求 Flink 对 MongoDB 拥有流式读取、批式读取、维表查询和行级写入的能力。

目前全增量一体化流式查询可以通过 Flink CDC MongoDB Connector 提供,批式读取维表查询写入的功能可以由 FLIP-262 MongoDB Connector 提供。

二、MongoDB CDC Connector 的实现原理和使用实践

2.1 MongoDB CDC Connector

5

MongoDB CDC Connector 由 XTransfer 基础架构团队开发,并已贡献给了 Flink CDC 社区。在 Flink CDC 2.1.0 版本中正式引入,支持了全增量一体化的 CDC 读取以及元数据提取的功能;2.1.1 版本中,支持连接未开启认证的 MongoDB;2.2.0 版本中,支持正则表达式筛选的功能;2.3.0 版本中,基于增量快照读框架,实现了并行增量快照读的功能。

2.2 Change Streams 特性

6

MongoDB CDC Connector 是基于 MongoDB Change Streams 特性来实现的。MongoDB 是一个分布式的数据库,在分布式的环境中,集群成员之间一般会进行相互复制,来确保数据的完整性和容错性。与 MySQL 的 Binlog 比较类似,MongoDB 也提供了 oplog 来记录数据的操作变更,次要节点之间通过访问主节点的变更日志来进行数据的同步。

我们也可以通过直接遍历 MongoDB oplog 的方式来获取数据库的变更。但分片集群一般由多个 shard 组成,每个 shard 一般也是一个独立的副本集。在分片上的 oplog 仅包含在它分片范围里的数据,因此我们需要遍历所有 shard 上的 oplog,并把它们根据时间进行排序合并,这显然会比较复杂。

值得一提的是,在 MongoDB 3.6 版本之后,引入了 Change Streams 特性,提供了更简单的 API 来简化数据订阅。

7

使用 Change Streams 的 API,我们可以屏蔽遍历 oplog 并整合的复杂度,并且支持实例、库、集合等多种级别的订阅方式,以及完整的故障恢复机制。

2.3 Change Streams 的故障恢复

8

MongoDB 通过 ResumeToken 来进行断点恢复,Change Streams 返回的每条记录都会携带一个 ResumeToken,每个 ResumeToken 都对应了 oplog 中的一条具体记录,表示已经读到的 oplog 的位置。另外,还记录了变更时间以及变更文档主键的信息。通过 ResumeAfter、startAfter 等方法,将 ResumeToken 作为起始参数可以对中断的 Change Streams 进行恢复。

Change Streams 的 ResumeToken 是由 MongoDB KeyStream 编码的一个字符串,它的结构如上图左侧所示。ts 代表数据发生变更的时间,ui 代表发生变更 collection 的 UUID,o2 代表发生变更的文档的主键。详细的 oplog 字段描述可以参考 oplog_entry 。

上图右侧是一个 oplog 的具体记录,它描述了在 107 结尾主键下的一条记录的一次更改,将 weight 字段修改成了 5.4。值得一提的是 MongoDB 在 6.0 版本中并没有提供变更前和变更后完整的镜像值。这也是我们没有直接采用 oplog 去实现 MongoDB CDC Connector 的一个原因。

2.4 Change Streams 的演进

9

MongoDB 在 3.6 版本中正式引入了变更流特性,但仅支持对于单个集合的订阅。在 4.0 版本支持了实例、库级别的订阅,也支持了指定时间戳开启变更流的功能。在 4.0.7 版本引入了 postBatchResumeToken:

在 4.0 版本之前打开一个变更流后,如果没有新的变更数据产生,那么将不会获取到最新的 ResumeToken。如果此时发生故障,并且尝试使用了比较老旧的 ResumeToken 来恢复,可能会降低服务器的性能,因为服务器可能会需要扫描更多的 oplog 的条目。如果 ResumeToken 对应的 oplog 被清除了,那么这个变更流将无法进行恢复。

为了解决这个问题,MongoDB 4.0 提供了 postBatchResumeToken,标记已经扫描的 oplog 的位置,并且会随时间持续推进。另外,利用这个特性,我们可以比较准确的定位当前 Change Streams 消费的位置,进而实现增量快照读的功能。

在 MongoDB 4.2 版本,可以使用 startAfter 去处理一些 invalid 的事件,在 MongoDB 5.1 版本对 Change Streams 进行了一系列的优化。在 MongoDB 6.0 版本,提供了 Change Streams 前置、后置镜像值完整信息,以及 Schema 变更的订阅机制。

2.5 MongoDB CDC Connector

10

MongoDB CDC Connector 的实现原理,是利用了 Change Streams 的特性,将增、删、改等变更事件转换成 Flink 的 upsert 类型的变更流。在 Flink SQL 场景下,Planner 会加上 Changelog Normalize 的算子,将 upsert 类型的变更流进行标准化。结合 Flink 强大的计算能力,容易实现实时 ETL 甚至异构异构数据源的计算场景。

11

在 Flink CDC 2.3 版本,依托于增量快照读框架实现了无锁快照读的功能,支持并发快照,大大缩短了快照时间。关于增量快照读的总体流程是如上图所示。为了让 snapshot 并行化,首先要将完整的数据集切分成多个区块。将这些区块分配给不同的 Source Reader 并行读取,以提升整个 snapshot 的速度。但 MongoDB 的主键它多为 ObjectId,不能按照简单的增加范围的方式去切分,因此对于 MongoDB 的切分策略需要单独去设计。

12

MongoDB 有以下三种切分策略,这些切分策略参考了 Mongo Spark 项目。

  • 第一种切分策略使用了 Sample 命令对集合进行随机采样,再通过文档的平均大小计算出分桶数量。然后将采样数据分配到各个桶中,构成 Chunk 的边界。优点是速度快,适用于数据量大且未分片的集合。缺点是采用抽样预估,Chunk 的大小不能做到绝对均匀。

  • 第二种切分策略使用了 SplitVector 命令。SplitVector 是 MongoDB 分片式计算分裂点的内部命令,通过访问指定索引来计算每个节点 Chunk 的边界。优点是速度快,并且 Chunk 的大小均匀,但它额外需要 SplitVector 命令的执行权限。

  • 第三种切分策略针对于分片集合,对于已经分片好的集合,我们不用重新计算它的分片结果,可以直接读取 MongoDB 已经分片好的结果作为 Check 的边界。优点是速度快,Chunk 的大小均匀,但是 Chunk 的大小无法调节,依赖于 MongoDB 自身对于每个分片的配置,默认大小为 64mb,另外它额外需要对 config 库的读取权限。

13

接下来介绍一下增量快照读的过程。对于一个已经切分好的区块,在快照执行前后分别记录当前 Change Streams 的位置。在快照结束之后,根据快照起始、结束的位点范围,对变更流进行回放,最后将快照记录和变更记录按 Key 进行合并,得到完整的结果,避免了重复数据的发送。

14

在单个 Chunk 的增量读阶段,我们读取了 Chunk 范围内的快照数据以及 Chunk 范围内的增量数据,并将其进行合并。但整体的 snapshot 的过程可能并没有结束,那么已经完成 snapshot 的区块,在后边的时间仍然可能会发生变更,因此我们需要对这些变更数据进行补偿。从全局最低的高水位点处开始启动变更流,对于变更时间高于 Chunk 高位点的变更数据进行补偿。当达到全局 snapshot 最高位点的时候,我们的补偿便可以结束。

15

接下来介绍一些关于 MongoDB CDC Connector 的生产建议。

  • 第一,使用增量快照特性,MongoDB 的最小可用版本在 4.0 版本。因为在 4.0 版本之前,没有发生变更时无法获取 ResumeToken,且也不能够从指定的时间点进行启动,因此难以实现增量快照特性。

    在 MongoDB 4.0.7 版本之后,引入了 postBatchResumeToken,可以比较容易的获取当前 Change Streams 的位置,因此比较推荐的版本在 4.0.7 以上。

16

  • 第二,控制文档的大小不要超过 8mb,因为 MongoDB 对单条文档有 16 mb 的限制,变更文档因为包含一些额外的信息,比如修改的字段是哪些等等,即使原文档没有超过 16mb,变更文档也会超过 16mb 的限制,从而导致 Change Streams 异常终止。这个应该属于 MongoDB Change Streams 的一个缺陷。

    关于 MongoDB 的变更文档可以超过 16mb 的限制,已在 MongoDB 的 issue 中进行推进。

17

  • 第三,在 MongoDB 中分片件其实在开启事务之后允许被修改。但修改分片键可能会引起分片的频繁移动,引起额外的性能开销。另外,修改分片键还可能导致 Update Lookup 功能失效,在 CDC 的场景中可能会导致结果的不一致。

三、FLIP-262 MongoDB Connector 的功能预览

上面我们介绍了 MongoDB CDC Connector,可以对 MongoDB 进行增量的 CDC 读取,但如果要在 MongoDB 上构建实时数仓,我们还需要对 MongoDB 进行批量读取、写入以及 Lookup 的能力。这些功能在 FLIP-262 MongoDB Connector 中进行实现,目前已经发布第一个版本。

3.1 FLIP-262 Introduce MongoDB Connector

18

在并行读取方面,MongoDB Connector 基于 FLIP-27 新的 Source API 实现;支持批量读取;支持 Lookup。在并行写入方面,基于 FLIP-177 Sink API 实现;支持 Upsert 写入。在 Table API 方面,实现了 FLIP-95 Table API 使用 Flink SQL 进行读取或写入。

3.2 读取 MongoDB

19

首先我们在 MongoDB 中插入一些测试数据,然后使用 Flink SQL 定义一张 users 表,通过 select 语句我们可以得到右边所示的结果。可以发现右边的结果和我们插入的测试数据是一致的。

3.3 写入 MongoDB

20

首先我们定义一张 users snapshot 的结果表,对应 MongoDB users snapshot 的集合。然后我们通过 Flink SQL 的 insert 语句,将上面定义的 users 表集合的数据,读取并写入到 MongoDB。

最后查询一下我们新定义的这张结果表,它的结果如右边所示。可以发现它的结果和之前源表的结果是一致的,这代表着我们写入一张新的集合是成功的。

3.4 用作维表关联

21

接下来来演示一下,将上面定义的 user 表作为维表进行 Lookup 的场景。

首先我们定义一张 pageviews 的事实表,user_id 作为 Lookup Key,对应于我们之前定义的 users 表的主键。然后我们查询 pageviews 表可以得到右边的结果。

22

接着定义一张结果表代表打款以后的结果,这个结果表对于 users 是作为维表关联去补充一些区域信息。然后我们通过 Flink SQL 将 pageviews 事实表和 users 维表进行关联,写入到结果表。然后查询结果表可以得到打宽后 user_region 的信息。如右图所示,打宽以后的 user_region 在最后一列,这说明我们的 Lookup 是成功的。

四、总结和展望

4.1 总结

23

至此,Flink 联合 MongoDB 的实时数仓架构便可以实现,在建设实时数仓时多了一份选择。如图所示,通过 CDC Connector 完成整套流式链路,辅助 Lookup 进行数据打宽。通过 Source Connector 完成一整套批式链路,最后将计算的中间结果通过 Sink Connector 进行存储,那么整套实时数仓的架构便得以实现。

4.2 存在的问题

目前还存在着以下问题:

  • Changelog Normalize 是一个有状态的算子,它会一些额外的状态开销。

  • Update Lookup 的完整状态提取,也需要一定的查询开销。

  • 在 MongoDB 中的文档大小有 16mb 限制。如果有一些很大的单条数据,那么可能并不适合采用这种架构。

4.3 未来规划

在 MongoDB CDC Connector 方面,我们需要:

  • 支持 MongoDB 6.0 版本。

  • 支持指定时间点启动。

  • 推进推进 Changelog Normalize 优化。

在 MongoDB Connector 方面,我们需要:

  • 支持谓词下推。

  • 支持 AsyncLookup。

  • 支持 AsyncSink。

点击查看原文视频 & 演讲PPT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/777550.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring MVC拦截器和跨域请求

一、拦截器简介 SpringMVC的拦截器(Interceptor)也是AOP思想的一种实现方式。它与Servlet的过滤器(Filter)功能类似,主要用于拦截用户的请求并做相应的处理,通常应用在权限验证、记录请求信息的日志、判断用…

多肽试剂1801415-23-5,Satoreotide,UNII-S58172SSTS,应用在多肽标记及修饰上

资料编辑|陕西新研博美生物科技有限公司小编MISSwu​ Satoreotide,UNII-S58172SSTS Product structure Product specifications 1.CAS No:1801415-23-5 2.Molecular formula:C58H72ClN15O14S2 3.Molecular weight:1302.9 4.Packa…

【C++详解】——C++11

目录 C简介 统一的列表初始化 {}的初始化 initializer_list容器 声明 auto decltype nullptr 范围for C简介 在2003年C标准委员会曾经提交了一份技术勘误表(简称TC1),使得C03这个名字已经取代了 C98称为C11之前的最新C标准名称。 不过由于C03(TC1)主…

STM32 串口 DMA 接收任意长度数据

DMA 局限性 DMA 传输完成会产生中断告知 CPU,这对于固定长度的数据是没什么问题的。但是对于不定长的数据就不行了,DMA 一定要接收到足够多(设定的长度)的数据时才产生完成中断,如果接收到的数据量小于设定的长度&…

Linux的基础配置

配置篇 前置步骤:先租一个服务器或装个Linux再或者虚拟机都可以 1.安装gcc,g,gdb 在Linux下我们能用什么工具来编译所编写好的代码呢,其实Linux下这样的工具有很多,但我们只介绍两款常用的工具,它们分别是gcc和g. gcc和g的主要…

解决端口占用

解决办法: 1、换一个其它未被占用的端口 2、端口被占用了,先看下是哪个程序再用,停掉就OK了 下面演示结束端口被占用的程序的过程: 1、查看被占用的端口的进程 netstat -aon|findstr 端口号2、根据PID找到占用此端口的进程 ta…

Redis单机伪集群配置与搭建

目录 1. 复制6个redis配置文件到当前目录下 2. 启动每个节点 3. 判断集群是否可用 4. 初始化集群 5. 查看集群信息 6. 获取与插槽对应的节点 (1)手动重定向 (2)自动重定向 7. 故障恢复 需求:配置一个三主三从的集…

为什么ConcurrentHashMap不允许插入null值而HashMap可以?

为什么ConcurrentHashMap不允许插入null值而HashMap可以? 文章目录 为什么ConcurrentHashMap不允许插入null值而HashMap可以?HashMap源码ConcurrentHashMap源码为什么ConcurrentHashMap需要加空值校验呢?二义性问题测试代码代码分析测试结果结…

纯css3实现水波纹从中心向四周扩散动画

纯css3实现水波纹从中心向四周扩散动画 效果可用于pc端或移动端,引导用户点击,间接带来一定的转化率 示例效果 示例代码 <template><div class"zanbtn-wrap"><div click"handleClick(https://pay.aikelaidev.cn/paypage/?merchant35bdYxSx7dCUr…

子网划分路由网卡

1."IPv4 CIDR" "IPv4 CIDR" 是与互联网协议地址&#xff08;IP address&#xff09;和网络的子网划分有关的概念。 - "IPv4" 代表 "Internet Protocol version 4"&#xff0c;也就是第四版互联网协议&#xff0c;这是互联网上最广泛使…

IPv4 与 IPv6:网络协议的差异和转换方法

在网络的世界里&#xff0c;IPv4 和 IPv6 这两个协议就像是两个不同的王国&#xff0c;各自拥有着独特的领土和规则。虽然它们都是为了互联网的发展而存在&#xff0c;但它们之间究竟有哪些差异&#xff0c;以及如何在这两个王国之间穿梭&#xff0c;仍然是很多网络小白们的困惑…

Spring使用注解存储Bean对象

文章目录 一. 配置扫描路径二. 使用注解储存Bean对象1. 使用五大类注解储存Bean2. 为什么要有五大类注解&#xff1f;3.4有关获取Bean参数的命名规则 三. 使用方法注解储存Bean对象1. 方法注解储存对象的用法2. Bean的重命名 在前一篇博客中&#xff08; Spring项目创建与Bean…

Excel中Vlookup

VLOOKUP($A:$A,Sheet3!$A:$D,COLUMN(Sheet3!B1),FALSE) ps: 1.按F4&#xff0c;锁定第一个和第二个参数 2.第二个参数&#xff0c;要选择全部范围(包括被查找列&#xff0c;以及查找内容) 2.第三个参数用column&#xff08;&#xff09;函数&#xff0c;第三列不要锁定 3.…

【Linux】自动化构建工具-make/Makefile详解

前言 大家好吖&#xff0c;欢迎来到 YY 滴 Linux系列 &#xff0c;热烈欢迎&#xff01;本章主要内容面向接触过Linux的老铁&#xff0c;主要内容含 欢迎订阅 YY 滴Linux专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; 订阅专栏阅读&#xff1a;YY的《…

Alpha-GO打败⼈类的秘籍- 强化学习(Reinforcement Learning)

为了深⼊理解强化学习&#xff08;Reinforcement Learning&#xff0c;简称RL&#xff09;这⼀核⼼概念&#xff0c;我们从⼀个⽇常游戏的例⼦出发。在“贪吃蛇”这个经典游戏中&#xff0c;玩家需要掌控⼀条蛇&#xff0c;引导它吞吃屏幕上出现的各种果实。每次成功捕获果实&a…

CSS层叠

声明冲突 同一个样式&#xff0c;多次应用到同一个元素。 此时可以看到我们的a元素的样式和浏览器的默认样式发生了冲突 层叠 解决声明冲突的过程&#xff0c;浏览器会自动处理&#xff08;权重计算&#xff09;&#xff0c;权重大的获胜 1.比较重要性 重要性从高到低 1&…

Docker 教程

Docker 是一个开源的应用容器引擎&#xff0c;基于 Go 语言并遵从 Apache2.0 协议开源。 Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xff0c;然后发布到任何流行的 Linux 机器上&#xff0c;也可以实现虚拟化。 容器是完全使用沙箱机制&a…

省电液晶驱动IC,VK2C22G,COG片高抗干扰抗噪系列LCD段码驱动芯片,I2C通信接口

型号:VK2C22G DICE(邦定COB)/COG&#xff08;绑定玻璃用&#xff09; VK2C22G概述&#xff1a; VK2C22G是一个点阵式存储映射的LCD驱动器&#xff0c;可支持最大176点&#xff08;44SEGx4COM&#xff09;的LCD屏。单片机可通过I2C接口配置显示参数和读写显示数据&#…

C# 接口2.0 (028 课程)

参考视频教程&#xff1a;B站 刘铁猛 028 接口 依赖反转 单元测试 接口是一种供方和需求方都遵守的契约 — 即规则。 文章目录 原始代码利用用接口简化函数重载紧耦合程序实体松耦合代码实体设计一个紧耦合 -- 接口松耦合 -- 单元测试紧耦合程序实例松耦合程序实体 -- 引入接…

latex子图无法重新编号问题

问题 想让后四个子图&#xff0c;从a开始编号 解决方法 如上原因是由于全局自动编号机制&#xff0c;在 \begin{figure*}[h] \centering下面添加&#xff1a; \setcounter{subfigure}{0}