Debezium系列之:Debezium 通知

news2025/1/11 20:45:04

Debezium系列之:Debezium 通知

  • 一、概述
  • 二、Debezium 通知格式
  • 三、可用的通知
  • 四、启用 Debezium 通知
  • 五、访问 Debezium JMX 通知
  • 六、自定义通知渠道
  • 七、配置自定义通知渠道
  • 八、Debezium 核心模块依赖项
  • 九、部署自定义通知渠道
  • 十、配置连接器以使用自定义通知通道

一、概述

Debezium 通知提供了一种获取有关连接器状态信息的机制。通知可以发送到以下渠道:

  • 接收器通知通道:通过 Connect API 将通知发送到配置的主题。
  • 日志通知通道:通知会附加到日志中。
  • JmxNotificationChannel:通知作为 JMX bean 中的属性公开。
  • Custom:通知将发送到您实施的自定义渠道。

二、Debezium 通知格式

通知消息包含以下信息:

属性描述
id分配给通知的唯一标识符。对于增量快照通知,该 ID 与使用执行快照信号发送的 ID 相同。
aggregate_type与通知相关的聚合根的数据类型。在领域驱动设计中,导出的事件应始终引用聚合。
type提供有关在aggregate_type 字段中指定的事件的状态信息。
additional_data包含有关通知的详细信息的 Map<String,String>。有关示例,请参阅有关增量快照进度的 Debezium 通知。

三、可用的通知

Debezium 通知提供有关初始快照或增量快照进度的信息。

初始快照的状态
以下示例显示了提供初始快照状态的典型通知:

{
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED" 
}

类型字段可以包含以下值之一:

  • COMPLETED
  • ABORTED
  • SKIPPED

Debezium 有关增量快照进度的通知
下表显示了报告增量快照状态的通知中可能存在的不同负载的示例:

Start:

{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Incremental Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      }
}

Paused:

{
      "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
      "aggregate_type":"Incremental Snapshot",
      "type":"PAUSED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      }
}

Resumed:

{
   "id":"a9468204-769d-430f-96d2-b0933d4839f3",
   "aggregate_type":"Incremental Snapshot",
   "type":"RESUMED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2"
   }
}

Stopped:

{
   "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
   "aggregate_type":"Incremental Snapshot",
   "type":"ABORTED",
   "additional_data":{
      "connector_name":"my-connector"
   }
}

Processing chunk:

{
   "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
   "aggregate_type":"Incremental Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1",
      "maximum_key":"100",
      "last_processed_key":"50"
   }
}

Snapshot completed for a table:

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED" 
   }
}

可能的值为:

  • EMPTY - 表为空
  • NO_PRIMARY_KEY - 表没有快照所需的主键
  • SKIPPED - 不支持此类表的快照,请检查日志以了解详细信息
  • SQL_EXCEPTION - 处理快照时捕获 SQL 异常
  • SUCCEEDED - 快照成功完成
  • UNKNOWN_SCHEMA - 找不到表的架构,请检查日志以获取已知表的列表

Completed:

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"COMPLETED",
   "additional_data":{
      "connector_name":"my-connector"
   }
}

四、启用 Debezium 通知

要使 Debezium 能够发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知通道列表。默认情况下,以下通知渠道可用:

  • sink
  • log
  • jmx

要使用接收器通知通道,您还必须将 notification.sink.topic.name 配置属性设置为您希望 Debezium 发送通知的主题的名称。

五、访问 Debezium JMX 通知

要使 Debezium 能够报告通过 JMX beans 公开的事件,请完成以下配置步骤:

  • 启用 JMX MBean 服务器以公开通知 bean。
  • 将 jmx 添加到连接器配置中的 notification.enabled.channels 属性中。
  • 将您首选的 JMX 客户端连接到 MBean 服务器。

通知通过名称为 debezium.<connector-type>.management.notifications.<server> 的 bean 的“Notifications”属性公开。

下图显示了报告增量快照开始的通知:
在这里插入图片描述
要放弃通知,请对 bean 调用重置操作。

通知还公开为 debezium.notification 类型的 JMX 通知。要使应用程序能够侦听 MBean 发出的 JMX 通知,请为应用程序订阅通知。

六、自定义通知渠道

通知机制被设计为可扩展的。您可以根据需要实施渠道,以最适合您的环境的方式传递通知。添加通知通道涉及几个步骤:

  • 为通道创建一个Java项目来实现通道,并添加Debezium Core作为依赖项。
  • 部署通知通道。
  • 通过修改连接器配置,使连接器能够使用自定义通知通道。

七、配置自定义通知渠道

自定义通知通道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供者接口 (SPI) 的 Java 类。例如:

public interface NotificationChannel {

    String name(); 

    void init(CommonConnectorConfig config); 

    void send(Notification notification); 

    void close(); 
}
  1. 频道的名称。要使 Debezium 能够使用该通道,请在连接器的 notification.enabled.channels 属性中指定此名称。
  2. 初始化通道所需的特定配置、变量或连接。
  3. 在频道上发送通知。 Debezium 调用此方法来报告其状态。
  4. 关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。

八、Debezium 核心模块依赖项

自定义通知通道 Java 项目具有对 Debezium 核心模块的编译依赖项。您必须将这些编译依赖项包含在项目的 pom.xml 文件中,如以下示例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> 
</dependency>

${version.debezium} 表示 Debezium 连接器的版本。

在 META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。

九、部署自定义通知渠道

先决条件:

  • 您有一个自定义通知通道 Java 程序。

程序

  • 要将通知通道与 Debezium 连接器结合使用,请将 Java 项目导出到 JAR 文件,然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。
  • 例如,在典型部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。要将信号通道与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录中。

十、配置连接器以使用自定义通知通道

将自定义通知通道的名称添加到 notification.enabled.channels 配置属性中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/690466.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MFC加载3ds模型初步

网上下一个资源&#xff0c;名为 OpenGL三维场景绘制.rar&#xff1b; 看一下它是用MFC和opengl&#xff0c;自己绘制三维场景&#xff1b; 运行一下&#xff0c;有一个exe可以运行&#xff1b; 有一个较新版本的不能运行&#xff1b;这应是缺少VC运行库&#xff1b; 下面单独…

Linux下RPM软件包管理

目录 1、软件包管理介绍1.1、软件包分类1.2、源码包1.3、RPM包 2、RPM包管理-包命名和依赖性2.1、RPM命名规则2.2、RPM包依赖性 3、RPM包管理-安装升级和与卸载3.1、包全名与包名3.2、RPM安装3.3、RPM包升级3.4、卸载 4、RPM包管理-查询4.1、查询是否安装4.2、查询软件包详细信…

Gitlab将本地代码推送到远程空仓库

目录 引言 1、设置Git为源代码管理插件 2、创建Git仓库 3、设置多个远程仓库 引言 如果我们的本地代码想上传到公司内部的服务器&#xff0c;首先我们需要在VS2022中创建Git仓库&#xff0c;然后设置远程仓库的地址&#xff0c;才能将本地代码推送到远端。在远端会根据你本地…

表格式表单-table式from表单-合并行-合并列

效果: 使用【colspan】合并行 和【rowspan】合并列 html: <!-- 添加或修改报告数据库对话框 --><el-dialog :title"title" :visible.sync"open" width"1500px" append-to-body><el-form ref"form" :model"form&q…

Android Jetpack Compose之Checkbox的使用

Android Jetpack Compose 是一个现代化的 UI 工具包&#xff0c;为开发者提供了一种声明式的方式来构建出美观且功能强大的 Android 应用。在本文中&#xff0c;我们将详细介绍其中的一个重要组件——Checkbox。 一. Checkbox 简介 Checkbox 是 Jetpack Compose 中的一个组件&…

STM32单片机(三)第四节:GPIO输入练习2(光敏传感器控制蜂鸣器)

❤️ 专栏简介&#xff1a;本专栏记录了从零学习单片机的过程&#xff0c;其中包括51单片机和STM32单片机两部分&#xff1b;建议先学习51单片机&#xff0c;其是STM32等高级单片机的基础&#xff1b;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 &#xff1a;适用于想要…

一文学会如何使用Docker

Docker常见使用 1、Docker安装 ## 下载阿里源repo文件 $ curl -o /etc/yum.repos.d/Centos-7.repo http://mirrors.aliyun.com/repo/Centos-7.repo $ curl -o /etc/yum.repos.d/docker-ce.repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo$ yum clean …

详解8种不同类型的防火墙

什么是防火墙&#xff1f; 防火墙是一种监视网络流量并检测潜在威胁的安全设备或程序&#xff0c;作为一道保护屏障&#xff0c;它只允许非威胁性流量进入&#xff0c;阻止危险流量进入。 防火墙是client-server模型中网络安全的基础之一&#xff0c;但它们容易受到以下方面的攻…

选择低代码平台的正确方式

传统开发模式早已不能满足大多数追求效率的企业的要求&#xff0c;低代码平台的出现正是可以缓解相应的开发压力&#xff0c;作为使用者我们更应该擦亮眼睛&#xff0c;选择合适的平台产品&#xff0c;充分利用新技术带来的新价值。小编在以前的文章有介绍过低代码平台应该如何…

php中的双引号与单引号的基本使用

字符串,在各类编程语言中都是一个非常重要的数据类型 网页当中的图片,文字,特殊符号,HTMl标签,英文等都属于字符串 PHP字符串变量用于存储并处理文本, 在创建字符串之后&#xff0c;我们就可以对它进行操作。我们可以直接在函数中使用字符串&#xff0c;或者把它存储在变量中 字…

360手机命令行进入fastboot线刷模式 360手机刷机

360手机命令行进入fastboot线刷模式 360手机刷机 参考&#xff1a;360手机-360刷机360刷机包twrp、root 360刷机包360手机刷机&#xff1a;360rom.github.io 【前言】 因360手机特殊&#xff1b;且因机器情况而异&#xff1b;导致360手机进不去fastboot线刷模式、360手机进…

基于Java+Swing实现坦克大战游戏

基于JavaSwing实现坦克大战游戏 一、系统介绍二、功能展示三、其他系统四、获取源码 一、系统介绍 此系统是使用Java语言实现坦克大战游戏程序&#xff0c;玩家通过连接访问进入游戏&#xff0c;通过操纵坦克来守卫基地&#xff0c;玩家还可以获得超级武器来提升坦克的属性&am…

点云特征描述子概述、PFH描述子提取

1、 6种点云特征描述子简概 NARF&#xff08;Normal Aligned Radial Feature&#xff09;特征点描述子&#xff1a;NARF描述子是一种基于法线对齐的径向特征描述子。它通过将点云表面分割为小的网格单元&#xff0c;并计算每个单元中的法线直方图&#xff0c;从而提取特征。NA…

【网站监控】如何监控自己的网站(接口)

网站监控-如何监控自己的网站 前言一、开始使用1、使用API进行监控数据采集?2、请求参数3、如何查看监控效果? 二、注意点 前端必备工具&#xff08;免费图床、API、chatAI等&#xff09;推荐网站LuckyCola: https://luckycola.com.cn/ 前言 网站接口监控是指对接口的状态进…

Keil为啥比IAR更受欢迎?

关注星标公众号&#xff0c;不错过精彩内容 作者 | strongerHuang 微信公众号 | strongerHuang 最近交流群在讨论【选择Keil和IAR的问题】&#xff0c;这就顺便展开来说下。 你可能觉得Keil、IAR这种集成开发环境界面比较古老&#xff0c;又不好用。 但是&#xff0c;这里告诉大…

ConcurrentModificationException异常分析与解决

ConcurrentModificationException异常分析与解决 1、场景重现&#xff0c;制造ConcurrentModificationException异常 Testpublic void ConcurrentModificationExceptionTest() {JSONArray jsonArray new JSONArray();JSONObject jsonObject new JSONObject();jsonObject.put…

改写cocos2d的ProgressTimer实现任意起始点的Radial进度条

解释一下要做的事&#xff1a; 原生ProgressTimer控件的进度起始点只能是在&#xff08;0.5&#xff0c;1&#xff09;的位置&#xff0c;如下&#xff1a; 我们要改成可以将矩形边上的任意点作为起始点&#xff0c;如下&#xff1a; 首先讲一下绘制的逻辑&#xff1a; 先根…

3: PCIe BDF(Bus,Device,Function)

目录 1.概述 2.BUS&#xff1a;总线号 3.Device&#xff1a;设备号 4.Function&#xff1a;功能号 1.概述 PCIe总线中的每一个功能都有一个唯一的标识符与之对应。这个标识符就是BDF&#xff08;Bus&#xff0c;Device&#xff0c;Function&#xff09; 2.BUS&#xff1a;总…

基于Java客户管理系统设计实现(源码+lw+部署文档+讲解等)

博主介绍&#xff1a; ✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精…

SSD202D-GPIO调试驱动-三

前面又两个介绍 SSD202D-GPIO调试驱动-一 SSD202D-GPIO调试驱动-二 主要是调试方法: insmod gpio_lonbon.ko 然后可以再看到一下节点 //出现以下节点 proc/gpio-lb/dbg sys/kernel/debug/gpio-lb/debug sys/class/gpio-lb/ dev/gpio-lb 然后