Flink之SQL查询操作

news2025/1/12 23:36:52

SQL查询

  • 基本SELECT查询
    • 生成测试数据
    • WITH
    • WHERE
    • DISTINCT
    • ORDER BY
    • LIMIT
  • 窗口函数
    • 概述
    • 创建数据表
    • 滚动窗口 TUMBLE
    • 滑动窗口 HOP
    • 累积窗口 CUMULATE
    • 窗口偏移
  • 聚合
    • 窗口聚合
    • 分组聚合
    • OVER聚合
  • TOP-N
    • 普通Top-N
    • 窗口Top-N
  • 联结Join查询
    • 内部等连接
    • 外部等连接
    • 间隔联结
  • 集合操作
    • UNION 和 UNION ALL
    • Intersect 和 Intersect All
    • Except 和 Except All
    • In 子查询
    • EXISTS
  • 去重
    • 普通去重
    • 窗口去重
  • 函数
    • 标量函数
    • 聚合函数
    • 自定义函数
  • Module
    • 概述
    • 操作命令
    • 加载Hive Module

基本SELECT查询

生成测试数据

使用DataGen SQL连接器生成测试数据

CREATE TABLE datagen (
 f_sequence INT,
 f_random INT,
 f_random_str STRING,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
--指定要使用的连接器--
 'connector' = 'datagen',
-- 每秒生成的行数,用以控制数据发出速率。--	
 'rows-per-second'='5',
--指定字段的生成器。可以是 'sequence' 或 'random',默认random--
 'fields.f_sequence.kind'='sequence',
 --序列生成器的起始值--
 'fields.f_sequence.start'='1',
 'fields.f_sequence.end'='1000',
 --随机生成器的最小值,适用于数字类型--
 'fields.f_random.min'='1',
 'fields.f_random.max'='1000',
--随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string-- 
'fields.f_random_str.length'='10'
);

在这里插入图片描述

WITH

WITH子句提供了一种用于更大查询而编写辅助语句的方法。这些编写的语句通常被称为公用表表达式,表达式可以理解为仅针对某个查询而存在的临时视图。

语法如下:

WITH <with_item_definition> [ , ... ]
SELECT ... FROM ...;

<with_item_defintion>:
    with_item_name (column_name[, ...n]) AS ( <select_query> )

定义一个公用表表达式orders_with_total,并在一个GROUP BY查询中使用它

WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

在SQL客户端直接输入:

WITH datagen_with_total AS (
    SELECT f_sequence, f_random + 10 AS total
    FROM datagen
)

SELECT f_sequence, SUM(total)
FROM datagen_with_total
GROUP BY f_sequence;

WHERE

语法格式如下:

SELECT select_list FROM table_expression [ WHERE boolean_expression ]
SELECT * FROM datagen;

SELECT f_sequence,f_random,f_random_str  FROM datagen;

SELECT f_sequence,f_random,f_random_str  FROM datagen WHERE f_sequence >10;

# 在VALUES子句中使用内联数据。每一个元组对应一行,另外可以通过设置别名来为每一列指定名称。
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

DISTINCT

所有的复制行都会从结果集(每个分组只会保留一行)中被删除

SELECT DISTINCT id FROM Orders

根据f_random字段进行数据去重

SELECT DISTINCT f_random FROM datagen WHERE f_sequence <10;

ORDER BY

ORDER BY子句使结果行根据指定的表达式进行排序。 如果两行根据最左边的表达式相等,则根据下一个表达式进行比较,依此类推。 如果根据所有指定的表达式它们相等,则它们以与实现相关的顺序返回。

在流模式下运行时,表的主要排序顺序必须按时间属性升序。 所有后续的 orders 都可以自由选择。 但是批处理模式没有这个限制。

SELECT *
FROM Orders
ORDER BY order_time, order_id

LIMIT

LIMIT子句限制 SELECT 语句返回的行数。 通常,此子句与 ORDER BY 结合使用,以确保结果是确定性的。

选择表中的前3行

SELECT *
FROM Orders
ORDER BY orderTime
LIMIT 3

窗口函数

概述

Apache Flink提供了3个内置窗口表值函数TVF:TUMBLE、HOP和CUMULATE。

窗口TVF的返回值是一个新的关系,包括原始表的所有列和附加的三个用于指定窗口的列,分别是:window_startwindow_endwindow_time

函数通过时间属性字段为每行数据分配一个窗口。 在流计算模式,时间属性字段必须被指定为事件或处理时间属性。

在流模式下,窗口表值函数的时间属性字段必须位于事件或处理时间属性上。且时间属性字段必须是TIMESTAMPTIMESTAMP_LTZ的类型。

函数运行后,原有的时间属性timecol将转换为一个常规的timestamp

创建数据表

  CREATE TABLE datagen (
  id INT,
  age INT,
  pt AS PROCTIME(), --处理时间
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  WATERMARK FOR et AS et - INTERVAL '5' SECOND   --watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.min' = '1',
  'fields.id.max' = '10',
  'fields.age.min' = '1',
  'fields.age.max' = '150'
);

滚动窗口 TUMBLE

TUMBLE函数指定每个元素到一个指定大小的窗口中。滚动窗口的大小固定且不重复。

例如:假设指定了一个 5 分钟的滚动窗口。Flink 将每 5 分钟生成一个新的窗口
在这里插入图片描述

TUMBLE函数语法如下:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
data:是一个表参数,可以是与时间属性列的任意关系。

timecol:是一个列描述符,指示数据的哪些时间属性列应映射到滚动窗口。

size:是指定翻滚窗口宽度的持续时间。

offset: 是一个可选参数,用于指定窗口开始移动的偏移量。

示例:

SELECT * FROM TABLE(
   TUMBLE(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND));

查询结果如下

          id         age                      pt                      et            window_start              window_end             window_time
           2          28 2023-07-08 18:04:36.792 2023-07-08 18:04:36.792 2023-07-08 18:04:35.000 2023-07-08 18:04:40.000 2023-07-08 18:04:39.999
           7         146 2023-07-08 18:04:36.792 2023-07-08 18:04:36.792 2023-07-08 18:04:35.000 2023-07-08 18:04:40.000 2023-07-08 18:04:39.999
           3          76 2023-07-08 18:04:36.793 2023-07-08 18:04:36.792 2023-07-08 18:04:35.000 2023-07-08 18:04:40.000 2023-07-08 18:04:39.999
          10          94 2023-07-08 18:04:36.793 2023-07-08 18:04:36.793 2023-07-08 18:04:35.000 2023-07-08 18:04:40.000

滑动窗口 HOP

滑动窗口函数指定元素到一个定长的窗口中。滑动窗口有一个固定的持续时间以及一个滑动的间隔。

若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠,行将会被分配到多个窗口中。

例如:可以定义一个每5分钟滑动一次。大小为10分钟的窗口。每5分钟获得最近10分钟到达的数据的窗口
在这里插入图片描述
HOP函数语法如下:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data:是一个表参数,可以是与时间属性列的任意关系

timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口

slide:是指定连续跳跃窗口开始之间的持续时间的持续时间

size:是指定跳跃窗口宽度的持续时间

offset: 是一个可选参数,用于指定窗口开始移动的偏移量

示例:

SELECT * FROM TABLE(
    HOP(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND));

累积窗口 CUMULATE

CUMULATE函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素

窗口的开始时间是固定的,可以将CUMULATE函数视为首先应用具有最大窗口大小的TUMBLE窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小。

累积窗口会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度和累积步长。所谓最大窗口长度其实就是统计周期,最终目的就是统计这段时间内的数据。

例如:1小时步长,24小时大小的累计窗口,每天可以获得如下这些窗口:[00:00, 01:00),[00:00, 02:00),[00:00, 03:00), …, [00:00, 24:00)
在这里插入图片描述
CUMULATE函数语法如下:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data:是一个表参数,可以是与时间属性列的任意关系

timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口

step:是指定连续累积窗口末尾之间增加的窗口大小的持续时间

size:是指定累积窗口最大宽度的持续时间。size必须是 的整数倍step

offset: 是一个可选参数,用于指定窗口开始移动的偏移量

示例:

SELECT * FROM TABLE(
    CUMULATE(TABLE datagen, DESCRIPTOR(et), INTERVAL '2' SECONDS, INTERVAL '6' SECONDS));

窗口偏移

Offset是一个可选参数,可用于更改窗口分配。它可以是正持续时间和负持续时间。窗口偏移的默认值为0。如果设置不同的偏移值,同一条记录可能会分配到不同的窗口。

例如,对于2021-06-30 00:00:04大小为 10 MINUTE 的 Tumble 窗口,带有时间戳的记录将分配给哪个窗口?

如果offset值为-16 MINUTE,则记录分配给窗口 [ 2021-06-29 23:54:00, 2021-06-30 00:04:00)
如果offset值为-6 MINUTE,则记录分配给窗口 [ 2021-06-29 23:54:00, 2021-06-30 00:04:00)
如果offset是-4 MINUTE,则记录分配给窗口[ 2021-06-29 23:56:00, 2021-06-30 00:06:00)

如果offset是0,则记录分配给窗口[ 2021-06-30 00:00:00, 2021-06-30 00:10:00)

如果offset是4 MINUTE,则记录分配给窗口[ 2021-06-29 23:54:00, 2021-06-30 00:04:00)
如果offset是6 MINUTE,则记录分配给窗口[ 2021-06-29 23:56:00, 2021-06-30 00:06:00)
如果offset是16 MINUTE,则记录分配给窗口[ 2021-06-29 23:56:00, 2021-06-30 00:06:00)

可以发现,一些窗口偏移参数可能对窗口的分配有同样的影响。在上述情况下,-16 、-6 和4对于大小为10的翻滚窗口具有相同的效果。

窗口偏移的作用只是更新窗口分配,对 Watermark 没有影响。

示例:

SQL> SELECT * FROM TABLE(
   TUMBLE(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND));

聚合

窗口聚合

窗口聚合是通过GROUP BY子句定义的,其特征是包含窗口表值函数 产生的window_startwindow_end列。和普通的GROUP BY子句一样,窗口聚合对于每个组会计算出一行数据

和其他连续表上的聚合不同,窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态

窗口函数聚合使用语法:

FROM TABLE(
窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL 时间)
)
GROUP BY [window_start,][window_end,] --可选

1.ROLLUP窗口聚合
聚合写法:

   SELECT window_start, window_end, SUM(age) sumAge
  FROM TABLE(
    TUMBLE(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND))
  GROUP BY window_start, window_end;

输出如下结果:

            window_start              window_end      sumAge
 2023-07-08 18:32:05.000 2023-07-08 18:32:10.000        2840
 2023-07-08 18:32:10.000 2023-07-08 18:32:15.000        3558

2.HOP窗口聚合
聚合写法:

SELECT window_start, window_end, SUM(age) FROM TABLE(
    HOP(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
    GROUP BY window_start, window_end;

3.CUMULATE窗口聚合
聚合写法:

 SELECT window_start, window_end, SUM(age)
  FROM TABLE(
    CUMULATE(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
  GROUP BY window_start, window_end;

4.GROUPING SET窗口聚合

1.GROUPING SETS

窗口聚合也支持GROUPING SETS语法,要求窗口聚合中GROUP BY子句必须包含window_start 和 window_end列,但GROUPING SETS子句中不能包含这两个字段。

Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
+------------------+------------------+-------------+-------+
|     window_start |       window_end | supplier_id | price |
+------------------+------------------+-------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 |      (NULL) | 11.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |  5.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |  6.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |      (NULL) | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |  9.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier1 |  1.00 |
+------------------+------------------+-------------+-------+

2.ROLLUP

ROLLUP是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。

例如:ROLLUP (one,two) 等效于 GROUPING SET((one,two),(one),())

SELECT window_start, window_end, supplier_id, SUM(price) as price
FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (supplier_id);

3.CUBE

CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。

SELECT window_start, window_end, item, supplier_id, SUM(price) as price
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, CUBE (supplier_id, item);

分组聚合

基本聚合函数

聚合函数根据多个输入行计算单个结果。例如,聚合来计算一组行的COUNT、SUM、AVG、MAX、MIN。

select COUNT(*) from datagen;

SELECT age, COUNT(*) as total FROM datagen GROUP BY age;

DISTINCT

DISTINCT聚合在聚合函数前去掉重复的数据。

计算datagen表中不同 age的数量而不是总行数。

SELECT COUNT(DISTINCT age) FROM datagen;

GROUPING SETS

通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 group by 子句一样为每个组进行聚合。

SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
ROUPING SETS 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。

一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果

对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替

ROLLUP

ROLLUP是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。

SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating)

CUBE

CUBE是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id)

HAVING

HAVING 会删除 group 后不符合条件的行。 HAVING 和 WHERE 的不同点:

WHERE在GROUP BY之前过滤单独的数据行

HAVING过滤GROUP BY生成的数据行
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50

OVER聚合

OVER聚合计算一系列有序行上每个输入行的聚合值。与GROUP BY聚合相反,OVER聚合不会将每个组的结果行数减少到一行。相反,OVER聚合会为每个输入行生成聚合值。

OVER窗口的语法如下:

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...
ORDER BY:必须是时间戳列(事件时间、处理时间),只能升序

PARTITION BY:标识了聚合窗口的聚合粒度

range_definition:这个标识聚合窗口的聚合数据范围,在Flink中有两种指定数据范围的方式。第一种为按照行数聚合,第二种为按照时间区间聚合

ORDER BY

OVER窗口需要数据是有序的。因为表没有固定的排序,所以ORDER BY子句是强制的。对于流式查询,Flink目前只支持 OVER 窗口定义在升序(asc)的 时间属性 上。其他的排序不支持。

PARTITION BY

OVER窗口可以定义在一个分区表上。PARTITION BY子句代表着每行数据只在其所属的数据分区进行聚合。

范围RANGE定义

1.按照时间区间聚合-RANGE间隔

RANGE间隔是定义在排序列值上的,在 Flink 里,排序列总是一个时间属性。

定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

查看前5秒到现在收到的水位数据条数

SELECT 
    id, 
    age, 
    et,
    count(age) OVER (
        PARTITION BY id
        ORDER BY et
        RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
  ) AS cnt
FROM datagen;

用WINDOW子句来在SELECT外部单独定义一个OVER窗口

SELECT 
    id, 
    age, 
    et,
count(age) OVER w AS cnt,
sum(age) OVER w AS sumAge
FROM datagen
WINDOW w AS (
    PARTITION BY id
    ORDER BY et
    RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
);

2.按照行数聚合-ROW间隔

间隔ROWS是基于计数的间隔。它准确定义了聚合中包含的行数。

ROWS间隔基于计数。它定义了聚合操作包含的精确行数。

定义了当前行 + 之前的 10 行(也就是11行)都会被聚合。

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
WINDOW

统计当前行之前的5行和当前行(总共6行)的数据。或理解为统计前5条到现在的数据

SELECT 
    id, 
    age, 
    et,
    avg(age) OVER (
      PARTITION BY age
      ORDER BY et
      ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS avgAge
FROM datagen;

用WINDOW子句来在SELECT外部单独定义一个OVER窗口

SELECT 
    id, 
    age, 
    et,
avg(age) OVER w AS avgAge,
count(age) OVER w AS cnt
FROM datagen
WINDOW w AS (
    PARTITION BY age
    ORDER BY et
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
);

TOP-N

普通Top-N

Top-N 查询要求按列排序的 N 个最小值或最大值。最小值和最大值集都被视为 Top-N 查询。当需要根据条件仅显示批处理/流式表中的 N 个最底部或 N 个最顶部记录时,前 N 条查询非常有用。

语法:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

参数说明:

ROW_NUMBER():根据分区内行的顺序,为每行分配一个唯一的连续编号,从 1 开始。目前,我们仅支持ROW_NUMBERover window 功能。今后,我们将一如既往地支持RANK()和支持DENSE_RANK()。

PARTITION BY col1[, col2...]:指定分区列。每个分区都会有一个 Top-N 结果。

ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:指定排序列。不同列上的排序方向可能不同。

WHERE rownum <= N:rownum <= NFlink 需要识别该查询是 Top-N 查询。N 表示将保留的 N 个最小或最大的记录。

[AND conditions]:where 子句中可以自由添加其他条件,但其他条件只能使用rownum <= N连词组合AND。

获取前3名的数据

SELECT * FROM (
  SELECT *,ROW_NUMBER() OVER (PARTITION BY age ORDER BY et DESC) AS row_num
  FROM datagen)
WHERE row_num <=3;

窗口Top-N

窗口Top-N是特殊的 Top-N,它返回每个分区键的每个窗口的N个最小或最大值。

窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。

窗口Top-N 会在窗口结束后清除不需要的中间状态。 因此,窗口Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好。通常,窗口 Top-N 直接用于 窗口表值函数上。

窗口Top-N的语法和普通的Top-N 相同,需要PARTITION BY子句包含 窗口表值函数 或 窗口聚合 产生的window_start和window_end。

1.在窗口聚合后进行窗口 Top-N

在10分钟的滚动窗口上计算销售额位列前三的供应商

Flink SQL> SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, supplier_id
    )
  ) WHERE rownum <= 3;

2.在窗口表值函数后进行窗口Top-N

在10分钟的滚动窗口上计算价格位列前三的数据

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM TABLE(
               TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;

联结Join查询

内部等连接

内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

外部等连接

外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。

Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。

SELECT * FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT * FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

SELECT * FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id

间隔联结

返回受连接条件和时间约束限制的简单笛卡尔积。间隔连接至少需要一个等连接谓词和一个限制两侧时间的连接条件。两个适当的范围谓词可以定义这样的条件(<、<=、>=、>)、BETWEEN 谓词或比较两个输入的相同类型的时间属性(即处理时间或事件时间)的单个相等谓词表。

例如,如果订单在收到订单四小时后发货,则此查询会将所有订单与其相应的发货连接起来。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_ti

集合操作

UNION 和 UNION ALL

UNION:将集合合并并且去重

UNION ALL:将集合合并,不做去重
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');

Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);
Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);

Intersect 和 Intersect All

Intersect:交集并且去重

Intersect ALL:交集不做去重
Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);

Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);

Except 和 Except All

Except:差集并且去重

Except ALL:差集不做去重
Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);

Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);

In 子查询

如果给定表子查询中存在表达式,则返回 true。子查询表必须由一列组成。该列必须具有与表达式相同的数据类型。

SELECT user, amount FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)

EXISTS

如果子查询返回至少一行,则为 true。只支持能被重写为 join 和 group 的操作。

SELECT user, amount
FROM Orders
WHERE product EXISTS (
    SELECT product FROM NewProducts
)

去重

普通去重

去重是去掉重复的行,只保留第一或者最后一行。

Flink使用ROW_NUMBER()去除重复数据,去重就是Top-N 在 N 为 1 时的特例,并且去重必须要求按照处理或者事件时间排序。

语法如下:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

参数说明:

ROW_NUMBER():为每一行分配一个唯一且连续的数字,从 1 开始

PARTITION BY col1[, col2...]:指定分区键,即需要去重的键

ORDER BY time_attr [asc|desc]:指定排序列,必须是 时间属性。目前 Flink 支持 处理时间属性 和 事件时间属性。Order by ASC 保留第一行,DESC 保留最后一行

WHERE rownum = 1:Flink 需要这个条件来识别去重语句

示例:

CREATE TABLE Orders (
  order_id  STRING,
  user        STRING,
  product     STRING,
  num         BIGINT,
  proctime AS PROCTIME()
) WITH (...);

SELECT order_id, user, product, num
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
  FROM Orders)
WHERE row_num = 1

窗口去重

窗口去重是一种特殊的 去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据。

窗口重复数据删除是一种特殊的重复数据删除,它删除一组列上重复的行,保留每个窗口和分区键的第一个或最后一个。

语法:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]

参数说明:

ROW_NUMBER():为每一行分配一个唯一的连续编号,从 1 开始。

PARTITION BY window_start, window_end [, col_key1...]window_start:指定包含、window_end等分区键的分区列。

ORDER BY time_attr [asc|desc]:指定排序列,必须是时间属性。目前Flink支持处理时间属性和事件时间属性。按 ASC 排序意味着保留第一行,按 DESC 排序意味着保留最后一行。

WHERE (rownum = 1 | rownum <=1 | rownum < 2):rownum = 1 | rownum <=1 | rownum < 2优化器需要识别查询可以转换为窗口重复数据删除。
 SELECT * FROM (
    SELECT id, age, et, pt,
      ROW_NUMBER() OVER (PARTITION BY age ORDER BY id DESC) AS rownum
    FROM datagen )
    WHERE rownum <= 1;

注意:

目前只支持在滚动窗口、滑动窗口和累积窗口的窗口表值函数后进行窗口去重

目前只支持根据事件时间属性进行排序

函数

Flink允许用户在Table API 和 SQL中使用函数进行数据的转换,为用户提供了一组内置的数据转换函数。

系统函数也叫内置函数,是在系统中预先实现好的功能模块。可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL中的系统函数又主要可以分为两大类:标量函数和聚合函数。

标量函数

标量函数将零、一个或多个值作为输入并返回单个值作为结果。

1.比较函数

比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。

SQL函数Table函数描述
value1 = value2value1 === value2如果 value1 等于 value2 返回 TRUE;如果 value1 或者 value2 为 NULL 返回 UNKNOW

2.逻辑函数

逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。

SQL函数Table函数描述
boolean1 OR boolean2BOOLEAN1 // BOOLEAN2如果 boolean1 为 TRUE 或 boolean2 为 TRUE 返回 TRUE。支持三值逻辑。 例如 true

3.算术函数

进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。

SQL函数Table函数描述
numeric1 + numeric2NUMERIC1 + NUMERIC2返回 numeric1 加 numeric2

4.字符串函数
进行字符串处理的函数

string1 || string2  两个字符串的连接

UPPER(string)  将字符串string转为全部大写

CHAR_LENGTH(string)  计算字符串string的长度
SQL函数Table函数描述
UPPER(string)STRING.upperCase()以大写形式返回字符串

5.时间函数
进行与时间相关操作的函数

DATE string  按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date

TIMESTAMP string  按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp

CURRENT_TIME  返回本地时区的当前时间,类型为SQL time(与LOCALTIME等价)

INTERVAL string range  返回一个时间间隔。
SQL函数Table函数描述
DATE stringSTRING.toDate()以“yyyy-MM-dd”的形式返回从字符串解析的 SQL 日期

6.其他:

更多类型的标量函数参考:官方文档-标量函数

聚合函数

聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。

聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。

COUNT(*)  返回所有行的数量,统计个数。

SUM([ ALL | DISTINCT ] expression)  对某个字段进行求和操作。默认情况下省略了关键字ALL,表示对所有行求和;如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次。

RANK()   返回当前值在一组值中的排名。

ROW_NUMBER()    对一组值排序后,返回当前值的行号

RANK()和ROW_NUMBER()一般用在OVER窗口中
SQL函数Table函数描述
COUNT(*)COUNT(1)FIELD.count 返回输入行数

自定义函数

自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。


// 定义函数逻辑
public static class SubstringFunction extends ScalarFunction {
  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, end);
  }
}

TableEnvironment env = TableEnvironment.create(...);

// 在 Table API 里不经注册直接“内联”调用函数
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));

// 注册函数
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

// 在 Table API 里调用注册好的函数
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));

// 在 SQL 里调用注册好的函数
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");

Module

概述

Module允许Flink扩展函数能力。它是可插拔的

Flink 包含了以下三种Module:

1.核心模块

Flink内置的Module,其包含了目前Flink内置的所有UDF,Flink默认开启的Module就是CoreModule,可以直接使用其中的UDF

2.Hive模块

可以将Hive内置函数作为Flink的系统函数提供给SQL\Table API用户进行使用,比如get_json_object 这类Hive内置函数(Flink 默认的 CoreModule 是没有的)

3.用户自定义Module:

用户可以实现Module接口实现自己的UDF扩展 Module

在 Flink 中,Module可以被 加载、启用 、禁用 、卸载,当加载Module 之后,默认就是开启的。

操作命令

# 查看
SHOW MODULES;
SHOW FULL MODULES;

# 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]

# 启用module,没有被use的为禁用
USE MODULES module_name

# 卸载
UNLOAD MODULE module_name 

操作可以同时支持多个Module,并且根据加载Module的顺序去按顺序查找和解析UDF,先查到的先解析使用。

Flink只会解析已经启用的Module,当两个Module中出现两个同名的函数且都启用时, Flink会根据加载Module的顺序进行解析,结果就是会使用顺序为第一个Module的UDF

可以更改顺序

# 将Hive Module设为第一个使用及解析的Module
USE MODULE hive,core;

加载Hive Module

加载官方已经提供的的 Hive Module,将Hive已有的内置函数作为 Flink 的内置函数

1.下载:flink-sql-connector-hive

2.上传jar包到flink的lib

cp flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar /usr/local/program/flink/lib/

注意:拷贝hadoop的包,解决依赖冲突问题

cp /usr/local/program/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /usr/local/program/flink/lib/

3.重启flink集群和sql-client

# 基于独立模式的会话模式部署
./bin/start-cluster.sh

# SQL客户端默认使用embedded模式
./bin/sql-client.sh

4.加载hive module

-- hive-connector内置了hive module,提供了hive自带的系统函数
load module hive with ('hive-version'='3.1.3');
show modules;
show functions;

-- 可以调用hive的split函数
select split('a,b', ',');

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1192594.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

内存对齐规则

前言 求结构体的大小是很热门的考点&#xff0c;无论你是学C还是C&#xff0c;都会遇到这样的问题&#xff0c;在面试中也很受欢迎&#xff0c;所以我们先思考这样一个问题&#xff1a;计算结构体&#xff0c;联合体和类的大小应该怎么去计算呢&#xff1f;我们知道&#xff0c…

105.am40刷机(linux)折腾记1-前期的准备工作1

前段时间在某鱼上逛的时候&#xff0c;发现一款3399的盒子只要150大洋&#xff0c;内心就开始澎拜&#xff0c;一激动就下手了3台&#xff0c;花了450大洋&#xff08;现在想想&#xff0c;心都碎了一地&#xff09;。 然后自己又来来回回折腾了几天&#xff0c;目前能跑上fire…

C# 异步日志记录类,方便下次使用,不用重复造轮子

先定义接口类&#xff1a; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace 异常 {internal interface ILog{Task WriteErrorLog(string message);Task WriteInfoLog(string message);Task W…

高级运维学习(十四)Zabbix监控(一)

一 监控概述 1 监控的目的 &#xff08;1&#xff09;报告系统运行状况 每一部分必须同时监控内容包括吞吐量、反应时间、使用率等 &#xff08;2&#xff09;提前发现问题 进行服务器性能调整前&#xff0c;知道调整什么找出系统的瓶颈在什么地方 2 监控的资源类别 …

钉钉API与集简云无代码开发连接:电商平台与营销系统的自动化集成

连接科技与能源&#xff1a;钉钉API与集简云的一次集成尝试 在数字化时代&#xff0c;许多公司面临着如何将传统的工作方式转变为更智能、高效的挑战。某能源科技有限公司也不例外&#xff0c;他们是一家专注于能源科技领域的公司&#xff0c;产品包括节能灯具、光伏逆变器、电…

Flink之SQL客户端与DDL操作

SQL客户端与DDL操作 Flink SQLSQL客户端1.启动Flink2.启动Flink的SQL客户端3.HELP命令4.验证连接5.结果显示模式6.执行配置 数据库操作1.创建数据库2.查询数据库3.修改数据库4.删除数据库 表操作1.创建表表列属性表Watermark属性列PRIMARY KEY属性列PARTITIONED BY属性列WITH选…

真是性价比之王,腾讯云这款88元云服务器已经圈粉无数!

你是否曾经想过拥有一台属于自己的云服务器&#xff0c;但是却被高昂的价格和复杂的配置吓到了&#xff1f;现在&#xff0c;腾讯云推出了一款价格亲民、简单易用的88元云服务器&#xff0c;让你的梦想成为现实。腾讯云88元/年云服务器配置见下图&#xff1a; 腾讯云88元服务器…

大厂面试题-行锁、临键锁、间隙锁的理解

行锁、临键锁、间隙锁&#xff0c;都是MySQL里面InnoDB引擎下解决事务隔离性的一系列排他锁。 分别介绍一下这三种锁&#xff1a; 1、行锁&#xff0c;也称为记录锁。(如图) 当我们针对主键或者唯一索引加锁的时候&#xff0c;MySQL默认会对查询的这一行数据加行锁&#xff…

视频剪辑方法:为视频剪辑添加亮点,如何制作精美的滚动字幕

在视频剪辑中&#xff0c;滚动字幕是一个重要的元素&#xff0c;它可以为视频增添视觉吸引力&#xff0c;增强观看体验。滚动字幕的长度和速度也是非常重要的因素。如果滚动字幕太长&#xff0c;会让人感到拖沓&#xff1b;如果滚动字幕太短&#xff0c;会让人感到匆忙。因此&a…

Java设计模式-创建者模式-工厂模式

工厂模式 工厂模式简单工厂模式工厂方法模式抽象工厂模式 工厂模式 要求&#xff1a;由一个特定的工厂提供所需的对象&#xff0c;由工厂来完成对象的创建 工厂模式一般分为三种&#xff1a;简单工厂模式&#xff0c;工厂方法模式&#xff0c;抽象工厂模式 其中简单工厂模式不…

第三方支付支付宝的信息安全分析

随着信息技术的进步&#xff0c;网络结算方式也在迅速发展。网上结算虽然便捷快速&#xff0c;但是如果没有保障的平台或者法律的支持&#xff0c;双方在没有约束的情况下&#xff0c;就会导致拖延、折扣或者拒付等许多经济事件的发生&#xff0c;由此第三方支付就随之产生。第…

三目运算符与if 判断语句的区别

我们用一个案例解释python的判断语句&#xff1a; if a > b &#xff1a;return aelse&#xff1a;return b案例&#xff1a; a 2b 3if a > b:max aelse:max b这样利用python语言&#xff0c;写一个if 判断没有问题吧 三目运算符 下面我们用三目运算符来完成这样一…

Win10专业版安装wsl-ubuntu子系统

文章目录 一、查看是否满足安装要求二、管理员权限启动 Windows PowerShell三、启用Windows10子系统功能四、启用虚拟机平台功能五、重启电脑六、下载 Linux 内核更新包&#xff08;适用于 x64 计算机的 WSL2 Linux 内核更新包&#xff09;七、将 WSL 2 设置为默认版本八、打开…

台灯护眼灯哪个牌子好?护眼台灯品牌型号推荐榜单

台灯可以说家家必备&#xff01;家中有上学的小孩更是需要一款好台灯&#xff0c;因为看书、写字、做作业都离不开台灯&#xff0c;一款好的台灯不仅会提供明亮的学习环境&#xff0c;而且还能保护视力&#xff0c;预防近视&#xff0c;因此&#xff0c;挑选台灯绝对不可以马虎…

React Native适配Xcode 15 iOS 17.0+

iOS 17.0 Simulator(21A328)下载失败 App Store 更新到 Xcode15 后&#xff0c;无法运行模拟器和真机。需要下载iOS 17对应的模拟器。Xcode中更新非常容易中断失败&#xff0c;可以在官网单独下载iOS 17模拟器文件&#xff0c;例如&#xff1a;iOS_17.0.1_Simulator_Runtime.d…

React 递归手写流程图展示树形数据

需求 根据树的数据结构画出流程图展示&#xff0c;支持新增前一级、后一级、同级以及删除功能&#xff08;便于标记节点&#xff0c;把节点数据当作label展示出来了&#xff0c;实际业务中跟据情况处理&#xff09; 文件结构 初始数据 [{"ticketTemplateCode": &…

Vite - 配置 - 不同的环境执行不同的配置文件

目标描述 通过不同的命令&#xff0c;执行不同的环境的配置文件中的内容&#xff1a; npm run dev : 执行开发环境的配置文件 npm run build: 执行生产环境的配置文件 环境文件准备 为了在不同的环境中使用不同的配置文件&#xff0c;我们将配置文件拆分开来。 开发环境使用开发…

【Python基础】基于UPD协议实现简易聊天室(Socket编程)

UDP通信 1.什么是 socket2. 创建 socket3.udp 网络程序-发送、接收数据&#xff08;User Datagram Protocol&#xff09;udp 网络程序-发送、接收数据&#xff08;客户端&#xff09;udp 绑定信息udp 绑定信息---服务器端总结 4.udp 聊天器 1.什么是 socket socket(简称 套接字…

如何快速编写测试用例?

当你学会了如何设计测试用例之后&#xff0c;接下来便是开始用例的编写。 在设计阶段&#xff0c;更准确的说应该是识别测试点的过程&#xff0c;而编写阶段则是将测试点细化成一条条测试用例的过程&#xff0c;有了比较全的用例场景后&#xff0c;如何让别人更舒服、更方便、…

Python + UnitTest 软件测试流程总结

以测试用户登录流程为例&#xff1a; TestCase&#xff1a; TestCase 主要用来编写测试用例&#xff0c;这里结合 断言&#xff08;assertEqual 和 assertIn&#xff09; 进行判断&#xff0c;避免了手动书写判断。 # tools.py # 登录验证方法 def login(username, password…