【大数据】Flink SQL 语法篇(八):集合、Order By、Limit、TopN

news2024/10/6 16:29:50

Flink SQL 语法篇(八):集合、Order By、Limit、TopN

  • 1.集合操作
  • 2.Order By、Limit 子句
    • 2.1 Order By 子句
    • 2.2 Limit 子句
  • 3.TopN 子句

1.集合操作

集合操作支持 Batch / Streaming 任务。

在这里插入图片描述

  • UNION:将集合合并并且去重。
  • UNION ALL:将集合合并,不做去重。
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');

Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);
+---+
|  s|
+---+
|  c|
|  a|
|  b|
|  d|
|  e|
+---+

Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);
+---+
|  c|
+---+
|  c|
|  a|
|  b|
|  b|
|  c|
|  d|
|  e|
|  a|
|  b|
|  b|
+---+
  • Intersect:交集并且去重。
  • Intersect ALL:交集不做去重。
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);
+---+
|  s|
+---+
|  a|
|  b|
+---+

Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);
+---+
|  s|
+---+
|  a|
|  b|
|  b|
+---+
  • Except:差集并且去重。
  • Except ALL:差集不做去重。
Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);
+---+
| s |
+---+
| c |
+---+

Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);
+---+
| s |
+---+
| c |
| c |
+---+

上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据给撤回。这也是一个回撤流。

  • In 子查询:这个大家比较熟悉了,但是注意,In 子查询的结果集只能有一列。
SELECT user, amount
FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)

上述 SQL 的 In 子句其实就和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,大家注意设置 State 的 TTL。

2.Order By、Limit 子句

2.1 Order By 子句

支持 Batch / Streaming,但在实时任务中一般用的非常少。

实时任务中,Order By 子句中 必须要有时间属性字段,并且时间属性必须为 升序 时间属性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 或者 WATERMARK FOR rowtime_column AS rowtime_column

举例:

CREATE TABLE source_table_1 (
    user_id BIGINT NOT NULL,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '10'
);

CREATE TABLE sink_table (
    user_id BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Order By row_time, user_id desc

2.2 Limit 子句

支持 Batch / Streaming,但实时场景一般不使用,但是此处依然举一个例子。

CREATE TABLE source_table_1 (
    user_id BIGINT NOT NULL,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '10'
);

CREATE TABLE sink_table (
    user_id BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Limit 3

结果如下,只有 3 条输出:

+I[5]
+I[9]
+I[4]

3.TopN 子句

TopN 定义(支持 Batch / Streaming):TopN 其实就是对应到离线数仓中的 row_number(),可以使用 row_number() 对某一个分组的数据进行排序。

应用场景:根据 某个排序 条件,计算 某个分组 下的排行榜数据。

SQL 语法标准:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]
  • ROW_NUMBER():标识 TopN 排序子句。
  • PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 TopN,比如下述案例中的 partition by key,就是根据需求中的搜索关键词(key)做为分区。
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序。
  • WHERE rownum <= N:这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个 TopN 的查询,其中 N 代表 TopN 的条目数。
  • [AND conditions]:其他的限制条件也可以加上。

实际案例:取某个搜索关键词下的搜索热度前 10 名的词条数据。

输入数据为搜索词条数据的搜索热度数据,当搜索热度发生变化时,会将变化后的数据写入到数据源的 Kafka 中:

-- 数据源 schema

-- 字段名         备注
-- key          搜索关键词
-- name         搜索热度名称
-- search_cnt    热搜消费热度(比如 3000)
-- timestamp       消费词条时间戳

CREATE TABLE source_table (
    name BIGINT NOT NULL,
    search_cnt BIGINT NOT NULL,
    key BIGINT NOT NULL,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
  ...
);

-- 数据汇 schema

-- key          搜索关键词
-- name         搜索热度名称
-- search_cnt    热搜消费热度(比如 3000)
-- timestamp       消费词条时间戳

CREATE TABLE sink_table (
    key BIGINT,
    name BIGINT,
    search_cnt BIGINT,
    `timestamp` TIMESTAMP(3)
) WITH (
  ...
);

-- DML 逻辑
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (
   SELECT key, name, search_cnt, row_time, 
     -- 根据热搜关键词 key 作为 partition key,然后按照 search_cnt 倒排取前 100 名
     ROW_NUMBER() OVER (PARTITION BY key
       ORDER BY search_cnt desc) AS rownum
   FROM source_table)
WHERE rownum <= 100

输出结果:

-D[关键词1, 词条1, 4944]
+I[关键词1, 词条1, 8670]
+I[关键词1, 词条2, 1735]
-D[关键词1, 词条3, 6641]
+I[关键词1, 词条3, 6928]
-D[关键词1, 词条4, 6312]
+I[关键词1, 词条4, 7287]

可以看到输出数据是有回撤数据的,为什么会出现回撤,我们来看看 SQL 语义。

上面的 SQL 会翻译成以下三个算子:

  • 数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,按照 partition key 将数据进行 Hash 分发到下游排序算子,相同的 Key 数据将会发送到一个并发中。
  • 排序算子:为每个 Key 维护了一个 TopN 的榜单数据,接受到上游的一条数据后,如果 TopN 榜单还没有到达 N 条,则将这条数据加入 TopN 榜单后,直接下发数据,如果到达 N 条之后,经过 TopN 计算,发现这条数据比原有的数据排序靠前,那么新的 TopN 排名就会有变化,就变化了的这部分数据之前下发的排名数据撤回(即回撤数据),然后下发新的排名数据。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

上面三个算子也是会 24 小时一直运行的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1476672.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySql-多表设计-一对多-外键

目录 外键约束问题分析问题解决 实例通过SQL语句操作物理外键和逻辑外键 外键约束 表结构创建完毕后&#xff0c;我们看到两张表的数据分别为&#xff1a; 现在员工表中有9个员工都归属于2号部门&#xff0c;当删除了号部门后&#xff0c;数据变为&#xff1a; 2号部门被删除…

什么是电子邮件客户端?如何选择合适的邮箱客户端?

“从1到10分&#xff0c;你会如何评价我们的电子邮件服务&#xff1f;” 无论你的评分是多少&#xff0c;影响你评分的一个重要因素肯定是电子邮件客户端提供的功能。 电子邮件客户端应该具有基本而漂亮的高级功能&#xff0c;以使迁移过程更容易。此外&#xff0c;应该有一些…

leetcode — 动态规划 — 打家劫舍、完全平方数

1 打家劫舍 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 给定一个代表每个房…

Vue官网“食用指南”

把Vue官网当做一个工具来用&#xff0c;有问题&#xff0c;先来官网查一查。 官网中常用的板块 官网&#xff1a;https://cn.vuejs.org/上手后&#xff0c;最常用的模块是【快速上手】【API】。所以务必要知道这两个模块在哪里&#xff0c;怎么使用。![image.png](https://img…

VPX基于全国产飞腾FT-2000+/64核+复旦微FPGA的计算刀片

6U VPX计算板 产品简介 产品特点 飞腾计算平台&#xff0c;国产化率100% VPX-MPU6902是一款基于飞腾FT-2000/64核的计算刀片&#xff0c;主频2.2GHz&#xff0c;负责业务数据流的管控和调度。搭配自带独立显示芯片的飞腾X100芯片&#xff0c;可用于于各类终端及服务器类应用场…

R语言使用dietaryindex包计算NHANES数据多种健康饮食指数 (HEI等)(1)

健康饮食指数 (HEI) 是评估一组食物是否符合美国人膳食指南 (DGA) 的指标。Dietindex包提供用户友好的简化方法&#xff0c;将饮食摄入数据标准化为基于指数的饮食模式&#xff0c;从而能够评估流行病学和临床研究中对这些模式的遵守情况&#xff0c;从而促进精准营养。 该软件…

能在手机上运行,仅仅0.5B大小的小语言模型MobiLlama

模型介绍 该模型基于LLaMA-7B架构设计&#xff0c;旨在能够在边缘设备上高效运行&#xff0c;无需将数据发送到远程服务器或云端处理。如智能手机、平板电脑、智能手表等。MobiLlama模型虽然体积小、对资源的需求低&#xff0c;但仍能提供高精度的语言理解和生成能力。项目还提…

fastjson序列化MessageExt对象问题(1.2.78之前版本)

前言 无论是kafka&#xff0c;还是RocketMq&#xff0c;消费者方法参数中的MessageExt对象不能被 fastjson默认的方式序列化。 一、查看代码 Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {t…

C语言——深度剖析数据在内存中的存储——第2篇——(第25篇)

坚持就是胜利 文章目录 三、浮点型在内存中的存储1、一个例子2、浮点数存储规则1、IEEE 754对 有效数字M 和 指数E &#xff0c;还有一些特别规定。2、至于 指数E&#xff0c;情况比较复杂&#xff0c;首先&#xff0c;E 为 一个无符号整数(unsigned int)3、然而&#xff0c;指…

防御保护:防火墙内容安全

一、IAE&#xff08;Intelligent Awareness Engine&#xff09;引擎 二、深度检测技术(DFI和DPI&#xff09; 1.DPI – 深度包检测技术 DPI主要针对完整的数据包&#xff08;数据包分片&#xff0c;分段需要重组&#xff09;&#xff0c;之后对数据包的内容进行识别。&#x…

c# .net8 香橙派orangepi + hc-04蓝牙 实例

这些使用c# .net8开发&#xff0c;硬件 香橙派 orangepi 3lts和 hc-04蓝牙 使用场景&#xff1a;可以通过这个功能&#xff0c;手机连接orangepi进行wifi等参数配置 硬件&#xff1a; 1、带USB口的linux开发板orangepi 2、USB 转TTL 中转接蓝牙&#xff08;HC-04) 某宝上买…

openGauss学习笔记-230 openGauss性能调优-系统调优-配置并行查询功能

文章目录 openGauss学习笔记-230 openGauss性能调优-系统调优-配置并行查询功能230.1 适用场景与限制230.2 资源对SMP性能的影响230.3 其他因素对SMP性能的影响230.4 配置步骤 openGauss学习笔记-230 openGauss性能调优-系统调优-配置并行查询功能 openGauss的SMP并行技术是一…

双流机场到天府机场ADS-B数据导入MATLAB

MATLAB导入数据 导入的数据Excel部分截图&#xff1a; 一些处理 % 导入外部轨迹数据并转成标准形式 clear;clc; %% 导入&预处理 [NUM,TXT,RAW]xlsread(2021年10月31日CTU-TFU); time_cell RAW(3:end,1); %拉取时间数据&#xff08;cell&#xff09; time_char char(t…

【清理mysql数据库服务器二进制日志文件】

清理前后比对 清理前占用 86% &#xff1a; 清理后占用 29% &#xff1a; 排查占用磁盘较大的文件 检测磁盘空间占用 TOP 10 # 检测磁盘空间占用 TOP 10 $ sudo du -S /var/log/ | > sort -rn | # -n选项允许按数字排序。-r选项会先列出最大数字&#xff08;逆序&#x…

智慧应急:构建全方位、立体化的安全保障网络

一、引言 在信息化、智能化快速发展的今天&#xff0c;传统的应急管理模式已难以满足现代社会对安全保障的需求。智慧应急作为一种全新的安全管理模式&#xff0c;旨在通过集成物联网、大数据、云计算、人工智能等先进技术&#xff0c;实现对应急事件的快速响应、精准决策和高…

eltable 合计行添加tooltip

eltable 合计行添加tooltip 问题描述&#xff1a; eltable 合计行单元格内容过长会换行&#xff0c;需求要求合计行数据超长显示 … &#xff0c;鼠标 hover 时显示提示信息。 解决方案&#xff1a;eltable合计行没有对外的修改接口&#xff0c;想法是 自己实现一个tooltip&a…

代码随想录刷题笔记-Day25

1. 分割回文串 131. 分割回文串https://leetcode.cn/problems/palindrome-partitioning/ 给你一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是 回文串 。返回 s 所有可能的分割方案。 回文串 是正着读和反着读都一样的字符串。 示例 1&#xf…

vue使用gitshot生成gif

vue使用gitshot生成gif 问题背景 本文将介绍vue中使用gitshot生成gif。 问题分析 解决思路&#xff1a; 使用input组件上传一个视频&#xff0c;获取视频文件后用一个video组件进行播放&#xff0c;播放过程进行截图生成图片数组。 demo演示上传一个视频&#xff0c;然后生…

Linux:Kubernetes(k8s)——基础理论笔记(1)

我笔记来源的图片以及共享至GitHub&#xff0c;本章纯理论。这是k8s中部分的基础理论 &#x1f447; KALItarro/k8spdf: 这个里面只有一个pdf文件 (github.com)https://github.com/KALItarro/k8spdf&#x1f446; 什么是kubernetes kubernetes 是一个开源的&#xff0c;用于管…

spring boot学习第十三篇:使用spring security控制权限

该文章同时也讲到了如何使用swagger。 1、pom.xml文件内容如下&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instanc…