说说FLINK细粒度滑动窗口如何处理

news2025/1/17 14:11:36

分析&回答

Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。

Flink窗口分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类,本文要说的是滑动窗口。

下图示出一个典型的统计用户访问的滑动窗口,来自官方文档。

假设每两条虚线之间代表1分钟时间差,那么窗口大小(size)就是2分钟,滑动步长(slide)是1分钟。若时间特征为事件时间,代码如下。

dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由图可知,当前滑动窗口与上一个滑动窗口会有重叠。在窗口大小size是步长slide的2倍的情况下,(几乎)每个DataStream元素都会处于2个窗口内。

我们简单参考一下相关的Flink源码,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源码。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
        boolean isSkippedElement = true;
        final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
        if (windowAssigner instanceof MergingWindowAssigner) {
            // 会话窗口的处理逻辑,略去
        } else {
            for (W window : elementWindows) {
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
 
                triggerContext.key = key;
                triggerContext.window = window;
                TriggerResult triggerResult = triggerContext.onElement(element);
 
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }
        // 最后是侧输出迟到数据的逻辑,略去
    }
复制代码

该方法先调用WindowAssigner.assignWindows()方法,根据输入元素的时间戳判断它应该属于哪些窗口。接着遍历所有窗口,将该元素加入对应的窗口状态(即缓存)中,并根据触发器返回的TriggerResult决定是输出(fire)还是清除(purge)窗口的内容,emitWindowContents()方法会调用用户函数。最后,还要调用registerCleanupTimer()方法注册计时器用来在窗口彻底过期时清除窗口状态。

以下是SlidingEventTimeWindows.assignWindows()方法的源码。

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                 start > timestamp - size;
                 start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
 
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
复制代码

这段代码就不难理解了,先调用getWindowStartWithOffset()方法根据元素的时间戳计算出其窗口的起点时间戳,再逐次循环向后滑动,产生size / slide个窗口。我们可以将size / slide叫做“粒度”,亦即上述代码中返回的Collection集合的大小。粒度越大(“细”),滑动窗口之间的重合也越大。

代码读完了,有一个貌似稀松平常的需求:

以3分钟的频率实时计算App内各个子模块近24小时的PV和UV。

直觉上我们需要用粒度为1440 / 3 = 480的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:

状态 由代码可知,WindowOperator内维护了窗口本身的内部状态windowState(类型为InternalAppendingState)。对于一个元素,会将其写入对应的(key, window)二元组所圈定的状态中。可见,如果粒度为480,那么每个元素到来,更新windowState时都要遍历480个窗口并写入,开销是非常大的。在采用HDFS/RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。

定时器 在Flink中,定时器的实际实现是TimerHeapInternalTimer类,并且是用Flink自己实现的优先队列维护在堆内存中的。而在WindowOperator中,每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。细粒度滑动窗口会造成维护的定时器增多,内存负担加重。

在官方文档Windows最后一节的最后,也有如下的提醒:

Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

可能有看官会问:预聚合不能解决细粒度窗口的问题吗?答案是不能。预聚合只是让AggregateFunction/ReduceFunction之后的数据量降低,但是进入WindowOperator的窗口状态的数据还是没变的。换句话说,就算触发器实现为FIRE_AND_PURGE,遍历大量窗口并写入状态的开销也是无法消除的。

扯了这么多,有解决方案吗?

当然是有的,办法总比困难多。我们一般使用 滚动窗口+在线存储+读时聚合 的思路作为workaround。简单来讲就是:

弃用滑动窗口,用长度等于原滑动窗口步长的滚动窗口代替; 每个滚动窗口将其周期内的数据做聚合,打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase); 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。 针对上面的PV/UV问题,如果采用Redis作为在线存储,我们可以将时间戳放在key内,并设定24小时过期时间。用数字字符串存储3分钟周期内的PV量,用HyperLogLog存储3分钟周期内的UV量。近24小时的PV和UV就分别可以通过简单加减和HyperLogLog的pfmerge/pfcount命令得出了。当然,实际操作起来还是要根据需求和服务器资源而定。

喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/951883.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DWA算法学习

一、DWA概念  DWA(动态窗口法)属于局部路径规划方法&#xff0c;为ROS中主要采用的方法。其原理主要是在速度空间&#xff08;v,w&#xff09;中采样多组速度&#xff0c;并模拟这些速度在一定时间内的运动轨迹&#xff0c;再通过一个评价函数对这些轨迹打分&#xff0c;最优的…

2023年全国职业院校技能大赛网络系统管理赛项 模块B:服务部署 卷II

2023年全国职业院校技能大赛 GZ073网络系统管理赛项 模块B&#xff1a;服务部署 卷II 目 录 一、Windows项目任务描述 1 &#xff08;一&#xff09;拓扑图 1 &#xff08;二&#xff09;网络地址规划 1 二、Windows项目任务清单 2 &#xff08;一&#xff09;服务器IspSrver…

数据库(一) 基础知识

概述 数据库是按照数据结构来组织,存储和管理数据的仓库 数据模型 数据库系统的核心和基础是数据模型&#xff0c;数据模型是严格定义的一组概念的集合。因此数据模型一般由数据结构、数据操作和完整性约束三部分组成。数据模型主要分为三种:层次模型&#xff0c;网状模型和关…

【云原生】Ansible自动化批量操作工具playbook剧本

目录 1.playbook相关知识 1.1 playbook 的简介 1.2 playbook的 各部分组成 2. 基础的playbook剧本编写实例 2.1 playbook编写Apache安装剧本&#xff08;yum方式安装&#xff09; 报错集&#xff1a; 实例2&#xff1a;playbook编写nginx 的yum安装并且能修改其监听端口的…

QT基础教程之四QMainWindow

QT基础教程之四QMainWindow QMainWindow是一个为用户提供主窗口程序的类&#xff0c;包含一个菜单栏&#xff08;menu bar&#xff09;、多个工具栏(tool bars)、多个锚接部件(dock widgets)、一个状态栏(status bar)及一个中心部件(central widget)&#xff0c;是许多应用程序…

13、监测数据采集物联网应用开发步骤(9.2)

监测数据采集物联网应用开发步骤(9.1) TCP/IP Server开发 新建TCP/IP Server线程类com.zxy.tcp.ServerThread.py #! python3 # -*- coding: utf-8 -Created on 2017年05月10日 author: zxyong 13738196011 import socket,threading,time from com.zxy.tcp.TcpServer import …

业务流程与逻辑编排的低代码平台,一文全方位了解它的轻应用信息

JVS低代码开发平台提供了大量的可配置组件和预先集成的功能&#xff0c;开发人员可以通过拖拽和设置属性的方式&#xff0c;快速搭建应用程序的前端界面和交互逻辑。同时&#xff0c;低代码平台也提供了丰富的后端服务和集成能力&#xff0c;可以轻松地与现有的系统和第三方服务…

vulnhub靶机02-Breakout

主机发现 arp-scan -l 扫描端口 nmap --min-rate 10000 -p- 192.168.21.143 扫描端口信息 nmap -sV -sT -O -p80,139,445,10000,20000 192.168.21.143 漏洞扫描 nmap --scriptvuln -p80,139,445,10000,20000 192.168.21.143 先看网站 什么都没有看看f12 找到点好东西 解码…

被遗弃的多重继承

问题 C 是否允许一个类继承自多个父类&#xff1f; C 支持编写多重继承的代码 一个子类可以拥有多个父类 子类拥有所有父类的成员变量 子类继承所有父类的成员函数 子类对象可以当作任意父类对象使用 多重继承的语法规则 多重继承的本质与单继承相同&#xff01; 通过多重…

ChatRWKV 学习笔记和使用指南

0x0. 前言 Receptance Weighted Key Value&#xff08;RWKV&#xff09;是pengbo提出的一个新的语言模型架构&#xff0c;它使用了线性的注意力机制&#xff0c;把Transformer的高效并行训练与RNN的高效推理相结合&#xff0c;使得模型在训练期间可以并行&#xff0c;并在推理…

基于Java的代驾管理系统 springboot+vue,mysql数据库,前台用户、商户+后台管理员,有一万五千字报告,完美运行

基于Java的代驾管理系统 springbootvue&#xff0c;mysql数据库&#xff0c;前台用户、商户后台管理员&#xff0c;有一万五千字报告&#xff0c;完美运行。 系统完美实现用户下单叫车、商户接单、管理员管理系统&#xff0c;页面良好&#xff0c;系统流畅。 各角色功能&#x…

GPT能否辅助数学学习

GPT4.0的数学能力怎么样&#xff1f;我们使用镜像站进行实验&#xff0c;通过不同水平的数学看看GPT4.0的数学能力得到进步没有。镜像站的地址我们放在了最后&#xff0c;各位读者也可以自行去测试。 笔者在ChatGPT镜像站进行测试&#xff0c;我们的实验是让GPT4.0自行出数学题…

记本地新建一个gradle方式springboot项目过程

打算使用gradle在idea新建个springboot项目&#xff0c;然后坑很多&#xff0c;记录一下 原来我的idea应该是社区版&#xff0c;新建项目时候没有可以选择spring相关配置&#xff0c;然后卸载了重装&#xff0c;之前问题是启动是启动起来了&#xff0c;但是状态栏那边一直显示…

招投标系统简介 企业电子招投标采购系统源码之电子招投标系统 —降低企业采购成本

功能模块&#xff1a; 待办消息&#xff0c;招标公告&#xff0c;中标公告&#xff0c;信息发布 描述&#xff1a; 全过程数字化采购管理&#xff0c;打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力&#xff0c;为外部…

TCP连接分析:探寻TCP的三次握手

文章目录 一、实验背景与目的二、实验需求三、实验解法1. 预先抓包监测使用Wireshark工具2.进行TCP三次握手&#xff0c;访问www.baidu.com3.分析Wireshark捕获的TCP包 摘要&#xff1a; 本实验使用Wireshark工具&#xff0c;通过抓包监测和分析&#xff0c;深入研究了与百度服…

Richtek立锜EPS助力转向系统方案

Richtek立锜EPS助力转向系统方案包含一个集成的位置控制模块&#xff0c;用来接收一个外部系统&#xff08;泊车模块&#xff09;的角度请求以实现控制EPS系统自动转向的功能。外部交互接口用作与外部泊车模块进行CAN通讯&#xff0c;以支持泊车模块的控制状态的切换和输入角度…

nextTick原理

nextTick 是 Vue 提供的一个异步方法&#xff0c;用于在 DOM 更新之后执行回调函数。它的原理是利用 JavaScript 的事件循环机制来实现异步执行。 具体来说&#xff0c;当我们调用 nextTick 方法时&#xff0c;Vue 会将传入的回调函数添加到一个队列中。在下一个事件循环中&am…

Linux Input子系统

一、基本概念 按键、鼠标、键盘、触摸屏等都属于输入(input)设备&#xff0c;Linux 内核为此专门做了一个叫做 input子系统的框架来处理输入事件。本质属于字符设备。 1. input子系统结构如下&#xff1a; input 子系统分为 input 驱动层、input 核心层、input 事件处理层&…

重庆旅游攻略

重庆旅游攻略 白天 鹅岭二厂 &#xff08;1&#xff09;地铁线路&#xff1a; 1号线鹅岭站 川美涂鸦一条街 &#xff08;1&#xff09;地铁线路&#xff1a; 黄桷坪正街 湖广会馆 3. 长江索道 需要预约 门票&#xff1a;单程20元&#xff0c;往返30元 4. 重庆动物园 …

Mysql 事物与存储引擎

MySQL事务 MySQL 事务主要用于处理操作量大&#xff0c;复杂度高的数据。比如说&#xff0c;在人员管理系统中&#xff0c; 要删除一个人员&#xff0c;即需要删除人员的基本资料&#xff0c;又需要删除和该人员相关的信息&#xff0c;如信箱&#xff0c; 文章等等。这样&#…