【大数据】Flink SQL 语法篇(二):WITH、SELECT WHERE、SELECT DISTINCT

news2025/1/11 23:52:39

Flink SQL 语法篇(二)

  • 1.WITH 子句
  • 2.SELECT & WHERE 子句
  • 3.SELECT DISTINCT 子句

1.WITH 子句

应用场景(支持 Batch / Streaming):With 语句和离线 Hive SQL With 语句一样的,语法糖 +1,使用它可以让你的代码逻辑更加清晰。

-- 语法糖 +1
WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

2.SELECT & WHERE 子句

应用场景(支持 Batch / Streaming):SELECT & WHERE 语句和离线 Hive SQL 语句一样的,常用作 ETL,过滤,字段清洗标准化。

INSERT INTO target_table
SELECT * FROM Orders

INSERT INTO target_table
SELECT order_id, price + tax FROM Orders

INSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10

-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3

其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。

以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了。

INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3

这个 SQL 对应的实时任务,假设 Orders 为 Kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

可以看到这个实时任务的所有算子是以一种 Pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。

在这里插入图片描述

⭐ 关于看如何看一段 Flink SQL 最终的执行计划:最好的方法就如上图,看 Flink Web UI 的算子图,算子图上详细的标记清楚了每一个算子做的事情。

以上图来说,我们可以看到主要有三个算子:

  • Source 算子。Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)])
    • 其中 Source 表名称为 table=[[default_catalog, default_database, Orders]
    • 字段为 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]
    • Watermark 策略为 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]
  • 过滤算子。Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id])
    • 其中过滤条件为 where=[(order_id > 3)]
    • 结果字段为 select=[order_id, name, row_time]
  • Sink 算子。Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time])
    • 其中最终产出的表名称为 table=[default_catalog.default_database.target_table]
    • 表字段为 fields=[order_id, name, row_time]

可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 Debug、调优前最基础的一个技巧。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游 过滤和字段标准化算子,然后 数据源算子 就运行结束了,释放资源了。
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游 数据汇算子,然后 过滤和字段标准化算子 就运行结束了,释放资源了
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了

可以看到离线任务的算子是分阶段(Stage)进行运行的,每一个 Stage 运行结束之后,然后下一个 Stage 开始运行,全部的 Stage 运行完成之后,这个离线任务就跑结束了。

注意:很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释:

  • 分区概念:离线由于能力限制问题,通常都是进行一批一批的数据计算,每一批数据的数据量都是有限的集合,这一批一批的数据自然的划分方式就是时间,比如按小时、天进行划分分区。但是 在实时任务中,是没有分区的概念的,实时任务的上游、下游都是无限的数据流。
  • 计算任务定时调度概念:同上,离线就是由于计算能力限制,数据要一批一批算,一批一批输入、产出,所以要按照小时、天定时的调度和计算。但是 在实时任务中,是没有定时调度的概念的,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。

3.SELECT DISTINCT 子句

应用场景(支持 Batch / Streaming):语句和离线 Hive SQL SELECT DISTINCT 语句一样的,用作根据 key 进行数据去重。

INSERT into target_table
SELECT DISTINCT id 
FROM Orders

也是拿离线和实时做对比。

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
  • 去重算子DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

在这里插入图片描述
对于实时任务,计算时的状态可能会无限增长。状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游 去重算子,然后 数据源算子 就运行结束了,释放资源了。
  • 去重算子DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游 数据汇算子,然后 去重算子 就运行结束了,释放资源了。
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1419675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

15.Golang中的反射机制及应用

目录 概述实践基本应用复杂应用 结束 概述 Golang中的反射用法还是比较简单的 reflect.TypeOf(arg)reflect.ValueOf(arg) 实践 基本应用 package mainimport ("fmt""reflect" )func reflectNum(arg interface{}) {fmt.Println("type ", re…

专科拿到季军:微茫星火,奋起直追!

Datawhale干货 作者:“不啻微茫”团队,季军方案 前 言 大家好,我们是 飞桨星河社区 X 智海Mo平台 AI 大模型创意应用大赛 获奖团队——"不啻微茫",很荣幸能有机会与大家分享这次比赛经验,我们从零开始的过程…

python打造光斑处理系统2:打开图像和默认图像

文章目录 打开图像默认图像 光斑处理:python处理高斯光束的图像 光斑处理系统:程序框架 打开图像 光斑图像的本质是光强在空间中的分布,而有的时候,通过CCD拍到的图像往往存成虚假的RGB格式,所以在打开图像时&#x…

AI大语言模型学习笔记之三:协同深度学习的黑魔法 - GPU与Transformer模型

Transformer模型的崛起标志着人类在自然语言处理(NLP)和其他序列建模任务中取得了显著的突破性进展,而这一成就离不开GPU(图形处理单元)在深度学习中的高效率协同计算和处理。 Transformer模型是由Vaswani等人在2017年…

【机器学习300问】20、什么是神经网络?和深度学习什么关系?

在学习深度学习的相关知识之前,我们首先得了解什么是神经网络,解开神经网络的神秘面纱后,什么是深度学习的问题也就迎刃而解。我依旧会采用我习惯的方式:先给出例子直观理解,在给出定义深入理解,最后在实际…

HarmonyOS NEXT 星河版项目案例

参考代码:HeimaHealthy: 鸿蒙项目案例练习 (gitee.com) 1.欢迎页面 Entry Component struct WelcomePage {State message: string Hello Worldbuild() {Column({space: 10}) {Row() {// 1.中央slogonImage($r(app.media.home_slogan)).width(260)}.layoutWeight(…

二叉搜索树的后序遍历序列

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO 联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬 学习必须往深处挖&…

16.Golang结构体标签与json互转

目录 概述实践结束 概述 本文主要介绍 Golang 中结构体与json互相转化 实践 完整的代码如下: package mainimport ("encoding/json""fmt" )type Movice struct {Title string json:"title"Year int json:"year&qu…

【论文阅读|半监督小苹果检测方法S3AD】

论文题目 : : Semi-supervised Small Apple Detection in Orchard Environments 项目链接:https://www.inf.uni-hamburg.de/en/inst/ab/cv/people/wilms/mad.html 摘要(Abstract) 农作物检测是自动估产或水果采摘等精准农业应用不…

搭建 prometheus + grafana + springboot3 监控

下载安装包 下载prometheus:https://github.com/prometheus/prometheus/releases/download/v2.42.0/prometheus-2.42.0.windows-amd64.zip 下载grafana: https://dl.grafana.com/enterprise/release/grafana-enterprise-9.4.1.windows-amd64.zip Spr…

优化器刺客之limit 1--Order by col limit n 代价预估优化探索

一、现象 order by 排序加了limit后更慢了? test# explain analyze select userid from dba_users where username like %aaaaaaaaaaaaaaaaaa% order by userid ;QUERY PLAN --------------…

Android Gradle Sync Task list is empty

问题 有时候 Android studio 打开项目,可能会遇到构建没有明显报错,但是 Gradle 却没有 Task list,或者 Task list 不完整只有零星几个配置项。连打包任务都没有,我怎么打包! 异常情况: 正常情况&#xf…

【Python机器学习系列】建立XGBoost模型预测心脏疾病(完整实现过程)

一、引言 前文回顾: 一文彻底搞懂机器学习中的归一化与反归一化问题 【Python机器学习系列】一文彻底搞懂机器学习中表格数据的输入形式(理论源码) 【Python机器学习系列】一文带你了解机器学习中的Pipeline管道机制(理论源码…

启发式搜索(A*、IDDFS、IDA*)

我们在解决图问题的时候,通常需要使用DFS和BFS搜索,可是这两种搜索方式的效率较低,我们会遍历到很多空白节点,有没有办法可以优化这种低效问题呢?今天要推出我们的主角:启发式搜索。 一、A* 什么是A*算法…

关于如何将Win幻兽帕鲁服务端存档转化为单人本地存档的一种方法(无损转移)

本文转自博主的个人博客:https://blog.zhumengmeng.work,欢迎大家前往查看。 原文链接:点我访问 **起因:**最近大火的开放世界缝合体游戏幻兽帕鲁的大火也是引起了博主的注意,然后博主和周边小伙伴纷纷入手,博主也是利…

npm 和 yarn 的使用

安装 yarn npm i yarn -g查看版本 npm -v yarn --version切换 npm/yarn 的下包镜像源 // 查看当前的镜像源 npm config get registry// 切换淘宝镜像源 // 新的淘宝源,旧的淘宝源已于2022年05月31日零时起停止服务 npm config set registry https://registry.…

【英语趣味游戏】填字谜(Crossword)第2天

谜题出处 柯林斯字谜大全(6),Collins——Big Book of Crosswords (Book 6) Puzzle Number: 115 本期单词 横向 1、Fetch (8) 拿,取,8个字母 答案:Retrieve,取到,拿回 5、Common s…

【JaveWeb教程】(34)SpringBootWeb案例之《智能学习辅助系统》的详细实现步骤与代码示例(7)配置文件的设置

目录 SpringBootWeb案例054. 配置文件4.1 参数配置化4.2 yml配置文件4.3 ConfigurationProperties SpringBootWeb案例05 前面我们已经实现了员工信息的条件分页查询以及删除操作,以及实现新增和修改员工。 本节的主要内容: 配置文件的设置 4. 配置文件…

<网络安全>《10 高级威胁检测系统ATD》

1 APT 高级长期威胁(英语:Advanced Persistent Threat,缩写:APT),又称高级持续性威胁、先进持续性威胁等,是指隐匿而持久的电脑入侵过程,通常由某些人员精心策划,针对特…

FairGuard游戏加固入选《CCSIP 2023中国网络安全行业全景册(第六版)》

2024年1月24日, FreeBuf咨询正式发布《CCSIP 2023中国网络安全行业全景册(第六版)》。本次发布的全景图,共计展示20个一级分类、108个细分安全领域,旨在为广大企业提供网络安全产品选型参考,帮助企业了解中国网络安全技术与市场的…