flink watermark介绍及watermark的窗口触发机制

news2024/12/29 8:19:45

Flink的三种时间

在谈watermark之前,首先需要了解flink的三种时间概念。在flink中,有三种时间戳概念:Event Time 、Processing Time 和 Ingestion Time。其中watermark只对Event Time类型的时间戳有用。这三种时间概念分别表示:

Processing time

处理时间,指执行算子操作的机器的当前时间。当基于处理时间运行时,所有关于时间的操作(如时间窗口)都将使用执行算子操作的主机的本地时间。例如,当时间窗口为一小时,如果应用程序在9:15 am开始运行,则第一个窗口将包括在9:15 am到10:00 am之间被处理的事件,下一个窗口将包含在10:00 am到11:00 am之间被处理的事件,依此类推。

处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能提供确定性,因为它容易受到上流系统(例如从消息队列)到达Flink的速度、flink内部operators之间交互的速度,以及中断(调度或其他情况)等因素的影响。

Event Time

事件时间,是每个event在其生产设备上产生的时间,即元素在到达flink之前,本身就自带的时间戳。

所以说,Event Time的时间戳取决于数据,而与其他时间无关。使用Event Time,必须在从执行环境中先引入EventTime的时间属性。如:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

然后通过Dstream的assignTimestampsAndWatermarks方法指定event time时间戳,具体操作不做赘述。

在理想情况下,事件时间是有序的。但实际上,由于分布式操作,以及网络延迟等原因,event可能不是按照event time的顺序到达的。所以flink对处理乱序数据的方案是提供一个允许延迟时间,在允许延迟时间内到达的元素将会重新触发一次计算。这个延迟时间时相对event time而不是其他时间的,而event time不是由flink决定的,那么如何判断当前的event time到底时多少呢?flink通过一个watermark来确定与维护当前event time的最大值。这也是本文将会在后面重点解释的。

Ingestion time

Ingestion time是event进入Flink的时间,即执行source操作时的时间。

Ingestion time从概念上讲介于Event Time和Processing time之间。

与Processing time相比 ,它花费的资源会稍微多一些,但结果却更可预测。由于 Ingestion time使用稳定的时间戳(仅在addSource处分配了一次),因此对记录的不同窗口操作将引用相同的时间戳,而在Processing time中,每个窗口操作都会更新事件的Processing time,所以可能一个上游窗口中的记录会分配给不同的下游窗口(基于本地系统时钟和任何可能的延误)。

与Event Time相比,Ingestion time程序无法处理任何乱序事件或迟到的数据,但是程序不必指定如何生成watermarks。

下图为三种时间语义的图解:
在这里插入图片描述

watermark

用我自己的语言总结,在flink的窗口计算中,的watermark就是触发窗口计算的一种机制。
那么,watermark到底是以怎样的一种形式存在的呢?实际上,watermark就是一种特殊的event,它被参杂在Dstream中,watermark由flink的某个操作生成后,就在整个程序中随event一同流转,如下图所示:
在这里插入图片描述
以下是watermark的代码,可以看出watermark的就是一个流元素,仅包含一个时间戳属性:

public final class Watermark extends StreamElement {
 
	/** The watermark that signifies end-of-event-time. */
	public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
 
	// ------------------------------------------------------------------------
 
	/** The timestamp of the watermark in milliseconds. */
	private final long timestamp;
 
	/**
	 * Creates a new watermark with the given timestamp in milliseconds.
	 */
	public Watermark(long timestamp) {
		this.timestamp = timestamp;
	}
 

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:
在这里插入图片描述
根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
    注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件

在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

并行流的Watermarks

watermark可以在source处生成(也可以在source后通过其他算子生成,如map、filter等),如果source有多个并行度,那么每个并行度会单独生成一个watermark,这些watermark定义了各分区的event time。
当并行度发生变化(即上游的一个分区可能被下游多个分区使用时),每个分区的watermark是会广播至下游的每个分区的,如一些聚合多个流的操作,如 keyBy(…) 或者partition(…),此类操作的watermark是在所有输入流中取最小的watermark。当带有watermark的流通过此类算子时,会根据每个分区的watermark来更新watermark。

举个例子:当上游并行度数为4,下游的某个分区的窗口中的watermark如下:
在这里插入图片描述

  1. 当已到达的watermark分别为2、4、3、6时,窗口中的watermark为2,触发watermark为2的对应窗口计算,并将watermark=2广播至下游。

  2. 当第一个窗口的watermark被更新为4时,所有分区中已到达最小的watermark是3,则将窗口的watermark更新为3,触发对应窗口的计算,并将watermark=3广播至下游。

  3. 当第二个分区的watermark被更新为7,所有分区中已到达最小的watermark还是3,不做处理。

  4. 当第三个分区的watermark被更新为6,所有分区中已到达最小的watermark是4,则将窗口的watermark更新为4,触发对应窗口的计算,并将watermark=4广播至下游。

下图显示了event和watermark在一个并行流的示例,以及算子如何跟踪事件时间的:
在这里插入图片描述

watermark分配器

当watermark完全基于event time时,如果没有元素到达,则watermark不会被更新,这就说明,当一段时间没有元素到达,则在这个时间间隙内,watermark不会增加,那么也不会触发窗口计算。显然,如果这段时间很长的话,那么该窗口中已经到达的元素将会等待很久才会被输出计算结果。

为了避免这种情况,可以使用周期性的watermark分配器(AssignerWithPeriodicWatermarks 下面马上提到),这些分配器不仅仅基于event time进行分配。比如,可以使用一个分配器,当一段时间没有接收到新的event时,则将当前时间作为watermark。

watermark的两种分配器,flink生成watermark有两种机制:

  • AssignerWithPeriodicWatermarks :分配时间戳并定期生成watermark(可以取决于event time,或基于处理时间)。
  • AssignerWithPunctuatedWatermarks:分配时间戳并根据每一个元素生成watermark(每来一个元素都进行一次判断,相更消耗性能)

通常情况下会使用第一种机制,原因除了更节省性能外,在上面的分配器中也有提到。

下面分别对两种机制进行介绍。

AssignerWithPeriodicWatermarks

对每个元素都调用extractTimestamp方法获取时间戳,并维护一个最大时间戳。通过ExecutionConfig.setAutoWatermarkInterval(…)定义生成watermark的间隔(每n毫秒) 。根据这个间隔,周期性调用分配器的getCurrentWatermark()方法,为watermark分配值。

在flink自带的BoundedOutOfOrdernessGenerator分配器中, getCurrentWatermark是定期将当前watermark更新为最大时间戳减去允许延迟时间的值。

以下是两个使用AssignerWithPeriodicWatermarks 生成的时间戳分配器的简单示例:

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
    val maxOutOfOrderness = 3500L // 3.5 seconds
 
    var currentMaxTimestamp: Long = _
 
    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }
 
    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}
 
/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
    val maxTimeLag = 5000L // 5 seconds
 
    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }
 
    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

AssignerWithPunctuatedWatermarks

根据每个元素的event time生成watermark,通过extractTimestamp(…)方法为元素分配时间戳,通过checkAndGetNextWatermark(…)检查元素的watermark并更新watermark。

checkAndGetNextWatermark(…)方法的第二个参数是extractTimestamp(…) 返回的时间戳,根据这个时间戳决定是否要生成watermark。每当checkAndGetNextWatermark(…) 方法返回一个非空watermark,并且该watermark大于上一个watermark时,就会更新watermark。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
 
	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
		element.getCreationTime
	}
 
	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/515608.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[golang gin框架] 30.Gin 商城项目- 购物车商品确认页面以及收货地址的增删改查

一.界面展示 购物车页面 增加功能&#xff1a; 展示用户加入的购物车数据&#xff0c;并点击‘去结算’按钮&#xff0c; 判断是否选中商品 确认订单页面 展示 选中的购物车商品数据(商品标题&#xff0c;图片&#xff0c;数量等)以及 结算的数据(总的价格&#xff0c;总的数量…

【Spring】初识MyBatis (二)

&#xff08;接上一篇【Spring】[初识MyBatis&#xff08;一&#xff09;]&#xff09; 目录 1.2 根据用户名模糊查询用户信息2 添加客户3 更新用户4 删除用户 1.2 根据用户名模糊查询用户信息 【示例6-2】模糊查询的实现只需要在映射文件中通过元素编写相应的SQL语句&#x…

华为手机如何进入开发者模式?连接studio真机调试

对于安卓开发者来说&#xff0c;真机调试是非常好的选择&#xff0c;对电脑配置也没有过分的要求。如果采用Android Studio自带安卓虚拟机调试&#xff0c;真的很慢&#xff0c;一点都不友好。 真机调试的步骤&#xff1a;打开设置->关于手机->版本号&#xff0c;然后连…

并发编程12:AQS

文章目录 12.1 前置知识12.2 AQS入门级别理论知识12.2.1 是什么&#xff1f;12.2.2 AQS为什么是JUC内容中最重要的基石12.2.3 能干嘛&#xff1f;12.2.4 小总结 12.3 AQS源码分析前置知识储备12.3.1 AQS内部体系架构图12.3.2 AQS内部体系架构----AQS自身12.3.1 AQS内部体系架构…

一、H3C-NE实验-抓包实验

实验一&#xff1a;抓包实验&#xff08;PING包&#xff09; 实验拓扑结构图 1. 修改设备名称 步骤1&#xff1a;启动设备 步骤2&#xff1a;在路由器1&#xff0c;进入系统视图&#xff0c;并修改设备名称为R1 步骤3&#xff1a;在路由器2&#xff0c;进入系统视图&#xf…

【Java|基础篇】类和对象

文章目录 1. 前言2. 什么是面向对象3. 类的定义4. 类的实例化5. 对象的构造及初始化6. this引用7. 总结 1. 前言 本篇文章主要讲解了下面三个问题 类的定义和实例化构造方法this关键字 2. 什么是面向对象 众所周知面向过程和面向对象是两种重要的编程思想,而Java是属于面向…

C语言函数大全-- v 开头的函数

C语言函数大全 本篇介绍C语言函数大全-- v 开头的函数 1. va_start 1.1 函数说明 函数声明函数功能void va_start(va_list ap, last_arg);用于初始化一个 va_list 类型的变量&#xff0c;使其指向可变参数列表中的第一个参数 参数&#xff1a; ap&#xff1a; 一个指向 va_…

我的创作纪念日(个人感悟)

昨天2023年5月10日是我成为创作者的第128天纪念日&#xff0c;感谢CSDN官方的纪念信让我铭记这特殊的一天。 机缘 要说与CSDN的初次相遇&#xff0c;还是2022年的高考结束完的暑假&#xff0c;当时对于大学的学习没有什么概念&#xff0c;当初的高考志愿报的有计算机相关的专…

spring集成mybatis的原理

spring是怎样和mybatis继承的&#xff1f; 在idea里点mapper.queryOne()直接跳到了接口或xml&#xff0c;它究竟是怎样利用jdbc执行的&#xff1f; 我直接调用mapper.queryOne是怎么使用的sqlsession&#xff1f;怎么去connect的&#xff1f; mybatis是怎样根据mapper找到对应的…

【Java面试】Java并发基础(1)

文章目录 1. 可见性&#xff0c;有序性&#xff0c;原子性2. java中并发和并行3. 通常线程有哪几种使用方式? 1. 可见性&#xff0c;有序性&#xff0c;原子性 可见性&#xff08;Visibility&#xff09;&#xff1a; 指一个线程修改了共享变量的值之后&#xff0c;其他线程能…

学习网络通信必备的DNS解析和Socket通信知识

OkHttp是一个开源的网络请求框架&#xff0c;由Square公司开发。它通过封装Java底层的HttpURLConnection和Okio等库&#xff0c;提供一个简单易用的API&#xff0c;让开发人员能够方便地向服务器发送HTTP/HTTPS请求&#xff0c;支持异步请求和响应回调&#xff0c;并提供丰富的…

未来已来, 新能源与IT的技术碰撞;学习Android车载开发的必然趋势

Android工程师前景 Android车载工程师扮演着关键的角色&#xff0c;他们致力于将最新的Android技术和汽车技术相结合&#xff0c;为汽车行业提供优质的产品。随着越来越多的汽车制造商投资于智能汽车技术和车联网技术&#xff0c;Android车载工程师成为了一个越来越重要的职业…

MATLAB实现二维稳态导热

MATLAB实现二维稳态导热 一、理论基础二、代码实现 一、理论基础 步骤&#xff1a; Step.1 二维模型传热控制微分方程的确定&#xff0c;具体推导可以在任何一本传热学的书中找到。 d 2 T d x 2 d 2 T d y 2 0 \frac{d^{2}T}{dx^{2}}\frac{d^{2}T}{dy^{2}}0 dx2d2T​dy2d2T…

Ansible 自动化运维工具(一)——部署以及命令行模块

文章目录 一、 ansible 的概述1、ansible简介2.、官方网站3、ansible 的特点4、ansible的工作机制5、ansible的组成模块 二、ansible部署1、Asible的安装 三、ansible 命令行模块1、command 模块2、shell 模块3、cron 模块4、user 模块5、group 模块6、copy 模块7、file 模块8、…

【ros/ros2】ros1和ros2的区别-要点记录

dds data distribution service&#xff0c;数据分发服务 rcl ros client libraries&#xff0c;ros客户端库文件 rmw ros middle ware interface&#xff0c;ros中间件接口 lcn life cycle node&#xff0c;生命周期节点&#xff0c;受控节点 lmn lifecycle manageme…

C++二叉树递归方法存入和三种递归方法读出(前序,中序,后序)

#include <stdio.h> #include <malloc.h> typedef struct op //定义子树结构 { int data; struct op *lchild; struct op *rchild; }treestruct; treestruct *createtree() //这里这种表达形式意思是bittree类型的函数 最终要返回bitt…

【Linux】版本管理器Git

&#x1f3d6;️作者&#xff1a;malloc不出对象 ⛺专栏&#xff1a;Linux的学习之路 &#x1f466;个人简介&#xff1a;一名双非本科院校大二在读的科班编程菜鸟&#xff0c;努力编程只为赶上各位大佬的步伐&#x1f648;&#x1f648; 目录 前言一、Git是什么二、Git有什么…

开源C#代码生成器,专注.NET,Sqlserver,最简单,最干净,支持自编码的开源工具,SmartSoftHelp 开发辅助优化工具

开源C#代码生成器&#xff0c;专注.NET&#xff0c;Sqlserver&#xff0c;最简单&#xff0c;最干净&#xff0c;支持自编码的开源工具&#xff0c;SmartSoftHelp 开发辅助优化工具&#xff01; 下载地址&#xff1a;https://pan.baidu.com/s/1XLL_fLxVTw4erYZLj8-MzA?pwd888…

搭建python运行环境

安装Miniconda3 清华镜像 https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/ 下载 安装 next————agree————这俩选哪个都行&#xff0c;他选的第二个————安装路径———— 配置系统环境变量 Path miniconda3所在的路径 D:\Autils\miniconda3 Scri…

ChatGPT 最有可能取代哪些职业?

ChatGPT 的应用场景ChatGPT 最可能取代哪些职业&#xff1f;写在最后 ChatGPT 的应用场景 ChatGPT 的应用场景大体上可以归类为三大模块。 第一类是 代码相关 的任务场景。包含程序语言之间的相互转换&#xff08;如 python 转 java&#xff09;、程序命令的生成、代码 bug 的…