Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

news2024/12/23 16:55:53

前言

       今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。

1、常用 Connector 读写

        之前我们已经用过了一些简单的内置连接器,比如 'datagen' 、'print' ,其它的可以查看官网:Overview | Apache Flink

环境准备:

# 1. 先启动 hadoop
myhadoop start
# 2. 不需要启动 flink 只启动yarn-session即可
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
# 3. 启动 flink sql 的环境 sql-client
./sql-client.sh embedded -s yarn-session

1.1、Kafka 

1)添加kafka连接器依赖

  • 将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录下
  • 重启yarn-session、sql-client

        使用 kafka 连接器,我们需要清楚,我们用 Flink SQL 往连接器为 kafka 的表中插入数据就相当于 Flink 往 Kafka 写入数据,而我们查询 Flink SQL 表中的数据就相当于 从 Kafka 中读取数据。所以当我们建表时就需要初始化读取 Kafka 数据和消费 Kafka 数据的参数。

2)创建 kfaka 的映射表

CREATE TABLE t1( 
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'properties.group.id' = 'lyh',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
) 

上面有一个参数 'sink.partitioner' 的值是 'fixed' ,我们之前学过 Kafka 的生产者的分区器有默认的 hash分区器和粘性分区器,这种 fixed 分区器是 kafka 为flink实现的 ,一个并行度只写往一个 kafka 分区,我们可以查看一下 FlinkFixedPartition 的源码:

创建好的表格是没有数据的,所以我们再创建一个数据源往 kfaka 里插入数据:

Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );

插入数据:

insert into t1(id,ts,vc) select * from source;

查询 kafka 表:

select * from t1;

3)upsert-kafka 表

        如果当前表存在更新操作,那么普通的kafka连接器将无法满足(因为普通的连接器不支持更新操作),此时可以使用Upsert Kafka连接器

        Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

        作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

        作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

1)创建upsert-kafka的映射表(必须定义主键)

CREATE TABLE t2( 
    id int , 
    sumVC int ,
    -- 主键必须 not enforced
    primary key (id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
)

2)插入 upset-kafka 表

insert into t2 select  id,sum(vc) sumVC  from source group by id;

3)查询 upset-kafka 表

select * from t2;

查询结果: 

可以看到,upsert-kafka 表是支持数据更新操作的。

1.2、File

Flink 天生就支持本地系统、HDFS 等。

1)创建 FileSystem 映射表

CREATE TABLE t3( id int, ts bigint , vc int )
WITH (
  'connector' = 'filesystem',
  -- 如果是本地系统就用 file:/// 
  'path' = 'hdfs://hadoop102:8020/data/t3',
  'format' = 'csv'
);

注意:之前我们在 flink 的 lib 目录下放了 hive 的连接器,这个包会和 flink 的依赖产生冲突:java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory 我们需要把这个依赖移除掉或者改名并重启 sqlSession :

# 重命名连接器
mv flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar.del
# yarn web端 kill 掉job
# 重启 yarn-session
bin/yarn-session.sh -d
bin/sql-client.sh embedded -s yarn-session.sh -i sql-client-init.sql

插入数据:

查询插入结果:

除了上面这种方式,我们还可以把 flink 目录下 opt/ 的 flink-table-planner-1.17.0.jar 和 lib/ 下面的 flink-table-planner-loader-1.17.0.jar 替换一下位置,这样我们就不用把 hive 的连接器移除带了。

1.3、JDBC

        Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式操作,否则,连接器以追加模式操作。

        在upsert模式下,Flink会根据主键插入新行或更新现有行,Flink这样可以保证幂等性。为了保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。在追加模式下,Flink将所有记录解释为INSERT消息,如果底层数据库中发生了主键或唯一约束违反,则INSERT操作可能会失败。

1)mysql 的 test 库中建表

CREATE TABLE `ws2` (
  `id` int(11) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下:

  • flink-connector-jdbc-1.17-20230109.003314-120.jar
  • mysql-connector-j-5.1.7.jar
CREATE TABLE t4
(
    id INT,
    ts BIGINT,
    vc INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8',
    'username' = 'root',
    'password' = '123456',
    'connection.max-retry-timeout' = '60s',
    'table-name' = 'ws2',
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '5s',
    'sink.max-retries' = '3',
    'sink.parallelism' = '1'
);

测试插入数据:

insert into t4 values(1,1,1);

 查看结果:

 这里,因为我们给 mysql 的这张表设置了主键,所以默认当出现和主键字段相同的新数据时,会直接以 upsert 的方式操作:

insert into t4 values(1,2,2);

运行结果: 

注意:我们这个表是和 mysql 关联的,所以我们不管对 mysql 操做还是对这张映射表操作都会互相影响,上面我们修改了映射表 t4 之后,同样会修改到 mysql 表 ws2(除了删除表格,删除flink sql 中的表格并不会删除mysql 中的表格

 

如果我们希望使用追加模式,就必须保证 mysql 表和 Flink SQL 表都是没有主键的。

2、sql-client 使用 savepoint

1)提交一个insert作业,可以给作业设置名称

Flink SQL> create table sink(
> id int,
> ts bigint,
> vc int
> )with(
> 'connector' = 'print'
> );
insert into sink select * from source;

2)查看 job 列表

查看 job 列表是为了获得 job id,我们提交作业的时候会返回一个 job id 可以在 shell 命令行看到,或者从 web ui 端也可以看到,再或者通过下面的命令看:

show jobs;

3)停止作业,触发 savepoint

SET state.checkpoints.dir='hdfs://hadoop102:8020/chk';
SET state.savepoints.dir='hdfs://hadoop102:8020/sp';
-- 结束作业不设置保存点
stop job 'e6d3e9afed97aee7819c460a6e109445';
-- 结束作业设置保存点
stop job 'e6d3e9afed97aee7819c460a6e109445' with savepoint;

4)从 savepoint 恢复

-- 设置从savepoint恢复的路径 
SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-0e0742-7e2154873185';  

-- 之后直接提交sql,就会从savepoint恢复

--允许跳过无法还原的保存点状态
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; 

5)恢复后重置路径

注意:我们设置 savepoint 恢复路径后,之后的所有 insert 任务都会默认使用这个 savepoint,所以下一个作业一定要重置这个配置参数:

指定execution.savepoint.path后,将影响后面执行的所有DML语句,可以使用RESET命令重置这个配置选项。

RESET execution.savepoint.path;

 如果出现reset没生效的问题,可能是个bug(包括 pipeline.name 这个参数也是),我们可以退出sql-client,再重新进,不需要重启flink的集群。

3、CateLog

        Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。它本来翻译过来就是目录,我们可以理解为它就是数据库的目录。

        数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临时表、UDF。我们之前上面使用的表都是基于内存的一个 Catelog ,所以每次我们退出 sql-client 客户端的时候,这些表和数据库就不见了。元数据也可以是持久化的,例如 Hive MetaStore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

        Catalog 允许用户引用其数据存储系统中现有的元数据,并自动将其映射到 Flink 的相应元数据。例如,Flink 可以直接使用 Hive MetaStore 中的表的元数据,不必在Flink中手动重写ddl,也可以将 Flink SQL 中的元数据存储到 Hive MetaStore 中。Catalog 极大地简化了用户开始使用 Flink 的步骤,并极大地提升了用户体验。

        注意:catalog 可以使得 mysql 、hive 和 flink 互通有无,互通就是可以操作读写(除了建表),而不是说只是在某个生命周期内起作用,只要连接上,flink 操作的就是实实在在的 hive 、mysql 本身,这才叫互通,而不是自嗨。

3.1、CateLog 类型

目前 Flink 包含了以下四种 Catalog:

  • GenericInMemoryCatalog:基于内存实现的 Catalog,所有元数据只在session 的生命周期(即一个 Flink 任务一次运行生命周期内)内可用。默认自动创建,会有名为“default_catalog”的内存Catalog,这个Catalog默认只有一个名为“default_database”的数据库。
  • JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。
  • HiveCatalog:有两个用途,一是单纯作为 Flink 元数据的持久化存储,二是作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称,而 GenericInMemoryCatalog会区分大小写。
  • 用户自定义 Catalog:用户可以实现 Catalog 接口实现自定义 Catalog。从Flink1.16开始引入了用户类加载器,通过CatalogFactory.Context#getClassLoader访问,否则会报错ClassNotFoundException。

3.2、JdbcCatalog(MySQL)

JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。

1)上传所需jar包到lib下

  • flink-connector-jdbc-1.17-20230109.003314-120.jar
  • mysql-connector-j-5.1.7.jar

注意:Flink 是冷加载,所以上传后需要重启 yarn-session 和 sql-client

2)创建Catalog

JdbcCatalog支持以下选项:

  • name:必需,Catalog名称。
  • default-database:必需,连接到的默认数据库。
  • username: 必需,Postgres/MySQL帐户的用户名。
  • password:必需,该帐号的密码。
  • base-url:必需,数据库的jdbc url(不包含数据库名)

对于Postgres Catalog,是"jdbc:postgresql://<ip>:<端口>"

对于MySQL Catalog,是"jdbc: mysql://<ip>:<端口>"

CREATE CATALOG my_jdbc_catalog WITH(
    'type' = 'jdbc',
    -- 这里指定的只是默认使用的数据库 它会把所有数据库导进这个catalog下
    'default-database' = 'test',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://hadoop102:3306'
);

3)查看 Catalog 

show catalogs;

4)使用指定的 Catalog

use catalog my_jdbc_catalog;

我们发现,除了 mysql 的系统数据库看不到,别的都别导进来了。

我们也可以直接往表中插入数据,而不用向之前那样去建立映射表:

insert into ws2 values(2,2,2);

 注意:在 jdbcCatalog 下是不支持建表的,什么表都不行(映射表或者普通表)!

要建表需要返回到之前默认的 default_catalog 才可以,但是我们是可以从 jdbc_catalog 去查 default_catalog 下的表数据的。

select * from default_catalog.mydatabase.source;

此外,我们也可以把不同类型catalog下不同的表数据关联在一起:

select * from default_catalog.mydatabase.source s join my_jdbc_catalog.test.ws2 w on s.id=w.id;

最后,每次我们退出 sql-client 的时候,其实我们创建的 jdbc_catalog 还是会被删除的,所以我们最好把创建catalog这些命令写进一个 sql 文件,初始化启动 sql-client 的时候执行一下。

3.3、HiveCatalog 

同样,HiveCatalog 可以打通所有 Hive 的库和表,这样我们就可以在 Flink 直接读写 Hive 表。此外,我们还可以在 catalog 下创建我们 Flink 的表,比如带有 Kafka 连接器的表,而且即使我们退出客户端,再次进去 HiveCatalog ,那张表还是存在的。

1)上传 jar 包

  • flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar
  • mysql-connector-j-5.1.7.jar(我的 Hive 元数据存储在 MySQL)

2)更换planner依赖

只有在使用Hive方言或HiveServer2时才需要这样额外的计划器jar移动,但这是Hive集成的推荐设置。

这个我们之前使用 FileSystem 创建映射表的时候已经做过了。

3)重启flink集群和sql-client

4)启动外置的hive metastore服务

Hive metastore必须作为独立服务运行,也就是hive-site中必须配置hive.metastore.uris。(必须启动 hive 的元数据服务,不然我们flink无法获取hive中的数据)

# & 的意思是后台启动
# hive --service metastore &
# 这里直接启动我的 hive
hiveservice.sh start
# 查看hive 启动没有
hiveservice status

启动 hive 后会一直挂在那,我们可以判断一下元数据服务是否启动:

netstat -anp|grep 9083
# 或者
ps -ef|grep -i metastore

5)创建 Catalog 

配置项

必需

默认值

类型

说明

type

Yes

(none)

String

Catalog类型,创建HiveCatalog时必须设置为'hive'。

name

Yes

(none)

String

Catalog的唯一名称

hive-conf-dir

No

(none)

String

包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml。

default-database

No

default

String

Hive Catalog使用的默认数据库

hive-version

No

(none)

String

HiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败。

hadoop-conf-dir

No

(none)

String

Hadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog。

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/module/hive-3.1.2/conf'
);

 6)查看 catalog

我们在 hive 中创建一个数据库 test 再创建一张表 ws:

我们再往 ws 中插入一条数据:

hive(test)> insert into ws values(1,1,1);

我们可以看到,即使是插入一条数据,hive 也是转换为一个 MapReduce 作业,所以很慢。

不对劲,是完全卡死了,估计是 flink 和 hive 同时占用 yarn 的资源,导致资源不足 的原因(暂且怀疑是 Yarn 的CPU核数的配置问题),在修改完 yarn 的最大内存发现在 flink 往 hive 插入查询数据都没有问题了,但是 hive 还是不行,那就暂且用 flink 端操作 hive 吧。

没毛病,在 flink sql 客户端往 hive 插入数据后,在 flink 和 hive 中都可以看到结果。

        此外,我们在 hiveCatalog 下创建一张连接器为 FileSystem 的表,那么这张表是只能在 flink 环境下才能查到的,在hive是只能看到有这张表,查不到数据的:

我们去 hive 端查看一下:

我们发现,hive 端可以查看到存在这张表。

当我们查的时候,发现直接报错,毕竟它不是一个满足 hive 规范的表。同样,我们在 hiveCatalog 下创建一个连接器为 jdbcCatalog 的表,同样在 flink sql 中也是可以查到并正常使用的,但是在 hive 端同样只能看到表名。

现在,我们退出 sql-client,我们重新变价一下初始化 sql 文件:

 重新启动:

我们可以看到,虽然每次退出 sql-client 之后 catalog 下次启动就消失了,但是catalog 下面的表不会消失,我们只需要创建对应的 catalog 即可。

我们再解决一下刚才 hive插入数据失败的问题:

关闭hadoop、hive,修改yarn-site.xml 中 yarn的最大cpu核数=8(无所谓,反正在自己电脑上测试)。重启 hadoop、hive,测试插入:

说明 hive 没有问题,但是还不能说明我们配置的 yarn 有用,我们再启动 flink 的 yarn-session 和 sql-client:

提交插入作业(插入数据到 hive):

没毛病,这下问题测彻底解决了。

4、Table API

4.1、引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>

        这里的依赖是一个Java的“桥接器”(bridge),主要就是负责Table API和下层DataStream API的连接支持,按照不同的语言分为Java版和Scala版。

        如果我们希望在本地的集成开发环境(IDE)里运行Table API和SQL,还需要引入以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>

4.2、创建表环境

        对于Flink这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用Table API和SQL需要一个特别的运行时环境,这就是所谓的“表环境”(TableEnvironment)。它主要负责:

(1)注册Catalog和表;

(2)执行 SQL 查询;

(3)注册用户自定义函数(UDF);

(4)DataStream 和表之间的转换。

每个表和SQL的执行,都必须绑定在一个表环境(TableEnvironment)中。TableEnvironment是Table API中提供的基本接口类,可以通过调用静态的create()方法来创建一个表环境实例。方法需要传入一个环境的配置参数EnvironmentSettings,它可以指定当前表环境的执行模式和计划器(planner)。执行模式有批处理和流处理两种选择,默认是流处理模式;计划器默认使用blink planner。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()    // 使用流处理模式
    .build();

TableEnvironment tableEnv = TableEnvironment.create(setting);

对于流处理场景,其实默认配置就完全够用了。所以我们也可以用另一种更加简单的方式来创建表环境:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

这里我们引入了一个“流式表环境”(StreamTableEnvironment),它是继承自TableEnvironment的子接口。调用它的create()方法,只需要直接将当前的流执行环境(StreamExecutionEnvironment)传入,就可以创建出对应的流式表环境了。

4.3、创建表

表(Table)是我们非常熟悉的一个概念,它是关系型数据库中数据存储的基本形式,也是SQL执行的基本对象。

具体创建表的方式,有通过连接器(connector)和虚拟表(virtual tables)两种。

1)连接器表(Connector Tables)

最直观的创建表的方式,就是通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。

在代码中,我们可以调用表环境的executeSql()方法,可以传入一个DDL作为参数执行SQL操作。这里我们传入一个CREATE语句进行表的创建,并通过WITH关键字指定连接到外部系统的连接器:

tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");

这里的TEMPORARY关键字可以省略。关于连接器的具体定义,我们会在11.8节中展开讲解。

2)虚拟表(Virtual Tables)

在环境中注册之后,我们就可以在SQL中直接使用这张表进行查询转换了。

Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");

这里调用了表环境的sqlQuery()方法,直接传入一条SQL语句作为参数执行查询,得到的结果是一个Table对象。Table是Table API中提供的核心接口类,就代表了一个Java中定义的表实例。

由于newTable是一个Table对象,并没有在表环境中注册;所以如果希望直接在SQL中使用,我们还需要将这个中间结果表注册到环境中:

tableEnv.createTemporaryView("NewTable", newTable);

我们发现,这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与SQL语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图”(createTemporaryView)。

4.4、表的查询

创建好了表,接下来自然就是对表进行查询转换了。对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。

Flink为我们提供了两种查询方式:SQL,和Table API。

1)执行SQL进行查询

基于表执行SQL语句,是我们最为熟悉的查询方式。

在代码中,我们只要调用表环境的sqlQuery()方法,传入一个字符串形式的SQL查询语句就可以了。执行得到的结果,是一个Table对象。

// 创建表环境
TableEnvironment tableEnv = ...; 

// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");

// 查询用户Alice的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery(
    "SELECT user, url " +
    "FROM EventTable " +
    "WHERE user = 'Alice' "
  );

目前Flink支持标准SQL中的绝大部分用法,并提供了丰富的计算函数。这样我们就可以把已有的技术迁移过来,像在MySQL、Hive中那样直接通过编写SQL实现自己的处理需求,从而大大降低了Flink上手的难度。

例如,我们也可以通过GROUP BY关键字定义分组聚合,调用COUNT()、SUM()这样的函数来进行统计计算:

Table urlCountTable = tableEnv.sqlQuery(
    "SELECT user, COUNT(url) " +
    "FROM EventTable " +
    "GROUP BY user "
);

上面的例子得到的是一个新的Table对象,我们可以再次将它注册为虚拟表继续在SQL中调用。另外,我们也可以直接将查询的结果写入到已经注册的表中,这需要调用表环境的executeSql()方法来执行DDL,传入的是一个INSERT语句:

// 注册表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");


// 将查询结果输出到OutputTable中
tableEnv.executeSql (
"INSERT INTO OutputTable " +
    "SELECT user, url " +
    "FROM EventTable " +
    "WHERE user = 'Alice' "
  );

2)调用Table API进行查询

另外一种查询方式就是调用Table API。这是嵌入在Java和Scala语言内的查询API,核心就是Table接口类,通过一步步链式调用Table的方法,就可以定义出所有的查询转换操作。

由于Table API是基于Table的Java实例进行调用的,因此我们首先要得到表的Java对象。基于环境中已注册的表,可以通过表环境的from()方法非常容易地得到一个Table对象:

Table eventTable = tableEnv.from("EventTable");

传入的参数就是注册好的表名。注意这里eventTable是一个Table对象,而EventTable是在环境中注册的表名。得到Table对象之后,就可以调用API进行各种转换操作了,得到的是一个新的Table对象:

Table maryClickTable = eventTable
        .where($("user").isEqual("Alice"))
        .select($("url"), $("user"));

这里每个方法的参数都是一个“表达式”(Expression),用方法调用的形式直观地说明了想要表达的内容;“$”符号用来指定表中的一个字段。上面的代码和直接执行SQL是等效的。

Table API是嵌入编程语言中的DSL,SQL中的很多特性和功能必须要有对应的实现才可以使用,因此跟直接写SQL比起来肯定就要麻烦一些。目前Table API支持的功能相对更少,可以预见未来Flink社区也会以扩展SQL为主,为大家提供更加通用的接口方式;所以我们接下来也会以介绍SQL为主,简略地提及Table API。

3)两种API的结合使用

可以发现,无论是调用Table API还是执行SQL,得到的结果都是一个Table对象;所以这两种API的查询可以很方便地结合在一起。

(1)无论是那种方式得到的Table对象,都可以继续调用Table API进行查询转换;

(2)如果想要对一个表执行SQL操作(用FROM关键字引用),必须先在环境中对它进行注册。所以我们可以通过创建虚拟表的方式实现两者的转换:

tableEnv.createTemporaryView("MyTable", myTable);

两种API殊途同归,实际应用中可以按照自己的习惯任意选择。不过由于结合使用容易引起混淆,而Table API功能相对较少、通用性较差,所以企业项目中往往会直接选择SQL的方式来实现需求。

4.5、输出表

表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。

在代码上,输出一张表最直接的方法,就是调用Table的方法executeInsert()方法将一个 Table写入到注册过的表中,方法传入的参数就是注册的表名。

// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");

// 经过查询转换,得到结果表
Table result = ...

// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");

在底层,表的输出是通过将数据写入到TableSink来实现的。TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、Elasticsearch)和消息队列(比如Kafka)。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class SqlDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 1.创建表环境
        // 1.1 写法1:
//        EnvironmentSettings es = EnvironmentSettings.newInstance()
//                .inStreamingMode()
//                .build();
//        TableEnvironment table_env = TableEnvironment.create(es);

        // 1.2 写法2:
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);

        // TODO 2. 创建表
        table_env.executeSql("CREATE TABLE source ( \n" +
                "    id INT, \n" +
                "    ts BIGINT, \n" +
                "    vc INT\n" +
                ") WITH ( \n" +
                "    'connector' = 'datagen', \n" +
                "    'rows-per-second'='1', \n" +
                "    'fields.id.kind'='random', \n" +
                "    'fields.id.min'='1', \n" +
                "    'fields.id.max'='10', \n" +
                "    'fields.ts.kind'='sequence', \n" +
                "    'fields.ts.start'='1', \n" +
                "    'fields.ts.end'='1000000', \n" +
                "    'fields.vc.kind'='random', \n" +
                "    'fields.vc.min'='1', \n" +
                "    'fields.vc.max'='100'\n" +
                ");\n");
        table_env.executeSql("CREATE TABLE sink (\n" +
                "    id INT, \n" +
                "    sum_vc INT \n" +
                ") WITH (\n" +
                "'connector' = 'print'\n" +
                ");\n");

        // TODO 3. 执行查询 查询的结果也是一张表
        // 3.1 使用sql进行查询
        Table table = table_env.sqlQuery("select id,sum(vc) as sum_vc from source where id > 5 group by id;");
        // 把 table 对象注册成为表名
        table_env.createTemporaryView("tmp",table);
//        table_env.sqlQuery("select * from tmp;");
        // 3.2 用table api查询
//        Table source = table_env.from("source");
//        Table result = source
//                .where($("id").isGreater(5))
//                .groupBy($("id"))
//                .aggregate($("vc").sum().as("sum_vc"))
//                .select($("id"), $("sum_vc"));

        // TODO 4. 输出表
        // 4.1 sql用法
        table_env.executeSql("insert into sink select * from tmp");
        // 4.2 table api
//        result.executeInsert("sink");
    }
}

运行结果: 

4.6 表和流的转换

既然都不用 SQL 了,我们不可能用 API 只是为了没事找事干,而是为了方便结合一些底层的 API,比如我们之前学的 DataSream API。

4.6.1、将流(DataStream)转换成表(Table)

1. 调用fromDataStream()方法

想要将一个DataStream转换成表很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 读取数据源
SingleOutputStreamOperator<WaterSensor> sensorDS = env.fromSource(...)

// 将数据流转换成表
Table sensorTable = tableEnv.fromDataStream(sensorDS);

由于流中的数据本身就是定义好的POJO类型WaterSensor,所以我们将流转换成表之后,每一行数据就对应着一个WaterSensor,而表中的列名就对应着WaterSensor中的属性。

另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:

// 提取Event中的timestamp和url作为表中的列
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));

也可以通过表达式的as()方法对字段进行重命名:

// 将timestamp字段重命名为ts
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));

(2)调用createTemporaryView()方法

       调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换;不过如果我们希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图了。

        对于这种场景,也有一种更简洁的调用方式。我们可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段

tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));

 这样,我们接下来就可以直接在SQL中引用表sensorTable了。

4.6.2、将表(Table)转换成流(DataStream)

(1)调用toDataStream()方法

将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。例如,我们可以将2.4小节经查询转换得到的表aliceClickTable转换成流打印输出:

tableEnv.toDataStream(table).print();

(2)调用toChangelogStream()方法

urlCountTable这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。对于这样有更新操作的表,我们不应该直接把它转换成DataStream打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。

代码中需要调用的是表环境的toChangelogStream()方法:

Table table = tableEnv.sqlQuery(
    "SELECT id, sum(vc) " +
    "FROM source " +
    "GROUP BY id "
  );

// 将表转换成更新日志流
tableEnv.toChangelogStream(table).print();

4.6.3、支持的数据类型

整体来看,DataStream中支持的数据类型,Table中也是都支持的,只不过在进行转换时需要注意一些细节。

1. 原子类型

在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。

StreamTableEnvironment tableEnv = ...;

DataStream<Long> stream = ...;

// 将数据流转换成动态表,动态表只有一个字段,重命名为myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
2. Tuple类型

当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1的处理结果。

Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名f0、f1、f2...。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。

StreamTableEnvironment tableEnv = ...;

DataStream<Tuple2<Long, Integer>> stream = ...;

// 将数据流转换成只包含f1字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));

// 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

// 将f1字段命名为myInt,f0命名为myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
3. POJO 类型

Flink也支持多种数据类型组合成的“复合类型”,最典型的就是简单Java对象(POJO 类型)。由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。

将POJO类型的DataStream转换成Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO中的字段同样可以被重新排序、提却和重命名。

StreamTableEnvironment tableEnv = ...;

DataStream<Event> stream = ...;

Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
4. Row类型

Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。

Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)。

4.6.4、综合应用示例

package com.lyh.sql;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class TableStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 21),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );


        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);

        // TODO 1. 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(sensorDS);
        // 或者指定保留哪些字段
//        table_env.fromDataStream(sensorDS,$("id"));
        // 注册
        table_env.createTemporaryView("sensor",sensorTable);

        Table result = table_env.sqlQuery("select id,sum(vc) from sensor group by id");
        Table filter = table_env.sqlQuery("select id,ts,vc from sensor where ts > 2");

        // TODO 2. 表 -> 流
        // 2.1 追加流
        table_env.toDataStream(filter,WaterSensor.class).print("filter");
        // 2.2 更新流(结果需要更新)
        table_env.toChangelogStream(result).print("sum");

        // 只要代码中调用了 DataStream 就需要使用 execute
        env.execute();
    }
}

运行结果:

4.7 自定义函数(UDF)

系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。

Flink的Table API和SQL提供了多种自定义函数的接口,以抽象类的形式定义。当前UDF主要有以下几类:

  1. 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
  2. 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
  3. 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
  4. 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。

4.7.1、整体调用流程

要想在代码中使用自定义的函数,我们需要首先自定义对应UDF抽象类的实现,并在表环境中注册这个函数,然后就可以在Table API和SQL中调用了。

(1)注册函数

注册函数时需要调用表环境的createTemporarySystemFunction()方法,传入注册的函数名以及UDF类的Class对象:

// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);

我们自定义的UDF类叫作MyFunction,它应该是上面四种UDF抽象类中某一个的具体实现;在环境中将它注册为名叫MyFunction的函数。

(2)使用Table API调用函数

在Table API中,需要使用call()方法来调用自定义函数:

tableEnv.from("MyTable").select(call("MyFunction", $("myField")));

这里call()方法有两个参数,一个是注册好的函数名MyFunction,另一个则是函数调用时本身的参数。这里我们定义MyFunction在调用时,需要传入的参数是myField字段。

(3)在SQL中调用函数

当我们将函数注册为系统函数之后,在SQL中的调用就与内置系统函数完全一样了:

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

可见,SQL的调用方式更加方便,我们后续依然会以SQL为例介绍UDF的用法。

4.7.2、标量函数(Scalar Functions

自定义标量函数可以把0个、 1个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换。

想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类ScalarFunction,并实现叫作eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是eval。求值方法eval可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。

这里需要特别说明的是,ScalarFunction抽象类中并没有定义eval()方法,所以我们不能直接在代码中重写(override);但Table API的框架底层又要求了求值方法必须名字为eval()。这是Table API和SQL目前还显得不够完善的地方,未来的版本应该会有所改进。

下面我们来看一个具体的例子。我们实现一个自定义的哈希(hash)函数HashFunction,用来求传入对象的哈希值。

package com.lyh.sql;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// TODO 自定义函数的实现类
public class MyHashFunction extends ScalarFunction {
    // 接受任意类型的输入 返回int类型输出
    public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o){
        return o.hashCode();
    }
}
package com.lyh.sql;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class ScalarFunctionDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 21),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );


        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(sensorDS);
        table_env.createTemporaryView("sensor",sensorTable);

        //TODO 2. 注册函数
        table_env.createTemporaryFunction("hashFunction",MyHashFunction.class);

        // TODO 3. 调用自定义函数
        // 3.1 sql 用法
        table_env.sqlQuery("select hashFunction(id) from sensor")
                // 调用了 sql 的 execute 就不需要 env.execute()
                .execute()
                .print();
        // 3.2 table api 用法
        sensorTable
                // hashFunction的eval方法有注解的原因就是因为要告诉这里的参数类型
                .select(call("hashFunction",$("id")))
                .execute()
                .print();

    }
}

运行结果:

这里我们自定义了一个ScalarFunction,实现了eval()求值方法,将任意类型的对象传入,得到一个Int类型的哈希值返回。当然,具体的求哈希操作就省略了,直接调用对象的hashCode()方法即可。

另外注意,由于Table API在对函数进行解析时需要提取求值方法参数的类型引用,所以我们用DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示eval的参数可以是任意类型。

4.7.3、表函数(Table Functions

        跟标量函数一样,表函数的输入参数也可以是 0个、1个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口TVF,本质上就是表函数。

        类似地,要实现自定义的表函数,需要自定义类来继承抽象类TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction类本身是有一个泛型参数T的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return语句,是通过调用collect()方法来发送想要输出的行数据的。

        在SQL中调用表函数,需要使用LATERAL TABLE(<TableFunction>)来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的Join操作可以是直接做交叉联结(cross join),在FROM后用逗号分隔两个表就可以;也可以是以ON TRUE为条件的左联结(LEFT JOIN)。

        下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数SplitFunction,可以将一个字符串转换成(字符串,长度)的二元组。

package com.lyh.sql;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<word STRING,length INT>"))
public class MySplitFunction extends TableFunction<Row> {

    // 返回是 void ,用 collect 方法输出
    public void eval(String str){
        for (String word : str.split(" ")) {
            collect(Row.of(word,word.length()));
        }
    }
}
package com.lyh.sql;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class ScalarFunctionDemo2 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> wordsDS = env.fromElements(
                "hello flink",
                "hello hadoop",
                "hello kafka",
                "hello spark",
                "hello hive and impala"
        );
        
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(wordsDS,$("name"));// 字段名
        table_env.createTemporaryView("words",sensorTable);

        //TODO 2. 注册函数
        table_env.createTemporaryFunction("splitFunction",MySplitFunction.class);

        // TODO 3. 调用自定义函数
        // 交叉用法
        table_env.sqlQuery("select word,length from words,lateral table (splitFunction(name))")
                // 调用了 sql 的 execute 就不需要 env.execute()
                .execute()
                .print();

    }
}

运行结果:

左联结:

table_env
     .sqlQuery("select name,word,length from words left join lateral table (splitFunction(name)) on true")
      .execute()
      .print();

字段重命名:

.sqlQuery("select name,newWord,newLength from words left join lateral table (splitFunction(name)) as T(newWord,newLength) on true")

        这里我们直接将表函数的输出类型定义成了ROW,这就是得到的侧向表中的数据类型;每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在SQL中进行了调用,还可以对侧向表的中字段进行重命名。

4.7.4、聚合函数(Aggregate Functions

用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。

聚合函数的概念我们之前已经接触过多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定义聚合函数来实现功能了。

自定义聚合函数需要继承抽象类AggregateFunction。AggregateFunction有两个泛型参数<T, ACC>,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型。

Flink SQL中的聚合函数的工作原理如下:

(1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API中的AggregateFunction非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。

(2)对于输入的每一行数据,都会调用accumulate()方法来更新累加器,这是聚合的核心过程。

(3)当所有的数据都处理完之后,通过调用getValue()方法来计算并返回最终的结果。

所以,每个 AggregateFunction 都必须实现以下几个方法:

  1. createAccumulator()

这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。

  1. accumulate()

这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法eval()类似,也是底层架构要求的,必须为public,方法名必须为accumulate,且无法直接override、只能手动实现。

  1. getValue()

这是得到最终返回结果的方法。输入参数是ACC类型的累加器,输出类型为T。

在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。

AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以override;而其他则都是底层架构约定的方法。

下面举一个具体的示例,我们从学生的分数表ScoreTable中计算每个学生的加权平均分。

package com.lyh.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.AggregateFunction;

// 泛型 T:返回类型 ACC: 累加器类型<加权总和,权重总和>
public class MyWeightAvg extends AggregateFunction<Double, Tuple2<Integer,Integer>> {

    /**
     * 计算累加和
     * @param acc 累加器的类型 固定写法
     * @param score 分神
     * @param weight 权重
     */
    public void accumulate(Tuple2<Integer, Integer> acc,Integer score,Integer weight){
        acc.f0 += score*weight;
        acc.f1 += weight;
    }

    @Override
    public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
        return integerIntegerTuple2.f0*1d/integerIntegerTuple2.f1;
    }

    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }
}
package com.lyh.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class ScalarFunctionDemo3 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple3<String,Integer,Integer>> scoreDS = env.fromElements(
                Tuple3.of("燕双鹰",80,3),
                Tuple3.of("李大喜",90,4),
                Tuple3.of("李大喜",88,4),
                Tuple3.of("狄仁杰",95,4),
                Tuple3.of("狄仁杰",86,4)
        );

        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table scoreTable = table_env.fromDataStream(scoreDS,$("f0").as("name"),$("f1").as("score"),$("f2").as("weight"));// 字段名
        table_env.createTemporaryView("scores",scoreTable);

        //TODO 2. 注册函数
        table_env.createTemporaryFunction("weightAvg",MyWeightAvg.class);

        // TODO 3. 调用自定义函数
        table_env
                .sqlQuery("select name,weightAvg(score,weight) from scores group by name")
                .execute()
                .print();
    }
}

运行结果:

 

聚合函数的accumulate()方法有三个输入参数。第一个是WeightedAvgAccum类型的累加器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight。这里我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。

4.7.5、表聚合函数(Table Aggregate Functions

        用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。

        自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似,同样有两个泛型参数<T, ACC>,用一个ACC类型的累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction中也必须对应实现:

  • createAccumulator()
  •     创建累加器的方法,与AggregateFunction中用法相同。
  • accumulate()
  •     聚合计算的核心方法,与AggregateFunction中用法相同。
  • emitValue()

    所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着AggregateFunction中的getValue()方法;区别在于emitValue没有输出类型,而输入参数有两个:第一个是ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为Collect<T>。另外,emitValue()在抽象类中也没有定义,无法override,必须手动实现。

        表聚合函数相对比较复杂,它的一个典型应用场景就是TOP-N查询。比如我们希望选出一组数据排序后的前两名,这就是最简单的TOP-2查询。没有现成的系统函数,那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在accumulate()方法中进行比较更新,最终在emitValue()中调用两次out.collect()将前两名数据输出。

具体代码如下:

package com.lyh.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// T: 返回类型(数值,排名) ACC:累加器类型(第一大的数,第二大的数)
public class MyTableAggregate extends TableAggregateFunction<Tuple2<Integer,Integer>,Tuple2<Integer,Integer>> {

    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }

    /**
     *
     * 每来一个数据 调用一次 判断比较大小 更新top2到acc中
     * @param acc 累加器类型
     * @param num 过来的数据
     */
    public void accumulate(Tuple2<Integer,Integer> acc,Integer num){
        if (num > acc.f0){
            // 新来的变第一,旧的第一变第二
            acc.f1 = acc.f0;
            acc.f0 = num;
        }else if (num > acc.f1){
            // 新来的变第二,旧的第二删除
            acc.f1 = num;
        }
    }

    /**
     * 输出结果 (数值,排名)
     * @param acc 累加器类型
     * @param out 采集器类型,和输出结果类型一样
     */
    public void emitValue(Tuple2<Integer,Integer> acc, Collector<Tuple2<Integer,Integer>> out){
        if (acc.f0 != 0){
            out.collect(Tuple2.of(acc.f0,1));
        }
        if (acc.f1 != 0){
            out.collect(Tuple2.of(acc.f1,2));
        }
    }
}
package com.lyh.sql;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class ScalarFunctionDemo4 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> valDS = env.fromElements(3,5,4,7,5,6,1,4,2);

        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table valueTable = table_env.fromDataStream(valDS,$("value"));// 字段名
        table_env.createTemporaryView("values",valueTable);

        //TODO 2. 注册函数
        table_env.createTemporaryFunction("top2",MyTableAggregate.class);

        // TODO 3. 调用自定义函数: 只支持 table api
        valueTable
                .flatAggregate(call("top2",$("value")).as("value","rank"))
                .select($("value"),$("rank"))
                .execute()
                .print();
    }
}

运行结果:

+----+-------------+-------------+
| op |       value |        rank |
+----+-------------+-------------+
| +I |           3 |           1 |
| -D |           3 |           1 |
| +I |           5 |           1 |
| +I |           3 |           2 |
| -D |           5 |           1 |
| -D |           3 |           2 |
| +I |           5 |           1 |
| +I |           4 |           2 |
| -D |           5 |           1 |
| -D |           4 |           2 |
| +I |           7 |           1 |
| +I |           5 |           2 |
| -D |           7 |           1 |
| -D |           5 |           2 |
| +I |           7 |           1 |
| +I |           5 |           2 |
| -D |           7 |           1 |
| -D |           5 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
+----+-------------+-------------+
32 rows in set

目前SQL中没有直接使用表聚合函数的方式,所以需要使用Table API的方式来调用。

        这里使用了flatAggregate()方法,它就是专门用来调用表聚合函数的接口。统计num值最大的两个;并将聚合结果的两个字段重命名为value和rank,之后就可以使用select()将它们提取出来了。

总结

这节课用到不少东西:Kafka、Hive、MySQL、Flink,需要注意的地方很多,忘了的东西也很多。这里记录一下:

  • 关闭 hadoop 前 把 yarn里的任务都 kill 掉(尤其是 flink 的 yarn-session)

一些启动命令:

  • 启动 hive:hiveservices.sh start
  • 启动 flink sql:
    • bin/sqlsession -d
    • bin/sql-client embeded -s yarn-session -i sql-client-init.sql

        现在是 2024-01-23 22:13,终于把 Flink 完结了,从这学期的开始,耗时半年断断续续。Flink 是我难以言说的喜欢的一门课,没有缘由,希望接下来可以好好把它弄熟,更希望未来若干年的工作可以都和它打交道。

        永言配命,自求多福,希望未来能有一个好的结果,浅浅期待一下吧 ~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1406586.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习预测全家桶之单变量输入多步预测,天气温度预测为例,MATLAB代码

截止到本期&#xff0c;一共发了8篇关于机器学习预测全家桶的文章。参考文章如下&#xff1a; 1.五花八门的机器学习预测&#xff1f;一篇搞定不行吗&#xff1f; 2.机器学习预测全家桶&#xff0c;多步预测之BiGRU、BiLSTM、GRU、LSTM&#xff0c;LSSVM、TCN、CNN&#xff0c;…

怎么快速发表一篇EI会议论文?有什么要注意的?

都说EI会议论文的发表相对简单一些&#xff0c;但因为EI会议论文的含金量也挺高&#xff0c;因此很多国内外作者都喜爱在EI上投稿论文&#xff0c;那么怎么在国际ei会议发表会议论文呢? 这和国内发表论文都是差不多的&#xff0c;要选择合适的会议&#xff0c;按照会议要求整…

web安全学习笔记【06】——http\https抓包

思维导图放最后 #知识点&#xff1a; 1、Web常规-系统&中间件&数据库&源码等 2、Web其他-前后端&软件&Docker&分配站等 3、Web拓展-CDN&WAF&OSS&反向&负载均衡等 ----------------------------------- 1、APP架构-封装&原生态&…

Textual Inversion、DreamBooth、LoRA、InstantID:从低成本进化到零成本实现IP专属的AI绘画模型

2023年7月份国内有一款定制写真AI工具爆火。一款名为妙鸭相机的AI写真小程序&#xff0c;成功在C端消费者群体中出圈&#xff0c;并在微信、微博和小红书等平台迅速走红&#xff0c;小红书上的话题Tag获得了330多万的浏览量&#xff0c;相关微信指数飙升到了1800万以上。 其他…

【RT-DETR有效改进】2023.12月份最新成果TransNeXt像素聚焦注意力主干(全网首发)

前言 大家好&#xff0c;我是Snu77&#xff0c;这里是RT-DETR有效涨点专栏。 本专栏的内容为根据ultralytics版本的RT-DETR进行改进&#xff0c;内容持续更新&#xff0c;每周更新文章数量3-10篇。 专栏以ResNet18、ResNet50为基础修改版本&#xff0c;同时修改内容也支持Re…

检查字符串数组中的每个字符串是否全为“不显示元素”(如空格、制表符、换行符等)numpy.char.isspace()

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 检查字符串数组中的每个字符串 是否全为“不显示元素” &#xff08;如空格、制表符、换行符等&#xff09; numpy.char.isspace() [太阳]选择题 请问以下代码最终输出结果是&#xff1f; i…

RabbitMQ中交换机的应用及原理,案例的实现

目录 一、介绍 1. 概述 2. 作用及优势 3. 工作原理 二、交换机Exchange 1. Direct 2. Topic 3. Fanout 三、代码案例 消费者代码 1. 直连direct 生产者代码 测试 2. 主题topic 生产者代码 测试 3. 扇形fanout 生产者代码 测试 每篇一获 一、介绍 1. …

【前端小点】Vue3中的IP输入框组件

本文章记录,如何在vue3项目开发中,使用ip输入框组件. 之前写过vue2版本的ip组件,为了更好的适应vue3,此次进行vue3代码重写 先上效果图: 禁用效果图: 主要是组件的开发,代码如下,可直接拷贝使用. 大概思路就是: 使用四个输入框拼接,然后给输入内容添加校验操作,添加光标移动,…

05 双向链表

目录 1.双向链表 2.实现 3.OJ题 4.链表和顺序表对比 1. 双向链表 前面写了单向链表&#xff0c;复习一下 无头单向非循环链表&#xff1a;结构简单&#xff0c;一般不会单独用来存数据。实际中更多作为其他数据结构的子结构&#xff0c;如哈希桶、图的邻接等。另外这种结构在…

你知道Mysql的架构吗?

msyql分为server曾和存储引擎层 server层包括了连接器(管理连接&#xff0c;权限验证)、查询缓存&#xff08;命中直接返回结果&#xff09;、分析器&#xff08;词法分析&#xff0c;语法分析&#xff09;、优化器&#xff08;执行计划生成&#xff0c;索引选择&#xff09;、…

浪花 - 查询队伍列表

一、接口设计 1. 请求参数&#xff1a;封装 TeamQuery package com.example.usercenter.model.dto;import com.example.usercenter.common.PageRequest; import lombok.Data;/*** author 乐小鑫* version 1.0* Date 2024-01-22-20:14*/ Data public class TeamQuery extends …

使用Unity创建VisionPro应用

1、下载特定Unity版本 Unity账号需要是Pro账号,普通账号不行,目前只支持这1个Unity版本,不要下载任何其它版本:unityhub://2022.3.11f1/d00248457e15) 其它条件:使用Mac电脑M系列芯片,XCode15 Beta2及以上 参考资料: 苹果官网:苹果官网 Unity官网:Unity官网 官方教程…

C#,生成图片的指定尺寸缩略图的源代码

编程的时候经常用到图像的缩略图。 本文发布一个用于生成指定尺寸的缩略图的简单方法。 1 文本格式 private void button1_Click(object sender, EventArgs e) { CreateThumbnail("demo.jpg", "demo_thumb.jpg", 128, 128); } private void CreateTh…

MySQL函数—日期函数

MySQL函数—日期函数 函数功能CURDATE()返回当前日期&#xff0c;只有年月日CURTIME()返回当前时间&#xff0c;只有时分秒NOW()返回当前日期和时间 年月日时分秒YEAR(date)获取指定date的年份MONTH(date)获取指定date的月份DAY(date)获取指定date的日期DATE_ADD(date,INTERVAL…

项目解决方案: 视频融合(实时监控视频和三维建模进行融合)设计方案

目 录 一、需求描述 1、视频接入和控制要求 2、视频播放需求 3、提供其他应用的调用 二、方案设计 &#xff08;一&#xff09;系统设计图 &#xff08;二&#xff09;产品实现方案 三、产品和功能描述 &#xff08;一&#xff09;总体描述 &#xf…

2024问题汇总

2024问题汇总 Linux1.df-h / df -i 命令2.为多网卡Linux云服务器配置策略路由 Windows1.快速进入控制面板 网络连接指令 Linux 1.df-h / df -i 命令 df -h / df -i 都表示查看磁盘空间使用信息 如果遇到磁盘快满的情况&#xff0c;用这两个命令区别如下 df -h 是去删除比较大 …

Java的异常 Exception

从继承关系可知:Throwable 是异常体系的根&#xff0c;它继承自Object 。Throwable 有两个体系: Error 和Exception. Error表示严重的错误&#xff0c;程序对此一般无能为力,例如: OutOfMemoryError :内存耗尽NoClassDefFoundError :无法加载某个ClassStackOverflowError :虚…

web安全学习笔记【05】——反弹Shell、正反向连接

思维导图 #知识点&#xff1a; 1、Web常规-系统&中间件&数据库&源码等 2、Web其他-前后端&软件&Docker&分配站等 3、Web拓展-CDN&WAF&OSS&反向&负载均衡等 ----------------------------------- 1、APP架构-封装&原生态&H5&am…

软件安全测试的重要性简析,专业安全测试报告如何申请?

在当今数字化时代&#xff0c;软件在我们的日常生活中扮演着至关重要的角色&#xff0c;但也带来了各种潜在的安全威胁。为了保障用户的信息安全和维护软件的可靠性&#xff0c;软件安全测试显得尤为重要。 软件安全测试是指通过一系列的方法和技术&#xff0c;对软件系统中的…