窗口延时、侧输出流数据处理

news2024/12/28 9:10:51

一 、 AllowedLateness API 延时关闭窗口

AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s,AllowedLateness 的时间为2s,那一个10的滚动窗口,0-10这个单位窗口正常的关窗时间应该是超过12s的数据到达之后就关窗。而AllowedLateness 是在12s的基础上继续延长了2s,也就是在14s的时候才真正去关闭 0-10s的窗口,但是在12s的时候会触发窗口计算,从12s之后到14s的数据每到达一个就会触发一次窗口计算。

二 、 OutputTag API 侧输出流

使用 OutputTag API 保证窗口关闭的数据依然可以获取,窗口到达AllowedLateness 时间后将彻底关闭,此时再属于该窗口范围内的数据将会流向 OutputTag 。

       context.collect(new Event("A", "/user", 1000L));
                Thread.sleep(3000);
                context.collect(new Event("B", "/prod", 6500L));
                Thread.sleep(3000);
                context.collect(new Event("C", "/cart", 4000L));
                Thread.sleep(3000);
                context.collect(new Event("D", "/user", 7500L));
                System.out.println("窗口关闭 ~ ");
                Thread.sleep(3000);
                context.collect(new Event("E", "/cente", 8500L));
                Thread.sleep(3000);
                context.collect(new Event("F", "/cente", 4000L));
                Thread.sleep(3000);
                context.collect(new Event("G", "/cente", 9200L));
                Thread.sleep(3000);
                context.collect(new Event("H", "/cente", 1000L));
                Thread.sleep(3000);
                context.collect(new Event("I", "/cente", 1500L));
                Thread.sleep(3000);

如果现在定义一个 5s的
滚动窗口,WaterMark延时时间为2s,AllowedLateness 延时时间为2s,此时相当于是 WaterMark到达9s的时候才会关闭0-5的窗口,也就是说最后两条数据会流向OutputTag . 当4000L数据到达后,会再次触发一次窗口计算。

完全与预期一致。

在这里插入图片描述

完整代码:

public class WindowOutputTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = Env.getEnv();

        DataStreamSource<Event> dataStreamSource = env.addSource(new SourceFunction<Event>() {
            @Override
            public void run(SourceContext<Event> context) throws Exception {
                context.collect(new Event("A", "/user", 1000L));
                Thread.sleep(3000);
                context.collect(new Event("B", "/prod", 6500L));
                Thread.sleep(3000);
                context.collect(new Event("C", "/cart", 4000L));
                Thread.sleep(3000);
                context.collect(new Event("D", "/user", 7500L));
                System.out.println("窗口关闭 ~ ");
                Thread.sleep(3000);
                context.collect(new Event("E", "/cente", 8500L));
                Thread.sleep(3000);
                context.collect(new Event("F", "/cente", 4000L));
                Thread.sleep(3000);
                context.collect(new Event("G", "/cente", 9200L));
                Thread.sleep(3000);
                context.collect(new Event("H", "/cente", 1000L));
                Thread.sleep(3000);
                context.collect(new Event("I", "/cente", 1500L));
                Thread.sleep(3000);
            }

            @Override
            public void cancel() {

            }
        });

        //operator
        SingleOutputStreamOperator<Event> operator = dataStreamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))// 水位线延时2s
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
        );

        OutputTag<Event> eventOutputTag = new OutputTag<Event>("late") {
        };

        WindowedStream<Event, Boolean, TimeWindow> windowedStream = operator.keyBy(d -> true)
                .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
                .allowedLateness(Time.of(2, TimeUnit.SECONDS))
                .sideOutputLateData(eventOutputTag);


        SingleOutputStreamOperator<String> windowAgg = windowedStream.aggregate(new AggregateFunction<Event, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(Event event, Long acc) {
                return acc + 1;
            }

            @Override
            public Long getResult(Long acc) {
                return acc;
            }

            @Override
            public Long merge(Long aLong, Long acc1) {
                return null;
            }
        }, new ProcessWindowFunction<Long, String, Boolean, TimeWindow>() {
            @Override
            public void process(Boolean key, Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                collector.collect(new Timestamp(start) + " ~ " + new Timestamp(end) + " ===> " + iterable.iterator().next());
            }
        });

        windowAgg.print("窗口数据 ");

        //获取测输出流中的延时数据
        DataStream<Event> sideOutput = windowAgg.getSideOutput(eventOutputTag);
        sideOutput.print("测输出流:-> ");


        env.execute();

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/994097.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

嵌入式开发-绪论

目录 一.什么是嵌入式 1.1硬件系统 1.2软件系统 二.嵌入式应用场景 2.1消费电子 2.1.1智能家居 2.1.2影音 2.1.3家用电器 2.1.4玩具游戏机 2.2通信领域 2.2.1对讲机 2.2.2手机 2.2.3卫星 2.2.4雷达 2.3控制领域 2.3.1机器人 2.3.2采集器PLC 2.4金融 2.4.1POS…

快速文件复制与删除工具,将复制时文件夹里的原文件删除掉

无论是工作还是生活&#xff0c;我们都离不开文件的复制和管理。然而&#xff0c;手动复制文件不仅费时费力&#xff0c;而且容易出错。现在&#xff0c;我们为您推荐一款快速文件复制与删除工具&#xff0c;让您的文件管理更加高效&#xff01; 首先&#xff0c;我们要进入文…

MybatisPlus分页插件使用

一. 效果展示 二. 代码编写 2.1 pom <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version> </dependency>2.2 添加配置类 Configuration MapperScan(…

推荐一款程序员截图神器!

快来看一下程序员必备的一款截图工具 今天就来和大家说一下作为程序员必备截图神器&#xff0c;几乎每一个程序员都会设置开机自启&#xff0c;因为这个截图功能太太太好用了&#xff01;&#xff01;&#xff01;只要你在键盘上按下F1就可以轻松截取整个屏幕&#xff0c;然后…

51单片机项目(9)——基于51单片机的电子琴设计

简易电子琴设计设计内容: 1.用矩阵键盘代表琴键&#xff0c;至少能弹出8个音符&#xff0c;分别是:音符1.23.4.,5,6, 2.键按下的时间长短表征节拍的长短&#xff0c;用蜂鸣器发出声音 3.数码管显示出当前音符 4.音量可调 &#xff08;代码及其工程文件放在最后&#xff09; …

vue中的几种name属性

vue中的几种name属性 组件名name name选项 export default{name:xxx } // 获取组件的name属性 this.$options.namevue-devtools调试工具里显示的组件名称&#xff1b; 未配置name选项&#xff0c;就是组件的文件名&#xff1b; vue3配置name通过defineOptions()函数 de…

msvcp110.dll是什么意思与msvcp110.dll丢失的解决方法

电脑突然提示msvcp110.dll丢失&#xff0c;无法执行此代码。导致软件无法打开运行&#xff0c;这个怎么办呢&#xff1f;我在网上找了一天的资料&#xff0c;终于把这个问题彻底处理好&#xff0c;也弄清楚了msvcp110.dll丢失的原因及msvcp110.dll丢失修复方法&#xff1f;现在…

20230909java面经整理

1.java常用集合 ArrayList动态数组&#xff0c;动态调整大小&#xff0c;实现List接口 LinkedList双向链表&#xff0c;实现list和queue接口&#xff0c;适用于频繁插入和删除操作 HashSet无序&#xff0c;使用哈希表实现 TreeSet有序&#xff0c;使用红黑树实现 HashMap无序&…

FPGA开发

https://www.enclustra.com.cn/?bd_vid11435475462206745180 https://www.monolithicpower.cn/design-tools/design-tools/llc-design-tool.html https://www.elecfans.com/article/88/143/2012/20120718280641_2.html

[JAVAee]IP数据包的组包与分包

目录 数据包是什么? 数据包的结构 数据包/分组与分组交换 分包是什么?为什么需要分包呢? 组包是什么? 分包组包过程中和哪些 IP 报头字段有关联? 本篇文章主要围绕三个问题来展开: 为什么要分包?分包组包过程中和哪些 IP 报头字段有关联组包时如何保证数据的顺序和…

基于Yolov8的中国交通标志(CCTSDB)识别检测系统

目录 1.Yolov8介绍 2.纸箱破损数据集介绍 2.1数据集划分 2.2 通过voc_label.py得到适合yolov8训练需要的 2.3生成内容如下 3.训练结果分析 1.Yolov8介绍 Ultralytics YOLOv8是Ultralytics公司开发的YOLO目标检测和图像分割模型的最新版本。YOLOv8是一种尖端的、最先进的&…

kafka学习-生产者

目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 在Kafka中保存的数…

虚拟机上部署K8S集群

虚拟机上部署K8S集群 安装VM Ware安装Docker安装K8S集群安装kubeadm使用kubeadm引导集群 安装VM Ware 参考&#xff1a;http://www.taodudu.cc/news/show-2034573.html?actiononClick 安装Docker 参考&#xff1a;https://www.yuque.com/leifengyang/oncloud/mbvigg#2ASxH …

长安链BaaS服务平台调研

目录 一、菜单功能二、其他说明2.1、服务平台的部署方式2.2、链本身2.3、建链流程2.4、支持连接已部署的链2.5、链治理投票2.6、支持动态节点操作2.7、支持应用 长安链ChainMaker管理平台文档地址&#xff1a;https://docs.chainmaker.org.cn 一、菜单功能 菜单子菜单/功能点…

lock screen password (remove)

解除apple手机锁屏密码步骤 对于老人家来说手机越简单越好 换手机的时候连界面图标&#xff0c;页码&#xff0c;原来放那里&#xff0c;新机也是放那里

Nacos实战(19)-Nacos健康检查机制:保障你的服务稳定运行!

0 前言 注册中心不应仅提供服务注册和发现功能&#xff0c;还应保证对服务可用性监测&#xff0c;对不健康的服务和过期的进行标识或剔除&#xff0c;维护实例的生命周期&#xff0c;以保证客户端尽可能的查询到可用的服务列表。 因此本文介绍Nacos注册中心的健康检查机制。 …

C++函数内联详解

本文旨在讲解C中的函数内联相关知识&#xff0c;读完这篇文章&#xff0c;希望读者们会对函数内联有更深一步的认识&#xff01; 内联函数的定义 在计算机科学中&#xff0c; 内联函数 &#xff08;有时称作 在线函数 或 编译时期展开函数 &#xff09;是一种编程语言结构&…

如何给Mybatis-plus再增加点plus

来源公众号&#xff1a;赵侠客 一、Mybatis-plus基本功能 1.1 Mybatis-plus内置方法 Mybatis-plus给我们造了很多轮子&#xff0c;让我们可以开箱即用&#xff0c;在BaseMapper中有19种操作数据库常用的方法&#xff0c;如Insert()、deleteById()、updateById()、selectById(…

Spring系列文章:Spring事务

一、事务简述 1、什么是事务&#xff08; Transaction&#xff08;tx&#xff09;&#xff09; 在⼀个业务流程当中&#xff0c;通常需要多条DML&#xff08;insert delete update&#xff09;语句共同联合才能完成&#xff0c;这 多条DML语句必须同时成功&#xff0c;或者同…

WSL 在windows 家庭版上面的安装方式

目录 1、前言 2、约束 3、安装 1、安装Hyper 2、Hyper-V启用 3、安装Linux 4、0x800701bc问题处理 结论 1、前言 适用于Windows的Linux子系统 Windows Subsystem for Linux&#xff08;简称WSL&#xff09;是一个在Windows 10\11上能够运行原生Linux二进制可执行文件&am…