flink的分组聚合、over聚合、窗口聚合对比

news2025/1/23 13:01:03

【背景】

flink有几种聚合,使用上是有一些不同,需要加以区分:

分组聚合:group agg

over聚合:over agg

窗口聚合:window agg

省流版:

触发计算时机

结果流类型

状态大小

分组聚合group agg

每当有新行就输出更新的结果

update流

保持中间结果,所以状态可能无限膨胀

over agg

每当有新行就输出更新的结果,类似一个滑动窗口

append流

保持中间结果,所以状态可能无限膨胀

window agg

窗口结束产生一个总的聚合结果

append流

不生成中间结果,自动清除状态

下面是详细对比和具体的例子(主要讨论的是流处理下的情况)。

over聚合:over agg

OVER 聚合通过排序后的范围数据为每行输入计算出聚合值。和 GROUP BY 聚合不同, OVER 聚合不会把结果通过分组减少到一行,它会为每行输入增加一个聚合值,结果是一个append流


 OVER 窗口的语法。

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

over聚合很少用到,所以本地自己做了一个测试:

测试sql如下:
create table test_window_tab

(

    region String

    ,qa_id String

    ,count_qa_id Bigint

) COMMENT ''

with

(

'properties.bootstrap.servers' ='',

'json.fail-on-missing-field' = 'false',

'connector' = 'kafka',

'format' = 'json',

'topic' = 'test_window_tab'

)

;

create table dwm_qa_score

(

    ,qa_id String   

    ,agent_id String

    ,region String

    ,saas_id String

    ,version_timestamp bigint

    , ts as to_timestamp(from_unixtime(`version_timestamp`, 'yyyy-MM-dd HH:mm:ss'))

    ,`event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL

    ,WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND

) COMMENT ''

with

(

'properties.bootstrap.servers' ='',

'json.fail-on-missing-field' = 'false',

'connector' = 'kafka',

'format' = 'json',

'scan.startup.mode' = 'earliest-offset',

'topic' = 'dwm_qa_score'

)

;

insert into test_window_tab(region,qa_id,count_qa_id)

select region,qa_id,count(1)  over w as count_qa_id

from dwm_qa_score



window w as(

partition by region,qa_id

order by ts

rows between 2 preceding and current row

)

dwm_qa_score这个topic现有数据

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

数据选择offset=ealiest-offset运行程序得到结果如下

{"region":"TH","qa_id":"123","count_qa_id":1}

{"region":"TH","qa_id":"123","count_qa_id":2}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"1234","count_qa_id":1}

这里注意:

  1. 数据都会返回一个聚合
  2. 由于我们rows between 2 preceding and current row所以count_qa_id最多3

如果此时dwm_qa_score这个topic插入新数据

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH"

}

或者

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH","version_timestamp": null

}

或者

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH","version_timestamp": 0

}

发现flink作业输出record多了一条

但是在目标kafka:test_window_tab中没有新增结果

原因是我们插入的新数据中没有version_timestamp这一列为空或为0

如果往dwm_qa_score这个topic插入新数据:

{

"qa_id": "1234",

"region": "TH",

"version_timestamp": 1710145110

}

则可以看到对应目标kafka:test_window_tab中会新增结果数据

{"region":"TH","qa_id":"1234","count_qa_id":2}

如果等一分钟后,再次往dwm_qa_score这个topic插入新数据:

{

"qa_id": "1234",

"region": "TH",

"version_timestamp": 1710145110

}

则在目标kafka:test_window_tab中没有新增结果原因应该数据过期丢弃watermark)

你可以在一个 SELECT 子句中定义多个 OVER 窗口聚合。然而,对于流式查询,由于目前的限制,所有聚合的 OVER 窗口必须是相同的

ORDER BY

OVER 窗口需要数据是有序的。因为表没有固定的排序,所以 ORDER BY 子句是强制的。对于流式查询,Flink 目前只支持 OVER 窗口定义在升序(asc) 时间属性 上。其他的排序不支持。

PARTITION BY

OVER 窗口可以定义在一个分区表上。PARTITION BY 子句代表着每行数据只在其所属的数据分区进行聚合。

范围(RANGE)定义

范围(RANGE)定义指定了聚合中包含了多少行数据。范围通过 BETWEEN 子句定义上下边界,其内的所有行都会聚合。Flink 只支持 CURRENT ROW 作为上边界。

有两种方法可以定义范围:ROWS 间隔 和 RANGE 间隔

RANGE 间隔

RANGE 间隔是定义在排序列值上的,在 Flink 里,排序列总是一个时间属性。下面的 RANG 间隔定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

ROW 间隔

ROWS 间隔基于计数。它定义了聚合操作包含的精确行数。下面的 ROWS 间隔定义了当前行 + 之前的 10 行(也就是11行)都会被聚合。

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW

常见错误

OVER windows' ordering in stream mode must be defined on a time attribute.

 这个报错,是建表的时候需要指定时间语义的字段,WATERMARK 是必须的,而且WATERMARK所用字段必须是order by的时间字段例如下面 order by load_date那么WATERMARK就要load_date生成,即WATERMARK FOR load_date AS load_date - INTERVAL '1' MINUTE

object SqlOverRows02 {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    tEnv.executeSql(
      """
        |create table projects(
        |id int,
        |name string,
        |score double,
        |load_date timestamp(3),
        |WATERMARK FOR load_date AS load_date - INTERVAL '1' MINUTE
        |)with(
        |'connector' = 'kafka',
        |'topic' = 'test-topic',
        |'properties.bootstrap.servers' = 'server120:9092',
        |'properties.group.id' = 'testGroup',
        |'scan.startup.mode' = 'latest-offset',
        |'format' = 'csv'
        |)
        |""".stripMargin)
    tEnv.executeSql(
      """
        |select
        | name,
        | max(score)
        |   over(partition by name
        |     order by load_date
        |     RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )max_score,
        | min(score)
        |   over(partition by name
        |     order by load_date
        |     RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )min_score,
        | current_time
        | from
        | projects
        |""".stripMargin).print()
  }
}

分组聚合:group agg

Apache Flink 支持标准的 GROUP BY 子句来聚合数据。

SELECT COUNT(*) FROM Orders GROUP BY order_id

特点:

1、聚合函数把多行输入数据计算为一行结果。例如,有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”、“MAX”和 “MIN”。

2、对于流式查询,重要的是要理解 Flink 运行的是连续查询,永远不会终止,会根据其输入表的更新来更新其结果表。对于上述查询,每当有新行插入 Orders 表时,Flink 都会实时计算并输出更新后的结果。

 3、对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如:MIN/MAX 的状态是重量级的,COUNT 是轻量级的,因为COUNT只需要保存计数值。

因此,可以设置table-exec-state-ttl,但是可能会影响查询结果的正确性,因为状态超时会被丢弃。

注意:

Flink 对于分组聚合提供了一系列性能优化的方法。更多参见:性能优化,包括MiniBatch 聚合、Local-Global 聚合、拆分 distinct 聚合、在 distinct 聚合上使用 FILTER 修饰符 、MiniBatch Regular Joins

窗口聚合:window agg

窗口聚合是通过 GROUP BY 子句定义的,其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列(必须包含,否则就变成分组聚合等了)。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。

SELECT ...
FROM <windowed_table> -- relation applied windowing TVF
GROUP BY window_start, window_end, ...

窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态(watermark超过窗口end+allowlateness,就会销毁窗口)。

具体例子:

SELECT window_start, window_end, SUM(price) AS

total_price

FROM TABLE(

    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))

GROUP BY window_start, window_end;

+------------------+------------------+-------------+

|     window_start |       window_end | total_price |

+------------------+------------------+-------------+

| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |

| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |

+------------------+------------------+-------------+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1508277.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【RabbitMQ】RabbitMQ的交换机

交换机类型 在上文中&#xff0c;都没有交换机&#xff0c;生产者直接发送消息到队列。而一旦引入交换机&#xff0c;消息发送的模式会有很大变化&#xff1a;可以看到&#xff0c;在订阅模型中&#xff0c;多了一个exchange角色&#xff0c;而且过程略有变化&#xff1a; Pub…

ThreadLocal出现内存泄露原因分析

ThreadLocal 导致内存泄漏的主要原因是它的工作方式。在 Java 中&#xff0c;ThreadLocal 通过维护一个以 Thread 为键&#xff0c;以用户设置的值为值的映射来工作。每个线程都拥有其自身的线程局部变量副本&#xff0c;不同线程间的这些变量互不干扰。这个映射是存储在每个 T…

EpiCypher—SMARCAL1介导的肿瘤免疫逃避

资深作者Alberto Ciccia博士(左)和第一作者Giuseppe Leuzzi博士。 在新的博客系列中&#xff0c;我们将对话最近发表染色质研究工作的首席科学家们&#xff0c;了解他们的幕后工作。在EpiCypher&#xff0c;我们很感兴趣染色质技术是如何整合到不同领域的&#xff0c;如免疫学、…

【Ubuntu】gonme桌面的 gdm 和 lightdm 区别

总结&#xff1a;都可以 gdm: 【Gnome Display Manager】 完整&#xff0c;体积大 lightdm: 【Light Display Manager】 轻量

HBase非关系型数据库

HBase非关系型数据库 1 什么是HBase2 HBase的特点3 什么时候需要HBase4 HBase的数据模型5 HBase架构5.1 架构5.2 HBase如何列式储存 6 如何正确设计RowKey 1 什么是HBase HBase – Hadoop Database&#xff0c;是一个高可靠性、高性能、面向列、可伸缩、 实时读写的分布式数据…

趣味看图-Linux 文件系统的组成

/&#xff08;根目录&#xff09;&#xff1a;根目录是Linux文件系统中的顶级目录。所有其他目录都是根目录的子目录&#xff0c;使其成为整个文件系统的父目录。 /bin&#xff1a;包含启动系统和执行基本操作所需的基本二进制可执行文件。这些对所有用户都可用。 /boot&…

手撕栈和队列

接下来的日子会顺顺利利&#xff0c;万事胜意&#xff0c;生活明朗-----------林辞忧 引言 栈和队列作为数据结构的重要组成部分&#xff0c;可以用栈实现非递归等&#xff0c;为后面学习打基础。栈由数组来实现&#xff0c;队列由链表来实现&#xff0c;接下来将详细介绍 …

C++实现引用计数(二)

实现引用计数 引言实现集成开发环境项目结构实现代码运行结果 注意 引言 C中经常使用智能指针来管理内存。对于共享指针shared_ptr的原理&#xff1a;每当有一个指针指向这块内存&#xff0c;引用计数的值加一&#xff0c;每当一个指针不再指向这块内存&#xff0c;引用计数的…

【OpenGL手册14】投光物

目录 一、说明二、平行光三、点光源四、衰减五、选择正确的值六、实现衰减七、聚光八、手电筒九、平滑/软化边缘练习 一、说明 我们目前使用的光照都来自于空间中的一个点。它能给我们不错的效果&#xff0c;但现实世界中&#xff0c;我们有很多种类的光照&#xff0c;每种的表…

员工私单亡羊补牢!这个监管神器让你从此放心!

对于企业而言&#xff0c;如何有效监管员工的微信使用成为了一项重要的任务。幸运的是&#xff0c;现在有一个监管神器——微信管理系统&#xff0c;可以帮助企业从根本上解决这个问题。 接下来&#xff0c;让我们一起来看看这个监管神器究竟有哪些神奇的功能吧&#xff01; …

5款好用的AI办公软件,一键轻松制作PPT、视频,提升工作效率!

众所周知&#xff0c;AI 人工智能技术已渗透到生活的方方面面&#xff0c;无论是很多人早已用上的智能音箱、语音助手&#xff0c;还是新近诞生的各种 AI 软件工具&#xff0c;背后都离不开 AI 人工智能技术的加持。 对于各类新生的 AI 软件工具&#xff0c;人们很容易「选边站…

亚马逊多账号怎么防关联?超级浏览器来帮你!

很多做亚马逊跨境电商的小伙伴都会遇到的问题就是多登店铺账号被关联&#xff0c;我们要知道&#xff0c;如果在亚马逊上运营多个店铺&#xff0c;保持账户之间的独立性是很重要的。一旦账户之间被平台识别为关联&#xff0c;不仅可能导致收入损失&#xff0c;还可能面临账号被…

短视频,文案素材哪里找?找短视频文案素材指南

有很多创业者&#xff0c;耗费了很长时间创作出了很多短视频&#xff0c;文案素材没写好&#xff0c;导致浏览量下降&#xff0c;或者写文案的同时就已经花费了很长时间&#xff0c;导致没有时间发布视频&#xff0c;从而影响了流量&#xff0c;导致流量下滑&#xff0c;其实我…

React-嵌套路由

1.概念 说明&#xff1a;在一级路由中又内嵌了其他路由&#xff0c;这种关系就叫做嵌套路由&#xff0c;嵌套至一级路由内的路由又称作二级路由。 2.实现步骤 说明&#xff1a;使用childen属性配置路由嵌套关系&#xff0c;使用<Outlet/>组件配置二级路由渲染的位置。…

RK3588-hdmiin

1. HDMI-IN简介 HDMI IN功能可以通过桥接芯⽚的⽅式实现&#xff0c;将HDMI信号转换成MIPI信号接收RK3588芯⽚平台⾃带HDMI RX模块&#xff0c;可以直接接收HDMI信号&#xff0c;无需通过桥接芯⽚实现。在ArmSoM系列产品中&#xff0c;ArmSoM-W3支持HDMI-IN功能HDMI-IN功能框图…

容灾演练双月报|美创DRCC助力银行高效验证数据库高可用架构

了解更多灾备行业动态 守护数字化时代业务连续 目录 CONTENTS 01 灾备法规政策 02 热点安全事件 03 容灾演练典型案例 01 灾备法规政策 2月&#xff0c;工信部印发《工业领域数据安全能力提升实施方案&#xff08;2024—2026年&#xff09;》&#xff0c;要求到2026年…

Java语言的演变之路:从过去到现在,以及未来的展望与探索

引言 Java&#xff0c;这个在全球范围内广受欢迎的编程语言&#xff0c;自其诞生之日起&#xff0c;就注定要在计算机科学的历史上留下浓墨重彩的一笔。起源于20世纪90年代&#xff0c;Java的初衷是打造一种能够在多种平台上运行的、安全且易于使用的编程语言。它不仅成功地实…

MySQL-QA-异常问题及解决方案(持续更新)

MySQL-Q&A(持续更新) 1.1 PID文件找不到 问题描述 错误详情&#xff1a; ERROR&#xff01;The server quit without updating PID file (/usr/local/mysql/data/localhost.localdomain.pid) 解决方案 首先排查配置文件&#xff0c;一般路径为&#xff1a;/etc/my.cnf 检查…

【C++】stack/queue

链表完了之后就是我们的栈和队列了&#xff0c;当然我们的STL中也有实现&#xff0c;下面我们先来看一下简单用法&#xff0c;跟我们之前C语言实现的一样&#xff0c;stack和queue有这么几个重要的成员函数 最主要的就是这么几个&#xff1a;empty&#xff0c;push&#xff0c;…

嘿!AI 编码新玩法上线!

随着 AI 智能浪潮到来&#xff0c;AI 编码助手成为越来越多开发者的必备工具&#xff0c;将开发者从繁重的编码工作中解放出来&#xff0c;极大地提高了编程效率&#xff0c;帮助开发者实现更快、更好的代码编写。 通义灵码正是这样一款基于阿里云通义代码大模型打造的智能编码…