【Flink】各种窗口的使用(处理时间窗口、事件时间窗口、窗口聚合窗口)

news2025/1/25 9:10:49

文章目录

  • 一 Flink 中的 Window
    • 1 Window
      • (1)Window概述
      • (2) Window类型
        • a 滚动窗口(Tumbling Windows)
        • b 滑动窗口(Sliding Windows)
        • c 会话窗口(Session Windows)
    • 2 Window API
      • (1)处理时间窗口
        • a 滚动窗口
        • b 滑动窗口
        • c 会话窗口
      • (2)事件时间窗口
        • a 滚动窗口
        • b 滑动窗口
        • c 会话窗口
      • (3)窗口聚合函数
        • a 全窗口聚合函数
        • b 增量聚合函数
        • c 增量聚合和全窗口聚合结合使用 【推荐】
      • (4)使用KeyedProcessFunction模拟滚动窗口
      • (5)其他可选API
      • (6)基于 Key 的窗口
      • (7)不分流直接开窗口
    • 3 总结

一 Flink 中的 Window

1 Window

(1)Window概述

在这里插入图片描述

streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。

(2) Window类型

Window可以分成两类:

  • CountWindow:计数窗口,按照指定的数据条数生成一个Window,与时间无关。

  • TimeWindow:时间窗口,按照时间生成Window。

    对于TimeWindow,可以根据窗口实现原理的不同分成三类:

    • 滚动窗口(Tumbling Window):是滑动窗口的特殊形式,是窗口大小和滑动距离相同的滑动窗口。
    • 滑动窗口(Sliding Window):窗口间可以重叠。
    • 会话窗口(Session Window):只有 Flink 支持。

a 滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片。

特点:时间对齐,窗口长度固定,没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:

注意:先分流再开窗,在每个支流上开窗口,每个支流上的窗口之间没有关系。

在这里插入图片描述

适用场景:适合做BI统计等(做每个时间段的聚合计算)。

b 滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特点:时间对齐,窗口长度固定,可以有重叠。

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

在Flink底层,其将处于不同窗口中的元素进行复制处理,然后分发到不同窗口中,所以需要合理分配窗口大小和滑动距离,若窗口大小为一天,滑动距离为一秒,同一份数据可能存在于几千个窗口中,内存可能会崩溃。

例如假如有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:

在这里插入图片描述

适用场景:对最近一个时间段内的数据进行统计(求某接口最近5min的失败率来决定是否要报警)。

c 会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐。

session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。

一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

只有 Flink 支持会话窗口。

在某些场景下,使用会话窗口可以更加精确的刻画用户行为,如刷抖音。

在这里插入图片描述

2 Window API

窗口分配器——window() 方法

可以用.window() 来定义一个窗口,然后基于这个window 去做一些聚合或者其它处理操作。

(1)处理时间窗口

a 滚动窗口

.window(TumblingProcessingTimeWindows.of(Time.seconds(5))):默认开的第一个窗口是在1970-01-01 00:00:00 - 1970-01-01 00:00:05,在这种情况下窗口不会开在3s - 8s时间段。

使用处理时间窗口中的滚动窗口实现求每个用户的pv:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            .addSource(new ClickSource())
            .keyBy(r -> r.user)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .process(new WindowResult())
            .print();

    env.execute();
}

// 泛型依次为:输入的泛型,输出的泛型,key的泛型,窗口的泛型
public static class WindowResult extends ProcessWindowFunction<Event,String,String, TimeWindow>{
    // 在窗口关闭的时候,触发调用,只会调用一次
    // 对于机器时间,运行到窗口结束时间就会关闭窗口
    @Override
    // 迭代器参数中包含了窗口中的全部元素
    public void process(String key, Context context, Iterable<Event> iterable, Collector<String> collector) throws Exception {
        long windowStart = context.window().getStart();
        long windowEnd = context.window().getEnd();
        // 获取迭代器里面的元素个数
        long count = iterable.spliterator().getExactSizeIfKnown();
        collector.collect("用户【" + key +"】在窗口" + new Timestamp(windowStart) +
                " -- " + new Timestamp(windowEnd) + "中的pv次数是" + count);
    }
}

b 滑动窗口

.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

c 会话窗口

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

(2)事件时间窗口

a 滚动窗口

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

b 滑动窗口

.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))

c 会话窗口

.window(EventTimeSessionWindows.withGap(Time.seconds(10)))

(3)窗口聚合函数

窗口聚合函数定义了要对窗口中收集的数据做的计算操作可以分为两类

  • 全窗口聚合函数。
  • 增量聚合函数。

a 全窗口聚合函数

先把窗口所有数据收集起来,等到计算的时候会遍历所有数据,ProcessWindowFunction,优点在于可以访问窗口信息,如窗口的开始时间,结束时间,但问题是其需要将窗口里面的所有数据收集起来再做计算,对存储系统来说是比较大的压力。

2 (1)中用户的pv实现使用的就是就是全窗口聚合函数。

在这里插入图片描述

b 增量聚合函数

  • 每条数据到来就进行计算,只保存一个简单的状态(累加器),极大的节省内存,缺点就是无法访问窗口的信息。窗口中的所有元素就是一个状态,在底层使用列表状态变量实现。
  • 增量聚合函数有 ReduceFunction 和 AggregateFunction,AggregateFunction比前者更加底层,更加灵活。
  • 当窗口闭合的时候,增量聚合完成。
  • 处理时间:当机器时间超过窗口结束时间的时候,窗口闭合。
  • 来一条数据计算一次。

在这里插入图片描述

使用增量聚合函数实现每个窗口的pv:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            .addSource(new day02.Example1.ClickSource())
            .keyBy(r -> r.user)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .aggregate(new CountAgg())
            .print();

    env.execute();
}

public static class CountAgg implements AggregateFunction<day02.Example1.Event,Integer,Integer>{

    // 创建累加器
    @Override
    public Integer createAccumulator() {
        return 0;
    }

    // 定义累加规则,将返回值作为新的累加器
    @Override
    public Integer add(day02.Example1.Event value, Integer accumulator) {
        return accumulator + 1;
    }

    // 在窗口关闭时,返回结果
    @Override
    public Integer getResult(Integer accumulator) {
        return accumulator;
    }

    // 实现窗口的合并操作
    @Override
    public Integer merge(Integer a, Integer b) {
        return null;
    }
}

c 增量聚合和全窗口聚合结合使用 【推荐】

  • 全窗口聚合函数的作用就是给增量聚合函数包裹一层窗口的信息。
  • 不需要收集窗口中的所有元素,只需要维护一个累加器,节省内存。

在这里插入图片描述

当窗口闭合时,增量聚合函数会将其输出发送给全窗口聚合函数,这时全窗口聚合函数输入的泛型变成了增量函数输出的类型integer,见以下代码:

public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new day02.Example1.ClickSource())
                .keyBy(r -> r.user)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new CountAgg(),new WindowResult())
                .print();

        env.execute();
    }

    public static class WindowResult extends ProcessWindowFunction<Integer,String,String, TimeWindow>{
        @Override
        public void process(String key, Context context, Iterable<Integer> iterable, Collector<String> collector) throws Exception {
            // 迭代器参数中只包含一个元素,就是增量聚合函数发送过来的聚合结果
            long windowStart = context.window().getStart();
            long windowEnd = context.window().getEnd();
            // 获取迭代器里面的元素个数
            long count = iterable.iterator().next();
            collector.collect("用户【" + key +"】在窗口" + new Timestamp(windowStart) +
                    " -- " + new Timestamp(windowEnd) + "中的pv次数是" + count);
        }
    }

(4)使用KeyedProcessFunction模拟滚动窗口

模拟5s的滚动窗口,模拟的是增量聚合函数和全窗口聚合函数结合使用的情况。

注意:滚动窗口为左闭右开,使用哈希表模拟窗口。

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            .addSource(new Example1.ClickSource())
            .keyBy(r -> r.user)
            .process(new FakeWindow())
            .print();

    env.execute();
}

public static class FakeWindow extends KeyedProcessFunction<String, Example1.Event,String>{
    // key是窗口的开始时间,value是窗口中的pv值,也是一个累加器
    private MapState<Long,Integer> mapState;
    // 窗口大小
    private Long windowSize = 5000L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Integer>("windowStare-pvCount", Types.LONG,Types.INT));
    }

    @Override
    public void processElement(Example1.Event event, Context context, Collector<String> collector) throws Exception {
        // 计算当前元素所属的窗口开始时间
        long currTime = context.timerService().currentProcessingTime();
        long windowStart = currTime - currTime % windowSize;
        long windowEnd = windowStart + windowSize;

        if (mapState.contains(windowStart)){
            mapState.put(windowStart,mapState.get(windowStart) + 1);
        }else{
            mapState.put(windowStart,1);
        }
        // 注册一个窗口结束时间前一毫秒的定时器(左闭右开),用来触发窗口函数的计算
        // 同一个时间戳只能定义一个定时器,即同一个窗口中,只有一个定时器
        context.timerService().registerProcessingTimeTimer(windowEnd - 1L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        long windowEnd = timestamp + 1L;
        long windowStart = windowEnd - windowSize;
        int count = mapState.get(windowStart);
        // 向下游发送数据
        out.collect("用户【" + ctx.getCurrentKey() +"】在窗口" + new Timestamp(windowStart) +
                " -- " + new Timestamp(windowEnd) + "中的pv次数是" + count);
        mapState.remove(windowStart);
    }
}

总结:在keyBy以后针对不同的key又新开了一个mapState,其作用域范围就是当前的key,同时,(3)c中的增量聚合初始化了一个累加器,这个累加器在模拟窗口实现时等于MapState的value;如果想要模拟一个全窗口聚合,只需要将value由Integer换成列表,将所有元素存储下来。

在生产环境中不太可能用到此种写法。

(5)其他可选API

.trigger() ——触发器 定义窗口什么时候关闭,触发计算并输出结果
.evictor() ——移除器 定义移除某些数据的逻辑
.allowedLateness() ——允许处理迟到的数据
.sideOutputLateData() ——将迟到的数据放入侧输出流
.getSideOutput() ——获取侧输出流

(6)基于 Key 的窗口

stream
	.keyBy(...) <- keyed versus non-keyed windows
	.window(...) <- required: "assigner"
	[.trigger(...)] <- optional: "trigger" (else default trigger)
	[.evictor(...)] <- optional: "evictor" (else no evictor)
	[.allowedLateness(...)] <- optional: "lateness" (else zero)
	[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
	.reduce/aggregate/fold/apply() <- required: "function"
	[.getSideOutput(...)] <- optional: "output tag"

(7)不分流直接开窗口

stream
	.windowAll(...) <- required: "assigner"
	[.trigger(...)] <- optional: "trigger" (else default trigger)
	[.evictor(...)] <- optional: "evictor" (else no evictor)
	[.allowedLateness(...)] <- optional: "lateness" (else zero)
	[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
	.reduce/aggregate/fold/apply() <- required: "function"
	[.getSideOutput(...)] <- optional: "output tag"

其中,windowAll的底层实现就是先将流存放到同一个插槽中,再开窗.keyBy(r -> true)

3 总结

目前已经使用了Flink 提供的 4 个 Process Function:

  • ProcessFunction:不分流直接聚合,不可以使用定时器和状态变量,因为其没有经过keyBy,定时器和状态变量都是针对key的,只有processElement。
  • KeyedProcessFunction:分流以后聚合。
  • ProcessWindowFunction:分流后再开窗。
  • ProcessAllWindowFunction:不分流,直接开窗。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/39995.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ATJ2157内存篇【炬芯音频芯片】---sct语法

ATJ2157 sct语法公共知识篇BNF 简介Sct脚本Sct的作用Sct的语法规则1. 加载域描述(Loadd region descriptions)2. 执行域描述3. 输入节的描述ATJ2157平台使用的sctRO的等效写法ScatterAssert()函数LoadLength()函数LoadBase()函数ImageLimit()函数ATJ2157平台什么数据编译出来是…

CentOS 7.6上安装SqlServer2017

一、 安装 SQL Server 1、 安装 SQL Server 所需的python2 sudo alternatives --config python # If not configured, install python2 and openssl10 using the following commands: sudo yum install python2 sudo yum install compat-openssl10 # Configure python2 a…

Python自动化小技巧12——根据论文题目自动导出参考文献格式

案例背景 在写论文的时候&#xff0c;弄参考文献格式也很麻烦&#xff0c;不可能手打人名题目期刊名称年月日卷号页码这些&#xff0c;我们一般都是使用系统自动导出的格式复制粘贴就行。中国知网可以直接导出论文的格式&#xff0c;但是知网基本只有中文的论文&#xff0c;英…

pdf编辑器工具哪个好?好用的pdf编辑器一款就够!

pdf这类办公软件大家都很熟悉&#xff0c;不过pdf通常情况只能看不能编辑&#xff0c;这着实也很让人苦恼&#xff01;特别是现在国内大多都已居家办公&#xff0c;本来就颇多不便&#xff0c;如果没有一款好用的pdf编辑器工具&#xff0c;那么势必导致工作效率更为低下。 那么…

第十二章 哈希表与字符串哈希

第十二章 哈希表与字符串哈希一、哈希表1、什么是哈希表2、算法逻辑&#xff08;1&#xff09;哈希函数&#xff08;2&#xff09;冲突解决3、算法模板二、字符串哈希1、算法逻辑2、算法用途3、算法模板一、哈希表 1、什么是哈希表 在之前的文章中&#xff0c;我们学习过离散…

Spring-aop技术

前言 spring-aop技术是对oop(面向对象)的一个补充&#xff0c;其底层其实就是使用aspect动态代理进行实现的&#xff0c;本篇文章将大概讨论下aop的核心实现流程 相关的核心概念 刚开始&#xff0c;先介绍下aop中比较核心的一些对象和概念&#xff0c;只要理解了这些&#xff…

【通信】粒子群算法5G物联网云网络优化【含Matlab源码 2160期】

⛄一、简介 1 引言 5G技术被大众所熟知之后&#xff0c;边缘计算也成了各行业关注的重点。最初的边缘计算概念是在2014年提出&#xff0c;到了2016年就拓展到了接入边缘&#xff0c;目前基本被定义为靠近用户边缘的、包含多种技术的接入网络&#xff0c;能够提供比较稳定的IT业…

精华推荐 | 深入浅出学习透析Nginx服务器的基本原理和配置指南「Keepalive性能优化实战篇」

Linux系统&#xff1a;Centos 7 x64Nginx版本&#xff1a;1.11.5 Nginx 是一款面向性能设计的 HTTP 服务器&#xff0c;能反向代理 HTTP&#xff0c;HTTPS 和邮件相关(SMTP&#xff0c;POP3&#xff0c;IMAP)的协议链接。并且提供了负载均衡以及 HTTP 缓存。它的设计充分使用异…

拼搏一周!刷了1000道Java高频面试题喜提阿里offer,定级P7

今年较往年相比面试要难的多&#xff0c;大环境也是对于程序员的要求越来越高&#xff0c;环境是我们无法改变的&#xff0c;我们能改变的只有自己&#xff0c;月初我一好友&#xff0c;努力拼搏一周&#xff0c;刷完了这份阿里P8大牛整理的这1000道Java高频面试题笔记&#xf…

GitHub配置SSH Keys步骤

Git配置SSH Keys步骤 许多 Git 服务器都使用 SSH 公钥进行认证。 为了向 Git 服务器提供 SSH 公钥&#xff0c;如果某系统用户尚未拥有密钥&#xff0c;必须事先为其生成一份。 生成步骤如下&#xff1a; 1. 设置用户名和邮箱 在git命令行中对git进行全局设置 git config --…

八、CANdelaStudio入门-Session

本专栏将由浅入深的展开诊断实际开发与测试的数据库编辑,包含大量实际开发过程中的步骤、使用技巧与少量对Autosar标准的解读。希望能对大家有所帮助,与大家共同成长,早日成为一名车载诊断、通信全栈工程师。 本文介绍CANdelaStudio的Session概念,欢迎各位朋友订阅、评论,…

微信小程序:用户基本信息的采集

写作背景 在开发商城小程序时需要显示用户头像、昵称、手机号等信息以便后续业务的实现&#xff0c;因此需要通过微信小程序的API采集用户数据&#xff0c;由此进行总结。 在微信小程序中获取用户信息可以通过这几种方式获取&#xff0c;getUserInfo、getUserProfile、open-da…

基于多目标遗传算法的IEEE14节点系统分布式电源选址定容matlab程序

基于多目标遗传算法的IEEE14节点系统分布式电源选址定容matlab程序 摘 要: 为更好地解决分布式电源选址定容问题&#xff0c;提出一种改进的多目标遗传算法。之后&#xff0c;考虑投资成本、网损以及电压稳定性三因素建立了一个三目标的数学模型&#xff0c;并采用上述多目标遗…

javaSE -运算符,注释,关键字(复习)

一、运算符 1.1、算术运算符 基本四则运算符 - * / %规则比较简单, 值得注意的是除法和取模 1.1.1、/ 除法 int / int 结果还是 int, 需要使用 double 来计算 public static void main(String[] args) {int a 1;int b 2;System.out.println(a / b);}要得到小数那就要使…

python>>numpy包

章节内容 什么是NumPy模块和NumPy数组 创建数组 基本数据类型 数据可视化 索引和切片 副本和视图 目录 什么是NumPy模块和NumPy数组&#xff1f; 创建数组 基本数据类型 数据可视化 索引和切片 副本和视图 什么是NumPy模块和NumPy数组&#xff1f; NumPy数组 python对象 …

pyhon项目中,使用pip安装第三方插件之后,明明使用pip list可以查到,但是在项目中import时仍然找不到怎么办?

认识pip&#xff1a;python中的pip是用来安装python第三方库的工具&#xff0c;是安装python的时候自带的。 1.安装方式&#xff1a;pip install 第三方库名&#xff0c;比如&#xff1a;pip install selenium 2.查看已安装的所有第三方库&#xff1a;pip list 或 pip3 list &…

Spring Cloud OpenFeign - - - > 日志级别配置

项目源码地址&#xff1a;https://download.csdn.net/download/weixin_42950079/87168704 OpenFeign 有 4 种日志级别&#xff1a; NONE: 不记录任何日志&#xff0c;是OpenFeign默认日志级别&#xff08;性能最佳&#xff0c;适用于生产环境&#xff09;。BASIC: 仅记录请求方…

五魔方、二阶五魔方

五魔方 五魔方是正十二面体魔方&#xff0c;其实和三阶魔方很像&#xff0c;用层先法就能复原&#xff0c;而且公式一模一样。 十二个面分为6个浅色面和6个深色面&#xff0c;所以浅色和深色各有一个中心面。 先复原浅色中心面这一层&#xff1a; 再复原浅色面的5个棱块&…

【GlobalMapper精品教程】030:栅格重采样案例教程(航测DSM)

本文讲解Globalmapper栅格重采样操作方法。数据为配套实验数据包中的data030.rar,航测内业生成的DSM,分辨率为0.04米,现在需要将其重采样为0.05米。 文章目录 一、重采样简介二、重采样操作一、重采样简介 栅格/影像数据进行配准或纠正、投影等几何变换后,像元中心位置通常…

超级记忆节目

一 问题描述 杰克逊被邀请参加电视节目“超强记忆”&#xff0c;参与者会玩一个记忆游戏。主持人先告诉参与者一个数字序列 {A1 , A2 , …, An }&#xff0c;然后对该序列执行一系列操作或查询&#xff1a; ① ADD x y D &#xff0c;表示对子序列 {Ax , …, Ay } 的每个数字…