【大数据】Flink SQL 语法篇(四):Group 聚合、Over 聚合

news2025/1/13 7:41:26

Flink SQL 语法篇(四):Group 聚合、Over 聚合

  • 1.Group 聚合
    • 1.1 基础概念
    • 1.2 窗口聚合和 Group 聚合
    • 1.3 SQL 语义
    • 1.4 Group 聚合支持 Grouping sets、Rollup、Cube
  • 2.Over 聚合
    • 2.1 时间区间聚合
    • 2.2 行数聚合

1.Group 聚合

1.1 基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

1.2 窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (
    -- 维度数据
    dim STRING,
    -- 用户 id
    user_id BIGINT,
    -- 用户
    price BIGINT,
    -- 事件时间戳
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark 设置
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
)

-- 数据汇表
CREATE TABLE sink_table (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
  'connector' = 'print'
)

-- 数据处理逻辑
insert into sink_table
select dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- 计算 uv 数
    count(distinct user_id) as uv,
    UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group by
    dim,
    -- 按照 Flink SQL tumble 窗口写法划分窗口
    tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (
    -- 维度数据
    dim STRING,
    -- 用户 id
    user_id BIGINT,
    -- 用户
    price BIGINT,
    -- 事件时间戳
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark 设置
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
);

-- 数据汇表
CREATE TABLE sink_table (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
  'connector' = 'print'
);

-- 数据处理逻辑
insert into sink_table
select dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- 计算 uv 数
    count(distinct user_id) as uv,
    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
    dim,
    -- 将秒级别时间戳 / 60 转化为 1min
    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

1.3 SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

1.4 Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT 
    supplier_id
    , rating
    , product_id
    , COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
    ( supplier_id, product_id, rating ),
    ( supplier_id, product_id         ),
    ( supplier_id,             rating ),
    ( supplier_id                     ),
    (              product_id, rating ),
    (              product_id         ),
    (                          rating ),
    (                                 )
)

2.Over 聚合

Over 聚合定义(支持 Batch / Streaming):可以理解为是一种特殊的滑动窗口聚合函数。

那这里我们拿 Over 聚合窗口聚合 做一个对比,其之间的最大不同之处在于:

  • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到。
  • Over 聚合:能够保留原始字段。

注意:其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?

  • 应用场景:计算最近一段滑动窗口的聚合结果数据。
  • 实际案例:查询每个产品最近一小时订单的金额总和。
SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM Orders
  • Over 聚合的语法总结如下:
SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...
  • ORDER BY:必须是时间戳列(事件时间、处理时间)。
  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合。
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。如下案例所示。

2.1 时间区间聚合

按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 1 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '10',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 1 小时的数据
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM source_table

2.2 行数聚合

按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 5 5 行数据的 amount 之和。

CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '2',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 5 行数据
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM source_table

预跑结果如下:

+I[2, 2021-12-24T22:18:19.147, 1, 9]
+I[1, 2021-12-24T22:18:20.147, 2, 11]
+I[1, 2021-12-24T22:18:21.147, 2, 12]
+I[1, 2021-12-24T22:18:22.147, 2, 12]
+I[1, 2021-12-24T22:18:23.148, 2, 12]
+I[1, 2021-12-24T22:18:24.147, 1, 11]
+I[1, 2021-12-24T22:18:25.146, 1, 10]
+I[1, 2021-12-24T22:18:26.147, 1, 9]
+I[2, 2021-12-24T22:18:27.145, 2, 11]
+I[2, 2021-12-24T22:18:28.148, 1, 10]
+I[2, 2021-12-24T22:18:29.145, 2, 10]

当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:

SELECT order_id, order_time, amount,
  SUM(amount) OVER w AS sum_amount,
  AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (
  PARTITION BY product
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1471084.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java学习笔记------多态

什么是多态 同类型的对象,表现出的不同形态 多态的表现形式 父类类型 对象名称子类对象; 多态的前提 有继承关系 有父类引用指向子类对象 有方法重写 多态调用成员的特点 变量调用:编译看左边,运行也看左边 方法调用&am…

力扣LCR 140. 训练计划 II(顺序遍历,快慢指针)

Problem: LCR 140. 训练计划 II 文章目录 题目描述思路复杂度Code 题目描述 思路 思路1:顺序遍历 欲返回倒数第cnt个节点则需要顺序遍历到len-cnt(其中len为链表的长度) 思路2:快慢指针 让一个快指针fast指向cnt 1个节点&#x…

Spring ReflectionUtils 反射工具介绍和使用

一、ReflectionUtils 在 Java 中,反射(Reflection)是一种强大的机制,允许程序在运行时动态地检查类、获取类的信息、调用类的方法、访问或修改类的属性等。Java 的反射机制提供了一组类和接口,位于 java.lang.reflect…

python爬取网站内容写入xls

目标 现需要对下面网站资源进行爬取,文学人物-名人明星网 获取人物名字获取人物头像获取人物简介 资源获取 通过requests库,我们可以让 Python 程序向浏览器一样向 Web 服务器发起请求,并接收服务器返回的响应,从响应中我们就…

Python爬虫-爬取B站番剧封面

本文是本人最近学习Python爬虫所做的小练习。如有侵权,请联系删除。 页面获取url 代码 import requests import os import re# 创建文件夹 path os.getcwd() /images if not os.path.exists(path):os.mkdir(path)# 当前页数 page 1 # 总页数 total_page 2# 自动…

【文生视频】Diffusion Transformer:OpenAI Sora 原理、Stable Diffusion 3 同源技术

文生视频 Diffusion Transformer:Sora 核心架构、Stable Diffusion 3 同源技术 提出背景输入输出生成流程变换器的引入Diffusion Transformer (DiT)架构Diffusion Transformer (DiT)总结 OpenAI Sora 设计思路阶段1: 数据准备和预处理阶段2: 架构设计阶段3: 输入数据…

sql-labs第46关(order by盲注脚本)

一、环境 网上有自己找 二、解释 order by 注入我们看他的true和false来进行注入出来 二、实操 让我们用sort 看看源码 最终我们的id是放到order by后面了 如果我们直接用列去排序 ?sortusername/password username: password: 可以看到顺序是不…

云原生之容器编排实践-ruoyi-cloud项目部署到K8S:MySQL8

背景 前面搭建好了 Kubernetes 集群与私有镜像仓库,终于要进入服务编排的实践环节了。本系列拿 ruoyi-cloud 项目进行练手,按照 MySQL , Nacos , Redis , Nginx , Gateway , Auth ,…

【数据库】MySQL视图 | 用户管理

文章目录 1 :peach:视图:peach:1.1 :apple:基本使用:apple:1.1.1 :lemon:创建视图:lemon:1.1.2 :lemon:案例:lemon:1.1.3 :lemon:删除视图:lemon: 1.2 :apple:视图规则和限制:apple: 2 :peach:用户管理:peach:2.1 :apple:用户信息:apple:2.2 :apple:创建用户:apple:2.3 :apple:…

Sovit3D数字孪生平台 助力智慧海上风电场项目加速

我们常说地球是蓝色星球,那是因为海洋约占地球面积的71%。如今,我国正在向“双碳”目标不断奋斗,海上风电也作为一种潜力清洁能源,迸发出前所未有的活力,海上吹来的风成为未来清洁能源新方向。 2024年海上风电项目加速…

基于SpringBoot+Apache ECharts的前后端分离外卖项目-苍穹外卖(十八)

数据展示 1. Apache ECharts1.1 介绍1.2 入门案例 2. 营业额统计2.1 需求分析和设计2.1.1 产品原型2.1.2 接口设计 2.2 代码开发2.2.1 VO设计2.2.2 Controller层2.2.3 Service层接口2.2.4 Service层实现类2.2.5 Mapper层 2.3 功能测试 3. 用户统计3.1 需求分析和设计3.1.1 产品…

如果发现某个地方太薄了想要加厚怎么办?

Q 做完模型后,发现斧头柄部太薄了想要加厚怎么办? A 使用圆形套索区域,选中点 然后左视图,选择缩放,横向拉宽即可

git bash:ls查看文件颜色全部为白色的解决方法(已解决)

方法一: 修改~/.bashrc文件或者~/.profile文件,添加如下内容 alias lsls --colorauto 然后 source一下,让修改配置生效 source ~/.profile 然后再ls OK了

哪些网页原型设计工具易于使用?

本文介绍了七种专业易用的原型工具,帮助您快速制作可验证的方案原型,减少产品、运营和其他同事的沟通时间,以及设计师绘制效果图的沟通时间。我相信你可以在阅读后找到最合适的网页原型设计工具。网页界面原型设计软件有很多选择。以下是一些…

超全的数据可视化图表组件,建议收藏!

直观的图表可以让受众理解复杂的数据,图表也分为很多种类,不同的图表的适用场合不同,小编对【派可数据 BI】的可视化组件进行梳理,根据使用场景,将可视化组件划分了以下几类:指标、趋势、比较、时序、空间、…

Ansible user 模块 该模块主要是用来管理用户账号

目录 参数语法验证创建用户删除用户验证 删除用户 参数 comment  # 用户的描述信息 createhome  # 是否创建家目录 force  # 在使用stateabsent时, 行为与userdel –force一致. group  # 指定基本组 groups  # 指定附加组,如果指定为(groups)表示删除所有…

Vue3路由元信息

路由元信息即定义路由时的meta信息 使用路由元信息定义页面在浏览器显示的标题 定义路由 const router createRouter({history:createWebHistory(import.meta.env.BASE_URL),routes:[{path:"/",component:()>import("/components/Login.vue"),meta:{…

C# OpenCvSharp DNN Low Light image Enhancement

目录 介绍 效果 模型信息 项目 代码 下载 C# OpenCvSharp DNN Low Light image Enhancement 介绍 github地址:GitHub - zhenqifu/PairLIE 效果 模型信息 Model Properties ------------------------- ---------------------------------------------------…

stream流-> 判定 + 过滤 + 收集

List<HotArticleVo> hotArticleVos hotArticleVoList .stream() .filter(x -> x.getChannelId().equals(wmChannel.getId())).collect(Collectors.toList()); 使用Java 8中的Stream API对一个名为hotArticleVoList的列表进行过滤操作&#xff0c;筛选出符合指定条件…

可视化大屏C位的中国地图,都有哪些样式?

可视化设计用上了中国地图&#xff0c;那么中国地图必定占据了C位&#xff0c;这可是大屏中最显眼的位置&#xff0c;最抓眼球的地方&#xff0c;咱们总不能随随便便截图放上去吧。 本文就带若干张地图样式&#xff0c;开拓一下大家的思路。