Debezium发布历史139

news2025/1/13 13:53:01

原文地址: https://debezium.io/blog/2023/02/04/ddd-aggregates-via-cdc-cqrs-pipeline-using-kafka-and-debezium/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

DDD Aggregates via CDC-CQRS Pipeline using Kafka & Debezium
February 4, 2023 by Purnima Jain
ddd cdc cqrs debezium kafka

在这篇文章中,我们将讨论在规范化的关系数据库(mysql)和去规范化的Nosql数据库(蒙戈数据库)之间的CDC-CQRS管道,这两个数据库是查询数据库,其结果是通过Debezim∓卡夫卡-流创建DDD集合。

您可以找到完整的示例源代码 在这里 .参阅 阅读。 关于构建和运行示例代码的详细信息.

这个例子围绕三个微服务:order-write-service ,order-aggregation-service 和order-read-service .这些服务是在java中作为"弹簧靴"应用程序实现的。

…order-write-service 在mysql数据库中各自的表中,提出了两个保留的端点--------------------------------------------------------Debezum对mysqb日志进行跟踪,以捕捉这些表中的任何事件,并向卡夫卡主题发布消息。这些话题是由order-aggregation-service 这是一个卡夫卡流应用程序,它将来自这两个主题的数据连接起来,创建一个订单集合对象,然后发布到第三个主题。蒙戈数据库接收器连接器使用这个主题,数据在蒙戈数据库中进行持久化,由order-read-service .

解决方案的总体架构见下图:
在这里插入图片描述

其他应用程序:订单书写服务
触发工作流启动的第一个组件是order-write-service .这已作为弹簧靴应用程序实现,并公开了两个休息点:

帖子:api/shipping-details 在mysql数据库中持久保存运输细节

帖子:api/item-details 在mysql数据库中保留项目细节

这两个端点都将它们的数据保存在mysql数据库中各自的表中。

命令数据库:mysql
上述休息端点的后端处理最终将数据持久化到mysql中各自的表中。

航运细节存储在一个表格中SHIPPING_DETAILS .物品的细节存储在一个表格中ITEM_DETAILS .

以下是SHIPPING_DETAILS 表格,一栏ORDER_ID 关键是:
在这里插入图片描述

以下是ITEM_DETAILS 表格,一栏ORDER_ID +ITEM_ID 关键是:
在这里插入图片描述

卡夫卡连接源连接器:MySQL
更改数据捕获(ccc)是一种从数据库事务日志中捕获更改事件的解决方案(在mysql的情况下称为宾基日志),并将这些事件转发给下游消费者EX。卡夫卡主题。

Debezum是一个为更改数据捕获提供低延迟数据流平台的平台,它是在阿帕奇卡夫卡之上建立的。它允许将数据库行级更改作为事件捕获并发布到阿帕奇卡夫卡主题。我们设置和配置Debezum来监视我们的数据库,然后我们的应用程序为对数据库进行的每个行级更改消费事件。

在我们的案例中,我们将使用Debezimmysql源连接器来捕捉上述表中的任何新事件,并将其转发给阿帕奇卡夫卡。为了实现这一点,我们将通过将以下JSON请求发送到卡夫卡连接的其余API来注册我们的连接器:

{
“name”: “app-mysql-db-connector”,
“config”: {
“connector.class”: “io.debezium.connector.mysql.MySqlConnector”,
“tasks.max”: “1”,
“database.hostname”: “mysql_db_server”,
“database.port”: “3306”,
“database.user”: “custom_mysql_user”,
“database.password”: “custom_mysql_user_password”,
“database.server.id”: “184054”,
“database.server.name”: “app-mysql-server”,
“database.whitelist”: “app-mysql-db”,
“table.whitelist”: “app-mysql-db.shipping_details,app-mysql-db.item_details”,
“database.history.kafka.bootstrap.servers”: “kafka_server:29092”,
“database.history.kafka.topic”: “dbhistory.app-mysql-db”,
“include.schema.changes”: “true”,
“transforms”: “unwrap”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”
}
}
上述配置是基于1.9.5.最后。请注意,如果您试图使用Debezum2.0+的演示,上面的一些配置属性有了新的名称,配置将需要一些调整。

它建立了一个io.debezium.connector.mysql.MySqlConnector ,从指定的mysql实例捕获更改。请注意,通过表格包括清单,只对SHIPPING_DETAILS 和ITEM_DETAILS 可捕捉到表格。它还应用一个命名为单一消息转换(SMT)ExtractNewRecordState 它提取了after 场来自卡夫卡记录中的德贝兹改变事件。SMT只替换了原来的更改事件。after 创建一个简单的卡夫卡记录。

默认情况下,卡夫卡主题名称是"服务器.架构.表名",根据我们的连接器配置,它可以翻译为:

app-mysql-server.app-mysql-db.item_details

app-mysql-server.app-mysql-db.shipping_details

卡夫卡河应用:订单集合服务
卡夫卡河应用,即order-aggregation-service ,将处理来自卡夫卡CDC-主题的数据。这些主题接收疾病预防控制中心事件的基础是在mysql中找到的运输细节和项目细节关系。

在此基础上,可以建立如下用于创建和维护DDD订单的克兰兹拓扑结构。

应用程序读取来自运输细节-CDC主题的数据。由于卡夫卡主题记录是用德贝齐姆JSON格式与未包装的信封,我们需要解析订单标识和运输详细信息,以创建一个以订单标识为键、以运输详细信息为值的KTAD。

// Shipping Details Read
KStream<String, String> shippingDetailsSourceInputKStream = streamsBuilder.stream(shippingDetailsTopicName, Consumed.with(STRING_SERDE, STRING_SERDE));

// Change the Json value of the message to ShippingDetailsDto
KStream<String, ShippingDetailsDto> shippingDetailsDtoWithKeyAsOrderIdKStream = shippingDetailsSourceInputKStream
.map((orderIdJson, shippingDetailsJson) -> new KeyValue<>(parseOrderId(orderIdJson), parseShippingDetails(shippingDetailsJson)));

// Convert KStream to KTable
KTable<String, ShippingDetailsDto> shippingDetailsDtoWithKeyAsOrderIdKTable = shippingDetailsDtoWithKeyAsOrderIdKStream.toTable(
Materialized.<String, ShippingDetailsDto, KeyValueStore<Bytes, byte[]>>as(SHIPPING_DETAILS_DTO_STATE_STORE).withKeySerde(STRING_SERDE).withValueSerde(SHIPPING_DETAILS_DTO_SERDE));
同样,应用程序读取项目细节-CDC-主题的数据,并按一个列表中与同一订单相关的所有项目解析每个邮件到组的订单标识和项目标识-然后将其聚合到一个以订单标识为键、与该特定订单相关的项目列表作为值的KSab中。

// Item Details Read
KStream<String, String> itemDetailsSourceInputKStream = streamsBuilder.stream(itemDetailsTopicName, Consumed.with(STRING_SERDE, STRING_SERDE));

// Change the Key of the message from ItemId + OrderId to only OrderId and parse the Json value to ItemDto
KStream<String, ItemDto> itemDtoWithKeyAsOrderIdKStream = itemDetailsSourceInputKStream
.map((itemIdOrderIdJson, itemDetailsJson) -> new KeyValue<>(parseOrderId(itemIdOrderIdJson), parseItemDetails(itemDetailsJson)));

// Group all the ItemDtos for each OrderId
KGroupedStream<String, ItemDto> itemDtoWithKeyAsOrderIdKGroupedStream = itemDtoWithKeyAsOrderIdKStream.groupByKey(Grouped.with(STRING_SERDE, ITEM_DTO_SERDE));

// Aggregate all the ItemDtos pertaining to each OrderId in a list
KTable<String, ArrayList> itemDtoListWithKeyAsOrderIdKTable = itemDtoWithKeyAsOrderIdKGroupedStream.aggregate(
(Initializer<ArrayList>) ArrayList::new,
(orderId, itemDto, itemDtoList) -> addItemToList(itemDtoList, itemDto),
Materialized.<String, ArrayList, KeyValueStore<Bytes, byte[]>>as(ITEM_DTO_STATE_STORE).withKeySerde(STRING_SERDE).withValueSerde(ITEM_DTO_ARRAYLIST_SERDE));
由于两个KTAS都有订单作为键,使用订单很容易将它们连接起来创建一个叫做订单集合的聚合。订单集合是通过从船舶细节和项目细节中吸收数据而创建的一个复合对象。然后,这个订单集合写到一个订单集合卡夫卡主题。

// Joining the two tables: shippingDetailsDtoWithKeyAsOrderIdKTable and itemDtoListWithKeyAsOrderIdKTable
ValueJoiner<ShippingDetailsDto, ArrayList, OrderAggregate> shippingDetailsAndItemListJoiner = (shippingDetailsDto, itemDtoList) -> instantiateOrderAggregate(shippingDetailsDto, itemDtoList);
KTable<String, OrderAggregate> orderAggregateKTable = shippingDetailsDtoWithKeyAsOrderIdKTable.join(itemDtoListWithKeyAsOrderIdKTable, shippingDetailsAndItemListJoiner);

// Outputting to Kafka Topic
orderAggregateKTable.toStream().to(orderAggregateTopicName, Produced.with(STRING_SERDE, ORDER_AGGREGATE_SERDE));
卡夫卡连接槽连接器:蒙戈布连接器
接收器连接器是一个卡夫卡连接器,它读取来自阿帕奇卡夫卡的数据并将数据写入一些数据库。使用蒙戈数据库接收器连接器,很容易将DDD聚合物写入蒙戈数据库。它所需要的只是一个配置,可以发布到卡夫卡连接的其余API,以便运行连接器。

{
“name”: “app-mongo-sink-connector”,
“config”: {
“connector.class”: “com.mongodb.kafka.connect.MongoSinkConnector”,
“topics”: “order_aggregate”,
“connection.uri”: “mongodb://root_mongo_user:root_mongo_user_password@mongodb_server:27017”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: false,
“database”: “order_db”,
“collection”: “order”,
“document.id.strategy.overwrite.existing”: “true”,
“document.id.strategy”: “com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy”,
“transforms”: “hk,hv”,
“transforms.hk.type”: “org.apache.kafka.connect.transforms.HoistField K e y " , " t r a n s f o r m s . h k . f i e l d " : " i d " , " t r a n s f o r m s . h v . t y p e " : " o r g . a p a c h e . k a f k a . c o n n e c t . t r a n s f o r m s . H o i s t F i e l d Key", "transforms.hk.field": "_id", "transforms.hv.type": "org.apache.kafka.connect.transforms.HoistField Key","transforms.hk.field":"id","transforms.hv.type":"org.apache.kafka.connect.transforms.HoistFieldValue”,
“transforms.hv.field”: “order”
}
}
查询数据库:
将DDD聚合写入数据库order_db 在收藏中order 在蒙戈德。订单会变成_id 在餐桌上order 列存储订单-集合为JSON。

其他应用:订单-阅读-服务
在蒙戈数据库中持久存在的订单集合通过一个休息端点提供。order-read-service .

获取:api/order/{order-id} 从蒙戈数据库检索订单

执行指令
提供了此博客的完整源代码 在这里 在基特布。先克隆这个存储库然后转换成cdc-cqrs-pipeline 目录。该项目提供一个为所有组件提供服务的码头组合文件:

Mysql

通过浏览器管理mysql(原名为ppin行政人员)

蒙戈德

蒙戈快递,通过浏览器管理蒙戈数据库

饲养员

相融合的卡夫卡

卡夫卡连接

一旦所有服务启动,通过执行Create-MySQL-Debezium-Connector 和Create-MongoDB-Sink-Connector 分别要求cdc-cqrs-pipeline.postman_collection.json .执行请求Get-All-Connectors 验证连接器是否已正确创建。

更改为个别目录,并将三个弹簧靴应用程序展开:

order-write-service:在1号端口运行8070

order-aggregation-service:在1号端口运行8071

order-read-service:在1号端口运行8072

有了这个,我们的设置就完成了。

为了测试应用程序,执行请求Post-Shipping-Details 从邮递员收集到插入货运细节Post-Item-Details 插入特定订单ID的详细项目。

最后,执行Get-Order-By-Order-Id 在邮差集合中请求检索完整的订单聚合。

概括的
阿帕奇卡夫卡是服务间消息传递的高度可扩展和可靠的支柱。将阿帕奇卡夫卡置于整体架构的中心,也确保了所涉服务的脱钩。例如,如果解决方案的单个组件失败或在一段时间内无法使用,则将在稍后处理事件:在重新启动后,Debezum连接器将继续跟踪相关表,从它以前关闭的位置开始。同样,任何消费者将继续处理其先前抵消的主题。通过对已经成功处理的消息进行跟踪,可以检测到副本,并将其排除在重复处理之外。

当然,不同服务之间的此类事件管道最终是一致的,即:订单阅读服务等消费者可能比订单写作服务等生产者落后一些。通常情况下,这很好,可以用应用程序的业务逻辑来处理。此外,整个解决方案的端到端延迟通常较低(秒甚至次秒范围),这要归功于基于日志的变化数据捕获,它允许在接近实时的时间内发布事件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1464152.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据揭秘:Hadoop短视频流量分析实战

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

Mybatis速成(二)

文章目录 1. Mybatis基础操作1.1 需求1.2 准备1.3 删除1.3.1 功能实现1.3.2 日志输入1.3.3 预编译SQL1.3.3.1 介绍1.3.3.2 SQL注入1.3.3.3 参数占位符 1.4 新增1.4.1 基本新增1.4.2 主键返回 1.5 更新1.6 查询1.6.1 根据ID查询1.6.2 数据封装1.6.3 条件查询1.6.4 参数名说明 2.…

相机图像质量研究(25)常见问题总结:CMOS期间对成像的影响--过曝、欠曝

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

HTTP REST 方式调用WebService接口(wsdl)

一、WebService接口正常使用SOAP协议调用&#xff0c;测试时常采用SoapUI软件调用&#xff0c;具体如下&#xff1a; 二、由于目前主流web服务逐渐转换为RESTful的形式&#xff0c;且SOAP协议的实现也是基于HTTP协议&#xff0c;故存在通过HTTP调用WebService接口的可能 2.1 …

第一个Qt程序中的秘密

创建第一个程序 首先我们打开Qt Creator 打开文件->New Projects... 菜单&#xff0c;创建我们的第一个Qt项目 选择 Qt Widgets Application&#xff0c;点击选择...按钮 之后&#xff0c;输入项目名称QtLearning&#xff0c;并选择创建路径&#xff0c; 在build system中选…

ClickHouse--11--ClickHouse API操作

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1.Java 读写 ClickHouse API1.1 首先需要加入 maven 依赖1.2 Java 读取 ClickHouse 集群表数据JDBC--01--简介 ClickHouse java代码 1.3 Java 向 ClickHouse 表中写…

1分钟带你了解Python数据类型

1.Python 3 主要有6种标准数据类型 Number&#xff08;数字&#xff09; String&#xff08;字符串&#xff09; List&#xff08;列表&#xff09; Tuple&#xff08;元组&#xff09; Set&#xff08;集合&#xff09; Dictionary&#xff08;字典&#xff09; 2.Numb…

不要浪费

解法&#xff1a; 记录一下tle的代码 #include <iostream> #include <vector> #include <algorithm> using namespace std; #define endl \n bool check(vector<int>& a, int l,int k) {int sum 0;for (int i 0; i < a.size() && l…

【Android 高德地图POI定位地址搜索】

先上演示&#xff1a; 高德地图的key申请这里就不讲了&#xff0c;比较简单&#xff0c;网上有很多资料&#xff0c;或者前往官网查看&#xff1a;官方文档 依赖引入 项目使用了如下依赖&#xff1a; //高德地图implementation com.amap.api:3dmap:latest.integration//地图…

alibabacloud学习笔记06(小滴课堂)

讲Sentinel流量控制详细操作 基于并发线程进行限流配置实操 在浏览器打开快速刷新会报错 基于并发线程进行限流配置实操 讲解 微服务高可用利器Sentinel熔断降级规则 讲解服务调用常见的熔断状态和恢复 讲解服务调用熔断例子 我们写一个带异常的接口&#xff1a;

centos7部署nfs+keepalived+drbd

一、项目需求描述 现在使用的架构是nfskeepalivedrsyncsersync&#xff0c;目前这套架构存在主从nfs节点数据同步不一致问题&#xff0c;大概会有 120s左右的数据延长同步时间&#xff0c;需要提供优化的自动化方案。 二、现有方案缺点 1、切换不能保证主从节点数据一致。 2、…

C++——基础语法(1)

前言 一路磕磕绊绊&#xff0c;也算是走到了C的大门下。C从名字上就可以看出是C语言的“plusplus版本”&#xff0c;C在兼容C语言的基础上又加入了许多方便又高深的特性与机制&#xff0c;便于我们更容易处理C语言中的棘手问题。不得不提的一点是C为我们打开了面向对象思想的大…

【ACM出版】第五届计算机信息和大数据应用国际学术会议(CIBDA 2024)

第五届计算机信息和大数据应用国际学术会议&#xff08;CIBDA 2024&#xff09; 2024 5th International Conference on Computer Information and Big Data Applications 重要信息 大会官网&#xff1a;www.ic-cibda.org 大会时间&#xff1a;2024年3月22-24日 大会地点&#…

Java中哪些很容易出现的坑

文章目录 1空指针2小数的计算3包装类型4Java8 Stream5日期格式化 先来一个简单一点&#xff0c;就从空指针开始吧 1空指针 //多级调用空指针userService.getUser("张三").getUserInfo().getUserName(); //例如getUser("张三")、getUserInfo&#xff08;&a…

基于SpringBoot的景区旅游管理系统

项目介绍 本期给大家介绍一个 景区旅游管理 系统.。主要模块有首页&#xff0c;旅游路线&#xff0c;旅行攻略&#xff0c;在线预定。管理员可以登录管理后台对用户进行管理&#xff0c;可以添加酒店&#xff0c;景区&#xff0c;攻略&#xff0c;路线等信息。整体完成度比较高…

【Java面试系列】JDK 1.8 新特性之 Stream API

目录 一、Stream 简介二、Stream 特点&#xff1a;Stream 注意点&#xff1a;1、什么是聚合操作2、Stream 流1、什么是流2、流的构成3、stream 流的两种操作4、惰性求值和及早求值方法5、Stream 流的并行 三、Stream操作的三个步骤1、创建流第一种&#xff1a;通过集合第二种&a…

2024全球网络安全展望|构建协同生态,护航数字经济

2024年1月&#xff0c;世界经济论坛发布《2024全球网络安全展望》报告&#xff0c;指出在科技快速发展的背景下&#xff0c;网络安全不均衡问题加剧&#xff0c;需加强公共部门、企业组织和个人的合作。 报告强调&#xff0c;面对地缘政治动荡、技术不确定性和全球经济波动&am…

Vue知识学习

Vue 是什么&#xff1f; 概念&#xff1a;Vue 是一个用于构建用户界面的渐进式框架 Vue 的两种使用方式: ① Vue 核心包开发 场景:局部 模块改造 ② Vue 核心包& Vue插件工程化开发 场景:整站开发 创建Vue 实例&#xff0c;初始化渲染的核心步骤: 1.准备容器 2.引包(官…

哪些软件可以把试卷照片转换成电子版?试试这些软件

哪些软件可以把试卷照片转换成电子版&#xff1f;在数字化时代&#xff0c;纸质试卷的保存和传输都显得不太方便。为了解决这个问题&#xff0c;我们可以将试卷照片转换成电子版。下面&#xff0c;我将为大家介绍5款可以轻松实现这一功能的软件&#xff0c;让你轻松应对各种试卷…

吸虫塔的工作原理是什么?

吸虫塔虫情智能测报分析系统是一款专门用于长期动态监测蚜虫等小型迁飞性害虫的大型植保设备&#xff0c;由装置上方的空气动力装置、上下两层远红外虫体处理装置、高清图像采集装置、虫体收集装置等部分组成。昆虫在经由设备上方的吸风装置后会被吸入设备内部&#xff0c;上下…