Apache SeaTunnel MongoDB CDC 使用指南

news2024/11/19 9:30:49

随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。

file

本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。

支持的引擎

SeaTunnel Zeta
Flink

主要特性

  • 批处理
  • 流处理
  • 精确一次
  • 列投影
  • 并行度
  • 支持用户定义分片

功能描述

MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。

支持的数据源信息

要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。

数据源支持的版本依赖
MongoDB通用下载

可用性设置

  1. MongoDB版本:MongoDB 版本 >= 4.0。
  2. 集群部署:副本集或分片集群。
  3. 存储引擎:WiredTiger 存储引擎。
  4. 权限:changeStream 和 read
use admin;
db.createRole(
    {
        role: "strole",
        privileges: [{
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream" ]
        }],
        roles: [
            { role: 'read', db: 'config' }
        ]
    }
);

db.createUser(
  {
      user: 'stuser',
      pwd: 'stpw',
      roles: [
         { role: 'strole', db: 'admin' }
      ]
  }
);

数据类型映射

以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。

MongoDB BSON 类型SeaTunnel 数据类型
ObjectIdSTRING
StringSTRING
BooleanBOOLEAN
BinaryBINARY
Int32INTEGER
Int64BIGINT
DoubleDOUBLE
Decimal128DECIMAL
DateDATE
TimestampTIMESTAMP
ObjectROW
ArrayARRAY

对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。

MongoDB BSON 类型SeaTunnel STRING 表示
Symbol{"_value": {"$symbol": "12"}}
RegularExpression{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript{"_value": {"$code": "function() { return 10; }"}}
DbPointer{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}
提示

在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。

名称类型必须默认值描述
hostsString-MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018
usernameString-连接 MongoDB 时使用的数据库用户名。
passwordString-连接 MongoDB 时使用的密码。
databaseList-要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。
collectionList-数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。
connection.optionsString-MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。
batch.sizeLong1024游标批大小。
poll.max.batch.sizeEnum1024轮询新数据时包含在单个批次中的更改流文档的最大数量。
poll.await.time.msLong1000等待检查更改流上的新结果之前的时间量。
heartbeat.interval.msString0发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。
incremental.snapshot.chunk.size.mbLong64增量快照的块大小(MB)。
common-options-源插件通用参数,请参考源通用选项获取详情。

提示:

  • 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
  • MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
  • 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。

如何创建 MongoDB CDC 数据同步作业

将 CDC 数据打印到客户端

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:

env {
  # 您可以在此处设置引擎配置
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
    schema = {
      fields {
        "_id" : string,
        "name" : string,
        "description" : string,
        "weight" : string
      }
    }
  }
}

# 在本地客户端打印读取的 MongoDB 数据
sink {
  Console {
    parallelism = 1
  }
}

将 CDC 数据写入 MysqlDB

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://mysql_cdc_e2e:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "st_user"
    password = "seatunnel"

    generate_sink_sql = true
    # You need to configure both database and table
    database = mongodb_cdc
    table = products
    primary_keys = ["_id"]
  }
}

多表同步

以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory","crm"]
    collection = ["inventory.products","crm.test"]
    username = stuser
    password = stpw
  }
}

# Console printing of the read Mongodb data
sink {
  Console {
    parallelism = 1
  }
}

提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。

使用正则表达式匹配多表

以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:

匹配示例表达式描述
前缀匹配^(test).*匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。
后缀匹配.*[p$]匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }

Console printing of the read Mongodb data

sink { Console { parallelism = 1 } }


### 实时流数据格式

{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }

```

到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。

Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1514603.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode——贪心算法(Java)

贪心算法 简介[简单] 455. 分发饼干[中等] 376. 摆动序列[中等] 53. 最大子数组和[中等] 122. 买卖股票的最佳时机 II[中等] 55. 跳跃游戏 简介 记录一下自己刷题的历程以及代码。写题过程中参考了 代码随想录的刷题路线。会附上一些个人的思路,如果有错误&#xf…

STM32输入捕获频率和占空比proteus仿真失败

这次用了两天的时间来验证这个功能,虽然实验没有成功,但是也要记录一下,后面能解决了,回来再写上解决的办法: 这个程序最后的实验结果是读取到的CCR1和CCR2的值都是0,所以没有办法算出来频率和占空比。 还…

人工智能|机器学习——BIRCH聚类算法(层次聚类)

这里再来看看另外一种常见的聚类算法BIRCH。BIRCH算法比较适合于数据量大,类别数K也比较多的情况。它运行速度很快,只需要单遍扫描数据集就能进行聚类。 1.什么是流形学习 BIRCH的全称是利用层次方法的平衡迭代规约和聚类(Balanced Iterative…

应用程序性能监控(APM)的解决方案

随着技术的不断发展,APM 监控和可观测性的重要性怎么强调都不为过,应用程序已成为业务运营的支柱。随着组织越来越依赖数字解决方案来推动其流程并与用户互动,确保最佳性能和可用性变得至关重要。这超越了系统正常运行时间,深入研…

拼多多商品详情接口数据采集

拼多多商品详情接口数据采集是一个相对专业的任务,通常涉及到使用API接口或第三方采集工具等技术手段。以下是一些基本步骤和注意事项,供您参考: 请求示例,API接口接入Anzexi58 申请开发者账号:如果您打算使用API接口…

政务云安全风险分析与解决思路探讨

1.1概述 为了掌握某市政务网站的网络安全整体情况,在相关监管机构授权后,我们组织人员抽取了某市78个政务网站进行安全扫描,通过安全扫描,对该市政务网站的整体安全情况进行预估。 1.2工具扫描结果 本次利用漏洞扫描服务VSS共扫…

基于Springboot的集团门户网站(有报告)。Javaee项目,springboot项目。

演示视频: 基于Springboot的集团门户网站(有报告)。Javaee项目,springboot项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系结构&…

FPGA高端项目:FPGA基于GS2971+GS2972架构的SDI视频收发+纯verilog图像缩放+多路视频拼接,提供8套工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐本博已有的 SDI 编解码方案本方案的SDI接收发送本方案的SDI接收图像缩放应用本方案的SDI接收HLS图像缩放HLS多路视频拼接应用本方案的SDI接收OSD动态字符叠加输出应用本方案的SDI接收HLS多路视频融合叠加应用本方案的SDI接收GTX 8b/10b编解…

KeePass 密码库坚果云授权同步(免费)

前言介绍 KeePass是一款开源的密码管理工具,可以帮助你安全地存储和管理各种密码和敏感信息。 下载安装 下载KeePass:官网,下载KeePass的安装文件。根据你的操作系统选择适用的版本,比如Windows、macOS或Linux。 安装KeePass&a…

神经网络处理器优化设计(一)

神经网络处理器优化设计,涉及到一些特殊和通用处理流程,一是降低硬件成本,二是提高性能。 一 跨层流水线调度 这里主要针对深度可分离卷积,将Pointwise conv与Depthwise卷积并行处理,好处是,减小整体流水时…

开源生态与软件供应链研讨会

✦ 日程安排 开源生态与软件供应链研讨会 时间: 2024年3月12日(星期二)13:30 – 17:00 地点: 复旦大学江湾校区二号交叉学科楼E1021 联系人: 陈碧欢(bhchenfudan.edu.cn) 点击文末“阅读原文”或扫描下方二维码进入报名通…

cms垃圾回收

cms垃圾回收 CMS概述CMS收集器整体流程初始标记并发标记重新标记并发清除 CMS卡表什么是卡表(card table)什么是mod-union table CMS概述 CMS(Concurrent Mark Sweep)收集器是Java虚拟机中的一种老年代(old Generation)垃圾收集器,他主要目标是减少垃圾收集时的应用…

Redis实现分布式锁源码分析

为什么使用分布式锁 单机环境并发时,使用synchronized或lock接口可以保证线程安全,但它们是jvm层面的锁,分布式环境并发时,100个并发的线程可能来自10个服务节点,那就是跨jvm了。 简单分布式锁实现 SETNX 格式&…

k8s关于pod

目录 1、POD 的创建流程 kubectl 发起创建 Pod 请求: API Server 接收请求并处理: 写入 Etcd 数据库: Kubelet 监听并创建 Pod: Pod 状态更新和汇报: 2、POD 的状态解析 1. Pending Pod 2. Running Pod 3. S…

【PRIVGUARD-privguard-artifact-main】代码学习(parser部分)

privguard-artifact-main:parser部分简述 1.abstract_domain.py (1)简介 实现PrivGuard中的抽象域功能。PrivGuard是一个旨在确保Python程序符合特定隐私策略的工具。代码中定义了两种类型的抽象域:闭区间格(ClosedIn…

霹雳学习笔记——6.1.2 ResNeXt

相比于ResNet,更新了block 效果:错误率低于ResNet,并且计算量一样。 对比卷积和组卷积,参数个数会变成1/g倍,g是分成了g组 最终输出的channel与卷积核的个数相同。 (好像是。。。好像之前听过这个&#xff…

【LeetCode热题100】73. 矩阵置零(矩阵)

一.题目要求 给定一个 m x n 的矩阵,如果一个元素为 0 ,则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 二.题目难度 中等 三.输入样例 示例 1: 输入:matrix [[1,1,1],[1,0,1],[1,1,1]] 输出:[[1,0…

文字弹性跳动CSS3代码

文字弹性跳动CSS3代码,源码由HTMLCSSJS组成,记事本打开源码文件可以进行内容文字之类的修改,双击html文件可以本地运行效果,也可以上传到服务器里面,重定向这个界面 下载地址 文字弹性跳动CSS3代码

Rust 构建开源 Pingora 框架可以与nginx媲美

一、概述 Cloudflare 为何弃用 Nginx,选择使用 Rust 重新构建新的代理 Pingora 框架。Cloudflare 成立于2010年,是一家领先的云服务提供商,专注于内容分发网络(CDN)和分布式域名解析。它提供一系列安全和性能优化服务…

4.MAC平台Python的下载、安装(含Python2.7+Python3.12双版本环境变量配置)——《跟老吕学Python编程》

4.MAC平台Python的下载、安装(含Python2.7Python3.12双版本环境变量配置)——《跟老吕学Python编程》)——跟老吕学Python编程 一、下载MAC版Python1.Python官网2.MAC版Python下载网址 二、在MAC安装Python1.在MAC安装Python2.阅读Python重要…