给Debezium connector 发送信号

news2025/1/22 9:26:41

1. 概述

Debezium 信号机制提供了一种方法来修改连接器的行为,或触发一次性操作,例如启动表的 临时快照。要触发连接器执行指定操作,可以发出特殊的SQL 命令以将信号消息添加到专门的信号表,也称为信号数据集合。在源数据库上创建的信令表专门用于与 Debezium 通信。当 Debezium 检测到新的 日志记录 或 临时快照记录 添加到信号表时,它会读取信号并启动请求的操作。

信号可用于以下 Debezium 连接器:

  • DB2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

2. 启用信号机制

默认情况下,Debezium 信号机制被禁用。必须为要为连接器显式启用信号。

具体操作步骤如下:

  1. 在源数据库上,创建一个信号表,用于向连接器发送信号。有关信号表的结构信息,请参阅 2.1 信号表的结构

  2. 对于实现变更数据捕获 (CDC) 的源数据库(例如 Db2 或 SQL Server),需要为信号表启用 CDC。也就是说能够捕获信号表的日志

  3. 将信号表名称添加到 Debezium 连接器配置中。
    在连接器配置中,添加属性signal.data.collection,并将其值设置为 在步骤 1 中创建的信号表的完全限定名称。

    例如,signal.data.collection = inventory.debezium_signal

    信号表的完全限定名称的格式取决于连接器。
    以下示例显示了用于每个连接器的命名格式:

    • DB2

      <schemaName>.<tableName>

    • MongoDB

      <databaseName>.<collectionName>

    • MySQL

      <databaseName>.<tableName>

    • Oracle

      <databaseName>.<schemaName>.<tableName>

    • PostgreSQL

      <schemaName>.<tableName>

    • SQL Server

      <databaseName>.<schemaName>.<tableName>

      有关设置signal.data.collection属性的详细信息,请参阅连接器的配置属性表。

参考配置如下:

{
  "name": "conn-8023",
 "config": {
	"connector.class": "io.debezium.connector.mysql.MySqlConnector",
	"database.hostname": "localhost",
	"database.port": "8023",
	"database.user": "root",
	"database.password": "",
	"database.server.id": "8023",
	"database.server.name": "local8023",
	"include.schema.changes": "true",
	"database.include.list": "test,debezium",
	"topic.prefix": "bj",
	"schema.history.internal.kafka.topic": "dbhistory.local8023",
	"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
	"decimal.handling.mode"  : "double",
	"signal.data.collection" : "debezium.debezium_signal"
	}
}

image.png

注意要上图画蓝框部分 在 database.include.list 中添加debezium_signal 所在的数据库

2.1 信号表的结构

信号表保存通知连接器以触发指定操作的信号。信号表的结构必须符合以下标准格式。

  • 包含三个字段(列)。
  • 字段按特定顺序排列,如下表所示。
字段类型描述
id (必选)string标识信号实例的任意唯一字符串。需要为提交给信号表的每个信号分配一个id。 通常,id 是一个 UUID 字符串。 可以使用 信号实例 进行日志记录、调试或删除重复数据。 当信号触发 Debezium 执行增量快照时,它会生成带有任意字符串的信号消息。生成的消息包含的字符串与提交的信号中的字符串无关。
type (必选)string指定要发送的信号类型。 可以将某些信号类型与信号可用的任何连接器一起使用,而其他信号类型仅适用于特定连接器。
data (可选)string指定要传递给信号操作的 JSON 格式数据。 每种信号类型都需要一组特定的数据。
**信号表中的字段名称是任意的。上表提供了建议的名称。如果您使用不同的命名约定,请确保每个字段中的值与约定的内容一致。

image.png

一些插入的测试数据
image.png

2.2 创建信号表

可以通过向源数据库提交标准 DDL SQL 来创建信号表。

先决条件

  • 有足够的访问权限在目标数据库上创建表。

具体操作步骤如下:

  • 向源数据库提交SQL语句,创建符合2.1节 的表,如下例所示:

   CREATE TABLE  <tableName>  (
       id   VARCHAR( <varcharValue> ) PRIMARY KEY, 
       type VARCHAR( <varcharValue> ) NOT NULL, 
       data VARCHAR( <varcharValue> ) NULL
   );
**分配给变量VARCHAR参数的空间id量必须足以容纳发送到信令表的信号 ID 字符串的大小。 如果 ID 的大小超过可用空间,连接器将无法处理该信号。

以下具体示例显示了一个CREATE TABLE创建debezium_signal表的命令:


CREATE TABLE debezium_signal (
     id   VARCHAR(42) PRIMARY KEY, 
     type VARCHAR(32) NOT NULL, 
     data VARCHAR(2048) NULL
);

3. 信号触发的操作

可以使用信号来启动以下操作:

  • 添加消息到日志中。
  • 触发临时快照。
  • 停止执行临时快照。
  • 暂停增量快照。
  • 恢复增量快照。

有些信号并不与所有连接器兼容。

3.1 日志信号

可以通过创建具有log信号类型的信号表条目来请求连接器将条目添加到日志中。处理信号后,连接器将指定的消息打印到日志中。或者,可以配置信号,以便生成的消息包含流坐标。

列名具体值描述
id924e3ff8-2245-43ca-ba77-2af9af02fa07
typelog信号的动作类型。
date{"message": "Signal message at offset {}"}message参数指定要打印到日志的字符串。 如果向消息添加占位符 ({}),它将替换为流坐标。

INSERT INTO debezium_signal(id,TYPE,DATA) VALUES(
   '924e3ff8-2245-43ca-ba77-2af9af02fa07', 
   'log',
   '{"message": "Signal message at offset {}"}'
)

比如插入以下语句后


INSERT INTO debezium_signal(id,TYPE,DATA) VALUES(
   '222', 
   'log',
   '{"message": "==========================Signal message at offset {}"}'
)

在控制台中可以看到如下输出
image.png

3.2 临时快照信号

可以通过创建具有execute-snapshot信号类型的信号表条目来请求连接器启动临时快照。处理信号后,连接器运行请求的快照操作。

与连接器首次启动后运行的初始快照不同,临时快照发生在运行时期间,连接器已经开始从数据库流式传输更改事件之后。可以随时启动临时快照。

临时快照可用于以下 Debezium 连接器:

  • DB2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
列名
idd139b9b7-7777-4547-917d-e1775ea61d41
typeexecute-snapshot
data{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

INSERT INTO debezium_signal(id,TYPE,DATA) VALUES(
   'd139b9b7-7777-4547-917d-e1775ea61d41', 
   'execute-snapshot',
   '{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}'
)

目前,execute-snapshot操作仅触发 增量快照。

有关临时快照的更多信息,请参阅连接器文档中的快照主题。

额外资源

  • Db2 连接器临时快照
  • MongoDB 连接器临时快照
  • MySQL 连接器临时快照
  • Oracle 连接器临时快照
  • PostgreSQL 连接器临时快照
  • SQL Server 连接器临时快照

3.3 临时快照停止信号

可以通过创建具有stop-snapshot信号类型的信号表条目来请求连接器停止正在进行的临时快照。处理信号后,连接器将停止当前正在进行的快照操作。

可以停止以下 Debezium 连接器的临时快照:

  • DB2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
列名
IDd139b9b7-7777-4547-917d-e1775ea61d41
类型stop-snapshot
数据{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}

INSERT INTO debezium_signal(id,TYPE,DATA) VALUES(
   'd139b9b7-7777-4547-917d-e1775ea61d41', 
   'stop-snapshot',
   '{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}'
)

必须指定信号的type。该data-collections字段是可选的。将该data-collections字段留空以请求连接器停止当前快照中的所有活动。如果您希望继续执行增量快照,但又想从快照中排除特定集合,请提供要排除的集合名称或正则表达式的逗号分隔列表。在连接器处理完信号后,增量快照将继续,但它会从您指定的集合中排除数据。

3.4 增量快照

增量快照是一种特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基线状态,类似于初始快照。但是,与初始快照不同,增量快照以块的形式捕获表,而不是一次捕获所有表。连接器使用水位线(watermarking)方法来跟踪快照的进度。

通过在块中而不是在单个整体操作中捕获指定表的初始状态,增量快照与初始快照过程相比具有以下优势:

  • 当连接器捕获指定表的基线状态时,来自事务日志的近乎实时的事件流继续不间断地进行。
  • 如果增量快照过程被中断,它可以从它停止的地方恢复。
  • 您可以随时启动增量快照。

3.4.1 增量快照暂停信号

可以通过使用pause-snapshot信号类型创建信号表条目来请求连接器暂停正在进行的增量快照。处理信号后,连接器将停止暂停当前正在进行的快照操作。因此,无法指定数据收集,因为快照处理将暂停在信号处理时间的位置。

您可以为以下 Debezium 连接器暂停增量快照:

  • DB2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
列名
IDd139b9b7-7777-4547-917d-e1775ea61d41
类型pause-snapshot

INSERT INTO debezium_signal(id,TYPE) VALUES(
   'd139b9b7-7777-4547-917d-e1775ea61d41', 
   'stop-snapshot'
)

必须指定信号的type。该data字段被忽略。

3.4.2 增量快照恢复信号

可以通过创建具有resume-snapshot信号类型的信号表条目来请求连接器恢复暂停的增量快照。处理信号后,连接器将恢复先前暂停的快照操作。

可以为以下 Debezium 连接器恢复增量快照:

  • DB2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
列名
IDd139b9b7-7777-4547-917d-e1775ea61d41
类型resume-snapshot

INSERT INTO debezium_signal(id,TYPE) VALUES(
   'd139b9b7-7777-4547-917d-e1775ea61d41', 
   'resume-snapshot'
)

必须指定信号的type。该data字段被忽略。

有关增量快照的更多信息,请参阅连接器文档中的快照主题。

额外资源

  • Db2 连接器增量快照
  • MongoDB 连接器增量快照
  • MySQL 连接器增量快照
  • Oracle 连接器增量快照
  • PostgreSQL 连接器增量快照
  • SQL Server 连接器增量快照

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/116097.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

哪些进销存软件既好用又免费?

中小企业刚起步都会面临着资金紧缺、人力资源不足等诸多管理问题&#xff0c;对于大部分预算不是很多的中小企业或者尚未尝试过进销存软件的企业&#xff0c;都会选择免费进销存软件来进行企业管理或体验进销存软件的功效。 进销存软件的开发需要强大的技术支持&#xff0c;数…

B站技术选型与架构

目录前言B站前端之路B站Golang技术栈分析bilibili技术总监毛剑简介前言 了解了一下B站的技术发展历程&#xff1a;最开始是用PHP语言开发的&#xff0c;后来B站的中台逐步被Node占领&#xff0c;而后台技术为了更高的并发、更稳健&#xff0c;以及为了大数据分析&#xff0c;逐…

RK3568平台开发系列讲解(Linux系统篇)共享内存

🚀返回专栏总目录 文章目录 一、共享内存底层实现二、共享内存API三、共享内存案例沉淀、分享、成长,让自己和他人都能有所收获!😄 📢共享内存技术是功能最强、应用最广的进程间通信技术。其原理是多个进程共享相同的物理内存区,一个进程对该内存区的任意修改,可被其…

不懂Nacos没关系,可以看看它是怎么运用代理模式的

背景 看Nacos的源代码时&#xff0c;发现其中有对代理模式的运用&#xff0c;而且用得还不错&#xff0c;可以作为一个典型案例来聊聊&#xff0c;方便大家以更真实的案例来体验一下代理模式的运用。如果你对Nacos不了解&#xff0c;也并不影响对本篇文章的阅读和学习。 本文…

前端三小时用html和js写一个贪吃蛇游戏,非常简单带讲解,代码可直接用,功能完整

目录 游戏主体部分--地狱模式 游戏主页入口 预览图 游戏入口代码 1.html 2.css 3.js 注册页面代码 游戏实现很简单&#xff0c;只写游戏主体的话只要三小时就够了。 话不多说&#xff0c;我们直接来看效果预览。 转成gif图之后有点卡&#xff0c;但是游戏效果并不卡&…

php宝塔搭建部署实战PESCMSTEAM团队任务管理系统源码

大家好啊&#xff0c;我是测评君&#xff0c;欢迎来到web测评。 本期给大家带来一套php开发的PESCMSTEAM团队任务管理系统源码&#xff0c;感兴趣的朋友可以自行下载学习。 技术架构 PHP7.2 nginx mysql5.7 JS CSS HTMLcnetos7以上 宝塔面板 文字搭建教程 下载源码&am…

k8s lifecycle——poststart和prestop

1、lifecycle的声明 lifecycle:postStart:exec:command: ["/bin/sh", "-c", "sleep 100"]preStop:exec:command: ["/bin/sh", "-c", "sleep 100"]2、poststart 容器创建后立即执行&#xff0c;主要用于资源部署、…

JPEG编码原理及简易编码器实现

简介 以学习为目的编写的简易jpeg编码器&#xff0c;以看得懂为目标&#xff0c;代码尽可能清晰简洁&#xff0c;不对内存、性能做看不懂的优化&#xff0c;也不实现jpeg更多高级特性。 这篇文章是我从自己的开源工程中整理来的 本文对应的工程为https://gitee.com/dma/learn…

【OpenFOAM】-olaFlow-算例4- irreg45degTank

算例路径&#xff1a; olaFlow\tutorials\irreg45degTank 算例描述&#xff1a; 不规则波浪模拟 学习目标&#xff1a; 不规则波浪模拟&#xff1a;olaFlow中单向不规则波采用线性波浪叠加法生成&#xff0c;基本原理如图2所受&#xff0c;需要提供对应波谱的周期、波高和相位的…

生产制造业管理系统对企业究竟有哪些作用?

对于生产制造企业来说&#xff0c;除了涉及到产品的生产制造和原料采购&#xff0c;还需要管理销售、库存、财务等方方面面&#xff0c;生产制造业管理系统的使用&#xff0c;尤为重要。正因如此&#xff0c;借助生产制造业管理系统来完善生产管理流程、提升生产管理水平&#…

LVGL学习笔记4 - 主题Themes

目录 1. 获取主题句柄 2. 设置基础主题 3. 设置主题的回调函数 4. 使能主题 5. 实例 5.1 定义一个全局Style变量 5.2 显示默认主题风格的矩形 5.3 初始化新主题的样式 5.4 初始化新主题 5.5 回调函数的实现 5.6 设置新主题 5.7 显示 主题是风格的集合。对应的变量结构…

设计模式--reactor 模式

说明 本文基于 tomcat 8.5.x 编写。author blog.jellyfishmix.com / JellyfishMIX - githubLICENSE GPL-2.0 介绍 reactor 模式通常应用于网络 IO 场景&#xff0c;高性能的中间件 redis, netty 都在使用。 背景 原始的网络 IO 模型 最原始的网络 IO 模型&#xff0c;服务…

Java学习笔记【8】异常

⛵ ⛵ ⛵ ⛵ ⛵ &#x1f680; &#x1f680; &#x1f680; &#x1f680; &#x1f680;   大家好&#x1f91d;&#xff0c;我是 &#x1f449;老孙&#x1f448;&#xff0c;未来学习路上多多关照 &#x1f91d; 一个喜欢用 ✍️ 博客记录人生的程序猿 &#x1f649;&…

Python遥感图像处理应用篇(二十七):Python绘制遥感图像各波段热力图(相关系数矩阵)(续)

续-https://soderayer.blog.csdn.net/article/details/125757807 上一篇中使用csv文件计算的相关系数热力图,本篇我们直接使用遥感图像来计算图像波段之间的相关系数。 方法一:已有软件ENVI计算 实际上,目前已有的软件,如ENVI就可以直接计算图像波段之间的相关系数,该工…

【高精度定位】关于GPS、RTK、PPK三种定位技术的探讨

高精度定位通常是指亚米级、厘米级以及毫米级的定位&#xff0c;从市场需求来看&#xff0c;定位的精度越高往往越好。“高精度、低成本”的定位方案无疑将是未来市场的趋势。 在物联网时代&#xff0c;大多数的应用或多或少都与位置服务相关联&#xff0c;尤其是对于移动物体而…

深入理解MySQL——分库分表种类与原则

分库分表的种类 首先说明&#xff0c;这里所说的分库分表是指把数据库中数据物理地拆分到多个实例或多台机器上去&#xff0c;而不是MySQL原生的Partitioning。 这里稍微提一下Partitioning&#xff0c;这是MySQL官方版本支持的&#xff0c;在本地针对表的分区进行操作&#…

[Flask]各种子功能的实现

一、标准Flask架构搭建 ①config.py 新建一个文件config.py&#xff0c;在其中进行参数初始化&#xff0c;再使用下面代码加载到app.py&#xff08;主程序&#xff09;中 import config app.config.from_object(config) #由config.py初始化 ②exts.py 用于放置扩展模块&a…

(二十四)Vue之props配置项

文章目录props基本使用props的数组形式props的对象形式检测类型检测类型 其他验证Vue学习目录 上一篇&#xff1a;&#xff08;二十三&#xff09;Vue之ref属性 props props 可以是数组或对象&#xff0c;用于让组件接收外部传过来的数据 约定props是只读的&#xff0c;Vue…

开源 高性能 云原生!时序数据库 TDengine 上线亚马逊Marketplace

近日&#xff0c;涛思数据旗下开源、高性能、云原生的时序数据库&#xff08;Time Series Database&#xff0c;TSDB&#xff09;TDengine 成功上线亚马逊云科技 Marketplace&#xff0c;为用户提供了更加丰富的订阅渠道。 TDengine 是针对时序数据特点研发和优化的数据库解决方…

CentOS8 Elasticsearch8.x 安装遇到的问题解决汇总

报错清单 启动报错&#xff1a;ERROR: Elasticsearch exited unexpectedly curl测试报错&#xff1a;curl: (52) Empty reply from server 报错解决 启动报错 起因 使用archive方式安装elasticsearch后&#xff0c;在目录中运行./bin/elasticsearch报错如下&#xff1a; 原…