Flink系列Table API和SQL之:滚动窗口、滑动窗口、累计窗口、分组聚合

news2024/9/24 5:31:33

Flink系列Table API和SQL之:窗口

  • 一、窗口(Window)
  • 二、分组窗口(Group Window)
  • 三、窗口表值函数(Windowing TVFs)
    • 1.滚动窗口(TUMBLE)
    • 2.滑动窗口(HOP)
    • 3.累计窗口(CUMULATE)
  • 四、分组聚合
  • 五、分组聚合实现代码
  • 六、分组窗口聚合代码实现
  • 七、窗口聚合:滚动窗口
  • 八、窗口聚合:滑动窗口
  • 九、窗口聚合:累积窗口

一、窗口(Window)

有了时间属性,接下来就可以定义窗口进行计算了。窗口可以将无界流切割成大小有限的桶(bucket)来做计算,通过截取有限数据集来处理无限的流数据。在DataStream API中提供了对不同类型的窗口进行定义和处理的接口,而在Table API和SQL中,类似的功能也都可以实现。

二、分组窗口(Group Window)

在Flink 1.12之前的版本中,Table API和SQL提供了一组分组窗口(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现。具体在SQL中就是调用TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例:

TUMBLE(ts,INTERVAL '1' HOUR)

这里的ts是定义好的时间属性字段,窗口大小用时间间隔 INTERVAL来定义。
在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。基本使用方式如下:

Table result = tableEnv.sqlQuery(
	"SELECT " +
	"user, " +
	"TUMBLE_END(ts,INTERVAL '1' HOUR) as endT, " +
	"COUNT(url) AS cnt " +
	"FROM EventTable " +
	"GROUP BY "+ //使用窗口和用户名进行分组
	"user, " +
	"TUMBLE(ts,INTERVAL '1' HOUR)" //定义1小时滚动窗口
);

这里定义了1小时的滚动窗口,将窗口和用户user一起作为分组的字段。用聚合函数COUNT()对分组数据的个数进行了聚合统计,并将结果字段重命名为cnt。用TUPMBLE_END()函数获取滚动窗口的结束时间,重命名为endT提取出来。

分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用deprecated的状态。

三、窗口表值函数(Windowing TVFs)

Flink开始使用窗口表值函数(Windowing table-valued functions,Windowing TVFs)来定义窗口。窗口表值函数是Flink定义的多表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数。

目前Flink提供了以下几个窗口TVF:

  • 滚动窗口(Tumbling Windows)
  • 滑动窗口(Hop Windows,跳跃窗口)
  • 累积窗口(Cumulate Windows)
  • 会话窗口(Session Windows)

窗口表值函数可以完全替代传统的分组窗口函数,窗口TVF更符合SQL标准,性能得到了优化,拥有更强大的功能。可以支持基于窗口的复杂计算,例如窗口Top-N、窗口联结(window join)等。

在窗口TVF的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外3个列。窗口起始点(window_start)、窗口结束点(window_end)、窗口时间(window_time)。起始点和结束点比较好理解,窗口时间指的是窗口中的时间属性,它的值等于window_end - 1ms,所以相当于是窗口能够包含数据的最大时间戳。

在SQL中的声明方式,与以前的分组窗口是类似的,直接调用TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。

分别对这几种窗口TVF进行介绍

1.滚动窗口(TUMBLE)

滚动窗口中SQL中的概念与DataStream API中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。

在SQL中通过调用TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在SQL中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数重还需要将当前的时间属性字段传入,另外,窗口TVF本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表昨晚参数整体传入。具体声明如下:

TUMBLE(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '1' HOUR)

这里基于时间字段ts,对表EventTable中的数据开了大小1小时的滚动窗口。窗口会将表中的每一行数据,按照它们ts的值分配到一个指定的窗口中。

2.滑动窗口(HOP)

滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在SQL中通过调用HOP()来声明滑动窗口,除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide)两个参数。

HOP(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '5' MINUTES,INTERVAL '1' HOURS);
  • 基于时间属性ts,在表EventTable上创建了大小为1小时的滑动窗口,每5分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长slide,第四个参数才是窗口大小size。

3.累计窗口(CUMULATE)

滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。不过在实际应用中还会遇到这样一类需求:统计周期可能较长,希望中间每隔一段时间就输出一次当前的统计值。与滑动窗口不同的是,在一个统计周期内,会多次输出统计值,应该是不断叠加累积的。

例如:按天来统计网站的PV(Page View,页面浏览量),如果用1天的滚动窗口,那需要到每天24点才会计算一次,输出频率太低。如果用滑动窗口,计算频率可以更高,但统计的就变成了过去24小时的PV。所以真正希望的是,还是按照自然日统计每天的PV,不过需要每隔1小时就输出一次当天到目前为止的PV值。这种特殊的窗口就叫作累积窗口(Cumulate Window)。

在这里插入图片描述

累积窗口是窗口TVF中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。

最大窗口长度其实就是我们所说的统计周期,最终目的就是统计这段时间内的数据。

开始时,创建的第一个窗口大小就是步长step。之后的每个窗口都会在之前的基础上再扩展step的长度,直到达到最大窗口长度。在SQL中可以用CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '1' HOURS,INTERVAL '1' DAYS)

基于时间属性ts,在表EventTable上定义了一个统计周期为1天、累计步长伟1小时的累积窗口。注意第三个参数为步长step,第四个参数则是最大窗口长度。

上面所有的语句只是定义了窗口,类似于DataStream API中的窗口分配器。在SQL中窗口的完整调用,还需要配合聚合操作和其他操作。

四、分组聚合

在SQL中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多个结果值:比如求和、最大最小值、平均值等,这种操作叫做聚合查询。Flink中的SQL是流处理与标准SQL结合的产物,所以聚合查询也可以分成两种:流处理中特有的聚合(主要指窗口聚合),以及SQL原生的聚合查询方式。

分组聚合:

SQL中一般所说的聚合,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及COUNT()。特点是对多条输入数据进行计算,得到一个唯一的值,属于多对一的转换。比如可以通过下面的代码计算输入数据的个数。

Table eventCountTable = tableEnv.sqlQuery("select COUNT(*) from EventTable");

更多的情况下,可以通过GROUP BY子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计,例如按照用户名进行分组,统计每个用户点击url的次数:

SELECT user,COUNT(url) as cnt FROM EventTable GROUP BY user

这种聚合方式,就叫做分组聚合(group aggregation)。从概念上讲,SQL中的分组聚合可以对应DataStream API中keyBy之后的聚合转换,都是按照某个key对数据进行了划分,各自维护状态来进行聚合统计。在流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表。每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流或者更新插入流(upsert stream)的编码方式。如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。

另外在持续查询的过程中,由于用于分组的key可能会不断增加,因此计算结果所需要维护的状态也会持续增长。为了防止状态无限增长耗尽资源,Flink Table API和SQL可以在表环境中配置状态的生存时间(TTL):

TableEnvironment tableEnv = ...

//获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();

//配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));

或者也可以直接设置配置项table.exec.state.ttl

TableEnvironment tableEnv = ...
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl","60 min");

这两种方式是等效的,需要注意,配置TTL有可能会导致统计结果不准确,其实是以

五、分组聚合实现代码

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
 * <p>
 * Project:  FlinkTutorial
 * <p>
 * Created by  wushengran
 */

public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " +
                " 'format' =  'csv' " +
                ")";

        tableEnv.executeSql(createDDL);

        // 2. 在流转换成Table时定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        clickTable.printSchema();

        // 聚合查询转换

        // 1. 分组聚合
        Table aggTable = tableEnv.sqlQuery("SELECT user_name, COUNT(1) FROM clickTable GROUP BY user_name");

        // 结果表转换成流打印输出
        tableEnv.toChangelogStream(aggTable).print("agg: ");
        env.execute();
    }
}
(
  `user` STRING,
  `url` STRING,
  `ts` BIGINT,
  `et` TIMESTAMP(3) *ROWTIME*
)
agg: > +I[Mary, 1]
agg: > +I[Bob, 1]
agg: > +I[Alice, 1]
agg: > -U[Bob, 1]
agg: > +U[Bob, 2]
agg: > -U[Alice, 1]
agg: > +U[Alice, 2]

六、分组窗口聚合代码实现

public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " +
                " 'format' =  'csv' " +
                ")";

        tableEnv.executeSql(createDDL);

        // 2. 在流转换成Table时定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        clickTable.printSchema();

        // 2. 分组窗口聚合
        Table groupWindowResultTable = tableEnv.sqlQuery("SELECT " +
                "user_name, " +
                "COUNT(1) AS cnt, " +
                "TUMBLE_END(et, INTERVAL '10' SECOND) as endT " +
                "FROM clickTable " +
                "GROUP BY " +                     // 使用窗口和用户名进行分组
                "  user_name, " +
                "  TUMBLE(et, INTERVAL '10' SECOND)" // 定义1小时滚动窗口
        );

        // 结果表转换成流打印输出

        tableEnv.toDataStream(groupWindowResultTable).print("group window: ");

        env.execute();
    }
}

(
  `user` STRING,
  `url` STRING,
  `ts` BIGINT,
  `et` TIMESTAMP(3) *ROWTIME*
)
group window: > +I[Mary, 1, 1970-01-01T08:00:10]
group window: > +I[Bob, 2, 1970-01-01T08:00:10]
group window: > +I[Alice, 2, 1970-01-01T08:00:10]
group window: > +I[Bob, 1, 1970-01-01T08:00:20]
group window: > +I[Alice, 1, 1970-01-01T08:00:30]

七、窗口聚合:滚动窗口

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
 * <p>
 * Project:  FlinkTutorial
 * <p>
 * Created by  wushengran
 */

public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " +
                " 'format' =  'csv' " +
                ")";

        tableEnv.executeSql(createDDL);

        // 2. 在流转换成Table时定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        clickTable.printSchema();



        // 3. 窗口聚合
        // 3.1 滚动窗口
        Table tumbleWindowResultTable = tableEnv.sqlQuery("SELECT user_name, COUNT(url) AS cnt, " +
                " window_end AS endT " +
                "FROM TABLE( " +
                "  TUMBLE( TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND)" +
                ") " +
                "GROUP BY user_name, window_start, window_end "
        );


        // 结果表转换成流打印输出
        tableEnv.toDataStream(tumbleWindowResultTable).print("tumble window: ");


        env.execute();
    }
}
(
  `user` STRING,
  `url` STRING,
  `ts` BIGINT,
  `et` TIMESTAMP(3) *ROWTIME*
)
tumble window: > +I[Mary, 1, 1970-01-01T08:00:10]
tumble window: > +I[Bob, 2, 1970-01-01T08:00:10]
tumble window: > +I[Alice, 2, 1970-01-01T08:00:10]
tumble window: > +I[Bob, 1, 1970-01-01T08:00:20]
tumble window: > +I[Alice, 1, 1970-01-01T08:00:30]

八、窗口聚合:滑动窗口

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
 * <p>
 * Project:  FlinkTutorial
 * <p>
 * Created by  wushengran
 */

public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " +
                " 'format' =  'csv' " +
                ")";

        tableEnv.executeSql(createDDL);

        // 2. 在流转换成Table时定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        clickTable.printSchema();

        // 3.2 滑动窗口
        Table hopWindowResultTable = tableEnv.sqlQuery("SELECT user_name, COUNT(url) AS cnt, " +
                " window_end AS endT " +
                "FROM TABLE( " +
                "  HOP( TABLE clickTable, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)" +
                ") " +
                "GROUP BY user_name, window_start, window_end "
        );


        // 结果表转换成流打印输出
        tableEnv.toDataStream(hopWindowResultTable).print("hop window: ");

        env.execute();
    }
}
(
  `user` STRING,
  `url` STRING,
  `ts` BIGINT,
  `et` TIMESTAMP(3) *ROWTIME*
)
hop window: > +I[Mary, 1, 1970-01-01T08:00:05]
hop window: > +I[Bob, 2, 1970-01-01T08:00:05]
hop window: > +I[Alice, 2, 1970-01-01T08:00:05]
hop window: > +I[Bob, 2, 1970-01-01T08:00:10]
hop window: > +I[Alice, 2, 1970-01-01T08:00:10]
hop window: > +I[Mary, 1, 1970-01-01T08:00:10]
hop window: > +I[Bob, 1, 1970-01-01T08:00:15]
hop window: > +I[Bob, 1, 1970-01-01T08:00:20]
hop window: > +I[Alice, 1, 1970-01-01T08:00:25]
hop window: > +I[Alice, 1, 1970-01-01T08:00:30]

九、窗口聚合:累积窗口

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
 * <p>
 * Project:  FlinkTutorial
 * <p>
 * Created by  wushengran
 */

public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 在创建表的DDL中直接定义时间属性
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " +
                " 'format' =  'csv' " +
                ")";

        tableEnv.executeSql(createDDL);

        // 2. 在流转换成Table时定义时间属性
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        clickTable.printSchema();

        // 聚合查询转换

        // 3.3 累积窗口
        Table cumulateWindowResultTable = tableEnv.sqlQuery("SELECT user_name, COUNT(url) AS cnt, " +
                " window_end AS endT " +
                "FROM TABLE( " +
                "  CUMULATE( TABLE clickTable, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)" +
                ") " +
                "GROUP BY user_name, window_start, window_end "
        );


        // 结果表转换成流打印输出
        tableEnv.toDataStream(cumulateWindowResultTable).print("cumulate window: ");
        env.execute();
    }
}
(
  `user` STRING,
  `url` STRING,
  `ts` BIGINT,
  `et` TIMESTAMP(3) *ROWTIME*
)
cumulate window: > +I[Mary, 1, 1970-01-01T08:00:05]
cumulate window: > +I[Bob, 2, 1970-01-01T08:00:05]
cumulate window: > +I[Alice, 2, 1970-01-01T08:00:05]
cumulate window: > +I[Bob, 2, 1970-01-01T08:00:10]
cumulate window: > +I[Alice, 2, 1970-01-01T08:00:10]
cumulate window: > +I[Mary, 1, 1970-01-01T08:00:10]
cumulate window: > +I[Bob, 1, 1970-01-01T08:00:15]
cumulate window: > +I[Bob, 1, 1970-01-01T08:00:20]
cumulate window: > +I[Alice, 1, 1970-01-01T08:00:25]
cumulate window: > +I[Alice, 1, 1970-01-01T08:00:30]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/131990.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【SCL】博图SCL应用之音乐喷泉

使用Scl语言编写博图应用&#xff1a;音乐喷泉 文章目录 目录 一、音乐喷泉 1.控制要求 2.I/O分配 3.编写程序 4.效果和完整代码 二、装配流水线模拟控制&#xff08;练习&#xff09; 1.控制要求 2.场景 前言 承接上文&#xff0c;这里写一下上一篇的练习题 音乐喷泉应用案…

使用Typora+PicGo+SM.MS实现本地博客图片自动上传

使用TyporaPicGoSM.MS实现本地博客图片自动上传 Typora&#xff1a;一款Markdown 编辑器 PicGo: 一个用于快速上传图片并获取图片 URL 链接的工具. SM.MS: 一个图床网站&#xff0c;注册后有5G免费空间 为什么要让本地图片自动上传 对于一个随时随刻都有可能在文章中贴代码的计…

快来领取你的JavaScript正则表达式速查表

如果我们想对字符串进行相关&#xff08;增、删、改、查、检索&#xff09;操作&#xff0c;就可以用接下来的正则表达式实现 什么是正则表达式 正则表达式是用于匹配字符串中字符组合的模式正则表达式通常被用来检索、替换那些符合某个模式&#xff08;规则&#xff09;的文本…

家装中,你最后悔的事是什么?上海极家装修公司简介!

家装中&#xff0c;你最后悔的事是什么&#xff1f;上海极家装修公司简介&#xff01;接触过很多业主&#xff0c;就没有不后悔的!至于原因&#xff0c;因为总会出现各种“考虑不周”&#xff0c;有些真的是失之毫厘差之千里&#xff01; 下面上海极家装修公司简介&#xff01;…

Cadence每日一学_12 | 使用 Padstack Editor 制作贴片焊盘和通孔焊盘

最近在学习小马哥的Cadence课程&#xff0c;该系列课程为学习笔记&#xff1a;使用Cadence Allegro绘制小马哥DragonFly四轴飞行器(STM32F4主控)PCB四层板教程。 文章目录一、获取焊盘封装尺寸的途径二、Padstack Editor三、绘制贴片焊盘&#xff08;以电阻焊盘为例&#xff09…

Odoo 16 企业版手册 - 库存管理之产品类别

产品类别 您可以使用Odoo 库存模块中提供的产品类别功能对产品进行分类。为了执行各种产品操作&#xff0c;必须在Odoo中定义产品类别。Odoo将使产品更容易找到&#xff0c;因为它允许您按产品类别进行筛选。用户可以从库存模块的「配置」菜单访问「产品类别」窗口&#xff0c;…

【Python】sklearn机器学习之Birch聚类算法

文章目录基本原理sklearn调用基本原理 BIRCH&#xff0c;即Balanced Iterative Reducing and Clustering Using Hierarchies&#xff0c;利用分层的平衡迭代规约和聚类&#xff0c;特点是扫描一次数据就可以实现聚类&#xff0c; 而根据经验&#xff0c;一般这种一遍成功的算…

02 elf 的 binary 解析

前言 需求来自于 linux binary 的执行分析, 以及一些反编译工具的实现 比如 readelf, hopper disassemble 什么的 主要的目的是 更加详细了解 elf 的文件格式 为 后续的一些 理解做准备 elf 解析 elf 文件主要分为 四个部分 elfHeader, programHeaders, segments, …

2022年度盘点|聚焦运维服务,云智慧的高光时刻

回首2022&#xff0c;从IE 浏览器退役到AIGC 火遍全球&#xff0c;每一次科技的兴衰演进都打破着技术的新边界。与此同时&#xff0c;随着各行业数据规模爆发式地增长&#xff0c;云智慧作为国内全栈智能运维解决方案服务商&#xff0c;企业数字化地加速转型也为其带来了更多的…

Hibernate validator注解及Spring Boot自定义Hibernate Validator注解校验(超级详细)

一 Hibernate validator是什么 验证数据是贯穿整个应用层&#xff08;从表示层到持久层&#xff09;的常见任务。通常在每一层中都需要实现相同的验证逻辑&#xff0c;这样既耗时又容易出错。为了避免这些验证的重复&#xff0c;开发认原经常将验证逻辑直接捆绑到Model域中&…

1.9 基础综合案例|pyechart第三方包

文章目录json数据格式pyecharts模块介绍pyecharts快速入门数据处理这里使用比较经典的pyechart的第三方包。json数据格式 json是一种轻量级的数据交互形式。可以按照json指定的格式去组织和封装数据。或者这么说本质上json就是一个带有特定格式的字符串。 主要功能&#xff1…

【金猿案例展】正官庄——全渠道会员数据治理驱动商业增长

‍珍岛集团案例本项目案例由珍岛集团投递并参与“数据猿年度金猿策划活动——《2022大数据产业年度创新服务企业》榜单/奖项”评选。‍数据智能产业创新服务媒体——聚焦数智 改变商业随着商业品牌的全渠道裂变式发展&#xff0c;对DTC直营会员为中心的综合数据运营提出了新的…

【自学Java】Java基本数据类型

Java基本数据类型 Java基本数据类型 Java 基本数据类型如下表&#xff1a; 序号数据类型大小/位可表示的数据范围默认值1long&#xff08;长整数&#xff09;64-9223372036854775808&#xff5e;92233720368547758070L2int&#xff08;整数&#xff09;32-2147483648&#x…

设置 MYSQL 数据库编码为 utf8mb4

utf-8编码可能2个字节、3个字节、4个字节的字符&#xff0c;但是MySQL的utf8编码只支持3字节的数据&#xff0c;而移动端的表情数据是4个字节的字符。如果直接往采用utf-8编码的数据库中插入表情数据&#xff0c;java程序中将报SQL异常&#xff1a; java.sql.SQLException: Inc…

带音频播放的MPlayer播放器在ARM上的移植笔记

前言 mplayer想要播放带音频的视频文件&#xff0c;需要依赖alsa-lib和zlib&#xff0c;所以交叉编译mplayer前还需要先编译alsa-lib和zlib 一、alsa-lib alsa-lib 是 ALSA 提供的一套 Linux 下的 C 语言函数库&#xff0c;需要将 alsa-lib 移植到板卡上&#xff0c;这样基于…

人话解读LGPLv3

大家都知道&#xff1a;你调用了 LGPL的库&#xff0c;你还是可以开发一个闭源程序。这就说明&#xff0c;LGPL比GPL要宽松。但并不像想象的那么简单。一、为什么会有LGPL作为GPL的发明人Stallman&#xff0c;是自由软件的死忠坚定维护者&#xff0c;为什么还允许让别人用了自己…

2022年终总结-2023新年快乐

这是学习笔记的第 2446篇文章新的一年了&#xff0c;还是得总结点东西&#xff0c;本来想盘一下自己买了多少书&#xff0c;做了哪些有意义的事情&#xff0c;想想我还是自己先慢慢盘吧&#xff0c;发不发出来另说&#xff0c;还是希望写点自己的感悟&#xff0c;也希望对大家有…

LabVIEW共享变量

LabVIEW共享变量 创建共享变量 要创建共享变量&#xff0c;必须先打开一个LabVIEW项目。在项目浏览器窗口中&#xff0c;右键单击终端、项目库或项目库中的文件夹&#xff0c;从快捷菜单中选择新建(New) 变量(Variable)&#xff0c;打开共享变量属性(Shared Variable Proper…

dubbo源码实践-SPI扩展

1 概述 SPI的官方文档说明&#xff1a;Dubbo SPI | Apache Dubbo SPI 全称为 Service Provider Interface&#xff0c;是一种服务发现机制。SPI 的本质是将接口实现类的全限定名配置在文件中&#xff0c;并由服务加载器读取配置文件&#xff0c;加载实现类。这样可以在运行时&…

我的2022总结

博客记录 踏石留印 抓铁有痕 使用csdn写博客&#xff0c;发帖子&#xff0c;帮助网友回答问题。都是实实在在满足了学习&#xff0c;交流的需求 这是我自己使用 CSDN 各种功能记录&#xff1a; 工作方面&#xff1a; 年初参与了公司的一个产品&#xff0c;主要负责串口服务…