【API篇】八、Flink窗口函数

news2025/1/10 23:15:12

文章目录

  • 1、增量聚合之ReduceFunction
  • 2、增量聚合之AggregateFunction
  • 3、全窗口函数full window functions
  • 4、增量聚合函数搭配全窗口函数
  • 5、会话窗口动态获取间隔值
  • 6、触发器和移除器
  • 7、补充

//窗口操作
stream.keyBy(<key selector>)
       .window(<window assigner>)
       .aggregate(<window function>)

上一节的窗口分配器,指明了窗口类型,知道了数据属于哪个窗口并收集。而窗口函数,则是定义如何对这些数据做计算操作。

在这里插入图片描述

  • 增量聚合来一条数据,计算一条数据,窗口触发的时候输出计算结果
  • 全窗口函数数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果

1、增量聚合之ReduceFunction

public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        env.socketTextStream("node01", 9527)
           .map(new WaterSensorMapFunction())
           .keyBy(r -> r.getId())
           // 设置滚动事件时间窗口
           .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
           .reduce(new ReduceFunction<WaterSensor>() {

               @Override
               public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                   System.out.println("调用reduce方法,value1=:"+value1 + ",value2=:"+value2);
                   return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc()+value2.getVc());
               }
           })
           .print();

        env.execute();
    }
}

运行,输入数据,查看控制台:

在这里插入图片描述

2、增量聚合之AggregateFunction

上面使用ReduceFunction的限制是,输入数据的类型、聚合中间状态的类型、输出结果的类型必须一致,AggregateFunction则没有这个限制。AggregateFunction接口有四个方法:

  • createAccumulator:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add:将输入的元素添加到累加器中。
  • getResult:从累加器中提取聚合的输出结果。
  • merge:合并两个累加器,并将合并后的状态作为一个累加器返回

AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果

public class WindowAggregateDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());    //自定义的实现类,String转自定义对象WaterSensor


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> aggregate = sensorWS
                .aggregate(
                        new AggregateFunction<WaterSensor, Integer, String>() {
                            @Override
                            public Integer createAccumulator() {
                                System.out.println("创建累加器");
                                return 0;
                            }
							
							//value即输入的数据,accumulator即之前的计算结果
                            @Override
                            public Integer add(WaterSensor value, Integer accumulator) {
                                System.out.println("调用add方法,value="+value);
                                return accumulator + value.getVc();
                            }

                            @Override
                            public String getResult(Integer accumulator) {
                                System.out.println("调用getResult方法");
                                return accumulator.toString();
                            }

                            @Override
                            public Integer merge(Integer a, Integer b) {
                                System.out.println("调用merge方法");
                                return null;
                            }
                        }
                );
        
        aggregate.print();

        env.execute();
    }
}

运行,输入数据,查看控制台:

在这里插入图片描述

3、全窗口函数full window functions

全窗口函数,即数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果Flink全窗口函数有两种,第一种为apply方法下的:

stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

传入一个WindowFunction的实现类,该方法已被第二种ProcessWindowFunction全覆盖,因而逐渐弃用。ProcessWindowFunction除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”(Context),通过这个上下文对象,可以获取窗口对象、窗口处理时间、事件时间水位线

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                        	/**
                        	* 全窗口函数计算逻辑,窗口结束时触发才调用一次
                        	* s 分组的key
                        	* context 上下文对象
                        	* elements 窗口内存的所有数据
                        	* out 采集器对象
                        	*/
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long count = elements.spliterator().estimateSize();
                                long windowStartTs = context.window().getStart();
                                long windowEndTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );

        process.print();

        env.execute();
    }
}

效果:

在这里插入图片描述

在这里插入图片描述

4、增量聚合函数搭配全窗口函数

可以看出,增量和全窗口各有好处:

  • 增量聚合下,来一条计算一条,只存储中间计算结果,占用空间少
  • 全窗口函数则是可以通过上下文对象来实现灵活的功能

像同时拥有两者的优点,可以调用aggregate方法的另一个重载方法:

在这里插入图片描述

// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) 

// ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)

// AggregateFunction与WindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)

// AggregateFunction与ProcessWindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<TACCV> aggFunction,
        ProcessWindowFunction<VRKW> windowFunction)

此时:

  • 基于第一个参数,即增量聚合函数,来处理数据,来一条聚合一条
  • 窗口触发后,调用第二个参数的处理逻辑,此时,把增量聚合的结果(只有一条数据)再传递给全窗口函数,也就是说全窗口的Iterable<> elements,长度为1,注意全窗口不再缓存所有数据
  • 经过全窗口,执行处理和包装,再输出
public class WindowAggregateAndProcessDemo {

    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

		//sensorWS.reduce()   //也可以传两个

        SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                new MyAgg(),
                new MyProcess()
        );

        result.print();

        env.execute();
    }
    
}

public  class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{

        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }


        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add方法,value="+value);
            return accumulator + value.getVc();
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用getResult方法");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            System.out.println("调用merge方法");
            return null;
        }
    }

// 全窗口函数的输入类型 = 增量聚合函数的输出类型
public  class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{

    @Override
    public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
        long startTs = context.window().getStart();
        long endTs = context.window().getEnd();
        String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
        String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

        long count = elements.spliterator().estimateSize();

        out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());

    }
}

注意,二者搭配时,根据前面分析,可以知道,必有:增量聚合函数的输出类型 = 全窗口函数的输入类型

5、会话窗口动态获取间隔值

到此,窗口API需要的窗口分配器(见上一篇)和窗口函数都已整理完。上面demo中用的窗口分配器都是滚动窗口,但应该有以下这些:

  • 时间滚动窗口
  • 时间滑动窗口
  • 时间会话窗口
  • 计数滚动窗口
  • 计数滑动窗口

这里再记录下时间会话窗口+动态获取会话间隔:

public class WindowSessionDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());
                
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(t -> t.getTs() * 1000L));

        SingleOutputStreamOperator<String> process = sensorWS
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
           
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long count = elements.spliterator().estimateSize();
                                long windowStartTs = context.window().getStart();
                                long windowEndTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );

        process.print();

        env.execute();
    }
}

来一条数据,根据这条数据获取一个值做为会话间隔,到达这个间隔前,下条数据到来了,则会话间隔又成了另一个值,动态的。运行:

在这里插入图片描述

可以看到,会话间隔动态获取,到达间隔时下条数据还没来,则结束本窗户,窗口口结束时触发才调用一次process,和分析的一致。最后补充一点,展开demo代码里的Lambda表达式,其实是一个抓取会话间隔的方法,定义了会话窗口间隔的获取逻辑。

在这里插入图片描述

再贴个计数滑动窗口:

在这里插入图片描述

6、触发器和移除器

触发器主要是用来控制窗口什么时候触发计算,即什么时候执行窗口函数

//基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)
stream.keyBy(...)
       .window(...)
       .trigger(new MyTrigger())

移除器主要用来定义移除某些数据的逻辑

基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...)
       .window(...)
       .evictor(new MyEvictor())

Flink提供的几个窗口,比如滑动、滚动等,都有对触发器和移除器的默认实现,不用自定义。

7、补充

窗口的划分:

  • 窗口开始时间start是窗口长度的整数倍,向下取整

在这里插入图片描述
在这里插入图片描述

  • 窗口结束时间是start+窗口长度

在这里插入图片描述
在这里插入图片描述

  • 窗口是左闭右开,因为属于本窗口的最大时间戳为end-1

在这里插入图片描述

  • 窗口的生命周期,创建是属于本窗口的第一条数据来的时候,现new的,放入一个singleton单例的集合中
  • 窗口的销毁是时间的进展 >= 窗口的最大时间戳(end-1ms) + 允许迟到的时间(默认0)
  • 窗口什么时候触发输出:当时间进展 >= 窗口的最大时间戳(end -1ms)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1129255.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【31】c++设计模式——>模板方法模式

模板方法模式通常由以下几个部分组成&#xff1a; 1.抽象基类&#xff08;Abstract Base Class&#xff09;&#xff1a;抽象基类定义了一个算法的骨架&#xff0c;其中包含了模板方法和一些基本操作方法。模板方法在抽象基类中被声明为虚函数&#xff0c;它定义了算法的流程&…

html网页多个div鼠标移动自动排列实例

程序示例精选 html网页多个div鼠标移动自动排列实例 如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对《html网页多个div鼠标移动自动排列实例》编写代码&#xff0c;代码整洁&#xff0c;规则…

【数据集】1980-2020年(5年)土地利用分类数据-中国科学院

土地利用/覆被变化是自然客观条件和人类社会经济活动综合作用的结果&#xff0c;其形成与演变过程在受到地理自然因素制约的同时&#xff0c;也越来越多的受到人类改造利用行为的影响。伴随城市化进展&#xff0c;土地供需矛盾日益凸显&#xff0c;土地利用已经成为城市发展的重…

[moeCTF 2023] crypto

这个比赛从8月到10月&#xff0c;漫长又不分段。结束了以后前边的都基本上忘光了。还是分段提交的好点&#xff0c;有机会写写。不过反正也是新生赛&#xff0c;又不是新生只是打个热闹。 ezrot 厨子解决大部分问题 可可的新围墙 给了1个串 mt3_hsTal3yGnM_p3jocfFn3cp3_hFs…

[c语言]深入返回值为函数指针的函数

之前写过个好玩代码 c语言返回值为函数指针的函数 一、发现 #include<stdio.h>int (*drink(void)) (void) {static int i;i;printf("(%d)\n", i);return (int(*)(void))drink; }int main() {drink()();return 0; }这个代码定义了一个返回值为函数指针的函数&…

Python基础入门例程11-NP11 单词的长度

目录 描述 输入描述&#xff1a; 输出描述&#xff1a; 示例1 解答&#xff1a; 说明&#xff1a; 描述 牛妹正在学英语&#xff0c;但是背单词实在是太痛苦了&#xff0c;她想让你帮她写一个小程序&#xff0c;能够根据输入的单词&#xff0c;快速得到单词的长度。 输…

测试C#调用Windows Media Player组件

新建基于.net framework的Winform项目&#xff0c;可以通过添加引用的方式选择COM组件中的Windows Media Player组件&#xff0c;如下图所示&#xff1a;   也可以在VS2022的工具箱空白处点右键&#xff0c;选择“选择项…”菜单。   在弹出的选择工具箱项窗口中&#xf…

多线程环境下的原子性问题

什么是原子性呢? 在数据库事务的ACID特性中就有原子性&#xff0c;它是指当前操作中包含的多个数据库事务操作&#xff0c;要么全部成功&#xff0c;要么全部失败&#xff0c;不允许存在部分成功、部分失败的情况。而在多线程中的原子性与数据库事务的原子性相同&…

WPS中图的自动编号及引用

WPS中图的自动编号及引用 图的自动编号图编号的引用图编号及引用的更新 图的自动编号 将光标放置在需要插入编号的位置点击“引用”→“题注”&#xff1a; 点击“引用”→“题注”&#xff1a; 点击“编号”&#xff0c;设置图的编号格式&#xff0c;可勾选“包含章节编号”&…

【RTOS学习】信号量 | 互斥量 | 递归锁

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《RTOS学习》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 信号量 | 互斥量 | 递归锁 &#x1f37a;信号量&#x1f964;原理&#x1f964;使用信号量的函数&…

Java反射获取内部类方法

Java反射获取内部类方法 结论一、案例准备二、测试方法&#xff1a;使用反射获取类的成员内部类和方法具体操作具体操作&#xff08;使用getDeclaredClasses&#xff09; 结论 Java 通过反射可以获得内部类&#xff0c;包括内部类属性信息和方法。 一、案例准备 创建了一个类…

1024,向着“顶尖程序员“迈进

10月24日&#xff0c;对每个程序员而言&#xff0c;都是一个具有特殊意义的日子。1024这个数字&#xff0c;不再只是计算机存储容量的基础单位&#xff0c;更是我们向着技术巅峰进发的象征。 回顾我的程序员之路&#xff0c;那是一个不断学习、不断成长的过程。起初是对编程充…

『第二章』这只燕子很特别:Swift 特性

在本篇博文中,您将学到如下内容: 1. Swift 语言概览2. Objective-C “练废了”&#xff0c;重新写一门新语言吧&#xff01;3. Swift 的“习性”与优势3.1. Swift 更简洁、更易于阅读、所需代码更少3.2. Swift 更加安全3.3. Swift 内存管理更加统一3.4. Swift 更快3.5. Swift 会…

Redis主从模式(二)---拓扑结构及复制过程

目录 一, Redis主从模式下的复制拓扑结构 1.1 一主一从结构 1.2 一主多从结构 1.3 树形主从结构 二, 主从复制过程 2.1 主从复制建立复制流程图 2.2 数据同步(psyc) 1.replicationid/replid (复制id) 2.offset(偏移量) 2.3 psync运行流程 2.4 全量复制 2.5 部分复制…

Opencv-图像插值与LUT查找表

图像像素的比较 白色是255&#xff0c;黑色是0 min(InputArray src1,InputArray src2,OutputArray dst) max(InputArray src1,InputArray src2,OutpurArray dstsrc1:第一个图像矩阵&#xff0c;通道数任意src2&#xff1a;第二个图像矩阵&#xff0c;尺寸和通道数以及数据类型…

【C++面向对象】5. this指针

文章目录 【 1. 基本原理 】【 2. 实例 】 【 1. 基本原理 】 在 C 中&#xff0c;只有成员函数才有 this 指针&#xff08;友元函数没有 this 指针&#xff0c;因为友元不是类的成员&#xff09;&#xff0c;this 指针是所有成员函数的隐含参数。 在成员函数内部&#xff0c;…

用*画田字形状,numpy和字符串格式化都可以胜任

numpy的字符型元素矩阵&#xff0c;可以方便画&#xff1b;直接python字符串手撕&#xff0c;也可以轻巧完成。 (本笔记适合熟悉循环和列表的 coder 翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《…

【WinForm详细教程一】WinForm中的窗体、Label、TextBox及Button控件、RadioButton和CheckBox、ListBox

文章目录 1.WinForm文件结构2. 窗体的常用属性、方法与事件2.1 常用属性&#xff08;可直接在属性中设置&#xff09;2.2 常用方法2.3 常用事件 3.Label、TextBox及Button控件4.RadioButton和CheckBox5.ListBox&#xff08;列表框&#xff09; 1.WinForm文件结构 .sln文件 &am…

IEEE754 标准存储浮点数

1. IEEE754 标准简介 IEEE754 标准是一种用于浮点数表示和运算的标准&#xff0c;由国际电工委员会&#xff08;IEEE&#xff09;制定。它定义了浮点数的编码格式、舍入规则以及基本的算术运算规则&#xff0c;旨在提供一种可移植性和一致性的方式来表示和处理浮点数 IEEE754 …

浅谈电力电容器的故障处理及选型

安科瑞 华楠 【摘要】常见的电力电容器都是为了改善电力系统的电压质量和提高输电线路的输电能力&#xff0c;它们在减少系统功率损耗、提高功率因数、降低运行电流、提升电网电压、释放变压器使用裕度等方面有着显著效果。按电压等级可以划分高、低压两部分。虽然它们可以起着…