一、前言
使用Apache Flink实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个Doris数据库的ETL过程。
Flink官网:Apache Flink CDC | Apache Flink CDC
开启 Mysql Binlog
修改我们的配置文件 my.cnf
,增加:
server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
重启 mysql
# 判断MySQL是否已经开启binlog on 为打开状态
SHOW VARIABLES LIKE 'log_bin';
# 查看MySQL的binlog模式
show global variables like "binlog%";
# 查看日志开启状态
show variables like 'log_%';
# 刷新log日志,立刻产生一个新编号的binlog日志文件,跟重启一个效果
flush logs;
# 清空所有binlog日志
reset master;
二、准备 Flink Standalone 集群
1、下载 Flink 1.18.0,解压后得到 flink-1.18.0 目录。 使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
tar -xzf flink-*.tgz
cd flink-1.18.0
export FLINK_HOME=/usr/local/flink-1.18.0
2、通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。
execution.checkpointing.interval: 3000
3、使用下面的命令启动 Flink 集群。
cd /usr/local/flink-1.18.0
./bin/start-cluster.sh
Flink现在作为后台进程运行。您可以使用以下命令检查其状态:
ps aux | grep flink
启动成功的话,可以在 http://localhost:8081/访问到 Flink Web UI,如下所示:
多次执行 start-cluster.sh
可以拉起多个 TaskManager。
如果想修改端口,可以在 conf/flink-conf.yaml 中修改 rest.port
要快速停止集群和所有正在运行的组件,您可以使用提供的脚本:
./bin/stop-cluster.sh
三、通过 FlinkCDC cli 提交任务
支持的连接器:
连接器 | 类型 | 支持的外部系统 |
---|---|---|
Apache Doris | Sink |
|
MySQL | Source |
|
StarRocks | Sink |
|
1、下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.1.0
; flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 下会包含 bin
、lib
、log
、conf
四个目录。
2、下载下面列出的 connector 包,并且移动到 lib
目录下; 下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译.
- MySQL pipeline connector 3.1.0
- Apache Doris pipeline connector 3.1.0
3、编写任务配置 yaml 文件 下面给出了一个整库同步的示例文件 mysql-to-doris.yaml
:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
其中: source 中的 tables: app_db.\.*
通过正则匹配同步 app_db
下的所有表。 sink 添加 table.create.properties.replication_num
参数是由于 Docker 镜像中只有一个 Doris BE 节点。
4、最后,通过命令行提交任务到 Flink Standalone cluster
bash bin/flink-cdc.sh mysql-to-doris.yaml
提交成功后,返回信息如:
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris
的任务正在运行。
四、Route the changes
Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。 下面提供一个配置文件说明:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
benodes: 127.0.0.1:8040
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
通过上面的 route
配置,会将 app_db.orders
表的结构和数据同步到 ods_db.ods_orders
中。从而实现数据库迁移的功能。 特别地,source-table
支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置:
route:
- source-table: app_db.order\.*
sink-table: ods_db.ods_orders
这样,就可以将诸如 app_db.order01
、app_db.order02
、app_db.order03
的表汇总到 ods_db.ods_orders 中。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。
以下扩展笔记
五、通过Flink Sql Client 方式与 Flink 进行交互
支持的连接器:
Connector | Database | Driver |
---|---|---|
mongodb-cdc |
| MongoDB Driver: 4.9.1 |
mysql-cdc |
| JDBC Driver: 8.0.28 |
oceanbase-cdc |
| OceanBase Driver: 2.4.x |
oracle-cdc |
| Oracle Driver: 19.3.0.0 |
postgres-cdc |
| JDBC Driver: 42.5.1 |
sqlserver-cdc |
| JDBC Driver: 9.4.1.jre8 |
tidb-cdc |
| JDBC Driver: 8.0.27 |
db2-cdc |
| Db2 Driver: 11.5.0.0 |
vitess-cdc |
| MySql JDBC Driver: 8.0.26 |
下表显示了Flink CDC连接器和Flink 之间的版本映射:
Flink® CDC Version | Flink® Version |
---|---|
1.0.0 | 1.11.* |
1.1.0 | 1.11.* |
1.2.0 | 1.12.* |
1.3.0 | 1.12.* |
1.4.0 | 1.13.* |
2.0.* | 1.13.* |
2.1.* | 1.13.* |
2.2.* | 1.13.*, 1.14.* |
2.3.* | 1.13.*, 1.14.*, 1.15.*, 1.16.* |
2.4.* | 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.* |
3.0.* | 1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* |
使用下面的命令启动 Flink SQL CLI
#在flink目录下
./bin/sql-client.sh
然后建表语句创建Flink表 ,以下展示将flink_source.source_test表实时同步到flink_sink、flink_sink_second的sink_test表,Mysql同步Mysql。
# 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
Flink SQL> CREATE TABLE source_test (
> user_id STRING,
> user_name STRING,
> PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.168.3.31',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '******',
> 'database-name' = 'flink_source',
> 'table-name' = 'source_test'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE sink_test (
> user_id STRING,
> user_name STRING,
> PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = 'root',
> 'password' = '******',
> 'table-name' = 'sink_test'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE sink_test_second (
> user_id STRING,
> user_name STRING,
> PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink_second',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = 'root',
> 'password' = '******',
> 'table-name' = 'sink_test'
> );
[INFO] Execute statement succeed.
Flink SQL> insert into sink_test select * from source_test;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 0c49758cc251699f0b4acd6c9f735e6e
Flink SQL> insert into sink_test_second select * from source_test;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: ecea685a715d7d40ee1a94aac3236c18
Flink SQL>
注意需要将flink-sql-connector-mysql-cdc-3.1.0.jar放到{flink-1.18.0}/lib/ 下 。
下载 Flink CDC 相关 Jar 包:Central Repository: com/ververica/flink-sql-connector-mysql-cdc
如果报如下错误:将flink-connector-jdbc-3.2.0-1.18.jar放到{flink-1.18.0}/lib/ 下 。
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
mysql-cdc
print
python-input-format
JDBC SQL 连接器:JDBC 连接器(使用地址)允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。下载JDBC SQL连接器的依赖包,放到目录 {flink-1.18.0}/lib/ 下,用于使用JDBC SQL连接器连接MySQL的sink库。
Mysql连接器选项:
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | 指定要使用的连接器, 这里应该是 'mysql-cdc' . |
hostname | required | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 |
username | required | (none) | String | 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。 |
password | required | (none) | String | 连接 MySQL 数据库服务器时使用的密码。 |
database-name | required | (none) | String | 要监视的 MySQL 服务器的数据库名称。数据库名称还支持正则表达式,以监视多个与正则表达式匹配的表。 |
table-name | required | (none) | String | 需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。注意:MySQL CDC 连接器在正则匹配表名时,会把用户填写的 database-name, table-name 通过字符串 `\\.` 连接成一个全路径的正则表达式,然后使用该正则表达式和 MySQL 数据库中表的全限定名进行正则匹配。 |
port | optional | 3306 | Integer | MySQL 数据库服务器的整数端口号。 |
server-id | optional | (none) | String | 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 |
scan.incremental.snapshot.enabled | optional | true | Boolean | 增量快照是一种读取表快照的新机制,与旧的快照机制相比, 增量快照有许多优点,包括: (1)在快照读取期间,Source 支持并发读取, (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint, (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 Source 并行运行,则每个并行 Readers 都应该具有唯一的 Server id,所以 Server id 必须是类似 `5400-6400` 的范围,并且该范围必须大于并行度。 请查阅 增量快照读取 章节了解更多详细信息。 |
scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。 |
scan.snapshot.fetch.size | optional | 1024 | Integer | 读取表快照时每次读取数据的最大条数。 |
scan.startup.mode | optional | initial | String | MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 请查阅 启动模式 章节了解更多详细信息。 |
scan.startup.specific-offset.file | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。 |
scan.startup.specific-offset.pos | optional | (none) | Long | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。 |
scan.startup.specific-offset.gtid-set | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。 |
scan.startup.specific-offset.skip-events | optional | (none) | Long | 在指定的启动位点后需要跳过的事件数量。 |
scan.startup.specific-offset.skip-rows | optional | (none) | Long | 在指定的启动位点后需要跳过的数据行数量。 |
server-time-zone | optional | (none) | String | 数据库服务器中的会话时区, 例如: "Asia/Shanghai". 它控制 MYSQL 中的时间戳类型如何转换为字符串。 更多请参考 这里. 如果没有设置,则使用ZoneId.systemDefault()来确定服务器时区。 |
debezium.min.row. count.to.stream.result | optional | 1000 | Integer | 在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。 此参数确定 MySQL 连接是否将表的所有结果拉入内存(速度很快,但需要大量内存), 或者结果是否需要流式传输(传输速度可能较慢,但适用于非常大的表)。 该值指定了在连接器对结果进行流式处理之前,表必须包含的最小行数,默认值为1000。将此参数设置为`0`以跳过所有表大小检查,并始终在快照期间对所有结果进行流式处理。 |
connect.timeout | optional | 30s | Duration | 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 |
connect.max-retries | optional | 3 | Integer | 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 |
connection.pool.size | optional | 20 | Integer | 连接池大小。 |
jdbc.properties.* | optional | 20 | String | 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'. |
heartbeat.interval | optional | 30s | Duration | 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。 |
debezium.* | optional | (none) | String | 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。 For example: 'debezium.snapshot.mode' = 'never' . 查看更多关于 Debezium 的 MySQL 连接器属性 |
scan.incremental.close-idle-reader.enabled | optional | false | Boolean | 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。 |
Jdbc连接器选项:
参数 | 是否必填 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
connector | 必填 | (none) | String | 指定使用什么类型的连接器,这里应该是'jdbc' 。 |
url | 必填 | (none) | String | JDBC 数据库 url。 |
table-name | 必填 | (none) | String | 连接到 JDBC 表的名称。 |
driver | 可选 | (none) | String | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
compatible-mode | 可选 | (none) | String | 数据库的兼容模式。 |
username | 可选 | (none) | String | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
password | 可选 | (none) | String | JDBC 密码。 |
connection.max-retry-timeout | 可选 | 60s | Duration | 最大重试超时时间,以秒为单位且不应该小于 1 秒。 |
scan.partition.column | 可选 | (none) | String | 用于将输入进行分区的列名。请参阅下面的分区扫描部分了解更多详情。 |
scan.partition.num | 可选 | (none) | Integer | 分区数。 |
scan.partition.lower-bound | 可选 | (none) | Integer | 第一个分区的最小值。 |
scan.partition.upper-bound | 可选 | (none) | Integer | 最后一个分区的最大值。 |
scan.fetch-size | 可选 | 0 | Integer | 每次循环读取时应该从数据库中获取的行数。如果指定的值为 '0' ,则该配置项会被忽略。 |
scan.auto-commit | 可选 | true | Boolean | 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是 Postgres,可能需要将此设置为 false 以便流化结果。 |
lookup.cache | 可选 | NONE | 枚举类型 可选值: NONE, PARTIAL | 维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存)。 |
lookup.cache.max-rows | 可选 | (none) | Integer | 维表缓存的最大行数,若超过该值,则最老的行记录将会过期。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 |
lookup.partial-cache.expire-after-write | 可选 | (none) | Duration | 在记录写入缓存后该记录的最大保留时间。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 |
lookup.partial-cache.expire-after-access | 可选 | (none) | Duration | 在缓存中的记录被访问后该记录的最大保留时间。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 |
lookup.partial-cache.cache-missing-key | 可选 | true | Boolean | 是否缓存维表中不存在的键,默认为true。 使用该配置时 "lookup.cache" 必须设置为 "PARTIAL”。 |
lookup.max-retries | 可选 | 3 | Integer | 查询数据库失败的最大重试次数。 |
sink.buffer-flush.max-rows | 可选 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
六、使用数据流API
包含以下Maven依赖项(通过Maven中心库提供):
<dependency>
<groupId>org.apache.flink</groupId>
<!-- 添加与你的数据库匹配的依赖 -->
<artifactId>flink-connector-mysql-cdc</artifactId>
<!-- 该依赖仅适用于稳定发布,SNAPSHOT依赖需要根据master或release分支自行构建。 -->
<version>3.1.0</version>
</dependency>
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // 设置捕获的数据库
.tableList("yourDatabaseName.yourTableName") // 设置捕获的表
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // 将SourceRecord转换为JSON字符串
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// 设置4个并行源任务
.setParallelism(4)
.print().setParallelism(1); // 为sink设置并行度为1,以保持消息顺序
env.execute("Print MySQL Snapshot + Binlog");
}
}