1.1、MySQL source table
CREATE TABLE `mysql_orders` (
`order_id` varchar(100) NOT NULL,
`user_id` varchar(100) DEFAULT NULL,
`amount` decimal(10,2) DEFAULT NULL,
`update_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
PRIMARY KEY (`order_id`)
);
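To have a few rows to sync, some test records can be inserted first (the values below are made up for illustration; update_time is filled in by its DEFAULT):
INSERT INTO mysql_orders (order_id, user_id, amount) VALUES
('o001', 'u001', 99.90),
('o002', 'u002', 15.50);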
Enable binlog on the MySQL server and set binlog_format to ROW.
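Whether binlog is actually on and in ROW format can be verified from a MySQL client before starting the CDC job:
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW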
1.2、Syncing MySQL data with Flink CDC
Reference: https://blog.csdn.net/wuxintdrh/article/details/146165736
CREATE TABLE mysql_cdc_source (
order_id STRING,
user_id STRING,
amount DECIMAL(10,2),
update_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'chb1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'paimon_test',
'table-name' = 'mysql_orders',
'server-time-zone' = 'Asia/Shanghai' -- time zone setting (avoids timestamp offsets)
);
select * from mysql_cdc_source;
1.3、Syncing to Paimon
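The statements below create the table in, and write to, paimon_catalog, so that catalog has to be registered in the Flink SQL client first. A minimal sketch, assuming an HDFS warehouse path (adjust 'warehouse' to your environment):
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs:///paimon/warehouse' -- assumed path
);
USE CATALOG paimon_catalog;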
Create the Paimon table
CREATE TABLE orders (
order_id STRING PRIMARY KEY NOT ENFORCED,
user_id STRING,
amount DECIMAL(10,2),
update_time TIMESTAMP(3),
dt STRING
) WITH (
'merge-engine' = 'deduplicate', -- default merge engine: keep the latest record per primary key
'changelog-producer' = 'input', -- store the incoming CDC changelog as-is
'bucket' = '4', -- fixed bucket count for write performance
'snapshot.time-retained' = '7d' -- retain snapshots for 7 days
);
Sync the data
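Paimon commits data on Flink checkpoints, so checkpointing should be enabled in the SQL client before submitting the streaming INSERT (the 10s interval is just an example value):
SET 'execution.checkpointing.interval' = '10s';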
INSERT INTO paimon_catalog.`default`.orders
SELECT
order_id,
user_id,
amount,
update_time,
DATE_FORMAT(update_time, 'yyyy-MM-dd') AS dt -- derive the dt date column from update_time
FROM default_catalog.default_database.mysql_cdc_source;
Query the Paimon table:
select * from paimon_catalog.`default`.orders;
Error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
End of exception on server side
Troubleshooting showed that the JobManager had sufficient resources and the TaskManagers still had free slots, but TaskManager memory was too small; after increasing it, the job ran normally.
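Once the job is running, committed snapshots (which 'snapshot.time-retained' = '7d' later expires) can be inspected through Paimon's snapshots system table, using the same catalog and table names as above:
SELECT snapshot_id, commit_kind, commit_time
FROM paimon_catalog.`default`.`orders$snapshots`;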
2、Syncing data via paimon-flink-action
Reference: https://paimon.apache.org/docs/1.0/cdc-ingestion/mysql-cdc/
Error: ClassNotFoundException: org.apache.kafka.connect.errors.ConnectException. Adding connect-api-3.2.1.jar resolved it.
Next error: java.lang.NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;, which suggests a version conflict between the manually added Kafka connect-api jar and the Debezium version bundled with the CDC dependencies.