【Flink-1.17-教程】-【五】Flink 中的时间和窗口(1)窗口(Window)

news2024/9/24 5:30:46

【Flink-1.17-教程】-【五】Flink 中的时间和窗口(1)窗口(Window)

  • 1)窗口的概念
  • 2)窗口的分类
    • 2.1.按照驱动类型分
    • 2.2.按照窗口分配数据的规则分类
      • 2.2.1.滚动窗口(Tumbling Window)
      • 2.2.2.滑动窗口(Sliding Window)
      • 2.2.3.会话窗口(Session Window)
      • 2.2.4.全局窗口(Global Window)
  • 3)窗口 API 概览
  • 4)窗口分配器
    • 4.1.时间窗口
    • 4.2.计数窗口
  • 5)窗口函数
    • 5.1.增量聚合函数(ReduceFunction / AggregateFunction)
    • 5.2.全窗口函数(full window functions)
    • 5.3.增量聚合和全窗口函数的结合使用
  • 6)其他 API
    • 6.1.触发器(Trigger)
    • 6.2.移除器(Evictor)

在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下 Flink 中的时间语义和窗口的应用。

1)窗口的概念

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

在这里插入图片描述

注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。

2)窗口的分类

我们在上一节举的例子,其实是最为简单的一种时间窗口。在 Flink 中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。接下来我们就从不同的角度,对 Flink 中内置的窗口做一个分类说明。

2.1.按照驱动类型分

在这里插入图片描述

2.2.按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

2.2.1.滚动窗口(Tumbling Window)

在这里插入图片描述

2.2.2.滑动窗口(Sliding Window)

在这里插入图片描述

2.2.3.会话窗口(Session Window)

在这里插入图片描述

2.2.4.全局窗口(Global Window)

在这里插入图片描述

3)窗口 API 概览

1)按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream 来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。

(1)按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...)
.window(...)

(2)非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。

在代码中,直接基于 DataStream 调用.windowAll()定义窗口。

stream.windowAll(...)

注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作。

2)代码中窗口 API 的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)

其中 .window() 方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。

4)窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。

窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个 WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。

窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink 中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

4.1.时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。

1、滚动处理时间窗口

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。

stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)

这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。

另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

2、滑动处理时间窗口

窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10)Time.seconds(5)))
.aggregate(...)

这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。

滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

3、处理时间会话窗口

窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap() 或者.withDynamicGap()。

stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)
))
.aggregate(...)

这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。

另外,还可以调用 withDynamicGap()方法定义 session gap 的动态提取逻辑。

4、滚动事件时间窗口

窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)

5、滑动事件时间窗口

窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10)Time.seconds(5)))
.aggregate(...)

6、事件时间会话窗口

窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

4.2.计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink 为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现。

1、滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。

stream.keyBy(...)
.countWindow(10)

我们定义了一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发计算执行并关闭窗口。

2、滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...)
.countWindow(103)

我们定义了一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果。

3、全局窗口

全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由 GlobalWindows 类提供。

stream.keyBy(...)
.window(GlobalWindows.create());

需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

5)窗口函数

在这里插入图片描述

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。下面我们来进行分别讲解。

5.1.增量聚合函数(ReduceFunction / AggregateFunction)

窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。

1、归约函数(ReduceFunction)

public class WindowReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 2. 窗口函数: 增量聚合 Reduce
        /**
         * 窗口的reduce:
         * 1、相同key的第一条数据来的时候,不会调用reduce方法
         * 2、增量聚合: 来一条数据,就会计算一次,但是不会输出
         * 3、在窗口触发的时候,才会输出窗口的最终计算结果
         */
        SingleOutputStreamOperator<WaterSensor> reduce = sensorWS.reduce(
                new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        System.out.println("调用reduce方法,value1=" + value1 + ",value2=" + value2);
                        return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
                    }
                }
        );

        reduce.print();

        env.execute();
    }
}

2、聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。

Flink Window API 中的 aggregate 就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个 AggregateFunction 的实现类作为参数。

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚
    合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

代码实现如下:

public class WindowAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 2. 窗口函数: 增量聚合 Aggregate
        /**
         * 1、属于本窗口的第一条数据来,创建窗口,创建累加器
         * 2、增量聚合: 来一条计算一条, 调用一次add方法
         * 3、窗口输出时调用一次getresult方法
         * 4、输入、中间累加器、输出 类型可以不一样,非常灵活
         */
        SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(
                /**
                 * 第一个类型: 输入数据的类型
                 * 第二个类型: 累加器的类型,存储的中间计算结果的类型
                 * 第三个类型: 输出的类型
                 */
                new AggregateFunction<WaterSensor, Integer, String>() {
                    /**
                     * 创建累加器,初始化累加器
                     * @return
                     */
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("创建累加器");
                        return 0;
                    }

                    /**
                     * 聚合逻辑
                     * @param value
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        System.out.println("调用add方法,value="+value);
                        return accumulator + value.getVc();
                    }

                    /**
                     * 获取最终结果,窗口触发时输出
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("调用getResult方法");
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        // 只有会话窗口才会用到
                        System.out.println("调用merge方法");
                        return null;
                    }
                }
        );

        aggregate.print();
        
        env.execute();
    }
}

另外,Flink 也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于 WindowedStream 调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与 KeyedStream 的简单聚合非常相似。它们的底层,其实都是通过 AggregateFunction 来实现的。

5.2.全窗口函数(full window functions)

有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。

所以,我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

在 Flink 中,全窗口函数也有两种:WindowFunction 和 ProcessWindowFunction。

1、窗口函数(WindowFunction)

WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());

这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用。

2、处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富,其实就是一个增强版的 WindowFunction。

事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。

代码实现如下:

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 老写法
//        sensorWS
//                .apply(
//                        new WindowFunction<WaterSensor, String, String, TimeWindow>() {
//                            /**
//                             *
//                             * @param s  分组的key
//                             * @param window 窗口对象
//                             * @param input 存的数据
//                             * @param out   采集器
//                             * @throws Exception
//                             */
//                            @Override
//                            public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
//
//                            }
//                        }
//                )

        SingleOutputStreamOperator<String> process = sensorWS
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                            /**
                             * 全窗口函数计算逻辑:  窗口触发时才会调用一次,统一计算窗口的所有数据
                             * @param s   分组的key
                             * @param context  上下文
                             * @param elements 存的数据
                             * @param out      采集器
                             * @throws Exception
                             */
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                // 上下文可以拿到window对象,还有其他东西:侧输出流 等等
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());


                            }
                        }
                );

        process.print();

        env.execute();
    }
}

5.3.增量聚合和全窗口函数的结合使用

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的 Window API 就给我们实现了这样的用法。

我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,WindowFunction<TRKW>
function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)
// AggregateFunction 与 WindowFunction 结合
public <ACC,VR> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T,ACC,V> aggFunction,WindowFunction<VRKW> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC,VR> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T,ACC,V> aggFunction,
ProcessWindowFunction<VRKW> windowFunction)

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。

具体实现代码如下:

public class WindowAggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 2. 窗口函数:
        /**
         * 增量聚合 Aggregate + 全窗口 process
         * 1、增量聚合函数处理数据: 来一条计算一条
         * 2、窗口触发时, 增量聚合的结果(只有一条) 传递给 全窗口函数
         * 3、经过全窗口函数的处理包装后,输出
         *
         * 结合两者的优点:
         * 1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
         * 2、全窗口函数: 可以通过 上下文 实现灵活的功能
         */

//        sensorWS.reduce()   //也可以传两个

        SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                new MyAgg(),
                new MyProcess()
        );

        result.print();



        env.execute();
    }

    public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{

        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }


        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add方法,value="+value);
            return accumulator + value.getVc();
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用getResult方法");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            System.out.println("调用merge方法");
            return null;
        }
    }

    // 全窗口函数的输入类型 = 增量聚合函数的输出类型
    public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{

        @Override
        public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            long startTs = context.window().getStart();
            long endTs = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

            long count = elements.spliterator().estimateSize();

            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());

        }
    }
}

这里我们为了方便处理,单独定义了一个 POJO 类 UrlViewCount 来表示聚合输出结果的数据类型,包含了 url、浏览量以及窗口的起始结束时间。用一个 AggregateFunction 来实现增量聚合,每来一个数据就计数加一;得到的结果交给 ProcessWindowFunction,结合窗口信息包装成我们想要的 UrlViewCount,最终输出统计结果。

6)其他 API

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,让我们可以更加灵活地控制窗口行为。

6.1.触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于 WindowedStream 调用 .trigger() 方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())

6.2.移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1409116.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码随想录算法训练营第37天 | 738.单调递增的数字 968.监控二叉树 总结

目录 738.单调递增的数字 &#x1f4a1;解题思路 &#x1f4bb;实现代码 968.监控二叉树 &#x1f4a1;解题思路 确定遍历顺序 如何隔两个节点放一个摄像头 &#x1f4bb;实现代码 总结 738.单调递增的数字 题目链接&#xff1a;738.单调递增的数字 给定一个非负…

《深入解析Java虚拟机:从JVM体系结构到垃圾回收算法》

文章目录 JVM体系结构JVM的组成 类加载器Class Loader类加载器的作用双亲委派机制JVM自带三个类加载器Bootstrap ClassLoader-根加载器ExtClassLoader-扩展加载器AppClassLoader-应用类加载器 Java历史-沙箱安全机制沙箱概念沙箱的作用本地代码和远程代码沙箱安全机制模型JDK1 …

Sqlite真空命令VACUUM

之前在项目中使用了sqlite数据库&#xff0c;当日志变大时&#xff0c;执行CRUD操作就会变慢 后来尝试删除7天前的记录进行优化 delete from XX_CollectData where CreateTime<2024-01-24 发现sqlite文件的大小就没有变化&#xff0c;delete命令只是逻辑删除&#xff0c;…

web项目开发的基本过程

一、背景 web项目开发基本过程一般由需求分析&#xff0c;概要设计&#xff0c;详细设计&#xff0c;数据库设计&#xff0c;编码&#xff0c;测试&#xff0c;发布上线这几个过程。这就是经典的瀑布模型。但是随着系统的复杂度越来越高&#xff0c;团队人员技术栈分工越来越小…

[algorithm] 自动驾驶 规划 非线性优化学习系列之1 :车辆横向运动动力学详细解释

写在前面 最近时空联合规划很火&#xff0c;想学习。由于在学校主打学习新能源电力电子方向&#xff0c;转行后也想好好零散的知识体系。计划从车辆运动动力学习&#xff0c;模型预测控制&#xff08;经典控制目前看主打应用&#xff0c;不会再去深入&#xff09;&#xff0c;…

下载音频(MP3)解决跨域,不跳转界面,直接下载

需求 项目需求&#xff0c;将通话记录下载下来&#xff0c;要求不跳转界面直接下载。 效果 代码 // 下载录音downloadRecording(data) {const url data.urlconst fileName 录音.mp3this.getOSSBlobResource(url).then(res > {this.saveFile(res, fileName)})},getOSSBlo…

车载显示,“激斗”与“换代”

编者按&#xff1a;车载显示&#xff0c;正在进入新一轮变革周期。 车载显示作为汽车智能化的重要交互终端&#xff0c;在过去几年&#xff0c;持续受益车企的大屏化、多屏化配置趋势&#xff0c;部分头部厂商赚得盆满钵满。 比如&#xff0c;作为京东方旗下唯一的车载显示模组…

apipost和curl收不到服务器响应的HTTP/1.1 404 Not Found

windows的apipost发送请求后&#xff0c;服务器响应了HTTP/1.1 404 Not Found&#xff0c;但是apipost一直显示发送中。 linux上的curl也一样。 使用wireshark抓包发现收到了响应&#xff0c;但是wireshark识别不了&#xff08;图中是回应404后关闭了连接&#xff09;&#xff…

描绘未知:数据缺乏场景的缺陷检测方案

了解更多方案内容&#xff0c;欢迎您访问官网&#xff1a;neuro-T | 友思特 机器视觉 光电检测&#xff1b;或联系销售经理&#xff1a;18124130753 导读&#xff1a; 深度学习模型帮助工业生产实现更加精确的缺陷检测&#xff0c;但其准确性可能受制于数据样本的数量。友思特…

from sklearn.preprocessing import LabelEncoder的详细用法

sklearn.preprocessing 0. 基本解释1. 用法说明2. python例子说明 0. 基本解释 LabelEncoder 是 sklearn.preprocessing 模块中的一个工具&#xff0c;用于将分类特征的标签转换为整数。这在许多机器学习算法中是必要的&#xff0c;因为它们通常不能处理类别数据。 1. 用法说…

校园跑腿小程序源码系统+代取快递+食堂超市代买+跑腿 带完整的安装代码包以及搭建教程

随着移动互联网的普及&#xff0c;人们越来越依赖于手机应用来解决日常生活中的各种问题。特别是在校园内&#xff0c;由于快递点距离宿舍较远、食堂排队人数过多等情况&#xff0c;学生对于便捷、高效的服务需求愈发强烈。在此背景下&#xff0c;校园跑腿小程序源码系统应运而…

一款相对比较强大的国产ARM单片机HC32F4A0

已经用了3年的HC32F4A0&#xff0c;已经对它比较熟悉了&#xff0c;与STM32相比它的外设使用这些的确是挺大大&#xff0c;不像GD32一类的单片机很多都能兼容STM32。用久了之后就更喜欢用HC32F4A0&#xff0c;功能强大&#xff0c;外设使用灵活&#xff0c;用点向FPGA靠拢的感觉…

模型选择实战

我们现在可以通过多项式拟合来探索这些概念。 import math import numpy as np import torch from torch import nn from d2l import torch as d2l生成数据集 给定x&#xff0c;我们将使用以下三阶多项式来生成训练和测试数据的标签&#xff1a; max_degree 20 # 多项式的最…

第四十周:文献阅读+GAN

目录 摘要 Abstract 文献阅读&#xff1a;结合小波变换和主成分分析的长短期记忆神经网络深度学习在城市日需水量预测中的应用 现有问题 创新点 方法论 PCA&#xff08;主要成分分析法&#xff09; DWT&#xff08;离散小波变换&#xff09; DWT-PCA-LSTM模型 研究实…

Tomcat Notes: Web Security, HTTPS In Tomcat

This is a personal study notes of Apache Tomcat. Below are main reference material. - YouTube Apache Tomcat Full Tutorial&#xff0c;owed by Alpha Brains Courses. https://www.youtube.com/watch?vrElJIPRw5iM&t801s 1、Overview2、Two Levels Of Web Securi…

运用ETLCloud快速实现数据清洗、转换

一、数据清洗和转换的重要性及传统方式的痛点 1.数据清洗的重要性 数据清洗、转换作为数据ETL流程中的转换步骤&#xff0c;是指在数据收集、处理、存储和使用的整个过程中&#xff0c;对数据进行检查、处理和修复的过程&#xff0c;是数据分析中必不可少的环节&#xff0c;对…

Maps基础知识

什么是Maps&#xff1f; 在JavaScript中&#xff0c;Map是一种用于存储键值对的数据结构。它类似于对象&#xff0c;但有一些区别。 Map对象允许任何类型的值作为键&#xff08;包括对象、函数和基本数据类型&#xff09;&#xff0c;而对象只能使用字符串或符号作为键。这使得…

一次性密码 One Time Password,简称OTP

一次性密码&#xff08;One Time Password&#xff0c;简称OTP&#xff09;&#xff0c;又称“一次性口令”&#xff0c;是指只能使用一次的密码。一次性密码是根据专门算法、每隔60秒生成一个不可预测的随机数字组合&#xff0c;iKEY一次性密码已在金融、电信、网游等领域被广…

three.js中Meshline库的使用

three.js中Meshline的使用 库的地址为什么要使用MeshLine,three.js内置的线不好用吗?MeshLine入门MeshLine的深入思考样条曲线一个问题 库的地址 https://github.com/spite/THREE.MeshLine?tabreadme-ov-file 为什么要使用MeshLine,three.js内置的线不好用吗? 确实不好用,…

一个监控小技巧,巧妙破解超低温冰箱难题!

在当今科技飞速发展的时代&#xff0c;超低温冰箱监控系统以其在各行各业中关键的温度控制和环境监测功能而备受关注。 超低温环境对于存储生物样本、药品和其他温度敏感物品至关重要&#xff0c;而监控系统则提供了实时、精准的环境数据&#xff0c;确保这些物品的质量和安全性…