PyFlink系列之一:PyFlink安装和PyFlink使用的详细技术

news2025/1/16 12:38:50

PyFlink系列之一:PyFlink安装和PyFlink使用的详细技术

  • 一、下载PyFlink
  • 二、创建TableEnvironment
  • 三、TableEnvironment API
    • 1.Table/SQL 操作
    • 2.执行/解释作业
    • 3.创建/删除用户自定义函数
    • 4.依赖管理
    • 5.配置
  • 四、Catalog APIs
  • 五、Statebackend,Checkpoint 以及重启策略
  • 六、Table API
  • 七、Operations
    • 1.From:Batch Streaming
    • 2.FromValues:Batch Streaming
    • 3.Select:Batch Streaming
    • 4.As:Batch Streaming
    • 5.Where / Filter:Batch Streaming
  • 八、列操作
    • 1.AddColumns:Batch Streaming
    • 2.AddOrReplaceColumns:Batch Streaming
    • 3.DropColumns:Batch Streaming
    • 4.RenameColumns:Batch Streaming
  • 九、Aggregations
    • 1.GroupBy Aggregation:Batch Streaming
    • 2.GroupBy Window Aggregation:Batch Streaming
    • 3.Over Window Aggregation
    • 4.Distinct Aggregation:Batch Streaming
  • 十、Joins
    • 1.Inner Join:Batch Streaming
    • 2.Outer Join:Batch Streaming
    • 3.Interval Join:Batch Streaming
    • 4.Inner Join with Table Function (UDTF):Batch Streaming
    • 5.Join with Temporal Table
  • 十一、Set Operations
    • 1.Union:Batch
    • 2.UnionAll:Batch Streaming
    • 3.Intersect:Batch
    • 4.IntersectAll:Batch
    • 5.Minus:Batch
    • 6.MinusAll:Batch
    • 7.In:Batch Streaming
  • 十二、OrderBy, Offset & Fetch
    • 1.Order By:Batch Streaming
    • 2.Offset & Fetch:Batch Streaming
    • 3.Insert:Batch Streaming
  • 十三、Windows
    • 1.Group Windows
    • 2.Tumble (Tumbling Windows)
    • 3.Slide (Sliding Windows)
    • 4.Session (Session Windows)
    • 5.Over Windows
    • 6.Unbounded Over Windows
    • 7.Bounded Over Windows
  • 十四、Row-based Operations
    • 1.Map:Batch Streaming
    • 2.FlatMap:Batch Streaming
    • 3.Aggregate:Batch Streaming
    • 4.Group Window Aggregate:Batch Streaming
    • 5.FlatAggregate

一、下载PyFlink

命令行下载PyFlink:

pip install apache-flink

Pycharm下载PyFlink:

在这里插入图片描述

二、创建TableEnvironment

创建 TableEnvironment 的推荐方式是通过 EnvironmentSettings 对象创建:

from pyflink.table import EnvironmentSettings, TableEnvironment

# create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
# or a batch TableEnvironment
# env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

或者,用户可以从现有的 StreamExecutionEnvironment 创建 StreamTableEnvironment,以与 DataStream API 进行互操作。

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# create a streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

三、TableEnvironment API

1.Table/SQL 操作

这些 APIs 用来创建或者删除 Table API/SQL 表和写查询:

APIs描述
from_elements(elements, schema=None, verify_schema=True)通过元素集合来创建表。
from_pandas(pdf, schema=None, split_num=1)通过 pandas DataFrame 来创建表。
from_path(path)通过指定路径下已注册的表来创建一个表,例如通过 create_temporary_view 注册表。
sql_query(query)执行一条 SQL 查询,并将查询的结果作为一个 Table 对象。
create_temporary_view(view_path, table)将一个 Table 对象注册为一张临时表,类似于 SQL 的临时表。
drop_temporary_view(view_path)删除指定路径下已注册的临时表
drop_temporary_table(table_path)删除指定路径下已注册的临时表。 你可以使用这个接口来删除临时 source 表和临时 sink 表。
execute_sql(stmt)执行指定的语句并返回执行结果。 执行语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。注意,对于 “INSERT INTO” 语句,这是一个异步操作,通常在向远程集群提交作业时才需要使用。 但是,如果在本地集群或者 IDE 中执行作业时,你需要等待作业执行完成

2.执行/解释作业

这些 APIs 是用来执行/解释作业。注意,execute_sql API 也可以用于执行作业。

APIs描述
explain_sql(stmt, *extra_details)返回指定语句的抽象语法树和执行计划。
create_statement_set()创建一个可接受 DML 语句或表的 StatementSet 实例。 它可用于执行包含多个 sink 的作业。

3.创建/删除用户自定义函数

这些 APIs 用来注册 UDFs 或者 删除已注册的 UDFs。 注意,execute_sql API 也可以用于注册/删除 UDFs。

APIs描述
create_temporary_function(path, function)将一个 Python 用户自定义函数注册为临时 catalog 函数。
create_temporary_system_function(name, function)将一个 Python 用户自定义函数注册为临时系统函数。 如果临时系统函数的名称与临时 catalog 函数名称相同,优先使用临时系统函数。
create_java_function(path, function_class_name, ignore_if_exists=None)将 Java 用户自定义函数注册为指定路径下的 catalog 函数。 如果 catalog 是持久化的,则可以跨多个 Flink 会话和集群使用已注册的 catalog 函数。
create_java_temporary_function(path, function_class_name)将 Java 用户自定义函数注册为临时 catalog 函数。
create_java_temporary_system_function(name, function_class_name)将 Java 用户定义的函数注册为临时系统函数。
drop_function(path)删除指定路径下已注册的 catalog 函数。
drop_temporary_function(path)删除指定名称下已注册的临时系统函数。
drop_temporary_system_function(name)删除指定名称下已注册的临时系统函数。

4.依赖管理

这些 APIs 用来管理 Python UDFs 所需要的 Python 依赖。

APIs描述
add_python_file(file_path)添加 Python 依赖,可以是 Python 文件,Python 包或者本地目录。 它们将会被添加到 Python UDF 工作程序的 PYTHONPATH 中。
set_python_requirements(requirements_file_path, requirements_cache_dir=None)指定一个 requirements.txt 文件,该文件定义了第三方依赖关系。 这些依赖项将安装到一个临时 catalog 中,并添加到 Python UDF 工作程序的 PYTHONPATH 中。
add_python_archive(archive_path, target_dir=None)添加 Python 归档文件。该文件将被解压到 Python UDF 程序的工作目录中。

5.配置

APIs描述
get_config()返回 table config,可以通过 table config 来定义 Table API 的运行时行为。

下面的代码示例展示了如何通过这个 API 来设置配置选项:

# set the parallelism to 8 
table_env.get_config().get_configuration().set_string("parallelism.default", "8") 

四、Catalog APIs

这些 APIs 用于访问 catalog 和模块

APIs描述
register_catalog(catalog_name, catalog)注册具有唯一名称的 Catalog
get_catalog(catalog_name)通过指定的名称来获得已注册的 Catalog
use_catalog(catalog_name)将当前目录设置为所指定的 catalog。 它也将默认数据库设置为所指定 catalog 的默认数据库。
get_current_catalog()获取当前会话默认的 catalog 名称。
get_current_database()获取正在运行会话中的当前默认数据库名称。
use_database(database_name)设置当前默认的数据库。 它必须存在当前 catalog 中。 当寻找未限定的对象名称时,该路径将被用作默认路径。
load_module(module_name, module)加载给定名称的 Module。 模块将按照加载的顺序进行保存。
unload_module(module_name)卸载给定名称的 Module
use_modules(*module_names)按指定列表激活在这个环境中加载的 Module
list_catalogs()获取在这个环境中注册的所有 catalog 目录名称。
list_modules()获取在这个环境中注册的所有激活的 Module 名称。
list_full_modules()获取在这个环境中注册的所有加载的 Module 名称及激活状态。
list_databases()获取当前 catalog 中所有数据库的名称。
list_tables()获取当前 catalog 的当前数据库下的所有表和临时表的名称。 它可以返回永久和临时的表和视图。
list_views()获取当前 catalog 的当前数据库中的所有临时表名称。 它既可以返回永久的也可以返回临时的临时表。
list_user_defined_functions()获取在该环境中已注册的所有用户自定义函数的名称。
list_functions()获取该环境中所有函数的名称。
list_temporary_tables()获取当前命名空间(当前 catalog 的当前数据库)中所有可用的表和临时表名称。
list_temporary_views()获取当前命名空间(当前 catalog 的当前数据库)中所有可用的临时表名称。

五、Statebackend,Checkpoint 以及重启策略

在 Flink 1.10 之前,你可以通过 StreamExecutionEnvironment 来配置 statebackend,checkpointing 以及重启策略。 现在你可以通过在 TableConfig 中,通过设置键值选项来配置它们。

下面代码示例展示了如何通过 Table API 来配置 statebackend,checkpoint 以及重启策略:

# 设置重启策略为 "fixed-delay"
table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s")

# 设置 checkpoint 模式为 EXACTLY_ONCE
table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min")

# 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" 和 "jobmanager"
# 你也可以将这个属性设置为 StateBackendFactory 的完整类名
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
table_env.get_config().get_configuration().set_string("state.backend", "rocksdb")

# 设置 RocksDB statebackend 所需要的 checkpoint 目录
table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/")

六、Table API

Table API 是批处理和流处理的统一的关系型 API。Table API 的查询不需要修改代码就可以采用批输入或流输入来运行。Table API 是 SQL 语言的超集,并且是针对 Apache Flink 专门设计的。Table API 集成了 Scala,Java 和 Python 语言的 API。Table API 的查询是使用 Java,Scala 或 Python 语言嵌入的风格定义的,有诸如自动补全和语法校验的 IDE 支持,而不是像普通 SQL 一样使用字符串类型的值来指定查询。

Table API 和 Flink SQL 共享许多概念以及部分集成的 API。通过查看公共概念 & API来学习如何注册表或如何创建一个表对象。流概念页面讨论了诸如动态表和时间属性等流特有的概念。

下面的例子中假定有一张叫 Orders 的表,表中有属性 (a, b, c, rowtime) 。rowtime 字段是流任务中的逻辑时间属性或是批任务中的普通时间戳字段。

概述 & 示例
Table API 支持 Scala, Java 和 Python 语言。Scala 语言的 Table API 利用了 Scala 表达式,Java 语言的 Table API 支持 DSL 表达式和解析并转换为等价表达式的字符串,Python 语言的 Table API 仅支持解析并转换为等价表达式的字符串。

下面的例子展示了 Scala、Java 和 Python 语言的 Table API 的不同之处。表程序是在批环境下执行的。程序扫描了 Orders 表,通过字段 a 进行分组,并计算了每组结果的行数。

Java

Java 的 Table API 通过引入 org.apache.flink.table.api.java.* 来使用。下面的例子展示了如何创建一个 Java 的 Table API 程序,以及表达式是如何指定为字符串的。 使用DSL表达式时也需要引入静态的 org.apache.flink.table.api.Expressions.*。

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.*;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// 在表环境中注册 Orders 表
// ...

// 指定表程序
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)

Table counts = orders
        .groupBy($("a"))
        .select($("a"), $("b").count().as("cnt"));

// 打印
counts.execute().print();

Scala
Scala 的 Table API 通过引入 org.apache.flink.table.api.、org.apache.flink.api.scala. 和 org.apache.flink.table.api.bridge.scala._(开启数据流的桥接支持)来使用。

下面的例子展示了如何创建一个 Scala 的 Table API 程序。通过 Scala 的带美元符号($)的字符串插值来实现表字段引用。

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// 环境配置
val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();

val tEnv = TableEnvironment.create(settings);

// 在表环境中注册 Orders 表
// ...

// 指定表程序
val orders = tEnv.from("Orders") // schema (a, b, c, rowtime)

val result = orders
               .groupBy($"a")
               .select($"a", $"b".count as "cnt")
               .execute()
               .print()

Python
下面的例子展示了如何创建一个 Python 的 Table API 程序,以及表达式是如何指定为字符串的。

from pyflink.table import *

# 环境配置
t_env = TableEnvironment.create(
    environment_settings=EnvironmentSettings.in_batch_mode())

# 在表环境中注册 Orders 表和结果 sink 表
source_data_path = "/path/to/source/directory/"
result_data_path = "/path/to/result/directory/"
source_ddl = f"""
        create table Orders(
            a VARCHAR,
            b BIGINT,
            c BIGINT,
            rowtime TIMESTAMP(3),
            WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '{source_data_path}'
        )
        """
t_env.execute_sql(source_ddl)

sink_ddl = f"""
    create table `Result`(
        a VARCHAR,
        cnt BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{result_data_path}'
    )
    """
t_env.execute_sql(sink_ddl)

# 指定表程序
orders = t_env.from_path("Orders")  # schema (a, b, c, rowtime)

orders.group_by("a").select(orders.a, orders.b.count.alias('cnt')).execute_insert("result").wait()

下一个例子展示了一个更加复杂的 Table API 程序。这个程序也扫描 Orders 表。程序过滤了空值,使字符串类型的字段 a 标准化,并且每个小时进行一次计算并返回 a 的平均账单金额 b。

Java

// 环境配置
// ...

// 指定表程序
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)

Table result = orders
        .filter(
            and(
                $("a").isNotNull(),
                $("b").isNotNull(),
                $("c").isNotNull()
            ))
        .select($("a").lowerCase().as("a"), $("b"), $("rowtime"))
        .window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow"))
        .groupBy($("hourlyWindow"), $("a"))
        .select($("a"), $("hourlyWindow").end().as("hour"), $("b").avg().as("avgBillingAmount"));

Scala

// 环境配置
// ...

// 指定表程序
val orders: Table = tEnv.from("Orders") // schema (a, b, c, rowtime)

val result: Table = orders
        .filter($"a".isNotNull && $"b".isNotNull && $"c".isNotNull)
        .select($"a".lowerCase() as "a", $"b", $"rowtime")
        .window(Tumble over 1.hour on $"rowtime" as "hourlyWindow")
        .groupBy($"hourlyWindow", $"a")
        .select($"a", $"hourlyWindow".end as "hour", $"b".avg as "avgBillingAmount")

Python

# 指定表程序
from pyflink.table.expressions import col, lit

orders = t_env.from_path("Orders")  # schema (a, b, c, rowtime)

result = orders.filter(orders.a.is_not_null & orders.b.is_not_null & orders.c.is_not_null) \
               .select(orders.a.lower_case.alias('a'), orders.b, orders.rowtime) \
               .window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \
               .group_by(col('hourly_window'), col('a')) \
               .select(col('a'), col('hourly_window').end.alias('hour'), b.avg.alias('avg_billing_amount'))

因为 Table API 的批数据 API 和流数据 API 是统一的,所以这两个例子程序不需要修改代码就可以运行在流输入或批输入上。在这两种情况下,只要流任务没有数据延时,程序将会输出相同的结果。

七、Operations

Table API支持如下操作。请注意不是所有的操作都可以既支持流也支持批;这些操作都具有相应的标记。

1.From:Batch Streaming

和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。
Java:

Table orders = tableEnv.from("Orders");

Scala:

val orders = tableEnv.from("Orders")

Python:

orders = t_env.from_path("Orders")

2.FromValues:Batch Streaming

和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。

你可以使用 row(…) 表达式创建复合行:
Java:

Table table = tEnv.fromValues(
   row(1, "ABC"),
   row(2L, "ABCDE")
);

Scala:

table = tEnv.fromValues(
   row(1, "ABC"),
   row(2L, "ABCDE")
)

Python:

table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')])

这将生成一张结构如下的表:

root
|-- f0: BIGINT NOT NULL     // original types INT and BIGINT are generalized to BIGINT
|-- f1: VARCHAR(5) NOT NULL // original types CHAR(3) and CHAR(5) are generalized
                            // to VARCHAR(5). VARCHAR is used instead of CHAR so that
                            // no padding is applied

这个方法会根据输入的表达式自动获取类型。如果在某一个特定位置的类型不一致,该方法会尝试寻找一个所有类型的公共超类型。如果公共超类型不存在,则会抛出异常。

你也可以明确指定所需的类型。指定如 DECIMAL 这样的一般类型或者给列命名可能是有帮助的。

Java:

Table table = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
        DataTypes.FIELD("name", DataTypes.STRING())
    ),
    row(1, "ABC"),
    row(2L, "ABCDE")
);

Scala:

val table = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
        DataTypes.FIELD("name", DataTypes.STRING())
    ),
    row(1, "ABC"),
    row(2L, "ABCDE")
)

Python:

table = t_env.from_elements(
  [(1, 'ABC'), (2, 'ABCDE')],
  schema=DataTypes.Row([DataTypes.FIELD('id', DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD('name', DataTypes.STRING())]))

这将生成一张结构如下的表:

root
|-- id: DECIMAL(10, 2)
|-- name: STRING

3.Select:Batch Streaming

和 SQL 的 SELECT 子句类似。 执行一个 select 操作。

Java

Table orders = tableEnv.from("Orders");
Table result = orders.select($("a"), $("c").as("d"));

Scala

val orders = tableEnv.from("Orders")
Table result = orders.select($"a", $"c" as "d");

Python

orders = t_env.from_path("Orders")
result = orders.select(orders.a, orders.c.alias('d'))

可以选择星号(*)作为通配符,select 表中的所有列。
Java

Table result = orders.select($("*"));

Scala

Table result = orders.select($"*")

Python

from pyflink.table.expressions import col

result = orders.select(col("*"))

4.As:Batch Streaming

重命名字段。

Java

Table orders = tableEnv.from("Orders");
Table result = orders.as("x, y, z, t");

scala

val orders: Table = tableEnv.from("Orders").as("x", "y", "z", "t")

Python

orders = t_env.from_path("Orders")
result = orders.alias("x, y, z, t")

5.Where / Filter:Batch Streaming

和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。
Java:

Table orders = tableEnv.from("Orders");
Table result = orders.where($("b").isEqual("red"));


Table orders = tableEnv.from("Orders");
Table result = orders.filter($("b").isEqual("red"));

Scala:

val orders: Table = tableEnv.from("Orders")
val result = orders.filter($"a" % 2 === 0)

val orders: Table = tableEnv.from("Orders")
val result = orders.filter($"a" % 2 === 0)

Python:

orders = t_env.from_path("Orders")
result = orders.where(orders.a == 'red')

orders = t_env.from_path("Orders")
result = orders.filter(orders.a == 'red')

八、列操作

1.AddColumns:Batch Streaming

执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。

Java

Table orders = tableEnv.from("Orders");
Table result = orders.addColumns(concat($("c"), "sunny"));

Scala

val orders = tableEnv.from("Orders");
val result = orders.addColumns(concat($"c", "Sunny"))

Python

from pyflink.table.expressions import concat

orders = t_env.from_path("Orders")
result = orders.add_columns(concat(orders.c, 'sunny'))

2.AddOrReplaceColumns:Batch Streaming

执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。
Java

Table orders = tableEnv.from("Orders");
Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc"));

Scala

val orders = tableEnv.from("Orders");
val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc")

Python

from pyflink.table.expressions import concat

orders = t_env.from_path("Orders")
result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))

3.DropColumns:Batch Streaming

Java

able orders = tableEnv.from("Orders");
Table result = orders.dropColumns($("b"), $("c"));

Scala

val orders = tableEnv.from("Orders");
val result = orders.dropColumns($"b", $"c")

Python

orders = t_env.from_path("Orders")
result = orders.drop_columns(orders.b, orders.c)

4.RenameColumns:Batch Streaming

执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。
Java

Table orders = tableEnv.from("Orders");
Table result = orders.renameColumns($("b").as("b2"), $("c").as("c2"));

Scala

val orders = tableEnv.from("Orders");
val result = orders.renameColumns($"b" as "b2", $"c" as "c2")

Python

orders = t_env.from_path("Orders")
result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))

九、Aggregations

1.GroupBy Aggregation:Batch Streaming

和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。

Java

Table orders = tableEnv.from("Orders");
Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d"));

Scala

val orders: Table = tableEnv.from("Orders")
val result = orders.groupBy($"a").select($"a", $"b".sum().as("d"))

Python

orders = t_env.from_path("Orders")
result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))

2.GroupBy Window Aggregation:Batch Streaming

使用分组窗口结合单个或者多个分组键对表进行分组和聚合。

Java

Table orders = tableEnv.from("Orders");
Table result = orders
    .window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w")) // 定义窗口
    .groupBy($("a"), $("w")) // 按窗口和键分组
    // 访问窗口属性并聚合
    .select(
        $("a"),
        $("w").start(),
        $("w").end(),
        $("w").rowtime(),
        $("b").sum().as("d")
    );

Scala

val orders: Table = tableEnv.from("Orders")
val result: Table = orders
    .window(Tumble over 5.minutes on $"rowtime" as "w") // 定义窗口
    .groupBy($"a", $"w") // 按窗口和键分组
    .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".sum as "d") // 访问窗口属性并聚合

Python

from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col

orders = t_env.from_path("Orders")
result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \ 
               .group_by(orders.a, col('w')) \
               .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d'))

3.Over Window Aggregation

和 SQL 的 OVER 子句类似。

Java

Table orders = tableEnv.from("Orders");
Table result = orders
    // 定义窗口
    .window(
        Over
          .partitionBy($("a"))
          .orderBy($("rowtime"))
          .preceding(UNBOUNDED_RANGE)
          .following(CURRENT_RANGE)
          .as("w"))
    // 滑动聚合
    .select(
        $("a"),
        $("b").avg().over($("w")),
        $("b").max().over($("w")),
        $("b").min().over($("w"))
    );

Scala

val orders: Table = tableEnv.from("Orders")
val result: Table = orders
    // 定义窗口
    .window(
        Over
          partitionBy $"a"
          orderBy $"rowtime"
          preceding UNBOUNDED_RANGE
          following CURRENT_RANGE
          as "w")
    .select($"a", $"b".avg over $"w", $"b".max().over($"w"), $"b".min().over($"w")) // 滑动聚合

Python

from pyflink.table.window import Over
from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE

orders = t_env.from_path("Orders")
result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
                            .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
                            .alias("w")) \
               .select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))

所有的聚合必须定义在同一个窗口上,比如同一个分区、排序和范围内。目前只支持 PRECEDING 到当前行范围(无界或有界)的窗口。尚不支持 FOLLOWING 范围的窗口。ORDER BY 操作必须指定一个单一的时间属性。

4.Distinct Aggregation:Batch Streaming

和 SQL DISTINCT 聚合子句类似,例如 COUNT(DISTINCT a)。 Distinct 聚合声明的聚合函数(内置或用户定义的)仅应用于互不相同的输入值。 Distinct 可以应用于 GroupBy Aggregation、GroupBy Window Aggregation 和 Over Window Aggregation。

Java:

Table orders = tableEnv.from("Orders");
// 按属性分组后的的互异(互不相同、去重)聚合
Table groupByDistinctResult = orders
    .groupBy($("a"))
    .select($("a"), $("b").sum().distinct().as("d"));
// 按属性、时间窗口分组后的互异(互不相同、去重)聚合
Table groupByWindowDistinctResult = orders
    .window(Tumble
            .over(lit(5).minutes())
            .on($("rowtime"))
            .as("w")
    )
    .groupBy($("a"), $("w"))
    .select($("a"), $("b").sum().distinct().as("d"));
// over window 上的互异(互不相同、去重)聚合
Table result = orders
    .window(Over
        .partitionBy($("a"))
        .orderBy($("rowtime"))
        .preceding(UNBOUNDED_RANGE)
        .as("w"))
    .select(
        $("a"), $("b").avg().distinct().over($("w")),
        $("b").max().over($("w")),
        $("b").min().over($("w"))
    );

Scala:

val orders: Table = tableEnv.from("Orders");
// 按属性分组后的的互异(互不相同、去重)聚合
val groupByDistinctResult = orders
    .groupBy($"a")
    .select($"a", $"b".sum.distinct as "d")
// 按属性、时间窗口分组后的互异(互不相同、去重)聚合
val groupByWindowDistinctResult = orders
    .window(Tumble over 5.minutes on $"rowtime" as "w").groupBy($"a", $"w")
    .select($"a", $"b".sum.distinct as "d")
// over window 上的互异(互不相同、去重)聚合
val result = orders
    .window(Over
        partitionBy $"a"
        orderBy $"rowtime"
        preceding UNBOUNDED_RANGE
        as $"w")
    .select($"a", $"b".avg.distinct over $"w", $"b".max over $"w", $"b".min over $"w")

Python:

from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE

orders = t_env.from_path("Orders")
# 按属性分组后的的互异(互不相同、去重)聚合
group_by_distinct_result = orders.group_by(orders.a) \
                                 .select(orders.a, orders.b.sum.distinct.alias('d'))
# 按属性、时间窗口分组后的互异(互不相同、去重)聚合
group_by_window_distinct_result = orders.window(
    Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \
    .select(orders.a, orders.b.sum.distinct.alias('d'))
# over window 上的互异(互不相同、去重)聚合
result = orders.over_window(Over
                       .partition_by(orders.a)
                       .order_by(orders.rowtime)
                       .preceding(UNBOUNDED_RANGE)
                       .alias("w")) \
                       .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))

用户定义的聚合函数也可以与 DISTINCT 修饰符一起使用。如果计算不同(互异、去重的)值的聚合结果,则只需向聚合函数添加 distinct 修饰符即可。

Java

Table orders = tEnv.from("Orders");

// 对 user-defined aggregate functions 使用互异(互不相同、去重)聚合
tEnv.registerFunction("myUdagg", new MyUdagg());
orders.groupBy("users")
    .select(
        $("users"),
        call("myUdagg", $("points")).distinct().as("myDistinctResult")
    );

Scala

val orders: Table = tableEnv.from("Orders")
val result = orders.distinct()

Python

orders = t_env.from_path("Orders")
result = orders.distinct()

十、Joins

1.Inner Join:Batch Streaming

和 SQL 的 JOIN 子句类似。关联两张表。两张表必须有不同的字段名,并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。

Java:

Table orders = tableEnv.from("Orders");
Table result = orders.distinct();

Scala:

val left = tableEnv.from("MyTable").select($"a", $"b", $"c")
val right = tableEnv.from("MyTable").select($"d", $"e", $"f")
val result = left.join(right).where($"a" === $"d").select($"a", $"b", $"e")

Python:

from pyflink.table.expressions import col

left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e)

2.Outer Join:Batch Streaming

和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名,并且必须定义至少一个等式连接谓词。

Java:

Table left = tableEnv.from("MyTable).select($("a"), $("b"), $("c"));
Table right = tableEnv.from("MyTable).select($("d"), $("e"), $("f"));

Table leftOuterResult = left.leftOuterJoin(right, $("a").isEqual($("d")))
                            .select($("a"), $("b"), $("e"));
Table rightOuterResult = left.rightOuterJoin(right, $("a").isEqual($("d")))
                            .select($("a"), $("b"), $("e"));
Table fullOuterResult = left.fullOuterJoin(right, $("a").isEqual($("d")))
                            .select($("a"), $("b"), $("e"));

Scala:

val left = tableEnv.from("MyTable").select($"a", $"b", $"c")
val right = tableEnv.from("MyTable").select($"d", $"e", $"f")

val leftOuterResult = left.leftOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")
val rightOuterResult = left.rightOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")
val fullOuterResult = left.fullOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")

Python:

from pyflink.table.expressions import col

left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))

left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)

3.Interval Join:Batch Streaming

Interval join 是可以通过流模式处理的常规 join 的子集。

Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。这种条件可以由两个合适的范围谓词(<、<=、>=、>)或一个比较两个输入表相同时间属性(即处理时间或事件时间)的等值谓词来定义。

Java:

Table left = tableEnv.from("MyTable).select($("a"), $("b"), $("c"), $("ltime"));
Table right = tableEnv.from("MyTable).select($("d"), $("e"), $("f"), $("rtime"));

Table result = left.join(right)
  .where(
    and(
        $("a").isEqual($("d")),
        $("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())),
        $("ltime").isLess($("rtime").plus(lit(10).minutes()))
    ))
  .select($("a"), $("b"), $("e"), $("ltime"));

Scala:

val left = tableEnv.from("MyTable").select($"a", $"b", $"c", $"ltime")
val right = tableEnv.from("MyTable").select($"d", $"e", $"f", $"rtime")

val result = left.join(right)
  .where($"a" === $"d" && $"ltime" >= $"rtime" - 5.minutes && $"ltime" < $"rtime" + 10.minutes)
  .select($"a", $"b", $"e", $"ltime")

Python:

from pyflink.table.expressions import col

left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'), col('rowtime1'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'), col('rowtime2'))
  
joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >= right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 + lit(2).seconds))
result = joined_table.select(joined_table.a, joined_table.b, joined_table.e, joined_table.rowtime1)

4.Inner Join with Table Function (UDTF):Batch Streaming

join 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。 如果表函数调用返回空结果,则删除左侧(外部)表的一行。

Java:

Table orders = tableEnv.from("Orders");
Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d"));
Scala
Python

Scala:

// 实例化 User-Defined Table Function
val split: TableFunction[_] = new MySplitUDTF()

// join
val result: Table = table
    .joinLateral(split($"c") as ("s", "t", "v"))
    .select($"a", $"b", $"s", $"t", $"v")

Python:

# 注册 User-Defined Table Function
@udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
def split(x):
    return [Row(1, 2, 3)]

# join
orders = t_env.from_path("Orders")
joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v)

5.Join with Temporal Table

Temporal table 是跟踪随时间变化的表。

Temporal table 函数提供对特定时间点 temporal table 状态的访问。表与 temporal table 函数进行 join 的语法和使用表函数进行 inner join 的语法相同。

目前仅支持与 temporal table 的 inner join。

Java:

Table ratesHistory = tableEnv.from("RatesHistory");

// 注册带有时间属性和主键的 temporal table function
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
    "r_proctime",
    "r_currency");
tableEnv.registerFunction("rates", rates);

// 基于时间属性和键与“Orders”表关联
Table orders = tableEnv.from("Orders");
Table result = orders
    .joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")));

Python:

目前不支持 Python 的 Table API。

十一、Set Operations

1.Union:Batch

和 SQL UNION 子句类似。Union 两张表会删除重复记录。两张表必须具有相同的字段类型。

Java:

Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");

left.union(right);

Scala:

val left = tableEnv.from("orders1")
val right = tableEnv.from("orders2")

left.union(right)

2.UnionAll:Batch Streaming

和 SQL UNION ALL 子句类似。Union 两张表。 两张表必须具有相同的字段类型。

Java:

Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");

left.unionAll(right);

Scala:

val left = tableEnv.from("orders1")
val right = tableEnv.from("orders2")

left.unionAll(right)

3.Intersect:Batch

和 SQL INTERSECT 子句类似。Intersect 返回两个表中都存在的记录。如果一条记录在一张或两张表中存在多次,则只返回一条记录,也就是说,结果表中不存在重复的记录。两张表必须具有相同的字段类型。

Java:

Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");

left.intersect(right);

Scala:

val left = tableEnv.from("orders1")
val right = tableEnv.from("orders2")

left.intersect(right)

4.IntersectAll:Batch

和 SQL INTERSECT ALL 子句类似。IntersectAll 返回两个表中都存在的记录。如果一条记录在两张表中出现多次,那么该记录返回的次数同该记录在两个表中都出现的次数一致,也就是说,结果表可能存在重复记录。两张表必须具有相同的字段类型。

Java:

Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");

left.intersectAll(right);

Scala:

val left = tableEnv.from("orders1")
val right = tableEnv.from("orders2")

left.intersectAll(right)

5.Minus:Batch

和 SQL EXCEPT 子句类似。Minus 返回左表中存在且右表中不存在的记录。左表中的重复记录只返回一次,换句话说,结果表中没有重复记录。两张表必须具有相同的字段类型。

Java

Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");

left.minus(right);

Scala:

val left = tableEnv.from("orders1")
val right = tableEnv.from("orders2")

left.minus(right)

6.MinusAll:Batch

和 SQL EXCEPT ALL 子句类似。MinusAll 返回右表中不存在的记录。在左表中出现 n 次且在右表中出现 m 次的记录,在结果表中出现 (n - m) 次,例如,也就是说结果中删掉了在右表中存在重复记录的条数的记录。两张表必须具有相同的字段类型。

Java:

Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");

left.minusAll(right);

Scala:

val left = tableEnv.from("orders1")
val right = tableEnv.from("orders2")

left.minusAll(right)

7.In:Batch Streaming

和 SQL IN 子句类似。如果表达式的值存在于给定表的子查询中,那么 In 子句返回 true。子查询表必须由一列组成。这个列必须与表达式具有相同的数据类型。

Java:

Table left = tableEnv.from("Orders1")
Table right = tableEnv.from("Orders2");

Table result = left.select($("a"), $("b"), $("c")).where($("a").in(right));

Scala:

val left = tableEnv.from("Orders1")
val right = tableEnv.from("Orders2");

val result = left.select($"a", $"b", $"c").where($"a".in(right))

Python:

left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('a'))

result = left.select(left.a, left.b, left.c).where(left.a.in_(right))

十二、OrderBy, Offset & Fetch

1.Order By:Batch Streaming

和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
Java:

from pyflink.table.expressions import col

left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))

left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)

Scala:

val result = in.orderBy($"a".asc)

Python:

result = in.order_by(in.a.asc)

2.Offset & Fetch:Batch Streaming

和 SQL 的 OFFSET 和 FETCH 子句类似。Offset 操作根据偏移位置来限定(可能是已排序的)结果集。Fetch 操作将(可能已排序的)结果集限制为前 n 行。通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。

Java:

// 从已排序的结果集中返回前5条记录
Table result1 = in.orderBy($("a").asc()).fetch(5);

// 从已排序的结果集中返回跳过3条记录之后的所有记录
Table result2 = in.orderBy($("a").asc()).offset(3);

// 从已排序的结果集中返回跳过10条记录之后的前5条记录
Table result3 = in.orderBy($("a").asc()).offset(10).fetch(5);

Scala:

// 从已排序的结果集中返回前5条记录
val result1: Table = in.orderBy($"a".asc).fetch(5)

// 从已排序的结果集中返回跳过3条记录之后的所有记录
val result2: Table = in.orderBy($"a".asc).offset(3)

// 从已排序的结果集中返回跳过10条记录之后的前5条记录
val result3: Table = in.orderBy($"a".asc).offset(10).fetch(5)

Python:

# 从已排序的结果集中返回前5条记录
result1 = table.order_by(table.a.asc).fetch(5)

# 从已排序的结果集中返回跳过3条记录之后的所有记录
result2 = table.order_by(table.a.asc).offset(3)

# 从已排序的结果集中返回跳过10条记录之后的前5条记录
result3 = table.order_by(table.a.asc).offset(10).fetch(5)

3.Insert:Batch Streaming

和 SQL 查询中的 INSERT INTO 子句类似,该方法执行对已注册的输出表的插入操作。executeInsert() 方法将立即提交执行插入操作的 Flink job。

输出表必须已注册在 TableEnvironment(详见表连接器)中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。

Java:

Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");

Scala:

val orders = tableEnv.from("Orders")
orders.executeInsert("OutOrders")

Python:

orders = t_env.from_path("Orders")
orders.execute_insert("OutOrders")

十三、Windows

1.Group Windows

Group window 聚合根据时间或行计数间隔将行分为有限组,并为每个分组进行一次聚合函数计算。对于批处理表,窗口是按时间间隔对记录进行分组的便捷方式。

窗口是使用 window(GroupWindow w) 子句定义的,并且需要使用 as 子句来指定别名。为了按窗口对表进行分组,窗口别名必须像常规分组属性一样在 groupBy(…) 子句中引用。 以下示例展示了如何在表上定义窗口聚合

Java:

Table table = input
  .window([GroupWindow w].as("w"))  // 定义窗口并指定别名为 w
  .groupBy($("w"))  // 以窗口 w 对表进行分组
  .select($("b").sum());  // 聚合

Scala:

val table = input
  .window([w: GroupWindow] as $"w")  // 定义窗口并指定别名为 w
  .groupBy($"w")   // 以窗口 w 对表进行分组
  .select($"b".sum)  // 聚合

Python:

# 定义窗口并指定别名为 w,以窗口 w 对表进行分组,然后再聚合
table = input.window([w: GroupWindow].alias("w")) \
             .group_by(col('w')).select(input.b.sum)

在流环境中,如果窗口聚合除了窗口之外还根据一个或多个属性进行分组,则它们只能并行计算,例如,groupBy(…) 子句引用了一个窗口别名和至少一个附加属性。仅引用窗口别名(例如在上面的示例中)的 groupBy(…) 子句只能由单个非并行任务进行计算。 以下示例展示了如何定义有附加分组属性的窗口聚合。

Java:

Table table = input
  .window([GroupWindow w].as("w"))  // 定义窗口并指定别名为 w
  .groupBy($("w"), $("a"))  // 以属性 a 和窗口 w 对表进行分组
  .select($("a"), $("b").sum());  // 聚合

Scala:

val table = input
  .window([w: GroupWindow] as $"w") // 定义窗口并指定别名为 w
  .groupBy($"w", $"a")  // 以属性 a 和窗口 w 对表进行分组
  .select($"a", $"b".sum)  // 聚合

Python:

# 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组,
# 然后再聚合
table = input.window([w: GroupWindow].alias("w")) \
             .group_by(col('w'), input.a).select(input.b.sum)

时间窗口的开始、结束或行时间戳等窗口属性可以作为窗口别名的属性添加到 select 子句中,如 w.start、w.end 和 w.rowtime。窗口开始和行时间戳是包含的上下窗口边界。相反,窗口结束时间戳是唯一的上窗口边界。例如,从下午 2 点开始的 30 分钟滚动窗口将 “14:00:00.000” 作为开始时间戳,“14:29:59.999” 作为行时间时间戳,“14:30:00.000” 作为结束时间戳。

Java:

able table = input
  .window([GroupWindow w].as("w"))  // 定义窗口并指定别名为 w
  .groupBy($("w"), $("a"))  // 以属性 a 和窗口 w 对表进行分组
  .select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count()); // 聚合并添加窗口开始、结束和 rowtime 时间戳

Scala:

val table = input
  .window([w: GroupWindow] as $"w")  // 定义窗口并指定别名为 w
  .groupBy($"w", $"a")  // 以属性 a 和窗口 w 对表进行分组
  .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count) // 聚合并添加窗口开始、结束和 rowtime 时间戳

Python:

# 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组,
# 然后再聚合并添加窗口开始、结束和 rowtime 时间戳
table = input.window([w: GroupWindow].alias("w")) \
             .group_by(col('w'), input.a) \
             .select(input.a, col('w').start, col('w').end, col('w').rowtime, input.b.count)

Window 参数定义了如何将行映射到窗口。 Window 不是用户可以实现的接口。相反,Table API 提供了一组具有特定语义的预定义 Window 类。下面列出了支持的窗口定义。

2.Tumble (Tumbling Windows)

滚动窗口将行分配给固定长度的非重叠连续窗口。例如,一个 5 分钟的滚动窗口以 5 分钟的间隔对行进行分组。滚动窗口可以定义在事件时间、处理时间或行数上。

滚动窗口是通过 Tumble 类定义的,具体如下:

MethodDescription
over将窗口的长度定义为时间或行计数间隔。
on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。
alias指定窗口的别名。别名用于在 group_by() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。

Java:

// Tumbling Event-time Window
.window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"));

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w"));

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));

Scala:

// Tumbling Event-time Window
.window(Tumble over 10.minutes on $"rowtime" as $"w")

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.minutes on $"proctime" as $"w")

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.rows on $"proctime" as $"w")

Python:

# Tumbling Event-time Window
.window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias("w"))

# Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over(lit(10).minutes).on(col('proctime')).alias("w"))

# Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over(row_interval(10)).on(col('proctime')).alias("w"))

3.Slide (Sliding Windows)

滑动窗口具有固定大小并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,行可能分配给多个窗口。例如,15 分钟大小和 5 分钟滑动间隔的滑动窗口将每一行分配给 3 个不同的 15 分钟大小的窗口,以 5 分钟的间隔进行一次计算。滑动窗口可以定义在事件时间、处理时间或行数上。

滑动窗口是通过 Slide 类定义的,具体如下:

MethodDescription
over将窗口的长度定义为时间或行计数间隔。
every将窗口的长度定义为时间或行计数间隔。滑动间隔的类型必须与窗口长度的类型相同。
on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。
as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。

Java:

// Sliding Event-time Window
.window(Slide.over(lit(10).minutes())
            .every(lit(5).minutes())
            .on($("rowtime"))
            .as("w"));

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over(lit(10).minutes())
            .every(lit(5).minutes())
            .on($("proctime"))
            .as("w"));

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));

Scala:

// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")

Python:

# Sliding Event-time Window
.window(Slide.over(lit(10).minutes).every(lit(5).minutes).on(col('rowtime')).alias("w"))

# Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over(lit(10).minutes).every(lit(5).minutes).on(col('proctime')).alias("w"))

# Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over(row_interval(10)).every(row_interval(5)).on(col('proctime')).alias("w"))

4.Session (Session Windows)

会话窗口没有固定的大小,其边界是由不活动的间隔定义的,例如,如果在定义的间隔期内没有事件出现,则会话窗口将关闭。例如,定义30 分钟间隔的会话窗口,当观察到一行在 30 分钟内不活动(否则该行将被添加到现有窗口中)且30 分钟内没有添加新行,窗口会关闭。会话窗口支持事件时间和处理时间。

MethodDescription
withGap将两个窗口之间的间隙定义为时间间隔。
on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。
as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。

Java:

// Session Event-time Window
.window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"));

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));

Scala:

// Session Event-time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on $"proctime" as $"w")

Python:

# Session Event-time Window
.window(Session.with_gap(lit(10).minutes).on(col('rowtime')).alias("w"))

# Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.with_gap(lit(10).minutes).on(col('proctime')).alias("w"))

5.Over Windows

Over window 聚合聚合来自在标准的 SQL(OVER 子句),可以在 SELECT 查询子句中定义。与在“GROUP BY”子句中指定的 group window 不同, over window 不会折叠行。相反,over window 聚合为每个输入行在其相邻行的范围内计算聚合。

Over windows 使用 window(w: OverWindow*) 子句(在 Python API 中使用 over_window(*OverWindow))定义,并通过 select() 方法中的别名引用。以下示例显示如何在表上定义 over window 聚合。

Java:

Table table = input
  .window([OverWindow w].as("w"))           // define over window with alias w
  .select($("a"), $("b").sum().over($("w")), $("c").min().over($("w"))); // aggregate over the over window w

Scala:

val table = input
  .window([w: OverWindow] as $"w")              // define over window with alias w
  .select($"a", $"b".sum over $"w", $"c".min over $"w") // aggregate over the over window w

Python:

# define over window with alias w and aggregate over the over window w
table = input.over_window([w: OverWindow].alias("w")) \
    .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w')))

OverWindow 定义了计算聚合的行范围。OverWindow 不是用户可以实现的接口。相反,Table API 提供了Over 类来配置 over window 的属性。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围内定义 over window 。可以通过 Over 类(和其他类)上的方法来定义 over window,具体如下:

Partition By
可选的

在一个或多个属性上定义输入的分区。每个分区单独排序,聚合函数分别应用于每个分区。

注意:在流环境中,如果窗口包含 partition by 子句,则只能并行计算 over window 聚合。如果没有 partitionBy(…),数据流将由单个非并行任务处理。

Order By
必须的

定义每个分区内行的顺序,从而定义聚合函数应用于行的顺序。

注意:对于流处理查询,必须声明事件时间或处理时间属性。目前,仅支持单个排序属性。

Preceding

可选的

定义了包含在窗口中并位于当前行之前的行的间隔。间隔可以是时间或行计数间隔。

有界 over window 用间隔的大小指定,例如,时间间隔为10分钟或行计数间隔为10行。

无界 over window 通过常量来指定,例如,用UNBOUNDED_RANGE指定时间间隔或用 UNBOUNDED_ROW 指定行计数间隔。无界 over windows 从分区的第一行开始。

如果省略前面的子句,则使用 UNBOUNDED_RANGE 和 CURRENT_RANGE 作为窗口前后的默认值。

Following
可选的

定义包含在窗口中并在当前行之后的行的窗口间隔。间隔必须以与前一个间隔(时间或行计数)相同的单位指定。

目前,不支持在当前行之后有行的 over window。相反,你可以指定两个常量之一:

CURRENT_ROW 将窗口的上限设置为当前行。
CURRENT_RANGE 将窗口的上限设置为当前行的排序键,例如,与当前行具有相同排序键的所有行都包含在窗口中。
如果省略后面的子句,则时间间隔窗口的上限定义为 CURRENT_RANGE,行计数间隔窗口的上限定义为CURRENT_ROW。

As
必须的

为 over window 指定别名。别名用于在之后的 select() 子句中引用该 over window。

注意:目前,同一个 select() 调用中的所有聚合函数必须在同一个 over window 上计算。

6.Unbounded Over Windows

Java

// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w"));

// 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"));

// 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w"));
 
// 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w"));

Scala

// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as "w")

// 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_RANGE as "w")

// 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_ROW as "w")
 
// 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_ROW as "w")

Python

# 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(UNBOUNDED_RANGE).alias("w"))

# 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(UNBOUNDED_RANGE).alias("w"))

# 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(UNBOUNDED_ROW).alias("w"))
 
# 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(UNBOUNDED_ROW).alias("w"))

7.Bounded Over Windows

Java

// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w"));

// 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as("w"));

// 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w"));
 
// 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));

Scala:

// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 1.minutes as "w")

// 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over partitionBy $"a" orderBy $"proctime" preceding 1.minutes as "w")

// 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 10.rows as "w")
  
// 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over partitionBy $"a" orderBy $"proctime" preceding 10.rows as "w")

Python:

# 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(lit(1).minutes).alias("w"))

# 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(lit(1).minutes).alias("w"))

# 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(row_interval(10)).alias("w"))
 
# 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(row_interval(10)).alias("w"))

十四、Row-based Operations

基于行生成多列输出的操作。

1.Map:Batch Streaming

使用用户定义的标量函数或内置标量函数执行 map 操作。如果输出类型是复合类型,则输出将被展平。

Java:

public class MyMapFunction extends ScalarFunction {
    public Row eval(String a) {
        return Row.of(a, "pre-" + a);
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.ROW(Types.STRING(), Types.STRING());
    }
}

ScalarFunction func = new MyMapFunction();
tableEnv.registerFunction("func", func);

Table table = input
  .map(call("func", $("c")).as("a", "b"));

Scala:

class MyMapFunction extends ScalarFunction {
  def eval(a: String): Row = {
    Row.of(a, "pre-" + a)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(Types.STRING, Types.STRING)
}

val func = new MyMapFunction()
val table = input
  .map(func($"c")).as("a", "b")

Python:
使用 python 的通用标量函数或向量化标量函数执行 map 操作。如果输出类型是复合类型,则输出将被展平。

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udf

def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)

# 使用 python 通用标量函数进行 map 操作
func = udf(map_function, result_type=DataTypes.ROW(
                                     [DataTypes.FIELD("a", DataTypes.BIGINT()),
                                      DataTypes.FIELD("b", DataTypes.BIGINT())]))
table = input.map(func).alias('a', 'b')

# 使用 python 向量化标量函数进行 map 操作
pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
                                                    [DataTypes.FIELD("a", DataTypes.BIGINT()),
                                                    DataTypes.FIELD("b", DataTypes.BIGINT()))]),
                  func_type='pandas')

table = input.map(pandas_func).alias('a', 'b')

2.FlatMap:Batch Streaming

Java:
使用表函数执行 flatMap 操作。

public class MyFlatMapFunction extends TableFunction<Row> {

    public void eval(String str) {
        if (str.contains("#")) {
            String[] array = str.split("#");
            for (int i = 0; i < array.length; ++i) {
                collect(Row.of(array[i], array[i].length()));
            }
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING(), Types.INT());
    }
}

TableFunction func = new MyFlatMapFunction();
tableEnv.registerFunction("func", func);

Table table = input
  .flatMap(call("func", $("c")).as("a", "b"));

Scala:

class MyFlatMapFunction extends TableFunction[Row] {
  def eval(str: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach({ s =>
        val row = new Row(2)
        row.setField(0, s)
        row.setField(1, s.length)
        collect(row)
      })
    }
  }

  override def getResultType: TypeInformation[Row] = {
    Types.ROW(Types.STRING, Types.INT)
  }
}

val func = new MyFlatMapFunction
val table = input
  .flatMap(func($"c")).as("a", "b")

Python:

from pyflink.table.udf import udtf
from pyflink.table import DataTypes
from pyflink.common import Row

@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x.b.split(","):
        yield x.a, s

input.flat_map(split)

3.Aggregate:Batch Streaming

Java:
使用聚合函数来执行聚合操作。你必须使用 select 子句关闭 aggregate,并且 select 子句不支持聚合函数。如果输出类型是复合类型,则聚合的输出将被展平。

public class MyMinMaxAcc {
    public int min = 0;
    public int max = 0;
}

public class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {

    public void accumulate(MyMinMaxAcc acc, int value) {
        if (value < acc.min) {
            acc.min = value;
        }
        if (value > acc.max) {
            acc.max = value;
        }
    }

    @Override
    public MyMinMaxAcc createAccumulator() {
        return new MyMinMaxAcc();
    }

    public void resetAccumulator(MyMinMaxAcc acc) {
        acc.min = 0;
        acc.max = 0;
    }

    @Override
    public Row getValue(MyMinMaxAcc acc) {
        return Row.of(acc.min, acc.max);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(Types.INT, Types.INT);
    }
}

AggregateFunction myAggFunc = new MyMinMax();
tableEnv.registerFunction("myAggFunc", myAggFunc);
Table table = input
  .groupBy($("key"))
  .aggregate(call("myAggFunc", $("a")).as("x", "y"))
  .select($("key"), $("x"), $("y"));

Scala:

case class MyMinMaxAcc(var min: Int, var max: Int)

class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {

  def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
    if (value < acc.min) {
      acc.min = value
    }
    if (value > acc.max) {
      acc.max = value
    }
  }

  override def createAccumulator(): MyMinMaxAcc = MyMinMaxAcc(0, 0)

  def resetAccumulator(acc: MyMinMaxAcc): Unit = {
    acc.min = 0
    acc.max = 0
  }

  override def getValue(acc: MyMinMaxAcc): Row = {
    Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max))
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(Types.INT, Types.INT)
  }
}

val myAggFunc = new MyMinMax
val table = input
  .groupBy($"key")
  .aggregate(myAggFunc($"a") as ("x", "y"))
  .select($"key", $"x", $"y")

Python:
使用 python 的通用聚合函数或 向量化聚合函数来执行聚合操作。你必须使用 select 子句关闭 aggregate ,并且 select 子句不支持聚合函数。如果输出类型是复合类型,则聚合的输出将被展平。

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import AggregateFunction, udaf

class CountAndSumAggregateFunction(AggregateFunction):

    def get_value(self, accumulator):
        return Row(accumulator[0], accumulator[1])

    def create_accumulator(self):
        return Row(0, 0)

    def accumulate(self, accumulator, row: Row):
        accumulator[0] += 1
        accumulator[1] += row.b

    def retract(self, accumulator, row: Row):
        accumulator[0] -= 1
        accumulator[1] -= row.b

    def merge(self, accumulator, accumulators):
        for other_acc in accumulators:
            accumulator[0] += other_acc[0]
            accumulator[1] += other_acc[1]

    def get_accumulator_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT()),
             DataTypes.FIELD("b", DataTypes.BIGINT())])

    def get_result_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT()),
             DataTypes.FIELD("b", DataTypes.BIGINT())])

function = CountAndSumAggregateFunction()
agg = udaf(function,
           result_type=function.get_result_type(),
           accumulator_type=function.get_accumulator_type(),
           name=str(function.__class__.__name__))

# 使用 python 通用聚合函数进行聚合
result = t.group_by(t.a) \
    .aggregate(agg.alias("c", "d")) \
    .select("a, c, d")
    
# 使用 python 向量化聚合函数进行聚合
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                   result_type=DataTypes.ROW(
                       [DataTypes.FIELD("a", DataTypes.FLOAT()),
                        DataTypes.FIELD("b", DataTypes.INT())]),
                   func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
    .select("a, b")

4.Group Window Aggregate:Batch Streaming

在 group window 和可能的一个或多个分组键上对表进行分组和聚合。你必须使用 select 子句关闭 aggregate。并且 select 子句不支持“*“或聚合函数。

Java:

AggregateFunction myAggFunc = new MyMinMax();
tableEnv.registerFunction("myAggFunc", myAggFunc);

Table table = input
    .window(Tumble.over(lit(5).minutes())
                  .on($("rowtime"))
                  .as("w")) // 定义窗口
    .groupBy($("key"), $("w")) // 以键和窗口分组
    .aggregate(call("myAggFunc", $("a")).as("x", "y"))
    .select($("key"), $("x"), $("y"), $("w").start(), $("w").end()); // 访问窗口属性与聚合结果

Scala:

val myAggFunc = new MyMinMax
val table = input
    .window(Tumble over 5.minutes on $"rowtime" as "w") // 定义窗口
    .groupBy($"key", $"w") // 以键和窗口分组
    .aggregate(myAggFunc($"a") as ("x", "y"))
    .select($"key", $"x", $"y", $"w".start, $"w".end) // 访问窗口属性与聚合结果

Python:

from pyflink.table import DataTypes
from pyflink.table.udf import AggregateFunction, udaf

pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                   result_type=DataTypes.ROW(
                       [DataTypes.FIELD("a", DataTypes.FLOAT()),
                        DataTypes.FIELD("b", DataTypes.INT())]),
                   func_type="pandas")
tumble_window = Tumble.over(expr.lit(1).hours) \
    .on(expr.col("rowtime")) \
    .alias("w")
t.select(t.b, t.rowtime) \
    .window(tumble_window) \
    .group_by("w") \
    .aggregate(pandas_udaf.alias("d", "e")) \
    .select("w.rowtime, d, e")

5.FlatAggregate

和 GroupBy Aggregation 类似。使用运行中的表之后的聚合算子对分组键上的行进行分组,以按组聚合行。和 AggregateFunction 的不同之处在于,TableAggregateFunction 的每个分组可能返回0或多条记录。你必须使用 select 子句关闭 flatAggregate。并且 select 子句不支持聚合函数。

除了使用 emitValue 输出结果,你还可以使用 emitUpdateWithRetract 方法。和 emitValue 不同的是,emitUpdateWithRetract 用于下发已更新的值。此方法在retract 模式下增量输出数据,例如,一旦有更新,我们必须在发送新的更新记录之前收回旧记录。如果在表聚合函数中定义了这两个方法,则将优先使用 emitUpdateWithRetract 方法而不是 emitValue 方法,这是因为该方法可以增量输出值,因此被视为比 emitValue 方法更有效。

Java:

/**
 * Top2 Accumulator。
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
}

/**
 * 用户定义的聚合函数 top2。
 */
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }


    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        // 下发 value 与 rank
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

tEnv.registerFunction("top2", new Top2());
Table orders = tableEnv.from("Orders");
Table result = orders
    .groupBy($("key"))
    .flatAggregate(call("top2", $("a")).as("v", "rank"))
    .select($("key"), $("v"), $("rank");

Scala:

import java.lang.{Integer => JInteger}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableAggregateFunction

/**
 * Top2 Accumulator。
 */
class Top2Accum {
  var first: JInteger = _
  var second: JInteger = _
}

/**
 * 用户定义的聚合函数 top2。
 */
class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {

  override def createAccumulator(): Top2Accum = {
    val acc = new Top2Accum
    acc.first = Int.MinValue
    acc.second = Int.MinValue
    acc
  }

  def accumulate(acc: Top2Accum, v: Int) {
    if (v > acc.first) {
      acc.second = acc.first
      acc.first = v
    } else if (v > acc.second) {
      acc.second = v
    }
  }

  def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      val top2 = iter.next()
      accumulate(acc, top2.first)
      accumulate(acc, top2.second)
    }
  }

  def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = {
    // 下发 value 与 rank
    if (acc.first != Int.MinValue) {
      out.collect(JTuple2.of(acc.first, 1))
    }
    if (acc.second != Int.MinValue) {
      out.collect(JTuple2.of(acc.second, 2))
    }
  }
}

val top2 = new Top2
val orders: Table = tableEnv.from("Orders")
val result = orders
    .groupBy($"key")
    .flatAggregate(top2($"a") as ($"v", $"rank"))
    .select($"key", $"v", $"rank")

Python:
使用 python 通用 Table Aggregate Function 执行 flat_aggregate 操作。

和 GroupBy Aggregation 类似。使用运行中的表之后的聚合运算符对分组键上的行进行分组,以按组聚合行。和 AggregateFunction 的不同之处在于,TableAggregateFunction 的每个分组可能返回0或多条记录。你必须使用 select 子句关闭 flat_aggregate。并且 select 子句不支持聚合函数。

from pyflink.common import Row
from pyflink.table.udf import TableAggregateFunction, udtaf
from pyflink.table import DataTypes

class Top2(TableAggregateFunction):

    def emit_value(self, accumulator):
        yield Row(accumulator[0])
        yield Row(accumulator[1])

    def create_accumulator(self):
        return [None, None]

    def accumulate(self, accumulator, row: Row):
        if row.a is not None:
            if accumulator[0] is None or row.a > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = row.a
            elif accumulator[1] is None or row.a > accumulator[1]:
                accumulator[1] = row.a

    def merge(self, accumulator, accumulators):
        for other_acc in accumulators:
            self.accumulate(accumulator, other_acc[0])
            self.accumulate(accumulator, other_acc[1])

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.BIGINT())

    def get_result_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT())])

mytop = udtaf(Top2())
t = t_env.from_elements([(1, 'Hi', 'Hello'),
                              (3, 'Hi', 'hi'),
                              (5, 'Hi2', 'hi'),
                              (7, 'Hi', 'Hello'),
                              (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
result = t.select(t.a, t.c) \
    .group_by(t.c) \
    .flat_aggregate(mytop) \
    .select(t.a) \
    .flat_aggregate(mytop.alias("b"))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/81874.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小程序图片加载失败binderror方法处理

场景&#xff1a;我们在小程序项目中的一个图片列表&#xff0c;当某些图片加载失败后&#xff0c;直接显示空白&#xff0c;这样用户体验不好&#xff0c;为了解决当图片加载失败&#xff0c;我们给一个默认图片代替&#xff0c;参考官方给的图片加载失败的处理方法&#xff1…

C51单片机开发程序报错 main.c (11) : error C267 : ‘Func‘ : requires ANSI-style prototype

问题 C51单片机开发程序报错 main.c (11) : error C267 : Func : requires ANSI-style prototype详细问题 问题一 问题二 问题三 可能原因一 函数定义声明处&#xff08;.h文件中&#xff09;与主函数中函数&#xff08;函数名/参数类型/返回值类型&#xff09;不一致 解决…

【Vue2+Element ui通用后台】项目搭建和vue-router使用

文章目录介绍创建项目并引入Element-ui按需引入全局引入vue-router安装嵌套路由介绍 通过这个系列文章&#xff0c;我们将学到&#xff1a; 1.项目搭建使用element实现首页布局 2.顶部导航菜单及与左侧导航联动的面包屑实现 3.封装—个ECharts组件 4.封装一个Form表单组件和Ta…

木字楠后台管理系统开发(4):SpringSecurity引入并编写登陆接口

&#x1f3b6; 文章简介&#xff1a;木字楠后台管理系统开发(4)&#xff1a;SpringSecurity引入并编写登陆接口 &#x1f4a1; 创作目的&#xff1a;为了带大家完整的体验木字楠后台管理系统模版的开发流程 ☀️ 今日天气&#xff1a;冬天来啦&#xff01; &#x1f4dd; 每日一…

在ubuntu上部署gitlab详细步骤

一、Ubuntu安装gitlab步骤&#xff1a; 安装依赖 通过快捷键ctrlaltT打开命令行窗口&#xff0c;然后运行下面两行命令 sudo apt update sudo apt-get upgrade sudo apt-get install curl openssh-server ca-certificates postfix 如果这一步遇到下面提示界面&#xff0c…

BUUCTF Web2

[HCTF 2018]admin flask session的伪造 改密码的页面源码有提示&#xff0c;得到秘钥ckj123 自己的session .eJw9kEGLwjAUhP_KkrOHJm09CB5cbKULeaHwanm5iKu1adK4UBVpxP--XRc8zGmGj5l5sN1paC6GLa7DrZmxXXdkiwf7-GYLptCl2uoOcHWXmDu1kYnGLIFNdQdsBYmtkbb3YI89YDXKUHKNTkCg8S9PliJ…

Kotlin 开发Android app(二十二):Retrofit和简单的mvp框架

到这一节&#xff0c;基本上把大部分kotlin和android的开发都已经介绍完成了&#xff0c;通过了前面和这一章的框架结构&#xff0c;基本上能解决开发中的很多问题&#xff0c;并且能够知道android的主要的技术&#xff0c;并进行独立开发了。对于传统的开发的话&#xff0c;还…

一些可以显著提高大型 Java 项目启动速度的尝试

我们线上的业务 jar 包基本上普遍比较庞大&#xff0c;动不动一个 jar 包上百 M&#xff0c;启动时间在分钟级&#xff0c;拖慢了我们在故障时快速扩容的响应。于是做了一些分析&#xff0c;看看 Java 程序启动慢到底慢在哪里&#xff0c;如何去优化&#xff0c;目前的效果是大…

SpringSecurity安全框架

目录 一、Spring Security介绍 1、框架介绍 2、认证与授权实现思路 二、整合Spring Security 1、在common下创建spring_security模块 2、在spring_security引入相关依赖 3.代码结构说明&#xff1a; 4、创建spring security核心配置类 5、创建认证授权相关的工具类 &a…

Roson的Qt之旅 #139 Qt读写Excel

1.使用QAxObject读写Excel QAxObject类提供了一个包裹COM对象的QObject。 QAxObject可以被实例化为一个空的对象&#xff0c;用它应该包裹的COM对象的名字&#xff0c;或者用一个指向代表现有COM对象的IUnknown的指针。如果COM对象实现了IDispatch接口&#xff0c;该对象的属性…

【Python 身份证JSON数据读取】——身份证前六位地区码对照表文件(最全版-JSON文件)

点个赞留个关注吧&#xff01;&#xff01; 1、生成身份证前六位地区码对照表JSON文件 2、python 读取JSON文件 提取码【1234】 json文件下载 废话不多说&#xff0c;先上效果图 一、生成身份证json数据文件 先去百度搜索地区身份证号码前6位查询 ,然后进入网站控制台界面&…

经常会采坑的javascript原型应试题

一&#xff0e; 前言 原型和原型链在面试中历来备受重视&#xff0c;经常被提及。说难可能也不太难&#xff0c;但要真正完全理解&#xff0c;吃透它&#xff0c;还是要多下功夫的。 下面为大家简单阐述我对原型和原型链的理解&#xff0c;若是觉得有说的不对的地方&#xff…

必备技能,MySQL 查找并删除重复行

本文讲述如何查找数据库里重复的行。这是初学者十分普遍遇到的问题。方法也很简单。这个问题还可以有其他演变&#xff0c;例如&#xff0c;如何查找“两字段重复的行”&#xff08;#mysql IRC 频道问到的问题&#xff09; 如何查找重复行 第一步是定义什么样的行才是重复行。…

碳酸钾碱性溶液除钙镁软化树脂

碳酸钾是重要的基本无机化工、医药、轻工原料之一&#xff0c;主要用于光学玻璃、电焊条、电子管、电视显像管、灯泡、印染、染料、油墨、照相药品、泡花碱、聚酯、炸药、电镀、制革、陶瓷、建材、水晶、钾肥皂及药物的生产。用作气体吸附剂&#xff0c;干粉灭火剂&#xff0c;…

Spring Boot 整合 Groovy 脚本,实现动态编程

Groovy简介 Groovy 是增强 Java 平台的唯一的脚本语言。它提供了类似于 Java 的语法&#xff0c;内置映射&#xff08;Map&#xff09;、列表&#xff08;List&#xff09;、方法、类、闭包&#xff08;closure&#xff09;以及生成器。脚本语言不会替代系统编程语言&#xff…

「Redis数据结构」哈希对象(Hash)

「Redis数据结构」哈希对象&#xff08;Hash&#xff09; 文章目录「Redis数据结构」哈希对象&#xff08;Hash&#xff09;一、概述二、编码ZipListHashTable三、编码转换一、概述 Redis中hash对象是一个string类型的field和value的映射表&#xff0c;hash特别适合用于存储对…

RabbitMQ:消息模型

RabbitMQ 提供了 6 种消息模型&#xff0c;分别为&#xff1a;单生产单消费模型&#xff08;Hello World&#xff09;、消息分发模型&#xff08;Work queues&#xff09;、Fanout 消息订阅模式&#xff08;Publish/Subscribe&#xff09;、Direct 路由模式&#xff08;Routing…

基于JSP的手工艺品在线网站

摘 要 在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括手工艺品在线网站的网络应用&#xff0c;在外国手工艺品已经是很普遍的方式&#xff0c;不过国内的手工艺品可能还处于起步阶段。手工艺品在线网站具有在线下单功能。手…

详解vue中watch的用法

前言 说到 vue 中的 watch 方法&#xff0c;大家可能首先想到&#xff0c;它是用来监听数据的变化&#xff0c;一旦数据发生变化可以执行一些其他的操作。但是 watch 的操作可不止如此&#xff0c;本章就带大家一起深剖细析 vue 中的 watch 方法。 watch&#xff1f; 因为 vue…

DocuWare平台——用于文档管理和工作流程自动化的内容服务平台详细介绍(上)

DocuWare平台——用于文档管理和工作流程自动化的内容服务平台 成功实现办公自动化所需的一切 DocuWare 是一个先进的平台&#xff0c;可让您集中、快速、有效地管理、处理和利用业务信息。 我们的文档管理和工作流程解决方案的各项功能可以集成到任何 IT 系统中&#xff0c;…