Debezium日常分享系列之:Debezium 通知

news2025/1/10 20:23:37

Debezium日常分享系列之:Debezium 通知

  • 一、概论
  • 二、Debezium通知格式
  • 三、Debezium 有关初始快照状态的通知
  • 四、Debezium 有关增量快照进度的通知
  • 五、启用 Debezium 通知
  • 六、访问 Debezium JMX 通知
  • 七、自定义通知渠道
  • 八、应用案例

一、概论

Debezium 通知提供了一种获取有关连接器状态信息的机制。通知可以发送到以下渠道:

  • 接收器通知通道:通过 Connect API 将通知发送到配置的主题。
  • 日志通知通道:通知会附加到日志中。
  • JmxNotificationChannel:通知作为 JMX bean 中的属性公开。
  • 定制:通知将发送到您实施的自定义渠道。

二、Debezium通知格式

通知消息包含以下信息:

属性描述
id分配给通知的唯一标识符。对于增量快照通知,id 与使用执行快照信号发送的相同。
aggregate_type快照类型
type提供有关在aggregate_type 字段中指定的事件的状态信息。
additional_data包含有关通知的详细信息的 Map<String,String>。
timestamp创建通知的时间。 Epoch unix 时间戳(以毫秒为单位)

三、Debezium 有关初始快照状态的通知

以下示例显示了提供初始快照状态的典型通知:

{
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED", 
    "additional_data" : {
        "connector_name": "myConnector"
    },
    "timestamp": "1695817046353"
}

类型字段可以包含以下值之一:

  • STARTED
  • IN_PROGRESS
  • TABLE_SCAN_COMPLETED
  • COMPLETED
  • ABORTED
  • SKIPPED

下表显示了报告初始快照状态的通知中可能存在的不同负载的示例:

  • STARTED
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}
  • IN_PROGRESS
{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1"
   },
   "timestamp": "1695817046353"
}

Mongo 连接器当前不支持字段 data_collection

  • TABLE_SCAN_COMPLETED
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在前面的示例中,additional_data.status 字段可以包含以下值之一:

SQL_异常:执行快照时发生 SQL 异常。

成功了:快照成功完成。

Mongo 连接器当前不支持字段total_rows_scanned 和data_collection

  • COMPLETED
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"COMPLETED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}
  • ABORTED
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"ABORTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}
  • SKIPPED
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"SKIPPED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

四、Debezium 有关增量快照进度的通知

下表显示了报告增量快照状态的通知中可能存在的不同负载的示例:

  • Start
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Incremental Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}
  • Paused
{
      "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
      "aggregate_type":"Incremental Snapshot",
      "type":"PAUSED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}
  • Resumed
{
   "id":"a9468204-769d-430f-96d2-b0933d4839f3",
   "aggregate_type":"Incremental Snapshot",
   "type":"RESUMED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2"
   },
   "timestamp": "1695817046353"
}
  • Stopped
{
   "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
   "aggregate_type":"Incremental Snapshot",
   "type":"ABORTED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}
  • Processing chunk
{
   "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
   "aggregate_type":"Incremental Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1",
      "maximum_key":"100",
      "last_processed_key":"50"
   },
   "timestamp": "1695817046353"
}
  • Snapshot completed for a table
{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在前面的示例中,additional_data.status 字段可以包含以下值之一:

EMPTY:该表不包含任何值。

NO_PRIMARY_KEY:无法完成快照;表没有主键。

SKIPPED:无法完成此类表的快照。有关详细信息,请参阅日志。

SQL_EXCEPTION:执行快照时发生 SQL 异常。

SUCCEEDED:快照成功完成。

UNKNOWN_SCHEMA:找不到该表的架构。检查日志中已知表的列表。

  • Completed
{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"COMPLETED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}

五、启用 Debezium 通知

要使 Debezium 能够发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知通道列表。默认情况下,以下通知渠道可用:

  • sink
  • log
  • jmx

重要的:要使用接收器通知通道,还必须将 notification.sink.topic.name 配置属性设置为希望 Debezium 发送通知的主题的名称。

六、访问 Debezium JMX 通知

要使 Debezium 能够报告通过 JMX beans 公开的事件,请完成以下配置步骤:

  • 启用 JMX MBean 服务器以公开通知 bean。
  • 将 jmx 添加到连接器配置中的 notification.enabled.channels 属性中。
  • 将首选的 JMX 客户端连接到 MBean 服务器。

通知通过名称为 debezium..management.notifications. 的 bean 的“Notifications”属性公开。

下图显示了报告增量快照开始的通知:

在这里插入图片描述
要放弃通知,请对 bean 调用重置操作。

通知还公开为 debezium.notification 类型的 JMX 通知。要使应用程序能够侦听 MBean 发出的 JMX 通知,请为应用程序订阅通知。

七、自定义通知渠道

通知机制被设计为可扩展的。可以根据需要实施渠道,以最适合的环境的方式传递通知。添加通知通道涉及几个步骤:

  • 为通道创建一个Java项目来实现通道,并添加Debezium Core作为依赖项。
  • 部署通知通道。
  • 通过修改连接器配置,使连接器能够使用自定义通知通道。

配置自定义通知渠道

自定义通知通道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供者接口 (SPI) 的 Java 类。例如:

public interface NotificationChannel {

    String name(); 1

    void init(CommonConnectorConfig config); 2

    void send(Notification notification); 3

    void close(); 4
}
  • 频道的名称。要使 Debezium 能够使用该通道,请在连接器的 notification.enabled.channels 属性中指定此名称。
  • 初始化通道所需的特定配置、变量或连接。
  • 在频道上发送通知。 Debezium 调用此方法来报告其状态。
  • 关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。

Debezium 核心模块依赖项

自定义通知通道 Java 项目具有对 Debezium 核心模块的编译依赖项。必须将这些编译依赖项包含在项目的 pom.xml 文件中,如以下示例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> 
</dependency>

${version.debezium} 表示 Debezium 连接器的版本。

在 META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。

部署自定义通知渠道

先决条件:有一个自定义通知通道 Java 程序。

程序:要将通知通道与 Debezium 连接器结合使用,请将 Java 项目导出到 JAR 文件,然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。

例如,在典型部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。要将信号通道与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录中。

注意:要将自定义通知通道与多个连接器一起使用,必须将通知通道 JAR 文件的副本放置在每个连接器子目录中。

配置连接器以使用自定义通知通道:在连接器配置中,将自定义通知通道的名称添加到 notification.enabled.channels 属性中。

八、应用案例

  • Debezium系列之:实现增量快照incremental技术的详细步骤
  • Debezium系列之:基于数据库信号表和Kafka信号Topic两种技术方案实现增量快照incremental技术的详细步骤
  • Debezium系列之:深入理解临时阻塞快照

更多Debezium实战应用可以参考博主Debezium专栏:

  • Debezium专栏,Debezium实战应用详细总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1350032.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Excel中部分sheet页隐藏并设置访问密码

1、新建sheet1 2、新建sheet2 3、隐藏sheet2 4、保护工作簿、输密码 5、密码二次确认 6、隐藏的sheet2已经查看不了 7、想要查看时&#xff0c;按图示输入原密码即可 8、查看sheet2内容

混合编程—C++程序中python脚本的嵌入方法(理论部分)

一、C与Python高级编程语言简概 &#xff08;一&#xff09;C C是一种被广泛使用的计算机程序设计语言。它是一种通用程序设计语言&#xff0c;支持多重编程范式&#xff0c;例如过程化程序设计&#xff08;Procedural programming&#xff09;、面向对象程序设计&#xff08;…

no和not的应用场景

语法 后面直接跟名词&#xff1a;no 后面不是跟名词&#xff1a;not 案例

java struts2教务管理系统Myeclipse开发mysql数据库struts2结构java编程计算机网页项目

一、源码特点 java struts2 教务管理系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助 struts2 框架开发&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境 为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库…

深度学习|2.11 向量化vectorization

2.11 向量化的作用 向量化可以使得向量中的每一个维度的数据进行并行计算&#xff0c;从而加快了神经网络的计算速度。 验证 其他

详解 MySql InnoDB 的 MVCC 实现机制

目录 一. 前言 二. 认识 MVCC 2.1. 什么是 MVCC&#xff1f; 2.2. 什么是当前读和快照读&#xff1f; 2.3. 当前读、快照读和 MVCC 的关系 2.4. MVCC 能解决什么问题&#xff0c;好处是什么&#xff1f; 2.5. 小结 三. MVCC 的实现原理 3.1. 隐式字段 3.2. undo 日志…

4-文献阅读-A Data-driven Base Station Sleeping Strategy Based on Traffic Prediction

目录 文献阅读—A Data-driven Base Station Sleeping Strategy Based on Traffic Prediction0、选这篇文章的原因1、文章的主要内容和贡献2、使用的数据集3、结果及分析4、郭郭有话说 文献阅读—A Data-driven Base Station Sleeping Strategy Based on Traffic Prediction 0…

一元函数微分学——刷题(12

目录 1.题目&#xff1a;2.解题思路和步骤&#xff1a;3.总结&#xff1a;小结&#xff1a; 1.题目&#xff1a; 2.解题思路和步骤&#xff1a; 注意两个y的区别即可 都可以在图中画出来&#xff0c;所以就非常好比较 3.总结&#xff1a; 理解两种y的区别即可 小结&…

【C#】知识点实践序列之Lock简单解决并发引起数据重复问题

欢迎来到《小5讲堂之知识点实践序列》文章&#xff0c;大家好&#xff0c;我是全栈小5。 这是2023年第3篇文章&#xff0c;此篇文章是C#知识点实践序列文章&#xff0c;博主能力有限&#xff0c;理解水平有限&#xff0c;若有不对之处望指正&#xff01; 本篇在Lock锁定代码块基…

Kubernetes(k8s):Namespace详解

Kubernetes&#xff08;k8s&#xff09;&#xff1a;Namespace详解 一、Namespace简介1.1 什么是Namespace1.2 Namespace的作用1.3 命名空间的分类 二、创建和管理Namespace2.1 创建Namespace2.2 管理Namespace 三、Namespace的实战应用3.1 部署多个项目3.2 环境隔离3.3 资源配…

JMeter 简单使用

JMeter 简介 Apache JMeter 是一款流行的性能测试工具&#xff0c;可以用来模拟用户行为并对系统进行压力测试。 安装 官方网站&#xff1a;http://jmeter.apache.org/ 在window下解压后&#xff0c; 运行 “bin/jmeter.bat” Jmeter 支持中文&#xff0c; 启动 Jmeter 后&…

SpringCloud微服务 【实用篇】| Dockerfile自定义镜像、DockerCompose

目录 一&#xff1a;Dockerfile自定义镜像 1. 镜像结构 2. Dockerfile语法 3. 构建Java项目 二&#xff1a; Docker-Compose 1. 初识DockerCompose 2. 部署微服务集群 前些天突然发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;…

【软件工程】设计概念

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; 软件工程 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 软件工程中的设计概念 概念&#xff1a; 结语 我的其他博客 前言 在数字时代的浪潮中&#xff0c;软件工程设计成为塑造创新…

PCBA电阻失效分析

一、案例背景 PCBA电阻使用一段时间后发生功能失效不良&#xff0c;据此情况&#xff0c;对失效电阻进行分析&#xff0c;明确失效原因。 二、分析过程 1、针对排阻的分析 数据通讯的主要连接点&#xff1a; 电阻测试结果&#xff1a; 测试结果&#xff1a;RP2、RP5 排阻第 3 …

基于YOLOv8的目标跟踪技术

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文摘要&#xff1a;介绍了YOLOv8自带的目标跟踪技术以及评价指标&#xff0c;并教会你如何在YOLOv8使用 1.YOLOv8自带两种跟踪方法 ultralytics/cfg/trackers/文件夹下 1.1 ByteTrack介绍 https://arxiv.org/pdf/2110.06864.pdf 摘…

【C++学习】:命名空间、输入输出和缺省参数全面解析

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; C入门到进阶 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一. 命名空间1.1 为什么需要命名空间&#xff1f;1.2 命名空间的定义1.3 命名空间特性1…

基于JAVA的创意工坊双创管理系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 管理员端2.2 Web 端2.3 移动端 三、系统展示四、核心代码4.1 查询项目4.2 移动端新增团队4.3 查询讲座4.4 讲座收藏4.5 小程序登录 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的创意工坊双创管理…

「MySQL运维常见问题及解决方法」

「MySQL运维常见问题及解决方法」 一、查看MySQL数据库安装路径1.1、方式一 --SHOW VARIABLES LIKE basedir;1.2、方式二 --ps -ef | grep mysql 二、MySQL设置连接数与最大并发数2.1、永久生效--修改my.cnf文件2.2、临时生效--通过命令设置的全局变量 三、其他相关参数设置四、…

YOLOv5独家原创改进:提出一种新的Shape IoU,更加关注边界框本身的形状和尺度,对小目标检测也很友好 | 2023.12.29收录

💡💡💡本文改进:一种新的Shape IoU方法,该方法可以通过关注边界框本身的形状和尺度来计算损失,解决边界盒的形状和规模等固有属性对边界盒回归的影响。 💡💡💡对小目标检测涨点明显,在VisDrone2019、PASCAL VOC均有涨点 收录 YOLOv5原创自研 https://blog.cs…

Linux+nginx-前端部署的详细步骤

直奔主题 学废以下内容&#xff0c;前端也可以自己做部署啦~ Linux&#xff1a;大多数服务器都是使用Linux作为操作系统&#xff1a;稳定、安全、开源。finalShell工具&#xff1a;finalShell是一个方便管理远程服务器的工具&#xff0c;提供了可视化的操作配置界面。在和远程…