阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference

news2024/7/4 5:16:41

摘要:本文整理自阿里云 Flink 团队归源老师关于 阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究,内容主要分为以下四部分:

1. MongoDB 简介

2. 社区MongoDB CDC 核心特性

3. MongoDB CDC 在阿里云 Flink 实时计算产品的实践

4. 总结

Tips:点击「阅读原文」在线观看 FFA 2023 会后资料~

01

MongoDB 简介

MongoDB 是一种面向文档的非关系型数据库,支持半结构化数据存储;也是一种分布式的数据库,提供副本集和分片集两种集群部署模式,具有高可用和水平扩展的能力,比较适合大规模的数据存储。

MongoDB 使用了弱结构化的存储模式,支持灵活的数据结构和丰富的数据类型,适合 Json 文档、标签、快照、地理位置、内容存储等业务场景。它天然的分布式架构提供了开箱即用的分片机制和自动 rebalance 能力,适合大规模数据存储。另外, MongoDB 还提供了分布式网格文件存储的功能,即 GridFS,适合图片、音频、视频等大文件存储。

0b5e65c3f3d583b4e0cabbc5b5e84d6d.png

be80d934fdd3bf6046615213a75ddd54.png

02

社区 MongoDB CDC 核心特性

Flink CDC 是基于数据库的日志 CDC(Change Data Capture)技术,实现了全量和增量的一体化读取能力,借助 Flink 优秀的管道能力和丰富的上下游生态,支持实时捕获、加工多种数据的变更并输出到下游,MongoDB 也是支持的数据库之一,支持的主要特性包括:

  • 支持 Exactly-once 语义

  • 支持全量、增量订阅

  • 支持 Snapshot 数据过滤

  • 支持从检查点、保存点恢复

  • 支持元数据提取

社区 MongoDB CDC 使用了 MongoDB 3.6推出的 Change Streams[1] 特性,通过将 Change Streams 转换成 Flink Upsert changelog,实现了 MongoDB CDC TableSource。在 MongoDB 6.0 之前的版本中,默认不会提供变更前文档及被删除文档的数据,利用这些信息只能实现下图所示的 Upsert 语义。

c9f575c05e3e90f969fb3ad0d0abd09a.png


MongoDB 6.0 的 Pre- and Post-Image[2] 新功能提供了一个更高效的解决方案:只要启用changeStreamPreAndPostImages功能,MongoDB 就会在每次变更发生时,在一个特殊的集合中记录文档变更前后的完整状态。MongoDB CDC 支持读取这些记录并产生完整事件流,从而消除了对 ChangelogNormalize 节点的依赖,该功能在社区和阿里云实时计算Flink产品中均可以支持。

03

MongoDB CDC 在阿里云 Flink 实时计算产品的实践

社区的 MongoDB CDC 作为纯引擎,功能已经十分强大。但作为商业化产品,目前仍然存在着不足之处,即无法支持 Schema 变更。

MongoDB 作为 NoSQL 数据库,没有固定的 Schema 要求,Schema 的变更操作十分常见,但目前社区 MongoDB CDC 只能支持固定 Schema,并且无法支持 Schema 变更。同时社区 MongoDB CDC 需要用户手动定义表的 Schema,在使用上也不够便捷。

为了解决上述不足之处,阿里云 Flink 实时计算产品提供了 MongoDB Catalog,支持 MongoDB 的 Schema 推导,无需手动定义 Schema。并且通过 CTAS/CDAS 语句,能够做到在实时同步 MongoDB 数据的同时将上游表结构(Schema)的变更同步到下游表,提高了建表和维护表结构变更的效率。

3.1 Schema 推导的实现

MongoDB Schema 推导通过 MongoDB Catalog 实现,MongoDB Catalog 会推导出 collection 的 schema,无需手动指定 DDL 即可作为 Flink 源表、维表或结果表使用。Schema 推导的流程包括以下步骤:

■ 3.1.1 数据采样

MongoDB Catalog 会从 Collection 中采样默认 100 条文档数据,若 Collection 中文档数小于该值,则会获取其中的所有数据。

采样数据量可以通过 MongoDB Catalog 提供的配置项 max.fetch.records 进行设置。

■ 3.1.2 Schema 解析

在 MongoDB 中,每个 Document 都是一个 BSON 文档。与 JSON 相比,BSON 类型是 JSON 类型的超集,相比 JSON 额外支持了 DateTime、Binary 等类型。在解析单个 BSON 文档的 Schema 时,BSON 类型会与 Flink SQL 类型进行一一对应。对于 BSON 文档中嵌套的 Document 类型,默认会将其解析为 STRING。

为了能够更好地解析嵌套的 Document 类型,MongoDB Catalog提供了 scan.flatten-nested-columns.enabled 配置项,可以用于递归地解析 Document 类型中的字段。假设初始的 BSON 文档如下:

{
  "unnested": "value",
  "nested": {
    "col1": 99,
    "col2": true
  }
}

如果将 scan.flatten-nested-columns.enabled 设为 false(默认),得到的 Schema 会包含 2 列:

列名Flink SQL类型
unnestedSTRING
nestedSTRING

如果将 scan.flatten-nested-columns.enabled 设为 true,得到的 Schema 则会包含 3 列:

列名Flink SQL类型
unnestedSTRING
nested.col1INT
nested.col2BOOLEAN

除此之外,MongoDB Catalog 还提供了 scan.primitive-as-string 配置项,可以将所有 BSON 基本类型都映射为 STRING。

■ 3.1.3 Schema 合并

当获取到一组 BSON 文档数据后,MongoDB Catalog 会逐条解析 BSON 文档,并按如下规则合并解析出的物理列,最终得到的 Schema 会作为整个 Collection 的 Schema。合并规则如下:

  • 如果当前 BSON 文档解析出的物理列中包含结果 Schema 中没有的字段,则 MongoDB Catalog 会自动将这些字段加入到结果 Schema。

  • 如果当前 BSON 文档解析出的物理列和结果 Schema 出现了同名列,若类型不同,则会按下图的树形结构找到最近公共父节点,作为该同名列的类型。

c2a408fcc98303e13686e5f6d5f5bcc4.png

例如,对于包含如下三条数据的 Collection:

{
  "_id": {
    "$oid": "100000000000000000000101"
  },
  "name": "Alice",
  "age": 10,
  "phone": {
    "mother": "111",
    "fatehr": "222"
  }
}


{
  "_id": {
    "$oid": "100000000000000000000102"
  },
  "name": "Bob",
  "age": 20,
  "phone": {
    "mother": "333",
    "fatehr": "444"
  }
  "address": ["Shanghai"],
  "desc": 1024
}


{
  "_id": {
    "$oid": "100000000000000000000103"
  },
  "name": "John",
  "age": 30,
  "phone": {
    "mother": "555",
    "fatehr": "666"
  }
  "address": ["Shanghai"],
  "desc": "test value"
}

对于以上 3 个 BSON 文档,后两个相比第一个多了 address 和 desc 字段,这两个字段在做 Schema 合并时会合并到最终 Schema 中。后两个文档的 desc 字段类型不同,在解析单个文档的 Schema 时,会将这两个字段分别映射为 Flink SQL 类型的 INT 和 STRING,根据上述 Schema 合并时类型合并规则,最终 desc 字段类型会被推导为 STRING。

因此,MongoDB Catalog 最终得到的 Schema 如下:

列名

Flink SQL类型

备注

_id

STRING NOT NULL

主键字段

name

STRING


age

INT


phone

STRING


address

STRING


desc

STRING

类型合并为STRING

在 MongoDB 中,每个文档都有一个特殊的字段 _id,它用于唯一标识集合(collection)中的一个文档,这个字段在文档创建时会自动生成。

MongoDB Catalog 会把 _id 列作为主键,添加默认的主键约束,确保数据不会重复。

3.2 Schema Evolution 的实现

在将 MongoDB Catalog 中的表作为 CDC source 使用时,考虑到 Collection 中的数据可能会出现新增字段、更改字段类型等 Schema 变更操作,connector 在处理数据时需要考虑到 Schema evolution。

MongoDB CDC connector 会将 MongoDB Catalog 推导出的 Schema 作为初始 Schema,后续在每读到一条 OpLog 操作日志数据时,操作步骤如下:

1. 解析当前数据对应 BSON 文档的 Schema,步骤与上文 BSON 文档 Schema 解析中相同

2. 将步骤 1 中解析出的 Schema 与当前 Schema 合并

3. 比较步骤 2 中合并后的 Schema 与当前 Schema:

  • 若相同,则使用当前 Schema 解析数据

  • 若不同,则更新当前 Schema,并下发 Schema 变更信息

3.3 通过 CTAS/CDAS 语句同步数据和表结构

CTAS 语句支持同步源表的全量和增量数据到结果表中,在同步数据的同时,能够将源表的表结构变更也实时同步到结果表。CDAS 语句支持整库级别的数据实时同步,也能够支持表结构变更的同步。

在使用 CTAS/CDAS 语句同步数据之前,需要先创建 MongoDB Catalog:

CREATE CATALOG <yourcatalogname> WITH(
  'type'='mongodb',
  'default-database'='<dbName>',
  'hosts'='<hosts>',
  'scheme'='<scheme>',
  'username'='<username>',
  'password'='<password>',
  'connection.options'='<connectionOptions>',
  'max.fetch.records'='100',
  'scan.flatten-nested-columns.enable'='<flattenNestedColumns>',
  'scan.primitive-as-string'='<primitiveAsString>'
);

示例如下:

d00594fc46e7c6b74a588558226301b7.png

在创建 MongoDB Catalog 后,可以选择如下几种方式实现数据和表结构的同步:

■ 3.3.1 使用 CTAS 语句同步单个 MongoDB Collection 数据和表结构到下游存储
CREATE TABLE IF NOT EXISTS `${target_table_name}`
WITH(...)
AS TABLE `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true') */;

示例如下:

1f1812bd03d5c3f46e4be14464003e97.png

■ 3.3.2 使用多个 CTAS 语句同时同步多个 MongoDB Collection 数据和表结构到下游存储
BEGIN STATEMENT SET;


CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
AS TABLE `mongodb-catalog`.`database`.`collection0`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true') */;


CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
AS TABLE `mongodb-catalog`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true') */;


CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
AS TABLE `mongodb-catalog`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true') */;


END;

示例如下:

76ae48ec8c53de8302c886c2c83842cc.png

■ 3.3.3 使用 CDAS 同步 MongoDB 数据库中符合条件的 Collection 数据和表结构到下游存储
CREATE DATABASE IF NOT EXISTS `some_catalog`.`some_database` 
AS DATABASE `mongo-catalog`.`database` INCLUDING TABLE 'table-name'
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true') */;

示例如下:

4c9cb3edcc864f3fd1512f44f81243fe.png

3.4 使用示例

以下示例使用的环境为阿里云实时计算 Flink 版[3]

假设我们需要同步 MongoDB 中单个数据库下所有 Collection 的数据和表结构到 Hologres,MongoDB 中数据可能出现新增字段。

MongoDB 的数据库名称为 guiyuan_cdas_test,其中包含了两个 collection,名称分别为 test_coll_0 和 test_coll_1,我们希望将数据同步到 Hologres 的同名数据库 cdas_test 中。

MongoDB 中两个 collection 的初始数据如下:

599f6d92e0bd39b418f3ea6163b0f290.png

9a4166e1d12bd7728fb9ef158074fdb8.png

创建 MongoDB 和 Hologres 的 Catalog 后,在 SQL 开发页面编写 CDAS 作业,由于 MongoDB Catalog 会对 collection 做 Schema 推导,此处不需要手动定义表的 DDL:

0df1d9eea5e58f381a58eb3569ef08bd.png

部署运行后,可以看到 Hologres 数据库中已经自动创建了 guiyuan_cdas_test 数据库,并且同步了两个表的初始数据:

53d3055e785de38deaf64ef75b9470e0.png

此时,向 MongoDB 的 test_coll_0 中插入一条包含了新字段 address 的数据,同时向 test_coll_1 中插入一条包含了新字段 phone 的数据。

053a288e00eabad56fafdcba635250b9.png

此时观察 Hologres 表,可以看到两个表都已经同步了新的数据和表结构:

def33358b28c0b9f89c7b7c5fac1cedb.png

6a8678fd137b4a2277ebeeffdb634acf.png

04

总结

Flink CDC 基于 MongoDB 的 Change Streams 实现了 MongoDB CDC Source,支持了 MongoDB 的全增量一体化数据同步。在此基础上,阿里云实时计算 Flink 通过 MongoDB Catalog 实现了 MongoDB Schema 推导,配合 CTAS/CDAS 语句,可以在同步数据的同时支持表结构变更的同步,当 Schema 发生变更时,无需修改 Flink 作业即可同步到下游存储,极大地提升了数据集成的灵活性和便利性。


文章超链接:

[1]https://www.mongodb.com/docs/manual/changeStreams/

[2]https://www.mongodb.com/docs/atlas/app-services/mongodb/preimages/

[3]https://help.aliyun.com/zh/flink


Flink Forward Asia 2023 

本届 Flink Forward Asia 更多精彩内容,可点击「阅读原文」或扫描图片二维码观看全部议题的视频回放及 FFA 2023 峰会资料!

de9f334c156d185a3ed280837960d882.png

关注 Apache Flink 公众号,回复 FFA 2023 即可获取 FFA 2023 会后资料查看地址


▼ 关注「Apache Flink」,获取更多技术干货 ▼

0356e2e795540409d67fa4bcd2c2361f.png

 6eb4e025502ecb7514af0616610b3d97.gif  点击「阅读原文」,在线观看FFA 2023 会后资料~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1406275.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络通信(18)-C#TcpClient 和 TcpListener的使用实例

本文演示C#TcpClient 和 TcpListener的使用实例,掌握基本用法。 目录 TcpListener服务器 客户端TcpClient TcpListener服务器 界面 UI代码 namespace TcpListenerDemo {partial class Form1{/// <summary>/// 必需的设计器变量。/// </summary>private System…

【GitHub项目推荐--不错的Rust开源项目】【转载】

01 Rust 即时模式 GUI 库 egui 是一个简单、快速且高度可移植的 Rust 即时模式 GUI 库&#xff0c;可以轻松地将其集成到你选择的游戏引擎中&#xff0c;旨在成为最易于使用的 Rust GUI 库&#xff0c;以及在 Rust 中制作 Web 应用程序的最简单方法。 项目地址&#xff1a;ht…

对齐大型语言模型与人类偏好:通过表示工程实现

1、写作动机&#xff1a; 强化学习表现出相当复杂度、对超参数的敏感性、在训练过程中的不稳定性&#xff0c;并需要在奖励模型和价值网络中进行额外的训练&#xff0c;导致了较大的计算成本。为了解决RL方法带来的上述挑战&#xff0c;提出了几种计算上轻量级的替代方案&…

图像处理算法:白平衡、除法器、乘法器~笔记

参考&#xff1a; 基于FPGA的自动白平衡算法的实现 白平衡初探 (qq.com) FPGA自动白平衡实现步骤详解-CSDN博客 xilinx 除法ip核&#xff08;divider&#xff09; 不同模式结果和资源对比&#xff08;VHDL&ISE&#xff09;_ise除法器ip核-CSDN博客 数…

关于C#中的HashSet<T>与List<T>

HashSet<T> 表示值的集合。这个集合的元素是无须列表&#xff0c;同时元素不能重复。由于这个集合基于散列值&#xff0c;不能通过数组下标访问。 List<T> 表示可通过索引访问的对象的强类型列表。内部是用数组保存数据&#xff0c;不是链表。元素可重复&#xf…

Istio-gateway

一. gateway 在 Kubernetes 环境中&#xff0c;Kubernetes Ingress用于配置需要在集群外部公开的服务。但是在 Istio 服务网格中&#xff0c;更好的方法是使用新的配置模型&#xff0c;即 Istio Gateway&#xff0c;Gateway 允许将 Istio 流量管理的功能应用于进入集群的流量&…

C#基础:用ClosedXML实现Excel写入

直接在控制台输出&#xff0c;确保安装了该第三方库 using ClosedXML.Excel;class DataSource {public int id { get; set; }public string name { get; set; } "";public string classes { get; set; } "";public int score { get; set; } } class Te…

深入浅出hdfs-hadoop基本介绍

一、Hadoop基本介绍 hadoop最开始是起源于Apache Nutch项目&#xff0c;这个是由Doug Cutting开发的开源网络搜索引擎&#xff0c;这个项目刚开始的目标是为了更好的做搜索引擎&#xff0c;后来Google 发表了三篇未来持续影响大数据领域的三架马车论文&#xff1a; Google Fil…

spring Cloud Stream 实战应用深度讲解

springCloudStream 简介 Spring Cloud Stream是一个框架&#xff0c;用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。 该框架提供了一个灵活的编程模型&#xff0c;该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上&#xff0c;包括对持久发布/订…

【Maven】-- 打包添加时间戳的两种方法

一、需求 在执行 mvn clean package -Dmaven.test.skiptrue 后&#xff0c;生成的 jar 包带有自定义系统时间。 二、实现 方法一&#xff1a;使用自带属性&#xff08;不推荐&#xff09; 使用系统时间戳&#xff0c;但有一个问题&#xff0c;就是默认使用 UTC0 的时区。举例…

什么叫特征分解?

特征分解&#xff08;Eigenvalue Decomposition&#xff09;是将一个方阵分解为特征向量和特征值的过程。对于一个 nn 的方阵A&#xff0c;其特征向量&#xff08;Eigenvector&#xff09;v 和特征值&#xff08;Eigenvalue&#xff09; λ 满足以下关系&#xff1a; 这可以写…

Python批量自动处理文件夹

假如在你的电脑硬盘某文件夹里有这样一堆不同格式的文件&#xff0c;看起来非常混乱&#xff0c;详情如图所示&#xff1a; 为了方便自己快速检索到文件&#xff0c;我们需要将这些文件按照不同格式分类整理到不同的文件夹中&#xff0c;应该怎么做呢&#xff1f;源码如下&…

v39.for循环while循环

1.循环引入&#xff08;loops&#xff09; 2.while loop 当括号中表达式expression返回真值时&#xff0c;代码块执行 。之后将再次检查expression &#xff0c;如果仍然返回真值&#xff0c;继续执行......直到expression返回假值。 3.for loop 有初始化、条件、递增/减 步骤。…

Zookeeper集群 + Kafka集群,Filebeat+Kafka+ELK

目录 什么是Zookeeper&#xff1f; Zookeeper 工作机制 Zookeeper 特点 Zookeeper 数据结构 Zookeeper 选举机制 实验 部署 Zookeeper 集群 1.安装前准备 安装 JDK 下载安装包 2.安装 Zookeeper 修改配置文件 拷贝配置好的 Zookeeper 配置文件到其他机器上 在每个节…

uni-app (安卓、微信小程序)接口封装 token失效自动获取新的token

一、文件路径截图 1、新建一个文件app.js存放接口 //这里存放你需要的接口import {request} from /utils/request.js //这个文件是请求逻辑处理 module.exports {// 登录 -- 注册perssonRegister: (data) > { // 供应商注册 return request({url: manageWx/Login/perssonR…

npm i 报一堆版本问题

1&#xff0c;先npm cache clean --force 再下载 插件后缀加上 --legacy-peer-deps 2&#xff0c; npm ERR! code CERT_HAS_EXPIRED npm ERR! errno CERT_HAS_EXPIRED npm ERR! request to https://registry.npm.taobao.org/yorkie/download/yorkie-2.0.0.tgz failed, reason…

【echarts图表】提示暂无数据

【echarts图表】提示暂无数据 背景&#xff1a;echarts图表数据有时候为空数组[ ]&#xff0c;这时候渲染图表异常&#xff0c;需要提示暂无数据 // 提示 暂无数据 : noDataBox 样式 this.$nextTick(() > {const dom document.getElementById("echartsId");dom…

JAVA的面试题四

1.电商行业特点 &#xff08;1&#xff09;分布式&#xff1a; ①垂直拆分:根据功能模块进行拆分 ②水平拆分:根据业务层级进行拆分 &#xff08;2&#xff09;高并发&#xff1a; 用户单位时间内访问服务器数量,是电商行业中面临的主要问题 &#xff08;3&#xff09;集群&…

响应式Web开发项目教程(HTML5+CSS3+Bootstrap)第2版 例4-9 HTML5 表单验证

代码 <!doctype html> <html> <head> <meta charset"utf-8"> <title>HTML5 表单验证</title> </head><body> <form action"#" method"get">请输入您的邮箱:<input type"email&q…

如何基于 ESP32 芯片测试 WiFi 连接距离、获取连接的 AP 信号强度(RSSI)以及 WiFi吞吐测试

测试说明&#xff1a; 测试 WiFi 连接距离&#xff0c;是将 ESP32 作为 WiFi Station 模式来连接路由器&#xff0c;通过在开阔环境下进行拉距来测试。另外&#xff0c;可以通过增大 WiFi TX Power 来增大连接距离。 获取连接的 AP 信号强度&#xff0c;一般可以通过 WiFi 扫描…