大数据Flink(一百一十七):Flink SQL的窗口操作

news2024/11/14 11:00:27

文章目录

Flink SQL的窗口操作

一、窗口的概述

二、Group Windows

1、​​​​​​​滚动窗口(TUMBLE)

2、​​​​​​​​​​​​​​滑动窗口(HOP)

3、​​​​​​​​​​​​​​Session 窗口(SESSION)

4、渐进式窗口(CUMULATE)

三、​​​​​​​​​​​​​​Over Windows

1、​​​​​​时间区间聚合(RANGE OVER Window)

2、​​​​​​​行数聚合


Flink SQL的窗口操作

一、窗口的概述

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

  • 一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达。
  • Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。
  • 在Table API和SQL中,主要有两种窗口:Group Windows 和 Over Windows
    • Group Windows 根据时间或行计数间隔将组行聚合成有限的组,并对每个组计算一次聚合函数
    • Over Windows 窗口内聚合为每个输入行在其相邻行范围内计算一个聚合

 

二、​​​​​​​Group Windows

1、​​​​​​​滚动窗口(TUMBLE)

滚动窗口定义:滚动窗口将每个元素指定给指定窗口大小的窗口。滚动窗口具有固定大小,且不重叠。例如,指定一个大小为 5 分钟的滚动窗口。在这种情况下,Flink 将每隔 5 分钟开启一个新的窗口,其中每一条数都会划分到唯一一个 5 分钟的窗口中,如下图所示。

TUMBLE函数基于时间属性字段为关系的每一行指定一个窗口。

在流模式下,时间属性字段必须是事件或处理时间属性。

在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP _LTZ类型的属性。TUMBLE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”、“window_end”、“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。

TUMBLE函数接受三个必需参数,一个可选参数:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
  • data: 是一个表参数,可以是与时间属性列的任何关系。
  • timecol: 是一个列描述符,指示数据的哪些时间属性列应映射到翻转窗口。
  • size: 是指定滚动窗口宽度的持续时间。
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

应用场景:常见的按照一分钟对数据进行聚合,计算一分钟内 PV,UV 数据。

实际案例:简单且常见的分维度分钟级别同时在线用户数、总销售额

那么上面这个案例的 SQL 要咋写呢?

关于滚动窗口,在 1.14 版本之前和 1.14 及之后版本有两种 Flink SQL 实现方式,分别是:

  • Group Window Aggregation(1.14 之前只有此类方案,此方案在 1.14 及之后版本已经标记为废弃,不推荐使用
  • Windowing TVF(1.14 及之后建议使用 Windowing TVF

这里两种方法都会介绍: 

  • Group Window Aggregation 方案(支持 Batch\Streaming 任务):

使用socket演示:

监听9999端口:nc -lk 9999

测试数据如下(创建完表,启动查询任务后,再进行输入)

1,12189,80729,2021-05-23 05:16:39
1,78750,7434,2021-05-23 05:16:41
1,38905,75583,2021-05-23 05:16:42
1,29388,52138,2021-05-23 05:16:45
1,51810,84241,2021-05-23 05:16:47
1,34713,87372,2021-05-23 05:16:48
1,62264,61675,2021-05-23 05:16:52
1,32460,29190,2021-05-23 05:17:40
1,73052,15170,2021-05-23 05:23:00

 进入阿里云Flink开发平台创建表,首先使用处理时间进行演示(代码如下)

CREATE TABLE tumble_group_proctime ( 
 dim STRING, 
 user_id BIGINT, 
 price BIGINT,
 `timestamp` STRING,
 row_time AS TO_TIMESTAMP(`timestamp`),
 pt AS PROCTIME()
) WITH (
  'connector' = 'socket',
  'hostname' = '128.66.209.119',        
  'port' = '9999',
  'format' = 'csv'
);

查询语句如下,可以看到 Group Window Aggregation 滚动窗口的 SQL 语法就是把 tumble window 的声明写在了 group by 子句中,即 tumble(pt, interval '5' second)。第一个参数为处理时间,第二个参数为滚动窗口大小。

如果使用窗口开始时间或者结束的话,语法如下:tumble_start(pt, interval '5' second)、tumble_end(pt, interval '5' second)。

select 
    --窗口开始时间
    tumble_start(pt, interval '5' second)  as window_start,
    --窗口结束时间
    tumble_end(pt, interval '5' second) as window_end,
    dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- 计算 uv 数
    count(distinct user_id) as uv
from tumble_group_proctime
group by
dim,
    tumble(pt, interval '5' second);

选中查询代码,点击调试,查看结果。目前没有传入数据,所以没有结果。

 

下面开始通过socket传入测试数据。首先传入第一条数据,在几秒后,可以看到一条结果。 

同时传入剩下的数据,可以看到另一条结果

第一个条结果的pv为1,即只有一条数据。这里因为在这个5秒的窗口时间内,只有第一条数据。第二个结果的pv为8,即有八条数据。显然,在这个5秒的时间窗口内,有八条数据。这条结果是八条数据聚合的结果。

接下来使用事件时间进行演示(代码如下)

 

--事件时间演示
CREATE TABLE tumble_group_eventime ( 
 dim STRING, 
 user_id BIGINT, 
 price BIGINT,
 `timestamp` STRING,
 row_time AS TO_TIMESTAMP(`timestamp`),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = '128.66.209.119',        
  'port' = '9999',
  'format' = 'csv'
);

查询语句如下

select 
    --窗口开始时间
    tumble_start(row_time, interval '5' second)  as window_start,
    --窗口结束时间
    tumble_end(row_time, interval '5' second) as window_end,
    dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- 计算 uv 数
    count(distinct user_id) as uv
from tumble_group_eventime
group by
    dim,
    tumble(row_time, interval '5' second);

选中查询代码,点击调试,查看结果。目前没有传入数据,所以没有结果。

 

下面开始通过socket传入测试数据。首先传入第一条数据,发现无论等待多久,都没有结果产生。

这是因为第一条数据输入后,创建了一个时间窗口。但是没有输入第二条数据,即没有出现数据的事件时间大于窗口的结束时间,所以这个窗口不会结束,因此就不会触发计算,也就看不到结果。

输入第二条数据,发现看到了结果

观察测试数据可以看到,第一条数据的事件时间为2021-05-23 05:16:39,处于上面结果的窗口时间内。第二条数据的事件时间为2021-05-23 05:16:41,大于上面结果的窗口的结束时间,因而触发了这个窗口的计算。

输入剩下的数据,可以看到结果。

可以对比数据分析结果,这里不再赘述。

  • Window TVF 方案(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):

建表语句

-- TVF
CREATE TABLE tumble_tvf ( 
 dim STRING, 
 user_id BIGINT, 
 price BIGINT,
 `timestamp` STRING,
 row_time AS TO_TIMESTAMP(`timestamp`),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = '179.18.59.99',        
  'port' = '9999',
  'format' = 'csv'
);

 查询语句

SELECT 
    dim,
    window_start,
    window_end,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    count(distinct user_id) as uv
FROM TABLE(TUMBLE(
        TABLE tumble_tvf
        , DESCRIPTOR(row_time)
        , INTERVAL '5' SECOND))
GROUP BY window_start, 
      window_end,
      dim;

可以看到 Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '5' SECOND)),包含三部分参数。

第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '5' SECOND 声明滚动窗口大小为 5 秒。

返回值除了输入的参数外,还有window_start, window_end,window_time。window_start返回窗口的起始时间(包含边界),window_end返回窗口的结束时间(包含边界),window_time返回窗口的结束时间(不包含边界)。window_time等于window_end减去1ms。

  • 注意事项

事件时间中滚动窗口的窗口计算触发是由 Watermark 推动的。

 

2、​​​​​​​​​​​​​​滑动窗口(HOP)

滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大小,则滑动窗口之间每个窗口是可以重叠。在这种情况下,一条数据就会分配到多个窗口当中。举例,有 10 分钟大小的窗口,滑动步长为 5 分钟。这样,每 5 分钟会划分一次窗口,这个窗口包含的数据是过去 10 分钟内的数据,如下图所示。

 

在流模式下,时间属性字段必须是事件或处理时间属性。

在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP _LTZ类型的属性。TUMBLE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”、“window_end”、“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。

HOP接受四个必需参数,一个可选参数:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
  • data: 是一个表参数,可以是与时间属性列的任何关系。
  • timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口。
  • slide: 是一个持续时间,指定顺序跳跃窗口开始之间的持续时间
  • size: 是指定跳跃窗口宽度的持续时间。
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

应用场景:比如计算同时在线的数据,要求结果的输出频率是 1 分钟一次,每次计算的数据是过去 5 分钟的数据(有的场景下用户可能在线,但是可能会 2 分钟不活跃,但是这也要算在同时在线数据中,所以取最近 5 分钟的数据就能计算进去了)

实际案例:简单且常见的分维度分钟级别同时在线用户数,2 秒钟输出一次,计算最近 6 秒钟的数据

依然是 Group Window Aggregation、Windowing TVF 两种方案:

  • Group Window Aggregation 方案(支持 Batch\Streaming 任务):
 CREATE TABLE hop_group ( 
 dim STRING, 
 user_id BIGINT, 
 price BIGINT,
 `timestamp` STRING,
 row_time AS TO_TIMESTAMP(`timestamp`),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = '178.23.141.244', 
  'port' = '9999',
  'format' = 'csv'
);

SELECT 
    hop_start(row_time, interval '2' SECOND, interval '6' SECOND) as window_start,
    hop_end(row_time, interval '2' SECOND, interval '6' SECOND) as window_end, 
    dim,
    count(distinct user_id) as uv
FROM hop_group
GROUP BY dim
    , hop(row_time, interval '2' SECOND, interval '6' SECOND);

可以看到 Group Window Aggregation 滚动窗口的写法就是把 hop window 的声明写在了 group by 子句中,即 hop(row_time, interval '2' SECOND, interval '6' SECOND)。其中:

第一个参数为事件时间的时间戳;

第二个参数为滑动窗口的滑动步长;

第三个参数为滑动窗口大小。

查询结果

  • Windowing TVF 方案(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):

 

CREATE TABLE hop_tvf ( 
 dim STRING, 
 user_id BIGINT, 
 price BIGINT,
 `timestamp` STRING,
 row_time AS TO_TIMESTAMP(`timestamp`),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = '178.23.141.244',        
  'port' = '9999',
  'format' = 'csv'
);

SELECT 
    dim,
     window_start,  
     window_end, 
    count(distinct user_id) as uv
FROM TABLE(HOP(
        TABLE hop_tvf
        , DESCRIPTOR(row_time)
        , interval '2' SECOND, interval '6' SECOND))
GROUP BY window_start, 
      window_end,
      dim;

可以看到 Windowing TVF 滚动窗口的写法就是把 hop window 的声明写在了数据源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '2' SECOND, INTERVAL '6' SECOND)),包含四部分参数:

第一个参数 TABLE source_table 声明数据源表;

第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;

第三个参数 INTERVAL '2' SECOND 声明滚动窗口滑动步长大小为2 SECOND。

第四个参数 INTERVAL '6' SECOND 声明滚动窗口大小为6 SECOND。

查询结果

3、​​​​​​​​​​​​​​Session 窗口(SESSION)

Session 窗口定义:Session 时间窗口和滚动、滑动窗口不一样,其没有固定的持续时间,如果在定义的间隔期(Session Gap)内没有新的数据出现,则 Session 就会窗口关闭。如下图对比所示: 

应用场景:计算每个用户在活跃期间(一个 Session)总共购买的商品数量,如果用户 5 分钟没有活动则视为 Session 断开

案例

目前 1.15 版本中 Flink SQL 不支持 Session 窗口的 Window TVF,所以这里就只介绍 Group Window Aggregation 方案:

  • Group Window Aggregation 方案(支持 Batch\Streaming 任务):

 

CREATE TABLE session_group ( 
 dim STRING, 
 user_id BIGINT, 
 price BIGINT,
 `timestamp` STRING,
 row_time AS TO_TIMESTAMP(`timestamp`),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = '178.23.148.244',        
  'port' = '9999',
  'format' = 'csv'
);

 SELECT 
    session_start(row_time, interval '5' SECOND) as window_start, 
    session_end(row_time, interval '5' SECOND) as window_end, 
    dim,
    count(1) as pv
FROM session_group
GROUP BY dim
      , session(row_time, interval '5' SECOND);

  • 注意事项

上述 SQL 任务是在整个 Session 窗口结束之后才会把数据输出。Session 窗口即支持 处理时间 也支持 事件时间。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。

Session gap 间隔是5s,实际上是不包含5s,即大于5s才会触发计算

可以看到 Group Window Aggregation 中 Session 窗口的写法就是把 session window 的声明写在了 group by 子句中,即 session(row_time, interval '5' SECOND)。其中:

第一个参数为事件时间的时间戳;

第二个参数为 Session gap 间隔。

 

4、渐进式窗口(CUMULATE)

渐进式窗口定义:渐进式窗口是 固定窗口间隔内提前触发的的滚动窗口,其实就是 Tumble Window + early-fire 的一个事件时间的版本。

例如,从每日零点到当前这一分钟绘制累积 UV,其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。

渐进式窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。如下图所示:

这些CUMULATE函数根据时间属性列分配窗口。

在流模式下,时间属性字段必须是事件或处理时间属性

在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP _LTZ类型的属性。CUMULATE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”、“window_end”、“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。

CUMULATE接受四个必需参数,一个可选参数:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
  • data: 是一个表参数,可以是与时间属性列的任何关系。
  • timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。
  • step: 是指定连续累积窗口结束之间增加的窗口大小的持续时间。
  • size: 是指定累积窗口的最大宽度的持续时间。size必须是step的整数倍
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

应用场景:周期内累计 PV,UV 指标(如每天累计到当前这一分钟的 PV,UV)。这类指标是一段周期内的累计状态,对分析师来说更具统计分析价值,而且几乎所有的复合指标都是基于此类指标的统计(不然离线为啥都要累计一天的数据,而不要一分钟累计的数据呢)。

实际案例:每天的截止当前分钟的累计 money(sum(money)),去重 id 数(count(distinct id))。每天代表渐进式窗口大小为 1 天,分钟代表渐进式窗口移动步长为分钟级别。举例如下:

明细输入数据:

time

id

money

2021-11-01 00:01:00

A

3

2021-11-01 00:01:00

B

5

2021-11-01 00:01:00

A

7

2021-11-01 00:02:00

C

3

2021-11-01 00:03:00

C

10

预期经过渐进式窗口计算的输出数据:

time

count distinct id

sum money

2021-11-01 00:01:00

2

15

2021-11-01 00:02:00

3

18

2021-11-01 00:03:00

3

28

转化为折线图长这样:

可以看到,其特点就在于,每一分钟的输出结果都是当天零点累计到当前的结果。

渐进式窗口目前只有 Windowing TVF 方案支持:

  • Windowing TVF 方案(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):
  • CREATE TABLE cumulate_tvf (
        -- 用户 id
        user_id BIGINT,
        -- 金额
        money BIGINT,
        -- 事件时间戳
        row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
        -- watermark 设置
        WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '100000',
      'fields.money.min' = '1',
      'fields.money.max' = '100000'
    );
    
    SELECT 
        window_start, 
        window_end,
        sum(money) as sum_money,
        count(distinct user_id) as count_distinct_id
    FROM TABLE(CUMULATE(
           TABLE cumulate_tvf
           , DESCRIPTOR(row_time)
           , INTERVAL '60' SECOND
           , INTERVAL '1' DAY))
    GROUP BY
        window_start, 
        window_end;

    一分钟内可以看到有结果输出,之后每分钟增加一条。结果如下

 

可以看到 Windowing TVF 滚动窗口的写法就是把 cumulate window 的声明写在了数据源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)),其中包含四部分参数:

第一个参数 TABLE source_table 声明数据源表;

第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;

第三个参数 INTERVAL '60' SECOND 声明渐进式窗口触发的渐进步长为 1 min。

第四个参数 INTERVAL '1' DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计。

 

三、​​​​​​​​​​​​​​Over Windows

OVER Window (over聚合)是传统数据库的标准开窗,不同于Group Window,OVER窗口中每1个元素都对应1个窗口。OVER窗口可以按照实际元素的行或实际的元素值(时间戳值)确定窗口,因此流数据元素可能分布在多个窗口中。

在应用OVER窗口的流式数据中,每1个元素都对应1个OVER窗口。每1个元素都触发1次数据计算,每个触发计算的元素所确定的行,都是该元素所在窗口的最后1行。在实时计算的底层实现中,OVER窗口的数据进行全局统一管理(数据只存储1份),逻辑上为每1个元素维护1个OVER窗口,为每1个元素进行窗口计算,完成计算后会清除过期的数据。

拿 over聚合 与 窗口聚合 做一个对比,其之间的直观不同之处在于:

  • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到
  • Over 聚合:能够保留原始字段

Over 聚合的语法总结如下:

SELECT
    agg1(col1) OVER (definition1) AS colName,
    ...
    aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;

其中:

  • agg1(col1):按照GROUP BY指定col1列对输入数据进行聚合计算。
  • OVER (definition1):OVER窗口定义。
  • AS colName:别名。

按照计算行的定义方式,OVER Window可以分为以下两类:

RANGE OVER Window:具有相同时间值的所有元素行视为同一计算行,即具有相同时间值的所有行都是同一个窗口。

ROWS OVER Window:每1行元素都被视为新的计算行,即每1行都是一个新的窗口。

 

1、​​​​​​时间区间聚合(RANGE OVER Window)

  • 窗口数据:

RANGE OVER Window所有具有共同元素值(元素时间戳)的元素行确定一个窗口。

  • 窗口语法:
SELECT
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)]
     ORDER BY timeCol
     RANGE 
     BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1;
  • value_expression:进行分区的字表达式。
  • timeCol:元素排序的时间字段。
  • timeInterval:定义根据当前行开始向前追溯指定时间的元素行。

案例:每条数据对应最近 1 小时的区间,即最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

CREATE TABLE over_window_time (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '10',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 1 小时的数据
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM over_window_time;

结果如下:

2、​​​​​​​行数聚合

  • 窗口数据:

ROWS OVER Window的每个元素都确定一个窗口。

  • 窗口语法:
SELECT
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)]
     ORDER BY timeCol
     ROWS 
     BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ...FROM Tab1; 
  • value_expression:分区值表达式。
  • timeCol:元素排序的时间字段。
  • rowCount:定义根据当前行开始向前追溯几行元素。

案例:最新输出的一条数据的 sum 聚合结果是最近 5 行数据的 amount 之和。

CREATE TABLE over_window_row (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '2',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 5 行数据
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS five_rows_amount_sum
FROM over_window_row;

结果如下:


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2129599.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

军事目标无人机视角检测数据集 3500张 坦克 带标注voc

数据集概述 该数据集包含3500张无人机拍摄的图像,主要用于坦克目标的检测。数据集已经按照VOC(Visual Object Classes)标准进行了标注,适用于训练深度学习模型,特别是物体检测模型。 数据集特点 目标明确&#xff1…

通信工程学习:什么是GFP通用成帧规范

GFP:通用成帧规范 GFP通用成帧规范(Generic Framing Procedure)是一种先进的数据业务适配的通用协议和映射技术,由国际电联ITU-T的G.7041标准定义。该技术旨在透明地将各种不同物理层或逻辑链路层信号适配进入SDH(同步…

C语言初识编译和链接

目录 翻译环境和运行环境编译环境预编译编译词法分析语法分析语义分析 汇编 链接运行环境 翻译环境和运行环境 在ANSI C的任何⼀种实现中,存在两个不同的环境。 第1种是翻译环境,在这个环境中源代码被转换为可执⾏的机器指令(⼆进制指令&…

【Vue】1.v-指令、computed、watch

1 Vue 实例 注&#xff1a;此文件是 vue 根实例&#xff0c;data 可以 是一个对象 即 data:{ } 但是在其他 .vue 组件文件中&#xff0c;data 必须 是一个函数&#xff0c;返回一个新的对象&#xff0c;以避免多个组件实例之间的数据相互干扰 即 data(){ } <!DOCTYPE html&g…

前端正确设置资源上下文路径ContextPath(发布目录outDir 、公共基础路径),保证打包部署后站点能正常加载资源。

文章目录 引言I 处理资源上下文路径ContextPathjavascript对象获取上下文路径使用`./` 加载资源文件Vite 的basepublicPath是webpack部署应用包时的基本 URLII 知识扩展:URL的识别2.1 标准的链接格式2.2 URL中的?涵义2.3 URL中的&涵义2.4 传参III #fragment3.1为网页位置…

Vue2使用Vue CLI学习笔记

Vue2构建项目分析 Vue学习官网 Vue CLI官方 # 全局安装&#xff0c;只要装一次&#xff0c;以管理员身份 npm install -g vue/cli # 查看脚手架工具版本 vue --version # 创建项目&#xff0c;注意路径&#xff0c;名称不能是中文 vue create my-project # 启动项目&#xff…

基于Ant-Design-Vue设计的配置化表单

适用vue 3.4 版本以上 在日常的前端开发中&#xff0c;表单开发必不可少&#xff0c;尤其是在遇到一些大型的复杂的表单&#xff0c;这往往会变成一个痛点。于是就想着开发一个可以list来配置的表单组件。 先上组件代码 <!-- 该组件 VUE 版本在 3.4 以上可使用--> <…

【AI绘画】Midjourney进阶:景别详解

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AI绘画 | Midjourney 文章目录 &#x1f4af;前言&#x1f4af;为什么要学习景别景别的作用景别应用实例 &#x1f4af;大景别&#x1f4af;远景特点提示词书写技巧测试 &#x1f4af;全景特点提示词书写技巧测试注意点 &#x1f…

ozon免费选品工具,OZON免费选品神器

在跨境电商的浩瀚海洋中&#xff0c;寻找那片属于自己的盈利蓝海&#xff0c;是每个商家梦寐以求的目标。随着俄罗斯电商市场的迅速崛起&#xff0c;Ozon平台以其庞大的用户基数和不断增长的市场份额&#xff0c;成为了众多跨境卖家眼中的“香饽饽”。然而&#xff0c;面对琳琅…

【渗透测试】——DVWA靶场搭建

&#x1f4d6; 前言&#xff1a;DVWA&#xff08;Damn Vulnerable Web Application&#xff09;是一个用于安全漏洞测试的 PHP/MySQL 网络应用&#xff0c;旨在为安全专业人士提供一个合法的环境&#xff0c;以测试他们的技能和工具&#xff0c;同时帮助 Web 开发者更好地理解 …

计算机的信息编码和基本运算(上)

大家好我是清墨&#xff0c;今天同同同样来分享一下笔记。 计算机的信息编码 计算机用二进制编码的方式来表示和存储信息&#xff0c;我们见到的信息&#xff08;文字、图片等&#xff09;都是经过转换处理的。 ASCII&#xff08;American Standard Code for Information Int…

[001-02-001]. 第07-02节:线程的创建与使用

我的后端学习大纲 我的Java学习大纲 1、方式1&#xff1a;继承Thread类&#xff1a; 1.1.实现步骤 1.创建一个继承于Thred()类的子类2.重写Thread类的run()3.创建Thread类的子类的对象4.通过这个对象去调用start()方法 在调用start方法时就做了两件事&#xff0c;分别是&…

贪心问题———区间覆盖

输入样例&#xff1a; 1 5 3 -1 3 2 4 3 5 输出样例&#xff1a; 2 分析&#xff1a; 我们根据贪心的思路&#xff0c;能覆盖更大的范围就意味着能用更少的区间段 我们将线段从左端点进行排序 代码演示&#xff1a; #include <iostream> #include <vector>…

从0开始的算法(数据结构和算法)基础(十一)

回溯算法 什么是回溯算法 回溯算法&#xff0c;根据字面意思来理解这个算法是将每一步的操作可以进行回溯&#xff0c;实际上是对这个每一步的操作进行记录&#xff0c;确保可以返回上一步的操作&#xff0c;可能是对回溯操作之前的做一个复现&#xff0c;也有可能是可操作的回…

STM32与ESP8266的使用

串口透传 “透传”通常指的是数据的透明传输&#xff0c;意思是在不对数据进行任何处理或修改的情况下&#xff0c;将数据从一个接口转发到另一个接口。值得注意的是要避免串口之间无限制的透明&#xff0c;可以采用互斥锁的方式进行限制使用方法 对USART1和USART3(用他俩举例…

高效物流管理从固乔快递批量查询助手开始

固乔快递批量查询助手&#xff1a;物流管理的智能化升级 固乔快递查询助手&#xff1a;批量追踪&#xff0c;物流无忧 轻松应对海量单号&#xff0c;固乔快递批量查询助手来帮忙 跨境电商新利器&#xff1a;固乔快递批量查询助手 高效物流管理从固乔快递批量查询助手开始 …

Spring-AOP核心源码、原理详解前篇

本文主要分4部分 Aop原理介绍介绍aop相关的一些类通过源码详解aop代理的创建过程通过源码详解aop代理的调用过程Aop代理一些特性的使用案例 Spring AOP原理 原理比较简单&#xff0c;主要就是使用jdk动态代理和cglib代理来创建代理对象&#xff0c;通过代理对象来访问目标对象…

漏洞复现-泛微-E-Cology-SQL

更多漏洞分析复现&#xff0c;可前往无问社区查看http://www.wwlib.cn/index.php/artread/artid/10358.html 0x01 产品简介 泛微e-cology是一款由泛微网络科技开发的协同管理平台&#xff0c;支持人力资源、财务、行政等多功能管理和移动办公。 0x02 漏洞概述 泛微e-cology…

路由器WAN口和LAN口有什么不一样?

“ 路由器WAN口和LAN口的区别&#xff0c;WAN是广域网端口&#xff0c;LAN是本地网端口。WAN主要用于连接外部网络&#xff0c;而LAN用来连接家庭内部网络&#xff0c;两者主要会在标识上面有区别。以往大部分路由器的WAN只有一个&#xff0c;LAN口则有四个或以上&#xff0c;近…

shader 案例学习笔记之偏移

效果 代码 #ifdef GL_ES precision mediump float; #endifuniform vec2 u_resolution; uniform float u_time;vec2 brickTile(vec2 _st, float _zoom){_st * 5.;_st.x step(1., mod(_st.y,2.0)) * 0.5;return fract(_st); }float box(vec2 _st, vec2 _size){_size vec2(0.5)…