一、背景
在大数据领域,初始阶段业务数据通常被存储于关系型数据库,如MySQL。然而,为满足日常分析和报表等需求,大数据平台采用多种同步方式,以适应这些业务数据的不同存储需求。这些同步存储方式包括离线仓库和实时仓库等,选择取决于业务需求和数据特性。
一项常见需求是,业务使用人员需要大数据分析平台中实时查看业务表数据,示例如下:
- [Mysql] 业务数据 - 用户表全量数据:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
- [Mysql] 2023-06-02 业务数据新增了一名用户,且更改了tom的手机号,此时表数据如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
加粗为更新/新增数据
- [大数据平台] 2023-06-02日业务人员在大数据平台中查看用户表实时数据,期望数据和Mysql业务数据一致,如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
根据上述需求,我们可以得出需要构建实时表以满足业务数据的实时分析需求。
二、技术架构
为了实现上述需求,我们可以利用实时同步任务将业务数据实时同步至下游的 MPP(Massively Parallel Processing)库,从而构建实时表。结合市场上常见的技术组件,本文选择了实时引擎 FlinkCDC 和 Doris(MPP)库作为实时同步技术架构。整体架构如下:
三、实现方式
FlinkCDC 提供了三种实现方式,具体如下:
- Flink run jar 模式: 这种模式适用于处理复杂的流数据。当使用简单的 Flink SQL 无法满足复杂业务需求时(例如拉链表等),可以通过编写自定义逻辑的方式,将其打包成 Jar 包并运行。以下是一个示例:
// 示例代码
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
// 配置数据源和处理逻辑
...
// 实时任务启动
env.execute("Print MySQL Snapshot + Binlog");
}
}
更多信息:MysqlCDC connector
- sql脚本模式:
bin/sql-client -f file
,这种模式适用于简单的流水任务,例如实时表同步等简单的 ETL 任务。你可以通过编写 SQL 文件并使用 Flink SQL 客户端执行,而无需编写额外的 Java 代码。以下是一个示例:
# 示例 mysql2doris SQL 文件
set 'execution.checkpointing.interval'='30000';
create table mysql_user(
# ...
) WITH (
# ...
);
create table doris_user(
# ...
) WITH (
# ...
);
insert into doris_user select * from mysql_user;
执行如下:
$> bin/sql-client.sh --file /usr/local/flinksql/mysql2doris
更多信息:FlinkSQL 客户端
- FlinkCDC Pipeline: 这是 FlinkCDC 3.0 版本引入的全新功能,旨在通过简单的配置即可实现数据同步,无需编写复杂的 Flink SQL。缺点是需要使用 Flink 版本 1.16 或更高版本。以下是一个示例:
# 示例配置文件
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
执行如下:
$> bin/flink-cdc.sh mysql-to-doris.yaml
更多信息:FlinkCDC Pipeline
这三种方式各有优劣,可以根据具体需求和场景选择合适的实现方式。考虑到前几篇 Flink 实时数仓同步相关博客都采用了 Jar 包形式,为了给读者带来不同的体验,本文采用 sql脚本模式 模式来实现背景需求。
四、sql脚本模式 + 实时表实现
4.1、实时表设计
背景需求需要实时查看业务表数据,因此在Doris中设计表结构时采用了Unique数据模型。建表语句如下:
CREATE TABLE `example_user_real`
(
`id` INT NOT NULL COMMENT '用户id',
`name` STRING NULL COMMENT '用户昵称',
`phone` STRING NULL COMMENT '手机号',
`gender` CHAR(5) NULL COMMENT '用户性别',
`create_time` DATETIMEV2(0) NULL COMMENT '用户注册时间',
`update_time` DATETIMEV2(0) NULL COMMENT '用户更新时间'
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT '用户实时表'
DISTRIBUTED BY HASH(id) BUCKETS AUTO;
关于mysql type 转换 doris type 可参考 Doris 源码内置转换工具
4.2、实时同步逻辑
-
首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:
initial
(默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。earliest-offset
:跳过快照阶段,从可读取的最早 binlog 位点开始读取latest-offset
:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。specific-offset
:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。timestamp
:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
-
本文采用
initial
模式同步任务 -
编写mysql2doris SQL文件,这里需要注意的是类型转换:由于 mysql2doris 是 Flink SQL 文件,故需要将 mysql type -> flink type 以及 doris type -> flink type,示例如下:
set 'execution.checkpointing.interval'='30000';
set 'state.checkpoints.dir'='file:///home/finloan/flink-1.16.1/checkpoint/mysql2doris';
create table mysql_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` CHAR(5),
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector'='mysql-cdc',
'hostname'='10.185.163.177',
'port' = '80',
'username'='rouser',
'password'='123456',
'database-name' = 'database',
'table-name'='user'
);
create table doris_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` STRING,
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0)
) WITH (
'password'='password',
'connector'='doris',
'fenodes'='11.113.208.103:8030',
'table.identifier'='database.user',
'sink.label-prefix'='唯一任务标识,每次启动都要唯一',
'username'='username'
);
insert into doris_user select * from mysql_user;
类型转换参考:
Doris & Flink Column Type Mapping
Mysql CDC Data Type Mapping
- 执行命令如下:此时任务已经提交到flink 集群,本文中使用的是Flink-Cluster 模式而非yarn模式
$> ./sql-client.sh -f ~/mysql2doris
Flink SQL> [INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5c683fba8567e65509870a6db4e99fa5
- 登录flinkUi界面查看任务,如下所示:
- 此时Doris 数据如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
- [Mysql]业务数据2023-06-02日新增了一名tony用户,且更改了tom的手机号,此时表数据如下:
id | name | phone | gender | create_time | update_time | 备注 |
---|---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 | |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 | (手机号从333->444) |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 | (新增tony用户) |
- 此时Doris 数据如下:
id | name | phone | gender | create_time | update_time |
---|---|---|---|---|---|
1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
3 | tom | 444 | 男 | 2023-06-01 13:00:00 | 2023-06-02 09:00:00 |
4 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |
五、总结
本文介绍了实时数仓同步的实操案例,通过 FlinkCDC 和 Doris 实现了实时表的构建和数据同步。在实现过程中,尤其压迫注意数据类型的转换问题,以确保不同数据存储之间的兼容性。
此外要根据具体需求和场景,选择合适的实现方式,本文选择了 sql-client --f file 模式来实现实时表需求,旨在为读者提供了不同的实践体验。
六、相关资料
- Doris 数据模型
- Flink Doris Connector
- FlinkCDC Pipeline
- FlinkSQL 客户端
- Flink Run jar 模式
- Doris 源码内置转换工具
- Doris & Flink Column Type Mapping
- Mysql CDC Data Type Mapping