2023-01-18 flink 11.6 时间水印 和 窗口周期的关系计算方法

news2025/1/11 10:12:04

forBoundedOutOfOrderness 和 TumblingEventTimeWindows

forBoundedOutOfOrderness(M)

TumblingEventTimeWindows(N)

第一条数据的时间TS1

第一个窗口期公式:

窗口开始时间:

win_start = ((TS1-M)/N) * N

窗口结束时间:

win_end = win_start+N

数据过期:

凡是<win_start都是过期数据

第一个窗口汇总计算触发:

与数据之间的接收的间隔时间无关,与总时长也无关。

只与接收到的数据的时间TS2有关。

当 TS2>=win_end+M 时会将时间水印在 >= win_start && <=win_end 给到Apply。

TS2>=win_end+M 是唯一条件。


import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class WaterMarkTest {
    public WaterMarkTest() {
        
    }
    static StringBuilder sb = new StringBuilder();
    static long sts = 0L;
    static long ets = 0L;
    static long sleeps = 500; 
    public <R> void run()  throws Exception{
        sts = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        OutputTag<Tuple2<String, String>> lateOutputTag = new OutputTag<Tuple2<String, String>>("late-data-lx"){private static final long serialVersionUID = 154621L;};

        DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1134546L;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                ctx.collect("hello,1553503188000");
                Thread.sleep(sleeps);//水印计算间隔是200ms,所以不要低于这个值
                ctx.collect("hello,1553503186000");
                Thread.sleep(sleeps);
                ctx.collect("hello,1553503183000");
                Thread.sleep(sleeps);
                ctx.collect("hello,1553503180000"); 
                Thread.sleep(sleeps);
                ctx.collect("hello,1553503185000");
                Thread.sleep(sleeps);
                ctx.collect("hello,1553503188000");
                Thread.sleep(sleeps);
                ctx.collect("hello,1553503189000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503188000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503189000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503190000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503191000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503186000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503187000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503185000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503184000"); //丢弃
                Thread.sleep(1000);
                ctx.collect("hello,1553503183000"); //丢弃
                Thread.sleep(1000);
                ctx.collect("hello,1553503190000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503192000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503193000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503194000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503195000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503196000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503197000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503198000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503199000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503200000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503201000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503202000");
//                Thread.sleep(15000);
                System.out.println("1 ============================================================");
                sb.append("time use 1="+(ets-sts)+", ets="+ets+"\n");
            }
 
            @Override
            public void cancel() {
 
            }
        }, "source1")
         /**
          *  assignTimestampsAndWatermarks 的代码注释翻译:
          *  Assigns timestamps to the elements in the data stream and generates watermarks to signalevent time progress. 
          *  The given WatermarkStrategy is used to create a TimestampAssigner and WatermarkGenerator.
          *
          *  为数据流里面的元素设置时间,并且给”信号事件时间处理”计算水印
          *  给定的水印策略是用来创建 TimestampAssigner and WatermarkGenerator
          *
          *  For each event in the data stream, the TimestampAssigner.extractTimestamp(Object, long) method is called to assign an event timestamp.
          *
          *  数据流里面的每一个事件,都会调用 TimestampAssigner.extractTimestamp(Object, long) 方法去给事件添加时间记录。
          *
          *  For each event in the data stream, the WatermarkGenerator.onEvent(Object, long, WatermarkOutput) will be called.
          *  数据流里面的每个事件,都会调用WatermarkGenerator.onEvent方法
          *
          */
        .assignTimestampsAndWatermarks(
                WatermarkStrategy                    
                    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))                    
                    /**
                     * 给数据打上时间信息
                     */
                    .withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            private static final long serialVersionUID = 134231L;
                            long recordTimestamp = 0L;
                            long lst_ts = 0L;
                            @Override
                            public long extractTimestamp(String element, long _recordTimestamp) {//_recordTimestamp 是element的内部时间
                                String[] fields = element.split(",");
                                Long aLong = new Long(fields[1]);
                                long now = System.currentTimeMillis();
//                                if(aLong>recordTimestamp) {
                                    String msg = now+"["+(lst_ts>0?(now-lst_ts):0)+"]: Key-> " + fields[0] + ",EventTime:" + aLong +", recordTimestamp="+recordTimestamp;
                                    System.out.println(msg);
                                    sb.append(msg).append("\n");
                                    if(lst_ts==0)lst_ts=  now;
//                                }
                                recordTimestamp  = Math.max(aLong, recordTimestamp);
                                return aLong;
                            }
                    }
            )
        );

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
            private static final long serialVersionUID = 12342L;
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
            }
        }).keyBy(f->f.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))//n秒种滚动窗口
        .allowedLateness(Time.seconds(0))//这个设置大于0,就会出现一个周期反复出现结果的情况,而且是会把当前周期退回晚的数据的周期,就是有一条迟到,就会改允许迟到的周期。
        .sideOutputLateData(lateOutputTag)
        .apply(new WindowFunction<Tuple2<String,String>, String, String, TimeWindow>() {
            private static final long serialVersionUID = 1112151L;
            private long last_deal_ts = 0L;
            int pos = 0;
            String msg0 = "";
            String msg1 = "";
            @Override
            public void apply(java.lang.String key, TimeWindow window,    Iterable<Tuple2<java.lang.String, java.lang.String>> input, Collector<java.lang.String> out) throws Exception {
                long cur_ts = System.currentTimeMillis();
                if(last_deal_ts==0)
                    last_deal_ts = sts;
                String msg =  cur_ts+"["+(last_deal_ts>0?(cur_ts-last_deal_ts):"-")+"]"+" 当前窗口开始时间[" + window.getStart() + ",结束时间" + window.getEnd() + ")";
                sb.append(msg).append("\n");
                System.out.println(msg);
                last_deal_ts = cur_ts;
                List<Tuple2<String, String>> list = new ArrayList<>();
                input.forEach(o -> list.add(o));
                list.sort((o1, o2) -> o1.f1.compareTo(o2.f1));
                //list.sort(Comparator.comparing(o -> o.f1)); // 与上句代码同义,按照第二个属性升序排序
                pos = 0;
                msg0 = "";
                msg1 = "";
                list.forEach(o -> {
                    if(pos++<1)
                        msg0 ="> "+o.f1+"\n";
                    else
                        msg1 ="> "+o.f1+"\n";
                    out.collect(" - " + o.f1);
                    
                    System.out.println("> "+o.f1);
                });
                sb.append(msg0).append(msg1);
                ets = System.currentTimeMillis();
            }
        })
        .getSideOutput(lateOutputTag).map(new MapFunction<Tuple2<String,String>, String>() {
            private static final long serialVersionUID = 341902L;
            @Override
            public String map(Tuple2<String, String> value) throws Exception {
                String msg = "[Expire Data]> "+value.f0+"->"+value.f1;
                sb.append(msg).append("\n");
                return msg;
            }
        }).print();

        env.execute("Flink WaterMark Test1");
        
        
        /* 
2 ============================================================
1674029100314[0]: Key-> hello,EventTime:1553503188000, recordTimestamp=0
1674029100832[518]: Key-> hello,EventTime:1553503186000, recordTimestamp=1553503188000
1674029101341[1027]: Key-> hello,EventTime:1553503183000, recordTimestamp=1553503188000
[Expire Data]> hello->1553503183000
1674029101850[1536]: Key-> hello,EventTime:1553503180000, recordTimestamp=1553503188000
[Expire Data]> hello->1553503180000
1674029102362[2048]: Key-> hello,EventTime:1553503185000, recordTimestamp=1553503188000
1674029102872[2558]: Key-> hello,EventTime:1553503188000, recordTimestamp=1553503188000
1674029103384[3070]: Key-> hello,EventTime:1553503189000, recordTimestamp=1553503188000
1674029104392[4078]: Key-> hello,EventTime:1553503188000, recordTimestamp=1553503189000
1674029105400[5086]: Key-> hello,EventTime:1553503189000, recordTimestamp=1553503189000
1674029106404[6090]: Key-> hello,EventTime:1553503190000, recordTimestamp=1553503189000
1674029107408[7094]: Key-> hello,EventTime:1553503191000, recordTimestamp=1553503190000
1674029108420[8106]: Key-> hello,EventTime:1553503186000, recordTimestamp=1553503191000
1674029109426[9112]: Key-> hello,EventTime:1553503187000, recordTimestamp=1553503191000
1674029110433[10119]: Key-> hello,EventTime:1553503185000, recordTimestamp=1553503191000
1674029111438[11124]: Key-> hello,EventTime:1553503184000, recordTimestamp=1553503191000
[Expire Data]> hello->1553503184000
1674029112444[12130]: Key-> hello,EventTime:1553503183000, recordTimestamp=1553503191000
[Expire Data]> hello->1553503183000
1674029113450[13136]: Key-> hello,EventTime:1553503190000, recordTimestamp=1553503191000
1674029114464[14150]: Key-> hello,EventTime:1553503192000, recordTimestamp=1553503191000
1674029115467[15153]: Key-> hello,EventTime:1553503193000, recordTimestamp=1553503192000
1674029115625[19265] 当前窗口开始时间[1553503185000,结束时间1553503190000)
> 1553503185000
> 1553503189000
1674029116473[16159]: Key-> hello,EventTime:1553503194000, recordTimestamp=1553503193000
1674029117488[17174]: Key-> hello,EventTime:1553503195000, recordTimestamp=1553503194000
1674029118489[18175]: Key-> hello,EventTime:1553503196000, recordTimestamp=1553503195000
1674029119489[19175]: Key-> hello,EventTime:1553503197000, recordTimestamp=1553503196000
1674029120496[20182]: Key-> hello,EventTime:1553503198000, recordTimestamp=1553503197000
1674029120716[5091] 当前窗口开始时间[1553503190000,结束时间1553503195000)
> 1553503190000
> 1553503194000
1674029121508[21194]: Key-> hello,EventTime:1553503199000, recordTimestamp=1553503198000
1674029122516[22202]: Key-> hello,EventTime:1553503200000, recordTimestamp=1553503199000
1674029123517[23203]: Key-> hello,EventTime:1553503201000, recordTimestamp=1553503200000
1674029124533[24219]: Key-> hello,EventTime:1553503202000, recordTimestamp=1553503201000
time use 1=24357, ets=1674029120717
1674029124553[3837] 当前窗口开始时间[1553503195000,结束时间1553503200000)
> 1553503195000
> 1553503199000
1674029124554[1] 当前窗口开始时间[1553503200000,结束时间1553503205000)
> 1553503200000
> 1553503202000
time use2=28256, sts=1674029096360

         */
        
    }
     public static void main(String[] args) throws Exception {
         WaterMarkTest test = new WaterMarkTest();
         test.run();
         
         System.out.println("2 ============================================================");
         sb.append("time use2="+(System.currentTimeMillis()-sts)+", sts="+sts+"\n");
         System.out.println(sb.toString());
     }
}

参考:

https://blog.csdn.net/Vector97/article/details/110150925

https://blog.csdn.net/RonieWhite/article/details/114386907

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/170220.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker上部署goweb项目

文章目录一、安装go语言环境①下载go语言环境②解压go语言环境到指定目录③验证是否成功④配置镜像加速二、go语言项目配置第一种&#xff1a;先编译后打包&#xff08;分步部署&#xff0c;靠谱&#xff09;第二种&#xff1a;直接打包法三、成功运行一、安装go语言环境 ①下…

Zabbix 监控 Linux操作系统的监控指标

一、Zabbix 监控 Linux操作系统的监控指标 (仅供参考) Zabbi x默认使用Zabbix agent监控操作系统,其内置的监控项可以满足系统大部分的指标监控,因此,在完成Zabbix agent的安装后,只需在前端页面配置并关联相应的系统监控模板就可以了。 如果内置监控项不能满足监控需求…

走向开放世界强化学习、IJCAI2022论文精选、机器人 RL 工具、强化学习招聘、《强化学习周刊》第73期...

No.73智源社区强化学习组强化学习周刊订阅《强化学习周刊》已经开启“订阅功能”&#xff0c;扫描下面二维码&#xff0c;进入主页&#xff0c;选择“关注TA”&#xff0c;我们会向您自动推送最新版的《强化学习周刊》。本期贡献者&#xff1a;&#xff08;李明&#xff0c;刘青…

【Kotlin】类的继承 ① ( 使用 open 关键字开启类的继承 | 使用 open 关键字开启方法重写 )

文章目录一、使用 open 关键字开启类的继承二、使用 open 关键字开启方法重写一、使用 open 关键字开启类的继承 Kotlin 中的类 默认都是 封闭的 , 无法被继承 , 如果要想类被继承 , 需要在定义类时 使用 open 关键字 ; 定义一个普通的 Kotlin 类 : class Person(val name: S…

分析GC日志来进行JVM调优

不同的垃圾收集器产生的GC日志大致遵循了同一个规则&#xff0c;只是有些许不同&#xff0c;不过对于G1收集器的GC日志和其他垃圾收集器有较大差别&#xff0c;话不多说&#xff0c;正式进入正文。。。 什么时候会发生垃圾收集 首先我们来看一个问题&#xff0c;那就是什么时…

SpringBoot集成Elasticsearch7.4 实战(三)

本篇文章主要讲的是:在Springboot环境下&#xff0c;管理数据1. 数据管理1.1. 新增数据1.1.1. 单实例新增http方式提交数据&#xff0c;案例中我将数据格式做了规范&#xff0c;提交过程中需要指定索引名&#xff0c;以及JSON格式数据&#xff0c;这样尽可能在实际过程中做到通…

图论算法:普里姆算法(C++实现+图解)

文章目录最小生成树普里姆算法实现过程代码实现最小生成树 什么是最小生成树? 对于如图所示的带权无向连通图来说&#xff1a;从图中任意顶点出发&#xff0c;进行dfs或者bfs便可以访问到图中的所有顶点&#xff0c;因此连通图中一次遍历所经过的边的集合以及图中所有顶点的…

libvirt零知识学习2 —— libvirt源码下载

1. libvirt官网主页 libvirt的官网地址为&#xff1a; https://libvirt.org/ 主页如下图所示&#xff1a; 2. libvirt官网下载主页 libvirt的官网下载页地址为&#xff08;在主页中点击“Download”按钮即可跳转到&#xff09;&#xff1a; https://libvirt.org/downloads…

KaiwuDB 首席解决方案专家 金宁:1.0 时序数据库核心功能解读

以下是实录文章精简版欢迎大家点赞、收藏、关注&#xff01;大家好&#xff0c;今天介绍将分为 3 部分&#xff1a;首先从物联网蓬勃发展的时代背景出发&#xff0c;我们一起来看看数据库究竟将面临怎样的挑战与机遇&#xff1b;接着我将为大家详细 KaiwuDB 1.0 时序数据库的核…

(Java高级教程)第四章必备前端基础知识-第一节:HTML

文章目录一&#xff1a;HTML概述&#xff08;1&#xff09;概述&#xff08;2&#xff09;标签&#xff08;3&#xff09;HTML基本结构二&#xff1a;常用标签介绍&#xff08;1&#xff09;注释&#xff08;2&#xff09;标题&#xff08;3&#xff09;段落&#xff08;4&…

React Fragment

首先 我们编写这样一个例子 我们在创建一个react项目 在src的目录下创建components目录 components下创建一个子组件 我这里的名字叫 subset.jsx import React from "react";export default class subset extends React.Component{constructor(props){super(prop…

阿B百大名单公布,有你喜欢的up吗?

阿B在1月13日中午19点30分公布了2022百大UP主名单&#xff0c;那么今年的某站年度UP主都是谁呢&#xff1f;你喜欢的up入选了吗&#xff1f; 咱就来自己查一下都有谁入选了吧~ 我们是用python自动获取名单的哦。 环境使用 python 3.9 pycharm 模块使用 selenium 谷歌驱动 …

Java基础之《netty(26)—netty其他常用编解码器》

一、解码器-ReplayingDecoder 1、函数声明 public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder 2、ReplayingDecoder扩展了ByteToMessageDecoder类&#xff0c;使用这个类&#xff0c;我们不必调用readableBytes()方法。参数S指定了用户状态管理…

【Linux】版本管理工具 Git

目录 一、什么是 Git 二、如何使用 Git 1、创建远程仓库 2、将远端仓库克隆到本地 3、将本地文件添加到仓库 3.1、三板斧第一招&#xff1a;文件添加 3.2、三板斧第二招&#xff1a;提交本地 3.3、三板斧第三招&#xff1a;提交远端 4、删除文件 5、删除仓库 一、什么是 Gi…

postman接口关联

有两种方法&#xff0c;使用json提取器实现接口关联&#xff0c;还有使用正则表达式提取器实现接口关联。方法一&#xff1a;使用json提取器实现接口关联第一个接口&#xff1a;//使用json提取器提取contractID、documentID//把返回的字符串格式的数据转换成对象的形式var resu…

SAP FICO 理解成本中心会计

成本中心会计 一、成本要素 管理会计&#xff08;CO&#xff09;的数据均来源于FI损益类科目&#xff0c;也就是说只有损益类科目才可以创建成本要素&#xff08;必须先创建损益类科目&#xff0c;后创建成本要素&#xff09;&#xff0c; 但是不一定所有的损益类科目都需要…

gma 气象气候函数包的简要介绍及运算过程主要问题说明(内存不足、出现 nan 等)及解决方法

0 概述 0.1 明确气候与气象的概念 气候(Climate)&#xff1a;是指一个地区大气物理特征的长期平均状态&#xff0c;具有一定的稳定性&#xff0c;且周期长。根据世界气象组织&#xff08;WMO&#xff09;的规定&#xff0c;一个标准气候计算时间为 30 年。 气象(Meteorology)&…

【论文笔记】一文读懂残差网络ResNet(附代码)

Residual Net论文笔记1. 传统深度网络的问题2. 残差结构和残差网络2.1 残差是什么2.2 残差模块 Residual Block2.3 基本模块BasicBlock和BottleNeck2.4 残差网络ResNet设计2.4.1 恒等映射与残差的连接3. Forward/Backward Propagation3.1 Forward propogation3.2 Back Propogat…

深信服行为感知命令执行漏洞

深信服行为感知命令执行漏洞1.深信服行为感知漏洞1.1.漏洞描述1.2.漏洞影响1.3.漏洞复现1.3.1.登录页面1.3.2.构建漏洞URL1.3.2.1.查询IP地址1.3.2.2.查询当前目录下文件1.深信服行为感知漏洞 1.1.漏洞描述 深信服 行为感知系统c.php远程命令执行漏洞&#xff0c;使用与EDR相同…

Docker搭建kafka集群

Docker搭建kafka集群集群规划镜像版本kafka为什么需要依赖zookeeper创建docker网络搭建zk集群新建文件docker-compose-zk.yml启动搭建kafka集群新建docker-compose-kafka.yml启动集群安装kafka-manager新建 docker-compose-kafka-manager.yml启动kafka-manager配置cluster修改k…