Debezium是Red Hat开源的一个工具,用于实时捕获多种数据源(包括MySQL、PostgreSQL、SQL Server、Oracle等)的变更数据,并将这些数据作为事件流输出到Kafka等消息中间件中。通过Debezium,可以实现数据的实时同步和变更数据捕获(CDC)。
Debezium官网:Reference Documentation
一、环境准备
1、安装 MySQL
确保 MySQL 版本至少为 5.7.6,因为 Debezium 需要此版本或更高版本的 MySQL 来支持 Binlog。安装 MySQL 并配置好基本的访问权限和网络设置。
# 编辑 MySQL 的配置文件(通常是 /etc/my.cnf 或 /etc/mysql/my.cnf),添加或修改以下配置:
[mysqld]
server-id = 1 # 确保每个 MySQL 实例的 server-id 是唯一的
log_bin = mysql-bin # 开启 Binlog 并指定日志文件名前缀
binlog_format = ROW # 使用行格式记录 Binlog,以捕获详细的行级变更
binlog_row_image = FULL # 记录更详细的数据变更信息
expire_logs_days = 10 # 设置 Binlog 的过期时间
在 MySQL 中创建用于 Debezium 的用户,并授权访问数据库:
CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
2、安装Oracle
确保 Oracle 数据库已安装并配置好,归档日志(Archive Log)和补充日志(Supplemental Log Data)已开启。创建一个用户并授予足够的权限以访问需要同步的表和日志。
-- 1. 查看数据库是否处于归档模式
SELECT log_mode FROM v$database;
-- 如果结果显示为ARCHIVELOG,则数据库已处于归档模式,可以跳过转换步骤
-- 2. 如果不是归档模式,转换为归档模式
SHUTDOWN IMMEDIATE; -- 立即关闭数据库
STARTUP MOUNT; -- 启动实例并挂载数据库
ALTER DATABASE ARCHIVELOG; -- 更改数据库为归档模式
ALTER DATABASE OPEN; -- 打开数据库
-- 3. 指定归档日志的位置,可以是一个本地目录或一个网络位置
-- 首先查看归档日志的配置
SELECT destination FROM v$archived_log WHERE destination IS NOT NULL;
-- 如果需要更改归档日志的位置,可以使用以下命令
-- 例如,指定归档日志的位置为+ARCH/orcl
ALTER SYSTEM SET log_archive_dest_1='LOCATION=+ARCH/orcl' SCOPE=BOTH;
-- 确保归档进程正在运行
ALTER SYSTEM ARCHIVE LOG START;
-- 可以通过以下命令查看归档进程状态
SELECT status FROM v$instance WHERE status = 'ACTIVE';
3、安装 Kafka 和 Zookeeper
Kafka 和 Zookeeper 是 Debezium 实现数据同步所必需的组件。
安装 Kafka 和 Zookeeper,并确保它们能够正常运行,且 Kafka 集群的配置符合生产环境的要求。
4、下载 Debezium 插件
访问 Debezium 官网((https://debezium.io/documentation/)下载对应版本的 MySQL/Oracle Connector 插件(这里以2.7版本为例),如 debezium-connector-mysql-x.y.z.Final-plugin.tar.gz
。
二、配置 Kafka Connect
1、解压 Debezium 插件
将下载的 Debezium MySQL/Oracle Connector 插件解压到 Kafka 的插件目录中,例如 /opt/kafka/plugins
,没有plugins
目录,创建一个plugins
目录
2、修改 Kafka Connect 配置
编辑 Kafka Connect 的配置文件(如 connect-distributed.properties
),确保包含以下关键配置:
bootstrap.servers=localhost:9092 # Kafka 集群地址
group.id=connect-cluster # Kafka Connect 集群的组 ID
key.converter=org.apache.kafka.connect.json.JsonConverter # 键转换器
value.converter=org.apache.kafka.connect.json.JsonConverter # 值转换器
key.converter.schemas.enable=false # 禁用键的 schema
value.converter.schemas.enable=false # 禁用值的 schema
offset.storage.topic=connect-offsets # 偏移量存储主题
config.storage.topic=connect-configs # 配置存储主题
status.storage.topic=connect-status # 状态存储主题
plugin.path=/opt/kafka/plugins # 插件路径
3、启动 Kafka Connect
使用命令启动 Kafka Connect 服务,并确保它以分布式模式运行:
/opt/kafka/bin/connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties
三、配置 Debezium Connector
创建一个 JSON 文件来定义 Debezium MySql/Oracle Connector 的配置,指定 MySql/Oracle 数据库的连接信息、Kafka 主题、以及需要同步的表。
1、创建 Debezium Mysql Connector 配置:
编写一个配置文件mysql-source.json
,用于启动Debezium MySQL Connector。可参照官网例子:Debezium connector for MySQL :: Debezium Documentation
{
"name": "mysql-connector", //在Kafka Connect服务中注册时的连接器名称。
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector"",
"database.hostname": "mysql_host",
"database.port": "3306",
"database.user": "debezium-user",
"database.password": "debezium-user-pw",
"database.server.id": "184054",
"topic.prefix": "fullfillment",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.fullfillment",
"include.schema.changes": "true"
}
}
更多配置可查看:Debezium connector for MySQL :: Debezium Documentation
使用Kafka Connect REST API或CLI工具部署Debezium MySQL Connector。
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @mysql-source.json http://connect_cluster:8083/connectors
2、创建 Debezium Oracle Connector 配置:
编写一个配置文件oracle-source.json
,用于启动Debezium Oracle Connector。
{
"name": "oracle-connector",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "<ORACLE_IP_ADDRESS>",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCLCDB",
"topic.prefix" : "server1",
"tasks.max" : "1",
"database.pdb.name" : "ORCLPDB1",
"schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}
还可以使用JDBC URL连接到数据库。
{
"name": "oracle-connector",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"topic.prefix" : "server1",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=OFF)(FAILOVER=ON)(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 1>)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 2>)(PORT=1521)))(CONNECT_DATA=SERVICE_NAME=)(SERVER=DEDICATED)))",
"database.dbname" : "ORCLCDB",
"database.pdb.name" : "ORCLPDB1",
"schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}
以上是容器数据库(CDB)配置方式,下面是非容器数据库(非CDB)配置方式。
{
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"topic.prefix" : "server1",
"database.hostname" : "<oracle ip>",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCLCDB",
"schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}
当您配置Debezium Oracle连接器以与Oracle CDB一起使用时,必须为属性database.pdb.name指定一个值,该值命名了您希望连接器从中捕获更改的pdb。对于非CDB安装,不要指定database.pdb.name属性。
更多配置可查看:Debezium Connector for Oracle :: Debezium Documentation
使用Kafka Connect REST API或CLI工具部署Debezium Oracle Connector。
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @oracle-source.json http://connect_cluster:8083/connectors
四、使用JDBC Sink Connector同步数据到数据库
以JDBC Sink Connector为例,以下是一个简化的配置示例,用于将数据从Kafka订阅消费事件JSON:
{
"name": "jdbc-connector", //在Kafka Connect服务中注册连接器时分配给连接器的名称。
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", //JDBC接收器连接器类的名称。
"tasks.max": "1", //为此连接器创建的最大任务数。
"connection.url": "jdbc:postgresql://localhost/db", //连接器用于连接到其写入的接收器数据库的JDBC URL。
"connection.username": "pguser", //用于身份验证的数据库用户的名称。
"connection.password": "pgpassword", //用于身份验证的数据库用户的密码。
"insert.mode": "upsert", // 插入模式,可以选择 `insert`, `update`, 或者 `upsert`
"delete.enabled": "true", //允许删除数据库中的记录
"primary.key.mode": "record_key", //指定用于解析主键列的方法。
"schema.evolution": "basic", //指定连接器如何演化目标表架构,可选择`none`, 或者 `basic`, basic:指定发生基本演变。连接器通过将传入事件的记录架构与数据库表结构进行比较,将缺失的列添加到表中。
"database.time_zone": "UTC", //指定写入时间字段类型时使用的时区。
"topics": "orders" //要使用的主题列表,用逗号分隔。
}
}
更多配置可查看:Debezium connector for JDBC :: Debezium Documentation
使用Kafka Connect REST API或CLI工具部署Oracle Sink Connector。
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @sink.json http://connect_cluster:8083/connectors