虽然是公有云的鼻祖,AWS在某些产品的实现却太不给力;可能是习惯了阿里云喂到嘴边的感觉,AWS很多方案需要自己折腾,蛋疼!比如这里要讲的mysql数据同步方案。
阿里云产品DTS,点几下就OK了,AWS,只能通过插件安装、配置、调试、蹚坑……,具体就是用msk的debezium插件连接器。没见识过的一脸懵逼,下面说下配置步骤和一些注意的坑。
1.前提
安装好rds和msk
rds:binlog_format=row,打开log_bin
msk:auto.create.topics.enable=true
2.安装debezium插件
(1)下载安装包,并将其上传到S3的某个bucket;
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.zip
(2)通过浏览S3或直接输入文件URI:
注意坑:AWS控制台的这个功能需要有梯子才行,否则:“浏览S3”找不到你的桶,直接输入文件URI自动检查文件也提示失败:Network Failure。
(3)安装后就有了:
3.安装msk连接器
3.1.选择自定义插件
3.2.连接器配置
说明:
connector.class=io.debezium.connector.mysql.MySqlConnector #固定写法
database.hostname # db IP或域名
database.user # db username
database.password # db 密码
database.port=3306 # db 端口
database.server.id=1001 # 编号,db范围唯一
tasks.max=1 # 任务并发数
database.server.name=omp-user # DDL语句的topic,程序消费用
include.schema.changes=true # 包含DDL
database.include.list=yunkc_db # db范围
table.include.list=yunkc_db.t1,yunkc_db.t2 # 表范围
database.history.kafka.topic=cdc1 # DDL语句,内部用,消费者不要消费该topic
database.history.kafka.bootstrap.servers # kafka bootstrap server地址
3.3.为mskconnect创建角色
3.4.日志配置
注意:一定要配置,否则报错无法排查
4.消息
捕获数据变化
./kafka-console-consumer.sh --bootstrap-server xxxxxx --topic omp-user.yunkc_db.t1
Struct{before=Struct{id=1},after=Struct{id=2},source=Struct{version=1.9.7.Final,connector=mysql,name=omp-user,ts_ms=1678348957000,db=yunkc_db,table=t1,server_id=351097349,file=mysql-bin-changelog.000001,pos=344,row=0,thread=16},op=u,ts_ms=1678348957646}
Struct{before=Struct{id=2},after=Struct{id=3},source=Struct{version=1.9.7.Final,connector=mysql,name=omp-user,ts_ms=1678349029000,db=yunkc_db,table=t1,server_id=351097349,file=mysql-bin-changelog.000001,pos=611,row=0,thread=16},op=u,ts_ms=1678349029762}
Struct{before=Struct{id=3},after=Struct{id=4},source=Struct{version=1.9.7.Final,connector=mysql,name=omp-user,ts_ms=1678349036000,db=yunkc_db,table=t1,server_id=351097349,file=mysql-bin-changelog.000001,pos=878,row=0,thread=16},op=u,ts_ms=1678349036808}
捕获结构变化
./kafka-console-consumer.sh --bootstrap-server xxxxxx --topic omp-user
Struct{source=Struct{version=1.9.7.Final,connector=mysql,name=omp-user,ts_ms=1678349943178,db=yunkc_db,table=t2,server_id=351097349,file=mysql-bin-changelog.000001,pos=1287,row=0},databaseName=yunkc_db,ddl=create table t2(id int),
tableChanges=[Struct{type=CREATE,id="yunkc_db"."t2",table=Struct{defaultCharsetName=utf8,primaryKeyColumnNames=[],columns=[Struct{name=id,jdbcType=4,typeName=INT,typeExpression=INT,position=1,optional=true,autoIncremented=false,generated=false}]}}]}