环境创建
CREATE CATALOG fs_catalog WITH (
'type'='paimon',
'warehouse'='file:/data/soft/paimon/catalog'
);
USE CATALOG fs_catalog;
drop table if exists t_changelog_input;
CREATE TABLE t_changelog_input (
age BIGINT,
money BIGINT,
hh STRING,
PRIMARY KEY (hh) NOT ENFORCED
)WITH (
'merge-engine' = 'deduplicate',
'changelog-producer' = 'lookup'
);
paimon的snapshot和checkpoint之间的关系
- 一次snapshot会产生一个data文件
- 一次checkpoint会产生1-2个snapshot文件,要看这次checkpoint是否触发compaction,触发了就是2个data文件(一个是合并后的数据,一个本次checkpoint写入数据),否则只有一个(本次checkpoint写入数据)
- 流式写入根据checkpoint间隔,定期进行checkpoint
- 批写(手动执行sql脚本)每一个sql会立即生成一次checkpoint效果
执行一次插入操作
insert into t_changelog_input values(10,1000,'1');
root@wsl01:/data/soft/paimon/catalog/default.db/t_changelog_input/bucket-0# ll
total 8
-rw-r–r-- 1 root root 1217 Nov 27 13:40 changelog-aa32a677-6df6-40fa-83bd-3b7a6d38c84a-0.parquet
-rw-r–r-- 1 root root 1217 Nov 27 13:40 data-aa32a677-6df6-40fa-83bd-3b7a6d38c84a-1.parquet
changelog文件
data文件
相同主键,再执行一次插入操作
insert into t_changelog_input values(10,2000,'1');
root@wsl01:/data/soft/paimon/catalog/default.db/t_changelog_input/bucket-0# ll
total 16
-rw-r–r-- 1 root root 1217 Nov 27 13:40 changelog-aa32a677-6df6-40fa-83bd-3b7a6d38c84a-0.parquet
-rw-r–r-- 1 root root 1217 Nov 27 13:43 changelog-abee540c-f29d-4d18-8578-2d173d43452b-0.parquet
-rw-r–r-- 1 root root 1217 Nov 27 13:40 data-aa32a677-6df6-40fa-83bd-3b7a6d38c84a-1.parquet
-rw-r–r-- 1 root root 1217 Nov 27 13:43 data-abee540c-f29d-4d18-8578-2d173d43452b-1.parquet
changelog文件
data文件
sqlclient 流式查询
-- Flink SQL
SET 'execution.runtime-mode' = 'streaming';
--设置检查点的间隔为10秒
SET 'execution.checkpointing.interval'='10 s';
set parallelism.default=1;
-- 使用changelog,展示op,操作类型
SET 'sql-client.execution.result-mode'='changelog';
-- 从当前快照开始,读取变化情况
SELECT * FROM t_changelog_input;
-- 从ID=1的快照开始,读取变化情况
SELECT * FROM t_changelog_input /*+ OPTIONS('scan.snapshot-id' = '1') */;
sqlclient 批查询
Flink SQL> SET 'sql-client.execution.result-mode'='tableau';
[INFO] Execute statement succeed.
Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeed.
Flink SQL> SELECT * FROM t_changelog_input;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
| 10 | 2000 | 1 |
+-----+-------+----+
1 row in set
Flink SQL> SELECT * FROM t_changelog_input /*+ OPTIONS('scan.snapshot-id' = '1') */;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
| 10 | 1000 | 1 |
+-----+-------+----+
1 row in set
Flink SQL> SELECT * FROM t_changelog_input /*+ OPTIONS('scan.snapshot-id' = '2') */;
+-----+-------+----+
| age | money | hh |
+-----+-------+----+
| 10 | 2000 | 1 |
+-----+-------+----+
1 row in set
执行cdc采集
使用方法
https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/mysql-cdc/
适配情况
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/
MYSQL数据源
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/
下载地址
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.0/flink-sql-connector-mysql-cdc-3.1.0.jar
bin/flink run \
lib/paimon-flink-action-0.9.0.jar \
mysql_sync_table \
--warehouse 'file:/data/soft/paimon/catalog' \
--database 'default' \
--table 't_changelog_input' \
--mysql_conf hostname=wsl \
--mysql_conf username=root \
--mysql_conf password=root \
--mysql_conf database-name='test' \
--mysql_conf table-name='t_changelog_input' \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1
mysql新增一条数据,然后再修改
changelog文件,新增数据时,和data数据一样。
修改mysql数据时,paimon同步到数据后发现,changelog和data不一样了,这次是两条,而data只有一条
data文件
changelog文件
流式消费的结果也是正确的
结论
- changelog=input的情况下,一次checkpoint产生一个data文件的同时,也会产生一个changelog文件
- changelog文件内容和data文件内容完全一致
- input 情况下,如果你的操作不完整,那么流式读取的结果也是不对的
- 上述操作insert2次相同主键,按照主键表的逻辑,应该是会出现-D +I 或者 -U +U 的场景,但是由于input模式,不会额外的处理changelog,你insert两次,我的changelog就写两次insert,你流式读取,那我就重放我的changelog,你写的changelog不对,流处理结果也就不对了,要想对,那就需要先insert,然后delete,再insert,或者执行update操作,总之不能利用主键表根据主键更新的特性来更新数据
- 但是批处理的结果是正确的,因为批处理不管changelog
- changelog=input就是为流处理设计的,但是数据处理操作必须要标准,必须要有-U +U -D +I的操作,而CDC任务接到的数据就是标准的,因此他两个一般是绑定的,cdc采集任务的表,如果需要流处理消费,最好使用changelog=input
应用场景
- 用于cdc采集的表
- and
- 后期要进行流式处理的表