Flink SQL时间属性和窗口介绍

news2025/1/11 23:42:55

(1)概述

时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。

一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。

时间属性的数据类型为 TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。

按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。

(2)事件时间

1)在创建表的 DDL 中定义

在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个字段,通过 WATERMARK语句来定义事件时间属性。

WATERMARK 语句主要用来定义水位线(watermark)的生成表达式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的延迟时间。

具体定义方式如下:

CREATE TABLE EventTable(
     user STRING,
     url STRING,
     ts TIMESTAMP(3),
     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
 ...
);

把 ts 字段定义为事件时间属性,而且基于 ts 设置了 5 秒的水位线延迟。这里的“5 秒”是以“时间间隔”的形式定义的,格式是 INTERVAL <数值> <时间单位>:INTERVAL '5’ SECOND,这里的数值必须用单引号引起来,而单位用 SECOND 和 SECONDS 是等效的。

Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP 或者 TIMESTAMP_LTZ。这里TIMESTAMP_LTZ 是指带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIME ZONE);

如数据中的时间戳是“年-月-日-时-分-秒”形式,那就是不带时区信息的,可以将事件时间属性定义为 TIMESTAMP 类型。
而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性,类型定义为 TIMESTAMP_LTZ 会更方便:

CREATE TABLE events (
 user STRING,
 url STRING,
 ts BIGINT,
 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
 WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
 ...
);

这里我们另外定义了一个字段 ts_ltz,是把长整型的 ts 转换为 TIMESTAMP_LTZ 得到的;进而使用 WATERMARK 语句将它设为事件时间属性,并设置 5 秒的水位线延迟。

2)在数据流转换为表时定义

调用 fromDataStream()方法创建表时,可以追加参数来定义表中的字段结构;这时可以给某个字段加上.rowtime() 后缀,就表示将当前字段指定为事件时间属性。

这个字段可以是数据中本不存在、额外追加上去的“逻辑字段”,就像之前 DDL 中定义的第二种情况;也可以是本身固有的字段,那么这个字段就会被事件时间属性所覆盖,类型也会被转换为 TIMESTAMP。

不论哪种方式,时间属性字段中保存的都是事件的时间戳(TIMESTAMP 类型)。

需要注意的是,这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该之前就在 DataStream 上定义好了。

由于 DataStream 中没有时区概念,因此 Flink 会将事件时间属性解析成不带时区的 TIMESTAMP 类型,所有的时间值都被当作 UTC 标准时间。

在代码中的定义方式如下:

// 方法一:
// 流中数据类型为二元组 Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").rowtime());


// 方法二:
// 流中数据类型为三元组 Tuple3,最后一个字段就是事件时间戳
DataStream<Tuple3<String, String, Long>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 不再声明额外字段,直接用最后一个字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").rowtime());

(3)处理时间

系统时间,使用时不需要提取时间戳(timestamp)和生成水位线(watermark)。因此在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。

类似地,处理时间属性的定义也有两种方式:创建表 DDL 中定义,或者在数据流转换成表时定义。

1)在创建表的 DDL 中定义

在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个额外的字段,通过调用系统内置的 PROCTIME()函数来指定当前的处理时间属性,返回的类型是 TIMESTAMP_LTZ。

CREATE TABLE EventTable(
     user STRING,
     url STRING,
     ts AS PROCTIME()
) WITH (
 ...
);

时间属性,以“计算列”(computed column)的形式定义出来的。所谓的计算列是 Flink SQL 中引入的特殊概念,可以用一个 AS 语句来在表中产生数据中不存在的列,并且可以利用原有的列、各种运算符及内置函数。

在前面事件时间属性的定义中,将 ts 字段转换成 TIMESTAMP_LTZ 类型的 ts_ltz,也是计算列的定义方式。

2)在数据流转换为表时定义

处理时间属性同样可以在将 DataStream 转换为表的时候来定义。我们调用 fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。

由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。

代码中定义处理时间属性的方法如下:

DataStream<Tuple2<String, String>> stream = ...;

// 声明一个额外的字段作为处理时间属性字段
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").proctime());

(4)窗口(Window)

1)分组窗口(Group Window,老版本)

在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就是调用 TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。

以滚动窗口为例:

TUMBLE(ts, INTERVAL '1' HOUR)

这里的 ts 是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL 来定义。在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。

基本使用方式如下:

Table result = tableEnv.sqlQuery(
 "SELECT " +
 "user, " +
"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +
 "COUNT(url) AS cnt " +
 "FROM EventTable " +
 "GROUP BY " + // 使用窗口和用户名进行分组
 "user, " +
 "TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口
 );

这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数COUNT()对分组数据的个数进行了聚合统计,并将结果字段重命名为cnt;用TUPMBLE_END() 函数获取滚动窗口的结束时间,重命名为 endT 提取出来。

分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。

2)窗口表值函数(Windowing TVFs,新版本)

从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,Windowing TVFs)来定义窗口。

窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数。

目前 Flink 提供了以下几个窗口 TVF:
1.滚动窗口(Tumbling Windows);
2.滑动窗口(Hop Windows,跳跃窗口);
3.累积窗口(Cumulate Windows);
4.会话窗口(Session Windows,目前尚未完全支持)。

1.概述

窗口表值函数可以完全替代传统的分组窗口函数。窗口 TVF 更符合 SQL 标准,性能得到了优化,拥有更强大的功能;可以支持基于窗口的复杂计算,例如窗口 Top-N、窗口联结(window join)等等。

在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:“窗口起始点(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。

起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于 window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳。

在 SQL 中的声明方式,与以前的分组窗口是类似的,直接调用 TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。

2.滚动窗口(TUMBLE)

滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。

在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。

在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:

TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)

这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。

(2)滑动窗口(HOP)

滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide)两个参数。

HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));

这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。

(3)累积窗口(CUMULATE)

滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。

在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;

与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。

例如,我们按天来统计网站的 PV(Page View,页面浏览量),如果用 1 天的滚动窗口,那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。

所以我们真正希望的是,还是按照自然日统计每天的PV,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累积窗口”(Cumulate Window)。

在这里插入图片描述

累积窗口是窗口 TVF 中新增的窗口功能,它会在一定的统计周期内进行累积计算。

累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。

所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。

在 SQL 中可以用 CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。

上面所有的语句只是定义了窗口,类似于 DataStream API 中的窗口分配器;在 SQL 中窗口的完整调用,还需要配合聚合操作和其它操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1173340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一文通透各种注意力:从多头注意力MHA到分组查询注意力GQA、多查询注意力MQA

第一部分 多头注意力 // 待更 第二部分 LLaMA2之分组查询注意力——Grouped-Query Attention 自回归解码的标准做法是缓存序列中先前标记的键 (K) 和值 (V) 对&#xff0c;从而加快注意力计算速度 然而&#xff0c;随着上下文窗口或批量大小的增加&#xff0c;多头注意力 (MH…

Buuctf-Crypto-之深夜刷题部分wp

萌萌哒的八戒 首先下载好附件&#xff0c;解压&#xff0c;是一幅猪图&#xff0c;图的下方是一串看不懂的字&#xff0c;百度输入关键词猪、密码&#xff0c;可知这是猪圈密码&#xff0c; 手撸得WHENTHEPIGWANTTOEAT 大写不对&#xff0c;换成小写。 …

数据结构——常见简答题汇总

目录 1、绪论 2、线性表 3、栈、队列和数组 4、串 5、树与二叉树 6、图 7、查找 8、排序 1、绪论 什么是数据结构&#xff1f; 数据结构是相互之间存在一种或多种特定关系的数据元素的集合。数据结构包括三个方面&#xff1a;逻辑结构、存储结构、数据的运算。 逻辑结…

中小学智慧校园电子班牌管理系统源码

智慧校园云平台电子班牌系统&#xff0c;利用先进的云计算技术&#xff0c;将教育信息化资源和教学管理系统进行有效整合&#xff0c;实现基础数据共享、应用统一管理。借助全新的智能交互识别终端和移动化教育管理系统&#xff0c;以考勤、课表、通知、家校互通等功能为切入点…

如何将 XxlJob 集成达梦数据库

1. 前言 在某些情况下&#xff0c;你的项目可能会面临数据库选择的特殊要求&#xff0c;随着国产化的不断推进&#xff0c;达梦数据库是一个常见的选择。本篇博客将教你如何解决 XxlJob 与达梦数据库之间的 SQL 兼容性问题&#xff0c;以便你的任务调度系统能够在这个数据库中…

NNDL 作业6 卷积

一、概念 &#xff08;一&#xff09;卷积 &#xff08;1&#xff09;什么叫卷积 卷积、旋积或褶积(英语&#xff1a;Convolution)是通过两个函数f和g生成第三个函数的一种数学运算&#xff0c;其本质是一种特殊的积分变换&#xff0c;描述一个函数和另一个函数在某个维度上…

类和对象解析

导言&#xff1a; Java是一门纯面向对象的语言&#xff0c;在面对对象的世界里&#xff0c;一切皆为对象。而对象的创建又和类的定义息息相关。本文主要阐述了类和对象的使用与理解。解释类的定义方式以及对象的实例化&#xff0c;类中的成员变量和成员方法的使用&#xff0c;…

【qemu逃逸】D3CTF2021-d3dev

前言 题目给的是一个 docker 环境&#xff0c;所以起环境非常方便&#xff0c;但是该怎么调试呢&#xff1f;有无佬教教怎么在 docker 中调试&#xff1f; 我本来想着直接起一个环境进行调试&#xff0c;但是缺了好的库&#xff0c;所以就算了&#xff0c;毕竟本题也不用咋调…

044_第三代软件开发-保存PDF

第三代软件开发-保存PDF 文章目录 第三代软件开发-保存PDF项目介绍保存PDF头文件源文件使用 关键字&#xff1a; Qt、 Qml、 pdf、 painter、 打印 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;这个项目结合了 QML&#xff08;Qt Meta-Object Language&#xff…

阿里5年经验之谈 —— 记录一次jmeter压测的过程!

在软件架构与中间件实验的最后&#xff0c;要求进行非功能测试&#xff0c;那得非压力测试莫属了。虽然之前学习秒杀项目的时候看视频里面用过jmeter&#xff0c;但没有自己实操过&#xff0c;趁着这次机会&#xff0c;使用一下。 QPS与TPS 1、TPS&#xff1a; Transactions …

力扣周赛 -- 370周赛

先更新前两道题目&#xff0c;下午更新后两道 两道模板题(拓扑排序) 拓扑排序 拓扑排序&#xff08;Topological Sorting&#xff09;&#xff1a;一种对有向无环图&#xff08;DAG&#xff09;的所有顶点进行线性排序的方法&#xff0c;使得图中任意一点 $u$ 和 $v$&#xf…

【LeetCode】每日一题 2023_11_5 重复的DNA序列

文章目录 刷题前唠嗑重复的DNA序列题目描述代码和解题思路偷看大佬题解结语 刷题前唠嗑 LeetCode? 启动&#xff01;&#xff01;&#xff01; 重复的DNA序列 题目链接&#xff1a;187. 重复的DNA序列 题目描述 代码和解题思路 func findRepeatedDnaSequences(s string) …

fastapi-Headers和Cookies

在FastAPI中&#xff0c;Headers是一个特殊的类型&#xff0c;用于处理HTTP请求头&#xff08;Headers&#xff09;。Headers允许你接收、访问和修改HTTP请求中的头部信息。 使用Headers&#xff0c;你可以在FastAPI的路由视图中将请求头作为参数接收&#xff0c;并对它们进行…

linux基本用法

文章目录 前言一、开关机操作1.1 开机登陆1.2 关机1.3 系统目录结构 二、常用的基本命令(重点)2.1 相对路径与绝对路径2.2 处理目录的常用命令2.2.1 ls2.2.2 cd 切换目录2.2.3 pwd ( 显示目前所在的目录 )2.2.4 mkdir &#xff08;创建新目录&#xff09;2.2.5 rmdir ( 删除空的…

【Vue.js】Vue3全局配置Axios并解决跨域请求问题

系列文章目录 文章目录 系列文章目录背景一、部署Axios1. npm 安装 axios2. 创建 request.js&#xff0c;创建axios实例3. 在main.js中全局注册axios4. 在页面中使用axios 二、后端解决跨域请求问题方法一 解决单Contoller跨域访问方法二 全局解决跨域问题 背景 对于前后端分离…

回溯算法--4后问题

1.问题描述 四皇后问题&#xff1a;在4 4 的方格棋盘上放置4个皇后&#xff0c;使得没有两个皇后在同一行、同一列、也不在同一条45度的斜线上。问有多少种可能的布局&#xff1f; 解是4维向量 比如上面这个解<2,4,1,3> 分别表示圆圈的第2列、第4列等 还可以得到另一解…

LeetCode题:83删除排序链表中的重复元素 141环形链表

83删除排序链表中的重复元素 题目内容 给定一个已排序的链表的头 head &#xff0c; 删除所有重复的元素&#xff0c;使每个元素只出现一次 。返回 已排序的链表 。 示例 1&#xff1a; 输入&#xff1a;head [1,1,2] 输出&#xff1a;[1,2]示例 2&#xff1a; 输入&#xf…

下载安装PyCharm的步骤

1、首先进入Pycharm官网&#xff0c;并进行下载&#xff0c;日常使用社区版也是OK的 官网&#xff1a;https://www.jetbrains.com/pycharm/download/?sectionwindows 2、可以自定义路径进行安装&#xff0c;注意路径要全英哈 3、大家可以根据自己的需要来进行勾选 4、安装完成…

【漏洞复现】Webmin 远程命令执行(CVE-2019-15107)

感谢互联网提供分享知识与智慧&#xff0c;在法治的社会里&#xff0c;请遵守有关法律法规 文章目录 1.1、漏洞描述1.2、漏洞等级1.3、影响版本1.4、漏洞复现1、基础环境2、漏洞验证 1.5、深度利用1、反弹Shell 1.6、修复建议1.7、参考链接 说明内容漏洞编号CVE-2019-15107漏洞…

蒙哥马利算法模乘(四)

一 蒙哥马利算法模乘介绍 蒙哥马利模乘算法主要为了进行大数运算a*b mod n,在介绍蒙哥马利模乘之前,先让我们来了解蒙哥马利约减。 1.1 蒙哥马利约减 a mod n 如果a是一个2048位的整数,n是一个1024位的整数,如果直接采用相除的方式,不论在空间还是时间上都会产生非常大…