【大数据】Flink SQL 语法篇(四):Group 聚合

news2025/1/16 0:47:24

Flink SQL 语法篇(四):Group 聚合

  • 1.基础概念
  • 2.窗口聚合和 Group 聚合
  • 3.SQL 语义
  • 4.Group 聚合支持 Grouping sets、Rollup、Cube

1.基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

2.窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (
    -- 维度数据
    dim STRING,
    -- 用户 id
    user_id BIGINT,
    -- 用户
    price BIGINT,
    -- 事件时间戳
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark 设置
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
)

-- 数据汇表
CREATE TABLE sink_table (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
  'connector' = 'print'
)

-- 数据处理逻辑
insert into sink_table
select dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- 计算 uv 数
    count(distinct user_id) as uv,
    UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group by
    dim,
    -- 按照 Flink SQL tumble 窗口写法划分窗口
    tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (
    -- 维度数据
    dim STRING,
    -- 用户 id
    user_id BIGINT,
    -- 用户
    price BIGINT,
    -- 事件时间戳
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark 设置
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
);

-- 数据汇表
CREATE TABLE sink_table (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
  'connector' = 'print'
);

-- 数据处理逻辑
insert into sink_table
select dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- 计算 uv 数
    count(distinct user_id) as uv,
    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
    dim,
    -- 将秒级别时间戳 / 60 转化为 1min
    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

3.SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

4.Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT 
    supplier_id
    , rating
    , product_id
    , COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
    ( supplier_id, product_id, rating ),
    ( supplier_id, product_id         ),
    ( supplier_id,             rating ),
    ( supplier_id                     ),
    (              product_id, rating ),
    (              product_id         ),
    (                          rating ),
    (                                 )
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1470706.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python及Pycharm专业版下载安装教程(Python 3.11版)附JetBrains学生认证教程

目录 一、Python下载及安装1、Python下载2、Python安装3、验证是否安装成功 二、PyCharm下载及安装1、PyCharm下载2、PyCharm安装3、激活PyCharm 三、JetBrains学生认证 本篇主要介绍Python和PyCharm专业版的下载及安装方式,以及通过两种方式进行JetBrains学生认证。…

第十篇【传奇开心果系列】Python的文本和语音相互转换库技术点案例示例:Microsoft Azure开发语音翻译应用程序经典案例

传奇开心果博文系列 系列博文目录Python的文本和语音相互转换库技术点案例示例系列 博文目录前言一、雏形示例代码二、扩展思路介绍三、Azure多语种支持示例代码四、Azure实时对话模式示例代码五、Azure自定义翻译模型示例代码六、Azure语音合成示例代码七、Azure用户界面优化示…

openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore

文章目录 openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore229.1 设计原理229.2 核心优势229.3 使用指导 openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore Ustore存储引擎,又名In-place Update存储引擎(原地更新&#xff09…

java面试(网络)

TCP和UDP有什么区别?TCP三次握手不是两次? TCP:面向连接,可靠的,传输层通信协议。点对点,占用资源多,效率低。 UDP:无连接,不可靠,传输层通信协议。广播&…

Nvidia Jetson Orin NX配置环境

Nvidia Jetson Orin NX配置环境配置环境 一、安装jetson5.1.2二、安装jtop三、配置CUDA和cuDNN四、安装Pytorch 先导片:Jetson采用arm64架构 一、安装jetson5.1.2 安装好jetson自带cuda、cudnn和tensorRT 官方文档 更换源 sudo vi /etc/apt/sources.list.d/nvidia…

力扣技巧题:丢失的数字

先排后找可以让结果更简单 int cmp(const void* a, const void* b){return *(int*)a - *(int*)b; } int missingNumber(int* nums, int numsSize){qsort(nums, numsSize, 4, cmp);for(int i0; i<numsSize; i){if(nums[i] i){continue;}else{return i;}}return numsSize; }…

10 在线逻辑分析仪的使用

在线逻辑分析仪简介 传统的 FPGA 板级调试是将逻辑分析仪连接到 FPGA 的 IO 引脚上 &#xff0c;然后将内部信号引出至 IO 引脚&#xff0c;再进行板级调试&#xff0c;这种方法的缺点是我们需要一个逻辑分析仪&#xff0c;且还要在 PCB 中预留测试点。在线逻辑分析仪克服了以…

使用 C++23 协程实现第一个 co_yield 同步风格调用接口--Qt计算排列组合

在C23的协程特性里&#xff0c; co_yield 用于从协程执行过程中返回值。这个功能乍一听起来很奇怪&#xff0c;网上的例子大多是用一个计数器来演示多次中断协程函数&#xff0c;返回顺序的计数值。这看起来毫无意义。 其实这个功能主要想演示的就是协程 co_yield 具备打断一个…

【数据结构】图——最短路径

最短路径问题&#xff1a;从在带权有向图G中的某一顶点出发&#xff0c;找出一条通往另一顶点的最短路径&#xff0c;最短也就是沿路径各边的权值总和达到最小。 最短路径分为图中单源路径和多源路径。 本文会介绍Dijkstra和Bellman-Ford解决单源路径的问题 Floyd-Warshall解…

SCI一区 | Matlab实现ST-CNN-MATT基于S变换时频图和卷积网络融合多头自注意力机制的多特征分类预测

SCI一区 | Matlab实现ST-CNN-MATT基于S变换时频图和卷积网络融合多头自注意力机制的故障多特征分类预测 目录 SCI一区 | Matlab实现ST-CNN-MATT基于S变换时频图和卷积网络融合多头自注意力机制的故障多特征分类预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍…

【牛客】2024牛客寒假算法基础集训营6ABCDEGHIJ

文章目录 A 宇宙的终结题目大意主要思路代码 B 爱恨的纠葛题目大意主要思路代码 C 心绪的解剖题目大意主要思路代码 D 友谊的套路题目大意主要思路代码 E 未来的预言题目大意主要思路代码 G 人生的起落题目大意主要思路代码 I 时空的交织题目大意主要思路代码 J 绝妙的平衡题目…

ChatGPT调教指南 | 咒语指南 | Prompts提示词教程(三)

在人工智能成为我们日常互动中无处不在的一部分的时代&#xff0c;与大型语言模型(llm)有效沟通的能力是无价的。“良好提示的26条原则”为优化与这些复杂系统的交互提供了全面的指导。本指南证明了人类和人工智能之间的微妙关系&#xff0c;强调清晰、专一和结构化的沟通方法。…

leetcode hot100 买卖股票最佳时机3

本题中&#xff0c;依旧可以采用动态规划来进行解决&#xff0c;之前的两个题我们都是用二维数组dp[i][2]来表示的&#xff0c;其中i表示第i天&#xff0c;2表示长度为2&#xff0c;其中0表示不持有&#xff0c;1表示持有。 本题中&#xff0c;说至多完成两笔交易&#xff0c;也…

RabbitMQ 面试八股题整理

前言&#xff1a;本文是博主网络自行收集的一些RabbitMQ相关八股文&#xff0c;还在准备暑期实习&#xff0c;后续应该会持续更新...... 参考&#xff1a;三天吃透RabbitMQ面试八股文_牛客网 目录 RabbitMQ概述 什么是 RabbitMQ&#xff1f; 说一说RabbitMQ中的AMQP 为什么…

单机取证-信息安全管理与评估-2022年国赛真题-环境+wp

🍬 博主介绍 博主介绍:大家好,我是 Mikey ,很高兴认识大家~ 主攻:【应急响应】 【python】 【数字取证】【单机取证】【流量分析】【MISC】 🎉点赞➕评论➕收藏 == 养成习惯(一键三连)😋 🎉欢迎关注💗一起学习👍一起讨论⭐️一起进步 作者水平有限,欢迎各…

网络层的DDoS攻击与应用层的DDoS攻击之间的区别

DDoS攻击&#xff08;即“分布是拒绝服务攻击”&#xff09;&#xff0c;是基于DoS的特殊形式的拒绝服务攻击&#xff0c;是一种分布式、协作的大规模攻击方式&#xff0c;主要瞄准一些企业或政府部门的网站发起攻击。根据攻击原理和方式的区别&#xff0c;可以把DDoS攻击分为两…

(done) 如何判断一个矩阵是否可逆?

参考视频&#xff1a;https://www.bilibili.com/video/BV15H4y1y737/?spm_id_from333.337.search-card.all.click&vd_source7a1a0bc74158c6993c7355c5490fc600 这个视频里还暗含了一些引理 1.若 AX XB 且 X 和 A,B 同阶可逆&#xff0c;那么 A 和 B 相似。原因&#xff1…

RDMA内核态函数ib_post_recv()源码分析

接上文&#xff0c;上文分析了内核rdma向发送队列添加发送请求的函数ib_post_send&#xff0c;本文分析一下向接收队列添加接收请求的函数ib_post_recv。其实函数调用流程与上文类似&#xff0c;不再重复说明&#xff0c;可参考链接。 函数调用过程 最终会调用到这个函数 下面…

Stable Diffusion 绘画入门教程(webui)-ControlNet(Inpaint)

上篇文章介绍了语义分割Tile/Blur&#xff0c;这篇文章介绍下Inpaint&#xff08;重绘&#xff09; Inpaint类似于图生图的局部重绘&#xff0c;但是Inpain效果要更好一点&#xff0c;和原图融合会更加融洽&#xff0c;下面是案例&#xff0c;可以看下效果&#xff08;左侧原图…

【Java多线程】对线程池的理解并模拟实现线程池

目录 1、池 1.1、线程池 2、ThreadPoolExecutor 线程池类 3、Executors 工厂类 4、模拟实现线程池 1、池 “池”这个概念见到非常多&#xff0c;例如常量池、数据库连接池、线程池、进程池、内存池。 所谓“池”的概念就是&#xff1a;&#xff08;提高效率&#xff09; 1…