1. 概述
Debezium 信号机制提供了一种方法来修改连接器的行为,或触发一次性操作,例如启动表的 临时快照。要触发连接器执行指定操作,可以发出特殊的SQL 命令以将信号消息添加到专门的信号表,也称为信号数据集合。在源数据库上创建的信令表专门用于与 Debezium 通信。当 Debezium 检测到新的 日志记录 或 临时快照记录 添加到信号表时,它会读取信号并启动请求的操作。
信号可用于以下 Debezium 连接器:
- DB2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
2. 启用信号机制
默认情况下,Debezium 信号机制被禁用。必须为要为连接器显式启用信号。
具体操作步骤如下:
-
在源数据库上,创建一个信号表,用于向连接器发送信号。有关信号表的结构信息,请参阅
2.1 信号表的结构
。 -
对于实现变更数据捕获 (CDC) 的源数据库(例如 Db2 或 SQL Server),需要为信号表启用 CDC。也就是说能够捕获信号表的日志
-
将信号表名称添加到 Debezium 连接器配置中。
在连接器配置中,添加属性signal.data.collection
,并将其值设置为 在步骤 1 中创建的信号表的完全限定名称。例如,
signal.data.collection = inventory.debezium_signal
。信号表的完全限定名称的格式取决于连接器。
以下示例显示了用于每个连接器的命名格式:-
DB2
<schemaName>.<tableName>
-
MongoDB
<databaseName>.<collectionName>
-
MySQL
<databaseName>.<tableName>
-
Oracle
<databaseName>.<schemaName>.<tableName>
-
PostgreSQL
<schemaName>.<tableName>
-
SQL Server
<databaseName>.<schemaName>.<tableName>
有关设置
signal.data.collection
属性的详细信息,请参阅连接器的配置属性表。
-
参考配置如下:
{
"name": "conn-8023",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "8023",
"database.user": "root",
"database.password": "",
"database.server.id": "8023",
"database.server.name": "local8023",
"include.schema.changes": "true",
"database.include.list": "test,debezium",
"topic.prefix": "bj",
"schema.history.internal.kafka.topic": "dbhistory.local8023",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"decimal.handling.mode" : "double",
"signal.data.collection" : "debezium.debezium_signal"
}
}
注意要上图画蓝框部分 在 database.include.list
中添加debezium_signal 所在的数据库
2.1 信号表的结构
信号表保存通知连接器以触发指定操作的信号。信号表的结构必须符合以下标准格式。
- 包含三个字段(列)。
- 字段按特定顺序排列,如下表所示。
字段 | 类型 | 描述 |
---|---|---|
id (必选) | string | 标识信号实例的任意唯一字符串。需要为提交给信号表的每个信号分配一个id。 通常,id 是一个 UUID 字符串。 可以使用 信号实例 进行日志记录、调试或删除重复数据。 当信号触发 Debezium 执行增量快照时,它会生成带有任意字符串的信号消息。生成的消息包含的字符串与提交的信号中的字符串无关。 |
type (必选) | string | 指定要发送的信号类型。 可以将某些信号类型与信号可用的任何连接器一起使用,而其他信号类型仅适用于特定连接器。 |
data (可选) | string | 指定要传递给信号操作的 JSON 格式数据。 每种信号类型都需要一组特定的数据。 |
** | 信号表中的字段名称是任意的。上表提供了建议的名称。如果您使用不同的命名约定,请确保每个字段中的值与约定的内容一致。 |
---|
一些插入的测试数据
2.2 创建信号表
可以通过向源数据库提交标准 DDL SQL 来创建信号表。
先决条件
- 有足够的访问权限在目标数据库上创建表。
具体操作步骤如下:
- 向源数据库提交SQL语句,创建符合
2.1节
的表,如下例所示:
CREATE TABLE <tableName> (
id VARCHAR( <varcharValue> ) PRIMARY KEY,
type VARCHAR( <varcharValue> ) NOT NULL,
data VARCHAR( <varcharValue> ) NULL
);
** | 分配给变量VARCHAR 参数的空间id 量必须足以容纳发送到信令表的信号 ID 字符串的大小。 如果 ID 的大小超过可用空间,连接器将无法处理该信号。 |
---|
以下具体示例显示了一个CREATE TABLE
创建debezium_signal
表的命令:
CREATE TABLE debezium_signal (
id VARCHAR(42) PRIMARY KEY,
type VARCHAR(32) NOT NULL,
data VARCHAR(2048) NULL
);
3. 信号触发的操作
可以使用信号来启动以下操作:
- 添加消息到日志中。
- 触发临时快照。
- 停止执行临时快照。
- 暂停增量快照。
- 恢复增量快照。
有些信号并不与所有连接器兼容。
3.1 日志信号
可以通过创建具有log
信号类型的信号表条目来请求连接器将条目添加到日志中。处理信号后,连接器将指定的消息打印到日志中。或者,可以配置信号,以便生成的消息包含流坐标。
列名 | 具体值 | 描述 |
---|---|---|
id | 924e3ff8-2245-43ca-ba77-2af9af02fa07 | |
type | log | 信号的动作类型。 |
date | {"message": "Signal message at offset {}"} | 该message 参数指定要打印到日志的字符串。 如果向消息添加占位符 ({} ),它将替换为流坐标。 |
INSERT INTO debezium_signal(id,TYPE,DATA) VALUES(
'924e3ff8-2245-43ca-ba77-2af9af02fa07',
'log',
'{"message": "Signal message at offset {}"}'
)
比如插入以下语句后
INSERT INTO debezium_signal(id,TYPE,DATA) VALUES(
'222',
'log',
'{"message": "==========================Signal message at offset {}"}'
)
在控制台中可以看到如下输出
3.2 临时快照信号
可以通过创建具有execute-snapshot
信号类型的信号表条目来请求连接器启动临时快照。处理信号后,连接器运行请求的快照操作。
与连接器首次启动后运行的初始快照
不同,临时快照发生在运行时期间,连接器已经开始从数据库流式传输更改事件之后。可以随时启动临时快照。
临时快照可用于以下 Debezium 连接器:
- DB2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列名 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | execute-snapshot |
data | {"data-collections": ["public.MyFirstTable", "public.MySecondTable"]} |
INSERT INTO debezium_signal(id,TYPE,DATA) VALUES(
'd139b9b7-7777-4547-917d-e1775ea61d41',
'execute-snapshot',
'{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}'
)
目前,execute-snapshot
操作仅触发 增量快照。
有关临时快照的更多信息,请参阅连接器文档中的快照主题。
额外资源
- Db2 连接器临时快照
- MongoDB 连接器临时快照
- MySQL 连接器临时快照
- Oracle 连接器临时快照
- PostgreSQL 连接器临时快照
- SQL Server 连接器临时快照
3.3 临时快照停止信号
可以通过创建具有stop-snapshot
信号类型的信号表条目来请求连接器停止正在进行的临时快照。处理信号后,连接器将停止当前正在进行的快照操作。
可以停止以下 Debezium 连接器的临时快照:
- DB2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列名 | 值 |
---|---|
ID | d139b9b7-7777-4547-917d-e1775ea61d41 |
类型 | stop-snapshot |
数据 | {"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]} |
INSERT INTO debezium_signal(id,TYPE,DATA) VALUES(
'd139b9b7-7777-4547-917d-e1775ea61d41',
'stop-snapshot',
'{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}'
)
必须指定信号的type
。该data-collections
字段是可选的。将该data-collections
字段留空以请求连接器停止当前快照中的所有活动。如果您希望继续执行增量快照,但又想从快照中排除特定集合,请提供要排除的集合名称或正则表达式的逗号分隔列表。在连接器处理完信号后,增量快照将继续,但它会从您指定的集合中排除数据。
3.4 增量快照
增量快照是一种特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基线状态,类似于初始快照。但是,与初始快照不同,增量快照以块的形式捕获表,而不是一次捕获所有表。连接器使用水位线(watermarking)方法来跟踪快照的进度。
通过在块中而不是在单个整体操作中捕获指定表的初始状态,增量快照与初始快照过程相比具有以下优势:
- 当连接器捕获指定表的基线状态时,来自事务日志的近乎实时的事件流继续不间断地进行。
- 如果增量快照过程被中断,它可以从它停止的地方恢复。
- 您可以随时启动增量快照。
3.4.1 增量快照暂停信号
可以通过使用pause-snapshot
信号类型创建信号表条目来请求连接器暂停正在进行的增量快照。处理信号后,连接器将停止暂停当前正在进行的快照操作。因此,无法指定数据收集,因为快照处理将暂停在信号处理时间的位置。
您可以为以下 Debezium 连接器暂停增量快照:
- DB2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列名 | 值 |
---|---|
ID | d139b9b7-7777-4547-917d-e1775ea61d41 |
类型 | pause-snapshot |
INSERT INTO debezium_signal(id,TYPE) VALUES(
'd139b9b7-7777-4547-917d-e1775ea61d41',
'stop-snapshot'
)
必须指定信号的type
。该data
字段被忽略。
3.4.2 增量快照恢复信号
可以通过创建具有resume-snapshot
信号类型的信号表条目来请求连接器恢复暂停的增量快照。处理信号后,连接器将恢复先前暂停的快照操作。
可以为以下 Debezium 连接器恢复增量快照:
- DB2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列名 | 值 |
---|---|
ID | d139b9b7-7777-4547-917d-e1775ea61d41 |
类型 | resume-snapshot |
INSERT INTO debezium_signal(id,TYPE) VALUES(
'd139b9b7-7777-4547-917d-e1775ea61d41',
'resume-snapshot'
)
必须指定信号的type
。该data
字段被忽略。
有关增量快照的更多信息,请参阅连接器文档中的快照主题。
额外资源
- Db2 连接器增量快照
- MongoDB 连接器增量快照
- MySQL 连接器增量快照
- Oracle 连接器增量快照
- PostgreSQL 连接器增量快照
- SQL Server 连接器增量快照