大数据Flink(一百一十八):SQL水印操作(Watermark)

news2024/9/29 5:26:20

文章目录

​​​​​​SQL水印操作(Watermark)

一、为什么要有WaterMark

二、​​​​​​​Watermark解决的问题

三、​​​​​​​​​​​​​​代码演示


​​​​​​SQL水印操作(Watermark)

一、​​​​​​​为什么要有WaterMark

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:

假设在一个5秒的Tumble窗口,有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:该如何处理迟到数据

二、​​​​​​​​​​​​​​Watermark解决的问题

上面的问题在于如何将迟来的EventTime 为11的元素正确处理?

当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下: 

通过watermark来解决,简单来说就是延迟窗口关闭的时间,等一会迟到的数据,窗口关闭不在依据数据的时间,而是到达的watermark的时间。

watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用。

三、​​​​​​​​​​​​​​代码演示

  • 使用Socket模拟接收数据
  • 设置WaterMark
    • 设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
  • 使用滚动Event Time窗口,将5秒内的同组数据,进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);

SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

若输入第一条数据:hello,2022-03-25 16:39:45

那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示

1.第一条数据的Event Time为1648197585000,那么当前窗口时间为:1648197585000-> 1648197589000,即下图中红色框线

2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1648197585000

3.第二条数据进来时,前一条数据的WaterMark为1648197585000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1648197586000,但还没到窗口触发时间,不进行计算

4.后面几个以此类推,直到Event Time为:1648197590000的数据进来的时候,前一条数据的WaterMark为1648197589000,于是更新当前的WaterMark为1648197590000,Flink认为1648197590000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算

根据上面的推断,启动程序验证一下,向9999端口监听终端输入以下内容:

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50

 Flink输出结果:

Rowtime列在经过窗口操作后,其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime,其类型是具有Rowtime属性的TIMESTAMP,取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口,返回值为00:14:59.999 。

数据乱序的场景

上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟启动程序(注意要关闭之前的查询,重新运行查询语句),在监听终端中输入如下数据:

其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink结果:

从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理,这个就是迟到丢弃

乱序时间的设置:

为了解决上面的问题,我们允许Flink处理延迟在5秒内的迟到数据

修改最大乱序时间(新建的表仅水印与之前不同)

CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);

SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

在监听终端中,输入数据

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink输出结果:  

可以看到,之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟关于延迟时间,请结合业务场景进行设置。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2137113.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

出处不详 阻拦投篮

目录 阻拦投篮题目描述背景输入输出数据范围 题解解法 打赏 阻拦投篮 题目描述 背景 现在你得到了一个可以阻拦投篮的宝物,它会在投球后把篮球传送回运动员手上,但是宝物的成功率和篮球在空中运动的时间有关,并且在特定的时间点成功的几率…

152-钓鱼篇邮件钓鱼Ewomail系统网页克隆劫持用户后门上线

承接上节课没讲完的邮件钓鱼和全部的网页钓鱼 #知识点: 1、红队技能-网络钓鱼-邮件系统 2、邮件钓鱼-平台-Gophish&Swaks 3、邮件钓鱼-系统-smtp2go&SendCloud 4、邮件钓鱼-自定义-Ewomail&Postfix 5、网页钓鱼-克隆修改-劫持口令&下载后门 这…

测试工具笔记

性能测试是软件测试中非常重要的一部分,它可以帮助识别软件在高负载条件下的性能瓶颈。市面上有许多性能测试工具,它们各有特点和优势。以下是一些流行的性能测试工具: 1. LoadRunner: 由Micro Focus提供,是一个业界广…

实战外网配置——光猫桥接+路由器PPPoE拨号+防火墙外网链路健康检查+外网流量负载均衡

一、适用场景: 1、企业规模较大时,1条公网带宽流量可能不足,需要用到多条公网出口时。 2、企业有业务需要静态ip映射,但是因静态ip专线价格较高,所以需要拨号光纤承载较多的下行流量。 3、当公网出口有多条链路&#…

[项目][WebServer][CGI机制 设计]详细讲解

目录 1.何为CGI机制?2.理解CGI机制3.CGI接口设计1.ProcessNonCgi2.ProcessCgi 1.何为CGI机制? CGI(Common Gateway Interface)是外部应用程序(CGI程序)与WEB服务器之间的接口标准,是在CGI程序和WEB服务器之间传递信息的过程 2.理解CGI机制 …

鸿蒙OS Service Ability

鸿蒙OS Service模板的Ability基本概念 基于 Service 模板的 Ability(以下简称“Service”)主要用于后台运行任务(如执行音乐播放、文件下载等),但不提供用户交互界面。Service 可由其他应用或 Ability 启动&#xff0…

WEB攻防-PHP特性缺陷对比函数CTF考点CMS审计实例

知识点: 1、过滤函数缺陷绕过; 2、CTF考点与代码审计; 1、赋值 不会对比类型 类型也会对比 2、MD5 在使用比较md5的时候,只要第一位是相等的数字,则会值相等 3、intval 3、 %0a代表换行 4、 6、 7、 代码审计

Amoco:一款针对二进制源码的安全分析工具

关于Amoco Amoco是一款功能强大的二进制源码静态分析工具,该工具基于Python 3.8开发,可以帮助广大研究人员轻松对二进制程序执行静态符号分析。 工具特性 1、一个通用的指令解码框架,旨在减少实现对新架构的支持所需的时间。例如&#xff0c…

.NET内网实战:通过命令行解密Web.config

01阅读须知 此文所节选自小报童《.NET 内网实战攻防》专栏,主要内容有.NET在各个内网渗透阶段与Windows系统交互的方式和技巧,对内网和后渗透感兴趣的朋友们可以订阅该电子报刊,解锁更多的报刊内容。 02基本介绍 本文内容部分节选自小报童…

ICM20948 DMP代码详解(22)

接前一篇文章:ICM20948 DMP代码详解(21) 上一回讲到了inv_icm20948_wakeup_mems函数,没有讲完,本回把余下的内容讲完。为了便于理解和回顾,再次贴出inv_icm20948_wakeup_mems函数代码,在EMD-Cor…

【LLM:Gemini】文本摘要、信息提取、验证和纠错、重新排列图表、视频理解、图像理解、模态组合

开始使用Gemini 目录 开始使用Gemini Gemini简介 Gemini实验结果 Gemini的多模态推理能力 文本摘要 信息提取 验证和纠错 重新排列图表 视频理解 图像理解 模态组合 Gemini多面手编程助理 库的使用 引用 本文概述了Gemini模型和如何有效地提示和使用这些模型。本…

Linux:git

hello,各位小伙伴,本篇文章跟大家一起学习《Linux:git》,感谢大家对我上一篇的支持,如有什么问题,还请多多指教 ! 如果本篇文章对你有帮助,还请各位点点赞!!&…

基于java网吧管理系统设计与实现

博主介绍:专注于Java .net php phython 小程序 等诸多技术领域和毕业项目实战、企业信息化系统建设,从业十五余年开发设计教学工作 ☆☆☆ 精彩专栏推荐订阅☆☆☆☆☆不然下次找不到哟 我的博客空间发布了1000毕设题目 方便大家学习使用 感兴趣的可以…

Pytorch_CPU鸢尾花lirsDataset 尝试

鸢尾花数据集(lris Dataset) (1)下载地址【引用】:鸢尾花数据集下载 (2)鸢尾花数据集特点 茑尾花数据集有150 条样本记录,分为3个类别,每个类别有 50 个样本&#xff…

学习笔记JVM篇(一)

1、类加载的过程 加载->验证->准备->解析->初始化->使用->卸载 2、JVM内存组成部分(HotSpot) 名称作用特点元空间(JDK8之前在方法区)用于存储类的元数信息,例如名称、方法名、字段等;…

【程序分享】express 程序:可扩展的高级工作流程,用于更快速的从头算材料建模

分享一个 express 程序:可扩展的高级工作流程,用于更快速的从头算材料建模。 感谢论文的原作者! 主要内容 “在这项工作中,我们介绍了一个开源的Julia项目express,这是一个可扩展的、轻量级的、高通量的高级工作流框…

学python要下什么包吗,有推荐的教程或者视频吗?

初学者可以尝试三种方法来学习Python第三方库,第一种传统,第二种省心,第三种轻量。 1、安装PythonPycharm,通过pip进行包管理,或者Pycharm后台也可以 2、安装Anaconda,预装了几百个数据科学包&#xff0c…

模仿抖音用户ID加密ID的算法MB4E,提高自己平台ID安全性

先看抖音的格式 对ID加密的格式 MB4EENgLILJPeQKhJht-rjcc6y0ECMk_RGTceg6JBAA 需求是 同一个ID 比如 413884936367560 每次获取得到的加密ID都是不同的,最终解密的ID都是413884936367560 注意这是一个加密后可解密原文的方式,不是单向加密 那么如下进行…

Windows 环境下 vscode 配置 C/C++ 环境

vscode Visual Studio Code(简称 VSCode)是一个由微软开发的免费、开源的代码编辑器。它支持多种编程语言,并提供了代码高亮、智能代码补全、代码重构、调试等功能,非常适合开发者使用。VSCode 通过安装扩展(Extension…

abVIEW 可以同时支持脚本编程和图形编程

LabVIEW 可以同时支持脚本编程和图形编程,但主要依赖其独特的 图形编程 环境(G语言),其中程序通过连线与节点来表示数据流和功能模块。不过,LabVIEW 也支持通过以下方式实现脚本编程的能力: 1. 调用外部脚本…