Flink -- 事件时间 Watermark

news2025/1/11 12:42:23
1、事件时间:

        指的是数据产生的时间或是说是数据发生的时间。

在Flink中有三种时间分别是:

                Event Time:事件时间,数据产生的时间,可以反应数据真实发生的时间

                Infestion Time:事件接收时间

                Processing Time:事件处理时间

为什么会提出事件时间这个概念?

        因为当使用Processing Time(事件的处理时间)来对数据进行处理,此时数据可能会乱序,没有办法还原数据本身的时间顺序,这种情况在Flink中会可能导致数据丢失,如果使用事件时间,它会根据事件真实发生的时间对数据排序,就不会出现数据乱序的情况。

总结来说,数据产生的时间就是事件时间,现实中实时的时间就是事件的处理时间

2、Processing Time 事件处理时间

处理时间是接收数据过后对数据操作的时间。处理时间的会按照实时的时间触发。

public class Demo03ProcessingTime {
    public static void main(String[] args)  throws Exception{
        /**
         * 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间
         * 需求:每过5秒中统计15秒内的单词的数量
         */

        //构建Flink的环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用socket模拟实时的操作
        DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);
        //将接受的数据的转换成kv的格式
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));
        //按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS
                .keyBy(key -> key.f0);
        //划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .
                window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        //对统计的单词进行求和
        SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);

        countDS.print();
        //启动Flink
        env.execute();

    }
}
3、事件时间:

数据产生的时间就是事件时间,不过在使用的时候使用的是时间戳。需要注意的是数据的时间与现实的时间是不一致的。

在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)

java,1699035731000
java,1699035732000
java,1699035735000
java,1699035733000
java,1699035736000
java,1699035737000
java,1699035740000


例如上述:数据总共有两个部分组成,前面是单词,后面的是单词数据产生的时间戳


public class Demo04EventTime {
    public static void main(String[] args) throws Exception{
        /**
         * 需求:统计5秒内的单词的数量,使用的是事件时间滚动窗口
         * 触发的条件是事件时间5秒
         */
        //构建flink的环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //需要并行度改成一
        env.setParallelism(1);
        //使用socket模拟实时的环境
        DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);
        /**
         * java,1699035731000
         * java,1699035732000
         * java,1699035735000
         * java,1699035733000
         * java,1699035736000
         * java,1699035737000
         * java,1699035740000
         */
        //此时的数据的格式并不是某一个单词,需要告诉flink哪一个是事件时间
        //首先对数据进行格式处理
        SingleOutputStreamOperator<Tuple2<String, Long>> kvDS = lineDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
//            String time = split[1];
            long time1 = Long.parseLong(split[1]);
            return Tuple2.of(word, time1);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        //告诉Flink,哪一个是事件的时间
        SingleOutputStreamOperator<Tuple2<String, Long>> assDS = kvDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        //指定事件时间
                .withTimestampAssigner((kv, ts) -> kv.f1)

        );

        //统计5秒钟的单词的数量
        DataStream<Tuple2<String, Integer>>keyByDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1),Types.TUPLE(Types.STRING,Types.INT));
        //按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyByDS1= keyByDS.keyBy(kv -> kv.f0);
        //开窗
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS1.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        //对单词的数量进行统计
        SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);
        //打印数据
        countDS.print();
        //执行Flink的环境
        env.execute();

    }
}
        1、基于事件时间来说,触发窗口的条件:
    1、水位线需要大于等于窗口的结束时间

    2、窗口里面要存在数据

    3、窗口的划分时间是从1970年1月1日0时0分0秒开始的,按照窗口的大小轮替
4、水位线(watermark):默认是等于最新的一条数据的时间戳

5、在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)
        解决方法:将水位线向后推移

假设一个时间窗口是5秒,如果将此时的水位线向后推移5秒,假设4进入的时候,此时的水位线就变成-3,但是此时就不满足触发窗口的条件,此时假设遗漏的数据是3,此时的水位线依旧是小于窗口的时间,依旧不会触发窗口。

但是不能完全的保证数据不丢失,推移的时间越久,对于Flink的延迟就会越大。

        1、在Flink中是默认使用的是单调递增的时间戳分配器:在没有乱序情况下,默认水位线是等于最新的一条数据的时间戳
  //1、需要告诉flink哪一个字段是时间字段
        //设置时间字段和水位线
        DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        //1、指定水位线等于时间最新一条数据的时间戳,数据不存在乱序的时候使用,如果数据乱序,可能会丢失数据
                        .<Tuple2<String, Long>>forMonotonousTimestamps()

                        //指定时间字段
                        .withTimestampAssigner((kv, ts) -> kv.f1)
        );
        2、数据之间存在最大固定延迟的时间戳分配器:在乱序的情况下,就水位线先后推移固定的时间(是以最新的一条数据的时间戳为标准的)
  //1、需要告诉flink哪一个字段是时间字段
        //设置时间字段和水位线
        DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(
                WatermarkStrategy

                        //1、水位线生成方式:最新一条数据的时间戳减去5秒,会导致计算延迟触发
                        .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        //指定时间字段
                        .withTimestampAssigner((kv, ts) -> kv.f1)
        );
6、水位线的生成:

上图表示的是以Flink的流程图,图中总共有两个并行度,每一个Task上面都带着任务的时间,在Flink中,会将任务的时间向后传递,当途中上游map(1)将任务时间传递给下游window(1)时,下面的上游map(2)也会任务时间传递给下游window(1)(上游的任务是并行的),此时下游window(1)就会产生两个任务时间,此时就会选择时间最小的时间的作为水位线。因为当选择时间大的作为水位线,那么对于时间较小的数据可能会丢失。

        1、水位线对齐:

                因为上游的任务是并行执行的,指的时对于上游的所有的Task的水位线都需要逐步的向后推移。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1182806.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习2】模型评估

模型评估主要分为离线评估和在线评估两个阶段。 针对分类、 排序、 回归、序列预测等不同类型的机器学习问题&#xff0c; 评估指标的选择也有所不同。 1 评估指标 1.1准确率 准确率是指分类正确的样本占总样本个数的比例 但是准确率存在明显的问题&#xff0c;比如当负样本…

互联网Java工程师面试题·Spring篇·第六弹

目录 ​编辑 21.什么是 Spring beans? 22、一个 Spring Bean 定义 包含什么&#xff1f; 23、如何给 Spring 容器提供配置元数据? 24、你怎样定义类的作用域? 25、解释 Spring 支持的几种 bean 的作用域。 26、Spring 框架中的单例 bean 是线程安全的吗? 27、解释 …

C/C++(a/b)*c的值 2021年6月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析

目录 C/C(a/b)*c的值 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C(a/b)*c的值 2021年6月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 给定整数a、b、c&#xff0c;计算(a / b)*c的值&…

专业128分总分390+上岸中山大学884信号与系统电通院考研经验分享

专业课884 信号系统 过年期间开始收集报考信息&#xff0c;找到了好几个上岸学姐和学长&#xff0c;都非常热情&#xff0c;把考研的准备&#xff0c;复习过程中得与失&#xff0c;都一一和我分享&#xff0c;非常感谢。得知这两年专业课难度提高很多&#xff0c;果断参加了学长…

智能网联汽车基础软件信息安全需求分析

目录 1.安全启动 2.安全升级 3.安全存储 4.安全通信 5.安全调试 6.安全诊断 7.小结 1.安全启动 对于MCU&#xff0c;安全启动主要是以安全岛BootROM为信任根&#xff0c;在MCU启动后&#xff0c;用户程序运行前&#xff0c;硬件加密模块采用逐级校验、并行校验或者混合校…

【电子通识】USB Logo的标识含义

USB 图标的设计灵感是来自罗马神话中的海神尼普顿(Neptune)&#xff08;也是海王星的名字&#xff09;的武器「三叉戟」&#xff0c;一支强有力的三齿鱼叉。不过&#xff0c;为了避免鱼叉形状的设计暗示人们拿着自己的USB 存储设备到处乱插&#xff08;叉&#xff09;。设计师对…

Failed to load module script 解决方案

Failed to load module script: Expected a JavaScript module script but the server responded with a MIME type of “text/html”. Strict MIME type checking is enforced for module scripts per HTML spec. 使用vite build 打包后部署到生产后报这个错误 原因: 资源路…

淘宝API技术文档解析,从入门到实战

探索淘宝数据的奥秘&#xff0c;淘宝是目前国内最大的B2C电商平台之一&#xff0c;每天都会产生海量的数据。借助淘宝API技术文档&#xff0c;我们可以轻松地获取到这些数据&#xff0c;从而为电商运营和数据分析提供有力支持。 1.什么是淘宝API&#xff1f; 淘宝API&#xf…

微前端qiankun嵌入vue项目后iconfont显示方块

个人项目地址&#xff1a; SubTopH前端开发个人站 &#xff08;自己开发的前端功能和UI组件&#xff0c;一些有趣的小功能&#xff0c;感兴趣的伙伴可以访问&#xff0c;欢迎提出更好的想法&#xff0c;私信沟通&#xff0c;网站属于静态页面&#xff09; SubTopH前端开发个人…

关于涉及频谱分辨率的一些问题以及FFT幅度谱数值矫正问题的梳理

问题 在研究matlab的FFT函数的时候发现了如下问题&#xff1a;对于信号 y e j 2 π f 1 t e j 2 π f 2 t e j 2 π f 3 t ye^{j2\pi f_1t}e^{j2\pi f_2t}e^{j2\pi f_3t} yej2πf1​tej2πf2​tej2πf3​t 其中 f 1 500 H z f_1500Hz f1​500Hz&#xff0c; f 2 505 H z…

Ripro-V5 6.4最新版 不限域名无限搭建(授权激活文件)

RiPro主题全新V5版本&#xff0c;是一个优秀且功能强大、易于管理、现代化的WordPress虚拟资源商城主题。支持首页模块化布局和WP原生小工具模块化首页可拖拽设置&#xff0c;让您的网站设计体验更加舒适。同时支持了高级筛选、自带会员生态系统、超全支付接口等众多功能&#…

基于SpringBoot+Vue的博物馆管理系统

基于springbootvue的博物馆信息管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBootMyBatisVue工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 主页 登录界面 管理员界面 用户界面 摘要 基于SpringBoot和Vue的博物馆…

RFSoC Debug:Petalinux 不显示 flash选项

这个板子和NI的X410是一样的。 问题 不显示Flash选项 [*] Advanced bootable images storage Settings ---> boot image settings ---> Image storage media (primary flash) --->解决 在Block Design中添加SD卡或者Flash选项&#xff0c;否则就不会显示&#xff1…

大数据基础入门

大数据入门 认识大数据 1.1 Web 2.0造就大数据&#xff08;Ajax&#xff09; 1.2单服务器时代 1.3数据的价值 企业成长模式以及数据分析的重要性 技术 分析 售前 大数据开发技术与架构 2.1大数据生态圈 2.2大数据版图 2.3实际运行环境 2.4大数据与传统项目整合 第3节 准…

C# OpenCvSharp 去除字母后面的杂线

效果 项目 代码 using OpenCvSharp; using System; using System.Drawing; using System.Windows.Forms;namespace OpenCvSharp_Demo {public partial class frmMain : Form{public frmMain(){InitializeComponent();}string image_path "";private void Form1_Loa…

Xilinx FPGA SPIx4 配置速度50M约束语句(Vivado开发环境)

qspi_50m.xdc文件&#xff1a; set_property BITSTREAM.GENERAL.COMPRESS TRUE [current_design] set_property BITSTREAM.CONFIG.SPI_BUSWIDTH 4 [current_design] set_property BITSTREAM.CONFIG.CONFIGRATE 50 [current_design] set_property CONFIG_VOLTAGE 3.3 [curren…

C++(Qt)软件调试---下载和安装最新版Windbg(16)

C(Qt)软件调试—下载和安装最新版Windbg&#xff08;16&#xff09; 文章目录 C(Qt)软件调试---下载和安装最新版Windbg&#xff08;16&#xff09;1、前言2、在线安装1.1 安装方法一1.2 安装方法二 3、离线安装 1、前言 Windbg是微软开发的一款强大的调试工具&#xff0c;它主…

Python--- lstrip()--删除字符串两边的空白字符、rstrip()--删除字符串左边的空白字符、strip()--删除字符串右边的空白字符

strip() 方法主要作用&#xff1a;删除字符串两边的空白字符&#xff08;如空格&#xff09; lstrip() 方法 left strip&#xff0c;作用&#xff1a;只删除字符串左边的空白字符 rstrip() 方法&#xff0c;作用&#xff1a;只删除字符串右边的空白字符 strip 英 /strɪp…

【公益案例展】火山引擎公益电子票据服务——连接善意,共创美好

‍ 火山引擎公益案例 本项目案例由火山引擎投递并参与数据猿与上海大数据联盟联合推出的 #榜样的力量# 《2023中国数据智能产业最具社会责任感企业》榜单/奖项”评选。 大数据产业创新服务媒体 ——聚焦数据 改变商业 捐赠票据是慈善组织接受捐赠后给捐赠方开具的重要凭证&…

网络安全-零基础小白自学要点

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高&#xff1b; 二、则是发展相对成熟…