flink 最后一个窗口一直没有新数据,窗口不关闭问题

news2024/12/28 21:44:13

flink 最后一个窗口一直没有新数据,窗口不关闭问题

  • 自定义实现 WatermarkStrategy接口

自定义实现 WatermarkStrategy接口

代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{

        private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);

        @Override
        public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<JSONObject>() {
                private long maxWatermark;

                @Override
                public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {
                    maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));
                    state.f0 = System.currentTimeMillis();
                    System.out.println("maxWatermark is " + maxWatermark);
                    state.f1 = false;
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    //乱序时间
                    long outOfTime = 3000L;
                    if (maxWatermark - outOfTime <=0){
                    } else {
                        // 10s内没有数据则关闭当前窗口
                        System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));
                        System.out.println("state.f1:" + state.f1);
                        if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));
                            state.f1 = true;
                            System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));
                        } else {
                            System.out.println("正常发送水印");
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));
                        }
                    }
                }
            };
        }
    }

代码部分逻辑说明
在这里插入图片描述若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
在这里插入图片描述
参考:https://blog.csdn.net/lr131425/article/details/127422833

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1394013.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第17章_反射机制拓展练习(关于Class,反射的应用,综合练习)

文章目录 第17章_反射机制拓展练习关于Class1、获取Class对象 反射的应用2、创建对象3、修改属性值4、调用方法5、获取类型信息6、榨汁机7、获取泛型参数8、自定义注解19、自定义注解2 综合练习10、AtguiguDemo11、AtguiguStudent12、自定义注解 第17章_反射机制拓展练习 关于C…

AttributeError: ‘DataFrame‘ object has no attribute ‘ix‘

文章目录 报错信息报错原因解决方案 报错信息 AttributeError: DataFrame object has no attribute ix 报错原因 这个ix属性在最新的pandas版本中已经升级修改。 解决方案 修改前 column01 dataset.ix[1, roomtype] 修改后 column01 dataset.loc[:,total_price] 关注公…

芯片制造上下游产业链及其工艺

整个产业链可分为&#xff1a;设计验证&#xff0c;晶圆制造&#xff0c;封装测试 设计验证&#xff1a;系统设计&#xff0c;逻辑设计&#xff0c;电路设计&#xff0c;物理设计 晶圆制造&#xff1a;拉单晶&#xff0c;磨外圆&#xff0c;切片&#xff0c;倒角&#xff0c;…

海思3559 yolov5模型转wk详细笔记

文章目录 前言1.编译caffer1.1安装虚拟机1.2安装caffer1.3编译python接口 2.适应wk的yolov5模型训练2.1下载yolov5-6.0项目源码2.2安装yolov5-6.0运行环境2.3修改模型2.4修改数据集2.5修改模型算子2.6 模型训练 3.模型转换&#xff1a;pt->onnx->caffe->wk3.1 pt->…

Docker安装Nginx并部署MySQL容器构建

一.MySQL容器的构建 1.创建MySQL根目录及配置文件夹&data文件夹 mkdir -p mysql/{conf,data} 2.上传配置文件 将配置文件上传到conf文件夹&#xff08;数据库配置文件已放到置顶资源中&#xff09; 3.命令构建MySQL容器 /soft/mysql/conf/my.cnf:/etc/my.cnf目录为我们…

如何安装配置VisualSVN服务并实现公网访问本地服务【内网穿透】

文章目录 前言1. VisualSVN安装与配置2. VisualSVN Server管理界面配置3. 安装cpolar内网穿透3.1 注册账号3.2 下载cpolar客户端3.3 登录cpolar web ui管理界面3.4 创建公网地址 4. 固定公网地址访问 前言 SVN 是 subversion 的缩写&#xff0c;是一个开放源代码的版本控制系统…

STM32入门教程-2023版【4-1】OLED调试工具

关注 点赞 不错过精彩内容 大家好&#xff0c;我是硬核王同学&#xff0c;最近在做免费的嵌入式知识分享&#xff0c;帮助对嵌入式感兴趣的同学学习嵌入式、做项目、找工作! 一、概述 在这一节先提前介绍一下&#xff0c;在以后的教程中我们会经常用到这个显示屏&#xff0…

javaScript设计模式-工厂

它的好处是消除对象间的耦合度&#xff0c;在派生子类时提供了更大的灵活性。但盲目的把普通的构造函数扔在一边&#xff0c;并不值得提倡。如果要采一不可能另外换用一个类&#xff0c;或都不需要在运行期间在一系列可互换的类中进行选择&#xff0c;就不应该使用。这样在后期…

中仕公考:国考进面后资格复审需要准备什么?

参加国考面试的考生在资格审核阶段需要准备以下材料&#xff1a; 1、本人身份证、学生证或工作证复印件。 2、公共科目笔试准考证复印件。 3、考试报名登记表。 4、本(专)科、研究生各阶段学历、学位证书(应届毕业生没有可以暂时不提供)。 5、报名资料上填写的各类证书材料…

页面数据类型为json,后端接受json数据

项目结构 依赖pom.xml <dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.8.RELEASE</version></dependency><dependency><groupId>org.springframework…

一些面试会问到的奇怪问题与面试总结

1.v-for、v-if先后顺序。 官方不建议一起使用&#xff0c;但是有时候面试的时候会问到。 在vue2中是v-for先与v-if的。 源码js编译结果&#xff1a; _c()就是vm.$createElement()&#xff0c;意思是创建一个虚拟的element&#xff0c;就是返回值是VNode。 _l就是renderlist…

C#,入门教程(18)——分支语句(switch-case)的基础知识

上一篇&#xff1a; C#&#xff0c;入门教程(17)——条件语句&#xff08;if-else&#xff09;的基础知识https://blog.csdn.net/beijinghorn/article/details/124033376 1、switch概述 switch-case分支语句 可以理解为 大号 的 if-else。 switch语句以switch关键字开头&…

redis7部署集群:包含主从模式、哨兵模式、Cluster集群模式等三种模式

前言&#xff1a; redis部署集群常见的一般有三种模式&#xff1a;主从模式&#xff0c;Sentinel&#xff08;哨兵模式&#xff09;&#xff0c;Redis Cluster&#xff08;高可用Cluster集群&#xff09;&#xff0c;根据不同的需求可自定义选择部署方式。 Redis 主从模式&…

2.【Linux】(进程的状态||深入理解fork||底层剖析||task_struct||进程优先级||并行和并发||详解环境变量)

一.进程 1.进程调度 Linux把所有进程通过双向链表的方式连接起来组成任务队列&#xff0c;操作系统和cpu通过选择一个task_struct执行其代码来调度进程。 2.进程的状态 1.运行态&#xff1a;pcb结构体在运行或在运行队列中排队。 2.阻塞态&#xff1a;等待非cpu资源就绪&am…

工厂企业消防安全AI可视化视频智能监管解决方案

一、方案背景 2023年11月20日下午6时30分许&#xff0c;位于江苏省无锡市惠山区前洲街道的某公司突发严重火灾&#xff0c;共造成7人死亡。这次火灾提醒我们工业安全至关重要&#xff0c;企业都应该时刻保持警惕&#xff0c;加强安全意识和培训&#xff0c;提高应对突发事件的…

RK3568驱动指南|驱动基础进阶篇-进阶8 内核运行ko文件总结

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

DrGraph原理示教 - OpenCV 4 功能 - 形态操作

形态类型 从OpenCV图像处理基本知识来看&#xff0c;膨胀腐蚀操作后&#xff0c;还有形态操作&#xff0c;如开运算、闭运算、梯度、礼帽与黑帽&#xff0c;感觉很多&#xff0c;其实&#xff0c;本质上就是批处理操作&#xff0c;如 开运算&#xff1a;先腐蚀&#xff0c;再膨…

【不用找素材】ECS 游戏Demo制作教程(3) 1.17

一、生成墓碑 新建脚本如下&#xff1a; using Unity.Entities; using Unity.Mathematics;namespace ECSdemo {public struct GraveyardRandom : IComponentData{public Random Value;}}扩充GraveyardMono如下&#xff1a; using Unity.Entities; using Unity.Mathematics; …

AVL树 -- C++实现

AVL树 – C实现 1. AVL树的概念 二叉搜索树虽可以缩短查找的效率&#xff0c;但如果数据有序或接近有序二叉搜索树将退化为单支树&#xff0c;查找元素相当于在顺序表中搜索元素&#xff0c;效率低下。因此&#xff0c;两位俄罗斯的数学家G.M.Adelson-Velskii和E.M.Landis在1…

MySQL表的基本插入查询操作详解

博学而笃志&#xff0c;切问而近思 文章目录 插入插入更新 替换查询全列查询指定列查询查询字段为表达式查询结果指定别名查询结果去重 WHERE 条件基本比较逻辑运算符使用LIKE进行模糊匹配使用IN进行多个值匹配 排序筛选分页结果更新数据删除数据截断表聚合函数COUNTSUMAVGMAXM…