TiDB系列之:TiCDC同步TiDB数据库数据到Kafka集群Topic
- 一、Changefeed 概述
- Changefeed 状态流转
- 操作 Changefeed
- 二、同步数据到Kafka
- 创建同步任务,复制增量数据 Kafka
- Sink URI 配置 kafka
- 最佳实践
- TiCDC 使用 Kafka 的认证与授权
- TiCDC 集成 Kafka Connect
- 三、自定义 Kafka Sink 的 Topic 和 Partition 的分发规则
- Matcher 匹配规则
- Topic 分发器
- DDL 事件的分发
- Partition 分发器
- 列选择功能
- 处理超过 Kafka Topic 限制的消息
- TiCDC 层数据压缩功能
使用TiCDC同步TiDB数据库数据方案如下:
- TiDB系列之:使用TiCDC增量同步TiDB数据库数据
- TiDB系列之:TiCDC同步数据到Kafka集群使用Debezium数据格式
使用 TiCDC 创建一个将增量数据复制到 Kafka 的 Changefeed。
一、Changefeed 概述
Changefeed 是 TiCDC 中的单个同步任务。Changefeed 将一个 TiDB 集群中数张表的变更数据输出到一个指定的下游。TiCDC 集群可以运行和管理多个 Changefeed。
Changefeed 状态流转
同步任务状态标识了同步任务的运行情况。在 TiCDC 运行过程中,同步任务可能会运行出错、手动暂停、恢复,或达到指定的 TargetTs,这些行为都可以导致同步任务状态发生变化。本节描述 TiCDC 同步任务的各状态以及状态之间的流转关系。
以上状态流转图中的状态说明如下:
- Normal:同步任务正常进行,checkpoint-ts 正常推进。处于这个状态的 changefeed 会阻塞 GC 推进。
- Stopped:同步任务停止,由于用户手动暂停 (pause) changefeed。处于这个状态的 changefeed 会阻挡 GC 推进。
- Warning:同步任务报错,由于某些可恢复的错误导致同步无法继续进行。处于这个状态的 changefeed 会不断重试,试图继续推进,直到状态转为 Normal。默认重试时间为 30 分钟(可以通过 changefeed-error-stuck-duration 调整),超过该时间,changefeed 会进入 Failed 状态。处于这个状态的 changefeed 会阻挡 GC 推进。
- Finished:同步任务完成,同步任务进度已经达到预设的 TargetTs。处于这个状态的 changefeed 不会阻挡 GC 推进。
- Failed:同步任务失败。处于这个状态的 changefeed 不会自动尝试恢复。为了让用户有足够的时间处理故障,处于这个状态的 changefeed 会阻塞 GC 推进,阻塞时长为 gc-ttl 所设置的值,其默认值为 24 小时。在此期间,如果导致任务失败的问题被修复,用户可以手动恢复 changefeed。超过了 gc-ttl 时长后,如果 changefeed 仍然处于 Failed 状态,则同步任务无法恢复。
注意:
- 如果是因为 changefeed 阻塞了 GC, 则 changefeed 最多阻塞 GC 推进 gc-ttl 所指定的时长,超过该时长后,changefeed 会被设置成 failed 状态,错误类型为 ErrGCTTLExceeded,不再阻塞 GC 推进。
- 如果 changefeed 遭遇错误码为 ErrGCTTLExceeded、ErrSnapshotLostByGC 或者 ErrStartTsBeforeGC 类型的错误,则不再阻塞 GC 推进。
以上状态流转图中的编号说明如下:
- ① 执行 changefeed pause 命令。
- ② 执行 changefeed resume 恢复同步任务。
- ③ changefeed 运行过程中发生可恢复的错误,自动重试。
- ④ changefeed 自动重试成功,checkpoint-ts 已经继续推进。
- ⑤ changefeed 自动重试超过 30 分钟,重试失败,进入 failed 状态。此时 changefeed 会继续阻塞上游 GC,阻塞时长为 gc-ttl 所配置的时长。
- ⑥ changefeed 遇到不可重试错误,直接进入 failed 状态。此时 changefeed 会继续阻塞上游 GC,阻塞时长为 gc-ttl 所配置的时长。
- ⑦ changefeed 的同步进度到达 target-ts 设置的值,完成同步。
- ⑧ changefeed 停滞时间超过 gc-ttl 所指定的时长,遭遇 GC 推进错误,不可被恢复。
- ⑨ changefeed 停滞时间小于 gc-ttl 所指定的时长,故障原因被修复,执行 changefeed resume 恢复同步任务。
操作 Changefeed
通过 TiCDC 提供的命令行工具 cdc cli,你可以管理 TiCDC 集群和同步任务,具体可参考管理 TiCDC Changefeed。你也可以通过 HTTP 接口,即 TiCDC OpenAPI 来管理 TiCDC 集群和同步任务,详见 TiCDC OpenAPI。
如果你使用的 TiCDC 是用 TiUP 部署的,可以通过 tiup cdc:v<CLUSTER_VERSION> cli 来使用 TiCDC 命令行工具,注意需要将 v<CLUSTER_VERSION> 替换为 TiCDC 集群版本,例如 v8.1.0。你也可以通过直接执行 cdc cli 直接使用命令行工具。
二、同步数据到Kafka
创建同步任务,复制增量数据 Kafka
使用以下命令来创建同步任务:
cdc cli changefeed create \
--server=http://10.0.10.25:8300 \
--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \
--changefeed-id="simple-replication-task"
Create changefeed successfully!
ID: simple-replication-task
Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2023-11-28T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
- –server:TiCDC 集群中任意一个 TiCDC 服务器的地址。
- –changefeed-id:同步任务的 ID,如果不指定该 ID,TiCDC 会自动生成一个 UUID。
- –sink-uri:同步任务下游的地址,详见:Sink URI 配置 Kafka。
- –start-ts:指定 changefeed 的开始 TSO。TiCDC 集群将从这个 TSO 开始拉取数据。默认为当前时间。
- –target-ts:指定 changefeed 的目标 TSO。TiCDC 集群拉取数据直到这个 TSO 停止。默认为空,即 TiCDC 不会自动停止。
- –config:指定 changefeed 配置文件,详见:TiCDC Changefeed 配置参数。
Sink URI 配置 kafka
Sink URI 用于指定 TiCDC 目标系统的连接信息,遵循以下格式:
[scheme]://[host]:[port][/path]?[query_parameters]
如果下游 Kafka 有多个主机或端口,可以在 Sink URI 中配置多个 [host]:[port]
URI 中可配置的的参数如下:
参数 | 描述 |
---|---|
topic-name | 变量,使用的 Kafka topic 名字。 |
kafka-version | 下游 Kafka 版本号。该值需要与下游 Kafka 的实际版本保持一致。 |
kafka-client-id | 指定同步任务的 Kafka 客户端的 ID(可选,默认值为 TiCDC_sarama_producer_同步任务的 ID)。 |
partition-num | 下游 Kafka partition 数量(可选,不能大于实际 partition 数量,否则创建同步任务会失败,默认值 3)。 |
max-message-bytes | 每次向 Kafka broker 发送消息的最大数据量(可选,默认值 10MB)。从 v5.0.6 和 v4.0.6 开始,默认值分别从 64MB 和 256MB 调整至 10 MB。 |
replication-factor | Kafka 消息保存副本数(可选,默认值 1),需要大于等于 Kafka 中 min.insync.replicas 的值。 |
required-acks | 在 Produce 请求中使用的配置项,用于告知 broker 需要收到多少副本确认后才进行响应。可选值有:0(NoResponse:不发送任何响应,只有 TCP ACK),1(WaitForLocal:仅等待本地提交成功后再响应)和 -1(WaitForAll:等待所有同步副本提交后再响应。最小同步副本数量可通过 broker 的 min.insync.replicas 配置项进行配置)。(可选,默认值为 -1)。 |
compression | 设置发送消息时使用的压缩算法(可选值为 none、lz4、gzip、snappy 和 zstd,默认值为 none)。注意 Snappy 压缩文件必须遵循官方 Snappy 格式。不支持其他非官方压缩格式。 |
protocol | 输出到 Kafka 的消息协议,可选值有 canal-json、open-protocol、avro、debezium 和 simple。 |
auto-create-topic | 当传入的 topic-name 在 Kafka 集群不存在时,TiCDC 是否要自动创建该 topic(可选,默认值 true)。 |
enable-tidb-extension | 可选,默认值是 false。当输出协议为 canal-json 时,如果该值为 true,TiCDC 会发送 WATERMARK 事件,并在 Kafka 消息中添加 TiDB 扩展字段。从 6.1.0 开始,该参数也可以和输出协议 avro 一起使用。如果该值为 true,TiCDC 会在 Kafka 消息中添加三个 TiDB 扩展字段。 |
max-batch-size | 从 v4.0.9 开始引入。当消息协议支持把多条变更记录输出至一条 Kafka 消息时,该参数用于指定这一条 Kafka 消息中变更记录的最多数量。目前,仅当 Kafka 消息的 protocol 为 open-protocol 时有效(可选,默认值 16)。 |
sasl-user | 连接下游 Kafka 实例所需的 SASL/PLAIN 或 SASL/SCRAM 认证的用户名(authcid)(可选)。 |
sasl-password | 连接下游 Kafka 实例所需的 SASL/PLAIN 或 SASL/SCRAM 认证的密码(可选)。如有特殊字符,需要用 URL encode 转义。 |
sasl-mechanism | 连接下游 Kafka 实例所需的 SASL 认证方式的名称,可选值有 plain、scram-sha-256、scram-sha-512 和 gssapi。 |
dial-timeout | 和下游 Kafka 建立连接的超时时长,默认值为 10s。 |
read-timeout | 读取下游 Kafka 返回的 response 的超时时长,默认值为 10s。 |
write-timeout | 向下游 Kafka 发送 request 的超时时长,默认值为 10s。 |
avro-decimal-handling-mode | 仅在输出协议是 avro 时有效。该参数决定了如何处理 DECIMAL 类型的字段,值可以是 string 或 precise,表明映射成字符串还是浮点数。 |
avro-bigint-unsigned-handling-mode | 仅在输出协议是 avro 时有效。该参数决定了如何处理 BIGINT UNSIGNED 类型的字段,值可以是 string 或 long,表明映射成字符串还是 64 位整型。 |
最佳实践
- TiCDC 推荐用户自行创建 Kafka Topic,你至少需要设置该 Topic 每次向 Kafka broker 发送消息的最大数据量和下游 Kafka partition 的数量。在创建 changefeed 的时候,这两项设置分别对应 max-message-bytes 和 partition-num 参数。
- 如果你在创建 changefeed 时,使用了尚未存在的 Topic,那么 TiCDC 会尝试使用 partition-num 和 replication-factor 参数自行创建 Topic。建议明确指定这两个参数。
TiCDC 使用 Kafka 的认证与授权
使用 Kafka 的 SASL 认证时配置样例如下所示:
- SASL/PLAIN
--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"
- SASL/SCRAM
SCRAM-SHA-256、SCRAM-SHA-512 与 PLAIN 方式类似,只需要将 sasl-mechanism 指定为对应的认证方式即可。
TiCDC 集成 Kafka Connect
如要使用 Confluent 提供的 data connectors 向关系型或非关系型数据库传输数据,请选择 avro 协议,并在 schema-registry 中提供 Confluent Schema Registry 的 URL。
配置样例如下所示:
--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3"
--schema-registry="http://127.0.0.1:8081"
--config changefeed_config.toml
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
三、自定义 Kafka Sink 的 Topic 和 Partition 的分发规则
Matcher 匹配规则
以如下示例配置文件中的 dispatchers 配置项为例:
[sink]
dispatchers = [
{matcher = ['test1.*', 'test2.*'], topic = "Topic 表达式 1", partition = "ts" },
{matcher = ['test3.*', 'test4.*'], topic = "Topic 表达式 2", partition = "index-value" },
{matcher = ['test1.*', 'test5.*'], topic = "Topic 表达式 3", partition = "table"},
{matcher = ['test6.*'], partition = "ts"}
]
- 对于匹配了 matcher 规则的表,按照对应的 topic 表达式指定的策略进行分发。例如表 test3.aa,按照 topic 表达式 2 分发;表 test5.aa,按照 topic 表达式 3 分发。
- 对于匹配了多个 matcher 规则的表,以靠前的 matcher 对应的 topic 表达式为准。例如表 test1.aa,按照 topic 表达式 1 分发。
- 对于没有匹配任何 matcher 的表,将对应的数据变更事件发送到 --sink-uri 中指定的默认 topic 中。例如表 test10.aa 发送到默认 topic。
- 对于匹配了 matcher 规则但是没有指定 topic 分发器的表,将对应的数据变更发送到 --sink-uri 中指定的默认 topic 中。例如表 test6.aa 发送到默认 topic。
Topic 分发器
Topic 分发器用 topic = “xxx” 来指定,并使用 topic 表达式来实现灵活的 topic 分发策略。topic 的总数建议小于 1000。
Topic 表达式的基本规则为 [prefix][{schema}][middle][{table}][suffix],详细解释如下:
- prefix:可选项,代表 Topic Name 的前缀。
- {schema}:可选项,用于匹配库名。
- middle:可选项,代表库表名之间的分隔符。
- {table}:可选项,用于匹配表名。
- suffix:可选项,代表 Topic Name 的后缀。
其中 prefix、middle 以及 suffix 仅允许出现大小写字母(a-z、A-Z)、数字(0-9)、点号(.)、下划线(_)和中划线(-);{schema}、{table} 均为小写,诸如 {Schema} 以及 {TABLE} 这样的占位符是无效的。
一些示例如下:
- matcher = [‘test1.table1’, ‘test2.table2’], topic = “hello_{schema}_{table}”
- 对于表 test1.table1 对应的数据变更事件,发送到名为 hello_test1_table1 的 topic 中。
- 对于表 test2.table2 对应的数据变更事件,发送到名为 hello_test2_table2 的 topic 中。
- matcher = [‘test3.', 'test4.’], topic = “hello_{schema}_world”
- 对于 test3 下的所有表对应的数据变更事件,发送到名为 hello_test3_world 的 topic 中。
- 对于 test4 下的所有表对应的数据变更事件,发送到名为 hello_test4_world 的 topic 中。
- matcher = [‘test5., 'test6.’], topic = “hard_code_topic_name”
- 对于 test5 和 test6 下的所有表对应的数据变更事件,发送到名为 hard_code_topic_name 的 topic 中。你可以直接指定 topic 名称。
- matcher = [‘.’], topic = “{schema}_{table}”
- 对于 TiCDC 监听的所有表,按照“库名_表名”的规则分别分发到独立的 topic 中;例如对于 test.account 表,TiCDC 会将其数据变更日志分发到名为 test_account 的 Topic 中。
DDL 事件的分发
库级别 DDL
诸如 create database、drop database 这类和某一张具体的表无关的 DDL,称之为库级别 DDL。对于库级别 DDL 对应的事件,被发送到 --sink-uri 中指定的默认 topic 中。
表级别 DDL
诸如 alter table、create table 这类和某一张具体的表相关的 DDL,称之为表级别 DDL。对于表级别 DDL 对应的事件,按照 dispatchers 的配置,被发送到相应的 topic 中。
例如,对于 matcher = [‘test.*’], topic = {schema}_{table} 这样的 dispatchers 配置,DDL 事件分发情况如下:
- 若 DDL 事件中涉及单张表,则将 DDL 事件原样发送到相应的 topic 中。
- 对于 DDL 事件 drop table test.table1,该事件会被发送到名为 test_table1 的 topic 中。
- 若 DDL 事件中涉及多张表(rename table / drop table / drop view 都可能涉及多张表),则将单个 DDL 事件拆分为多个发送到相应的 topic 中。
- 对于 DDL 事件 rename table test.table1 to test.table10, test.table2 to test.table20,则将 rename table test.table1 to test.table10 的 DDL 事件发送到名为 test_table1 的 topic 中,将 rename table test.table2 to test.table20 的 DDL 事件发送到名为 test.table2 的 topic 中。
Partition 分发器
partition 分发器用 partition = “xxx” 来指定,支持 default、index-value、columns、table 和 ts 共五种 partition 分发器,分发规则如下:
- default:默认使用 table 分发规则。使用所属库名和表名计算 partition 编号,一张表的数据被发送到相同的 partition。单表数据只存在于一个 partition 中并保证有序,但是发送吞吐量有限,无法通过添加消费者的方式提升消费速度。
- index-value:使用事件所属表的主键、唯一索引或由 index 配置指定的索引的值计算 partition 编号,一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。
- columns:使用由 columns 指定的列的值计算 partition 编号。一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。
- table:使用事件所属的表的库名和表名计算 partition 编号。
- ts:使用事件的 commitTs 计算 partition 编号。一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。一条数据的多次修改可能被发送到不同的 partition 中。消费者消费进度不同可能导致消费端数据不一致。因此,消费端需要对来自多个 partition 的数据按 commitTs 排序后再进行消费。
以如下示例配置文件中的 dispatchers 配置项为例:
[sink]
dispatchers = [
{matcher = ['test.*'], partition = "index-value"},
{matcher = ['test1.*'], partition = "index-value", index = "index1"},
{matcher = ['test2.*'], partition = "columns", columns = ["id", "a"]},
{matcher = ['test3.*'], partition = "table"},
]
- 任何属于库 test 的表均使用 index-value 分发规则,即使用主键或者唯一索引的值计算 partition 编号。如果有主键则使用主键,否则使用最短的唯一索引。
- 任何属于库 test1 的表均使用 index-value 分发规则,并且使用名为 index1 的索引的所有列的值计算 partition 编号。如果指定的索引不存在,则报错。注意,index 指定的索引必须是唯一索引。
- 任何属于库 test2 的表均使用 columns 分发规则,并且使用列 id 和 a 的值计算 partition 编号。如果任一列不存在,则报错。
- 任何属于库 test3 的表均使用 table 分发规则。
- 对于属于库 test4 的表,因为不匹配上述任何一个规则,所以使用默认的 default,即 table 分发规则。
如果一张表,匹配了多个分发规则,以第一个匹配的规则为准。
注意
- 从 v6.1 开始,为了明确配置项的含义,用来指定 partition 分发器的配置项由原来的 dispatcher 改为 partition,partition 为 dispatcher 的别名。例如,以下两条规则完全等价:
[sink]
dispatchers = [
{matcher = ['*.*'], dispatcher = "index-value"},
{matcher = ['*.*'], partition = "index-value"},
]
但是 dispatcher 与 partition 不能出现在同一条规则中。例如,以下规则非法:
{matcher = ['*.*'], dispatcher = "index-value", partition = "table"},
列选择功能
列选择功能支持对事件中的列进行选择,只将指定的列的数据变更事件发送到下游。
以如下示例配置文件中的 column-selectors 配置项为例:
[sink]
column-selectors = [
{matcher = ['test.t1'], columns = ['a', 'b']},
{matcher = ['test.*'], columns = ["*", "!b"]},
{matcher = ['test1.t1'], columns = ['column*', '!column1']},
{matcher = ['test3.t'], columns = ["column?", "!column1"]},
]
- 对于表 test.t1,只发送 a 和 b 两列的数据。
- 对于属于库 test 的表(除 t1 外),发送除 b 列之外的所有列的数据。
- 对于表 test1.t1,发送所有以 column 开头的列,但是不发送 column1 列的数据。
- 对于表 test3.t,发送所有以 column 开头且列名长度为 7 的列,但是不发送 column1 列的数据。
- 不匹配任何规则的表将不进行列过滤,发送所有列的数据。
注意:
- 经过 column-selectors 规则过滤后,表中的数据必须要有主键或者唯一键被同步,否则在 changefeed 创建或运行时会报错。
处理超过 Kafka Topic 限制的消息
Kafka Topic 对可以接收的消息大小有限制,该限制由 max.message.bytes 参数控制。当 TiCDC Kafka sink 在发送数据时,如果发现数据大小超过了该限制,会导致 changefeed 报错,无法继续同步数据。为了解决这个问题,TiCDC 新增一个参数 large-message-handle-option 并提供如下解决方案。
目前,如下功能支持 Canal-JSON 和 Open Protocol 两种编码协议。使用 Canal-JSON 协议时,你需要在 sink-uri 中设置 enable-tidb-extension=true。
TiCDC 层数据压缩功能
从 v7.4.0 开始,TiCDC Kafka sink 支持在编码消息后立即对数据进行压缩,并与消息大小限制参数比较。该功能能够有效减少超过消息大小限制的情况发生。
配置样例如下所示:
[sink.kafka-config.large-message-handle]
# 该参数从 v7.4.0 开始引入
# 默认为 "none",即不开启编码时的压缩功能
# 可选值有 "none"、"lz4"、"snappy",默认为 "none"
large-message-handle-compression = "none"
开启了 large-message-handle-compression 之后,消费者收到的消息经过特定压缩协议编码,消费者应用程序需要使用指定的压缩协议进行数据解码。
该功能和 Kafka producer 的压缩功能不同:
- large-message-handle-compression 中指定的压缩算法是对单条 Kafka 消息进行压缩,并且压缩是在与消息大小限制参数比较之前进行。
- 你也可以同时通过 sink-uri 的 compression 参数配置压缩算法,该配置启用的压缩功能应用在整个发送数据请求,其中包含多条 Kafka 消息。
如果设置了 large-message-handle-compression,TiCDC 在收到一条消息后,先将该消息与消息大小限制参数的值进行对比,大于该消息大小限制的消息会被压缩。如果同时还设置了 sink-uri 的 compression,TiCDC 会根据 sink-uri 的设置,在 sink 级别再次对整个发送数据请求进行压缩。
两种压缩方法的压缩率的计算方法均为:compression ratio = 压缩前的大小 / 压缩后的大小 * 100