大数据Flink(一百一十八):Flink SQL水印操作(Watermark)

news2024/9/19 7:53:17

文章目录

Flink SQL水印操作(Watermark)

一、为什么要有WaterMark

二、​​​​​​​​​​​​​​Watermark解决的问题

三、​​​​​​​​​​​​​​代码演示


Flink SQL水印操作(Watermark)

一、​​​​​​​为什么要有WaterMark

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:

假设在一个5秒的Tumble窗口,有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:该如何处理迟到数据

二、​​​​​​​​​​​​​​Watermark解决的问题

上面的问题在于如何将迟来的EventTime 为11的元素正确处理?

当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下: 

通过watermark来解决,简单来说就是延迟窗口关闭的时间,等一会迟到的数据,窗口关闭不在依据数据的时间,而是到达的watermark的时间。

watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用。

三、​​​​​​​​​​​​​​代码演示

  • 使用Socket模拟接收数据
  • 设置WaterMark
    • 设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
  • 使用滚动Event Time窗口,将5秒内的同组数据,进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);

SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

若输入第一条数据:hello,2022-03-25 16:39:45

那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示

1.第一条数据的Event Time为1648197585000,那么当前窗口时间为:1648197585000-> 1648197589000,即下图中红色框线

2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1648197585000

3.第二条数据进来时,前一条数据的WaterMark为1648197585000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1648197586000,但还没到窗口触发时间,不进行计算

4.后面几个以此类推,直到Event Time为:1648197590000的数据进来的时候,前一条数据的WaterMark为1648197589000,于是更新当前的WaterMark为1648197590000,Flink认为1648197590000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算

根据上面的推断,启动程序验证一下,向9999端口监听终端输入以下内容:

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50

 Flink输出结果:

Rowtime列在经过窗口操作后,其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime,其类型是具有Rowtime属性的TIMESTAMP,取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口,返回值为00:14:59.999 。

数据乱序的场景

上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟启动程序(注意要关闭之前的查询,重新运行查询语句),在监听终端中输入如下数据:

其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink结果:

从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理,这个就是迟到丢弃

乱序时间的设置:

为了解决上面的问题,我们允许Flink处理延迟在5秒内的迟到数据

修改最大乱序时间(新建的表仅水印与之前不同)

CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);

SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

在监听终端中,输入数据

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink输出结果:  

可以看到,之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟关于延迟时间,请结合业务场景进行设置。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2145819.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】数据结构系列学习笔记——导航篇

一:概述 数据结构是计算机科学中的核心概念之一,是优化算法性能和资源利用率的关键。在软件开发和数据处理中,选择合适的数据结构对于算法的效率至关重要。数据结构的选择通常基于数据的使用模式,包括数据元素之间的关系、数据的存…

日志框架的使用

一、日志概述 日志:用来记录程序运行过程中的信息,并可以进行永久存储。 开发过程中可能会出现以下需求: 希望系统能记住某些数据是被谁操作的,比如被谁删除了?想分析用户浏览系统的具体情况,以便挖掘用…

【深度学习】深度学习模型的加密及解密方案及源码

本文摘要 本文主要根据自己遇到的情况,例如:对于yolo或paddle训练的模型文件,对外使用,不想要别人拿到我的模型文件随意乱用,此时就涉及到对模型文件进行加密与解密 深度学习模型的加密保护非常重要,尤其在商业应用场景下。常见的模型加密方法包括模型文件加密、加密硬件…

图像分割基本知识

计算机视觉和图像处理 Tensorflow入门深度神经网络图像分类目标检测图像分割 图像分割 一、目标分割1.1 图像分割的定义1.2 任务类型1.2.1 任务描述1.2.2 任务类型 二、语义分割2.1 FCN网络2.1.1网络结构 2.2 Unet网络 三、UNet案例3.1 数据集获取3.1.1 设置相关信息3.1.2 图像…

nature communications |多层次蛋白质组分析揭示弥漫型和肠型胃癌之间的分子多样性

文章信息 发表期刊:nature communications 发表日期:2023年2月14日 影响因子:14.7 研究背景 胃癌是世界上主要的癌症类型之一。弥漫型胃癌(DGC)和肠型胃癌(IGC)是胃癌(GC)的主要组织学类型,DGC呈分散的细胞组织,黏…

比特币10年价格数据(2014-2024)分析(进阶2_时间序列分析)

数据入口:【每周挑战】比特币10年价格数据可视化和量化分析 - Heywhale.com 本数据集包含 2014 - 2024 的比特币美元价格数据,具体包含比特币每日的开盘价、最高价、最低价、收盘价以及成交量等关键信息。数据说明如下: 字段说明Date日期&a…

iPhone 16系列:摄影艺术的全新演绎,探索影像新境界

在科技的浪潮中,智能手机摄影功能的进化从未停歇。 苹果公司即将推出的iPhone 16系列,以其卓越的相机升级和创新特性,再次站在了手机摄影的前沿。 从硬件到软件,从拍照体验到图像处理,iPhone 16系列都展现了其在移动…

camtasia2024绿色免费安装包win+mac下载含2024最新激活密钥

Hey, hey, hey!亲爱的各位小伙伴,今天我要给大家带来的是Camtasia2024中文版本,这款软件简直是视频制作爱好者的福音啊! camtasia2024绿色免费安装包winmac下载,点击链接即可保存。 先说说这个版本新加的功能吧&#…

Mapsui:一个 .NET 开源的地图组件库

前言 今天大姚给大家分享一个.NET开源(MIT License)、免费、同时支持多平台框架(MAUI、WPF、Avalonia、Uno、Blazor、WinUI、Eto、.NET Android 和 .NET iOS)地图组件库:Mapsui。 项目源代码 支持的UI框架的NuGet包 创…

华为OD机试 - 查字典(Python/JS/C/C++ 2024 E卷 100分)

华为OD机试 2024E卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试真题(Python/JS/C/C)》。 刷的越多,抽中的概率越大,私信哪吒,备注华为OD,加入华为OD刷题交流群,…

【研发日记】嵌入式处理器技能解锁(六)——ARM的Cortex-M4内核

文章目录 前言 背景介绍 指令集架构 ARM起源 ARM分类 Cortex-M4 内核框架 指令流水线 实践应用 总结 参考资料 前言 见《【研发日记】嵌入式处理器技能解锁(一)——多任务异步执行调度的三种方法》 见《【研发日记】嵌入式处理器技能解锁(二)——TI C2000 DSP的SCI(…

管理依赖版本-maven工程parent项目巧配置

本文目标:开发人员,在了解pom文件properties、dependencyManagement标签用法的条件下,进行依赖包版本统一维护,达到统一维护项目依赖jar包版本的程度。 文章目录 1 场景2 要点3 总结/练习 1 场景 maven工程多模块项目,…

数据库基础知识---------------------------(2)

MYSQL的存储过程 就是数据库 SQL 语言层面的代码封装与重用 语法格式 delimiter 自定义结束符号 create procedure 存储名({in,out,inout} 参数名,数据类型...) begin sql 语句 end 自定义结束符 delimiter; 变量定义 局部变量 用户自定义 仅在begin / end 块中有效 当将查询…

高效开发,从暗藏玄机的文件系统开始—合宙Air201资产定位模组LuatOS

超低功耗、精准定位、快速量产——迷你小巧的合宙Air201,正给越来越多的行业客户带来高效开发体验。 4G-Cat.1模组的文件系统关乎数据传输速度、存储效率,以及数据安全性等等诸多因素,在应用开发中极为重要。 本期,我们来学习合…

微型导轨在3D打印设备中的应用与实践

微型导轨的应用范围非常广泛,尤其在追求高精度、高效率及低噪音的现代打印技术中扮演着重要角色。微型导轨在3D打印机等精密设备中是常用元件,以提高打印质量和效率。 在打印机中,无论是喷墨式、激光式还是3D打印机,都需要精确的打…

JDBC编程详细总结

一、JDBC编程 JDBC编程有标准步骤(八股文) 注册驱动 将sql语句的运行环境加载到JVM 连接数据库 获得执行SQL的对象 执行SQL语句,获得结果 关流 1、 注册驱动 Class.forName("com.mysql.jdbc.Driver");//5.7版本 加载驱动 Class.forName("com.mysql.cj.jdb…

为什么收录是谷歌seo的底子?

收录是谷歌SEO的基础,因为它决定了网站页面能否被用户找到。只有被谷歌收录的页面,才有机会在搜索结果中出现。如果页面没有被收录,谷歌根本就不知道它的存在,这意味着即使内容再好、关键词再精准,也不会有任何排名 被…

fo-dicom,第一个基于.NET Standard 2.0 开发的DICOM开源库

1. 简介: fo-dicom是一个基于C#开发的库,用于处理DICOM(Digital Imaging and Communications in Medicine)格式的数据。DICOM是一种用于医学影像和相关信息的标准格式,广泛应用于医学领域。fo-dicom提供了多平台支持&…

华为OD机试 - 报数问题 - 约瑟夫环(Python/JS/C/C++ 2024 E卷 100分)

华为OD机试 2024E卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试真题(Python/JS/C/C)》。 刷的越多,抽中的概率越大,私信哪吒,备注华为OD,加入华为OD刷题交流群,…

浸没边界法精度相关的论文的阅读笔记

Convergence proof of the velocity field for a stokes flow immersed boundary method https://doi.org/10.1002/cpa.20233 研究对象的选取 他这里为什么能够选取一个周期性边界的流场啊?为什么不是狄利克雷边界或者诺伊曼边界? 方形流场的边界值 …