Debezium发布历史20

news2025/1/25 9:15:04

原文地址: https://debezium.io/blog/2017/09/25/streaming-to-another-database/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

将数据流式传输到下游数据库
九月 25, 2017 作者: Jiri Pechanec
mysql postgres smt 示例
在这篇博文中,我们将创建一个简单的流数据管道来持续捕获 MySQL 数据库中的更改,并将它们近乎实时地复制到 PostgreSQL 数据库中。我们将展示如何在不编写任何代码的情况下完成此操作,而是通过使用和配置 Kafka Connect、Debezium MySQL 源连接器、Confluence JDBC 接收器连接器和一些单消息转换 (SMT)。

这种通过 Kafka 复制数据的方法本身确实非常有用,但当我们可以将近乎实时的数据更改流与其他流、连接器和流处理应用程序相结合时,它会变得更加有利。最近的Confluence 博客文章系列展示了类似的流数据管道,但使用不同的连接器和 SMT。Kafka Connect 的优点在于您可以混合和匹配连接器以在多个系统之间移动数据。

我们还将演示Debezium 0.6.0中发布的一项新功能: CDC 事件扁平化的单个消息转换。

拓扑结构
该场景的一般拓扑如下图所示:
图片来源于debezium官网
在这里插入图片描述
图 1:一般拓扑

为了稍微简化设置,我们将仅使用一个包含所有连接器的 Kafka Connect 实例。即该实例将充当事件生产者和事件消费者:
图片来源于debezium官网在这里插入图片描述

图 2:简化的拓扑

配置
我们将使用此组合来快速部署演示。该部署由以下 Docker 映像组成:

阿帕奇动物园管理员

阿帕奇·卡夫卡

经过更改的丰富的Kafka Connect / Debezium镜像

PostgreSQL JDBC 驱动程序放入/kafka/libs目录中

Kafka Connect JDBC Connector(由Confluence开发)放入/kafka/connect/kafka-connect-jdbc目录

我们的教程中使用的预填充 MySQL

空 PostgreSQL

Debezium MySQL 连接器旨在专门捕获数据库更改,并提供有关这些事件的尽可能多的信息,而不仅仅是每行的新状态。同时,Confluence JDBC Sink Connector 的设计目的是根据消息的结构将每条消息简单地转换为数据库插入/更新插入。因此,两个连接器具有不同的消息结构,但它们也使用不同的主题命名约定和表示已删除记录的行为。

当使用并非设计用于协同工作的连接器时,结构和行为上的不匹配很常见。但这是我们可以轻松处理的事情,我们将在接下来的几节中讨论如何处理。

活动形式
Debezium 以复杂的格式发出事件,其中包含有关捕获的数据更改的所有信息:操作类型、源元数据、连接器处理事件的时间戳以及更改前后的行状态。Debezium 将此结构称为“信封”:

{
“op”: “u”,
“source”: {

},
“ts_ms” : “…”,
“before” : {
“field1” : “oldvalue1”,
“field2” : “oldvalue2”
},
“after” : {
“field1” : “newvalue1”,
“field2” : “newvalue2”
}
}
许多其他 Kafka Connect 源连接器没有能力了解这么多有关更改的信息,而是使用更简单的模型,其中每条消息直接代表行的后状态。这也是许多接收器连接器所期望的,Confluence JDBC Sink Connector 也不例外:

{
“field1” : “newvalue1”,
“field2” : “newvalue2”
}
虽然我们认为 Debezium CDC 连接器提供尽可能多的细节实际上是一件很棒的事情,但我们还使您可以轻松地将 Debezium 的“信封”格式转换为许多其他连接器所期望的“行”格式。Debezium 以单一消息转换的形式提供了这两种格式之间的桥梁。该ExtractNewRecordState转换会自动提取新的行记录,从而有效地将复杂的记录扁平化为可由其他连接器使用的简单记录。

您可以在源连接器上使用此 SMT 在将消息写入 Kafka之前转换消息,也可以将源连接器更丰富的消息“信封”形式存储在 Kafka 中,并在接收器连接器上使用此 SMT 来转换消息从 Kafka 读取数据之后以及传递到接收器连接器之前。这两个选项都有效,这仅取决于您是否发现消息的信封形式可用于其他目的。

在我们的示例中,我们使用以下配置属性在接收器连接器上应用 SMT:

“transforms”: “unwrap”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”,
删除记录
当 Debezium 连接器检测到行被删除时,它会创建两个事件消息:删除事件和逻辑删除消息。删除消息有一个信封,其中字段中包含已删除行的状态before,并且after字段为null。逻辑删除消息包含与删除消息相同的键,但整个消息值为null,Kafka 的日志压缩利用这一点来知道它可以删除任何具有相同键的较早消息。许多接收器连接器(包括 Confluence 的 JDBC 接收器连接器)并不期望这些消息,如果它们看到任何一种消息,就会失败。默认情况下, SMTExtractNewRecordState将过滤掉这两者删除和逻辑删除记录,但如果您使用 SMT 并希望保留其中一种或两种消息,则可以更改此设置。

主题命名
最后但并非最不重要的一点是,主题的命名有所不同。Debezium 对代表其管理的每个表的目标主题使用完全限定的命名。命名遵循模式..。Kafka Connect JDBC 连接器使用简单的名称。

在更复杂的场景中,用户可以部署Kafka Streams框架来在源路由和目标路由之间建立详细的路由。在我们的示例中,我们将使用库存RegexRouterSMT,它将 Debezium 创建的记录路由到根据 JDBC 连接器架构命名的主题中。同样,我们可以在源连接器或接收器连接器中使用此 SMT,但在本示例中,我们将在源连接器中使用它,以便我们可以选择将在其中写入记录的 Kafka 主题的名称。

“transforms”: “route”,
“transforms.route.type”: “org.apache.kafka.connect.transforms.RegexRouter”,
“transforms.route.regex”: “([.]+)\.([.]+)\.([^.]+)”,
“transforms.route.replacement”: “$3”
例子
踢轮胎,让我们试试我们的例子!

首先我们需要部署所有组件。

export DEBEZIUM_VERSION=0.6
docker-compose up
当所有组件启动后,我们将注册 JDBC Sink 连接器写入 PostgreSQL 数据库:

curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8083/connectors/ -d @jdbc-sink.json
使用此注册请求:

{
“name”: “jdbc-sink”,
“config”: {
“connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”,
“tasks.max”: “1”,
“topics”: “customers”,
“connection.url”: “jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw”,
“transforms”: “unwrap”, (1)
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”,(1)
“auto.create”: “true”, (2)
“insert.mode”: “upsert”, (3)
“pk.fields”: “id”, (4)
“pk.mode”: “record_value” (4)
}
}
该请求配置这些选项:

将 Debezium 的复杂格式分解为简单格式

自动创建目标表

如果不存在则插入一行或更新现有行

识别存储在Kafka记录值字段中的主键

然后必须设置源连接器:

curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8083/connectors/ -d @source.json
使用此注册请求:

{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.mysql.MySqlConnector”,
“tasks.max”: “1”,
“database.hostname”: “mysql”,
“database.port”: “3306”,
“database.user”: “debezium”,
“database.password”: “dbz”,
“database.server.id”: “184054”,
“database.server.name”: “dbserver1”, (1)
“database.whitelist”: “inventory”, (2)
“database.history.kafka.bootstrap.servers”: “kafka:9092”,
“database.history.kafka.topic”: “schema-changes.inventory”,
“transforms”: “route”, (3)
“transforms.route.type”: “org.apache.kafka.connect.transforms.RegexRouter”, (3)
“transforms.route.regex”: “([.]+)\.([.]+)\.([^.]+)”, (3)
“transforms.route.replacement”: “$3” (3)
}
}
该请求配置这些选项:

数据库的逻辑名称

我们要监控的数据库

一个SMT,定义与主题名称匹配的正则表达式..,并提取其中的第三部分作为最终的主题名称

让我们检查一下数据库是否同步。表的所​​有行都customers应该在源数据库(MySQL)和目标数据库(Postgres)中找到:

docker-compose exec mysql bash -c ‘mysql -u M Y S Q L U S E R − p MYSQL_USER -p MYSQLUSERpMYSQL_PASSWORD inventory -e “select * from customers”’
±-----±-----------±----------±----------------------+
| id | first_name | last_name | email |
±-----±-----------±----------±----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
±-----±-----------±----------±----------------------+

docker-compose exec postgres bash -c ‘psql -U $POSTGRES_USER $POSTGRES_DB -c “select * from customers”’
last_name | id | first_name | email
-----------±-----±-----------±----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org
在连接器仍在运行的情况下,我们可以向 MySQL 数据库添加一个新行,然后检查它是否已复制到 PostgreSQL 数据库中:

docker-compose exec mysql bash -c ‘mysql -u M Y S Q L U S E R − p MYSQL_USER -p MYSQLUSERpMYSQL_PASSWORD inventory’
mysql> insert into customers values(default, ‘John’, ‘Doe’, ‘john.doe@example.com’);
Query OK, 1 row affected (0.02 sec)

docker-compose exec -postgres bash -c ‘psql -U $POSTGRES_USER $POSTGRES_DB -c “select * from customers”’
last_name | id | first_name | email
-----------±-----±-----------±----------------------

Doe | 1005 | John | john.doe@example.com
(5 rows)
概括
我们建立了一个简单的流数据管道,以近乎实时的方式将数据从 MySQL 数据库复制到 PostgreSQL 数据库。我们使用 Kafka Connect、Debezium MySQL 源连接器、Confluence JDBC 接收器连接器和一些 SMT 来完成此任务 - 所有这些都无需编写任何代码。由于它是一个流系统,它将继续捕获对 MySQL 数据库所做的所有更改并近乎实时地复制它们。

下一步是什么?
在未来的博客文章中,我们将使用 Elasticsearch 作为事件目标来重现相同的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1327862.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

期货交易策略模拟测试-基于CLBISO01策略-2023.12.22

采取与昨天同样的策略进行盘中模拟测试,今天行情还可以,挺“顺溜”。

1. 结构型模式 - 适配器模式

亦称: 封装器模式、Wrapper、Adapter 意图 适配器模式是一种结构型设计模式, 它能使接口不兼容的对象能够相互合作 问题 假如你正在开发一款股票市场监测程序, 它会从不同来源下载 XML 格式的股票数据, 然后向用户呈现出美观的图…

电气 接近开关

npn:和负载(控制器或者继电器)共阳极,低电平响应 pnp:和负载共阴极,高电平响应

声音克隆定制丰富和的系统源码+完整的代码包+搭建教程

随着科技的进步,人工智能(AI)技术已经逐渐渗透到我们生活的各个领域。声音克隆技术,作为AI领域的一个重要分支,通过模仿人类的声音特征,生成与目标声音相似的语音。这项技术在语音合成、语音识别、虚拟现实…

vue2 之 实现pdf电子签章

一、前情提要 1. 需求 仿照e签宝,实现pdf电子签章 > 拿到pdf链接,移动章的位置,获取章的坐标 技术 : 使用fabric pdfjs-dist vuedraggable 2. 借鉴 一位大佬的代码仓亏 : 地址 一位大佬写的文章 :地址 3. 优化 在大佬的代码…

椰油酰胺,预计到2026年将达到5.25亿美元

椰油酰胺,也称为椰油酰胺 DEA 或椰油酰胺 MEA,是从椰子油中提取的脂肪酸酰胺的混合物。它通常用作洗发水、香皂和化妆品等个人护理产品中的乳化剂和发泡剂。近年来,受个人护理产品需求增加以及椰油酰胺在食品和制药等其他行业的广泛使用推动&…

安全、高效的MySQL DDL解决方案

MySQL作为目前应用最广泛的开源关系型数据库,是许多网站、应用和商业产品的主要数据存储。在生产环境,线上数据库常常面临着持续的、不断变化的表结构修改(DDL),如增加、更改、删除字段和索引等等。其中一些DDL操作在M…

微服务 Spring Cloud 10,如何追踪微服务调用?服务治理的常见手段

目录 一、服务追踪的作用1、优化系统瓶颈2、优化链路调用3、故障排查4、性能优化5、生成网络拓扑图4、透明传输数据 二、节点管理1、服务调用失败一般有两类原因造成:2、服务调用失败的解决方式:3、服务调用失败的具体解决方式: 三、负载均衡…

电脑怎么重装系统?跟着步骤轻松搞定!

电脑系统随着时间的推移可能会变得迟缓或出现其他问题,而重装系统是解决这些问题的有效方法之一。本文将介绍三种电脑怎么重装系统的方法,帮助您在不同情况下选择适合自己的方案,让电脑焕然一新。 方法1:使用系统自带的恢复选项 …

[SWPUCTF 2021 新生赛]gift_F12

打开环境 题目有提示(F12),那就查看一下源代码 直接滑到最后 看提示猜测,flag就在源代码里了 ctrlf查找flag 最后得到flag,改一下形式就可以了

ELFK日志收集

文章目录 第一章:ELK日志收集系统介绍日志收集重要性ELK介绍EFK介绍ELFK介绍ES部署Kibana部署第二章:Logstach日志收集Logstash介绍Logstash安装Logstash Input输入插件Logstash Filter过滤插件Logstash Output输出插件Input fileFilter mutatesplit示例add_field示例remove_…

Flink系列之:Savepoints

Flink系列之:Savepoints 一、Savepoints二、分配算子ID三、Savepoint 状态四、算子五、触发Savepoint六、Savepoint 格式七、触发 Savepoint八、使用 YARN 触发 Savepoint九、使用 Savepoint 停止作业十、从 Savepoint 恢复十一、跳过无法映射的状态恢复十二、Resto…

阿里云大模型数据存储解决方案,为 AI 创新提供推动力

云布道师 随着国内首批大模型产品获批名单问世,百“模”大战悄然开启。在这场百“模”大战中,每一款大模型产品的诞生,都离不开数据的支撑。如何有效存储、管理和处理海量多模态数据集,并提升模型训练、推理的效率,保…

【湖仓一体尝试】MYSQL和HIVE数据联合查询

爬了两天大大小小的一堆坑,今天把一个简单的单机环境的流程走通了,记录一笔。 先来个完工环境照: mysqlhadoophiveflinkicebergtrino 得益于IBM OPENJ9的优化,完全启动后的内存占用: 1)执行联合查询后的…

【Java探索之旅】我与Java的初相识(二):程序结构与运行关系和JDK,JRE,JVM的关系

🎥 屿小夏 : 个人主页 🔥个人专栏 : Java入门到精通 🌄 莫道桑榆晚,为霞尚满天! 文章目录 📑前言一. 第一个Java程序1.1 main方法1.2 Java的程序结构 二. Java程序的运行三. JDK、JR…

【零基础入门】凸优化1:怎么培养研究能力,从模型+优化开始!

凸优化1 优化问题的形式优化问题类别1:凸函数 和 非凸函数优化问题类别2:带条件 和 无条件优化问题类别3:离散 和 连续优化问题类别4:平滑 和 非平滑如何判断一个目标函数是凸函数,还是非凸函数?怎么设计模…

Exynos4412 移植Linux-6.1(九)移植tiny4412_backlight驱动的过程及问题解决

系列文章目录 Exynos4412 移植Linux-6.1(一)下载、配置、编译Linux-6.1 Exynos4412 移植Linux-6.1(二)SD卡驱动——解决无法挂载SD卡的根文件系统 Exynos4412 移植Linux-6.1(三)SD卡驱动——解决mmc0: Ti…

解决 elementPlus 组件内容显示为英文的问题

解决 elementPlus 组件内容显示为英文的问题 一、问题描述 刚开始用 ElementPlus 发现默认的组件内容都是英文的 二、解决办法 找了找,发现是国际化的问题,默认就是显示英文,如果要显示中文需要配置中文显示。 关于显示中文的官方说明&a…

Windows11系统下如何通过.cab文件更新PL2303串口驱动?

Windows11系统下如何通过.cab文件更新PL2303串口驱动? 首先,在微软官方网站上下载所需版本的.cab文件,具体链接如下: https://www.catalog.update.microsoft.com/Search.aspx?q=Prolific%20USB-to-Serial%20Comm%20Port 如下图所示,进入该网站后,找到自己所需的驱动版…