Flink-FinkSQL基本操作(Table API、动态表、事件窗口、分组聚合开窗查询、联结查询)

news2025/1/13 11:51:16

11 Table API和SQL

11.1 快速上手

  1. 引入TableAPI的依赖
  • 桥接器
<dependency>
 <groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>

主要通过bridge桥接器进行将Table转换成DataStream

  • 计划器
<dependency>
 <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>

计划器以及scala流式处理

  1. 上代码
public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.读取数据,得到DataStream
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        //2. 创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.将DataStream转换成Table
        //POJO类属性当作表的列
        Table eventTable = tableEnv.fromDataStream(eventStream);

        //4.直接写SQL进行转换
        //调用sqlQuery返回的依然是一个表,即表和表之间的
        //from后面一定要带空格,``是用在关键字
        Table resultTable1 = tableEnv.sqlQuery("select user,url,`timestamp` from " + eventTable);

        //5.或者基于Table直接转换
        Table resultTable2 = eventTable.select($("user"), $("url"))
                .where($("user").isEqual("Alice"));

        //6.通过转回流然后打印输出
        tableEnv.toDataStream(resultTable1).print("result1");
        tableEnv.toDataStream(resultTable2).print("result2");

        env.execute();
    }
}

结果

result1> +I[Bob, ./prod?id=100, 1670142197928]
result1> +I[Bob, ./home, 1670142198946]
result1> +I[Alice, ./home, 1670142199959]
result2> +I[Alice, ./home]
result1> +I[Mary, ./fav, 1670142200963]
result1> +I[Bob, ./prod?id=100, 1670142201970]
result1> +I[Bob, ./home, 1670142202972]
result1> +I[Mary, ./home, 1670142203983]
result1> +I[Bob, ./fav, 1670142204988]
result1> +I[Alice, ./home, 1670142205994]
result2> +I[Alice, ./home]
result1> +I[Bob, ./cart, 1670142207000]
result1> +I[Mary, ./cart, 1670142208012]
result1> +I[Bob, ./fav, 1670142209015]
result1> +I[Bob, ./prod?id=100, 1670142210020]
result1> +I[Bob, ./home, 1670142211033]
result1> +I[Alice, ./home, 1670142212036]
result2> +I[Alice, ./home]

11.2 基本API

11.2.1 程序架构

  1. 概述

分为数据源(Source),转换(Transform),输出(Sink)

  1. 流程

在这里插入图片描述

创建表环境

使用executeSql()方法设置输入表和输出表,特殊的是with 'connector’连接器连接外部系统

sqlQuery()传入sql语句对输入表进行处理,也可以使用TableAPI调用from()或者select()方法

得到的处理表调用.executeInsert(),传入输出表输出

11.2.2 创建表环境

  1. 第一种
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  1. 第二种

在这里插入图片描述

在这里插入图片描述

调用EnvironmentSettings类中的newInstance()方法返回Builder,在Builder类中有build(),返回EnvironmentSettings,同时可以调用

在这里插入图片描述

在这里插入图片描述

将setting传入TableEnvironment类的create()方法中,得到TableEnvironment

代码如下

特点是不依赖StreamExecutionEnvironment流式执行环境

        //1. 定义环境配置来创建表执行环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()//流执行环境
                .useBlinkPlanner()//计划器
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);
  1. 其他(了解)
  • 基于老版本planner进行流处理
        //1.1基于老版本planner进行
        EnvironmentSettings settings1 = EnvironmentSettings.newInstance()
                .inStreamingMode()//流执行环境
                .useOldPlanner()//老版本计划器
                .build();

        TableEnvironment tableEnv1 = TableEnvironment.create(settings1);
  • 基于老版本planner进行批处理
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(batchEnv);
  • 基于Blink版本planner进行批处理
        EnvironmentSettings settings3 = EnvironmentSettings.newInstance()
                .inBatchMode()//批执行环境
                .useBlinkPlanner()
                .build();

        TableEnvironment tableEnv3 = TableEnvironment.create(settings3);

11.2.3 创建表

  1. 概述

表环境会维护一个目录和表的对应关系,拥有唯一的ID,由以下三部分组成:目录,数据库,表名,例如:default_catalog.default_database.MyTable

  1. 创建方式
  • 连接器表

在这里插入图片描述

  • 虚拟表
    在这里插入图片描述

在这里插入图片描述

通过createTemporaryView()创建临时的虚拟表,第一个参数表名称,第二个参数表实例

  • 两者区别是,连接器表是实体的,而虚拟表是虚拟的,只是用到这张表的时候,会将对应的语句嵌入到SQL中

11.2.4 表的查询/输出

  1. 概述

提供了两种查询方式,SQL以及TableAPI

  1. 执行SQL进行查询

写SQL并且调用.sqlQuery()

在这里插入图片描述

除了简单的select,也可以使用count,sun,avg等进行计算

  1. TableAPI

在这里插入图片描述

注册的表传入.from之中
在这里插入图片描述

  1. 代码
  • 总体代码
public class CommonApiTest {
    public static void main(String[] args) {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//
//        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1. 定义环境配置来创建表执行环境

        //基于blink版本planner进行流处理
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()//流执行环境
                .useBlinkPlanner()//计划器
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

//        //1.1 基于老版本planner进行流处理
//        EnvironmentSettings settings1 = EnvironmentSettings.newInstance()
//                .inStreamingMode()//流执行环境
//                .useOldPlanner()//老版本计划器
//                .build();
//
//        TableEnvironment tableEnv1 = TableEnvironment.create(settings1);
//
//        //1.2 基于老版本planner进行批处理
//        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
//        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(batchEnv);
//
//
//        //1.3 基于Blink版本planner进行批处理
//        EnvironmentSettings settings3 = EnvironmentSettings.newInstance()
//                .inBatchMode()//批执行环境
//                .useBlinkPlanner()
//                .build();
//
//        TableEnvironment tableEnv3 = TableEnvironment.create(settings3);


        //2.表的创建
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT" +
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.csv',"+
                " 'format' = 'csv'" +
                ")";



        tableEnv.executeSql(createDDL);
        //调用Table的API进行表的查询转换
        Table clickTable = tableEnv.from("clickTable");
        Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
                .select($("user_name"), $("url"));

        //注册成虚拟表
        tableEnv.createTemporaryView("result2",resultTable);

        //执行SQL进行表的查询转换
        Table resultTable2 = tableEnv.sqlQuery("select url,user_name from result2");


        //创建一张用户输出的表

        String createOutDDL = "CREATE TABLE outTable (" +
                " user_name STRING, " +
                " url STRING " +
                " ) WITH (" +
                " 'connector'='filesystem'," +
                " 'path' = 'output'," +
                " 'format' = 'csv'"+
                " )";


        tableEnv.executeSql(createOutDDL);


        //输出表
        resultTable2.executeInsert("outTable");
    }
}

结果

在这里插入图片描述

结果分区很多,因为没有设置并行度,默认就是电脑核数

  • 创建一张用于控制台打印输出的表
        //创建一张用于控制台打印输出的表
        String createPrintOutDDL = "CREATE TABLE printoutTable (" +
                " user_name STRING, " +
                " url STRING " +
                " ) WITH (" +
                " 'connector'='print' " +
                " )";

        tableEnv.executeSql(createPrintOutDDL);

        //输出表
        resultTable2.executeInsert("printoutTable");

结果

3> +I[Bob, ./home]
6> +I[Bob, ./prod?id=100]
8> +I[Bob, ./prod?id=10]
4> +I[Bob, ./cart]
2> +I[Bob, ./prod?id=10]
6> +I[Bob, ./cart]
4> +I[Bob, ./home]
  • 聚合运算
public class CommonApiTest2 {
    public static void main(String[] args) {

        //1. 定义环境配置来创建表执行环境
        //基于blink版本planner进行流处理
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()//流执行环境
                .useBlinkPlanner()//计划器
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);



        //2.表的创建
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT" +
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.csv',"+
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(createDDL);


        //执行聚合计算的查询转换
        Table aggResult = tableEnv.sqlQuery("select user_name,COUNT(url) as cnt " +
                "from clickTable " +
                "group by user_name");




        //创建一张用于控制台打印输出的表
        String createPrintOutDDL = "CREATE TABLE printoutTable (" +
                " user_name STRING, " +
                " cnt BIGINT "+
                " ) WITH (" +
                " 'connector'='print' " +
                " )";

        tableEnv.executeSql(createPrintOutDDL);



        //输出表
        aggResult.executeInsert("printoutTable");
    }
}

结果

8> +I[Bob, 1]			rowkind为+I表示新增
4> +I[Mary, 1]
9> +I[Alice, 1]
8> -U[Bob, 1]			rowkind为-U表示更新前的数据
4> -U[Mary, 1]
8> +U[Bob, 2]			rowkind为-U表示更新后的数据
4> +U[Mary, 2]
8> -U[Bob, 2]
8> +U[Bob, 3]
4> -U[Mary, 2]
4> +U[Mary, 3]
8> -U[Bob, 3]
4> -U[Mary, 3]
4> +U[Mary, 4]
8> +U[Bob, 4]

11.2.5 表和流的转换

  1. 表转换成流
  • 简单的仅仅rowkind为+I的流叫做插入流,直接调用.toToDataStream()转换成流的操作

此时用toChangelogStream()也是没问题的

public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.读取数据,得到DataStream
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        //2. 创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.将DataStream转换成Table
        //POJO类属性当作表的列
        Table eventTable = tableEnv.fromDataStream(eventStream);

        //4.直接写SQL进行转换
        //调用sqlQuery返回的依然是一个表,即表和表之间的
        //from后面一定要带空格,``是用在关键字
        Table resultTable1 = tableEnv.sqlQuery("select user,url,`timestamp` from " + eventTable);


        //5.通过转回流然后打印输出
        tableEnv.toDataStream(resultTable1).print("result1");


        env.execute();
    }
}

结果

result1> +I[Mary, ./cart, 1670218890037]
result1> +I[Mary, ./fav, 1670218891038]
result1> +I[Mary, ./fav, 1670218892039]
result1> +I[Alice, ./fav, 1670218893040]
result1> +I[Mary, ./fav, 1670218894053]
result1> +I[Bob, ./prod?id=100, 1670218895053]
result1> +I[Bob, ./cart, 1670218896054]
result1> +I[Mary, ./cart, 1670218897054]

  • 基于StreamTableEnvironment环境的.toChangelogStream()
public class SimpleTableExample2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.读取数据,得到DataStream
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        //2. 创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.将DataStream转换成Table
        //POJO类属性当作表的列
        Table eventTable = tableEnv.fromDataStream(eventStream);

        //4.聚合转换
            //注册表
        tableEnv.createTemporaryView("clickTable",eventTable);
        Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt " +
                "from clickTable " +
                "group by user");

        //5.打印输出
        tableEnv.toChangelogStream(aggResult).print("agg");

        env.execute();
    }
}

结果

agg> +I[Bob, 1]
agg> -U[Bob, 1]
agg> +U[Bob, 2]
agg> +I[Alice, 1]
agg> -U[Alice, 1]
agg> +U[Alice, 2]
agg> -U[Bob, 2]
agg> +U[Bob, 3]
  1. 流转换成表

(1)流转成Table对象

  • 流属性转成表列
        Table eventTable = tableEnv.fromDataStream(eventStream);
  • 指定流的字段作为表的列

在这里插入图片描述

  • 甚至可以重命名

在这里插入图片描述

(2)注册表和流转表一起,即调用createTemporaryView(),第一个参数是名字,第二个参数是流,后面参数可以指定传的字段

在这里插入图片描述

(3) 调用fromChangelogStream()方法

即将更新日志流转换成表

  1. 支持的数据类型

流转表需要考虑的较多

(1)原子类型

​ 基础数据类型以及通用数据类型,转换的就是一列,值排排好,类型就是原来的类型

​ 也可以重命名字段名称,详见上面2说的

​ 本质当作了一元组Tuple1处理

(2) Tuple类型

​ 二元组就是两个值分别转成表的第一列以及第二列

​ 转的时候可以取字段,交换字段,重命名字段

在这里插入图片描述
(3) POJO类型也就是复合类型

(4)Row类型

​ 需指定具体类型,以及附加了一个RowKind用来表示更新日志流的数据

· 来个例子了解下

在这里插入图片描述

11.3 流处理中的表

11.3.1 动态表和持续查询

  1. 动态表

因为flink是流式处理,而表是有界的,因此得到的表需要不断的动态变化更新,因此被称为“动态表”。

动态表概念类比关系型数据库“物化视图”,可以用来缓存SQL查询的结果

  1. 持续查询

多个持续查询的连续动作,触发条件就是源源不断来的数据

在这里插入图片描述

11.3.2 用SQL持续查询

  1. 更新查询
  • 数据流到动态表的转换过程

用户点击的每一条数据都是动态表

在这里插入图片描述

  • 表转成数据流

在这里插入图片描述

  1. 追加查询

在这里插入图片描述

没update

  1. 查询限制

11.3.3 动态表编码成数据流

  1. 概念
  • 追加流

仅追加流对应动态表的追加查询

  • 撤回流

撤回流对应动态表的更新查询,具体编码如下:insert->add,delete->retract,update->retract+add

  • 更新插入流

更新插入流:upsert更新插入 delete删除 但是upsert如何区分insert还是update呢?

通过唯一的key去判定,如果有key那就insert,没有的话就更新

11.4 事件属性和窗口

11.4.1 事件时间

  1. 在创建表的DDL中定义
  • ts普通转换

在这里插入图片描述

注意下水位线咋写的

ts直接写上TIMESTAMP(3)类型就可以了,3表示毫秒精度

  • 转成TIMESTAMP_LTZ
    在这里插入图片描述

TIMESTAMP_LTZ是指的时间戳是“年-月-日-时-分-秒”的形式

  • 代码案例
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT," +
                " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))" +//TO_TIMESTAMP要String,使用FROM_UNIXTIME转成string传入,ts/1000是秒
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +//水位线
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.csv',"+
                " 'format' = 'csv'" +
                ")";
            }
}
  1. 在数据流转换成表的时候定义

在字段最后调用.rowtime()指定成时间属性,ts需要在流中定义好,即使二元组没有,相当于新建一个字段
在这里插入图片描述

在这里插入图片描述

  • 代码案例
public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.在流转换成Table的时候定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        clickTable.printSchema();
    }
}

结果

(
  `user` STRING,
  `url` STRING,
  `ts` BIGINT,
  `et` TIMESTAMP(3) *ROWTIME*
)

11.4.2 处理时间

  1. 在创建表的DDL中定义

在这里插入图片描述

ts是通过as proctime()新建的字段

  1. 在数据流转换成表中定义

在这里插入图片描述

在这里插入图片描述

新增字段调用.proctime()方法

11.4.3 窗口

  1. 分组窗口(旧版本)
  • 分类

TUMBLE(),HOP(),SESSION()滚动/滑动/会话窗口

  • 案例
TUMBLE(ts,INTERBAL '1' HOUR)

TUMBLE_END表示窗口结束的时间

在这里插入图片描述

  1. 窗口表值函数(新版本)

在这里插入图片描述

(1)滚动窗口

TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)

第一个参数是数据表,第二个是时间戳,第三个间隔时间

(2)滑动窗口

HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));

第三个滑动步长,第四个参数是窗口大小

(3) 累积窗口

在这里插入图片描述

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

11.5 聚合查询

11.5.1 分组聚合

  1. 运用
Table eventCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user
");
  1. 配置状态的生存时间(TTL)
  • 运用
TableEnvironment tableEnv = ...
// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));

或者

TableEnvironment tableEnv = ...
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "60 min");
  1. 代码
  • csv数据
Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10,9000
Bob,./prod?id=10,11000
Bob,./prod?id=10,13000
Bob,./prod?id=10,15000
  • 代码
public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT," +
                " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +//TO_TIMESTAMP要String,使用FROM_UNIXTIME转成string传入,ts/1000是秒
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +//水位线
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.csv',"+
                " 'format' = 'csv'" +
                ") ";

        tableEnv.executeSql(createDDL);

        //2.在流转换成Table的时候定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        //3.聚合查询转换
        //3.1 分组聚合
        Table aggTable = tableEnv.sqlQuery("select user_name,count(url) from clickTable group by user_name");

        //3.2 分组窗口聚合
        Table groupWindowResultTable = tableEnv.sqlQuery("select " +
                " user_name,count(1) as cnt," +
                " TUMBLE_END(et,INTERVAL '10' SECOND) as entT " +
                " from clickTable group by user_name," +
                " TUMBLE(et,INTERVAL '10' SECOND)");


        //clickTable.printSchema();

  • agg结果
agg> +I[Mary, 1]
agg> +I[Alice, 1]
agg> +I[Bob, 1]
agg> -U[Bob, 1]
agg> +U[Bob, 2]
agg> -U[Bob, 2]
agg> +U[Bob, 3]
agg> -U[Mary, 1]
agg> +U[Mary, 2]
agg> -U[Bob, 3]
agg> +U[Bob, 4]
agg> -U[Bob, 4]
agg> +U[Bob, 5]
agg> -U[Bob, 5]
agg> +U[Bob, 6]
agg> -U[Bob, 6]
agg> +U[Bob, 7]
agg> -U[Bob, 7]
agg> +U[Bob, 8]
agg> -U[Bob, 8]
agg> +U[Bob, 9]
  • group window结果
group window:> +I[Mary, 2, 1970-01-01T08:00:10]
group window:> +I[Alice, 1, 1970-01-01T08:00:10]
group window:> +I[Bob, 6, 1970-01-01T08:00:10]
    
group window:> +I[Bob, 3, 1970-01-01T08:00:20]
表示Mary在第一个十秒之内有2次点击,Bob6次,Alice1次
在第二个窗口十秒中,Bob3

11.5.2 窗口聚合

  1. 代码
public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT," +
                " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +//TO_TIMESTAMP要String,使用FROM_UNIXTIME转成string传入,ts/1000是秒
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +//水位线
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.csv',"+
                " 'format' = 'csv'" +
                ") ";

        tableEnv.executeSql(createDDL);

        //2.在流转换成Table的时候定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());


        //4. 窗口聚合
        //4.1 滚动窗口
        Table tumbleWindowResultTable = tableEnv.sqlQuery("select user_name,count(1) as cnt," +
                "window_end as endT " +
                "from TABLE(" +
                "TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND)) " +
                "GROUP BY user_name,window_end,window_start");

        //4.2 滑动窗口
        Table hopWindowResultTable = tableEnv.sqlQuery("select user_name,count(1) as cnt," +
                "window_end as endT " +
                "from TABLE(" +
                "HOP(TABLE clickTable,DESCRIPTOR(et),INTERVAL '5' SECOND,INTERVAL '10' SECOND)) " +
                "GROUP BY user_name,window_end,window_start");

        //4.2 累积窗口
        Table cumulateResultTable = tableEnv.sqlQuery("select user_name,count(1) as cnt," +
                "window_end as endT " +
                "from TABLE(" +
                "CUMULATE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '5' SECOND,INTERVAL '10' SECOND)) " +
                "GROUP BY user_name,window_end,window_start");


        tableEnv.toChangelogStream(tumbleWindowResultTable).print("tumble window:");
        tableEnv.toChangelogStream(hopWindowResultTable).print("hop window:");
        tableEnv.toChangelogStream(cumulateResultTable).print("cumulate window:");

        env.execute();
    }
}
  • tumble window结果
tumble window:> +I[Mary, 2, 1970-01-01T08:00:10]
tumble window:> +I[Alice, 1, 1970-01-01T08:00:10]
tumble window:> +I[Bob, 6, 1970-01-01T08:00:10]
tumble window:> +I[Bob, 3, 1970-01-01T08:00:20]
  • hop window 结果
-5~5
hop window:> +I[Mary, 1, 1970-01-01T08:00:05]
hop window:> +I[Alice, 1, 1970-01-01T08:00:05]
hop window:> +I[Bob, 2, 1970-01-01T08:00:05]
0~10    
hop window:> +I[Alice, 1, 1970-01-01T08:00:10]
hop window:> +I[Mary, 2, 1970-01-01T08:00:10]
hop window:> +I[Bob, 6, 1970-01-01T08:00:10]
5~15    
hop window:> +I[Mary, 1, 1970-01-01T08:00:15]
hop window:> +I[Bob, 6, 1970-01-01T08:00:15]
10~20    
hop window:> +I[Bob, 3, 1970-01-01T08:00:20]
15~25    
hop window:> +I[Bob, 1, 1970-01-01T08:00:25]
  • cumulate window
0~5
cumulate window:> +I[Mary, 1, 1970-01-01T08:00:05]
cumulate window:> +I[Alice, 1, 1970-01-01T08:00:05]
cumulate window:> +I[Bob, 2, 1970-01-01T08:00:05]
0~10    
cumulate window:> +I[Alice, 1, 1970-01-01T08:00:10]
cumulate window:> +I[Mary, 2, 1970-01-01T08:00:10]
cumulate window:> +I[Bob, 6, 1970-01-01T08:00:10]
10~15    
cumulate window:> +I[Bob, 2, 1970-01-01T08:00:15]
10~20    
cumulate window:> +I[Bob, 3, 1970-01-01T08:00:20]

11.5.3 开窗(Over) 聚合

  1. 语法
SELECT
 <聚合函数> OVER (
 [PARTITION BY <字段 1>[, <字段 2>, ...]]
 ORDER BY <时间属性字段>
 <开窗范围>),
 ...
FROM ...
SELECT user, ts,
 COUNT(url) OVER w AS cnt,
 MAX(CHAR_LENGTH(url)) OVER w AS max_url
FROM EventTable
WINDOW w AS (
 PARTITION BY user
 ORDER BY ts
 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
  1. 开窗范围
  • 范围间隔
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  • 行范围
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  1. 代码
  • csv数据
Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10,9000
Bob,./prod?id=10,11000
Bob,./prod?id=10,13000
Bob,./prod?id=10,15000
  • 代码
public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT," +
                " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +//TO_TIMESTAMP要String,使用FROM_UNIXTIME转成string传入,ts/1000是秒
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +//水位线
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.csv',"+
                " 'format' = 'csv'" +
                ") ";

        tableEnv.executeSql(createDDL);

        //2.在流转换成Table的时候定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());


        //开窗聚合over
        Table overWindowResultTable = tableEnv.sqlQuery("select user_name,avg(ts) " +
                "OVER(" +
                "PARTITION BY user_name " +
                "ORDER BY et " +
                "ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" +
                ") AS avg_ts " +
                "FROM clickTable");



        tableEnv.toChangelogStream(overWindowResultTable).print("over window:");

        env.execute();
    }
}

结果

over window:> +I[Mary, 1000]
over window:> +I[Alice, 2000]
over window:> +I[Bob, 3000]
over window:> +I[Bob, 3500]3000+4000/2
over window:> +I[Bob, 4000]3000+4000+5000/3
over window:> +I[Mary, 3500]		
over window:> +I[Bob, 4750]3000+4000+5000+7000/4
over window:> +I[Bob, 6000]4000+5000+7000+8000/4			
over window:> +I[Bob, 7250]
over window:> +I[Bob, 8750]
over window:> +I[Bob, 10250]
over window:> +I[Bob, 12000]

11.5.4 TopN案例

  1. 普通TopN
  • 语法
SELECT ...
FROM (
 SELECT ...,
343
 ROW_NUMBER() OVER (
[PARTITION BY <字段 1>[, <字段 1>...]]
 ORDER BY <排序字段 1> [asc|desc][, <排序字段 2> [asc|desc]...]
) AS row_num
 FROM ...)
WHERE row_num <= N [AND <其它条件>]

  • 案例

统计字符长度排序的前两个url

SELECT user, url, ts, row_num
FROM (
 SELECT *,
 ROW_NUMBER() OVER (
PARTITION BY user
 ORDER BY CHAR_LENGTH(url) desc 
) AS row_num
 FROM EventTable)
WHERE row_num <= 2
  1. 运用
  • 统计浏览次数最多的用户
public class TopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " `user` STRING, " +
                " url STRING, " +
                " ts BIGINT," +
                " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +//TO_TIMESTAMP要String,使用FROM_UNIXTIME转成string传入,ts/1000是秒
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +//水位线
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.csv',"+
                " 'format' = 'csv'" +
                ") ";
        tableEnv.executeSql(createDDL);

        //普通TopN,选取当前用户中浏览量最大的两个
        Table topNResultTable = tableEnv.sqlQuery
                ("SELECT user,cnt,row_num " +
                        "FROM (" +
                        " SELECT *,ROW_NUMBER() OVER (ORDER BY cnt DESC) AS row_num " +
                        "FROM (" +
                            "SELECT user,COUNT(url) AS cnt " +
                            "FROM clickTable GROUP BY user" +
                        	")" +
                        ") " +
                   "WHERE row_num <= 2");

        tableEnv.toChangelogStream(topNResultTable).print("top2:");

        env.execute();
    }
}

结果

top2:> +I[Mary, 1, 1]
top2:> +I[Alice, 1, 2]
top2:> -U[Mary, 1, 1]
top2:> +U[Bob, 2, 1]
top2:> -U[Alice, 1, 2]
top2:> +U[Mary, 1, 2]
top2:> -U[Bob, 2, 1]
top2:> +U[Bob, 3, 1]
top2:> -U[Mary, 1, 2]
top2:> +U[Mary, 2, 2]
top2:> -U[Bob, 3, 1]
top2:> +U[Bob, 4, 1]
top2:> -U[Bob, 4, 1]
top2:> +U[Bob, 5, 1]
top2:> -U[Bob, 5, 1]
top2:> +U[Bob, 6, 1]
top2:> -U[Bob, 6, 1]
top2:> +U[Bob, 7, 1]
top2:> -U[Bob, 7, 1]
top2:> +U[Bob, 8, 1]
top2:> -U[Bob, 8, 1]
top2:> +U[Bob, 9, 1]

Process finished with exit code 0

  • 统计一段时间内有最多访问行为的用户
public class TopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " `user` STRING, " +
                " url STRING, " +
                " ts BIGINT," +
                " et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +//TO_TIMESTAMP要String,使用FROM_UNIXTIME转成string传入,ts/1000是秒
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +//水位线
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.csv',"+
                " 'format' = 'csv'" +
                ") ";
        tableEnv.executeSql(createDDL);
        
        //窗口TopN,统计一段时间内的(前两名)活跃用户

        String subQuery = "SELECT user,COUNT(url) AS cnt,window_start,window_end " +
                            "FROM TABLE(TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND))" +
                            " GROUP BY user,window_start,window_end";
        Table windowTopNResultTable = tableEnv.sqlQuery("SELECT user,cnt,row_num,window_end " +
                        "FROM (" +
                            " SELECT *,ROW_NUMBER() OVER (" +
                            "PARTITION BY window_start,window_end " +
                            "ORDER BY cnt DESC) AS row_num " +
                            "FROM (" + subQuery +" ) )" +
                        "WHERE row_num <= 2");
        tableEnv.toDataStream(windowTopNResultTable).print("window top n:");

        env.execute();
    }
}

结果

window top n:> +I[Bob, 6, 1, 1970-01-01T08:00:10]
window top n:> +I[Mary, 2, 2, 1970-01-01T08:00:10]
window top n:> +I[Bob, 3, 1, 1970-01-01T08:00:20]

11.6 联结查询

11.6.1 常规联结查询

  1. 等值内连接
  2. 等值外连接

这部分跟标准SQL一样呢,就不赘述啦

11.6.2 间隔联结查询

1.时间间隔限制

ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

案例

SELECT *
FROM Order o, Shipment s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

在流处理中,间隔联结查询只支持具有时间属性的“仅追加”(Append-only)表。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/181812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

12、常用插件

文章目录12、常用插件推荐1&#xff1a;Alibaba Java Coding Guidelines推荐2&#xff1a;jclasslib bytecode viewer推荐3&#xff1a;Translation推荐4&#xff1a;GenerateAllSetter推荐5&#xff1a;Rainbow Brackets推荐6&#xff1a;CodeGlance Pro推荐7&#xff1a;Stat…

7.bWAPP -- INSECURE DIRECT OBJECT REFERENCES

7.bWAPP – INSECURE DIRECT OBJECT REFERENCES 0x01、Insecure DOR (Change Secret) 同 XSS - Stored (Change Secret) Low 仔细观察页面, 发现隐藏一个input标签, 作用是输入用户名, 并且配合提交的修改密码, 完成修改用户密码的操作: 这里就可以利用该用户名input标签达…

如何带好一个团队?团队管理的要点有哪些?

想带好一个团队并不是这么容易&#xff0c;尤其是对于新晋升管理者来说更是难上加难。团队管理可以大大提高工作效率。那么&#xff0c;团队管理的要点是什么呢&#xff1f; 1、远景和目标 成员们先要有一个共同的目标&#xff0c;在此基础上还必须要有一个好的愿景&#xff0…

即时通讯系列---如何下手做技术方案设计

1. 引出主题 IM整体涉及的内容比较多, 做技术方案设计需要慎重, 可以先从功能列表以及核心case逐步的总结出技术方案 本文结构: 1. 查看功能列表 2. 核心case分析 3. 总结技术方案设计 2. 如何做技术方案设计 1. 查看功能列表 功能清单 一级分类 二级分类 三级分类…

TCP/UDP网络编程

目录 一、常见的客户端服务端模型 二、Socket套接字 1、概念 2、分类 a、流套接字 b、数据报套接字 c、原始套接字 三、UDP数据报套接字编程 四、TCP数据报套接字编程 一、常见的客户端服务端模型 客户端&#xff1a;用户使用的程序。 服务端&#xff1a;给用户提…

miracl编译及使用

文章目录Windows平台编译网址 https://miracl.com/https://github.com/miracl/MIRACL Windows平台编译 源码目录下新建文件夹ms32或ms64&#xff0c;把/lib/ms32doit.bat或ms64doit.bat分别拷进去。 把源码include和source目录所有文件拷贝进要编译的ms32或ms64&#xff0c…

【高阶数据结构】海量数据如何处理? (位图 布隆过滤器)

&#x1f308;欢迎来到高阶数据结构专栏~~位图 & 布隆过滤器 (꒪ꇴ꒪(꒪ꇴ꒪ )&#x1f423;,我是Scort目前状态&#xff1a;大三非科班啃C中&#x1f30d;博客主页&#xff1a;张小姐的猫~江湖背景快上车&#x1f698;&#xff0c;握好方向盘跟我有一起打天下嘞&#xff0…

模拟实现list / list迭代器

前言&#xff1a;学习C的STL&#xff0c;我们不仅仅要求自己能够熟练地使用各种接口&#xff0c;我们还必须要求自己了解一下其底层的实现方法&#xff0c;这样可以帮助我们写出比较高效的代码程序&#xff01; ⭐在本篇文章中&#xff0c;list的迭代器是重点&#xff0c;它不…

WSL2配置网络代理

注意&#xff1a;本文参考自文章&#xff1a;WSL2配置代理&#xff0c;是对原文的补充&#xff0c;使其适用于河对岸云服务代理。 1 开启Windows代理 1.1 开启代理软件的局域网访问权限 请注意&#xff1a;本文的WSL2代理配置&#xff0c;需要Windows的代理软件已经能够正常…

HTTPS详解及HTTPS实验

目录 HTTPS 一&#xff0c;https在参考模型中的位置 二&#xff0c;什么是HTTPS 三&#xff0c;什么是SSL 1&#xff0c;SSL 协议分为两层&#xff1a; 2&#xff0c;SSL 协议提供的服务&#xff1a; 四&#xff0c;HTTPS的加密方式 1&#xff0c;常见的加密算法 2&#xff0c;…

mysql知识点

目录 1.mysql聚合函数&#xff1a; 2.having&#xff08;用来过滤数据&#xff09;&#xff1a; HAVING 不能单独使用&#xff0c;必须要跟 GROUP BY 一起使用 WHERE 与 HAVING 的对比 3.升序和降序 4.等于 5.实战demo&#xff1a; 1.mysql聚合函数&#xff1a; 常用的聚…

codeforces签到题之div3

前言 第一次&#xff43;&#xff4f;&#xff44;&#xff45;&#xff46;&#xff4f;&#xff52;&#xff43;&#xff45;&#xff53;&#xff0c;发现几个问题&#xff1a; 1,不知道选&#xff4c;&#xff41;&#xff4e;&#xff47;&#xff55;&#xff41;&…

17正交距阵和Gram-Schmidt正交化

标准正交向量与正交矩阵 上一节介绍过的正交向量&#xff0c;通过一个式子进行回顾&#xff0c;设q是标准正交向量组中的任意向量&#xff0c;则 这很好地表现了标准正交向量组内各向量的性质&#xff1a; 不同向量之间相互垂直&#xff08;正交&#xff09;&#xff0c;向量…

Ribbon 负载均衡

介绍Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端负载均衡的工具。Ribbon是Netflix发布的开源项目&#xff0c;主要功能是提供客户端的软件负载均衡算法和服务调用。Ribbon客户端组件提供一系列完善的配置项如连接超时&#xff0c;重试等。简单的说&#xff0c;就…

屏幕录制软件推荐,分享这3款,简单好用

​网络上充斥着许多各种各样的屏幕录制软件&#xff0c;许多有选择困难的朋友可能会充满怀疑&#xff1a;哪个电脑屏幕录制软件很容易使用&#xff1f;屏幕录制软件推荐哪个比较好&#xff1f;别担心&#xff0c;今天&#xff0c;小编分享这这3个简单好用的屏幕录制软件&#x…

Day10 C++STL入门基础知识七——案例1【评委打分】

路漫漫其修远兮&#xff0c;吾将上下而求索 文章目录1. 承接上文1. 案例描述2. 实现思路3. 亿点点分析3.1 创建选手类3.1.1 具体思路3.1.2 代码展示3.2 创建5名选手并对其姓名、平均分进行初始化3.2.1 具体思路① 创建vector容器② 创建一个creatPlayer()函数a.调用函数b. 初始…

若依框架基于@PreAuthorize注解的权限控制

目录 一、Java注解&#xff08;Annotation&#xff09; 1. 概述 2. Annotation通用定义 &#xff08;1&#xff09;interface &#xff08;2&#xff09;Documented &#xff08;3&#xff09;Target(ElementType.TYPE) &#xff08;4&#xff09;Retention(Ret…

IDEA插件

Lombok用注解的方式&#xff0c;简化了 JavaBean 的编写。注解下面介绍一下常用的几个注解&#xff1a;Setter 注解在类或字段&#xff0c;注解在类时为所有字段生成setter方法&#xff0c;注解在字段上时只为该字段生成setter方法。Getter 使用方法同上&#xff0c;区别在于生…

java基于ssm电梯服务管理信息系统的设计与实现源码+数据库

基于ssm电梯服务管理信息系统的设计与实现 技术支持 开发软件&#xff1a;Eclipse 项目类型&#xff1a;Webapp 数据库&#xff1a;MySQL 数据库连接池&#xff1a;druid 框架&#xff1a;SSM 数据库设计软件&#xff1a;PowerDesigner 前端界面开发&#xff1a;HTML/CSS…

Maven介绍

Maven介绍1、Maven的简单介绍2、Maven的优点3、Maven的基本知识3.1、Maven如何获取Jar包3.2、Maven仓库的分类4、Idea中的maven4.1、clean4.2、validate4.3、compile4.4、test&#xff08;不常用&#xff09;4.5、package4.6、verify&#xff08;不常用&#xff09;4.7、instal…