【Flink】Flink 中的时间和窗口之窗口API使用

news2024/11/15 6:56:56

1. 窗口的API概念

窗口的API使用分为按键分区非按键分区,在定义窗口操作之前,首先就要确定好是基于按键分区Keyed的数据流KeyedStream来开窗还是基于没有按键分区的DataStream上开窗。

1.1 按键分区窗口(Keyed Windows)

按键分区窗口就是按照key分为多条逻辑流logical streams,这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的计算。

代码实现:

stream.keyBy(...)
 .window(...)

1.2 非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。

代码实现:

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作。

1.3 代码中窗口 API 的调用

简单来说,窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)

代码实现:

stream.keyBy(<key selector>)
 .window(<window assigner>)
 .aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。

2. 窗口分配器(Window Assigners)

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。窗口分配数据的规则,其实就对应着不同的窗口类型。所以,窗口分配器其实就是在指定窗口的类型。

窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner 作为参数,返回的WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是AllWindowedStream

窗口按照驱动类型可以分成时间窗口计数窗口,而按照具体的分配规则,又有滚动窗口滑动窗口会话窗口全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型 Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

2.1 时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动滑动会话三种。
在使用中直接调用.window(),在里面传入对应时间语义下的窗口分配器。这样一来,我们不需要专门定义时间语义,默认就是事件时间;如果想用处理时间,那么在这里传入处理时间窗口分配器就可以了。

2.1.1 滚动处理时间窗口(TumblingProcessingTimeWindows)

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()
代码实现:

stringDataStreamSource.keyBy(...)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(...);

这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。
另外.of()还有一个重载方法public static TumblingProcessingTimeWindows of(Time size, Time offset) ,可以传入两个 Time 类型的参数:sizeoffset。第一个参数表示窗口大小,第二个参数表示窗口起始点的偏移量。这个偏移量主要是解决时区问题,比如我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了:.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

2.1.2 滑动处理时间窗口(SlidingProcessingTimeWindows)

窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()
代码实现:

stringDataStreamSource.keyBy(...)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(...)

这里.of()方法需要传入两个 Time 类型的参数:size slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。同样滑动窗口也可以追加第三个参数offset -偏移量,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

2.1.3 处理时间会话窗口(ProcessingTimeSessionWindows)

窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()
代码实现:

stream.keyBy(...)
	.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
	.aggregate(...)

这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。

我们也可以使用.withDynamicGap()方法动态定义session gap时间,这就需要传入一个SessionWindowTimeGapExtractor作为参数,用来定义 session gap 的动态提取逻辑。在这里,我们提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔。

.window(ProcessingTimeSessionWindows.withDynamicGap(new 
	SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
	 @Override
	 public long extract(Tuple2<String, Long> element) { 
	// 提取 session gap 值返回, 单位毫秒
	 return element.f0.length() * 1000;
	 }
}))

2.1.4 滚动事件时间窗口(TumblingEventTimeWindows)

窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。

stream.keyBy(...)
	.window(TumblingEventTimeWindows.of(Time.seconds(5)))
	.aggregate(...)

这里.of()方法也可以传入第二个参数 offset,用于设置窗口起始点的偏移量。

2.1.5 滑动事件时间窗口(SlidingEventTimeWindows)

窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。

stream.keyBy(...)
	.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
	.aggregate(...)

2.1.6 事件时间会话窗口(EventTimeSessionWindows)

窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。

stream.keyBy(...)
	.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
	.aggregate(...)

2.2 计数窗口

计数窗口本身底层是基于全局窗口(Global Window)实现的。直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口滑动计数窗口两类。

2.2.1 滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。

stream.keyBy(...)
	.countWindow(10)

我们定义了一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发计算执行并关闭窗口。

2.2.2 滑动计数窗口

与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:sizeslide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...)
	.countWindow(103)

我们定义了一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果。

2.3 全局窗口

全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接用.window(),分配器由 GlobalWindows 类提供。

stream.keyBy(...)
	.window(GlobalWindows.create());

需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

3. 窗口函数(Window Functions)

定义窗口分配器只是知道了数据属于哪个窗口,可以将数据收集起来;但是如果想让收集的数据到底做什么就需要在接上一个定义窗口计算的窗口函数(window functions)

经过窗口分配器处理之后,数据可以分配到对应窗口中,而数据流经过转换得到的数据类型是WindowedStream。不是 DataStream,所以并不能直接进行其他转换,必须进一步调用窗口函数,对收集的数据进行处理计算之后,才能最终得到 DataStream,如下图:
在这里插入图片描述

窗口函数定义了要对窗口中收集的数据做的计算处理,根据处理方式分为两类:增量聚合函数全窗口函数

3.1 增量聚合函数(incremental aggregation functions)

窗口将数据收集起来,最基本的处理操作就是聚合。窗口对无限流的切分,可以看作得到了一个有界数据集。如果如果等到所有数据都到了在窗口到了结束时间再去聚合就很不高效——这就相当于真的在用批处理的思路来做实时流处理。

为了提高实时性,流处理就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出即可,这就提高了程序运行的效率和实时性。

典型的增量聚合函数有两个:ReduceFunctionAggregateFunction

3.1.1 归约函数(ReduceFunction)

最简单的聚合就是归约(reduce),就是将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。基于 WindowedStream 调用.reduce()方法,然
后传入 ReduceFunction 作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚合了。这里的 ReduceFunction与简单聚合时用到的 ReduceFunction 是同一个函数类接口,所以使用方式也是完全一样的。

代码实例:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 创建自定义数据源的实时流
        DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );
        
        stringDataStreamSource.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                // 将数据转换成二元组,方便计算
                return Tuple2.of(value.getUser(), 1L);
            }
        }).keyBy(data -> data.f0)
                // 设置滚动事件时间窗口 5秒一个
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 定义累加规则,窗口闭合时,向下游发送累加结果
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();

        env.execute();
    }

CustomSource 代码:

public class CustomSource  implements ParallelSourceFunction<Event> {
    // 声明一个布尔变量,作为控制数据生成的标识位
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random(); // 在指定的数据集中随机选取数据
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
        while (running) {
            Event event = new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            );
            ctx.collect(event);
            System.out.printf("我开始生成数据了,[%s], [%d]%n", event.getUser() ,event.getTimestamp());
            // 隔 1 秒生成一个点击事件,方便观测
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
            running = false;
    }
}

打印结果:
在这里插入图片描述
这个是凑巧了正好5个数据都在第一个窗口,如果5个数据不在第一个窗口就是这样的:

在这里插入图片描述

3.1.1 聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。导致需要在聚合前,先将数据转换(map)成预期结果类型但是这样遇到某些场景就很麻烦。

例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形式式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。

FlinkWindow API 中的aggregate就提供了这样的操作。直接基于 WindowedStream 调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
	 ACC createAccumulator();
	 ACC add(IN value, ACC accumulator);
	 OUT getResult(ACC accumulator);
	 ACC merge(ACC a, ACC b);
}

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)累加器类型(ACC)输出类型(OUT)输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
接口中有四个方法:

  1. createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  2. add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
  3. getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sumcount 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
  4. merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)

AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

代码实例:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 创建自定义数据源的实时流
        DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );

        stringDataStreamSource.keyBy(data -> data.getUser())
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 输入 Event 累加器是 Tuple3<String, Long, Integer> 用户名,时间戳,个数,输出Tuple3<String, Integer, String> 用户名,个数,平均值
                .aggregate(new AggregateFunction<Event, Tuple3<String, Long, Integer>, Tuple3<String, Integer, String>>() {
                            
                            // 初始化累加器
                            @Override
                            public Tuple3<String, Long, Integer> createAccumulator() {
                                return Tuple3.of("", 0L, 0);
                            }
                            // 当数据来了处理一个数据
                            @Override
                            public Tuple3<String, Long, Integer> add(Event value, Tuple3<String, Long, Integer> accumulator) {
                                return Tuple3.of(value.getUser(), accumulator.f1 + value.getTimestamp(), accumulator.f2 + 1);
                            }

                            // 返回结果
                            @Override
                            public Tuple3<String, Integer, String> getResult(Tuple3<String, Long, Integer> accumulator) {
                                Timestamp timestamp = new Timestamp(accumulator.f1 / accumulator.f2);
                                return Tuple3.of(accumulator.f0, accumulator.f2, timestamp.toString());
                            }

                            // merge操作一般是会话窗口才会用到
                            @Override
                            public Tuple3<String, Long, Integer> merge(Tuple3<String, Long, Integer> a, Tuple3<String, Long, Integer> b) {
                                return Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2);
                            }
                        }).print();

        env.execute();
    }

输出结果:
在这里插入图片描述

AggregateFunction 的工作原理是:

  • 首先调用 createAccumulator()为任务初始化一个状态(累加器);
  • 而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;
  • 等到了窗口需要输出时,再调用 getResult()方法得到计算结果。
  • 很明显,与ReduceFunction相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

3.2 全窗口函数(full window functions)

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

全窗口函数就是典型的批处理思路——等到所有数据都到齐了在处理数据。这种处理效率很慢,但为什么还会存在全窗口函数?是因为有些场景下,必须需要基于全部数据的计算结果才有效,这是聚合函数做不到的。

在 Flink 中,全窗口函数也有两种:WindowFunctionProcessWindowFunction

3.2.1 窗口函数(WindowFunction)

WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream
 .keyBy(<key selector>)
 .window(<window assigner>)
 .apply(new MyWindowFunction());

这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。WindowFunction 接口在源码中实现如下:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
	void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。一般在实际应用,直接使用 ProcessWindowFunction 就可以了。

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 创建自定义数据源的实时流
        DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );
        stringDataStreamSource.keyBy(data -> true)
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                        .apply(new WindowFunction<Event, String, Boolean, TimeWindow>() {
                            @Override
                            public void apply(Boolean aBoolean, TimeWindow window, Iterable<Event> input, Collector<String> out) throws Exception {
                                long start = window.getStart();
                                long end = window.getEnd();

                                HashSet<String> users = new HashSet<>();
                                for (Event event : input) {
                                    users.add(event.getUser());
                                }
                                out.collect("窗口:" + new Timestamp(start) + " ~ " + new Timestamp(end)
                                        + "的独立访客数量是:" + users.size());
                            }
                        }).print();
        stringDataStreamSource.print("data");
        env.execute();
    }

输出结果:
在这里插入图片描述

可以看到TimeWindows的属性很少
在这里插入图片描述

3.2.2 处理窗口函数(ProcessWindowFunction)

ProcessWindowFunctionWindow API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。

作为全窗口函数ProcessWindowFunction 同样需要将所有数据缓存下来,等到窗口触发计算时才使用,其实就是增强版的WindowFunction

比如求一个网站的每小时用户量:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 创建自定义数据源的实时流
        DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );
        stringDataStreamSource.keyBy(data -> true)
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                        .process(new UvCountByWindow()).print();
        stringDataStreamSource.print("data");
        env.execute();
    }

    public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {

        @Override
        public void process(Boolean aBoolean, ProcessWindowFunction<Event, String, Boolean, TimeWindow>.Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            HashSet<String> users = new HashSet<>();
            // 遍历所有数据获取去重的用户名单
            for (Event element : elements) {
                users.add(element.getUser());
            }

            // 获取窗口信息
            long start = context.window().getStart();
            long end = context.window().getEnd();
            out.collect(" 窗 口 : " + new Timestamp(start) + " ~ " + new
                    Timestamp(end) + " 的独立访客数量是:" + users.size());
        }
    }

输出结果:
在这里插入图片描述

3.3 增量聚合和全窗口函数的结合使用、

增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果我们采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。

增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

调用WindowedStream.reduce().aggregate()方法时,只是简单地直接传入了一个 ReduceFunctionAggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
 ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) 

// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
 ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> 
function)

// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
 AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> 
windowFunction)

// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
 AggregateFunction<T, ACC, V> aggFunction,
 ProcessWindowFunction<V, R, K, W> windowFunction)

调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。

举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接;想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果。

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 创建自定义数据源的实时流
        DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );

        stringDataStreamSource.keyBy(data -> true)
                // 设置滚动事件时间窗口 10秒
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 同时传入增量聚合函数和全窗口函数 增量聚合函数的输出就是全局窗口的输入
                .aggregate(new UvAgg(), new UvCountResult()).print();
        stringDataStreamSource.print("data");
        env.execute();
    }

    // 自定义实现AggregateFunction ,增量聚合计算uv值,来一条数据就加一个数据
    public static class UvAgg implements AggregateFunction<Event, HashSet<String>, Integer> {

        @Override
        public HashSet<String> createAccumulator() {
            return new HashSet<>();
        }

        @Override
        public HashSet<String> add(Event value, HashSet<String> accumulator) {
            accumulator.add(value.getUser());
            return accumulator;
        }

        @Override
        public Integer getResult(HashSet<String> accumulator) {
            return accumulator.size();
        }

        @Override
        public HashSet<String> merge(HashSet<String> a, HashSet<String> b) {
            return null;
        }
    }

    // 自定义全局窗口处理函数 处理增量聚合窗口的值
    public static class UvCountResult extends ProcessWindowFunction<Integer, String, Boolean, TimeWindow> {

        @Override
        public void process(Boolean aBoolean, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            // 增量聚合窗口的返回值就只有一个
            long uv  = elements.iterator().next();
            out.collect(" 窗 口 : " + new Timestamp(start) + " ~ " + new
                    Timestamp(end) + " 的独立访客数量是:" + uv);
        }
    }

输出结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1527770.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二蛋赠书十八期:《一本书讲透Elasticsearch:原理、进阶与工程实践》

Elasticsearch 是一种强大的搜索和分析引擎&#xff0c;被广泛用于各种应用中&#xff0c;以其强大的全文搜索能力而著称。 不过&#xff0c;在日常管理 Elasticsearch 时&#xff0c;我们经常需要对索引进行保护&#xff0c;以防止数据被意外修改或删除&#xff0c;特别是在进…

20240318uniapp怎么引用组件

在script中增加 import index from "/pages/index/index.vue" 把index直接整个作为一个组件引入 然后注册组件 在export default中增加 components: {index:index }, 注册了index组件&#xff0c;内容为import的index 然后就可以在template里使用 <index&…

03|提示工程(下):用思维链和思维树提升模型思考质量

什么是 Chain of Thought CoT这个概念来源于学术界&#xff0c;是谷歌大脑的Jason Wei等人于2022年在论文《Chain-of-Thought Prompting Elicits Reasoning in Large Language Models&#xff08;自我一致性提升了语言模型中的思维链推理能力&#xff09;》中提出来的概念。它…

双向队列广搜

适用情况 适用的情况&#xff1a;解决最短路径问题 当我们已起始点和终点时&#xff0c;我们可以采用双向队列广搜去解决问题。所谓的双向队列广搜&#xff0c;就是让起点向终点搜索&#xff0c;终点向起点搜索&#xff0c;二者同时开始&#xff0c;那么当它们第一次1相遇时&am…

RocketMQ - 一条消息写入CommitLog文件之后,如何实时更新索引文件?

Broker收到一条消息之后,其实就会直接把消息写入到CommitLog里去,但是它写入刚开始仅仅是写入到MappedFile映射的一块内存里去,后续是根据刷盘策略去决定是否立即把数据从内存刷入磁盘的。 实际上,Broker启动的时候会开启一个线程,ReputMessageService,他会把CommitLog更…

FreeRTOS教程7 事件组

目录 1、准备材料 2、学习目标 3、前提知识 3.1、什么是事件组&#xff1f; 3.1、事件组特征 3.1.1、事件组、事件标志和事件位 3.1.2、EventBits_t 数据类型 3.1.3、多个任务访问 3.2、创建事件组 3.3、操作事件组 3.4、xEventGroupWaitBits() API 函数 3.4.1、ux…

主机与windows虚拟机远程桌面实现方法

目录 一、虚拟机相关配置1. 配置虚拟机网络2. 打开虚拟机远程桌面功能3. 配置虚拟机用户与分组 二、主机相关配置 当无法通过共享文件夹实现主机与windows虚拟机文件共享时&#xff0c;可以通过主机与虚拟机远程桌面的方法实现文件的共享传输。本文主要介绍主机与虚拟机远程桌面…

【接口防重复提交】⭐️基于RedisLockRegistry 分布式锁管理器实现

目录 前言 思路 实现方式 实践 1.引入相关依赖 2.aop注解 3.切面类代码 4.由于启动时报错找不到对应的RedisLockRegistry bean&#xff0c;选择通过配置类手动注入&#xff0c;配置类代码如下 测试 章末 前言 项目中有个用户根据二维码绑定身份的接口&#xff0c;由于用户在…

诺视科技完成亿元Pre-A2轮融资,加速Micro-LED微显示芯片商业化落地

近日&#xff0c;Micro-LED微显示芯片研发商诺视科技&#xff08;苏州&#xff09;有限公司&#xff08;以下简称“诺视科技”&#xff09;宣布完成亿元Pre-A2轮融资&#xff0c;本轮融资由力合资本领投&#xff0c;老股东盛景嘉成、汕韩基金以及九合创投持续加码&#xff0c;这…

YOLOv8改进 | 图像去雾 | MB-TaylorFormer改善YOLOv8高分辨率和图像去雾检测(ICCV,全网独家首发)

一、本文介绍 本文给大家带来的改进机制是图像去雾MB-TaylorFormer,其发布于2023年的国际计算机视觉会议(ICCV)上,可以算是一遍比较权威的图像去雾网络, MB-TaylorFormer是一种为图像去雾设计的多分支高效Transformer网络,它通过应用泰勒公式展开的方式来近似softmax-at…

华为openEuler系统安装openjdk并配置环境变量

华为openEuler系统安装openjdk并配置环境变量 1、安装JDK软件包 执行dnf list installed | grep jdk 查询JDK软件是否已安装。 $ dnf list installed | grep jdk查看命令打印信息&#xff0c;若打印信息中包含“jdk”&#xff0c;表示该软件已经安装了&#xff0c;则不需要再…

堆排序(向下调整法,向上调整法详解)

目录 一、 二叉树的顺序结构 二、 堆的概念及结构 三、数组存储、顺序存储的规律 此处可能会有疑问&#xff0c;左右孩子的父节点计算为什么可以归纳为一个结论了&#xff1f; 四、大小堆解释 五、大小堆的实现&#xff08;向上和向下调整法&#xff09; 5.11向上调整法…

docxTemplater——从word模板生成docx文件

官网文档&#xff1a;Get Started (Browser) | docxtemplater 官网在线演示&#xff1a;Demo of Docxtemplater with all modules active | docxtemplater 源码&#xff1a;https://github.com/open-xml-templating/docxtemplater 不仅可以处理word&#xff08;免费&#xf…

【深度学习实践】面部表情识别,深度学习分类模型,mmpretrain用于分类的实用教程,多任务网络头

文章目录 数据集数据集的进一步处理转换training.csv转换validation.csv 剔除无法使用的图片数据选择mmpretrain框架来训练配置四个文件改写base model改写base datasetsschedulesdefault_runtime 总配置开始训练训练分析考虑在网络上增加facial_landmarks回归head考虑是否可以…

论文解析:V3D: Video Diffusion Models are Effective 3DGenerators

摘要&#xff1a; 自动三维生成最近引起了广泛关注。最近的方法大大加快了生成速度&#xff0c;但由于模型容量有限或三维数据&#xff0c;生成的物体通常不够精细。在视频扩散模型最新进展的推动下&#xff0c;我们引入了 V3D&#xff0c;利用预训练视频扩散模型的世界模拟能…

【已解决】MySQL:常用的除法运算+精度处理+除数为0处理

目录 问题现象&#xff1a; 问题分析&#xff1a; 拓展&#xff1a; 1、除法运算&#xff1a; 拓展&#xff1a;MySQL中常用的几种除法运算 1、取整除法 2、浮点数除法 3、取余除法 4、向上取整除法 5、向下取整除法 2、运算结果的精度处理 1.1、浮点数 1.2、总位数 1.3、…

蓝桥杯练习题——健身大调查

在浏览器中预览 index.html 页面效果如下&#xff1a; 目标 完成 js/index.js 中的 formSubmit 函数&#xff0c;用户填写表单信息后&#xff0c;点击蓝色提交按钮&#xff0c;表单项隐藏&#xff0c;页面显示用户提交的表单信息&#xff08;在 id 为 result 的元素显示&#…

2024年敏捷产品负责人CSPO认证培训

课程名称&#xff1a;Scrum Product Owner CSPO产品负责人认证 课程类型&#xff1a;经理级 课程简介&#xff1a; Scrum Product Owner产品负责人在Scrum产品开发当中扮演“舵手”的角色&#xff0c;他决定产品的愿景、路线图以及投资回报&#xff0c;他需要回答为什么做&am…

【ZooKeeper】2、安装

本文基于 Apache ZooKeeper Release 3.7.0 版本书写 作于 2022年3月6日 14:22:11 转载请声明 下载zookeeper安装包 wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz解压 tar -zxvf apache-zookeeper-3.7.0-b…

OpenCV Steger算法提取条纹中心线

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 Steger 算法是一种常用的图像边缘检测算法,可以用于提取图像中的中心线或边缘信息。它的理论假设是:条纹的亮度是按照高斯分布呈现的,即中心亮两侧渐暗。 其计算过程如下所述: 1、首先,我们需要计算每个点Hess…