Flink 滚动窗口、滑动窗口详解

news2024/11/29 0:33:42

1 滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。
在这里插入图片描述

2 滑动窗口(Sliding Windows)

与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。

既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代
表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表
了计算频率。例如,我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据
个数定义。
在这里插入图片描述
在这里插入图片描述
我们可以看到,当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决
定。如图 6-18 所示,滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口里。比如我们定义的窗口长度为 1 小时、滑动步长为 30 分钟,那么对于 8 点 55 分的数据,应该同时属于[8 点, 9 点)和[8 点半, 9 点半)两个窗口;而对于 8 点 10 分的数据,则同时属于[8点, 9 点)和[7 点半, 8 点半)两个窗口。所以,滑动窗口其实是固定大小窗口的更广义的一种形式;换句话说,滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。当然,我们也可以定义滑动步长大于窗口大小,这样的话就会出现窗口不重叠、但会有间隔的情况;这时有些数据不
属于任何一个窗口,就会出现遗漏统计。所以一般情况下,我们会让滑动步长小于窗口大小,并尽量设置为整数倍的关系。

在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常报警。这时滑动窗口无疑就是很好的实现方式。

3 窗口API

3.1 按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时
执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...) .window(...)

3.2 非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。

在代码中,直接基于 DataStream 调用.windowAll()定义窗口。

stream.windowAll(...)

3.3 代码中窗口 API 的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。

3.4 滚动处理时间窗口

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。

stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)

这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量,用这个偏移量可以处理时区。

例如:我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了。

.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

3.5 滑动处理时间窗口

窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。
滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)

4 窗口函数

定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗
口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream。
在这里插入图片描述

4.1 增量聚合函数(incremental aggregation functions)

4.1.1 归约函数(ReduceFunction)

将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。

统计每一小时用户的访问量:

package com.rosh.flink.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
public class UserPojo {

    private Integer userId;

    private String name;

    private String uri;

    private Long timestamp;

}
package com.rosh.flink.wartermark;

import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WindowTS {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<UserPojo> dataDS = env.fromCollection(getUserLists());

        //生成有序水位线
        SingleOutputStreamOperator<UserPojo> orderStreamDS = dataDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                    @Override
                    public long extractTimestamp(UserPojo element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        //聚合
        SingleOutputStreamOperator<Tuple2<Integer, Long>> userDS = orderStreamDS.map(new MapFunction<UserPojo, Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> map(UserPojo value) throws Exception {
                return Tuple2.of(value.getUserId(), 1L);
            }
        });

        //开窗统计每1小时用户访问了多少次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> resultDS = userDS.keyBy(tuple -> tuple.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                        value1.f1 = value1.f1 + value2.f1;
                        return value1;
                    }
                });

        resultDS.print();

        env.execute("WarterMarkTest");

    }

    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();
        for (int i = 1; i <= 1000; i++) {
            String uri = "/goods/" + i;
            int userId = random.nextInt(10);
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, (long) (1000 * i));
            //无序时间
            lists.add(userPojo);
        }
        return lists;
    }

}

在这里插入图片描述

4.1.2 聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数
据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。

例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。

于是自然可以想到,如果取消类型一致的限制,让输入数据、中间状态、输出结果三者类型都可以不同,不就可以一步直接搞定了吗?Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:


/**
 *
 *  The type of the values that are aggregated (input values)
 *	The type of the accumulator (intermediate aggregate state).
 *	The type of the aggregated result
 *
 */
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
	   /**
      * 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
			*/		
		 ACC createAccumulator();

     /**
	    * 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;
		  * 返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
	    */
		 ACC add(IN value, ACC accumulator);

		
		/**
	   * 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均
	   * 值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
	   */
		 OUT getResult(ACC accumulator);

		/**
	   * 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景
	   * 就是会话窗口(Session Windows)。
	   */
		 ACC merge(ACC a, ACC b);
}

所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的
结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输
出的类型可以不同,使得应用更加灵活方便。

·统计人均访问次数:

package com.rosh.flink.wartermark;

import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.*;

public class AggWindowTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<UserPojo> userDS = env.fromCollection(getUserLists()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                            @Override
                            public long extractTimestamp(UserPojo element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        })
        );

        //统计5秒内,人均访问次数
        SingleOutputStreamOperator<Double> resultDS = userDS.keyBy(key -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new PeopleHourAvgCount());

        resultDS.print("人均访问次数为:");

        env.execute("AggWindowTest");
    }

    private static class PeopleHourAvgCount implements AggregateFunction<UserPojo, Tuple2<HashSet<Integer>, Long>, Double> {

        /**
         * 初始化累加器
         */
        @Override
        public Tuple2<HashSet<Integer>, Long> createAccumulator() {
            return Tuple2.of(new HashSet<>(), 0L);
        }

        /**
         *
         */
        @Override
        public Tuple2<HashSet<Integer>, Long> add(UserPojo value, Tuple2<HashSet<Integer>, Long> accumulator) {

            //distinct userId
            accumulator.f0.add(value.getUserId());
            //次数+1
            accumulator.f1 = accumulator.f1 + 1;

            //返回累加器
            return accumulator;
        }

        @Override
        public Double getResult(Tuple2<HashSet<Integer>, Long> accumulator) {

            return accumulator.f1 * 1.0 / accumulator.f0.size();
        }

        @Override
        public Tuple2<HashSet<Integer>, Long> merge(Tuple2<HashSet<Integer>, Long> a, Tuple2<HashSet<Integer>, Long> b) {
            return null;
        }
    }

    /**
     * 获取随机人数的1000次访问
     */
    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();
        //获取随机人数
        int peopleCount = random.nextInt(20);
        System.out.println("随机人数为:" + peopleCount);
        for (int i = 1; i <= 1000; i++) {
            String uri = "/goods/" + i;
            int userId = random.nextInt(peopleCount);
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());
            //无序时间
            lists.add(userPojo);
        }
        return lists;
    }
}

在这里插入图片描述

4.2 全窗口函数(full window functions)

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。

统计10秒访问UV:

package com.rosh.flink.wartermark;

import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.sql.Timestamp;
import java.util.*;

public class ProcessWindowTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());

        //水位线
        SingleOutputStreamOperator<UserPojo> watermarks = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                    @Override
                    public long extractTimestamp(UserPojo element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        //开窗10秒UV统计
        SingleOutputStreamOperator<String> resultDS = watermarks.keyBy(key -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UserUVCount());

        resultDS.print("UV:");

        env.execute("ProcessWindowTest");

    }

    private static class UserUVCount extends ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow> {

        @Override
        public void process(Boolean aBoolean, ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow>.Context context, Iterable<UserPojo> elements, Collector<String> out) throws Exception {

            //用户集合
            HashSet<Integer> hashSet = new HashSet<>();
            for (UserPojo user : elements) {
                hashSet.add(user.getUserId());
            }
            //获取时间信息
            long start = context.window().getStart();
            long end = context.window().getEnd();

            String rs = "窗口信息,startTime:" + new Timestamp(start) + ",endTime: " + new Timestamp(end) + ",用户访问的次数为:" + hashSet.size();
            out.collect(rs);
        }
    }

    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();

        int userCount = random.nextInt(100);

        for (int i = 1; i <= 1000; i++) {
            String uri = "/goods/" + i;
            int userId = random.nextInt(userCount);
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());
            //无序时间
            lists.add(userPojo);
        }
        return lists;
    }

}

在这里插入图片描述

4.3 增量聚合和全窗口函数的结合使用

增量聚合函数处理计算会更高效。全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。


// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) 

// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)

// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)

// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输
出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。

统计10秒的url浏览量:

package com.rosh.flink.wartermark;

import com.alibaba.fastjson.JSONObject;
import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

public class UrlWindowTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据源
        DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());
        //水位线
        SingleOutputStreamOperator<UserPojo> waterDS = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                    @Override
                    public long extractTimestamp(UserPojo element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));
        //url count
        SingleOutputStreamOperator<Tuple2<String, Long>> urlDS = waterDS.map(new MapFunction<UserPojo, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(UserPojo value) throws Exception {
                return Tuple2.of(value.getUri(), 1L);
            }
        });

        SingleOutputStreamOperator<JSONObject> resultDS = urlDS.keyBy(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        value1.f1 = value1.f1 + value2.f1;
                        return value1;
                    }
                }, new WindowFunction<Tuple2<String, Long>, JSONObject, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<JSONObject> out) throws Exception {
                        Tuple2<String, Long> tuple2 = input.iterator().next();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("url", tuple2.f0);
                        jsonObject.put("count", tuple2.f1);

                        new Timestamp(window.getStart());

                        jsonObject.put("startTime", new Timestamp(window.getStart()).toString());
                        jsonObject.put("endTime", new Timestamp(window.getEnd()).toString());
                        out.collect(jsonObject);
                    }
                });

        resultDS.print();

        env.execute("UrlWindowTest");
    }

    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();

        for (int i = 1; i <= 1000; i++) {
            //随机生成userId、goodId
            int userId = random.nextInt(100);
            int goodId = random.nextInt(50);
            String uri = "/goods/" + goodId;
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());
            //无序时间
            lists.add(userPojo);
        }
        return lists;
    }

}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/335520.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大坝安全监测系统:水库“守坝人”!

一、项目背景 随着社会经济的迅速发展&#xff0c;我国水资源利用率越来越高&#xff0c;各类水利水电工规模进一步扩大。在抗洪救灾、水利发电等方面带来巨大的经济和社会效益。但受多种因素影响&#xff0c;大坝的安全问题日益严重。大量工程实践证明&#xff0c;为保证大坝…

uniapp 离线本地打包

uniapp打包教程地址 https://nativesupport.dcloud.net.cn/AppDocs/usesdk/android.html点击查看 需要的环境&#xff1a; java (1.8)离线SDK(上面的连接下载即可)Android Studio&#xff08;同上&#xff09; 配置环境变量 依次点击“计算机”&#xff0d;“属性”&#…

通过 指针 引用 多维数组 详解

目录 一&#xff1a;回顾多维数组地址知识 二&#xff1a;二维数组的有关指针 三&#xff1a;指向数组元素的指针变量 四&#xff1a;用指向数组的指针作为函数参数 首先简单来讲&#xff0c;指针变量可以指向一维数组中的元素&#xff0c;也可以指向多维数组中的元素。下面…

Java线程中:Runnable和Callable的区别和联系

点个关注&#xff0c;必回关 文章目录一、Java提供了三种创建线程的方法1.继承Thread2.实现Runnable接口3.通过Callable和Future创建线程二、Runnable和Callable的区别和联系1.定义接口&#xff08;1&#xff09; Runnable&#xff08;2&#xff09;Callable&#xff08;3&…

onnx-graphsurgeon----ONNX计算图修改神器

0. 简介 作为深度学习用户&#xff0c;经常会听到ONNX、TensorRT等一系列常用的文件保存格式。而对于ONNX而言&#xff0c;经常我们会发现在利用TensorRT部署到NVIDIA显卡上时&#xff0c;onnx模型的计算图不好修改&#xff0c;在以前的操作中很多时候大佬是将onnx转换成ncnn的…

vscode中安装python运行调试环境

在运行代码之前&#xff0c;需要到微软商店下载安装python环境&#xff0c;35m&#xff0c;都是自动的。 1、安装python 的extensions插件。 ctrlshiftx 输入 python 后点击 install 按钮。 2、新建文件夹spider文件夹。 3、在新建文件夹spider下新建文件spider.py源代码。…

注册OpenAI体验ChatGPT实战演示

什么是ChatGPT ChatGPT&#xff0c;美国OpenAI 研发的聊天机器人程序 &#xff0c;于2022年11月30日发布的人工智能技术驱动的自然语言处理工具&#xff0c;能够真正像人一样完成交流和任务处理。日前&#xff0c;ChatGPT已经更新多个版本&#xff0c;很多大厂也都在接入其API。…

Spark On YARN时指定Python版本

坑很多&#xff0c;直接上兼容性最佳的命令&#xff0c;将python包上传到hdfs或者file:/home/xx/(此处无多余的/) # client 模式 $SPARK_HOME/spark-submit \ --master yarn \ --deploy-mode client \ --num-executors 2 \ --conf "spark.yarn.dist.archives<Python包…

超店有数,tiktok变现 |TikTok最新选品秘籍!让你爆单销量10W+

商家想要自家产品爆单&#xff0c;获得更高的转化&#xff0c;tiktok变现&#xff0c;选品很重要。但很多商家在选品上&#xff0c;找不到头绪。那不妨看看一下的内容&#xff0c;可以帮助你找到选品的思路方向和方法。一、tiktok选品方向1、 考虑垂直细分产品考虑垂直细分领域…

Vue3快速入门【二】

Vue3快速入门一、传值父传子&#xff0c;子传父v-model二、插槽2.1、匿名插槽2.2、具名插槽2.3、插槽作用域2.4、插槽作用域案例2.4.1、初始布局2.4.2、插槽使用2.4.3、点击编辑按钮获取本行数据&#xff08;插槽作用域的使用&#xff09;2.4.4、类型书写优化2.4.5、全局接口抽…

云计算|OpenStack|社区版OpenStack安装部署文档(十一--- 如何获取镜像---Rocky版)

前言&#xff1a; 前面我们使用虚拟机搭建了一个openstack集群&#xff0c;也就是在VM虚拟机的基础上模拟了一个简单的基于openstack社区版Rocky的私有云&#xff0c;但&#xff0c;不管任何部署安装工作&#xff0c;最后其实都是需要有实际的应用的&#xff0c;也就是常说的实…

拿来就用的Java海报生成器ImageCombiner(一)

背景如果您是UI美工大师或者PS大牛&#xff0c;那本文一定不适合你&#xff1b;如果当您需要自己做一张海报时&#xff0c;可以立马有小伙伴帮您实现&#xff0c;那本文大概率也不适合你。但是&#xff0c;如果你跟我一样&#xff0c;遇上到以下场景&#xff0c;最近公司上了不…

新手小白适合做跨境电商吗?

今天的跨境电商已经逐渐成熟&#xff0c;靠运气赚钱的时代早已过去&#xff0c;馅饼不可能从天上掉下来&#xff0c;尤其是你想做一个没有货源的小白劝你醒醒。做跨境电商真的不容易&#xff0c;要想做&#xff0c;首先要分析自己是否适合做。米贸搜整理了以下资料&#xff0c;…

硬件设计—高性能ADC前端电路

高性能模数转换器&#xff08;ADC&#xff09;一般对系统的性能有非常高的要求&#xff0c;而AD芯片的“前端”的输入电路设计对ADC系统的的性能有非常大的影响。以下主要介绍了ADC芯片前端输入使用放大器和变压器各自的优势。 1、放大器和变压器根本区别 放大器是有源器件&am…

Docker进阶 - 8. docker network 网络模式之 container

目录 1. container 模式概述 2. 使用Alpine操作系统来验证 container 模式 1. container 模式概述 container网络模式新建的容器和已经存在的一个容器共享一个网络ip配置而不是和宿主机共享。新创建的容器不会创建自己的网卡&#xff0c;配置自己的IP&#xff0c;而是和一个…

ChatGPT注册+微信自动回复

ChatGPT注册微信自动回复1 注册OpenAI1.1 科学上网1.2 准备用于注册OpenAI的邮箱1.3 准备用于验证OpenAI的手机号1.4 注册OpenAI1.5 使用OpenAI2 基于ChatGPT微信实现自动回复2.1 原理2.2 实现ChatGPT是OpenAI基于GPT-3模型构建的自然语言处理模型&#xff0c;近几天ChatGPT突然…

这款浏览器火了,能快速使用ChatGPT,简单一看就会

当下ChatGPT很火&#xff0c;影响的人数非常广泛。作为聊天机器人模型&#xff0c;ChatGPT能完成写论文、编代码、写新闻、画画等高难度要求。那么&#xff0c;如何使用ChatGPT呢&#xff1f;下面介绍一下几个使用方法。值得一提的&#xff0c;目前已经有浏览器支持使用ChatGPT…

C++程序设计——多态:虚函数、抽象类、虚函数表

注&#xff1a;以下示例均是在VS2019环境下 一、多态的概念 通俗来讲&#xff0c;多态就是多种形态&#xff0c;当不同的对象去完成某个行为时&#xff0c;会产生出不同的状态。即不同继承关系的类对象&#xff0c;去调用同一函数时&#xff0c;产生不同的行为。 比如”叫“这…

【React】course

《千锋HTML5实战就业班React课程体系V16版本》课程资料 一、关于React 英文官网&#xff1a;https://reactjs.org/ 中文官网&#xff1a;https://zh-hans.reactjs.org/ React 起源于 Facebook 的内部项目&#xff0c;因为该公司对市场上所有 JavaScript MVC 框架&#xff0c…

JAVA工具篇--1 Idea中 Gradle的使用

前言&#xff1a; 既然我们已经使用Maven 来完成对项目的构建&#xff0c;为什么还要使用Gradle 进行项目的构建&#xff1b;gradle和maven都可以作为java程序的构建工具&#xff0c;但两者还是有很大的不同之处的&#xff1a;1.可扩展性&#xff0c;gradle比较灵活&#xff0c…