Flink(八)【窗口】

news2024/9/20 15:35:35

前言

        终于忙完了四门专业课的期末,确实挺累啊。今天开始继续学习 Flink ,接着上次的内容。

今日摘录:

        他觉得一个人奋斗更轻松自在。跟没有干劲的人在一起厮混,只会徒增压力。       

                                                                                                                -《解忧杂货店》

1、窗口

        之前我们已经了解了 Flink 中基本的聚合操作。在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有所有数据都到齐了才开始处理。所以聚合计算其实只能针对当前已有的数据——之后再有数据到来,就需要继续叠加、再次输出结果。这样似乎很“实时”,但现实中大量数据一般会同时到来,需要并行处理,这样频繁地更新结果就会给系统带来很大负担了。更加高效的做法是,把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。这就相当于将无界流的聚合转化为了有界数据集的聚合,这就是所谓的“窗口”(Window)聚合操作。窗口聚合其实是对实时性和处理效率的一个权衡。在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。在 Flink 中,提供了非常丰富的窗口操作,下面我们就来详细介绍。

1.1、窗口的概念

        这里的窗口和我们之前 Hive 中学到的窗口的概念是不一样的。Hive的窗口函数主要用于离线数据的聚合,是基于字段范围的,而Flink的窗口则是基于时间更多地用于处理实时数据流。

        在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

        Flink 中窗口并不是静态准备好的,而是动态创建的,只有这个窗口区间内的数据到达时,才会去创建对应的窗口。

1.2、窗口的分类

1.2.1、按照度量标准

1)时间窗口

        以时间点来定义窗口的开始和结束,所以截取出的就是某一段时间的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。

2)计数窗口

        基于元素个数来截取数据,元素达到固定的个数时,就触发计算并关闭窗口。

1.2.2、按照窗口分配数据的规则

1)滚动窗口

        滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。

2)滑动窗口

        与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。既然是向前滑动,那么每一步滑多远,就也是可以控制的。

        滑动窗口的参数有两个:窗口大小(window size)和 “滑动步长”(window slide)。当滑动步长等于窗口大小时,这就像是一个滚动窗口;当滑动步长大于窗口大小时,会有数据的遗漏。所以一般情况下,我们会让滑动步长小于窗口大小,并尽量设置为整数倍的关系。

3)会话窗口

        会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。这里的会话类似Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。简单来说,就是数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。

        与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义,而没有“会话计数窗口”的概念。

4)全局窗口

        还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

1.3、API

1.3.1、 按键分区(Keyed)和非按键分区(Non-Keyed)

        在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。

1)按键分区窗口(Keyed Windows)

        经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...).window(...)
2)非按键分区(Non-Keyed)

        如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。
        在代码中,直接基于 DataStream 调用.windowAll()定义窗口。

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作。

1.3.2、API 的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。

1.4、窗口分配器

        用于指定窗口的类型,也就是指定窗口的度量标准(基于时间/元素个数)和分配数据的规则(滚动/滑动/会话/全局)。

        窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。

        除去需要自定义的全局窗口外,其他三种常用的类型 Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

下面的例子我们都使用按键分区的窗口类型,这里给出 keyBy 后得到的数据流对象 KeyedStream:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction());

        KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(sensor -> sensor.getId());

1.4.1、基于时间的窗口

注意:这里的时间有两种语义,一种是处理时间(对于API中 xxxProcessingTimeWindows),一种是事件时间(对于API中 xxxEventProcessingWindows),这涉及到时间语义的内容,之后学到时间语义再详细说。

        这里只需要知道,TumblingProcessingTimeWindows和TumblingEventTimeWindows的主要区别在于时间基准和触发机制。TumblingProcessingTimeWindows基于处理时间触发计算,适用于实时性要求高的场景;而TumblingEventTimeWindows基于事件时间触发计算,对于处理延迟到达的数据更加灵活。

1)基于处理时间的滚动窗口
    // 基于时间的滚动创建 10s
        WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        这里的 of 方法还可以指定第二个参数,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。 

        我们知道,不同国家分布在不同的时区。标准时间戳其实就是1970 年 1 月 1 日 0 时 0 分 0 秒 0 毫秒开始计算的一个毫秒数,而这个时间是以 UTC 时间,也就是 0 时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了:

sensorKs.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
2)基于处理时间的滑动窗口
// 基于时间的滑动窗口 窗口大小10s 滑动步长2s
        WindowedStream<WaterSensor, String, TimeWindow> slideWindow = sensorKs.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
3)基于处理时间的会话窗口

注意:会话窗口只能基于时间(处理时间或事件时间),不可以基于计数,因为我们会话窗口它划分窗口的规则只能是基于时间间隔的。

        // 基于时间的会话窗口 中间间隔多久没有数据来,就把前面的数据划分为一个窗口,这里指定5s没有数据来就把前面的数据划分为一个窗口,后面的数据是一个新的窗口
        WindowedStream<WaterSensor, String, TimeWindow> sessionWindow = sensorKs.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));

 也可以动态指定间隔(每来一个数据都会修改它的间隔):

// 基于处理时间的会话窗口 动态间隔
        sensorKs.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<WaterSensor>() {
            @Override
            public long extract(WaterSensor sensor) {
                // 取 WaterSenor 对象的 ts 属性作为间隔,单位 ms 这里*1000方便输入
                return sensor.getTs() * 1000;  
            }
        }));

比如我们连续很快输入:

s1,5,5
s2,6,6
s3,7,7
s8,8,8

它会再我们输出完 "s8,8,8" 后再过八秒没有数据来,就输出窗口(窗口中的值就是计算的结果,具体计算的逻辑取决于我们指定的窗口函数)

4)基于事件时间的滚动窗口
// 基于事件时间的滚动窗口
        sensorKs.window(TumblingEventTimeWindows.of(Time.seconds(10)));
5)基于事件时间的滑动窗口
        // 基于事件时间的滑动窗口
        sensorKs.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)));
6)基于事件时间的会话窗口
        // 基于事件时间的会话窗口
        sensorKs.window(EventTimeSessionWindows.withGap(Time.seconds(5)));

1.4.2、基于计数的窗口

        滚动计数窗口和滑动计数窗口的方法是同一个,只不过一个只需要传入一个参数,一个需要传入两个参数。

1)滚动计数窗口
// 基于计数的滚动窗口 窗口长度=5个元素
        sensorKs.countWindow(5);
2)滑动计数窗口
        // 基于计数的滑动窗口 窗口长度=5个元素,窗口步长的=2个元素,也就是说两个窗口间隔两个元素
        sensorKs.countWindow(5,2);

滑动计数窗口会在数据量每达到步长的时候计算一次,而数据量每达到窗口大小才会关闭窗口开启一个新的窗口。 只要经过一个步长,就一定有一个窗口关闭。

对于上面的滑动计数窗口,它每一个步长2就会滑动一次(并输出一次计算结果)。

3)全局窗口

全局窗口一般在自定义的时候才会用到。

我们可以看到,滚动计数窗口和滑动计数窗口的实现的底层就是全局窗口,只不过滚动计数窗口中指定了触发器,滑动计数窗口中指定了触发器和移除器:

1.5、窗口函数

        定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要怎么做,这就需要使用窗口函数来操作了(注意:是窗口函数而不是其他普通函数,因为经过划分窗口后的数据流是 WindowedStream 而不是 DataStream)。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
        经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream

1.5.1、增量聚合函数(reduce/aggregate)

 1)规约函数(ReduceFunction)

public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction());

        KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);

        // todo 1. 指定窗口分配器:基于处理时间的滚动窗口
        WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // todo 2. 指定窗口函数:增量聚合的规约函数
        SingleOutputStreamOperator<WaterSensor> reduce = tumblingWindow.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor sensor1, WaterSensor sensor2) throws Exception {
                System.out.println("调用 reduce 函数,sensor1= " + sensor1 + ",sensor2= " + sensor2);
                return new WaterSensor(sensor1.getId(), sensor2.getTs(), sensor1.getVc() + sensor2.getVc());
            }
        });

        reduce.print();

        env.execute();
    }
}

运行结果:

// 窗口1
调用 reduce 函数,sensor1= WaterSensor{id='s1', ts=1, vc=1},sensor2= WaterSensor{id='s1', ts=2, vc=2}
WaterSensor{id='s1', ts=2, vc=3}    
WaterSensor{id='s3', ts=3, vc=3}
// 窗口2
WaterSensor{id='s3', ts=1, vc=1}
// 窗口3
调用 reduce 函数,sensor1= WaterSensor{id='s4', ts=1, vc=1},sensor2= WaterSensor{id='s4', ts=1, vc=5}
WaterSensor{id='s4', ts=1, vc=6}
 2)聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样(可以看上面函数接口 ReduceFunction 中的泛型方法的定义)。这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC,也就是我们中间结果的类型)和输出类型(OUT)。

接口中有四个方法:

⚫ createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
⚫ add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器 accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
⚫ getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。
⚫ merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。

public class AggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction());

        KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);

        // todo 1. 指定窗口分配器:基于处理时间的滚动窗口
        WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // todo 2. 指定窗口函数:增量聚合的规约函数
        SingleOutputStreamOperator<String> aggregate = tumblingWindow.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {
            //
            @Override
            public Integer createAccumulator() {
                // 我们要存的是对象,不初始化的话是null,所以需要初始化它的值
                System.out.println("创建累加器(初始化累加器)");
                return 0;
            }

            @Override
            public Integer add(WaterSensor sensor, Integer accumulator) {
                // 计算逻辑
                System.out.println("调用add方法,sensor= "+sensor);
                return accumulator + sensor.getVc();
            }

            @Override
            public String getResult(Integer accumulator) {
                // 获取最终结果输出时 窗口触发输出
                System.out.println("调用 getResult 方法");
                return accumulator.toString();
            }

            @Override
            public Integer merge(Integer a, Integer b) {
                // 一般不用 只有会话窗口才会用到
                System.out.println("调用 merge 方法");
                return null;
            }
        });

        aggregate.print();

        env.execute();
    }
}

运行结果:

// 窗口1
创建累加器(初始化累加器)
调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1}
创建累加器(初始化累加器)
调用add方法,sensor= WaterSensor{id='s2', ts=1, vc=1}
调用 getResult 方法    //id='s1'的结果
1
调用 getResult 方法    //id='s2'的结果
1
// 窗口2
创建累加器(初始化累加器)
调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=2}
调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1}
调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=3}
调用 getResult 方法
6
// 窗口3
创建累加器(初始化累加器)
调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=2}
调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=2}
调用 getResult 方法
4
// 窗口4
创建累加器(初始化累加器)
调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=1}

其实这里的增量聚合函数就是用流处理的思路来处理有界数据流,当相同 key 的数据进来时,把数据的状态不断进行更新。这就是 Flink 所谓的“有状态的流处理”,通过这种方式可以极大地提高程序运行的效率,所以在实际应用中最为常见。 

1.5.2、全窗口函数(apply/process)

        与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程。

        但是把计算放到窗口关闭才去计算无疑是低效的,毕竟如果数据量比较大的时候,这种方式肯定没有增量聚合函数计算的快。那为什么还要使用这种方式呢?这是因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式,这就可以用全窗口函数来实现。
        在 Flink 中,全窗口函数也有两种:WindowFunction 和 ProcessWindowFunction。

1)全窗口函数(full window functions)

        WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

注意:这种写法已经不推荐使用了,这里了解即可。

tumblingWindow.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {
            /**
             * 
             * @param key 分组的 key
             * @param window 窗口对象
             * @param input 存的数据(迭代器保存着所有传进来的数据)
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void apply(String key, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
                
            }
        });

这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。WindowFunction 接口在源码中实现如下:

@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param window The window that is being evaluated.
     * @param input The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
2)窗口处理函数(ProcessWindowFunction)

        ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上, ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。

        当 然 , 这 些 好 处 是 以 牺 牲 性 能 和 资 源 为 代 价 的 。 作 为 一 个 全 窗 口 函 数 ,ProcessWindowFunction 同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实就是一个增强版的 WindowFunction。

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction());

        KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);

        // todo 1. 指定窗口分配器:基于处理时间的滚动窗口
        WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // todo 2. 指定窗口函数:全窗口函数
        SingleOutputStreamOperator<String> process = tumblingWindow.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            /**
             *
             * @param key 分组的 key
             * @param context 上下文
             * @param elements 全窗口存的数据
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String start = sdf.format(new Date(startTs));
                String end = sdf.format(new Date(endTs));
                long size = elements.spliterator().estimateSize();
                out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());
            }
        });

        process.print();

        env.execute();
    }
}

运行结果:

key=s1 的窗口[2023-11-29 16:30:30,2023-11-29 16:30:40]包含1条数据===>[WaterSensor{id='s1', ts=1, vc=1}]
key=s2 的窗口[2023-11-29 16:30:30,2023-11-29 16:30:40]包含2条数据===>[WaterSensor{id='s2', ts=1, vc=1}, WaterSensor{id='s2', ts=1, vc=1}]

这里,我们只用上下文对象获取了窗口的创建时间和关闭时间,其实它还有很多强大的功能,比如侧输出流等。 

1.5.3、增量聚合函数结合全窗口函数

        增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果我们采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。
        而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。
所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
        我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。

public class AggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction());

        KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);

        // todo 1. 指定窗口分配器:基于处理时间的滚动窗口
        WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // todo 2. 指定窗口函数:增量聚合的规约函数
        SingleOutputStreamOperator<String> result = tumblingWindow.aggregate(
                new AggregateFunction<WaterSensor, Integer, String>() {
                    @Override
                    public Integer createAccumulator() {
                        // 我们要存的是对象,不初始化的话是null,所以需要初始化它的值
                        System.out.println("创建累加器(初始化累加器)");
                        return 0;
                    }

                    @Override
                    public Integer add(WaterSensor sensor, Integer accumulator) {
                        // 计算逻辑
                        System.out.println("调用add方法,sensor= " + sensor);
                        return accumulator + sensor.getVc();
                    }

                    @Override
                    public String getResult(Integer accumulator) {
                        // 获取最终结果输出时 窗口触发输出
                        System.out.println("调用 getResult 方法");
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        // 一般不用 只有会话窗口才会用到
                        System.out.println("调用 merge 方法");
                        return null;
                    }
                }, new ProcessWindowFunction<String, String, String, TimeWindow>() {    // 输入类型=增量函数的输出类型
                    // 这里的 elements 只有一条数据,因为是从增量聚合函数直接传过来的,我们使用全窗口函数只是为了获得更多的上下文功能
                    @Override
                    public void process(String key, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        long startTs = context.window().getStart();
                        long endTs = context.window().getEnd();
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String start = sdf.format(new Date(startTs));
                        String end = sdf.format(new Date(endTs));
                        long size = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());
                    }
                });

        result.print();

        env.execute();
    }
}

运行结果:

// 窗口1
调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1}
调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=2}
调用 getResult 方法    //这里调用 getResult 方法并不会输出结果 因为结果被作为输入参数传给了全窗口函数
key=s1 的窗口[2023-11-29 16:48:00,2023-11-29 16:48:10]包含1条数据===>[3]
创建累加器(初始化累加器)
调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=2}
调用 getResult 方法
key=s2 的窗口[2023-11-29 16:48:10,2023-11-29 16:48:20]包含1条数据===>[2]
// 窗口2
创建累加器(初始化累加器)
调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1}
调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1}
创建累加器(初始化累加器)
调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=2}
调用 getResult 方法    
key=s1 的窗口[2023-11-29 16:48:20,2023-11-29 16:48:30]包含1条数据===>[2]
调用 getResult 方法
key=s2 的窗口[2023-11-29 16:48:20,2023-11-29 16:48:30]包含1条数据===>[2]

可以看到,我们结合了两者的优点:

  1. 增量聚合:来一条算一条,存储中间结果,不会把计算的压力攒到最后。
  2. 全窗口函数:可以通过上下文获取更多的功能。

当然,我们的增量聚合函数 reduce 也可以和 全窗口函数结合使用,同样是传入两个参数(一个需要实现函数接口 ReduceFunction ,另一个需要实现抽象类 ProcessWindowFunction (或者  WindowFunction 只不过这个现在不常用了))。

1.6、其他 API

其实前面的窗口都有默认的触发器,比如滚动计数窗口中指定了触发器,滑动计数窗口中指定了触发器和移除器。这里理解就好。

1.6.1、触发器(Trigger)

        触发器主要是用来控制窗口什么时候触发计算,也就是什么时候执行窗口函数,可以理解为计算得到结果的过程。

        基于 WindowedStream 调用 tigger() 方法,就可以传入一个自定义的窗口触发器(Trigger)。

tumblingWindow.trigger(new Trigger<WaterSensor, TimeWindow>() {
            @Override
            public TriggerResult onElement(WaterSensor element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                return null;
            }

            @Override
            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                return null;
            }

            @Override
            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                return null;
            }

            @Override
            public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

            }
        });

        拿我们基于处理时间的滚动窗口来说,TumblingProcessingTimeWindows 有一个  getDefaultTrigger 方法来选择默认的触发器,这个方法会返回一个类 ProcessingTimeTrigger 的实例,这个类 ProcessingTimeTrigger 继承自抽象类 Tigger, 主要实现了这么几个方法:

  • onElement():窗口中每到来一条数据,都会调用这个方法。
  • onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

 我们可以看上面 基于计数的滑动窗口的底层代码,我们可以发现,它有默认的触发器和移除器:

 我们可以查看它的触发器的内部代码:

1.6.2、移除器(Evictor)

        移除器主要用来自定义移除某些数据的逻辑。基于 WindowedStream 调用 evictor() 方法,就可以传入一个自定义的移除器。Evictor 是一个接口,不同的窗口类型有自己预先实现的移除器。

tumblingWindow.evictor(new Evictor<WaterSensor, TimeWindow>() {
            @Override
            public void evictBefore(Iterable<TimestampedValue<WaterSensor>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
                
            }

            @Override
            public void evictAfter(Iterable<TimestampedValue<WaterSensor>> elements, int size, TimeWindow window, EvictorContext evictorContext) {

            }
        });

不管是触发器还是移除器,一般都不需要我们自定义,但要知道它的底层原理。

1.7、窗口原理分析

1.7.1、窗口的触发时机

思考:假如我们第一条数据到来的时间是 03分:05秒,时间窗口大小为10分钟,那么这条数据属于窗口 [0,10)还是[3,13)呢?(注意:窗口是左闭右开的!

事实上,窗口的划分并不是以第一条数据来的时间作为初始时间(窗口的 start),上面的数据是属于窗口[0,10]的。我们通过上面小节中的运行结果也可以看到,时间都是整秒开始的。我们可以查看计算窗口起始时间的代码底层原理(下面是以 TumblingProcessingTimeWindows 为例):

我们可以看到, TumblingProcessingTimeWindows 的 assignWindows 方法的作用是确定如何将流中的元素分配到不同的窗口中。其中定义了窗口的起始时间(start)和结束时间(start + size):

1.7.2、窗口的生命周期

1)什么时候创建?

Flink 是事件驱动型的,它不会预先把窗口建立好等数据来,而是当属于本窗口范围的第一条数据来的时候才创建窗口。

比如我们的窗口大小为 10s,假如我们在 13s(第一条)、14s 来了两条数据 ,它是什么时候创建窗口的?

通过源码我们可以看到,属于该窗口的数据到来时,都会通过现 new 来创建一个窗口对象,并且放到一个单例集合中,表示这个窗口对象只有一个,第一条数据之后来的就不会覆盖之前的窗口了。

2)什么时候销毁?

当 时间进展 >= 窗口最大时间 (end - 1ms) + 允许迟到的时间(默认为0) 的时候,就会销毁窗口。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1273726.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第16届中国R会议暨2023X-AGI大会开幕,和鲸科技分享ModelOps在数据科学平台中的实践与应用

11月25日&#xff0c;第 16 届中国 R 会议暨 2023 X-AGI 大会在在中国人民大学逸夫会堂拉开帷幕&#xff0c;本次会议由中国人民大学统计学院、中国人民大学应用统计科学研究中心、统计之都、原灵科技和中国商业统计学会人工智能分会&#xff08;筹&#xff09;主办&#xff0c…

RNN:文本生成

文章目录 一、完整代码二、过程实现2.1 导包2.2 数据准备2.3 字符分词2.4 构建数据集2.5 定义模型2.6 模型训练2.7 模型推理 三、整体总结 采用RNN和unicode分词进行文本生成 一、完整代码 作者在文章开头地址中使用C实现了这一过程&#xff0c;为了便于理解&#xff0c;这里我…

近期知识点

aop (1) 要求利用AOP记录用户操作日志。内容包含以下信息&#xff1a;ip地址、用户名、请求的地址&#xff0c;请求的时间 &#xff08; 4 分&#xff09; &#xff08;2&#xff09;要求利用AOP记录用户操作日志&#xff0c;日志记录到文本文件。内容包含以下信息&#xff…

[读论文][跑代码]BK-SDM: A Lightweight, Fast, and Cheap Version of Stable Diffusion

github: GitHub - Nota-NetsPresso/BK-SDM: A Compressed Stable Diffusion for Efficient Text-to-Image Generation [ICCV23 Demo] [ICML23 Workshop] ICML 2023 Workshop on ES-FoMo 简化方式 蒸馏方式&#xff08;训练Task蒸馏outKD-FeatKD&#xff09; 训练数据集 评测指标…

最新Midjourney绘画提示词Prompt

最新Midjourney绘画提示词Prompt 一、AI绘画工具 SparkAi【无需魔法使用】&#xff1a; SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作ChatGPT&#xff1f;小编这里写一个详细图文教程吧&#xff01;本系统使用NestjsVueTypescript框架技术&am…

Vue 和 React 的优点分别是什么?如何选择?

目录 为什么我更喜欢Vue&#xff1f; 低代码平台的前端框架采用Vue的优势有哪些&#xff1f; JNPF-Web-Vue3 的技术栈介绍 &#xff08;1&#xff09;Vue3.x &#xff08;2&#xff09;Vue-router4.x &#xff08;3&#xff09;Vite4.x &#xff08;4&#xff09;Ant-D…

Echarts 设置数据条颜色 宽度

设置数据条颜色&#xff08;推荐&#xff09; let yData [{value: 500,time: 2012-11-12,itemStyle: //设置数据条颜色{normal: { color: red }}},{value: 454,time: 2020-5-17},{value: 544,time: 2022-1-22},{value: 877,time: 2013-1-30}, {value: 877,time: 2012-11-12}]…

如何通过linux调用企业微信发送告警消息

一、前期准备 1、企业微信具备管理企业权限。 2、服务器有公网IP或者可以将本机端口通过net映射到公网。 二、通过脚本向企业微信发送消息 1、创建sh脚本用来发送消息。 vim 2.sh 注意&#xff1a;脚本中xxxx信息需要在企业微信管理后台获取。 #!/bin/bash # 设置企业…

力扣:1419. 数青蛙

题目&#xff1a; 代码&#xff1a; class Solution { public:int minNumberOfFrogs(string croakOfFrogs){string s "croak";int ns.size();//首先创建一个哈希表来标明每个元素出现的次数&#xff01;vector<int>hash(n); //不用真的创建一个hash表用一个数…

一、Linux系统概述和安装

目录 1、Linux系统概述 2、Linux发行版介绍 3、虚拟机软件介绍 4、VMware安装 5、Linux系统&#xff08;CentOS&#xff09;系统安装 6、登录并查看IP地址 7、Linux连接工具CRT使用 7.1 概述 7.2 CRT安装 7.3 使用步骤 7.4 文件上传 8、Linux的快照 8.1 作用 8.2…

传统算法:使用 Pygame 实现二分查找

使用 Pygame 模块实现了二分查找的动画演示。首先,它生成一个有序数组,并通过 Pygame 在屏幕上绘制这个数组的条形图。接着,通过二分查找算法对有序数组进行查找,动画效果可视化每一步的变化。在查找的过程中,程序通过比较目标值和数组中间元素,逐步缩小搜索范围,高亮显…

Python-简单模拟斗地主洗牌发牌

额滴名片儿 &#x1f388; 博主&#xff1a;一只程序猿子 &#x1f388; 博客主页&#xff1a;一只程序猿子 博客主页 &#x1f388; 个人介绍&#xff1a;爱好(bushi)编程&#xff01; &#x1f388; 创作不易&#xff1a;如喜欢麻烦您点个&#x1f44d;或者点个⭐&#xff01…

【人工智能Ⅰ】实验5:AI实验箱应用之贝叶斯

实验5 AI实验箱应用之贝叶斯 一、实验目的 1. 用实验箱的摄像头拍摄方块上数字的图片&#xff0c;在图像处理的基础上&#xff0c;应用贝叶斯方法识别图像中的数字并进行分类。 二、实验内容和步骤 1. 应用实验箱机械手臂上的摄像头拍摄图像&#xff1b; 2. Opencv处理图像…

生成对抗网络(GAN)手写数字生成

文章目录 一、前言二、前期工作1. 设置GPU&#xff08;如果使用的是CPU可以忽略这步&#xff09; 二、什么是生成对抗网络1. 简单介绍2. 应用领域 三、网络结构四、构建生成器五、构建鉴别器六、训练模型1. 保存样例图片2. 训练模型 七、生成动图 一、前言 我的环境&#xff1…

可行性研究:2023年废旧金属回收行业前景及市场数据分析

废品收购是再生资源行业的重要业务之一。是指将各种废弃物品分类后按不同种类和性能卖给不同的生产厂商或直接出售给再制造厂家&#xff08;如重新使用报废汽车拆解的零件&#xff09;。废旧金属是指暂时失去使用价值的金属或合金制品&#xff0c;一般的废旧金属都含有有用的金…

车牌限行_分支结构的C语言实现xdoj7

试题名称 车牌限行 时间限制: 1 秒 内存限制: 256KB 问题描述 问题描述 受雾霾天气影响&#xff0c;某市决定当雾霾指数超过设定值时对车辆进行限行&#xff0c;假设车牌号全为数字&#xff0c;且长度不超过6位&#xff0c;限行规则如下&#xff1a; &#xff08;…

C++相关闲碎记录(2)

1、误用shared_ptr int* p new int; shared_ptr<int> sp1(p); shared_ptr<int> sp2(p); //error // 通过原始指针两次创建shared_ptr是错误的shared_ptr<int> sp1(new int); shared_ptr<int> sp2(sp1); //ok 如果对C相关闲碎记录(1)中记录的shar…

AI - Steering behaviorsII(碰撞避免,跟随)

Steering Behaviors系统中的碰撞避免&#xff0c;路径跟随&#xff0c;队长跟随 Collision Avoid 在物体前进的方向&#xff0c;延伸一定长度的向量进行检测。相当于物体对前方一定可使范围进行检测障碍物的碰撞 延伸的向量与碰撞物圆心的距离小于碰撞物的半径&#xff0c;则…

docker-compose脚本编写及常用命令

安装 linux DOCKER_CONFIG/usr/local/lib/docker/cli-plugins sudo mkdir -p $DOCKER_CONFIG/cli-plugins sudo curl -SL https://521github.com/docker/compose/releases/download/v2.6.1/docker-compose-linux-x86_64 -o $DOCKER_CONFIG/cli-plugins/docker-compose sudo c…

numpy知识库:深入理解numpy.resize函数和数组的resize方法

前言 numpy中的resize函数顾名思义&#xff0c;可以用于调整数组的大小。但具体如何调整&#xff1f;数组形状变了&#xff0c;意味着数组中的元素个数发生了变化(增加或减少)&#xff0c;如何确定resize后的新数组中每个元素的数值呢&#xff1f;本次博文就来探讨并试图回答这…