FlinkSql 窗口函数

news2025/1/15 17:26:52

Windowing TVF

以前用的是Grouped Window Functions(分组窗口函数),但是分组窗口函数只支持窗口聚合

现在FlinkSql统一都是用的是Windowing TVFs(窗口表值函数),Windowing TVFs更符合 SQL 标准且更加强大,支持window join、Window aggregations、Window Top-N、Window Deduplication 

Windowing TVFs是 Flink 定义的多态表函数(Polymorphic Table Function,缩写PTF),PTF 是 SQL 2016 标准中的一种特殊的表函数,它可以把表作为一个参数

窗口函数

Flink 认为窗口把流分割为有限大小的 “桶”,这样就可以在其之上进行计算

有以下几种用法

  • 滚动窗口
  • 滑动窗口
  • 累积窗口
  • 会话窗口 (即将支持)

滚动窗口(TUMBLE)

TUMBLE 函数指定每个元素到一个指定大小的窗口中。滚动窗口的大小固定且不重复。

例如:假设指定了一个 5 分钟的滚动窗口。Flink 将每 5 分钟生成一个新的窗口,如下图所示:

TUMBLE 函数通过时间属性字段为每行数据分配一个窗口。 在流计算模式,时间属性字段必须被指定为事件或处理时间属性。在批计算模式,这个窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 的类型

--TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
data :拥有时间属性列的表。
timecol :列描述符,决定数据的哪个时间属性列应该映射到窗口。
size :窗口的大小(时长)。
offset :窗口的偏移量 [非必填]。

SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

滑动窗口(HOP)

滑动窗口函数指定元素到一个定长的窗口中。和滚动窗口很像,有窗口大小参数,另外增加了一个窗口滑动步长参数。如果滑动步长小于窗口大小,就能产生数据重叠的效果。在这个例子里,数据可以被分配在多个窗口。

例如:可以定义一个每5分钟滑动一次。大小为10分钟的窗口。每5分钟获得最近10分钟到达的数据的窗口,如下图所示:

HOP 函数通过时间属性字段为每一行数据分配了一个窗口。 在流计算模式,这个时间属性字段必须被指定为事件或处理时间属性。在批计算模式,这个窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 的类型

-- HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
slide:窗口的滑动步长。
size:窗口的大小(时长)。
offset:窗口的偏移量 [非必填]。

SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

累积窗口(CUMULATE)

CUMULATE 函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素,另外,窗口的开始时间是固定的。 你可以将 CUMULATE 函数视为首先应用具有最大窗口大小的 TUMBLE 窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小。

例如:1小时步长,24小时大小的累计窗口,每天可以获得如下这些窗口:[00:00, 01:00)[00:00, 02:00)[00:00, 03:00), …, [00:00, 24:00)

-- CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
step:指定连续的累积窗口之间增加的窗口大小。
size:指定累积窗口的最大宽度的窗口时间。size必须是step的整数倍。
offset:窗口的偏移量 [非必填]。


SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

窗口偏移

上诉窗口都有一个 offset 参数,默认值就是 0,所以窗口默认都是整点启动的

比如10分钟的滚动窗口:TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES),只会生成[2021-06-29 23:40:00, 2021-06-29 00:50:00),[2021-06-29 23:50:00, 2021-06-30 00:00:00),window_start 和 window_end 和数据的时间无关

offset 就是用来调整窗口偏移的,当 offset 为 -16 MINUTE,时间戳为 2021-06-30 00:00:04 的数据会分配到窗口 [2021-06-29 23:54:00, 2021-06-30 00:04:00)。

窗口函数进阶用法

flink开窗需要写上windowend,否则只是带了一个windowstart的时间而已,并没有真正开启窗口

Window Aggregation

窗口聚合是通过 GROUP BY 子句定义的,其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+

并且支持多级窗口聚合 

-- tumbling 5 minutes for each supplier_id
CREATE VIEW window1 AS
-- Note: The window start and window end fields of inner Window TVF are optional in the select clause. However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF.
SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY supplier_id, window_start, window_end, window_time;

-- tumbling 10 minutes on the first window
SELECT window_start, window_end, SUM(partial_price) as total_price
  FROM TABLE(
      TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

下面是分组窗口聚合的写法,分组窗口聚合已经过时,官网不推荐使用了

SELECT
  user,
  TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
  SUM(amount) FROM Orders
GROUP BY
  TUMBLE(order_time, INTERVAL '1' DAY),
  user

Window Join

在流式查询中,与其他连续表上的关联不同,窗口关联不产生中间结果,只在窗口结束产生一个最终的结果。另外,窗口关联会清除不需要的中间状态

目前使用时有一些限制:

目前,窗口关联需要在 join on 条件中包含两个输入表的 window_start 等值条件和 window_end 等值条件

目前,关联的左右两边必须使用相同的窗口表值函数。这个规则在未来可以扩展,比如:滚动和滑动窗口在窗口大小相同的情况下 join。

语法上支持 INNER、LEFT、RIGHT、FULL OUTER、ANTI、SEMI JOIN。而且,窗口关联可以在其他基于 窗口表值函数 的操作后使用,例如 窗口聚合,窗口 Top-N 和 窗口关联

SELECT l.id as l_id,r.id as r_id,l.window_start,l.window_end
FROM (
     SELECT * FROM TABLE(TUMBLE(TABLE t_left, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
     ) l
INNER JOIN (
     SELECT * FROM TABLE(TUMBLE(TABLE t_right, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
      ) r
ON l.id = r.id 
AND l.window_start = r.window_start 
AND l.window_end = r.window_end;

Window TopN

与普通Top-N不同,窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。窗口 Top-N 会在窗口结束后清除不需要的中间状态

窗口 Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM TABLE(
               TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;

还可以在窗口聚合后在进行窗口 Top-N

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, supplier_id
    )
  ) WHERE rownum <= 3;

Window Deduplication

窗口去重是一种特殊的 去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据

对于流式查询,与普通去重不同,窗口去重只在窗口的最后返回结果数据,不会产生中间结果。它会清除不需要的中间状态。 因此,窗口去重查询在用户不需要更新结果时,性能较好

Window Deduplication是一种特殊的窗口 Top-N:N是1并且是根据处理时间或事件时间排序的(目前只支持根据事件时间属性进行排序),支持在其他窗口操作上进行去重操作,比如 窗口聚合,窗口TopN 和 窗口关联

SELECT *
  FROM (
    SELECT bidtime, price, item, supplier_id, window_start, window_end, 
      ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY bidtime DESC) AS rownum
    FROM TABLE(
               TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 1;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1439650.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[项目管理] 如何使用git客户端管理gitee的私有仓库

最近发现即使翻墙也无法g使用ithub了&#xff0c;需要把本地的项目搬迁到新的git托管平台。 gitee 是一个国内开源项目托管平台&#xff0c;是开源开发者、团队、个人进行 git 代码管理和协作的首选平台之一。本文将详细介绍如何向 gitee 提交私有项目。 注册 Gitee 账号并创建…

AD域国产替代方案,助力某金融企业麒麟信创电脑实现“真替真用”

近期收到不少企业客户反馈采购的信创PC电脑用不起来&#xff0c;影响信创改造的进度。例如&#xff0c;某金融企业积极响应国产化信创替代战略&#xff0c;购置了一批麒麟操作系统电脑。分发使用中发现了如下问题&#xff1a; • 当前麒麟操作系统电脑无法做到统一身份认证&…

K8S系列文章之 [使用 Alpine 搭建 k3s]

官方文档&#xff1a;K3s - 轻量级 Kubernetes | K3s 官方描述&#xff0c;可运行在 systemd 或者 openrc 环境上&#xff0c;那就往精简方向走&#xff0c;使用 alpine 做系统。与 RHEL、Debian 的区别&#xff0c;主要在防火墙侧&#xff1b;其他基础配置需求类似&#xff0…

jmeter的简单使用

1、打开jmeter 打开Jmeter 安装包&#xff0c;进入\bin 中&#xff0c;找到“ApacheJMeter.jar”或"jmeter.bat", 双击打开即可 2、建立线程组 如下图所示&#xff0c;右击TestPlan&#xff0c;点击ADD->Threads(Users)->ThreadGroup 线程组页面分析&#xf…

IS-IS 接口认证密码平滑更换

拓扑图 配置 AR1、AR2建立ISIS level-2邻居关系&#xff0c;并配置接口认证密码为huawei sysname AR1 # isis 1is-level level-2network-entity 49.0000.0000.0000.0001.00 # interface GigabitEthernet0/0/0ip address 12.1.1.1 255.255.255.0 isis enable 1isis authentica…

ThinkPHP 中使用Redis

环境.env [app] app_debug "1" app_trace ""[database] database "" hostname "127.0.0.1" hostport "" password "" prefix "ls_" username ""[redis] hostname "127.0.0.1…

【C语言】通过socket看系统调用过程

一、通过socket看系统调用过程 在Linux操作系统中&#xff0c;系统调用是用户空间与内核空间之间交互的一种方式。当一个应用程序需要执行操作系统级别的任务时&#xff0c;比如创建一个网络套接字&#xff08;socket&#xff09;&#xff0c;它必须通过系统调用请求内核来执行…

SQL在云计算中的新角色:重新定义数据分析

文章目录 1. 云计算与数据分析的融合2. SQL在云计算中的新角色3. 分布式SQL查询引擎4. SQL-on-Hadoop解决方案5. SQL与其他数据分析工具的集成6. 实时数据分析与SQL7. SQL在云数据仓库中的角色8. 安全性与隐私保护9. SQL的未来展望《SQL数据分析实战&#xff08;第2版&#xff…

Snipaste使用

今天推荐一款好用的截图、贴图软件工具&#xff0c;名字叫Snipaste&#xff0c;以下是官方介绍的截图 软件官方下载地址&#xff1a; Snipaste 下载 1、截图功能 2、标注 3、开发中的使用 有时候在开发中需要临时把一些任务规则信息&#xff0c;放在代码编辑器旁边进行参考&am…

蓝桥杯每日一解

可以看看a的ascii码为6532 而A为ascii码为65&#xff0c;大小写相差32位 #include <iostream>using namespace std; int main(){int n;cin >> n;char a;for (int i 1;i<n;i){while(scanf("%c",&a) ! EOF){//无限输入直到输入到空格if(a a || a …

【多模态大模型】GLIP:零样本学习 + 目标检测 + 视觉语言大模型

GLIP 核心思想GLIP 对比 BLIP、BLIP-2、CLIP 主要问题: 如何构建一个能够在不同任务和领域中以零样本或少样本方式无缝迁移的预训练模型&#xff1f;统一的短语定位损失语言意识的深度融合预训练数据类型的结合语义丰富数据的扩展零样本和少样本迁移学习 效果 论文&#xff1a;…

OpenShift AI - 运行欺诈检测模型和流程

《OpenShift / RHEL / DevSecOps 汇总目录》 说明&#xff1a;本文已经在 OpenShift 4.14 RHODS 2.50 的环境中验证 文章目录 准备运行环境安装 OpenShift AI 环境安装 Minio 对象存储软件创建 Data Science Project创建 Data connection创建 Workbench配置 Model server创建 …

python调用golang中函数方法

一、原因说明&#xff1a;由于simhash方法有多种实现方式&#xff0c;现python中simhash方法与golang中的不一样&#xff0c;需要两者代码生成结果保持一致&#xff0c;故采用python中的代码调用golang编译的so文件来实现。 环境配置&#xff1a;①Windows10系统要有gcc环境&a…

【Git版本控制 03】远程操作

目录 一、克隆远程仓库 二、推送远程仓库 三、拉取远程仓库 四、忽略特殊文件 五、命令配置别名 一、克隆远程仓库 Git是分布式版本控制系统&#xff0c;同⼀个Git仓库&#xff0c;可以分布到不同的机器上。怎么分布呢&#xff1f; 找⼀台电脑充当服务器的⻆⾊&#xff…

幻方(Magic Square)

幻方&#xff08;Magic Square&#xff09; 幻方概述 什么是幻方呢&#xff1f;幻方&#xff08;Magic Square&#xff09;就是指在nn&#xff08;n行n列&#xff09;的方格里填上一些连续的数字&#xff0c;使任意一行、任意一列和对角线上的数字的和都相等。例如有33的3行3…

【操作系统】MacOS虚拟内存统计指标

目录 命令及其结果 参数解读 有趣的实验 在 macOS 系统中&#xff0c;虚拟内存统计指标提供了对系统内存使用情况和虚拟内存操作的重要洞察。通过分析这些指标&#xff0c;我们可以更好地了解系统的性能状况和内存管理情况。 命令及其结果 >>> vm_stat Mach Virtu…

电力负荷预测 | 基于TCN的电力负荷预测(Python)———数据预处理

文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 基于TCN的电力负荷预测(Python) python3.8 keras2.6.0 matplotlib3.5.2 numpy1.19.4 pandas1.4.3 tensorflow==2.6.0

电赛相关——自制模块1

目录 一、双电源供电 原理图 PCB 仿真图 制板文件 二、单电源供电 原理图 PCB 仿真图 制板文件 该模块使用一个双运放&#xff0c;实现对输入信号改变幅度&#xff08;放大或衰减&#xff0c;可调节&#xff0c;索性叫它变幅器吧&#xff09;以及隔离缓存&#xff08…

一文读懂转融通

最近多家公司都在讲解关于转融通要求。今天我们就来详细聊聊&#xff01; 转融通是一种证券借贷机制&#xff0c;它允许机构投资者在融资融券交易中借入或借出证券。 具体来说&#xff0c;机构投资者可以向券商借入证券&#xff0c;或者将持有的证券借给券商&#xff0c;以扩大…

Golang 基础 Go Modules包管理

Golang 基础 Go Modules包管理 在 Go 项目开发中&#xff0c;依赖包管理是一个非常重要的内容&#xff0c;依赖包处理不好&#xff0c;就会导致编译失败&#xff0c;本文将系统介绍下 Go 的依赖包管理工具。 我会首先介绍下 Go 依赖包管理工具的历史&#xff0c;并详细介绍下…