Basic setup
-- Create/load a catalog; if the catalog already exists it will not be re-created, and its metadata will be loaded automatically
CREATE CATALOG fs_paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);
-- Switch to the catalog
use catalog fs_paimon_catalog;
-- SQL Client settings
-- Set batch execution mode
SET 'execution.runtime-mode' = 'batch';
-- Set streaming execution mode
SET 'execution.runtime-mode' = 'streaming';
-- Set how query results are displayed (specific to the SQL Client)
SET 'sql-client.execution.result-mode' = 'tableau';
-- Set the checkpoint interval; required in streaming mode
SET 'execution.checkpointing.interval' = '10 s';
An init script can bundle these settings so the SQL Client starts pre-configured:
root@wsl01:~/soft/paimon/flink-1.17.0# cat fs_catalog.sql
CREATE CATALOG fs_catalog WITH (
'type'='paimon',
'warehouse'='file:/mnt/d/wsl/soft/paimon/catalog'
-- 'warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);
use catalog fs_catalog;
SET 'sql-client.execution.result-mode' = 'tableau';
-- Default to batch mode
SET 'execution.runtime-mode' = 'batch';
-- Start the SQL Client with this init script so the catalog is loaded by default
bin/sql-client.sh -i fs_catalog.sql
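After the client starts, the init script's effect can be verified with standard Flink SQL statements:
SHOW CURRENT CATALOG;  -- should print fs_catalog
SHOW TABLES;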
DDL
Creating a regular table
-- A regular table, with no primary key
CREATE TABLE t_sample (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
);
Creating a primary key table
CREATE TABLE t_pk (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
Creating a partitioned table
-- The partition columns of a partitioned table must be a subset of the primary key
CREATE TABLE t_partition (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
Creating a temporary table
-- Inside a catalog created by Paimon, non-Paimon tables cannot be created; to use third-party tables, create them as temporary tables
CREATE TEMPORARY TABLE t_temporary (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
'connector' = 'filesystem',
'path' = 'file:///mnt/d/wsl/soft/paimon/temp_table.csv',
'format' = 'csv'
);
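Once registered, the temporary table can be used like any other table in the catalog. A minimal sketch that copies the external CSV rows into the Paimon primary key table defined earlier (assuming the CSV matches the schema above):
-- read through the temporary table, write into the Paimon table
INSERT INTO t_pk SELECT * FROM t_temporary;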
Copying a table: CREATE TABLE AS
-- CREATE TABLE AS from a primary key table
CREATE TABLE t_create_as_pk AS SELECT * FROM t_pk;
show create table t_pk;
show create table t_create_as_pk;
-- CREATE TABLE AS from a partitioned table
show create table t_partition;
CREATE TABLE t_create_as_partition AS SELECT * FROM t_partition;
show create table t_create_as_partition;
The results above show that a table created with CREATE TABLE AS keeps only the source table's columns; other properties (primary key, partitioning, options) are not carried over.
-- Re-specify properties via WITH; for WITH usage, see the Flink documentation
CREATE TABLE t_create_as_with WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM t_pk;
show create table t_create_as_with;
The results above show that CREATE TABLE AS can re-specify table properties through a WITH clause.
Copying a table: CREATE TABLE LIKE
CREATE TABLE t_create_like LIKE t_pk;
show create table t_pk;
show create table t_create_like;
The results above show that a table created with CREATE TABLE LIKE keeps all of the source table's information (schema, primary key, partitioning, options).
DML
Common administrative statements
desc #{name}
show create table #{name}
show catalogs;
show databases;
show tables;
Insert: regular table
insert into t_sample(user_id,item_id,behavior,dt,hh) values(100,100,'sing-sample','1','2');
insert into t_sample values(101,101,'jump-sample','1','2');
insert into t_sample select 102,102,'rap-sample','1','2';
Flink SQL> select * from t_sample;
+---------+---------+-------------+----+----+
| user_id | item_id |    behavior | dt | hh |
+---------+---------+-------------+----+----+
|     100 |     100 | sing-sample |  1 |  2 |
|     101 |     101 | jump-sample |  1 |  2 |
|     102 |     102 |  rap-sample |  1 |  2 |
+---------+---------+-------------+----+----+
3 rows in set
Insert: primary key table
insert into t_pk values(1,1,'sing','1','2');
insert into t_pk values(2,2,'jump','1','2');
insert into t_pk values(3,3,'rap','1','2');
Flink SQL> select * from t_pk;
+---------+---------+----------+----+----+
| user_id | item_id | behavior | dt | hh |
+---------+---------+----------+----+----+
|       1 |       1 |     sing |  1 |  2 |
|       2 |       2 |     jump |  1 |  2 |
|       3 |       3 |      rap |  1 |  2 |
+---------+---------+----------+----+----+
3 rows in set
insert into t_pk values(3,3,'basketball','1','2');
Flink SQL> select * from t_pk;
+---------+---------+------------+----+----+
| user_id | item_id |   behavior | dt | hh |
+---------+---------+------------+----+----+
|       1 |       1 |       sing |  1 |  2 |
|       2 |       2 |       jump |  1 |  2 |
|       3 |       3 | basketball |  1 |  2 |
+---------+---------+------------+----+----+
3 rows in set
-- Note: when two rows with the same primary key are written to a primary key table, the later row overwrites the earlier one
-- This is because a primary key table has a default merge engine, 'merge-engine' = 'deduplicate', which produces exactly this behavior
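The merge engine can also be declared explicitly in the DDL. A minimal sketch (the table name t_pk_dedup is made up for illustration; 'deduplicate' is the default, so the WITH clause only makes the behavior explicit):
CREATE TABLE t_pk_dedup (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
    -- keep only the latest row for each primary key
    'merge-engine' = 'deduplicate'
);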
Insert: partitioned table
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(1,1,'sing');
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(2,2,'jump');
insert into t_partition(user_id,item_id,behavior,dt,hh) values(3,3,'rap','2024-10-08','16');
insert into t_partition values(4,4,'basketball','2024-10-08','16');
Flink SQL> select * from t_partition;
+---------+---------+------------+------------+----+
| user_id | item_id |   behavior |         dt | hh |
+---------+---------+------------+------------+----+
|       1 |       1 |       sing | 2024-10-08 | 15 |
|       2 |       2 |       jump | 2024-10-08 | 15 |
|       3 |       3 |        rap | 2024-10-08 | 16 |
|       4 |       4 | basketball | 2024-10-08 | 16 |
+---------+---------+------------+------------+----+
4 rows in set
insert into t_partition select * from t_sample;
insert into t_partition partition(dt='2099-10-08',hh='15')(user_id,item_id,behavior) select user_id,item_id,behavior from t_sample;
Flink SQL> select * from t_partition;
+---------+---------+-------------+------------+----+
| user_id | item_id |    behavior |         dt | hh |
+---------+---------+-------------+------------+----+
|       1 |       1 |        sing | 2024-10-08 | 15 |
|       2 |       2 |        jump | 2024-10-08 | 15 |
|     100 |     100 | sing-sample | 2099-10-08 | 15 |
|     101 |     101 | jump-sample | 2099-10-08 | 15 |
|     102 |     102 |  rap-sample | 2099-10-08 | 15 |
|       3 |       3 |         rap | 2024-10-08 | 16 |
|       4 |       4 |  basketball | 2024-10-08 | 16 |
|     100 |     100 | sing-sample |          1 |  2 |
|     101 |     101 | jump-sample |          1 |  2 |
|     102 |     102 |  rap-sample |          1 |  2 |
+---------+---------+-------------+------------+----+
10 rows in set
Flink SQL> insert overwrite t_partition(user_id,item_id,behavior) values(5,5,'non-partition');
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Column 'dt' has no default value and does not allow NULLs
All of the insert styles above are supported; a partition column can be used like an ordinary column, but it must not be NULL.
Insert: overwrite
Flink SQL> select * from t_pk;
+---------+---------+------------+----+----+
| user_id | item_id |   behavior | dt | hh |
+---------+---------+------------+----+----+
|       1 |       1 |       sing |  1 |  2 |
|       2 |       2 |       jump |  1 |  2 |
|       3 |       3 | basketball |  1 |  2 |
+---------+---------+------------+----+----+
insert overwrite t_pk values(10,10,'overwrite','1','2');
Flink SQL> select * from t_pk;
+---------+---------+-----------+----+----+
| user_id | item_id |  behavior | dt | hh |
+---------+---------+-----------+----+----+
|      10 |      10 | overwrite |  1 |  2 |
+---------+---------+-----------+----+----+
1 row in set
INSERT OVERWRITE clears the whole table outright; primary keys are not considered.
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(1,1,'sing');
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(2,2,'jump');
insert into t_partition partition(dt='2024-10-09',hh='15')(user_id,item_id,behavior) values(3,3,'rap');
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set
insert overwrite t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(4,4,'basketball');
Flink SQL> select * from t_partition;
+---------+---------+------------+------------+----+
| user_id | item_id |   behavior |         dt | hh |
+---------+---------+------------+------------+----+
|       3 |       3 |        rap | 2024-10-09 | 15 |
|       4 |       4 | basketball | 2024-10-08 | 15 |
+---------+---------+------------+------------+----+
2 rows in set
For a partitioned table, INSERT OVERWRITE only overwrites the partitions being written to. (The overwritten rows were re-inserted afterwards, restoring the three-row state shown below.)
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set
-- Overwriting a specific partition with an empty result set has no effect by default
INSERT OVERWRITE t_partition PARTITION (`dt` = '2024-10-08', `hh` = '15') SELECT user_id,item_id,behavior FROM t_partition WHERE false;
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set
-- With /*+ OPTIONS('dynamic-partition-overwrite'='false') */, overwriting a specific partition with an empty result set clears that partition
INSERT OVERWRITE t_partition /*+ OPTIONS('dynamic-partition-overwrite'='false') */ PARTITION (`dt` = '2024-10-08', `hh` = '15') SELECT user_id,item_id,behavior FROM t_partition WHERE false;
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
+---------+---------+----------+------------+----+
1 row in set
-- With 'dynamic-partition-overwrite'='false' and no partition specified, an empty overwrite clears all partitions, i.e. a truncate
INSERT OVERWRITE t_partition /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM t_partition WHERE false;
Flink SQL> select * from t_partition;
Empty set
/*+ OPTIONS('dynamic-partition-overwrite'='false') */: Flink's default overwrite mode is dynamic partition overwrite (i.e. Paimon only deletes the partitions that appear in the overwrite data). This option changes it to static overwrite.
There is no TRUNCATE statement in this setup (Flink 1.17), so INSERT OVERWRITE with static overwrite can be used to implement truncate.
Query
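A minimal sketch of the two common read paths against the tables created above ('scan.mode' is a Paimon scan option; 'latest' reads only new changes, without producing an initial snapshot):
-- Batch read: returns the current snapshot and terminates
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM t_pk;
-- Streaming read: keeps reading changes (requires the checkpoint interval set earlier)
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM t_pk /*+ OPTIONS('scan.mode' = 'latest') */;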
Update
Important table property requirements:
- Only primary key tables support this feature (the table must have a primary key).
- The merge engine needs to be deduplicate or partial-update (at the time of writing, only deduplicate was supported).
- Updating primary key columns is not supported.
- Requires Flink 1.17 or later.
- Must run in batch execution mode.
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
Flink SQL> select * from t_partition;
+---------+---------+-------------+----+----+
| user_id | item_id |    behavior | dt | hh |
+---------+---------+-------------+----+----+
|     100 |     100 | sing-sample |  1 |  2 |
|     101 |     101 | jump-sample |  1 |  2 |
|     102 |     102 |  rap-sample |  1 |  2 |
+---------+---------+-------------+----+----+
3 rows in set
update t_partition set behavior = 'basketball-sample' where user_id = 100;
Flink SQL> select * from t_partition;
+---------+---------+-------------------+----+----+
| user_id | item_id |          behavior | dt | hh |
+---------+---------+-------------------+----+----+
|     100 |     100 | basketball-sample |  1 |  2 |
|     101 |     101 |       jump-sample |  1 |  2 |
|     102 |     102 |        rap-sample |  1 |  2 |
+---------+---------+-------------------+----+----+
3 rows in set
Delete
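DELETE is subject to the same constraints as UPDATE (primary key table, deduplicate merge engine, Flink 1.17+, batch mode). A minimal sketch:
DELETE FROM table_identifier WHERE condition;
-- e.g. remove the row written by the overwrite demo above
DELETE FROM t_pk WHERE user_id = 10;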
Common table properties
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
Statistics optimization
By default, Paimon adds three statistics to every column: maximum value, minimum value, and null count.
Four modes are available to control the statistics:
- full: collect max/min/null-count statistics over the complete values
- truncate(length): truncate values to length characters before collecting max/min/null-count statistics; this is the default (length 16), which avoids statistics over long text fields
- counts: only collect the null count
- none: disable statistics
To change the statistics mode of a specific field:
- fields.{field_name}.stats-mode, e.g. WITH ( 'fields.behavior.stats-mode' = 'full' )
Official docs: https://paimon.apache.org/docs/master/flink/sql-ddl/#specify-statistics-mode
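Putting it together, a minimal DDL sketch (the table name t_stats_demo is made up for illustration):
CREATE TABLE t_stats_demo (
    user_id BIGINT,
    behavior STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    -- collect full max/min/null-count statistics for the behavior column
    'fields.behavior.stats-mode' = 'full'
);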
Field default values
A Paimon table can define default values for its fields, but NOT for primary key columns.
To set the default value of a specific field:
- fields.{field_name}.default-value
- WITH ( 'fields.behavior.default-value' = 'sing' )
Official docs: https://paimon.apache.org/docs/master/flink/sql-ddl/#field-default-value
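A minimal DDL sketch (the table name t_default_demo is made up for illustration):
CREATE TABLE t_default_demo (
    user_id BIGINT,
    behavior STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    -- behavior falls back to 'sing' when no value is supplied
    'fields.behavior.default-value' = 'sing'
);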
Clustered writes
Setting this option in a batch job enables clustered writes: data is clustered into size ranges on the specified columns, which speeds up queries on those columns. It can only be used in batch jobs or with append tables (streaming).
Separate multiple column names with commas, e.g. 'col1,col2'.
- sink.clustering.by-columns
- WITH ( 'sink.clustering.by-columns' = 'user_id,item_id' )
Hints can also be used:
- INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */ SELECT * FROM source;
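A minimal end-to-end sketch (the table name t_clustered is made up for illustration; assumes a batch job writing to an append table, i.e. one without a primary key):
CREATE TABLE t_clustered (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING
) WITH (
    'sink.clustering.by-columns' = 'user_id,item_id'
);
SET 'execution.runtime-mode' = 'batch';
INSERT INTO t_clustered SELECT user_id, item_id, behavior FROM t_sample;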
Dynamic overwrite
Flink's overwrite mode on partitioned tables defaults to dynamic partition overwrite: INSERT OVERWRITE only overwrites the partitions present in the written data, and writing an empty result set overwrites nothing. This can be changed to static overwrite, where an empty write still clears the targeted partitions; if no partition is specified, all partitions are cleared!
The hint follows the table name and declares the execution strategy for that single statement; dynamic-partition-overwrite=false selects static overwrite, a substitute for truncate (TRUNCATE TABLE my_table requires Flink 1.18 or later).
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM my_table WHERE false;