flink 实战理解watermark,maxOutOfOrderness,allowedLateness

news2024/9/22 1:54:44

watermark

  • watermark的作用
    就是延迟触发窗口,让乱序到达的元素依然能够落在正确的窗口内。为啥能实现这个效果,一直通过公式更新watermark,如果乱序到的元素就不能更新watermark,相当于就是延迟触发计算操作。
  • 触发时间
    watermark 大于窗口的最大值
  • allowedLateness
    允许迟到的时间,到底啥时到的元素算迟到元素,如果元素的窗口满足下面这个公式,那这个窗口就去被清掉,这个元素就会认为是迟到元素。
    窗口的最大值是固定值
    allowedLateness 是设置的固定值
    剩下来看就和watermark有关,如果有新元素不断来,一直更新watermark,那么之前这个窗口很短时间后就会被清理掉了。换个说法说,如果watermark没有更新,如果一直来的元素都满足下面的条件,那么之前的窗口就会一直输出。

window.maxTimestamp() + allowedLateness <=watermark

在这里插入图片描述

代码

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> localhost = env.socketTextStream("localhost", 9093);
        final OutputTag<Tuple3<String, Long, Integer>> lateTag = new OutputTag<Tuple3<String, Long, Integer>>("late-data") {
        };


        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> reduce = localhost.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(new BoundedOutOfOrdernessTimestampExtractor<String>(Duration.ofSeconds(0)) {
                    @Override
                    public long extractTimestamp(String element) {
                        String[] split = element.split(",");
                        return Long.valueOf(split[0]);
                    }
                })).map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new Tuple3<>(split[1], System.currentTimeMillis(), 1);
                    }
                }).keyBy(new KeySelector<Tuple3<String, Long, Integer>, String>() {

                    @Override
                    public String getKey(Tuple3<String, Long, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).window(TumblingEventTimeWindows.of(Duration.ofMinutes(1))).allowedLateness(Duration.ofMinutes(1)).sideOutputLateData(lateTag).
                reduce(new ReduceFunction<Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> reduce(Tuple3<String, Long, Integer> value1, Tuple3<String, Long, Integer> value2) throws Exception {

                        value1.f2 = value1.f2 + value2.f2;
                        return value1;
                    }
                });
        reduce.print();

        reduce.getSideOutput(lateTag).print();


        try {
            env.execute("aa");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2086383.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

我的易经代码

本人从2000年起&#xff0c;就开始写一款算命软件&#xff0c;第一版用的是powerbuilder。后来改成企业版&#xff0c;名为“始皇预测”&#xff0c;用Java Swing编写&#xff0c;支持五大神数&#xff0c;三式&#xff0c;主要应用还是六爻、四柱、风水&#xff0c;其它如称骨…

2024118读书笔记|《岳阳楼记》——天高地迥,觉宇宙之无穷;兴尽悲来,识盈虚之有数

2024118读书笔记|《岳阳楼记》——天高地迥&#xff0c;觉宇宙之无穷&#xff1b;兴尽悲来&#xff0c;识盈虚之有数 爱莲说陋室铭小石潭记醉翁亭记赤壁赋桃花源记归去来兮辞木兰辞阿房宫赋滕王阁序岳阳楼记 《岳阳楼记》范仲淹&#xff0c;都是背过的古文&#xff0c;挺不错的…

【Qt窗口】—— 工具栏

前情摘要&#xff1a; 工具栏相当于菜单栏中的众多快捷方式&#xff0c;毕竟很多操作都是通过菜单栏来直接访问的&#xff0c;但是可能会查找很长时间&#xff0c;首先就是查找在哪个菜单里面&#xff0c;打开菜单才能进一步操作。而工具栏则是把一些常用的操作都给列举出来&am…

生产者与消费者模型

生产者与消费者模型 生产者&#xff1a;生产数据的线程&#xff0c;这类的线程负责从用户端、客户端接收数据&#xff0c;然后把数据Push到存储中介。 消费者&#xff1a;负责消耗数据的线程&#xff0c;对生产者线程生产的数据进行&#xff08;判断、筛选、使用、响应、存储&…

C++必修:布隆过滤器的提出与实现

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C学习 贝蒂的主页&#xff1a;Betty’s blog 1. 布隆过滤器的引入 在我们注册游戏或者社交账号时&#xff0c;我们可以自己设置…

科学重温柯南TV版:基于B站视频数据分析

麻鸭&#xff0c;四年过去了&#xff0c;失踪人口回归。 第一篇就决定是你了。 看了柯南M27剧场版后&#xff0c;萌生了重温TV版的念头&#xff0c;但是1191集(截止24/8/26)的体量太恐怖了&#xff0c;遂取点巧&#xff0c;综合大V建议(知乎&#xff1b;公众号)和视频网站数据…

基于asp.net的驾校管理系统附源码

这是一个基于asp.net的webform框架开发的BS架构的系统&#xff0c;详情如下&#xff1a; 项目下载链接 链接&#xff1a;https://pan.quark.cn/s/0679e783ef71

【设计模式】创建型模式——抽象工厂模式

抽象工厂模式 1. 模式定义2. 模式结构3. 实现3.1 实现抽象产品接口3.2 定义具体产品3.3 定义抽象工厂接口3.4 定义具体工厂3.5 客户端代码 4. 模式分析4.1 抽象工厂模式退化为工厂方法模式4.2 工厂方法模式退化为简单工厂模式 5. 模式特点5.1 优点5.2 缺点 6. 适用场景6.1 需要…

深入理解OJ编程中的输入输出:11个经典题目详解与技巧分享及stringstream,sort详解

文章目录 1.多组输入计算ab2.给定组数计算ab3.给定组数计算ab&#xff08;如果为0则结束&#xff09;4.计算一些列数的和(第一个数为0时结束)5.计算一些列数的和&#xff08;告诉了有几组&#xff09;6.计算一系列数的和&#xff08;不告知几组和何时结束&#xff0c;每一组第一…

如何评估云服务器提供商可靠性与信誉度

在云计算时代&#xff0c;选择一个可靠和信誉良好的云服务器提供商对于个人用户和企业来说至关重要。以下是评估云服务器提供商可靠性与信誉度的关键指标和方法&#xff1a; 1. 服务水平协议&#xff08;SLA&#xff09;&#xff1a; 可用性承诺&#xff1a; 查看云服务器提供…

服务器内存飙升分析小记

1. 写在最前面 这个繁忙的八月真的是转瞬即逝&#xff0c;我明明感觉似乎好像才八月刚开始&#xff0c;但是其实已经到了八月的尾巴。这个月本来想抽空整理一下学习 AI 模型相关的东西&#xff0c;奈何每天不是在查问题就是在查问题的路上&#xff0c;不是在修 Bug 就是在写 B…

AI Lossless Zoomer v3.1.0.0 — 超实用的AI无损图片放大工具

AI Lossless Zoomer 是一款基于腾讯开源 Real-ESRGAN 算法的 AI 图片无损放大工具&#xff0c;支持多线程和批量处理&#xff0c;具备自定义输出格式和路径等高级设置选项&#xff0c;并允许用户选择不同的 AI 引擎进行图片放大处理。此版本修复了一些小 bug&#xff0c;并增加…

Jhipster应用,cdn加速方案。

Jhipster, 采用springbootwebfluxreacttypescript技术栈。项目部署是采用k8shelm 部署在GCP上的&#xff0c;所以这个单体项目幕后是跑在pod上的。 项目上线后&#xff0c;发现单页面应用加载速度很慢&#xff0c;如图所示长时间处于加载状态&#xff1a; 仔细分析一下原因&am…

ESXi服务器无法安装Windows11:“不符合此版本的Windows所需最低系统要求“

目录 一、问题描述1.使用环境2.问题截图3.问题解析 二、解决方法Ⅰ1.按 ShiftF10 弹出命令提示符2.在弹出的Dos框中输入regedit&#xff0c;回车&#xff0c;进入注册表。3.打开HKEY_LOCAL_MACHINE\SYSTEM\Setup&#xff0c;并新建 LabConfig 的项&#xff0c;在 LabConfig 下创…

51单片机-静态数码管显示

时间&#xff1a;2024.8.29 作者&#xff1a;Whappy 目的&#xff1a;学习51单片机 代码&#xff1a; #include <REGX52.H> #include "intrins.h"unsigned char NixieTable[] {0x3F,0x06,0x5B,0x4F,0x66,0x6D,0x7D,0x07,0x7F,0x6F,0x77,0x7C,0x39,0x5E,0x79…

C++系列-STL容器之deque

STL容器之deque deque概括deque与vector内存管理的区别vector内存分配方式deque内存分配方式 deque与vector随机访问效率的区别deque与vector插入和删除操作的区别deque与vector适用场景 deque的构造函数deque的构造函数举例 deque的赋值操作deque容器的大小操作deque容器的插入…

瑞芯微RK3566开发板USB OTG模式介绍及命令切换,触觉智能EVB3566主板鸿蒙硬件厂商

一、USB OTG的模式 host模式&#xff08;下行&#xff09;&#xff1a;为u盘等设备供电&#xff0c;不可以进行调试&#xff0c;连接adb或者烧录等操作。 device模式&#xff08;上行&#xff09;&#xff1a;可以进行调试&#xff0c;连接adb或者烧录等操作&#xff0c;即US…

Delphi5实现主要——明细型数据库应用

文章目录 效果图主要——明细型数据库特点 数据库实现方式完整代码 效果图 主要——明细型数据库 在Delphi中&#xff0c;主要——明细型数据库是一种数据库应用程序的设计模式&#xff0c;它涉及到多个数据库表之间的关联操作&#xff0c;以实现对复杂数据结构的有效管理。这…

数据结构(邓俊辉)学习笔记】串 16——Karp-Rabin算法:串即是数

文章目录 1. 化串为数2. 凡物皆数3. 亦是数 1. 化串为数 接下来的这节&#xff0c;我们再来讨论一种十分另类的串匹配算法&#xff0c;也就是所谓的 Karp-Rabin 算法。回顾此前所介绍的几种串匹配算法&#xff0c;我们所面临的难题是一样的。也就是说在这里&#xff0c;我们每次…

ES配合高德地图JS-API实现地理位置查询

目录 实现功能点 技术选型 具体实现 Vue3整合高德地图JS API-2.0 添加商户&#xff1a;前端 添加商户&#xff1a;后端/ES 查询用户当前地理坐标 获取附近&#xff08;指定距离&#xff09;的商户 总结/测试Demo代码地址 测试概述&#xff1a;用户使用高德地图组件获取商户…