paimon,基础查询语句测试

news2024/11/29 2:39:20

基础设置

-- 创建catalog/加载catalog,如果这个catalog已经存在就不会创建,自动加载元数据信息

CREATE CATALOG fs_paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);
-- 使用catalog
use catalog fs_paimon_catalog;

-- sqlClinet使用
-- 设置为批处理模式
SET 'execution.runtime-mode' = 'batch';
-- 设置为流处理模式
SET 'execution.runtime-mode' = 'streaming';
-- 设置查询结果显示方式,sql-clinet 特有
SET 'sql-client.execution.result-mode' = 'tableau';
-- 设置checkpoint,如果使用流模式,必须设置
SET 'execution.checkpointing.interval' = '10 s';



root@wsl01:~/soft/paimon/flink-1.17.0# cat fs_catalog.sql
CREATE CATALOG fs_catalog WITH (
    'type'='paimon',
    'warehouse'='file:/mnt/d/wsl/soft/paimon/catalog'
    -- 'warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);
use catalog fs_catalog ;
SET 'sql-client.execution.result-mode' = 'tableau';
-- 默认批处理
SET 'execution.runtime-mode' = 'batch';

-- 指定默认启动catalog
bin/sql-client.sh -i fs_catalog.sql


DDL

创建普通表

-- 普通表,没有主键

CREATE TABLE t_sample (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
);

创建主键表

CREATE TABLE t_pk (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING,
      PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

创建分区表

-- 分区表的分区字段必须是主键的子集

CREATE TABLE t_partition (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING,
      PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

创建临时表

-- 如果进入paimon创建的catalog后,无法创建非paimon类型的表,如果需要借助第三方的表,需要创建临时表来使用

CREATE TEMPORARY TABLE t_temporary (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     dt STRING,
     hh STRING,
     PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///mnt/d/wsl/soft/paimon/temp_table.csv',
    'format' = 'csv'
);

复制表-AS,create table as


-- create as 创建主键表,

CREATE TABLE t_create_as_pk AS SELECT * FROM t_pk;
show create table t__pk;
show create table t_create_as_pk;


-- create as 创建分区表
show create table t_partition ;
CREATE TABLE t_create_as_partition AS SELECT * FROM t_partition;
show create table t_create_as_partition ;

上述执行结果告诉我们,create as 的表,只保留原表的字段,不保留其他属性信息


-- 通过with 重新指定,关于with的用法,参考flink
CREATE TABLE t_create_as_with with ('primary-key' = 'dt,hh','partition' = 'dt') AS SELECT * FROM t_pk  ;
show create table t_create_as_with;

上述执行结果告诉我们,create as 的表可以通过with 重新指定属性信息

复制表-LIKE,create table like

CREATE TABLE t_create_like like t_pk;
show create table t_pk;
show create table t_create_like;

上述执行结果告诉我们,create like 的表,保留全部信息

DML

常用管理语句

desc #{name}
show create table #{name}
show catalogs;
show databases;
show tables;

新增-普通表

insert into t_sample(user_id,item_id,behavior,dt,hh) values(100,100,'sing-sample','1','2');

insert into t_sample  values(101,101,'jump-sample','1','2');

insert into t_sample  select 102,102,'rap-sample','1','2';

Flink SQL> select * from t_sample;
+---------+---------+-------------+----+----+
| user_id | item_id |    behavior | dt | hh |
+---------+---------+-------------+----+----+
|     100 |     100 | sing-sample |  1 |  2 |
|     101 |     101 | jump-sample |  1 |  2 |
|     102 |     102 |  rap-sample |  1 |  2 |
+---------+---------+-------------+----+----+
3 rows in set

新增-主键表

insert into t_pk values(1,1,'sing','1','2');
insert into t_pk values(2,2,'jump','1','2');
insert into t_pk values(3,3,'rap','1','2');
Flink SQL> select * from t_pk;
+---------+---------+----------+----+----+
| user_id | item_id | behavior | dt | hh |
+---------+---------+----------+----+----+
|       1 |       1 |     sing |  1 |  2 |
|       2 |       2 |     jump |  1 |  2 |
|       3 |       3 |      rap |  1 |  2 |
+---------+---------+----------+----+----+
3 rows in set
insert into t_pk values(3,3,'basketball','1','2');
Flink SQL>  select * from t_pk;
+---------+---------+------------+----+----+
| user_id | item_id |   behavior | dt | hh |
+---------+---------+------------+----+----+
|       1 |       1 |       sing |  1 |  2 |
|       2 |       2 |       jump |  1 |  2 |
|       3 |       3 | basketball |  1 |  2 |
+---------+---------+------------+----+----+
3 rows in set

-- 我们发现,主键表,写入两条相同主键的数据,后者会覆盖前者
-- 主键表有一个默认引擎,默认是就是 'merge-engine' = 'deduplicate',因此才有这个效果

新增-分区表

insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(1,1,'sing');
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(2,2,'jump');
insert into t_partition(user_id,item_id,behavior,dt,hh) values(3,3,'rap','2024-10-08','16');
insert into t_partition values(4,4,'basketball','2024-10-08','16');

Flink SQL> select * from t_partition;
+---------+---------+------------+------------+----+
| user_id | item_id |   behavior |         dt | hh |
+---------+---------+------------+------------+----+
|       1 |       1 |       sing | 2024-10-08 | 15 |
|       2 |       2 |       jump | 2024-10-08 | 15 |
|       3 |       3 |        rap | 2024-10-08 | 16 |
|       4 |       4 | basketball | 2024-10-08 | 16 |
+---------+---------+------------+------------+----+
4 rows in set


insert into t_partition as select * from t_sample;

insert into t_partition partition(dt='2099-10-08',hh='15')(user_id,item_id,behavior) select user_id,item_id,behavior from t_sample;

Flink SQL> select * from t_partition;
+---------+---------+-------------+------------+----+
| user_id | item_id |    behavior |         dt | hh |
+---------+---------+-------------+------------+----+
|       1 |       1 |        sing | 2024-10-08 | 15 |
|       2 |       2 |        jump | 2024-10-08 | 15 |
|     100 |     100 | sing-sample | 2099-10-08 | 15 |
|     101 |     101 | jump-sample | 2099-10-08 | 15 |
|     102 |     102 |  rap-sample | 2099-10-08 | 15 |
|       3 |       3 |         rap | 2024-10-08 | 16 |
|       4 |       4 |  basketball | 2024-10-08 | 16 |
|     100 |     100 | sing-sample |          1 |  2 |
|     101 |     101 | jump-sample |          1 |  2 |
|     102 |     102 |  rap-sample |          1 |  2 |
+---------+---------+-------------+------------+----+
10 rows in set

Flink SQL> insert overwrite t_partition(user_id,item_id,behavior) values(5,5,'non-partition');
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Column 'dt' has no default value and does not allow NULLs

以上多种写入方式都支持,把分区字段当成普通字段用就行,但是分区字段不能为空

新增-覆盖写入

Flink SQL> select * from t_pk;
+---------+---------+------------+----+----+
| user_id | item_id |   behavior | dt | hh |
+---------+---------+------------+----+----+
|       1 |       1 |       sing |  1 |  2 |
|       2 |       2 |       jump |  1 |  2 |
|       3 |       3 | basketball |  1 |  2 |
+---------+---------+------------+----+----+

insert into t_pk values(10,10,'overwrite','1','2');

Flink SQL> select * from t_pk;
+---------+---------+-----------+----+----+
| user_id | item_id |  behavior | dt | hh |
+---------+---------+-----------+----+----+
|      10 |      10 | overwrite |  1 |  2 |
+---------+---------+-----------+----+----+
1 row in set

overwrite 会直接清空表,不会考虑主键


insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(1,1,'sing');
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(2,2,'jump');
insert into t_partition partition(dt='2024-10-09',hh='15')(user_id,item_id,behavior) values(3,3,'rap');
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set

insert overwrite t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(4,4,'basketball');

Flink SQL> select * from t_partition;
+---------+---------+------------+------------+----+
| user_id | item_id |   behavior |         dt | hh |
+---------+---------+------------+------------+----+
|       3 |       3 |        rap | 2024-10-09 | 15 |
|       4 |       4 | basketball | 2024-10-08 | 15 |
+---------+---------+------------+------------+----+
2 rows in set

分区表只会overwrite 当前要写入分区


Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set
-- 对指定分区写入空记录,没有效果
 INSERT OVERWRITE t_partition  PARTITION (`dt` = '2024-10-08', `hh` = '15') SELECT user_id,item_id,behavior FROM t_partition WHERE false;
 
 Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set
 
 -- 对指定分区写入空记录,指定 /*+ OPTIONS('dynamic-partition-overwrite'='false') */,会清空指定的分区
 INSERT OVERWRITE t_partition  /*+ OPTIONS('dynamic-partition-overwrite'='false') */  PARTITION (`dt` = '2024-10-08', `hh` = '15') SELECT user_id,item_id,behavior FROM t_partition WHERE false;
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
+---------+---------+----------+------------+----+
1 row in set

 -- 不指定分区写入空记录,指定 /*+ OPTIONS('dynamic-partition-overwrite'='false') */,会清空所有分区,truncate
INSERT OVERWRITE t_partition /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM t_partition WHERE false;

Flink SQL> select * from t_partition;
Empty set

/*+ OPTIONS('dynamic-partition-overwrite'='false') */ Flink默认的覆盖模式是动态分区覆盖 (即Paimon只删除覆盖数据中出现的分区)。可以配置动态分区覆盖将其更改为静态覆盖。

paimon 没有truncate,因此我们可以借助overwite+静态覆盖,这个实现truncate

查询


修改

Important table properties setting:

  • Only primary key table supports this feature. 表必须有主键
  • MergeEngine needs to be deduplicate or partial-update to support this feature. 合并引擎必须为deduplicate,以后会支持partial-update
  • Do not support updating primary keys. 不能修改主键
  • Flink 版本1.17 及以上版本才支持
  • 必须是批处理模式
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;

Flink SQL> select * from t_partition;
+---------+---------+-------------+----+----+
| user_id | item_id |    behavior | dt | hh |
+---------+---------+-------------+----+----+
|     100 |     100 | sing-sample |  1 |  2 |
|     101 |     101 | jump-sample |  1 |  2 |
|     102 |     102 |  rap-sample |  1 |  2 |
+---------+---------+-------------+----+----+
3 rows in set

update t_partition set behavior = 'baskteball-sample' where user_id =100;

Flink SQL> select * from t_partition;
+---------+---------+-------------------+----+----+
| user_id | item_id |          behavior | dt | hh |
+---------+---------+-------------------+----+----+
|     100 |     100 | baskteball-sample |  1 |  2 |
|     101 |     101 |       jump-sample |  1 |  2 |
|     102 |     102 |        rap-sample |  1 |  2 |
+---------+---------+-------------------+----+----+
3 rows in set

删除


常用属性

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

统计数优化

paimon 会默认为每一列添加3个统计属性:最大值、最小值、null值数量

有四种配置来约束统计属性

  • full:为所有数据添加最大值、最小值、null值数量统计
  • truncate(length):截断length长度后,为所有数据添加最大值、最小值、null值数量统计,这个是默认值:默认length 16,为了避免长文本字段的统计
  • counts:只对null值数量统计
  • none:不统计

如果需要修改某个字段的统计属性

  • fields.{field_name}.stats-mode,with ( ‘fields.behavior.stats-mode’ = ‘full’ )

官网原文:https://paimon.apache.org/docs/master/flink/sql-ddl/#specify-statistics-mode
在这里插入图片描述

字段默认值

paimon表可以设置字段默认值,但是 不能 对主键字段设置默认值

如果需要修改某个字段的统计属性

  • fields.{field_name}.default-value
  • with ( ‘fields.behavior.default-value’ = ‘sing’ )

官网原文:https://paimon.apache.org/docs/master/flink/sql-ddl/#field-default-value
在这里插入图片描述

聚类写入

在批作业中配置该参数可以启用聚类写入功能,使数据在特定列上按大小范围聚集分布,从而提升该列的查询速度。只能在批处理或者append table(流处理)中使用。

多个列名请使用英文逗号(,)进行分隔,例如’col1,col2’。

  • sink.clustering.by-columns
  • with ( ‘sink.clustering.by-columns’ = ‘user_id,item_id’ )

也可以使用Hints

  • INSERT INTO my_table /*+ OPTIONS(‘sink.clustering.by-columns’ = ‘a,b’) */ SELECT * FROM source;

动态覆盖

Flink overwrite模式,在分区表中默认是动态分区覆盖,也就是说在使用overwrite时,只覆盖当前写入分区的数据,写入数据为空时,不进行覆盖,我们可以设置为静态覆盖,当写入数据为空时,也会覆盖。如果写入的分区为空则覆盖所有分区!

Hints,跟在表的后边,也就是声明本次sql的执行策略,dynamic-partition-overwrite=false > 静态覆盖,truncate的替代语法(TRUNCATE TABLE my_table 需要flink 1.18 之后才支持)
INSERT OVERWRITE my_table /*+ OPTIONS(‘dynamic-partition-overwrite’=‘false’) */

合并引擎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2206336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java中二维数组-杨辉三角

使用二维数组打印一个10行杨辉三角 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 5 1 1)第一行有1个元素,第n行有n个元素 2)每一行的第一个元素和最后一个元素都是1 3)从第三行开始,对于非第一个元素和最后一个元素的元素…

差分注意力,负注意力的引入

文章目录 Differential Transformer差分注意力,负注意力的引入相关链接介绍初始化函数多头差分注意力 Differential Transformer差分注意力,负注意力的引入 相关链接 ai-algorithms/README.md at main Jaykef/ai-algorithms (github.com) unilm/Diff…

response和验证码、文件下载操作

目录 Response对象 案例: 1、完成重定向 2、服务器输出字符输出流到浏览器 3、服务器输出字节输出流到浏览器 4、验证码 ServletContext对象 Response对象 功能:设置响应消息 1、设置响应行 格式:HTTP/1.1 200 ok 设置状态码 se…

RabbitMQ 高级特性——死信队列

文章目录 前言死信队列什么是死信常见面试题死信队列的概念:死信的来源(造成死信的原因有哪些)死信队列的应用场景 前言 前面我们学习了为消息和队列设置 TTL 过期时间,这样可以保证消息的积压,那么对于这些过期了的消…

【更新】上市公司企业机构投资者实地调研数据(2013-2023年)

一、测算方式: 参考《会计研究》逯东(2019)老师的做法,考虑投资者实地调研的频率和可能性,设立了下述变量来衡量上市公司接待投资者调研情况: 首先,使用年度范围内接待投资者调研的总次数 ( Visitnmb) 作为…

卸载PLSQL及标准卸载流程

目录 1. 卸载PLSQL2. 删除注册表3. 删除数据信息 1. 卸载PLSQL 等待进度条走完 2. 删除注册表 regedit 右击删除 3. 删除数据信息 由于AppData是隐藏文件,需要勾选隐藏的项目。 重启电脑,PLSQL就卸载成功了。

低代码工单管理app评测,功能与效率解析

预计到2030年,低代码平台市场将达1870亿美元。ZohoCreator助力企业构建定制化软件应用,以建筑行业工作订单管理app为例,简化流程,提升管理效率,降低成本。其用户友好界面、自动化管理、跨平台使用及全面报告功能受企业…

项目优化内容及实战

文章目录 事前思考Prometheus 普罗米修斯概述架构安装及使用 Grafana可视化数据库读写分离实战1-PrometheusGrafanaspringboot 事前思考 需要了解清楚:需要从哪些角度去分析实现?使用了缓存,就需要把缓存命中率数据进行收集;使用…

企业在隔离网环境下如何进行安全又稳定的跨网文件交换?

在数字化时代,企业的数据流通如同血液一般重要。然而,当企业内部实施了隔离网环境,跨网文件交换就成了一个棘手的问题。今天我们将探讨在隔离网环境下,企业面临的跨网文件交换挑战,以及如何通过合规的跨网文件交换系统…

数字电路——触发器1(RS和钟控触发器)

触发器:能够存储一位二进制信息的基本单元电路称触发器(Flip-Flop) 特点: 具有两个能自行保持的稳定状态,用来表示逻辑状态的“0”或“1”。具有一对互补输出。有一组控制(激励、驱动)输入。或许有定时(时钟)端CP(Clock Pulse)。在输入信号…

PostgreSQL 16.4安装以及集群部署

1. 环境准备 1.1 主机环境 主机 IP: 192.24.215.121操作系统: CentOS 9PostgreSQL 版本: 16.4 1.2 从机环境 从机 IP: 192.24.215.122操作系统: CentOS 9PostgreSQL 版本: 16.4 2. 安装 PostgreSQL 16.4 在主从两台机器上都需要安装 PostgreSQL 16.4。 2.1 添加 Postgre…

银行卡基础信息查询 API 对接说明

本文将介绍一种 银行卡基础信息查询 API 对接说明,它可用于银行卡基础信息查询。 接下来介绍下 银行卡基础信息查询 API 的对接说明。 申请流程 要使用 API,需要先到 银行卡基础信息查询 API 对应页面申请对应的服务,进入页面之后&#xf…

Python自定义异常类:实际应用示例之最佳实践

Python自定义异常类:实际应用示例之最佳实践 前言 在软件开发中,合理处理异常是保证程序稳定性的重要环节。虽然 Python 内置了丰富的异常类型,但在处理复杂业务逻辑时,自定义异常类能够使代码更加清晰且具备可扩展性。 本文将…

一个架构师的职业素养:四种常用的权限模型

你好,我是看山。 本文收录在《一个架构师的职业素养》专栏。日拱一卒,功不唐捐。 今天咱们一起聊聊权限系统。 以大家熟知的电商场景举例: 用户可以分为普通用户、VIP用户:我们需要控制不同角色用户的访问范围。比如,京东的PLUS会员,可以进入会员专区,而且能够使用礼金…

ESP32接入扣子(Coze) API使用自定义智能体

使用ESP32接入Coze API实现聊天机器人的教程 本示例将使用ESP32开发板通过WiFi接入 Coze API,实现一个简单的聊天机器人功能。用户可以通过串口向机器人输入问题,ESP32将通过Coze API与智能体进行通信,并返回对应的回复。本文将详细介绍了如…

OpenGL 进阶系列03 - OpenGL实例化渲染来提高性能

目录 一:概述 二:实例化渲染的优点: 三:OpenGL实例化渲染的例子: 一:概述 OpenGL 实例化渲染(Instanced Rendering)是一种渲染技术,可以有效地绘制多个相同对象,而不需要为每个对象单独提交绘制调用。通过这种方式,可以显著提高渲染性能,尤其是在需要绘制大量相…

【每日刷题】Day137

【每日刷题】Day137 🥕个人主页:开敲🍉 🔥所属专栏:每日刷题🍍 🌼文章目录🌼 1. 1576. 替换所有的问号 - 力扣(LeetCode) 2. 495. 提莫攻击 - 力扣&#xf…

【数据结构与算法】线性表顺序存储结构

文章目录 一.顺序表的存储结构定义1.1定义1.2 图示1.3结构代码*C语言的内存动态分配 二.顺序表基本运算*参数传递2.1建立2.2初始化(InitList(&L))2.3销毁(DestroyList(&L))2.4判断线性表是否为空表(ListEmpty(L))2.5求线性表的长度(ListLength(L))2.6输出线性表(DispLi…

基于GoogleNet深度学习网络的手语识别算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 (完整程序运行后无水印) 手语How are you,测试识别结果如下: 手语I am fine,测试识别结果如下: 手…

java入门和Java语法

Java直接运行源代码文件,不会产生HelloWorld.class第二种方法:把模块放在D盘下,然后导入 第三种方法:新建一个模块,然后把内容复制过去 byte l 12; short m l; System.out.println(m); char n a; int reason mn; Sy…