【Flink】Flink时间语义详解

news2024/11/17 14:20:09

简介

在流处理中,时间是一个非常核心的概念,是整个系统的基石。我们经常会遇到这样的需求:给定一个时间窗口,比如一个小时,统计时间窗口内的数据指标。那如何界定哪些数据将进入这个窗口呢?在窗口的定义之前,首先需要确定一个作业使用什么样的时间语义。

本文首先介绍 Flink 的三种时间语义(Event Time、Processing Time 和 Ingestion Time),然后详细介绍 Flink-watermark 机制。

三种时间语义

事件时间(Event Time)

事件时间是数据生成的时间,是数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间,在实际项目中作为前端的一个属性嵌入。在理想情况下,数据应当按照事件时间顺序到达集群节点,但是由于从产生一条数据到数据抵达集群有过多的中间步骤,一个较早发生的事件可能较晚到达,使用事件时间意味着会产生数据乱序。

为了避免数据乱序问题,我们第一个能想到的解决方案就是等待,我们可以等待所有的数据都到达之后再进行窗口计算,如果一个窗口可以等待所有的数据抵达再触发计算,那么结果的一致性和正确性都能够得到保证,此时使用事件时间不用担心乱序问题。

但是这个解决方案也有一个问题,就是我们不知道什么时候所有的数据都抵达了,在实际项目中也不可能永久等待下去。为了解决这个问题引入了水位线(WaterMark),水位线是一个 Flink 系统中抽象的、基于事件时间的逻辑时钟,这个时钟不会自己流动,它随着事件的到来进行推动。 用来衡量当前系统事件时间的进展,我将会后面详细介绍。

如果想要使用水位线,在程序中必须指定如下内容:

  1. 如何从一条数据中提取出事件时间;
  2. 基于事件时间如何生成水位线;

小结一下,使用 Event Time 的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。

处理时间(Processing Time)

处理时间执行处理操作的机器的系统时间,使用处理时间不需要依赖水位线,也无需缓存,实现也十分简单,是延迟最小的一种时间语义。由于计算是需要时间的,受限于集群中软硬件的限制,第 N 个算子和第 N+1 个算子的处理时间是不同的,尽管他们有相同的事件时间,这也意味着处理时间具有不确定性,结果不可预测。就窗口场景下,不同的运行环境窗口计算结果不同。

注入时间(Ingestion Time)

注入时间是数据进入 Source 算子的时间,任何一个算子的处理速度快慢可能影响到下游算子的处理时间,但是注入时间仅依赖于数据进入 Source 算子的时间,因此不会受制于不同算子的计算时间。

使用注入时间是一种折中方案,不同于事件时间,不需要使用水位线机制,也意味着他不需要太多的缓存,延迟也较低。相比处理时间,它避免了不同算子处理速度的影响。

设置时间语义

1.11 版本

在公司中通常使用 1.09 或者 1.11 版本,针对这两个版本,设置时间语义的方式如下:

// 设置事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 设置处理时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 设置注入时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

1.11 版本后

1.12 版本开始,1.11 版本时间语义的设置方法被废弃,1.12 版本开始默认的时间语义就是事件时间,所以 setStreamTimeCharacteristic() 方法被废弃,如果需要使用 setAutoWatermarkInterval() 方法设置自动水印发射的时间间隔,如果间隔设为 0 则认为使用处理时间。从 setStreamTimeCharacteristic() 的源码看,它的底层也是使用了 setAutoWatermarkInterval() 方法,源码分析如下:

// --------------- StreamExecutionEnvironment ------------
// 默认间隔时间200毫秒
private long autoWatermarkInterval = 200;
// 默认的时间语义就是事件时间
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC =  
        TimeCharacteristic.EventTime;

public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {  
	// 前置检查
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);  
    // 如果使用处理时间语义,则水位线发送间隔设置为0,
    if (characteristic == TimeCharacteristic.ProcessingTime) {  
        getConfig().setAutoWatermarkInterval(0);  
    } else {  
        getConfig().setAutoWatermarkInterval(200);  
    }  
}
// -------------------------------------------------------

总结

为了让大家更好的理解三种时间语义,我画了一个图,希望能帮到大家。

在这里插入图片描述

水位线和事件时间

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的,为了衡量一个系统的事件时间水平引入了水位线机制。稍稍总结一下水位线的引入原因:

  • 分布式系统的网络传输的不确定性;
  • 数据是乱序的;
  • 支持事件时间的流处理器需要一种测量事件时间进度的方法,用以正确的处理窗口等操作;

水位线的物理意义有两点:

  1. 水位线本质是一个基于数据生成的、单调递增的时间戳;
  2. 水位线 W(t)表示当前数据流中的所有 t 时刻前的数据都已经到了。

就机制而言十分的简单,Flink 会按照一定的规则生成水位线并插入到数据流中,这是一种特殊的数据结构,有对应的特殊处理方式。这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,就窗口场景而言,Watermark 是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发,这也称之为断流

我画了一张图方便大家理解

在这里插入图片描述

除此之外,WaterMark 还需要注意如下几点:

  • Watermark 与事件的时间戳紧密相关。一个时间戳为 T 的 Watermark 假设后续到达的事件时间戳都大于 T。
  • 假如 Flink 算子接收到一个违背上述规则的事件,该事件将被认定为迟到数据。Flink 提供了一些其他机制来处理迟到数据。
  • Watermark 时间戳必须单调递增,以保证时间不会倒流。
  • Watermark 机制允许用户来控制准确度和延迟。Watermark 设置得与事件时间戳相距紧凑,会产生不少迟到数据,影响计算结果的准确度,整个应用的延迟很低;Watermark 设置得非常宽松,准确度能够得到提升,但应用的延迟较高,因为 Flink 必须等待更长的时间才进行计算。

水位线的传播

在实际计算过程中,Flink 的算子一般分布在多个并行的分区(或者称为实例)上,Flink 需要将 Watermark 在并行环境下向前传播。如下图所示,由于上游各分区的处理速度不同,到达当前算子的 Watermark 也会有先后快慢之分,每个算子子任务会维护来自上游不同分区的 Watermark 信息,这是一个列表,列表内对应上游算子各分区的 Watermark 时间戳等信息。每当一个上游传递过来一个水位线,实例会判断该水位线是否大于列表中记录的数值,如果大于则更新水位线。接着实例会遍历整个水位线列表找出最小值作为实例的事件时间,最后,实例会将更新的 Event Time 作为 Watermark 发送给下游所有算子子任务。

在这里插入图片描述

总结一下就是:

  • 上游的水位线会广播到所有的下游任务上;
  • 多个上游任务传递水位线时下游任务使用其中最小的水位线,下游任务会为所有的上游任务创建一个分区标志位,保存其传递的水位线。

这样的设计机制满足了并行环境下 Watermark 在各算子中的传播问题,但是假如某个上游分区的 Watermark 一直不更新,Partition Watermark 列表其他地方都在正常更新,唯独个别分区的时间停留在很早的某个时间,这会导致算子的 Event Time 时钟不更新,相应的时间窗口计算也不会被触发。这种问题的解决办法就是设置断流阈值,设置断流阈值,超时将 Source-SubTask 置于 IDLE,这样后续分区水位线传递将会忽略该 SubTask。此外,在 union 等多数据流处理时,Flink 也使用上述 Watermark 更新机制,这也可能导致断流问题,解决方案同上。

时间事件的提取和水位线的生成

Source 算子

我们可以在 Source 阶段,通过自定义 SourceFunction 或 RichSourceFunction,在 SourceContext 里重写 collectWithTimestamp()emitWatermark() 两个方法,前者给数据流中的每个元素 T 赋值一个时间戳作为事件时间;后者针对数据流中的每一个元素 T 发出一个时间戳。源码如下:

// 生成事件时间
void collectWithTimestamp(T element, long timestamp);
// 生成水位线
void emitWatermark(Watermark mark);

非 Source 算子

我们也可以使用 assignTimestampsAndWatermarks() 来分配时间戳和水位线。该方法主要依赖于 WatermarkStrategy 接口,通过 WatermarkStrategy 我们可以为每个元素抽取时间戳并生成 Watermark。基本的使用方法如下:

DataStream<MyType> withTimestampsAndWatermarks = stream
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .forGenerator(...)
                .withTimestampAssigner(...)
        );

可以看到 WatermarkStrategy.ForGenerator (...). WithTimestampAssigner (...) 链式调用了两个方法,forGenerator () 方法用来生成 Watermark,本质是返回了一个 WatermarkGenerator ,它的源码如下:


public interface WatermarkGenerator<T> {  

	// 数据流中的每个元素流入后都会调用onEvent()方法 
	// Punctunated方式下,一般根据数据流中的元素是否有特殊标记来判断是否需要生成Watermark 
	// Periodic方式下,一般用于记录各元素的Event Time时间戳
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);  

	// 每隔固定周期调用onPeriodicEmit()方法 
	// 一般主要用于Periodic方式 
	// 固定周期用 ExecutionConfig#setAutoWatermarkInterval() 方法设置
    void onPeriodicEmit(WatermarkOutput output);  
}

withTimestampAssigner () 方法用来为数据流的每个元素设置时间戳。WatermarkGenerator 提供了两种方式,onPeriodicEmit() 方法实现周期性的生成水位线,默认周期是 200 毫秒。可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置。

Flink 本身已经帮我们封装好了这样的代码,常见的两个实现是 BoundedOutOfOrdernessWatermarksAscendingTimestampsWatermarks

基本的使用方式如下:

DataStream.assignTimestampsAndWatermarks(WatermarkStrategy  
        .<Event>forBoundedOutOfOrderness(Duration.ZERO)  
        .withTimestampAssigner((SerializableTimestampAssigner<Event>)  
                (element, recordTimestamp) -> element.timeStamp))

Watermark 的设定

在 Flink 中 watermark 由开发人员指定延迟,对待具体的业务场景,我们可能需要反复尝试,不断迭代和调整时间戳和 Watermark 策略.

往期回顾

  1. 【Flink】详解JobGraph
  2. 【Flink】详解StreamGraph
  3. 【Flink】浅谈Flink架构和调度
  4. 【Flink】详解Flink的八种分区
  5. 【Flink】浅谈Flink背压问题(1)
  6. 【分布式】浅谈CAP、BASE理论(1)

文中难免会出现一些描述不当之处(尽管我已反复检查多次),欢迎在留言区指正,相关的知识点也可进行分享,希望大家都能有所收获!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/349179.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言】程序环境和预处理|预处理详解|定义宏(下)

主页&#xff1a;114514的代码大冒 qq:2188956112&#xff08;欢迎小伙伴呀hi✿(。◕ᴗ◕。)✿ &#xff09; Gitee&#xff1a;庄嘉豪 (zhuang-jiahaoxxx) - Gitee.com 文章目录 目录 文章目录 前言 2.5带副作用的宏参数 2.6宏和函数的对比 3#undef ​编辑 4 命令行定义…

直播 | StarRocks 实战系列第二期--导入优化&问题排查

2023 年开春&#xff0c; StarRocks 社区重磅推出入门级实战系列直播&#xff0c;手把手带你从 Zero to Hero 成为一个 “StarRocks Pro”&#xff01;通过实际操作和应用场景的结合&#xff0c;我们将帮你系统性地学习 StarRocks 这个当今最热门的开源 OLAP 数据库。本次&…

WebSocket+xterm+springboot+vue 实现 xshell 操作linux终端功能

效果图 1.工具介绍与安装 1.1 xterm.js xterm 是一个使用 TypeScript 编写的前端终端组件&#xff0c;可以直接在浏览器中实现一个命令行终端应用。Xterm.js 适用于大多数终端应用程序&#xff0c;如 bash&#xff0c;vim 和 tmux&#xff0c;这包括对基于curses的应用程序和…

反击爬虫,前端工程师的脑洞可以有多大?

1. 前言对于一张网页&#xff0c;我们往往希望它是结构良好&#xff0c;内容清晰的&#xff0c;这样搜索引擎才能准确地认知它。而反过来&#xff0c;又有一些情景&#xff0c;我们不希望内容能被轻易获取&#xff0c;比方说电商网站的交易额&#xff0c;教育网站的题目等。因为…

港科夜闻|广州市市长郭永航先生与香港科大校董会廖长城先生一行举行座谈交流...

关注并星标每周阅读港科夜闻建立新视野 开启新思维1、广州市市长郭永航先生与香港科大校董会廖长城先生一行举行座谈交流。2月9日上午&#xff0c;双方就推进香港科技大学(广州)建设发展进行深入交流&#xff0c;并一致表示&#xff0c;将全力推动落实《南沙方案》中“打造高等…

【基础篇】一文掌握css的盒子模型(margin、padding)

1、CSS 盒子模型(Box Model) 所有HTML元素可以看作盒子,在CSS中,"box model"这一术语是用来设计和布局时使用。CSS盒模型本质上是一个盒子,封装周围的HTML元素,它包括:边距,边框,填充,和实际内容。盒模型允许我们在其它元素和周围元素边框之间的空间放置元素…

文献的阅读的习惯与方法

文献阅读是每个研究人员都要做的事情&#xff0c;然而虽然每个人都在阅读&#xff0c;但是每个人的阅读效率不一样&#xff0c;总结有效的方式是非常重要的。本笔记将梳理我在阅读文献中的方法和所在其中的关注点。 阅读文献有两种目的&#xff0c;第一种目的是日常阅读和学习…

ThinkPHP5美食商城系统

有需要请私信或看评论链接哦 可远程调试 ThinkPHP5美食商城系统一 介绍 此美食商城系统基于ThinkPHP5框架开发&#xff0c;数据库mysql&#xff0c;前端bootstrap。系统角色分为用户和管理员。用户注册登录后可购买美食&#xff0c;个人中心&#xff0c;评论和反馈等&#xff…

一手教你如何搭建Hadoop基于Zookeeper的集群(5台主机)

文章目录一、设计集群图二、准备五台虚拟机2.1、下载安装文件2.2、创建虚拟机2.3、配置网络2.4、修改主机名称2.5、关闭防火墙2.6、同步时间2.7、设置/etc/hosts文件2.8、设置免密登录2.9、为后面可以主备替换安装psmisc三、安装JDK3.1、安装jdk3.2、测试jdk是否安装成功3.3、将…

Android笔记:动画

文章目录1.View Animation&#xff08;视图动画&#xff09;1.1 Tween Animation&#xff08;补间动画&#xff09;Animation 继承属性透明度alpha缩放scale移动translate旋转rotateset标签Animation父类共有函数1.2Frame Animation &#xff08;逐帧动画&#xff09;2.Propert…

spark3.0源码分析-driver-executor心跳机制

前言 driver和executor心跳机制分为两种机制&#xff1a; 1、executor发送心跳机制 2、driver接受心跳机制 至于为何要分为两种&#xff0c;原因是在分布式场景中&#xff0c;服务的稳定性是无法保障的&#xff0c;例如executor宕机后无法发送心跳&#xff0c;故driver端需要…

3、按键扫描检测处理

说明&#xff1a;本文处理按键的短按、长按检测执行&#xff0c;非矩阵按键 硬件可以类似如下连接即可&#xff0c;无需放置上下拉电阻&#xff1b; 按键动作分长按、短按(弹起时执行)两种 按下不放执行长按&#xff0c;但松开按键时不予执行短按函数 多个按键可以同时操作 按…

内网渗透(三十四)之横向移动篇-IPC配合计划任务横向移动

系列文章第一章节之基础知识篇 内网渗透(一)之基础知识-内网渗透介绍和概述 内网渗透(二)之基础知识-工作组介绍 内网渗透(三)之基础知识-域环境的介绍和优点 内网渗透(四)之基础知识-搭建域环境 内网渗透(五)之基础知识-Active Directory活动目录介绍和使用 内网渗透(六)之基…

Altium Designer输出生产文件Gerber、IPC、NC Drill、坐标文件--AD

AD软件版本&#xff1a;22.2.1 gerber文件输出共有两部分&#xff1a; 1、Gerber Files:铜皮 和 外形分别导出 2、Nc Drill Files 分3次导出 一、Gerber Files 导出2次 设定原点 ** Edit->Origin->Set** 一般板边左下角为原点&#xff0c;可以根据自己板子形状确定 导…

使用MAT工具分析OOM问题

1、添加jvm参数 保存堆内存快照 -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath存放路径 2、下载MAT工具 下载地址&#xff1a; https://www.eclipse.org/mat/downloads.php 3、启动如果遇到 Version 1.8.0 102 of the JVM is not suitable for this product, Version: 11…

2023LaCTFWriteup

文章目录2023LaCTFEBEa hackers notes2023LaCTF 前言&#xff1a; 累了&#xff0c;没有话&#xff0c;下次一定 EBE I was trying to send a flag to my friend over UDP, one character at a time, but it got corrupted! I think someone else was messing around with me…

【C++】类与对象(三)

前言 本章我们接替前一章继续深入理解类的默认成员函数&#xff0c;赋值重载&#xff0c;取地址重载&#xff0c;及const取地址操作符重载 但是在讲剩下的三个默认成员函数之前&#xff0c;我们要先来了解运算符重载&#xff0c;因为赋值重载&#xff0c;取地址重载&#xff0c…

Compose-Navigation简单案例上手

Navigation 快速上手 下面案例简要展示使用 Compose 版本的 Navigation 库来实现两个页面之间的跳转 这是完整的结构&#xff08;忽略掉红线划过的那个包&#xff09; 安装适用于 kotlin 的 navigation 依赖 dependencies {implementation("androidx.navigation:navigati…

兼职任务平台收集(一)分享给有需要的朋友们

互联网时代&#xff0c;给人们带来了很大的便利。信息交流、生活缴费、足不出户购物、便捷出行、线上医疗、线上教育等等很多。可以说&#xff0c;网络的时代会一直存在着。很多人也在互联网上赚到了第一桶金&#xff0c;这跟他们的努力和付出是息息相关的。所谓一份耕耘&#…

使用kubeadm部署k8s1.24.0版本,遇到的坑总结

使用kubeadm部署k8s1.24.0版本&#xff0c;遇到的坑总结环境安装遇到的问题环境 操作系统&#xff1a;centos7 内核&#xff1a;5.4.231-1.el7.elrepo.x86_64 kubeadm&#xff1a;1.24.0 kubelet&#xff1a;1.24.0 kubectl&#xff1a;1.24.0 cri&#xff1a;docker cni&…