Flink-窗口概念以及窗口API使用

news2025/2/28 0:07:44

6.3 窗口

6.3.1 窗口的概念

  1. 存储桶

在这里插入图片描述

水位线只是用来推动窗口的关闭,但不决定数据分到哪个窗口

6.3.2 窗口的分类

  1. 按照驱动类型分类
  • 时间窗口
  • 计数窗口

在这里插入图片描述

  1. 按照窗口分配数据的规则分类
  • 滚动窗口:参数为窗口的大小

在这里插入图片描述

  • 滑动窗口:参数为窗口大小,以及滑动步长

数据会重叠

运用场景,每个5分钟统计过去一小时的所有的活跃用户

在这里插入图片描述

  • 会话窗口:参数是会话的超时时间
  • 全局窗口

6.3.3 窗口API的概览

  1. 按键分区窗口

经过按键分区后(keyby),数据流会按照key被分为多条逻辑流,就是keyStream

stream.keyBy(...)
      .window(...)
  1. 非按键分区

stream直接开窗,所有数据收集到窗口中,也就是并行度变成1,官方不推荐

stream.windowAll(...)
  1. ​ 窗口API的调用
stream.keyBy(<key selector>)
      .window(<window assigner>)//窗口分配器:要分配什么窗口
      .aggregate(<window function>)//窗口函数:具体计算操作

6.3.4 窗口分配器

在这里插入图片描述


在这里插入图片描述

直接调用window()方法传入WindowAssigner,返回WindowedStream

在这里插入图片描述

在这里插入图片描述

WindowAssigner是一个抽象类,并且有assignWindows()方法,但是flink有抽象类的实现类,直接用实现类就好,不用自己整一个

  1. 滚动窗口

在这里插入图片描述

主要是根据窗口分类而设置的实现类,细分下面还有时间语义

例如图中的TumblingEventTimeWindows(事件时间)或者TumblingProcessingTimeWindows(处理时间)

在这里插入图片描述

在这里插入图片描述

TumblingEventTimeWindows的静态方法of需要传入一个Time大小,或者Time大小以及Time的偏移量,偏移量一般用在时差里面(伦敦时间和东八区时间)

在这里插入图片描述

Time这个类是flink下的,选包别选错,Time下就有很多方法了,例如.hour取小时

在这里插入图片描述

所以最后滚动事件时间窗口这么写

  1. 滑动窗口
    在这里插入图片描述

其他的窗口还有SlidingEventTimeWindows滑动窗口,这个就有两个参数了,一个窗口大小,一个滑动步长,当然也有三个参数,跟滑动窗口一样可以多一个offset偏移量,也是用在计算时差(伦敦时间和东八区时间)

  1. 会话窗口

在这里插入图片描述

EventTimeSessionWindows是事件事件会话窗口,参数就是会话的时间了

  1. 计数窗口

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

countWindow是计数窗口,传一个参数表示滚动窗口,传两个参数表示滑动窗口,return的是全局窗口的,以及移除器和触发器

6.3.5 窗口函数

  1. 整体介绍

在这里插入图片描述

在这里插入图片描述

  • 一般数据源获取到后做一些map等基本操作,返回的还是DataStream
  • 如果keyby了,就变成了keyedStream,再做聚合操作,返回的DataStream
  • 或者keyby后开窗,经过窗口分配器后,得到WindowedStream,再进行窗口函数,得到返回DataStream
  1. 增量聚合函数

流处理思路做批处理,依旧是来一个处理一个,等到时间点,输出计算好的数据

  • 归约函数ReduceFunction
    在这里插入图片描述

在这里插入图片描述

参数要传入ReduceFunction,和转换算子那一章keyby后用到的聚合函数一样的,同个类,即把集合每一个数据拿出来,然后按照一定的规则不停的规约,最终得到一个唯一规约聚合后的结果

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.自定义Source输入
            SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //乱序事件为0,相当于升序
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream.map(new MapFunction<Event, Tuple2<String,Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                return Tuple2.of(value.user,1L);
            }
        })
                .keyBy(data->data.f0)
                //.countWindow(10,2)//滑动计数窗口
                //.window(EventTimeSessionWindows.withGap(Time.seconds(2)))//事件事件会话窗口
                //.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5)))//滑动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))//滚动事件时间窗口
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0,value1.f1+value2.f1);
                    }
                })
                        .print();

        env.execute();
    }
}

结果

(Mary,3)
(Bob,3)
(Alice,4)

(Bob,2)
(Alice,4)
(Mary,4)

(Alice,3)
(Bob,5)
(Mary,2)

(Alice,3)
(Bob,5)
(Mary,2)

(Mary,4)
(Alice,3)
(Bob,3)

Process finished with exit code 130
数据源一直产生,会一直不停,每隔十秒会输出一段
  • 聚合函数(AggregateFunction)

1)相比于归约函数reduce的特点是,有三个泛型,并且可以更改输出的类型

2)有三个类型,输入类型(In),累加器类型(ACC),输出类型(OUT)

3)重写4个方法createAccumulator() ,add(Event value, Tuple2<Long, Integer> accumulator),getResult(Tuple2<Long, Integer> accumulator),merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b)

案例1

public class WindowAggregateTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.自定义Source输入
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())

                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //乱序事件为0,相当于升序
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream.keyBy(data->data.user)
                        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                        .aggregate(new AggregateFunction<Event, Tuple2<Long,Integer>, String>() {//中间类型:一个是所有类型的和,一个是所有数的个数
                            @Override
                            public Tuple2<Long, Integer> createAccumulator() {//创建一个累加器
                                return Tuple2.of(0L,0);//写初始值
                            }

                            @Override
                            //叠加器,参数是传过来的数值,还有当前的状态,返回的也是当前的状态的类型,这个过程主要涉及状态的改变
                            public Tuple2<Long, Integer> add(Event value, Tuple2<Long, Integer> accumulator) {
                                return Tuple2.of(accumulator.f0+value.timestamp,accumulator.f1+1);//前面是叠加规则,后面是个数
                            }

                            @Override
                            //输出结果,返回类型变化,变成String
                            public String getResult(Tuple2<Long, Integer> accumulator) {
                                //汇总数/个数得到平均数
                                Timestamp timestamp = new Timestamp(accumulator.f0 / accumulator.f1);
                                return timestamp.toString();//转成String输出
                            }

                            @Override
                            //merge合并累加器,一般用于会话窗口
                            //这边可以实现,也可以不实现,下面是实现的写法
                            public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
                                return Tuple2.of(a.f0+b.f0,a.f1+b.f1);//(和,个数)
                            }
                        })
                        .print();
        env.execute();
    }
}

结果

2022-11-22 22:44:19.803
    
2022-11-22 22:44:25.389
2022-11-22 22:44:24.522
2022-11-22 22:44:27.921
    
2022-11-22 22:44:34.588
2022-11-22 22:44:35.089
2022-11-22 22:44:35.293
    
2022-11-22 22:44:44.558
2022-11-22 22:44:44.349
2022-11-22 22:44:45.701
每隔十秒输出某一用户的访问时间戳平均值,意义不大,主要看的是aggregate一步可以实现求平均值用法

案例2

public class WindowAggregateTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.自定义Source输入
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())

                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //乱序事件为0,相当于升序
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream.keyBy(data->data.user)
                        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                        .aggregate(new AggregateFunction<Event, Tuple2<Long,Integer>, String>() {//中间类型:一个是所有类型的和,一个是所有数的个数
                            @Override
                            public Tuple2<Long, Integer> createAccumulator() {//创建一个累加器
                                return Tuple2.of(0L,0);//写初始值
                            }

                            @Override
                            //叠加器,参数是传过来的数值,还有当前的状态,返回的也是当前的状态的类型,这个过程主要涉及状态的改变
                            public Tuple2<Long, Integer> add(Event value, Tuple2<Long, Integer> accumulator) {
                                return Tuple2.of(accumulator.f0+value.timestamp,accumulator.f1+1);//前面是叠加规则,后面是个数
                            }

                            @Override
                            //输出结果,返回类型变化,变成String
                            public String getResult(Tuple2<Long, Integer> accumulator) {
                                //汇总数/个数得到平均数
                                Timestamp timestamp = new Timestamp(accumulator.f0 / accumulator.f1);
                                return timestamp.toString();//转成String输出
                            }

                            @Override
                            //merge合并累加器,一般用于会话窗口
                            //这边可以实现,也可以不实现,下面是实现的写法
                            public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
                                return Tuple2.of(a.f0+b.f0,a.f1+b.f1);//(和,个数)
                            }
                        })
                        .print();
        env.execute();
    }
}

结果

在这里插入图片描述

这边url不对,后续已经改正,但是不影响此次结果查看

  1. 全窗口函数
  • 分析

在这里插入图片描述

ProcessWindowFunction是一个抽象类,继承了富函数类AbstractRichFunction,并且拥有4个泛型,并且最后一个泛型W继承了Window,可以选择TimeWindow作为子类传入

在这里插入图片描述

可以继承这个类后实现process()方法,参数分别是KEY key, Context context, Iterable elements, Collector out,第一个参数key的类型,第二个是上下文,第三个是输入(迭代器),第四个输出

在这里插入图片描述

第二个参数上下文中有很多属性,例如窗口,当前处理时间,当前watermark,以及获取当前的状态还有侧输出流

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

TimeWindow这个类中getStart()和getEnd()用,一般通过上下文调用window返回TimeWindow,然后在调用getStart()和getEnd()用方法,例如Long start = context.window()

  • 代码
public class WindowProcessTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.自定义Source输入
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())

                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //乱序事件为0,相当于升序
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        //使用ProcessWindowFunction计算UV
        stream.keyBy(data->true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UvCountByWindow())
                .print();

        env.execute();
    }



    //实现自定义的ProcessWindowFunction,输出一条统计信息
    public static class UvCountByWindow extends ProcessWindowFunction<Event,String,Boolean,TimeWindow>{
        @Override
        //参数process(KEY key, Context context上下文, Iterable<IN> elements, Collector<OUT> out)
        public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            //1.用一个HashSet保存user
            HashSet<String> userSet = new HashSet<>();
            //2.从elements遍历数据,放到set中去重
            for (Event event : elements) {
                userSet.add(event.user);
            }

            Integer uv = userSet.size();

            //3.结合窗口信息
            Long start = context.window().getStart();
            Long end = context.window().getEnd();

            //4.输出
            out.collect("窗口"+new Timestamp(start)+"~"+new Timestamp(end)
            +"  UV值为:"+uv);

        }
    }
}
  • 结果
窗口2022-11-23 00:00:20.0~2022-11-23 00:00:30.0  UV值为:2
窗口2022-11-23 00:00:30.0~2022-11-23 00:00:40.0  UV值为:3
窗口2022-11-23 00:00:40.0~2022-11-23 00:00:50.0  UV值为:3
  1. 两种函数结合
  • 概述

增量函数看不到窗口信息,全窗口是将数据攒起来后进行批处理,因此是调用增量函数中add方法,窗口结束将getResult方法结果以参数形式作为element(第三个参数)输出给到全窗口函数的process()方法

在这里插入图片描述

  • 代码
public class UvCountExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.自定义Source输入
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())

                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //乱序事件为0,相当于升序
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        stream.print();

        //使用AggregateFunction和ProcessWindowFunction结合计算UV
        stream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new UvAgg(),new UvCountResult())
                .print();
        env.execute();
    }

    //自定义实现AggregateFunction,目的在于增量计算uv值
    public static class UvAgg implements AggregateFunction<Event, HashSet<String>,Long>{

        @Override
        public HashSet<String> createAccumulator() {
            return new HashSet<>();
        }

        @Override
        public HashSet<String> add(Event value, HashSet<String> accumulator) {
            accumulator.add(value.user);
            return accumulator;
        }

        @Override
        public Long getResult(HashSet<String> accumulator) {
            return (long)accumulator.size();
        }

        @Override
        public HashSet<String> merge(HashSet<String> a, HashSet<String> b) {
            return null;
        }
    }

    //自定义实现ProcessWindowFunction,包装窗口信息输出
    public static class UvCountResult extends ProcessWindowFunction<Long,String, Boolean,TimeWindow>{

        @Override
        public void process(Boolean aBoolean, ProcessWindowFunction<Long, String, Boolean, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
            //3.结合窗口信息
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long uv = elements.iterator().next();
            //4.输出
            out.collect("窗口"+new Timestamp(start)+"~"+new Timestamp(end)
                    +"  UV值为:"+uv);
        }
    }
}
  • 结果
Event{user='Mary', url='./cart', timestamp=2022-11-23 21:02:49.478}
Event{user='Alice', url='./home', timestamp=2022-11-23 21:02:50.496}
窗口2022-11-23 21:02:40.0~2022-11-23 21:02:50.0  UV值为:1
Event{user='Mary', url='./home', timestamp=2022-11-23 21:02:51.504}
Event{user='Alice', url='./prod?id=100', timestamp=2022-11-23 21:02:52.513}
Event{user='Alice', url='./prod?id=100', timestamp=2022-11-23 21:02:53.588}
Event{user='Alice', url='./cart', timestamp=2022-11-23 21:02:54.602}
Event{user='Bob', url='./prod?id=100', timestamp=2022-11-23 21:02:55.615}
Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 21:02:56.627}
Event{user='Mary', url='./home', timestamp=2022-11-23 21:02:57.627}
Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 21:02:58.634}
Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 21:02:59.644}
Event{user='Alice', url='./home', timestamp=2022-11-23 21:03:00.65}
窗口2022-11-23 21:02:50.0~2022-11-23 21:03:00.0  UV值为:3
  1. 统计热门url案例
  • 代码
public class UrlCountViewExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.自定义Source输入
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())

                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //乱序事件为0,相当于升序
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        stream.print("input");

        //统计每个url的访问量
        stream.keyBy(data -> data.url)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .aggregate(new UrlViewCountAgg(),new UrlViewCountResult())
            .print();

        env.execute();
    }


    //增量聚合,来一条数据就加1
    private static class UrlViewCountAgg implements AggregateFunction<Event,Long,Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator+1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    //包装窗口信息,然后输出UrlViewCount
    //输入就是增量函数的输出
    private static class UrlViewCountResult extends ProcessWindowFunction<Long,UrlViewCount,String, TimeWindow>{

        @Override
        public void process(String url,Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            //3.结合窗口信息
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long count = elements.iterator().next();

            //4.输出
            out.collect(new UrlViewCount(url,count,start,end));
        }
    }
}
  • 结果
input> Event{user='Bob', url='./home', timestamp=2022-11-23 22:02:55.388}
input> Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 22:02:56.409}
input> Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 22:02:57.411}
input> Event{user='Alice', url='./prod?id=100', timestamp=2022-11-23 22:02:58.418}
input> Event{user='Alice', url='./fav', timestamp=2022-11-23 22:02:59.419}
input> Event{user='Bob', url='./home', timestamp=2022-11-23 22:03:00.42}
UrlViewCount{url='./home', count=1, windowStart=2022-11-23 22:02:50.0, windowEnd=2022-11-22 22:03:00.0}
UrlViewCount{url='./prod?id=100', count=3, windowStart=2022-11-23 22:02:50.0, windowEnd=2022-11-23 22:03:00.0}

最后一条input不算入窗口

6.3.6 其他API

  1. 触发器(Trigger)
  • 用法
stream.keyBy(...)
      .window(...)
      .trigger(new MyTrigger)
  • 分析

在这里插入图片描述

Trigger是一个抽象类

在这里插入图片描述

在这里插入图片描述

CountTrigger是实现类,使用onEventTime()方法返回TriggerResult类,即触发的结果

在这里插入图片描述

TriggerResult是一个枚举类,有CONTINUE(不动),FIRE_AND_PURGE,FIRE(发射到下游),PURGE(把窗口清空关闭)

后面听不懂了,毁灭吧

  1. 移除器(Evictor)
stream.keyBy(...)
 .window(...)
 .evictor(new MyEvictor())
  1. 允许延迟
  • 窗口延迟关闭
stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .allowedLateness(Time.minutes(1))
  • 测输出流

大招:把迟到的数据放到测输出流

stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
  • 代码测试
public class LateDataTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.socketTextStream("hadoop2",7777)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] fileds = value.split(",");
                        return new Event(fileds[0].trim(),fileds[1].trim(),Long.valueOf(fileds[2].trim()));
                    }
                })


                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //乱序事件为0,相当于升序
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream.print("input");

        //定义一个输出标签
        //使用匿名类定义测输出流标签定义出来
        OutputTag<Event> late = new OutputTag<Event>("late"){};

        //统计每个url的访问量
        SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.minutes(1))//允许一分钟的延迟
                .sideOutputLateData(late)//测输出流
            	//可以传入两个参数,一个AggregateFunction<,另一个ProcessWindowFunction
                .aggregate(new UrlCountViewExample.UrlViewCountAgg(),
                new UrlCountViewExample.UrlViewCountResult());

        result.print("result");
        result.getSideOutput(late).print("late");
        env.execute();
    }
}

结果

在这里插入图片描述

在这里插入图片描述

当输入Bob,./prod?id=10,12000的时候,0-10的窗口才会关闭(依据水位线10),并计算前面4条的属于0-10窗口结果,

当再次输入Bob,./prod?id=20,8000,Bob,./prod?id=10,9000,由于设置了allowedLatenes窗口延迟一分钟,因此仍然可以叠加计算并输出结果

即使Bob,./prod?id=10,70000后,关闭的是10-20秒的窗口,如果后面有20-30也会关闭,即使水位线在68秒,继续输入Mary,./home,6500,也会继续叠加计算

只有当输入Bob,./prod?id=10,72000的时候,水位线猜到了70,那么窗口时间根据水位线以及延迟的1分钟,即0-10的窗口才是真正关闭掉了

窗口关闭,此时再输入数据Bob,./cart,7000,就会被放到测输出流中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/39108.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[CVPR2022] Debiased Learning from Naturally Imbalanced Pseudo-Labels

Debiased Learning from Naturally Imbalanced Pseudo-Labels 要点&#xff1a; 1、伪标签&#xff1a;由经过标记源数据训练的分类器&#xff0c;对未标记目标数据做出的置信预测&#xff0c;被广泛应用于使模型适应未标记数据&#xff0c;例如半监督学习 2、由于固有的数据…

bat批处理脚本大全

目录 1、echo 2、注释 3、常见cmd命令 4、参数与变量 5、for循环 6、函数 7、数组 在windows上编程或者制作一些小工具&#xff0c;少不了使用批处理脚本&#xff0c;而且在各种开发环境搭建中我们经常会看到批处理脚本。批处理脚本以cmd命令为基础&#xff0c;增加一些变量和参…

【学生毕业设计】基于web学生信息管理系统网站的设计与实现(13个页面)

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

【计算机毕业设计】50.课程设计管理系统

一、系统截图&#xff08;需要演示视频可以私聊&#xff09; 摘 要 网络的广泛应用给生活带来了十分的便利。所以把课程设计管理与现在网络相结合&#xff0c;利用JSP技术建设课程设计管理系统&#xff0c;实现课程设计管理的信息化。则对于进一步提高课程设计管理发展&#x…

【图像分割】基于神经气体网络 (NGN)实现图像分割附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

华为开源自研AI框架昇思MindSpore应用实践:DCGAN生成漫画头像

目录一、原理说明1.GAN基础原理2.DCGAN原理二、环境准备1.进入ModelArts官网2.使用CodeLab体验Notebook实例三、数据准备与处理1.数据处理四、创建网络1.生成器2.判别器3.损失和优化器4.优化器五、训练模型六、结果展示本教程是通过示例代码说明DCGAN网络如何设置网络、优化器、…

Pytorch学习笔记 (参考官方教程)

参考&#xff1a; pytorch官网教程 文章目录一、快速开始&#xff08;Quick Start&#xff09;数据处理&#xff08;Working with data&#xff09;创建模型&#xff08;Creating Models&#xff09;优化模型参数&#xff08;Optimizing the Model Parameters&#xff09;保存模…

光环:研发云搭建及人才梯队建设——姚冬

摘要&#xff1a;文章内容主要来源于光环国际2022年第三届中国科创者大会姚冬老师的分享&#xff0c;原分享名称为"数字化时代的研发效能建设"。讲述了华为在研发上整套流程规范&#xff0c;通过云的方式去实现人机协同&#xff0c;保持人去做创新型工作。对人才梯队…

方形平板振动克拉尼图形可视化计算MATLAB程序(Chladni Patterns)

方形平板振动克拉尼图形可视化计算MATLAB程序&#xff08;Chladni Patterns&#xff09;0前言1 数值时域求解1.1 方程建立1.2 数值差分方程建立1.3 计算结果2 简单的波动解3 理论求解惯例声明&#xff1a;本人没有相关的工程应用经验&#xff0c;只是纯粹对相关算法感兴趣才写此…

云计算技术架构-云计算四种模式(公有云、私有云、混合云、行业云)

接下来几篇主要从技术角度介绍云计算的架构&#xff1a;  云计算四种模式&#xff1a;公有云、私有云、混合云和行业云&#xff08;本文讲述&#xff09; 云计算架构&#xff1a;基础架构层、云平台层、业务应用层和业务管理层  云计算服务模式&#xff1a;IaaS、PaaS和…

Python按单元格读取复杂电子表格(Excel)数据实践

Python读取电子表格方法 本文所使用电子表格的目标是读取、解析来自Excel编制的数据报表&#xff0c;或者软件界面导出的数据报表&#xff0c;这类电子表格报表显著特点是有一定的格式&#xff0c;且数据位置不连续&#xff0c;而非标准二维数据表。 关于电子表格&#xff0c…

基于粒子群算法的配电网重构研究matlab程序

基于粒子群算法的配电网重构研究matlab程序 参考文献&#xff1a;基于改进灰狼算法的含分布式电源配电网重构研究 &#xff08;本文未考虑分布式电源&#xff09; 摘要&#xff1a;使用基本环矩阵编码的智能优化算法在处理配电网重构问题中&#xff0c;通常使用无序的解空间&a…

机械硬盘,Win10系统,磁盘100%

问题描述 使用机械硬盘&#xff0c;装了Win10系统&#xff0c;打开文件夹或程序&#xff0c;非常的慢&#xff0c;通过任务管理器查看性能&#xff0c;显示磁盘一直处于100%的状态。电脑概览如下&#xff1a; 电脑型号 技嘉 B460MAORUSPRO 操作系统 Microsoft Windows 10 专业…

PyQt5学习笔记--多线程处理、数据交互

目录 1--引入多线程的原因 2--PyQt多线程的基本知识 3--多线程登录程序的实例 4--参考 1--引入多线程的原因 ① 如果Qt只采用单线程任务的方式&#xff0c;当遇到数据处理慢的情形时&#xff0c;会出现GUI卡死的情况。 ② 使用下述例子展示单线程任务的缺陷&#xff1a; …

Java学习之继承的本质(重要)

目录 目录 一、一个继承的代码案例 二、子类创建的内存布局 三、查找顺序 一、son.name的输出结果 二、son.age的输出结果 三、son.hobby的输出结果 一、一个继承的代码案例 package com.hspedu.entends_;/*** 讲解继承的本质*/ public class ExtendsTheory {public sta…

Spring Boot 项目的创建和简单使用

目录 1. 什么是 Spring Boot, Spring Boot 框架有什么优点 2. Spring Boot 项目的创建 2.1 在 IDEA 下安装 Spring Boot Helper 插件: 2.2 创建 Spring Boot 项目: 2.3 网页版创建 Spring Boot 项目 3. Spring Boot 通过路由映射到本地程序 1. 什么是 Spring Boot, Spring …

MFC编辑框控件属性和用法

目录 一、编辑框的属性 1.want return 2.Multiline 3.滚动条 4.添加完效果 二、初始化编辑框内容 三、复制与退出 四、edit control的值类型 五、思维拓展 一、编辑框的属性 默认情况下编辑框edit control 是可以横向无限输入的 1.want return 支持换行&#xff0c;…

dreamweaver作业静态HTML网页设计模板——迪士尼影视电影(6页)带音乐

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置&#xff0c;有div的样式格局&#xff0c;这个实例比较全面&#xff0c;有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 文章目录一、网页介绍一…

Private Execution on Blockchain

1.Alan Szepieniec: Ghost-Queen Chess——复杂金融合约 Alan Szepieniec为Neptune合伙人。 为何需关注decentralized Finance&#xff1f; 1&#xff09;从学术角度来看&#xff1a;是 密码学 ∩\cap∩ 分布式系统 ∩\cap∩ 经济学 ∩\cap∩ …的集合。2&#xff09;从工程…

基于蚁群算法的多配送中心的车辆调度问题的研究附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …