文章目录
- 一. 下载依赖包
- 二. 源端数据准备
- 三. 使用Flink cdc mysql连接器创建flinkSQL映射表
- 四. 使用FlinkSQL Hudi连接器创建hudi表
- 五. 将数据从CDC表插入hudi表
- 六. 测试增删改
- 参考:
一. 下载依赖包
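下载 flink-sql-connector-mysql-cdc-2.2.1.jar,上传到 $FLINK_HOME/lib 目录下。第四步还需要 Hudi 的 Flink bundle jar(hudi-flink*-bundle,版本需与 Flink 版本匹配),同样放到 $FLINK_HOME/lib 目录。放入 jar 之后需要重启 Flink 集群和 SQL Client,新的连接器才会被加载。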
二. 源端数据准备
-- Run in MySQL: creates the source table captured by Flink CDC and loads 19 seed rows.
-- Prerequisite for the mysql-cdc connector: binlog enabled with binlog_format = ROW.
CREATE DATABASE IF NOT EXISTS test;
USE test;

DROP TABLE IF EXISTS mysql_cdc;

-- utf8mb4 instead of the legacy 3-byte utf8 (utf8mb3) alias, so 4-byte characters are storable.
CREATE TABLE mysql_cdc
(
    id   INT          NOT NULL AUTO_INCREMENT,
    name VARCHAR(100),
    PRIMARY KEY (id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4
  COMMENT = 'mysql cdc 表';

-- Ids are given explicitly (1..19) so later test statements can target known rows.
INSERT INTO mysql_cdc (id, name)
VALUES (1, 'test1'),
       (2, 'test2'),
       (3, 'test3'),
       (4, 'test4'),
       (5, 'test5'),
       (6, 'test6'),
       (7, 'test7'),
       (8, 'test8'),
       (9, 'test9'),
       (10, 'test10'),
       (11, 'test11'),
       (12, 'test12'),
       (13, 'test13'),
       (14, 'test14'),
       (15, 'test15'),
       (16, 'test16'),
       (17, 'test17'),
       (18, 'test18'),
       (19, 'test19');
三. 使用Flink cdc mysql连接器创建flinkSQL映射表
代码:
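这里的 server-id 我配置成了一个范围(5400-5408),而不是单个固定值。之前配置单个固定的 server-id 时,总是报错:
io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master ...
原因是:每个读取 binlog 的客户端都会以从库身份连接 MySQL,因此必须使用互不相同的 server-id。这里的客户端包括 source 的每个并行读取任务、其他 Flink CDC 作业以及真正的 MySQL 从库。建议把 server-id 配置为一个范围,范围大小不小于 source 的并行度,并且不要与其他客户端使用的 server-id 重叠。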
SET 'execution.checkpointing.interval' = '10s';

-- Flink changelog source over the MySQL table test.mysql_cdc.
-- Checkpointing (enabled by the SET above) is required in this pipeline for two reasons:
-- per the Flink CDC docs, the incremental snapshot source waits for a completed checkpoint
-- before switching to binlog reading, and the Hudi sink used later commits data on checkpoints.
-- NOTE(review): the MySQL column id is INT, but this table declares BIGINT to match the Hudi
-- table. The documented mapping for MySQL INT is Flink INT; confirm your connector version
-- accepts the widening.
CREATE TABLE flink_mysql_cdc5 (
    id   BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name VARCHAR(100)
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'hp8',
    'port'          = '3306',
    -- Demo only: root with a plaintext password. The Flink CDC docs recommend a dedicated user
    -- granted SELECT, SHOW DATABASES, REPLICATION SLAVE and REPLICATION CLIENT.
    'username'      = 'root',
    'password'      = 'abc123',
    'database-name' = 'test',
    'table-name'    = 'mysql_cdc',
    -- Each parallel reader connects to MySQL as a replica and needs a unique server id.
    -- Use a range at least as large as the source parallelism, not overlapping ids used by
    -- other replicas or CDC jobs on the same MySQL server. A clash fails with: A slave with
    -- the same server_uuid/server_id as this slave has connected to the master
    'server-id'     = '5400-5408',
    'scan.incremental.snapshot.enabled' = 'true'
);

SET 'sql-client.execution.result-mode' = 'tableau';

-- Continuous query: shows the snapshot rows first, then the binlog changes as they arrive.
SELECT id, name FROM flink_mysql_cdc5;
测试记录(截图略):
四. 使用FlinkSQL Hudi连接器创建hudi表
代码:
-- Hudi sink table (Flink Hudi connector), stored on HDFS at the path below.
-- MERGE_ON_READ: updates and deletes are appended to log files. With
-- compaction.async.enabled = false the writing job does not compact them itself, so
-- presumably compaction must be scheduled separately (e.g. offline compaction).
-- changelog.enabled = true asks Hudi to retain the individual change rows.
-- NOTE(review): Hudi uses write.precombine.field to choose between records with the same key
-- (the larger value wins). Using name, a plain string, may let an older change win over a
-- newer one. Consider adding an update-time column in MySQL and using it here; confirm
-- against the payload semantics of your Hudi version.
CREATE TABLE flink_hudi_mysql_cdc5 (
    id   BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name VARCHAR(100)
) WITH (
    'connector'  = 'hudi',
    'path'       = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc5',
    'table.type' = 'MERGE_ON_READ',
    -- Same column as the PRIMARY KEY above; kept explicit.
    'hoodie.datasource.write.recordkey.field' = 'id',
    'write.precombine.field'   = 'name',
    'changelog.enabled'        = 'true',
    'compaction.async.enabled' = 'false'
);
五. 将数据从CDC表插入hudi表
-- Submits a long-running streaming job that writes every change from the CDC table into Hudi.
-- The select list is explicit, so the column mapping into the sink is visible and does not
-- depend on the column order of the source table.
INSERT INTO flink_hudi_mysql_cdc5
SELECT id, name FROM flink_mysql_cdc5;

-- Reads the Hudi table. Rows become visible only after the Hudi writer commits, which happens
-- when a checkpoint completes, so wait at least one checkpoint interval after the INSERT starts.
-- Presumably a one-off snapshot read, since read.streaming.enabled is not set on the table.
SELECT id, name FROM flink_hudi_mysql_cdc5;
HDFS 上对应的表路径(hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc5)下也生成了数据文件(截图略):
六. 测试增删改
-- Run in MySQL (database test) while the Flink INSERT job is running, then query
-- flink_hudi_mysql_cdc5 again in Flink to check that each change arrived.
-- The order of these statements is the order of the binlog events.

-- insert a new row
INSERT INTO mysql_cdc (id, name) VALUES (20, 'test20');

-- delete an existing row
DELETE FROM mysql_cdc WHERE id = 1;

-- update existing rows
UPDATE mysql_cdc SET name = 'test2-updated' WHERE id = 2;
UPDATE mysql_cdc SET name = 'test3-updated' WHERE id = 3;

-- delete another existing row
DELETE FROM mysql_cdc WHERE id = 4;
参考:
- https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
- https://www.pudn.com/news/6228ca059ddf223e1ad0b87f.html
- https://zhuanlan.zhihu.com/p/479832928