大数据笔记之flink-cdc实时同步数据(mysql -->doris)
一、基本概念
Flink CDC
是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:✅ 端到端的数据集成框架
✅ 为数据集成的用户提供了易于构建作业的 API
✅ 支持在 Source 和 Sink 中处理多个表
✅ 整库同步
✅具备表结构变更自动同步的能力(Schema Evolution),
二、准备工作
1.doris的搭建,请参考 大数据笔记之doris的安装部署
2.mysql的安装,我用docker安装了mysql8.0.17,注意,这里挂载了配置目录、日志目录、数据目录
docker run --name mysql8.0.17 \
-p 3307:3306 \
-v /usr/local/mysql8/conf.d:/etc/mysql/conf.d \
-v /usr/local/mysql8/logs:/var/log/mysql \
-v /usr/local/mysql8/data1:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=123456\
-d mysql:8.0.17
3.修改mysql的配置文件
[mysqld]
default-time-zone = 'Asia/Shanghai'
#设置3306端口
port=3306
#允许最大连接数
max_connections=200
#允许连接失败的次数
max_connect_errors=10
#默认使用“mysql_native_password”插件认证
default_authentication_plugin=mysql_native_password
#服务端使用的字符集默认为8比特编码的latin1字符集
character-set-server=UTF8MB4
#开启查询缓存
explicit_defaults_for_timestamp=true
#创建新表时将使用的默认存储引擎
default-storage-engine=INNODB
#等待超时时间秒
wait_timeout=60
#交互式连接超时时间秒
interactive-timeout=600
#添加如下配置信息,开启`test`以及`test_route`数据库的Binlog
##数据库id
server-id = 1
###启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
###binlog类型,maxwell要求为row类型
binlog_format=ROW
binlog-row-image=FULL
###特别注意:启用binlog的数据库,需根据实际情况作出修改,这里是允许binlog日志监听的库
binlog-do-db=test
binlog-do-db=test_route
特别注意: binlog-do-db=test binlog-do-db=test_route 是我要开启数据同步的库
三、部署flink-cdc
3.1 这里默认已经部署好了flink,详情参看 大数据笔记之flink集群的安装部署
把flink-cdc-3.0.0-bin.tar.gz上传到/opt/module/flink,解压。
正在解压后的 /opt/module/flink/flink-cdc-3.0.0/lib 目录上传:
flink-cdc-pipeline-connector-mysql-3.0.0.jar
flink-cdc-pipeline-connector-doris-3.0.0.jar
3.2 编写配置文件
在 /opt/module/flink/flink-cdc-3.0.0 创建job目录,在job目录下创建一个文件(名字任取) mysql-to-doris2.yaml
source:
type: mysql
hostname: 192.168.10.100
port: 3307
username: root
password: "123456"
tables: test_route.\.*
server-id: 5400-5404
server-time-zone: "Asia/Shanghai"
sink:
type: doris
fenodes: 192.168.10.171:8030,192.168.10.172:8030,192.168.10.173:8030
benodes: 192.168.10.171:8040,192.168.10.172:8040,192.168.10.173:8040
username: root
password: "123456"
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
route:
- source-table: test_route.t1
sink-table: doris_test_route.doris_t1
- source-table: test_route.t2
sink-table: doris_test_route.doris_t1
- source-table: test_route.t3
sink-table: doris_test_route.doris_t3
pipeline:
name: Sync MySQL Database to Doris
parallelism: 1
3.3 提交flink任务
在目录:/opt/module/flink/flink-cdc-3.0.0 执行:
bin/flink-cdc.sh job/mysql-to-doris2.yaml
3.4 查看运行状态
在flink控制台可以查看正在运行的任务
四、查看数据
查看doris的数据表,已经把数据同步过来了,如果增删改myql表,doris也会相应更改