尚硅谷Flink(三)时间、窗口

news2025/1/10 11:44:30

1

🎰🎲🕹️

🎰时间、窗口

🎲窗口

🕹️是啥

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 

在Flink中,窗口其实并不是一个“框”,应该把窗口理解成一个“桶”。在Flink中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭

事实上“触发计算”和“窗口关闭”两个行为也可以分开

🕹️分类

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。

1)时间窗口(Time Window)
时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”。

(2)计数窗口(Count Window)
计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。基本思路是“人齐发车”。

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)

  • 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口。比如我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。

滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。

  • 滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。

  • 会话窗口,是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义。会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最大距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

  • “全局窗口”,这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

🕹️api

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream 来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。 

(1)按键分区窗口(Keyed Windows) 
经过按键分区 keyBy 操作后,数据流会按照key 被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream 进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

stream.keyBy(...) .window(...)

(2)非按键分区(Non-Keyed Windows) 
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。 

stream.windowAll(...) 

注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll
本身就是一个非并行的操作。 

stream.keyBy(<key selector>) 
       .window(<window assigner>) 
       .aggregate(<window function>) 

 其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种

🕹️窗口分配器

        // 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话
        KS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        KS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        KS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
        KS.window(GlobalWindows.create());

        KS.countWindow(5);  // 窗口数据长度5
        KS.countWindow(5, 2);  // 滑动

🕹️窗口函数

定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么, 其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。

增量聚合Reduce

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);
        KeyedStream<WaterSensor, String> KS = stream.map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] list = s.split(",");
                        return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));
                    }
                })
                .keyBy(value -> value.id);


        // 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话
        WindowedStream<WaterSensor, String, TimeWindow> window = KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                System.out.println("reduce, t1: t2 = "+t1+": "+t2);
                return new WaterSensor(t1.getId(), t1.getTs(), t1.getVc()+t2.getVc());
            }
        }).print();

        env.execute();

    }

Aggregate

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合 状态的类型、输出结果的类型都必须和输入数据类型一样。

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:

输入类型 (IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型OUT当然就是最终计算结果 的类型了。

接口中有四个方法:

⚫ createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚 合任务只会调用一次。

⚫ add():将输入的元素添加到累加器中。

⚫ getResult():从累加器中提取聚合的输出结果。

⚫ merge():合并两个累加器,并将合并后的状态作为一个累加器返回

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);
        KeyedStream<WaterSensor, String> KS = stream.map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] list = s.split(",");
                        return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));
                    }
                })
                .keyBy(value -> value.id);


        // 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话
        WindowedStream<WaterSensor, String, TimeWindow> window = KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        //  类型形参:
        //  <IN> – The type of the values that are aggregated (input values)
        //  <ACC> – The type of the accumulator (intermediate aggregate state).
        //  <OUT> – The type of the aggregated result
        SingleOutputStreamOperator<String> aggregate = window.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {

            @Override
            public Integer createAccumulator() {
                System.out.println("createAccumulator()");
                return 0;
            }

            @Override
            public Integer add(WaterSensor value, Integer accumulator) {
                System.out.println("add");
                return value.getVc() + accumulator;
            }

            @Override
            public String getResult(Integer accumulator) {
                return "getResult " + accumulator.toString();
            }

            @Override
            public Integer merge(Integer a, Integer b) {
                // 会话窗口才用得到
                System.out.println("用不到的merge");
                return null;
            }
        });
        aggregate.print();

        env.execute();

    }

全窗口函数

有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意 义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增 量聚合函数做不到的。

全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口 要输出结果的时候再取出数据进行计算。WindowFunction 和 ProcessWindowFunction。

1)窗口函数(WindowFunction)

WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们 可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作 用可以被ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用。

2)处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最 底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到 一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以 访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富,其 实就是一个增强版的 WindowFunction。 事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的 一员,关于处理函数我们会在后续章节展开讲解。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);
        KeyedStream<WaterSensor, String> KS = stream.map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] list = s.split(",");
                        return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));
                    }
                })
                .keyBy(value -> value.id);



        WindowedStream<WaterSensor, String, TimeWindow> window = KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        /**
         * 类型形参:
         * <IN> – The type of the input value.
         * <OUT> – The type of the output value.
         * <KEY> – The type of the key.
         * <W> – The type of Window that this window function can be applied on.
         */
        SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            /**
             *
             * @param s The key 分组的key
             * @param context The context in which the window is being evaluated.
             * @param elements The elements in the window being evaluated.
             * @param out A collector for emitting elements.
             * @throws Exception
             */
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                // 上下文可以拿到的东西
                long start = context.window().getStart();
                long end = context.window().getEnd();
                String StartTime = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                String EndTime = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");

                long count = elements.spliterator().estimateSize();
                out.collect(s + "窗口[" + StartTime + "———" + EndTime + "] 有 " + count + " 条数据");
            }
        });
        process.print();
        /**
         * 16> s1窗口[2023-10-16 10:33:30.000———2023-10-16 10:33:35.000] 有 1 条数据
         * 16> s1窗口[2023-10-16 10:33:40.000———2023-10-16 10:33:45.000] 有 3 条数据
         * 16> s1窗口[2023-10-16 10:33:50.000———2023-10-16 10:33:55.000] 有 6 条数据
         * 16> s1窗口[2023-10-16 10:33:55.000———2023-10-16 10:34:00.000] 有 7 条数据
         */

        env.execute();

    }

agg、pro合体

   public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);
        KeyedStream<WaterSensor, String> KS = stream.map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] list = s.split(",");
                        return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));
                    }
                })
                .keyBy(value -> value.id);



        WindowedStream<WaterSensor, String, TimeWindow> window = KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<String> process = window.aggregate(new MyAgg(), new MyProcess());


        process.print();


        env.execute();

    }

    public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{

        @Override
        public Integer createAccumulator() {
            System.out.println("createAccumulator()");
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("add");
            return value.getVc() + accumulator;
        }

        @Override
        public String getResult(Integer accumulator) {
            return "getResult " + accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            // 会话窗口才用得到
            System.out.println("用不到的merge");
            return null;
        }
    }

    public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {

        @Override
        public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            // 上下文可以拿到的东西
            long start = context.window().getStart();
            long end = context.window().getEnd();
            String StartTime = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
            String EndTime = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");

            long count = elements.spliterator().estimateSize();
            out.collect(s + "窗口[" + StartTime + "———" + EndTime + "] 有 " + count + " 条数据"+elements.toString());
        }
    }

🕹️触发器、移除器*

上述已经默认实现

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗 口函数,所以可以认为是计算得到结果并输出的过程。

基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...) .window(...) .trigger(new MyTrigger())

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就 可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实 现的移除器。

stream.keyBy(...) .window(...).evictor(new MyEvictor())

🎲时间语义(瞎起名)

到底是以那种时间作为衡量标准,就是所谓的“时间语义”。

在实际应用中,事件时间语义(真正产生的时间)会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。

从 Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

🎲水位线

在窗口的处理过程中,我 们 可以基于数据的时间戳,自定义 一 个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进 展,就是靠着新到数据的时间戳 来推动的

这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计 处理,得到的结果都是正确的。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同 步,只是略微有一点延迟,同时保证了窗口计算的正确性。

在 Flink 中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某 个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

1)有序流中的水位线

理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线;

实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往 往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线。

2)乱序流中的水位线

😅乱序 + 数据量小:在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是 所谓的“乱序数据”。 

情况是数据乱序,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

😅乱序 + 数据量大:如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时 只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位 线。

😅我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可 以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。 这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来 之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了

🕹️生成水位线原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的 数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义, 这在理论上可以得到最低的延迟。

所以 Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把 控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

🕹️内置水位线 

对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是 周期性生成水位线的最简单的场景,直接调用 WatermarkStrategy.forMonotonousTimestamps() 方法就可以实现

注意并行度输出

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<WaterSensor> DS = stream.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] list = s.split(",");
                return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));
            }
        });

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:升序的watermark,没有等待时间
                .<WaterSensor>forMonotonousTimestamps()
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        // 返回的时间戳,要 毫秒
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        return element.getTs() * 1000L;
                    }
                });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = DS.assignTimestampsAndWatermarks(watermarkStrategy);


        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();
        env.execute();

        /**
         * 数据=WaterSensor{id='s1', ts=1, vc=1},recordTs=-9223372036854775808
         * 数据=WaterSensor{id='s1', ts=2, vc=1},recordTs=-9223372036854775808
         * 数据=WaterSensor{id='s1', ts=3, vc=1},recordTs=-9223372036854775808
         * 数据=WaterSensor{id='s1', ts=4, vc=1},recordTs=-9223372036854775808
         * 数据=WaterSensor{id='s1', ts=5, vc=1},recordTs=-9223372036854775808
         * key=s1的窗口[1970-01-01 08:00:00.000,1970-01-01 08:00:05.000)包含4条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=1}, WaterSensor{id='s1', ts=3, vc=1}, WaterSensor{id='s1', ts=4, vc=1}]
         * 数据=WaterSensor{id='s1', ts=6, vc=1},recordTs=-9223372036854775808
         * 数据=WaterSensor{id='s1', ts=7, vc=1},recordTs=-9223372036854775808
         * 数据=WaterSensor{id='s1', ts=9, vc=1},recordTs=-9223372036854775808
         * 数据=WaterSensor{id='s1', ts=10, vc=1},recordTs=-9223372036854775808
         * key=s1的窗口[1970-01-01 08:00:05.000,1970-01-01 08:00:10.000)包含4条数据===>[WaterSensor{id='s1', ts=5, vc=1}, WaterSensor{id='s1', ts=6, vc=1}, WaterSensor{id='s1', ts=7, vc=1}, WaterSensor{id='s1', ts=9, vc=1}]
         */

    }

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成 水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前 时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就 可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示 数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的 延迟,就可以等到所有的乱序数据了。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<WaterSensor> DS = stream.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] list = s.split(",");
                return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));
            }
        });

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱的watermark,没有等待时间
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner((element, recordTimestamp) -> {
                    // 返回的时间戳,要 毫秒
                    System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                    return element.getTs() * 1000L;
                });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = DS.assignTimestampsAndWatermarks(watermarkStrategy);


        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();
        env.execute();


    }

 周期性水位生成器

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<WaterSensor> DS = stream.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] list = s.split(",");
                return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));
            }
        });

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱的watermark,没有等待时间
                .<WaterSensor>forGenerator(new WatermarkStrategy<WaterSensor>() {
                    @Override
                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new MyWaterStrategy<>(3000L);
                    }
                })
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner((element, recordTimestamp) -> {
                    // 返回的时间戳,要 毫秒
                    System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                    return element.getTs() * 1000L;
                });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = DS.assignTimestampsAndWatermarks(watermarkStrategy);


        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();
        env.execute();


    }
    
    public static class MyWaterStrategy<T> implements WatermarkGenerator<T> {
        private long delay;
        private long maxTs;

        public MyWaterStrategy(long delay) {
            this.delay = delay;
            this.maxTs = Long.MIN_VALUE+this.delay+1;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            maxTs = Math.max(maxTs, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTs- delay -1));

        }
    }

🕹️并行,水位线传递  

而当一个任务接收到多个上游并行任务传递来的水位线时,应该以 最小的那个作为当前任务的事件时钟。

在多个上游并行任务中,如果有其中一个没有数据,由于当前 Task 是以最小的那个作为 当前任务的事件时钟,就会导致当前 Task 的水位线无法推进,就可能导致窗口无法触发。这 时候可以设置空闲等待。

.withIdleness(Duration.ofSecond(3))

迟到数据处理:

Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时 并不会关闭窗口。

以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到 wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.allowedLateness(Time.seconds(3))

允许迟到只能运用在 event time 上

🎲时间的合流

        可以发现,根据某个 key 合并两条流,与关系型数据库中表的 join 操作非常相近。事实 上,Flink 中两条流的 connect 操作,就可以通过 keyBy 指定键进行分组后合并,实现了类似 于 SQL 中的 join 操作;另外 connect 支持处理函数,可以使用自定义实现各种需求,其实已 经能够处理双流 join 的大多数场景。

        不过处理函数是底层接口,所以尽管 connect能做的事情多,但在一些具体应用场景下还 是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要 自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的 合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。

🕹️窗口联结(Window Join)

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO
        SingleOutputStreamOperator<Tuple2<String, Integer>> DS1 = env.fromElements(
                Tuple2.of("a", 1),
                Tuple2.of("a", 2),
                Tuple2.of("b", 7),
                Tuple2.of("b", 5),
                Tuple2.of("c", 3)
        ).assignTimestampsAndWatermarks(
                WatermarkStrategy
                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner(
                        (value, ts) -> (value.f1 * 1000L)
                ));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> DS2 = env.fromElements(
                Tuple3.of("a", 1, 1),
                Tuple3.of("a", 8, 1),
                Tuple3.of("b", 8, 1),
                Tuple3.of("b", 5, 1),
                Tuple3.of("c", 3, 1)
        ).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner(
                        (value, ts) -> value.f1 * 1000L
                ));

        DataStream<String> join = DS1.join(DS2)
                .where(x -> x.f0)// ds1的keyBy
                .equalTo(x -> x.f0)// ds2的keyBy
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     *
                     * @param first The element from first input.
                     * @param second The element from second input.
                     * @return
                     * @throws Exception
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "-------" + second;
                    }
                });
        join.print();


        env.execute();
    }

🕹️间隔联结(Interval Join)

Flink 提供了一种叫作“间隔联结”(interval join)的合流操作。 顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间 隔,看这期间是否有来自另一条流的数据匹配。

案例需求:在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个 例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户, 来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览据进行一个联结查询。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1100307.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux学习笔记】代码编辑工具vim

1. vim工具基本模式的转换2. vim命令模式下的各种编辑命令2.1. 光标行定位2.2. 光标自由定位2.3. 复制粘贴2.4. 删除2.5. 文本的大小写替换2.6. 文本的替换2.7. 文本的前删后删2.8. 撤销操作 3. vim底行模式下的命令3.1. 设置行号与取消设置行号3.2. 分屏操作3.3. 在不退出vim的…

Postman简单使用

文章目录 一.接口测试流程二、Postman接口测试工具三、接口关联四、全局变量和环境变量 一.接口测试流程 拿到API接口文档&#xff08;从开发拿或者抓包获取&#xff09;&#xff0c;熟悉接口业务&#xff0c;接口地址&#xff0c;错误码等等 编写接口的测试用例以及评审 编写…

SSL证书续费要如何操作

SSL证书一旦到期&#xff0c;网站会立即无法访问&#xff0c;而且会提出不安全警告&#xff0c;如果是电商或者品牌网站影响还是很大的。 SSL证书和域名续费有很大区别&#xff0c;域名续费只要交钱就可以了&#xff0c;SSL证书续费还需要认证和更新服务器SSL证书文件才算收工…

【多线程】JUC(java.util.concurrent)的常见类 信号量 线程安全的集合类

目录 1. Callable接口 1.1 Callable接口和Runnable接口的区别&#xff1f; 1.2 使用Callable接口编写代码。 2. ReentrantLock 可重入锁 3.信号量 semaphore 3.1 Java中信号量的使用 4.CountDownLatch JUC: java.util.concurrent -> 这个包里的内容主要是一些多线程…

智能变电站自动化系统的应用与产品选型

摘要&#xff1a;现如今&#xff0c;智能变电站发展已经成为了电力系统发展过程中的内容&#xff0c;如何提高智能变电站的运行效率也成为电力系统发展的一个重要目标&#xff0c;为了能够更好地促进电力系统安全稳定运行&#xff0c;本文则就智能变电站自动化系统的实现进行了…

青藏高原连续日光诱导叶绿素荧光数据集(2000-2018)

简介&#xff1a; 青藏高原连续日光诱导叶绿素荧光数据集&#xff08;2000-2018&#xff09;是通过MODIS各通道反射率和SIF观测数据建立神经网络模型&#xff0c;从而得到较高时空分辨率的SIF数据&#xff0c;常作为初级生产力的参考。前言 – 人工智能教程 源数据范围为全球&…

网工实验笔记:MQC原理与配置

一、概述 MQC&#xff08;Modular QoS Command-Line Interface&#xff0c;模块化QoS命令行&#xff09;是指通过将具有某类共同特征的数据流划分为一类&#xff0c;并为同一类数据流提供相同的服务&#xff0c;也可以对不同类的数据流提供不同的服务。 MQC三要素 流分类&am…

15-k8s-高级存储之pv与pvc

文章目录 一、相关概念二、创建pv二、创建pvc三、创建pod调用pvc四、StorageClass动态制备pv 一、相关概念 关系 生命周期相关概念 2.1 静态构建&#xff1a;集群管理员创建若干PV卷。这些卷对象带有真实存储的细节信息,并且对集群用户可用(可见)。PV卷对象存在于Kubernetes …

摩尔信使MThings的设备高级参数

摩尔信使MThings支持三级参数管理方案&#xff0c;依次为&#xff1a;数据级、设备级、通道级。 设备级参数不仅包含设备名称、设备地址等常用信息&#xff0c;同时提供了诸多高级参数&#xff0c;其同样是为了满足不同用户应用场景中所面临的差异化需求&#xff0c;以更加灵活…

勒索病毒LockBit2.0 数据库(mysql与sqlsever)解锁恢复思路分享

0.前言 今天公司服务器中招LockBit2.0勒索病毒&#xff0c;损失惨重&#xff0c;全体加班了一天基本解决了部分问题&#xff0c;首先是丢失的文件数据就没法恢复了&#xff0c;这一块没有理睬&#xff0c;主要恢复的是两个数据库&#xff0c;一个是16GB大小的SQLserver数据库&…

安徽阳光心理测量平台目录遍历

安徽阳光心理测量平台目录遍历 FOFA指纹 title"心理测量平台"漏洞复现 路由后拼接/admin/UserFiles/ GET /admin/UserFiles/ HTTP/1.1 Host: {{Hostname}}修复方案 针对路径设定对应权限

注释的重要性与程序员的责任

注释的重要性与程序员的责任 提升代码可读性促进团队协作提高代码可维护性传承知识和经验代码的责任推荐学习 导语&#xff1a;在编写代码的过程中&#xff0c;注释是程序员们经常讨论的话题。有人认为忽视注释等于耍流氓&#xff0c;但也有人觉得注释只是浪费时间。本文将探讨…

软件开发项目文档系列之三如何撰写项目招标文件

前言 招标文件在采购过程中扮演着至关重要的角色&#xff0c;其主要目的是提供清晰而详尽的信息&#xff0c;以确保采购项目的需求得以明确&#xff0c;潜在的投标单位能够清晰理解并遵守相关要求&#xff0c;并最终为采购方提供一个有力的依据来评估和选择最合适的承建单位。…

c++之new和delete

前言 在本文中&#xff0c;您将学习使用new和delete操作在C 中有效地管理内存。 数组可用于存储多个同类型数据&#xff0c;但是使用数组存在严重的缺点。声明数组时应分配内存&#xff0c;但在大多数情况下&#xff0c;直到运行时才能确定所需的确切内存。在这种情况下&#…

python每日一练(8)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

微信小程序开发指南

前言 微信是一款由中国著名互联网公司腾讯公司开发的社交软件&#xff0c;于2011年1月21日正式上线。在成立后的短短几年时间里&#xff0c;微信以其简单易用的界面和强大的功能&#xff0c;快速赢得了全球用户的青睐。截止2021年&#xff0c;微信已经有超过10亿的活跃用户&am…

如何使用 OpenSSL 来检查证书,来确保网络通信的安全性?

OpenSSL 是一个强大的安全套接字层密码库&#xff0c;包含丰富的加密算法、常用的密钥和证书封装管理功能以及 SSL/TLS 协议&#xff0c;并提供了丰富的应用程序供测试或其他目的使用。要使用 OpenSSL 来检查证书以确保网络通信的安全性&#xff0c;您可以遵循以下步骤&#xf…

【ARM Coresight Debug 系列 16 -- Linux 断点 BRK 中断使用详细介绍】

文章目录 1.1 ARM BRK 指令1.2 BRK 立即数宏定义介绍1.3 断点异常处理流程1.3.1 el1_sync_handler1.3.2 el1_dbg 跟踪 1.4 debug 异常处理函数注册1.4.1 brk 处理函数的注册 1.1 ARM BRK 指令 ARMv8 架构的 BRK 指令是用于生成一个软件断点的。当处理器执行到 BRK 指令时&…

【小黑嵌入式系统第二课】嵌入式系统的概述(二)

文章目录 一、嵌入式系统的组成二、嵌入式处理器三、嵌入式外围设备1. 存储设备2. 通信设备3. 显示设备 四、硬件抽象层HAL五、嵌入式操作系统六、应用程序七、嵌入式处理器1、MCU2、MPU3、DSP4、SOC5、SOPC 八、ARM处理器简介ARM处理器的特点ARM处理器的发展历程ARM体系结构版…

【递归知识+练习】

文章目录 递归♥♥♥ 栈存储的顺序&#xff1a;按顺序打印一个数字的每一位递归求N&#xff01;的阶层递归求1234...10写一个递归方法&#xff0c;输入一个非负整数。返回组成它的数字之和&#xff08;不熟&#xff09;斐波那契数列&#xff08;不熟&#xff09; 总结 递归 递…