7、如何使用Flink中的窗口(Window算子)

news2024/11/27 19:45:15

目录

1、如何理解 Flink中的窗口(window)

2、Flink中窗口的类型

2.1 根据上游DataStream类型分类

2.2 根据驱动类型分类

2.3 根据进入到窗口数据的分发规则分类

3、怎样使用 Flink中的 Window算子

4、怎样使用 Flink中的 Window Assigners

4.1、基于处理时间的滑动窗口

4.2、基于处理时间的滚动窗口

4.3、基于处理时间的会话窗口

4.4、基于事件时间的滑动窗口

4.5、基于事件时间的滚动窗口

4.6、基于事件时间的会话窗口

4.7、计数窗口

5、怎样使用 Flink中的 Window Funcation

5.1、ReduceFunction

5.2、AggregateFunction

5.3、ProcessWindowFunction

5.4、增量聚合的 ProcessWindowFunction

6、Flink中的 Window的生命周期

6.1、触发器 - Triggers

6.2、移除器 - Evictors

7、对迟到数据的处理

7.1、生成水位线时,设置最大乱序时间

7.2、设置窗口延迟关闭 - allowedLateness

7.3、使用侧输出流获取迟到的数据 - sideOutputLateData


1、如何理解 Flink中的窗口(window)

        Flink中的窗口好像一个桶,可以根据不同的时间语义,将无界流中的数据分配到指定的桶中去,再通过 水位线或者处理时间 触发对桶中的数据进行计算

        其目的就是为了将无限的数据 根据指定的规则进行切分成有限的数据进行计算


2、Flink中窗口的类型

2.1 根据上游DataStream类型分类

按键分区窗口(Keyed Windows) :

         基于KeyedStream做窗口操作,窗口计算会在多个并行子任务上同时执行,相同的key的数据会进入到同一个窗口中去。

非按键分区-Non-Keyed Windows :

        基于DataStream做窗口操作,流上的数据会进入同一窗口中,只能有一个Task处理(不推荐这种方式)

2.2 根据驱动类型分类

时间窗口(Time Window):

        通过指定的时间语义的时间点来定义窗口的开始和结束,流中的数据被分配到哪个窗口中,也由数据上的时间标识来决定,当接收到带有结束时间标识的数据时,将触发窗口计算,并销毁窗口

计数窗口(Count Window):

       窗口的大小由数据个数来限制,当窗口内接收的数据个数到达窗口大小时,将触发窗口计算,并销毁窗口

 2.3 根据进入到窗口数据的分发规则分类

滚动窗口(Tumbling Windows):

        滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)

滑动窗口(Sliding Windows):

        滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)

会话窗口(Session Windows): 

         与滚动窗口滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

全局窗口(Global Windows): 

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。


3、怎样使用 Flink中的 Window算子

Keyed Windows API :

stream
       .keyBy(...)               <-  仅 keyed 窗口需要
       .window(...)              <-  必填项:"assigner" 	 指定窗口类型
      [.trigger(...)]            <-  可选项:"trigger"  	 指定触发器
      [.evictor(...)]            <-  可选项:"evictor"  	 指定移除器
      [.allowedLateness(...)]    <-  可选项:"lateness" 	 指定窗口延迟关闭时间 
      [.sideOutputLateData(...)] <-  可选项:"output tag" 
       .reduce/aggregate/apply() <-  必填项:"function"   指定窗口聚合函数
      [.getSideOutput(...)]      <-  可选项:"output tag" 指定侧输出流(用来接收迟到数据)

Non-Keyed Windows API :

stream
       .windowAll(...)           <-  必填项:"assigner" 	 指定窗口类型
      [.trigger(...)]            <-  可选项:"trigger"  	 指定触发器
      [.evictor(...)]            <-  可选项:"evictor"  	 指定移除器
      [.allowedLateness(...)]    <-  可选项:"lateness" 	 指定窗口延迟关闭时间 
      [.sideOutputLateData(...)] <-  可选项:"output tag" 指定侧输出流(用来接收迟到数据)
       .reduce/aggregate/apply() <-  必填项:"function"   指定窗口聚合函数
      [.getSideOutput(...)]      <-  可选项:"output tag" 

4、怎样使用 Flink中的 Window Assigners

功能说明:

        通过stream.window(WindowAssigner) 来指定 窗口的类型

4.1、基于处理时间的滑动窗口

使用场景:

        每隔x秒统计近y秒内的数据(y>x)

代码示例:

/*
 * TODO 基于处理时间的滑动窗口
 *     每2秒计算最近10秒内的数据
 * */
public class SlidingProcessingTimeWindow {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO Window:KeyedStream → WindowedStream
        Window(env);

        // TODO WindowAll:DataStream → AllWindowedStream
        //WindowAll(env);

        // 3.触发程序执行
        env.execute();
    }

    // TODO Keyed Windows
    private static void Window(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // 滑动窗口,窗口长度10s,滑动步长2s
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)))
                .process(new ShowProcessWindowFunction())
                .print()
        ;
    }

    // TODO Non-Keyed Windows
    private static void WindowAll(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                // 滑动窗口,窗口长度5s,滑动步长2s
                .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)))
                .process(new ShowProcessAllWindowFunction())
                .print()
        ;
    }
}

4.2、基于处理时间的滚动窗口

使用场景:

        计算固定时间段内的数据

代码示例:

/*
 * TODO 基于处理时间的滚动窗口
 *   计算每小时内的用户访问数
 * */
public class TumblingProcessingTimeWindow {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO Window:KeyedStream → WindowedStream
        //Window(env);

        // TODO WindowAll:DataStream → AllWindowedStream
        WindowAll(env);

        // 3.触发程序执行
        env.execute();
    }

    // TODO Keyed Windows
    private static void Window(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // 滚动窗口,窗口长度5s
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ShowProcessWindowFunction())
                .print()
        ;
    }

    // TODO Non-Keyed Windows
    private static void WindowAll(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                // 滚动窗口,窗口长度5s
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ShowProcessAllWindowFunction())
                .print()
        ;
    }
}

4.3、基于处理时间的会话窗口

使用场景:

        计算指定时间间隔内的数据

代码示例:

/*
 * TODO 基于处理时间的会话窗口
 *    相邻两个元素的处理时间间隔 大于指定会话周期 触发窗口计算
 * */
public class ProcessingTimeSessionWindow {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO Window:KeyedStream → WindowedStream
        //Window(env);

        // TODO WindowAll:DataStream → AllWindowedStream
        WindowAll(env);

        // 3.触发程序执行
        env.execute();
    }

    // TODO Keyed Windows
    private static void Window(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // 会话窗口,超时间隔5s
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                .process(new ShowProcessWindowFunction())
                .print()
        ;
    }

    // TODO Non-Keyed Windows
    private static void WindowAll(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                // 会话窗口,超时间隔5s
                .windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                .process(new ShowProcessAllWindowFunction())
                .print()
        ;
    }
}

4.4、基于事件时间的滑动窗口

代码示例:

/*
 * TODO 基于事件时间的滑动窗口
 *   每2秒计算一次最近10秒内的数据
 * */
public class SlidingEventTimeWindow {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO Window:KeyedStream → WindowedStream
        Window(env);

        // TODO WindowAll:DataStream → AllWindowedStream
        //WindowAll(env);

        // 3.触发程序执行
        env.execute();
    }

    // TODO Keyed Windows
    private static void Window(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1)
                )
                .keyBy(value -> value.f0)
                // 滚动窗口,窗口长度10s,滑动步长2s
                .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                .process(new ShowProcessWindowFunction())
                .print()
        ;
    }

    // TODO Non-Keyed Windows
    private static void WindowAll(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                //.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy())
                                .withTimestampAssigner(
                                        (Tuple2<String, Long> element, long recordTimestamp) -> {
                                            //System.out.println("Step1:extractTimestamp-从事件数据中提取时间戳");
                                            return element.f1;
                                        }
                                )
                        //.withIdleness(Duration.ofSeconds(5))  //空闲等待5s
                )
                .keyBy(value -> value.f0)
                // 滑动窗口,窗口长度10s,滑动步长2s
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
                .process(new ShowProcessAllWindowFunction())
                .print()
        ;
    }

}

4.5、基于事件时间的滚动窗口

代码示例:

/*
 * TODO 基于事件时间的滚动窗口
 *   计算每个窗口周期内的数据(计算每小时内的用户访问数)
 * */
public class TumblingEventTimeWindow {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO Window:KeyedStream → WindowedStream
        //Window(env);

        // TODO WindowAll:DataStream → AllWindowedStream
        WindowAll(env);

        // 3.触发程序执行
        env.execute();
    }

    // TODO Keyed Windows
    private static void Window(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1)
                )
                .keyBy(value -> value.f0)
                // 滚动窗口,窗口长度5s
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ShowProcessWindowFunction())
                .print()
        ;
    }

    // TODO Non-Keyed Windows
    private static void WindowAll(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1)
                )
                .keyBy(value -> value.f0)
                // 滚动窗口,窗口长度5s
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ShowProcessAllWindowFunction())
                .print()
        ;
    }
}

4.6、基于事件时间的会话窗口

代码示例:

/*
 * TODO 基于会话时间的会话窗口
 *    相邻两个元素的处理时间间隔 大于指定会话周期 触发窗口计算
 * */
public class EventTimeSessionWindow {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO Window:KeyedStream → WindowedStream
        // Window(env);

        // TODO WindowAll:DataStream → AllWindowedStream
        WindowAll(env);

        // 3.触发程序执行
        env.execute();
    }

    // TODO Keyed Windows
    private static void Window(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1)
                )
                .keyBy(value -> value.f0)
                // 会话窗口,超时间隔5s
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                .process(new ShowProcessWindowFunction())
                .print()
        ;
    }

    // TODO Non-Keyed Windows
    private static void WindowAll(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1)
                )
                // 会话窗口,超时间隔5s
                .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
                .process(new ShowProcessAllWindowFunction())
                .print()
        ;
    }
}

4.7、计数窗口

代码示例:

// TODO 计数窗口
public class countWindow {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO Window:KeyedStream → WindowedStream
        // Window(env);

        // TODO countWindowAll:DataStream → AllWindowedStream
         countWindowAll(env);

        // 3.触发程序执行
        env.execute();
    }

    // TODO Keyed Windows
    private static void Window(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 滚动窗口,窗口长度=5个元素
                //.countWindow(5)
                // TODO 滑动窗口,窗口长度=5个元素,滑动步长=2个元素 (每当进入两个数据,就统计下窗口内最五个数据)
                .countWindow(5,2)
                .process(
                        new ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>() {
                            @Override
                            public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                // 当前水位线
                                long watermark = context.currentWatermark();
                                // 当前处理时间
                                long processingTime = context.currentProcessingTime();
                                // 窗口开始时间

                                // 窗口结束时间

                                // 计算窗口内数据数量
                                long count = elements.spliterator().estimateSize();

                                String record = "key=" + s
                                        + " 包含" + count + "条数据===>" + elements.toString()
                                        + " 当前Watermark:" + watermark
                                        + " 当前processingTime:" + processingTime;

                                out.collect(record);
                            }
                        }
                )
                .print()
        ;
    }

    // TODO Non-Keyed Windows
    private static void countWindowAll(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                // TODO 滚动窗口,窗口长度=5个元素
                .countWindowAll(5)
                // TODO 滑动窗口,窗口长度=5个元素,滑动步长=2个元素 (每当进入两个数据,就统计下窗口内最近的五个数据)
                //.countWindowAll(5,2)
                .process(
                        new ProcessAllWindowFunction<Tuple2<String, Long>, String, GlobalWindow>() {
                            @Override
                            public void process(ProcessAllWindowFunction<Tuple2<String, Long>, String, GlobalWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                // 当前水位线

                                // 当前处理时间

                                // 窗口开始时间

                                // 窗口结束时间

                                // 计算窗口内数据数量
                                long count = elements.spliterator().estimateSize();

                                String record = "窗口包含" + count + "条数据===>" + elements.toString();

                                out.collect(record);
                            }
                        }
                )
                .print()
        ;
    }

}

5、怎样使用 Flink中的 Window Funcation

窗口函数的作用:

        窗口函数定义了当窗口触发后,对窗口内数据的计算逻辑

窗口函数的分类:

5.1、ReduceFunction

函数功能:

        将两条数据合并成一条数据,输入和输出的数据类型必须相同

代码示例:

/*
 * TODO 增量聚合函数:ReduceFunction
 *  特点:
 *      1.增量聚合:进入窗口一条数据,就会被计算一次(调用reduce方法),但是不会立刻输出(只会更新状态)
 *      2.第一条数据进入窗口后,不会调用 reduce 方法
 *      3.数据类型不能改变:输入数据类型 = 输出数据类型 = `中间累加器数据类型`
 *      4.窗口触发计算时,才会输出窗口的计算结果
 * */
public class ReduceFunctions {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 指定窗口类型:滚动计数窗口,窗口长度=5个元素
                .countWindow(5)
                // TODO 对窗口内的元素求和
                .reduce(
                        new ReduceFunction<Tuple2<String, Long>>() {

                            /**
                             * @param value1 参与聚合的第一个值,也就是累加器的值
                             * @param value2 参与聚合的第二个值,也就是新进入窗口的时间数据
                             * @return
                             * @throws Exception
                             */
                            @Override
                            public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                                System.out.println("触发计算:" + value1 + " ," + value2);
                                long sum = value1.f1 + value2.f1;
                                return new Tuple2<>(value1.f0, sum);
                            }
                        }
                )
                .print()
        ;

        // 3.触发程序执行
        env.execute();
    }

}

5.2、AggregateFunction

函数功能:

        将两条数据合并成一条数据,输入和输出的数据类型可以不同

代码示例:

/*
 * TODO 增量聚合函数:AggregateFunction
 *  特点:
 *      1.增量聚合:进入窗口一条数据,就会被计算一次(调用add方法),但是不会立刻输出(只会更累加器的状态)
 *      2.窗口内的第一条数据进入后,会创建窗口,创建累加器并初始化
 *      3.窗口触发计算时,才会输出窗口的计算结果(调用getResult方法)
 *      4.数据类型可以不同:输入数据类型、输出数据类型、累加器数据类型
 *  泛型参数:
 *      AggregateFunction<IN, ACC, OUT>
 *      @IN    :  输入数据类型
 *      @ACC   :  累加器数据类型
 *      @OUT   :  输出数据类型
 *
 * */
public class AggregateFunctions {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 指定窗口类型:滚动计数窗口,窗口长度=5个元素
                .countWindow(5)
                // TODO 对窗口内的元素求和
                .aggregate(
                        new AggregateFunction<Tuple2<String, Long>, Integer, String>() {
                            /**
                             * 创建累加器,并对累加器做初始化操作
                             * @return
                             */
                            @Override
                            public Integer createAccumulator() {
                                System.out.println("创建累加器");
                                return 0;
                            }

                            /**
                             * 事件数据与累加器的Merge逻辑,用来更新累加器状态
                             * 进入一条数据,调用一次
                             * @param value       The value to add
                             * @param accumulator The accumulator to add the value to
                             * @return
                             */
                            @Override
                            public Integer add(Tuple2<String, Long> value, Integer accumulator) {
                                System.out.println("调用add方法,value=" + value + " 当前累加器:" + accumulator);
                                return accumulator + 1;
                            }

                            /**
                             * 获取累加器的状态值,窗口触发时调用
                             * @param accumulator The accumulator of the aggregation
                             * @return
                             */
                            @Override
                            public String getResult(Integer accumulator) {
                                System.out.println("调用getResult 方法");
                                return accumulator.toString();
                            }

                            /**
                             * 合并窗口逻辑(只有sessionWindow才会用的)
                             * @param a An accumulator to merge
                             * @param b Another accumulator to merge
                             * @return
                             */
                            @Override
                            public Integer merge(Integer a, Integer b) {
                                System.out.println("调用merge方法");
                                return null;
                            }
                        }
                )
                .print()
        ;

        // 3.触发程序执行
        env.execute();
    }

}

5.3、ProcessWindowFunction

函数功能:

        将窗口内所有的数据缓存到Iterable,在窗口触发后对所有数据进行计算

代码示例:

/*
 * TODO 全窗口函数:ProcessWindowFunction
 *  特点:
 *      1.窗口触发时才会调用一次process方法,对窗口内的数据统一计算
 *  泛型参数说明:
 *      ProcessWindowFunction<IN, OUT, KEY, W extends Window>
 *      @IN     :  输入数据类型
 *      @OUT    :  输出数据类型
 *      @KEY    :  key的数据类型
 *      @W      :  window类型(时间窗口、计数窗口)
 *  上下文对象说明:
 *      时间信息:currentProcessingTime()、currentWatermark()
 *      状态信息:windowState()、globalState()
 *      侧输出流:
 *      窗口信息:window()
 *  重点说明:
 *      由于WindowFunction会存储窗口内的所有数据,当窗口内数量特别大时,慎用
 *  思考:
 *      1.什么时候需要使用全窗口函数呢?
 *          计算平均数、计算中位数
 * */
public class ProcessWindowFunctions {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO 基于处理时间的 滚动时间窗口,窗口长度10秒
        timewindow(env);

        // TODO 计数窗口
        //countwindow(env);

        // 3.触发程序执行
        env.execute();
    }

    private static void timewindow(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 指定窗口类型:基于处理时间的 滚动时间窗口,窗口长度5秒
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // TODO 对窗口内的元素求和
                .process(
                        new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {

                            /**
                             * @param s        窗口中所属的key
                             * @param context  上下文对象
                             * @param elements 窗口中存储的数据
                             * @param out      采集器,用来向下游发送数据
                             * @throws Exception
                             */
                            @Override
                            public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                // 通过窗口对象,获取窗口的范围信息
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                // 当前处理数据
                                long currentProcessingTime = context.currentProcessingTime();
                                String currentProcessingTimeS = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                // 当前水位线
                                long currentWatermark = context.currentWatermark();

                                out.collect("key=" + s + ",的窗口[" + windowStart + "," + windowEnd + ") 包含" + count + "条数据 ===> " + elements.toString() + " 当前处理时间:" + currentProcessingTimeS);

                            }
                        }
                )
                .print()
        ;

    }

    private static void countwindow(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 指定窗口类型:计数窗口,窗口长度为5
                .countWindow(5)
                // TODO 对窗口内的元素求和
                .process(
                        new ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>() {


                            /**
                             * @param s        窗口中所属的key
                             * @param context  上下文对象
                             * @param elements 窗口中存储的数据
                             * @param out      采集器,用来向下游发送数据
                             * @throws Exception
                             */
                            @Override
                            public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                // 通过窗口对象,获取窗口的范围信息

                                long count = elements.spliterator().estimateSize();

                                // 当前处理数据
                                long currentProcessingTime = context.currentProcessingTime();
                                String currentProcessingTimeS = DateFormatUtils.format(currentProcessingTime, "yyyy-MM-dd HH:mm:ss.SSS");

                                // 当前水位线
                                long currentWatermark = context.currentWatermark();

                                out.collect("key=" + s + ",的窗口 包含" + count + "条数据 ===> " + elements.toString() + " 当前处理时间:" + currentProcessingTimeS);

                            }
                        }
                )
                .print()
        ;

    }
}

5.4、增量聚合的 ProcessWindowFunction

函数功能:

        既可以使用 ReduceFunction 或 AggregateFunction 增量聚合功能

        又可以使用 ProcessWindowFunction 中的元数据信息(窗口信息、水位线信息)

代码示例:

/*
 * TODO 全窗口函数:ProcessWindowFunction
 *  特点:
 *      1.窗口触发时才会调用一次process方法,对窗口内的数据统一计算
 *  泛型参数说明:
 *      ProcessWindowFunction<IN, OUT, KEY, W extends Window>
 *      @IN     :  输入数据类型
 *      @OUT    :  输出数据类型
 *      @KEY    :  key的数据类型
 *      @W      :  window类型(时间窗口、计数窗口)
 *  上下文对象说明:
 *      时间信息:currentProcessingTime()、currentWatermark()
 *      状态信息:windowState()、globalState()
 *      侧输出流:
 *      窗口信息:window()
 *  重点说明:
 *      由于WindowFunction会存储窗口内的所有数据,当窗口内数量特别大时,慎用
 *  思考:
 *      1.什么时候需要使用全窗口函数呢?
 *      计算平均数、计算中位数
 * */
public class ReduceAggredateProcessWindowFunctions {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO 基于处理时间的 滚动时间窗口,窗口长度10秒
        // TODO 使用 ReduceFunction + ProcessWindowFunction 增量聚合
        //reduceAndProcessWindowFunction(env);

        // TODO 使用 AggregateFunction + ProcessWindowFunction 增量聚合
        aggreateAndProcessWindowFunction(env);

        // 3.触发程序执行
        env.execute();
    }

    private static void reduceAndProcessWindowFunction(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 指定窗口类型:基于处理时间的 滚动时间窗口,窗口长度5秒
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // TODO 获取窗口中的最小元素和窗口的开始时间、结束时间
                .reduce(
                        new ReduceFunction<Tuple2<String, Long>>() {

                            /**
                             * @param value1 参与聚合的第一个值,也就是累加器的值
                             * @param value2 参与聚合的第二个值,也就是新进入窗口的时间数据
                             * @return
                             * @throws Exception
                             */
                            @Override
                            public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                                System.out.println("触发计算:" + value1 + " ," + value2);
                                long min = Math.min(value1.f1, value2.f1);

                                return new Tuple2<>(value1.f0, min);
                            }
                        }
                        , new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {

                            /**
                             * @param s        窗口中所属的key
                             * @param context  上下文对象
                             * @param elements 窗口中存储的数据
                             * @param out      采集器,用来向下游发送数据
                             * @throws Exception
                             */
                            @Override
                            public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                // 通过窗口对象,获取窗口的范围信息
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                // 当前处理数据
                                long currentProcessingTime = context.currentProcessingTime();
                                String currentProcessingTimeS = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                // 当前水位线
                                long currentWatermark = context.currentWatermark();

                                out.collect("key=" + s + ",的窗口[" + windowStart + "," + windowEnd + ") 包含" + count + "条数据 ===> " + elements.toString() + " 当前处理时间:" + currentProcessingTimeS);

                            }
                        }
                )
                .print()
        ;

    }

    private static void aggreateAndProcessWindowFunction(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 指定窗口类型:基于处理时间的 滚动时间窗口,窗口长度5秒
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // TODO 计算窗口内数据平均值并与窗口对应的key一同输出
                .aggregate(
                        new AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double>() {

                            /**
                             * @return
                             */
                            @Override
                            public Tuple2<Long, Long> createAccumulator() {
                                return new Tuple2<Long, Long>(0L, 0L);
                            }

                            /**
                             * @param value       The value to add
                             * @param accumulator The accumulator to add the value to
                             * @return
                             */
                            @Override
                            public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
                                return new Tuple2<Long, Long>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
                            }

                            /**
                             * @param accumulator The accumulator of the aggregation
                             * @return
                             */
                            @Override
                            public Double getResult(Tuple2<Long, Long> accumulator) {
                                return (double) (accumulator.f0 * 1.0000 / accumulator.f1);
                            }

                            /**
                             * @param a An accumulator to merge
                             * @param b Another accumulator to merge
                             * @return
                             */
                            @Override
                            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                                return null;
                            }
                        }
                        ,new ProcessWindowFunction<Double,String,String,TimeWindow>(){
                            /**
                             * @param s        The key for which this window is evaluated.
                             * @param context  The context in which the window is being evaluated.
                             * @param elements The elements in the window being evaluated.
                             * @param out      A collector for emitting elements.
                             * @throws Exception
                             */
                            @Override
                            public void process(String s, ProcessWindowFunction<Double, String, String, TimeWindow>.Context context, Iterable<Double> elements, Collector<String> out) throws Exception {
                                // 通过窗口对象,获取窗口的范围信息
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                // 当前处理数据
                                long currentProcessingTime = context.currentProcessingTime();
                                String currentProcessingTimeS = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                // 当前水位线
                                long currentWatermark = context.currentWatermark();

                                out.collect("key=" + s + ",的窗口[" + windowStart + "," + windowEnd + ") 包含" + count + "条数据 ===> " + elements.toString() + " 当前处理时间:" + currentProcessingTimeS);
                            }
                        }
                )
                .print()
        ;

    }
}

6、Flink中的 Window的生命周期

窗口什么时候创建?

        当窗口所属的第一条数据到达时,窗口会被创建

窗口什么时候删除?

        当 水位线 或者 processing time 超过窗口的结束时间戳 + allowedLateness时,窗口会被删除

窗口什么时候被触发计算?

        当定义的触发器触发时, window function 会处理窗口内的数据

        默认触发器:

                当 水位线 或者 processing time 超过窗口的结束时间戳时,窗口会被计算

6.1、触发器 - Triggers

触发器的作用:

        Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理

内置 Trigger:

自定义 Triggers:

Trigger 接口:

    // 每个元素被加入窗口时调用
    @Override
    public TriggerResult onElement(Tuple2<String, Long> element, long timestamp, GlobalWindow window, TriggerContext ctx) 

    // 在注册的 event-time timer 触发时调用
    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)

    // 在注册的 processing-time timer 触发时调用
    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) 

    // 对应窗口被移除时所需的逻辑
    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) 

    TriggerResult:
        CONTINUE: 什么也不做
        FIRE: 触发计算
        PURGE: 清空窗口内的元素
        FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

代码示例:

// TODO 自定义触发器
public class Triggers {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        // TODO Window:KeyedStream → WindowedStream
        // countWindow(env);

        // TODO countWindowAll:DataStream → AllWindowedStream
        timeWindow(env);

        // 3.触发程序执行
        env.execute();
    }

    // TODO 基于 计数窗口 的触发器
    private static void countWindow(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 滚动窗口,窗口长度=5个元素
                //.countWindow(5)
                // TODO 滑动窗口,窗口长度=5个元素,滑动步长=2个元素 (每当进入两个数据,就统计下窗口内最五个数据)
                .countWindow(5, 2)
                .trigger(
                        new Trigger<Tuple2<String, Long>, GlobalWindow>() {
                            @Override
                            public TriggerResult onElement(Tuple2<String, Long> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
                                System.out.println("调用onElement方法");
                                if (element.f1 == 999) {
                                    return TriggerResult.FIRE;
                                } else {
                                    return TriggerResult.CONTINUE;
                                }
                            }

                            @Override
                            public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                                System.out.println("调用onProcessingTime方法");
                                return null;
                            }

                            @Override
                            public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                                System.out.println("调用onEventTime方法");
                                return null;
                            }

                            @Override
                            public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
                                System.out.println("调用clear方法");
                            }
                        }
                )
                .process(
                        new ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>() {
                            @Override
                            public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                // 当前水位线
                                long watermark = context.currentWatermark();
                                // 当前处理时间
                                long processingTime = context.currentProcessingTime();
                                // 窗口开始时间

                                // 窗口结束时间

                                // 计算窗口内数据数量
                                long count = elements.spliterator().estimateSize();

                                String record = "key=" + s
                                        + " 包含" + count + "条数据===>" + elements.toString()
                                        + " 当前Watermark:" + watermark
                                        + " 当前processingTime:" + processingTime;

                                out.collect(record);
                            }
                        }
                )
                .print()
        ;
    }

    // TODO 基于 处理时间窗口 的触发器
    private static void timeWindow(StreamExecutionEnvironment env) {
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // 滚动窗口,窗口长度5s
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .trigger(
                        new Trigger<Tuple2<String, Long>, TimeWindow>() {

                            @Override
                            public TriggerResult onElement(Tuple2<String, Long> element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                                System.out.println("调用onElement方法");
                                return TriggerResult.CONTINUE;
                            }

                            @Override
                            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                                System.out.println("调用onProcessingTime方法");
                                return TriggerResult.FIRE;
                            }

                            @Override
                            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                                System.out.println("调用onEventTime方法");
                                return TriggerResult.CONTINUE;
                            }
                            
                            @Override
                            public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                                System.out.println("调用clear方法");
                            }
                        }
                )
                .process(new ShowProcessWindowFunction())
                .print()
        ;
    }
}

6.2、移除器 - Evictors

移除器的作用:

        Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素

内置 Evictors :

        CountEvictor(元素个数) :一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除

        DeltaEvictor :接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素

        TimeEvictor :接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素

代码示例:

public class Evictors {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .keyBy(value -> value.f0)
                // TODO 滚动窗口,窗口长度=5个元素
                .countWindow(5)
                .evictor(CountEvictor.of(2L))
                .process(
                        new ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>() {
                            @Override
                            public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                // 当前水位线
                                long watermark = context.currentWatermark();
                                // 当前处理时间
                                long processingTime = context.currentProcessingTime();
                                // 窗口开始时间

                                // 窗口结束时间

                                // 计算窗口内数据数量
                                long count = elements.spliterator().estimateSize();

                                String record = "key=" + s
                                        + " 包含" + count + "条数据===>" + elements.toString()
                                        + " 当前Watermark:" + watermark
                                        + " 当前processingTime:" + processingTime;

                                out.collect(record);
                            }
                        }
                )
                .print()
        ;

        // 3.触发程序执行
        env.execute();
    }
}

7、对迟到数据的处理

        在使用 event-time 窗口时,数据可能会迟到,默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃,Flink中提供多种多处理迟到数据的策略。

7.1、生成水位线时,设置最大乱序时间

        在生成 watermark 时,设置一个可容忍的最大乱序时间,保证窗口计算被延迟执行,保证更多的迟到的数据能够进入窗口

        注意:迟到的数据超过了设置的最大乱序时间,将会被丢弃

WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

7.2、设置窗口延迟关闭 - allowedLateness

可以通过 window.allowedLateness 来设置窗口延迟关闭的时间

allowedLateness 默认为0

        表示 当watermark 或 processing time 超过 窗口结束的timestamp时,触发计算 并销毁窗口

allowedLateness 大于0时

        表示  当watermark 或 processing time 超过 窗口结束的timestamp时,会触发计算 但是并不会销毁窗口
        而是当 窗口结束的timestamp + allowedLateness 的数据到来后,才会销毁窗口
        并且 每次接收到迟到的数据后 都会触发窗口计算

代码示例:

public class AllowedLateness {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1)
                )
                .keyBy(value -> value.f0)
                // 滚动窗口,窗口长度5s
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 窗口延迟10s关闭
                .allowedLateness(Time.seconds(10))
                .process(new ShowProcessWindowFunction())
                .print()
        ;

        // 3.触发程序执行
        env.execute();
    }
}

运行结果:


7.3、使用侧输出流获取迟到的数据 - sideOutputLateData

在 Flink中可以使用 侧输出流-OutputTag 来获取窗口中迟到的数据

代码示例:

// TODO 使用侧流接收迟到的数据
public class UseSideOutputReceiveLatedata {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 声明 OutputTag 对象,用来接收迟到的数据
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side-output"){};

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<String> processDataStream = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1)
                )
                .keyBy(value -> value.f0)
                // 滚动窗口,窗口长度5s
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(outputTag)
                .process(new ShowProcessWindowFunction());

        // 输出主流
        processDataStream.print("主流输出");

        // 从主流获取侧输出流,打印迟到的数据
        processDataStream.getSideOutput(outputTag).printToErr("关窗后的迟到数据");

        // 3.触发程序执行
        env.execute();
    }
}

运行结果:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1042213.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python 进阶 - 日常工作中使用过的简单Trick

引言&#xff1a;无论你是一名初学者还是有一定经验的开发者&#xff0c;这些技巧都能帮助你更好地利用Python来解决问题、提高效率和写出更干净、可维护的代码。无论你是数据科学家、Web开发者、自动化脚本编写者还是其他领域的从业者&#xff0c;这些技巧都能对你有所帮助。 …

Linux 日期、时区

1、date命令 通过date命令可以在命令行中查看系统的时间 date [-d] [格式化字符串] -d 按照给定的字符串显示日期&#xff0c;一般用于日期计算 格式化字符串&#xff1a;通过特定的字符串标记&#xff0c;来控制显示的日期格式 %Y 年 %y 年份后两位数字 (00..99) %m …

【百度地图】绘制扇形图层

思路&#xff1a;使用多边形图层PolygonLayer&#xff0c;借助函数将一个经纬度转换成扇形经纬度数组 扇形经纬度数组生成函数 /** 返回扇形经纬度数组 绘制多边形* lon 经度* lat 维度* radius 半径* azimuth 方向角* jiaodu&#xff08;扇形的夹角&#xff09;** **/ export…

java Spring Boot2.7写一个接口 提供图片预览 前端可以直接用接口地址当src为图片地址使用

我们特别是在做小程序开发时 很多图片会比较大 而小程序本身就对自身大小要求非常高 所以 图片放在服务器上提供访问链接是一种非常好的选择 我想很多前端会误认为 直接将图片放在服务器上就可以了 但其实没那么简单 因为服务器其实也可以理解为一个电脑 你就想 你自己本地都不…

python+requests+pytest+allure自动化框架

1.核心库 requests request请求 openpyxl excel文件操作 loggin 日志 smtplib 发送邮件 configparser unittest.mock mock服务 2.目录结构 base utils testDatas conf testCases testReport logs 其他 2.1base base_path.py 存放绝对路径,dos命令或Jenkins执行…

idea Springboot闲置物品交易平台VS开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 springboot 闲置物品交易平台是一套完善的完整二手交易信息系统&#xff0c;结合springboot框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用springboot框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代…

Wespeaker框架训练(2)

2. 模型训练 2.1 run.sh stage 3 数据集处理完毕后开始训练&#xff0c;主要是调用wespeaker/bin/train.py 函数 echo ”Start training ...” 打印提示信息&#xff0c;表示开始训练num_gpus$(echo $gpus | awk -F ’,’ ’print NF’) 通过gpus 变量获取要使用的GPU 数量t…

《C++ primer》练习6.36-6.38:书写返回数组引用的函数声明

最近看C primer&#xff0c;看到《C primer》6.3.3练习&#xff0c;要求书写返回数组引用的函数声明&#xff0c;觉得有必要实践记录一下。 这里先总结返回数组的引用的的函数声明写法&#xff08;下面的Type是数组元素的类型&#xff0c;可以是int、float等&#xff0c;如果要…

企业架构相关

数据架构的作用首先是找到所有的业务对象 和数据对象。 在数据对象分析里面有一个重点就是主数据识别和分析。

OpenCV项目开发实战--使用 EigenFaces 进行人脸重建 (含C++/Python源码)

在这篇文章中,我们将学习如何使用 EigenFaces 重建面部。这篇文章是为初学者写的。如果您不了解主成分分析 (PCA) 或 EigenFaces。 什么是特征脸? 特征脸是可以添加到平均(平均)脸部以创建新的面部图像的图像。我们可以用数学方式将其写为: 在哪里,

【周赛364-数组】最大二进制奇数-力扣 2863 题

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

vue pc端/手机移动端 — 下载导出当前表格页面pdf格式

一、需求&#xff1a;在手机端/pc端实现一个表格页面&#xff08;缴费单/体检报告单等&#xff09;的导出功能&#xff0c;便于用户在本地浏览打印。 二、实现&#xff1a;之前在pc端做过预览打印的功能&#xff0c;使用的是print.js之类的方法让当前页面直接唤起打印机的打印预…

win11、win10使用python代码打开和关闭wifi热点的正确方法

问题一 win10、win11&#xff0c;可以在任务栏的WIFI图标启动移动热点&#xff0c;但是无法设置SSID和密码。在网上搜索好久&#xff0c;无解。 万能的网络解决不了&#xff0c;只能自己动手解决了。 问题二 我当前的WiFi驱动程序不支持承载网络&#xff0c;如果我输入netsh…

JMeter+Python 实现异步接口测试

当使用JMeter和Python来实现异步接口测试时&#xff0c;可以按照以下步骤进行操作&#xff1a; 1、安装JMeter和Java Development Kit&#xff08;JDK&#xff09;&#xff1a; 下载并安装JMeter&#xff08;https://jmeter.apache.org/download_jmeter.cgi&#xff09;和适用…

数组06-滑动窗口

目录 LeetCode——209. 长度最小的子数组 分析&#xff1a; LeetCode——844. 比较含退格的字符串 分析&#xff1a; LeetCode——209. 长度最小的子数组 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续…

Apache Doris 行列转换可以这样玩

行列转换在做报表分析时还是经常会遇到的&#xff0c;今天就说一下如何实现行列转换吧。 行列转换就是如下图所示两种展示形式的互相转换 1. 行转列 我们来看一个简单的例子&#xff0c;我们要把下面这个表的数据&#xff0c;转换成图二的样式 image-20230914151818953.png …

计算机网络补充

未分类文档 CDMA是码分多路复用技术 和CMSA不是一个东西 UPD是只确保发送 但是接收端收到之后(使用检验和校验 除了检验的部分相加 对比检验和是否相等。如果不相同就丢弃。 复用和分用是发生在上层和下层的问题。通过比如时分多路复用 频分多路复用等。TCP IP 应用层的IO多路…

Coupang新手教程,Coupang怎么收款?——站斧浏览器

coupang新手教程 韩国coupang入驻条件很简单&#xff0c;只需要你提供注册四件套就可以了&#xff1b; Coupang的经营模式呢可以说和我们国内的电商比较像&#xff1b; 前期可以做无货源模式&#xff1b;在熟悉平台一段时间后&#xff0c;可以去打造我们自己的精品店铺&…

start()方法源码分析

当我们创建好一个线程之后&#xff0c;可以调用.start()方法进行启动&#xff0c;start()方法的内部其实是调用本地的start0()方法&#xff0c; 其实Thread.java这个类中的方法在底层的Thread.c文件中都是一一对应的&#xff0c;在Thread.c中start0方法的底层调用了jvm.cpp文件…

Python机器学习实战-特征重要性分析方法(4):相关性分析(附源码和实现效果)

实现功能 计算各特征与目标变量之间的相关性。相关性越高的特征越重要。 实现代码 import pandas as pd from sklearn.datasets import load_breast_cancer import matplotlib.pyplot as pltX, y load_breast_cancer(return_X_yTrue) df pd.DataFrame(X, columnsrange(30)…