flink watermark 实例分析

news2025/2/6 11:57:00

WATERMARK 定义了表的事件时间属性,其形式为:

 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:
在这里插入图片描述

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:
在这里插入图片描述
根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
    注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

WATERMARK rowtime_column_name 取值两种方式

rowtime_column_name为计算列

CREATE TABLE pageviews (
		mid bigint,
		db string,
		sch string,
		tab string,
		opt string,
		ts bigint,
		ddl string,
		err string,
		src map < string, string >,
		cur map < string, string >,
		cus map < string, string >,
     event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型
     WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '***',
  'topic' = 'topic1',
  'format' = 'json',
  'properties.group.id' = '*****',
	'scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

rowtime_column_name为事件时间属性

CREATE TABLE dataGen(
  uuid VARCHAR(20),
  name INT,
  age INT,
  ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)
 WATERMARK FOR ts AS ts
)with(
 'connector' = 'datagen',
  'rows-per-second' = '10',
  'number-of-rows' = '100',
  'fields.age.kind' = 'random',
  'fields.age.min' = '1',
  'fields.age.max' = '10',
  'fields.name.kind' = 'random',
  'fields.name.min' = '1',
  'fields.name.max' = '10'
  );

watermark使用demo

CREATE TABLE kafka_table(
		mid bigint,
		db string,
		sch string,
		tab string,
		opt string,
		ts bigint,
		ddl string,
		err string,
		src map < string, string >,
		cur map < string, string >,
		cus map < string, string >,
     group_name as COALESCE(cur['group_name'], src['group_name']),
     batch_number as COALESCE(cur['batch_number'], src['batch_number']),
     event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
     WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '***',
  'topic' = 'topic1',
  'format' = 'json',
  'properties.group.id' = '*****',
	'scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

watermark在over聚合中的使用

--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
select
 group_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (
  PARTITION BY group_name
  ORDER BY event_time
  RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)

watermark在windows聚合中的使用

--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';

select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

参考:
Window Aggregation
Over Aggregation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1327938.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pdf 在线编辑

https://smallpdf.com/edit-pdf#rapp 参考 https://zh.wikihow.com/%E5%B0%86%E5%9B%BE%E5%83%8F%E6%8F%92%E5%85%A5PDF

网络基础篇【网线的制作,OSI七层模型,集线器和交换机的介绍,路由器的介绍与设置】

目录 一、网线制作 1.1 工具介绍 1.1.1网线 1.1.2 网线钳 1.1.3 水晶头 1.1.4 网线测试仪 二、OSI七层模型 2.1 简介 2.2 OSI模型层次介绍 2.2.1 结构图 2.2.2 数据传输过程 2.3 相关网站 二、集线器 2.1 介绍 2.2 适用场景 三、交换机 3.1 介绍 3.2 适用场景…

【Java】Mac下的Tomcat安装配置

&#x1f514;Tomcat是一个免费的开源web应用服务器&#xff0c;是开发和调试JSP 程序的首选&#x1f590;可利用它响应HTML页面的访问请求。 我们在进行网络编程时&#xff0c;其中重要的中间件就是Tomcat&#xff0c;下面我们将进行在Mac上配置Tomcat的讲解。&#x1f632; …

医院影像科PACS系统源码,医学影像系统,支持MPR、CPR、MIP、SSD、VR、VE三维图像处理

PACS系统是医院影像科室中应用的一种系统&#xff0c;主要用于获取、传输、存档和处理医学影像。它通过各种接口&#xff0c;如模拟、DICOM和网络&#xff0c;以数字化的方式将各种医学影像&#xff0c;如核磁共振、CT扫描、超声波等保存起来&#xff0c;并在需要时能够快速调取…

Redis-学习笔记

Remote Dictionary Server(Redis) 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选持久性的键值对(Key-Value)存储数据库&#xff0c;并提供多种语言的 API&#xff0c;是跨平台的非关系型数据库。 Redis 通常被称为数据结构服务器&…

自媒体新手如何写出爆款公众号文章

今天跟大家分享一下&#xff0c;作为新手怎么样写出一篇阅读量过万的公众号的文章。 我的公众号是从2020年开始写的&#xff0c;写到今天差不多三年多一点。然后现在的粉丝数虽然不多&#xff0c;但也差不多近两千个了。 我这三年多差不多更新了150篇原创文章。刚开始的时候写的…

【谭浩强C程序设计精讲 2】整型数据

文章目录 3.3 整型数据3.3.1 整型常量的表示方法3.3.2 整型变量1. 整型数据在内存中的存放形式2. 整型变量的分类3. 整型变量的定义4. 整型数据的溢出 3.3.3 整型常量的类型 3.3 整型数据 3.3.1 整型常量的表示方法 整型常量即整常数。C 的整常数可以用以下三种形式表示&…

ISA95 及工业互联网平台

ISA95简称S95&#xff0c;是美国仪表、系统和自动化协会&#xff08;ISA&#xff09;在95年提出来的&#xff0c;也是这个协会启动编制的第95个标准项目。它定义了企业商业和控制系统之间的集成&#xff0c;主要可以分成三个层次&#xff1a; 第0&#xff0c;1&#xff0c;2层…

Ubuntu 常用命令之 fdisk 命令用法介绍

&#x1f4d1;Linux/Ubuntu 常用命令归类整理 fdisk 是一个用于处理磁盘分区的命令行工具&#xff0c;它在 Linux 系统中广泛使用。fdisk 命令可以创建、删除、更改、复制和显示硬盘分区&#xff0c;以及更改硬盘的分区 ID。 fdisk 命令的常用参数如下 -l&#xff1a;列出所…

jmx_exporter安装

下载 wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.13.0/jmx_prometheus_javaagent-0.13.0.jar 创建jmx_exporter.yml文件 文件内容为&#xff1a; rules: - pattern: ".*" 配置tomcatpinter/apache-tomcat-8.5.38/bin/ca…

模型评估方法

目录 数据集切分 交叉验证 交叉验证实例 混淆矩阵 实例 代码实现 阈值 全局阈值处理 自适应阈值处理 阈值对结果的影响 ROC曲线 数据集切分 数据集切分是指将一个数据集分割成训练集和测试集的过程。常用的方法是随机切分&#xff0c;即将数据集中的样本按照一定比…

计算机存储术语: 扇区,磁盘块,页

扇区(sector) 硬盘的读写以扇区为基本单位。磁盘上的每个磁道被等分为若干个弧段&#xff0c;这些弧段称之为扇区。硬盘的物理读写以扇区为基本单位。通常情况下每个扇区的大小是 512 字节。linux 下可以使用 fdisk -l 了解扇区大小&#xff1a; $ sudo /sbin/fdisk -l Disk …

B/S医院手术麻醉临床管理系统源码 手术申请、手术安排

手术麻醉系统概述 手术室是医院各个科室工作交叉汇集的一个重要中心&#xff0c;在时间、空间、设备、药物、材料、人员调配的科学管理、高效运作、安全质控、绩效考核&#xff0c;都十分重要。手术麻醉管理系统&#xff08;Operation Anesthesia Management System&#xff0…

vue 简单实现购物车:商品基础信息最终的 html 文件 + 商品计数器的组件处理,实现了购物车;

购物车实现过程&#xff1a; Ⅰ、商品购物车作业需求&#xff1a;1、商品购物车页面示例&#xff1a;2、具体需求&#xff1a; Ⅱ、html 文件的构建&#xff1a;商品购物车.html Ⅲ、组件文件的构建&#xff1a;商品购物车1.js Ⅳ、小结&#xff1a; Ⅰ、商品购物车作业需求&am…

百度侯震宇详解:大模型将如何重构云计算?

12月20日&#xff0c;在2023百度云智大会智算大会上&#xff0c;百度集团副总裁侯震宇以“大模型重构云计算”为主题发表演讲。他强调&#xff0c;AI原生时代&#xff0c;面向大模型的基础设施体系需要全面重构&#xff0c;为构建繁荣的AI原生生态筑牢底座。 侯震宇表示&…

.Net Core webapi RestFul 统一接口数据返回格式

在RestFul风格盛行的年代&#xff0c;大部分接口都需要一套统一的数据返回格式&#xff0c;那么我们怎么才能保证使用统一的json数据格式返回呢&#xff0c;下面给大家简单介绍一下&#xff1a; 假如我们需要接口统一返回一下数据格式&#xff1a; {"statusCode": …

预警指挥中心建设方案

目录 1.系统概述 2.系统设计 一、设计原则 二、设计依据 3.建设内容 4.设备中心 4.1视频系统 4.2音频系统 4.2.1音响 4.2.2功放 4.2.3无线麦 4.2.4音频处理器 4.3控制系统 4.4视频会商系统 4.4.1全向麦 4.4.2高清摄像头 4.4.3会议终端 4.4.4系统软件 5.软件中…

COSP营造户外骑行新业态:2024深圳户外骑行展在深圳福田召开

随着人们生活水平的提高&#xff0c;绿色健康的生活方式备受关注&#xff0c;户外骑行作为一种绿色、健康的运动方式&#xff0c;越来越受到人们的青睐。此次展会将展示各种类型的户外骑行装备&#xff0c;包括自行车、头盔、手套、鞋类、骑行服等。深圳是中国户外骑行展会的重…

使用Flask逐步搭建Web应用程序

大家好&#xff0c;Flask是一个使用Python编写的轻量级Web应用框架。它被设计成简单、易于学习和使用的&#xff0c;同时具备足够的灵活性和扩展性&#xff0c;以满足各种规模的Web应用开发需求。本文我们将介绍一个使用Flask逐步搭建Web应用程序的简单入门示例。 1.安装Flask…

安装 PyCharm 2021.1 保姆级教程

作者&#xff1a;billy 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 前言 目前能下载到的最新版本是 PyCharm 2021.1。 请注意对应 Python 的版本&#xff1a; Python 2: 2.7Python 3: >3.6, <3.11…