【Flink】窗口实战:TUMBLE、HOP、SESSION

news2025/1/18 8:20:48

窗口实战:TUMBLE、HOP、SESSION

  • 1.TUMBLE WINDOW
    • 1.1 语法
    • 1.2 标识函数
    • 1.3 模拟用例
  • 2.HOP WINDOW
    • 2.1 语法
    • 2.2 标识函数
    • 2.3 模拟用例
  • 3.SESSION WINDOW
  • 3.1 语法
    • 3.2 标识函数
    • 3.3 模拟用例
  • 4.更多说明

在流式计算中,流通常是无穷无尽的,我们无法知道什么时候数据源会继续 / 停止发送数据,所以在流上处理聚合事件(countsum 等)的处理方式与批处理中的处理方式会有所差异。在流上一般用窗口(Window)来限定聚合的范围,例如 “过去 2 分钟网站点击量的计数”、“在最近 100 个人中点赞这个视频的总人数”。窗口的概念相当于帮我们收集了一张有限数据的动态表,我们可以对表中的数据进行聚合计算。

窗口函数是一种特殊的函数,它并不在 SELECT 的投影列表中使用,而是在 GROUP BY 子句中使用。

1.TUMBLE WINDOW

TUMBLE WINDOW(滚动窗口)将每个进入的数据分配到一个指定窗口大小的窗口中。滚动窗口可以自定义固定的大小,并且不会出现重叠。我们可以对窗口内的数据进行计算。

1.1 语法

TUMBLE(time_attr, interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR
  • 如果在 Event Time 时间模式下(使用 WATERMARK FOR 语句定义了时间戳字段),那么 TUMBLE、HOP、SESSION 窗口函数的第一个参数必须为该字段。
  • 如果在 Processing Time 时间模式下,则 TUMBLE、HOP、SESSION 窗口函数的第一个参数必须为 proctime() 函数生成的计算列,下文用 PROCTIME 举例,请在实际作业中替换为实际的列名。

1.2 标识函数

函数名
功能描述
TUMBLE_START(time-attr, size-interval)返回窗口的起始时间(包含边界)。例如 [00:10, 00:15) 窗口,返回 00:10
TUMBLE_END(time-attr, size-interval)返回窗口的结束时间(包含边界)。例如 [00:00, 00:15] 窗口,返回 00:15
TUMBLE_ROWTIME(time-attr, size-interval)返回窗口的结束时间(不包含边界)。例如[00:00, 00:15] 窗口,返回 00:14:59.999。返回值是一个 rowtime attribute,即可以基于该字段做时间属性的操作。
TUMBLE_PROCTIME(time-attr, size-interval)返回窗口的结束时间(不包含边界)。例如 [00:00, 00:15] 窗口,返回00:14:59.999。返回值是一个 proctime attribute,即可以基于该字段做时间属性的操作。

1.3 模拟用例

下文以 TUMBLE WINDOW 为例,帮助您更容易地理解 TUMBLE WINDOW。使用 Event Time 模拟统计 每小时各用户收入金额

示例数据:

username(VARCHAR)income(BIGINT)times(TIMESTAMP)
Tom202021-11-11 10:30:00.0
Jack102021-11-11 10:35:00.0
Tom102021-11-11 10:35:00.0
Tom102021-11-11 10:40:00.0
Tom152021-11-11 11:30:00.0
Jack102021-11-11 11:30:00.0
Jack152021-11-11 11:40:00.0
CREATE TABLE user_income (
    username VARCHAR,
    income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times - INTERVAL '3' SECOND
) WITH (
    'connector' = 'filesystem',
    'path' = 'input/sales01.csv',
    'format' =  'csv'
);

CREATE TABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO output
SELECT
    TUMBLE_START(times,INTERVAL '1' HOUR),
    TUMBLE_END(times,INTERVAL '1' HOUR),
    username,
    SUM(income)
FROM user_income
GROUP BY TUMBLE(times,INTERVAL '1' HOUR),username;

在这里插入图片描述

2.HOP WINDOW

HOP WINDOW(滑动窗口)将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。

HOP WINDOW 保持窗口大小(Size)不变,每次滑动指定的时间周期(Slide),因而允许窗口之间的相互重叠。

Slide 的大小决定了 Flink 创建新窗口的频率。

  • 当 Slide 小于 Size 时,相邻窗口会重叠,一个时间会被分配到多个窗口。
  • 当 Slide 大于 Size 时,可能会导致有些事件被丢弃。
  • 当 Slide 等于 Size 时,等于是 TUMBLE WINDOW。

2.1 语法

HOP(time_attr, sliding_interval, window_size_interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • sliding_interval:用来设置 滑动时间周期大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR
  • window_size_interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR

2.2 标识函数

函数名
功能描述
HOP_START(time-attr, slide-interval,size-interval)返回该窗口的起始时间
HOP_END(time-attr, slide-interval,size-interval)返回该窗口的结束时间

2.3 模拟用例

下文以 HOP WINDOW 为例,帮助您更容易地理解 HOP WINDOW。使用 Event Time 模拟统计每小时各用户收入金额,1 小时的窗口,30 分钟滑动一次

示例数据:

username(VARCHAR)income(BIGINT)times(TIMESTAMP)
Tom202021-11-11 10:30:00.0
Jack102021-11-11 10:35:00.0
Tom102021-11-11 10:35:00.0
Tom102021-11-11 10:40:00.0
Tom152021-11-11 11:35:00.0
Jack102021-11-11 11:30:00.0
Jack152021-11-11 11:40:00.0
CREATE TABLE user_income (
    username VARCHAR,
    Income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times - INTERVAL '3' MINUTE
) WITH (
    'connector' = 'filesystem',
    'path' = 'input/sales02.csv',
    'format' =  'csv'
);

CREATE TABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO output
SELECT
    HOP_START(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),
    HOP_END(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),
    username,
    SUM(income)
FROM user_income
GROUP BY HOP(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),username;

在这里插入图片描述

3.SESSION WINDOW

SESSION WINDOW(会话窗口)通过 Session 活动对元素进行分组,Session 窗口与滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 Session 窗口通过一个 sSession 间隔来配置。这个 Session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 Session 将关闭并且后续的元素将被分配到新的 Session 窗口中。

Session Window 并非以长度来划分窗口,而是以 非活跃时间 来划分。例如超过 30 分钟不活跃(没有新数据),则之前的窗口结束,下一个来到的数据将会形成一个新窗口。

3.1 语法

SESSION(time_attr, interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR

3.2 标识函数

函数名
功能描述
SESSION_START(time-attr, size-interval)返回该窗口的起始时间
SESSION_END(time-attr, size-interval)返回该窗口的结束时间

3.3 模拟用例

下文以 SESSION WINDOW 为例,帮助您更容易地理解 SESSION WINDOW。使用 Event Time 模拟统计每小时各用户收入金额,会话超时时长为 30 分钟

样例数据:

username(VARCHAR)income(BIGINT)times(TIMESTAMP)
Tom202021-11-11 10:30:00.0
Jack102021-11-11 10:35:00.0
Tom102021-11-11 10:35:00.0
Tom102021-11-11 10:40:00.0
Tom152021-11-11 11:50:00.0
Jack102021-11-11 11:40:00.0
Jack152021-11-11 11:45:00.0
CREATE TABLE user_income (
    username VARCHAR,
    income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times - INTERVAL '3' MINUTE
) WITH (
    'connector' = 'filesystem',
    'path' = 'input/sales03.csv',
    'format' =  'csv'
);

CREATE TABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO output
SELECT
    SESSION_START(times,INTERVAL '30' MINUTE),
    SESSION_END(times,INTERVAL '30' MINUTE),
    username,
    SUM(income)
FROM user_income
GROUP BY SESSION(times,INTERVAL '30' MINUTE),username;

在这里插入图片描述

4.更多说明

以上三种窗口都有对应的辅助函数。以 TUMBLE 窗口为例(HOP、SESSION 也一样,只是前缀不同),辅助函数如下:

  • TUMBLE_ROWTIME:表示 TUMBLE 窗口的末端界限(包含,可用作 JOIN 或 GROUP 以及 OVER 条件,Event Time 时间模式下使用)。示例如下:
SELECT user,
	   TUMBLE_START(rowtime, INTERVAL '12' HOUR) AS sStart,
       TUMBLE_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd,
       SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '12' HOUR), user
  • TUMBLE_PROCTIME:表示 TUMBLE 窗口的末端界限(包含,可用作 JOIN 或 GROUP 以及 OVER 条件,Processing Time 时间模式下使用)。示例如下:
SELECT user,
	   TUMBLE_START(PROCTIME, INTERVAL '12' HOUR) AS sStart,
	   TUMBLE_PROCTIME(PROCTIME, INTERVAL '12' HOUR) AS snd,
	   SUM(amount)
FROM Orders
GROUP BY TUMBLE(PROCTIME, INTERVAL '12' HOUR), user

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1539049.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++第十弹---类与对象(七)

✨个人主页: 熬夜学编程的小林 💗系列专栏: 【C语言详解】 【数据结构详解】【C详解】 目录 1、再谈构造函数 1.1、构造函数体赋值 1.2、初始化列表 1.3、explicit关键字 2、static成员 2.1、概念 2.2、特性 2.3、面试题 总结 1、再…

鸿蒙Harmony应用开发—ArkTS(@Prop装饰器:父子单向同步)

Prop装饰的变量可以和父组件建立单向的同步关系。Prop装饰的变量是可变的,但是变化不会同步回其父组件。 说明: 从API version 9开始,该装饰器支持在ArkTS卡片中使用。 概述 Prop装饰的变量和父组件建立单向的同步关系: Prop变量…

leetcode 2617. 网格图中最少访问的格子数【单调栈优化dp+二分】

原题链接&#xff1a;2617. 网格图中最少访问的格子数 题目描述&#xff1a; 给你一个下标从 0 开始的 m x n 整数矩阵 grid 。你一开始的位置在 左上角 格子 (0, 0) 。 当你在格子 (i, j) 的时候&#xff0c;你可以移动到以下格子之一&#xff1a; 满足 j < k < gri…

【单元测试】一文读懂java单元测试

目录 1. 什么是单元测试2. 为什么要单元测试3. 单元测试框架 - JUnit3.1 JUnit 简介3.2 JUnit 内容3.3 JUnit 使用3.3.1 Controller 层单元测试3.3.2 Service 层单元测试3.3.3 Dao 层单元测试3.3.4 异常测试3.3.5 测试套件测多个类3.3.6 idea 中查看单元测试覆盖率3.3.7 JUnit …

Excel使用VLOOKUP函数

VLOOKUP(lookup_value,table_array,col_index_num,range_lookup) 释义&#xff1a; lookup_value&#xff1a;要查找的值&#xff0c;包括数字&#xff0c;文本等 table_array&#xff1a;要查找的值以及预期返回的内容所在的区域 col_index_num&#xff1a;查找的区域的列…

安装mysql8.0.36遇到的问题没有developer default 选项问题

安装mysql8.0.36的话没有developer default选项&#xff0c;直接选择customer就好了&#xff0c;点击next之后通过点击左边Available Products里面的号和中间一列的右箭头添加要安装的产品&#xff0c;最后会剩下6个 安装完成后默认是启动了&#xff0c;并且在电脑注册表注册了…

机器学习——决策树剪枝算法

机器学习——决策树剪枝算法 决策树是一种常用的机器学习模型&#xff0c;它能够根据数据特征的不同进行分类或回归。在决策树的构建过程中&#xff0c;剪枝算法是为了防止过拟合&#xff0c;提高模型的泛化能力而提出的重要技术。本篇博客将介绍剪枝处理的概念、预剪枝和后剪…

《优化接口设计的思路》系列:第九篇—用好缓存,让你的接口速度飞起来

一、前言 大家好&#xff01;我是sum墨&#xff0c;一个一线的底层码农&#xff0c;平时喜欢研究和思考一些技术相关的问题并整理成文&#xff0c;限于本人水平&#xff0c;如果文章和代码有表述不当之处&#xff0c;还请不吝赐教。 作为一名从业已达六年的老码农&#xff0c…

vue2 自定义 v-model (model选项的使用)

效果预览 model 选项的语法 每个组件上只能有一个 v-model。v-model 默认会占用名为 value 的 prop 和名为 input 的事件&#xff0c;即 model 选项的默认值为 model: {prop: "value",event: "input",},通过修改 model 选项&#xff0c;即可自定义v-model …

35 跨域相关问题, 以及常见的解决方式

前言 跨域相关 这是一个 经常会碰到的问题 然后 常见的解决方式 也大概就是几种, 各有各的问题 这里仅仅是 从理论上 来探讨这个问题 主流的解决方式 是通过代理, 将不同域 合并到同一个域 测试用例 测试用例如下, 这里仅仅是一个简单的数据展示 获取对方 “/config.jso…

【c++入门】引用,内联函数,auto

&#x1f525;个人主页&#xff1a;Quitecoder &#x1f525;专栏&#xff1a;c笔记仓 朋友们大家好&#xff0c;本节我们来到c中一个重要的部分&#xff1a;引用 目录 1.引用的基本概念与用法1.1引用特性1.2使用场景1.3传值、传引用效率比较1.4引用做返回值1.5引用和指针的对…

Kubernetes(k8s)集群健康检查常用的五种指标

文章目录 1、节点健康指标2、Pod健康指标3、服务健康指标4、网络健康指标5、存储健康指标 1、节点健康指标 节点状态&#xff1a;检查节点是否处于Ready状态&#xff0c;以及是否存在任何异常状态。 资源利用率&#xff1a;监控节点的CPU、内存、磁盘等资源的使用情况&#xf…

SpringCloud从入门到精通速成(二)

文章目录 1.Nacos配置管理1.1.统一配置管理1.1.1.在nacos中添加配置文件1.1.2.从微服务拉取配置 1.2.配置热更新1.2.1.方式一1.2.2.方式二 1.3.配置共享1&#xff09;添加一个环境共享配置2&#xff09;在user-service中读取共享配置3&#xff09;运行两个UserApplication&…

c语言食堂就餐排队问题290行

定制魏&#xff1a;QTWZPW&#xff0c;获取更多源码等 目录 题目 数据结构 函数设计 结构设计 总结 效果截图 ​ 主函数代码 题目 设计一个程序来模拟食堂就餐排队问题&#xff0c;通过输入学生人数和面包数量&#xff0c;计算有多少学生能够吃到午餐。 数据结构 该…

原神x星穹铁道文本转原神语音源码

《原神》x《星穹铁道》文本转原神语音源码介绍文案 探索未知的奇幻世界&#xff0c;与心仪的角色共舞冒险之旅——《原神》与《星穹铁道》的梦幻联动&#xff0c;为你带来前所未有的游戏体验&#xff01;而此刻&#xff0c;我们将为你揭秘一项革命性的创新&#xff1a;文本转原…

T470 双电池机制

ThinkPad系列电脑牛黑科技双电池管理体系技术,你知道吗&#xff1f; - 北京正方康特联想电脑代理商 上文的地址 在放电情况下&#xff1a;优先让外置电池放电&#xff0c;当放到一定电量后开始让内置电池放电。 在充电情况下&#xff1a;优先给内置电池充电&#xff0c;当充…

数据结构从入门到精通——希尔排序

希尔排序 前言一、希尔排序( 缩小增量排序 )二、希尔排序的特性总结三、希尔排序动画演示四、希尔排序具体代码实现test.c 前言 希尔排序是一种基于插入排序的算法&#xff0c;通过比较相距一定间隔的元素来工作&#xff0c;各趟比较所用的距离随着算法的进行而减小&#xff0…

c++核心学习5

4.6继承 有些类与类之间存在特殊的关系&#xff0c;例如下图中&#xff1a; 我们发现&#xff0c;定义这些类时&#xff0c;下级别的成员除了拥有上一级的共性&#xff0c;还有自己的特性。这个时候我们就可以考虑利用继承的技术&#xff0c;减少重复代码 4.6.1继承的基本语法…

学点儿Java_Day9_字符串操作

1 实现trim方法 实现简单的trim方法&#xff0c;实现传入一个字符串&#xff0c;返回忽略前导空格和尾部空格。 public String myTrim(String str) {if (str null || str.isEmpty()) {//"".equals(str)return null;}char[] chars str.toCharArray();int start 0…

GD32串口通信PB6,PB7

我发现GD32很多接口都需要冲映射&#xff0c;刚开始还是不习惯&#xff0c;还要打开要选打开AFIO时钟。算了&#xff0c;直接看代码&#xff1a; 1,usart.c //#include "usart.h"//void USART_GPIO_init(void) //{ // //初始化引脚 // rcu_periph_clock_enable(RCU…