文章目录
- 1、基本介绍
- 2、代码实战
- 2.1、数据源准备
- 2.2、代码实战
- 2.3、数据格式
1、基本介绍
Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:
- FlinkSQL
- Flink DataStream 和 Table API(本文使用该方式)
对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
2、代码实战
2.1、数据源准备
本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:
CREATE TABLE `quick_chat_msg` (
`id` bigint NOT NULL COMMENT '主键id',
`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',
`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',
`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',
`content` varchar(500) DEFAULT NULL COMMENT '消息内容',
`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',
`extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
最后,要把数据库时区配置好,否则会出现问题,命令如下:
SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';
2.2、代码实战
首先,引入Flink CDC相关依赖,内容如下:
<dependencies>
<!-- Flink connector连接器基础包 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.14.0</version>
</dependency>
<!-- Flink CDC MySQL源 -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Flink DataStream数据流API -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<!-- Flink客户端-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<!--Flink WebUI,端口8081(默认没有开启)-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_2.12</artifactId>
<version>1.14.0</version>
</dependency>
</dependencies>
第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class MySinkHandler extends RichSinkFunction<String> {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println(value);
}
@Override
public void open(Configuration parameters) throws Exception {
}
@Override
public void close() throws Exception {
}
}
最后,配置好 Flink CDC 监听进程,随着项目启动运行:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class MySqlSourceExample {
@PostConstruct
public void init() throws Exception {
// 配置监听数据源
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("8.141.28.132")
.port(3306)
// 数据库集合,可以配置多个
.databaseList("quick_chat")
// 表集合,可以配置多个
.tableList("quick_chat.quick_chat_msg")
.username("root")
.password("root")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true)
.build();
// 配置 Flink WebUI
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
// 检查点间隔时间
// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。
env.enableCheckpointing(5000);
DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
.addSink(new MySinkHandler());
env.execute();
}
}
项目启动完毕后,可以通过8081端口访问Flink UI页面:
2.3、数据格式
上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:
# 新增
{
"before": null,
"after": {
"id": 3,
"from_id": "dog",
"to_id": "cat",
"relation_id": "dog:cat",
"content": "你好啊",
"msg_type": 1,
"extra_info": null,
"create_time": 1729164075000,
"deleted": 0
},
"source": {
"version": "1.6.4.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1729135279000,
"snapshot": "false",
"db": "quick_chat",
"sequence": null,
"table": "quick_chat_msg",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 2452,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1729135278633,
"transaction": null
}
# 修改
{
"before": {
"id": 3,
"from_id": "dog",
"to_id": "cat",
"relation_id": "dog:cat",
"content": "你好啊",
"msg_type": 1,
"extra_info": null,
"create_time": 1729164075000,
"deleted": 0
},
"after": {
"id": 3,
"from_id": "dog",
"to_id": "cat",
"relation_id": "dog:cat",
"content": "你好啊,小猫咪",
"msg_type": 1,
"extra_info": null,
"create_time": 1729164075000,
"deleted": 0
},
"source": {
"version": "1.6.4.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1729135289000,
"snapshot": "false",
"db": "quick_chat",
"sequence": null,
"table": "quick_chat_msg",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 2825,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1729135288473,
"transaction": null
}
# 删除
{
"before": {
"id": 3,
"from_id": "dog",
"to_id": "cat",
"relation_id": "dog:cat",
"content": "你好啊,小猫咪",
"msg_type": 1,
"extra_info": null,
"create_time": 1729164075000,
"deleted": 0
},
"after": null,
"source": {
"version": "1.6.4.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1729135301000,
"snapshot": "false",
"db": "quick_chat",
"sequence": null,
"table": "quick_chat_msg",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 3247,
"row": 0,
"thread": null,
"query": null
},
"op": "d",
"ts_ms": 1729135300692,
"transaction": null
}