1. Flink-CDC的介绍
Flink-cdc主要是用来同步数据库中的数据,它的主要优势在于基于Flink框架直接用Flink Stream Api 或Flink SQL 直接编程,不需要引入第三方组件
2.Flink-CDC的使用
Flink-cdc在使用上需要注意的点
- 注意Flink-cdc在2.1版本之前需要导入MySQL的连接包,之后版本不需要,如果环境中有MySQL的连接包需要去除掉
- 在2.4版本之监控MySQL表需要它有主键,2.4版本开始只需要配置“scan.incremental.snapshot.chunk.key-column”参数即可
- MySQL CDC Connector在监控多个表的时候,每个表需要指定库名,并用逗号隔开
- Flink中必须要设置checkpoint,不设置无法正常监控binlog变更日志
Flink-CDC基于DataStream的使用
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("node2") //设置MySQL hostname
.port(3306) //设置MySQL port
.databaseList("db1") //设置捕获的数据库
.tableList("db1.tbl1,db1.tbl2") //设置捕获的数据表
.username("root") //设置登录MySQL用户名
.password("123456") //设置登录MySQL密码
.deserializer(new JsonDebeziumDeserializationSchema()) //设置序列化将SourceRecord 转换成 Json 字符串
.startupOptions(StartupOptions.initial())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpoint
env.enableCheckpointing(5000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"MySQL Source")
.setParallelism(4)
.print();
env.execute();
基于Flink Sql的使用
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
//设置checkpoint
tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 5000L);
tableEnv.executeSql("" +
"CREATE TABLE mysql_binlog (" +
" id INT," +
" name STRING," +
" age INT," +
" PRIMARY KEY(id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = 'node2'," +
" 'port' = '3306'," +
" 'username' = 'root'," +
" 'password' = '123456'," +
" 'database-name' = 'db1'," +
" 'table-name' = 'tbl1'" +
")");
tableEnv.executeSql("select * from mysql_binlog").print();
2.1 Flink-CDC对全量和增量数据的工作原理
并行读取表的全量快照,然后以单并行度方式读取表的binlog进行增量数据的同步
- 全量同步过程中,它会根据主键把数据分为多个chunk分片,然后分配给多并行度去分别读取这些chunk上的数据,读取快照期间,Flink支持chunk级别的checkpoint,即使在同步的过程中发生故障,也可以做到exactly-once级别的恢复
2.2 Flink-CDC启动模式
启动模式是指程序启动的时候,以怎么的方式监控数据库中的数据,共有如下几种模式
- initial(默认): 对受监控的库表进行初始快照,并继续读取最新的binlog
- earliest-offset: 它会跳过快照直接读取最早的binlog日志,它与initial方式区别在于,initial只读取已经操作后(表中现有数据)的数据
- latest-offset: 不执行快照,从binlog的最新处开始读取增量数据
- specific-offset: 从指定的binlog位点开始读取,位点可以通过binlog文件名和位置指定
- timestamp: 从指定的时间戳读取binlog事件