Flink Watermark和时间语义

news2025/1/11 18:30:47

Flink 中的时间语义

[点击并拖拽以移动] ​

时间语义:EventTime:事件创建时间;Ingestion Time:数据进入 Flink 的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合,我们往往更关系事件时间(Event Time)。数据生成的时候就会自动注入时间戳,Event Time 可以从日志数据的时间戳(timestamp) 中提取。

设置 Event Time

我们可以直接在代码中,对执行环境调用 setStreamTimeCharacteristic 方法,设置流的时间特性。具体的时间,还需要从数据中提取时间戳(timestamp)。

val env = StreamExecutionEnvironment.getExecutionEnvironment
//从调用时刻开始给 env 创建的每一个 stream 追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

乱序数据的影响

[点击并拖拽以移动] ​

当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。由于网络、分布式等原因,会导致乱序数据的产生。如上图所示,理想情况与实际情况会存在差异,乱序数据会让窗口计算不准确。解决方案是让窗口等几分钟。

水位线 Watermark

怎么避免乱序数据带来计算不正确?
遇到一个时间戳到达了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。Watermark 是一种衡量 Event Time进展的机制,可以设置延迟触发。Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现。数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经达到了,因此, window 的执行也是由 Watermark 触发的。Watermark 用来让程序自己延迟和结果正确性。

Watermark 的特点: Watermark 是一条特殊的数据记录,必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。Watermark 与数据的时间戳有关。
[点击并拖拽以移动] ​

watermark 的传递、引入和设定

watermark 的传递:一个Task输入可以并行多个,如下有4个并行度,输出也可能存在多个并行,如下有3个。每个任务 Task 内部都有一个事件时钟,且每个分区也维护了对应的WM,如下的 Partition WM。当事件流流进 Partition 时会判断新事件流的WM是否大于当前的Partition WM,当大于时就更新 Partition 的时间戳WM为新流入的WM(取最大值),如下1->2象限Partition WM的变化。同时,如下 Task 也维护了一个全局的 WM 表示事件时钟,该值取分区中最小的WM 作为输出的时间戳,如下第二象限的输出选择最小的 WM=3 向下传递。当第二个(横线)分区 Partition WM 流进来 WM=7的事件流时,就会出现第三象限的情景,但是最小的 WM还是=3,因此不更新Task全局的 WM。当第三个分区 Partition WM 流进来 WM=6的事件流时,就会出现第四象限的情景,此时分区 Partition WM 的最小值=4,因此Task全局WM就 =4。
[点击并拖拽以移动] ​

watermark 的引入:Event Time 的使用一定要指定数据源中的时间戳。对于排好序的数据,只需要指定时间戳就够了,不需要延迟触发。

import org.apache.flink.streaming.api.windowing.time.Time
//同时分配时间戳和水位线
dataStream.assignTimestampsAndWatermarks(
//无序数据       Time.milliseconds(1000)=延迟时间
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
  //提取事件戳 = timestamp * 1000是因为出入的毫秒
  override def extractTimestamp(t: SensorReading): Long = {
    t.timestamp * 1000
  }
})

【1】对于排好序的数据,不需要延迟触发,可以只指定事件戳就行了

dataStream.assignTimestampsAndWatermarks(_.timestamp * 1000)

【2】Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成 watermark。MyAssigner 可以有两种类型,都继承自 TimestampAssigner。

dataStream.assignTimestampsAndWatermarks(new MyAssigner())

TimestampAssigner :定义了抽取时间戳,以及生成 watermark 的方法,有两种类型:
【1】AssignerWithPeriodicWatermarks:系统会周期性的将 Watermark 插入到流中。默认周期是 200毫秒(如果是 processingTime 则 Watermark = 0 ),可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置。升序和前面乱序的处理 BoundedOutOfOrderness,都是基于周期性 watermark 的。举例:如下产生 watermark 的逻辑:每隔 5秒,Flink 调用 AssignerWithPeriodicWatermarks 的getCurrentWatermark() 方法。如果方法返回一个时间戳大于之前水位的时间戳,新的 water会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于之前水位的时间戳,则不会产生新的 watermark。

//方案一:
//EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//每隔 5秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);//方案二
//自定义一个周期性的时间戳
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading]{

  val bound: Long = 60 * 1000 //延时为 1 分钟
  var maxTs: Long = Long.MinValue //观察到的最大时间戳

  //生成水位线
  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  //抽取时间戳的方法
  override def extractTimestamp(t: SensorReading, l: Long): Long = {
    maxTs = maxTs.max(t.timestamp)
    t.timestamp
  }
}

【2】AssignerWithPunctuatedWatermarks:没有时间周期规律,可打断的生成 watermark。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading]{
  val bound: Long = 60 * 1000

  //获取水位线,根据数据触发
  override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {
    if(t.id == "sensor_1"){
      new Watermark(l - bound)
    }else{
      null
    }
  }

  //抽取时间戳的方法
  override def extractTimestamp(t: SensorReading, l: Long): Long = {
    t.timestamp
  }
}

watermark 的设定:
【1】在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
【2】如果 watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
【3】而如果 watermark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1351579.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL之四大引擎、建库建表以及账号管理

目录 一.四大引擎 1.1 InnoDB 1.2 MyISAM 1.3 MEMORY 1.4 Archive 二.数据库管理 2.1 元数据库简介 2.2 元数据库分类 2.3 数据库的增删改查及使用 2.4 MySQL库权限相关 三.数据表管理 3.1 三大范式 3.2 基本数据类型 3.2.1 优化原则 3.2.2 分类 四.数据库账号管理 4.1 相同…

LabVIEW在高精度机器人视觉定位系统中的应用

在现代工业自动化中,精确的机器人视觉定位系统对于提高生产效率和产品质量至关重要。LabVIEW软件,以其卓越的图像处理和自动化控制功能,在这一领域发挥着重要作用。本案例将展示LabVIEW如何帮助开发和实现一个高精度的机器人视觉定位系统&…

【WPF.NET开发】WPF中的输入

本文内容 输入 API事件路由处理输入事件文本输入触摸和操作侧重点鼠标位置鼠标捕获命令输入系统和基元素 Windows Presentation Foundation (WPF) 子系统提供了一个功能强大的 API,用于从各种设备(包括鼠标、键盘、触摸和触笔)获取输入。 本…

easycython和cython将py编译为pyd对比

前提了解 为了实验的准确性,在全过程使用的python环境版本都为同一版本 easycython和cython编译为pyd文件的不同在于,easycython编译的原始文件后缀为pyx,cython编译的原始文件为py 1.cython 1.1原始文件 def ZWHCythonTest():print("Z_W_H_") def ZWHCython…

C语言数组习题

1.数组遍历 #include <stdio.h>int main(){int i,a[10];for(i0;i<9;i) //对数组元素a[0]~a[9]赋值 a[i]i;for(i9;i>0;i--) //输出a[9]~a[0]共10个数组元素 printf("%d ",a[i]);printf("\n");return 0;} 运行结果&#xff1a; 2.数组应用&a…

IOS - 手机安装包 ipa 常见几种方式

安装 ipa 包的方法有很多中&#xff0c;可以通过不同的软件安装&#xff0c;本文只列出了常用的几种&#xff0c;做个简单的归纳整理 1、iTunes 安装 数据线连接手机之后&#xff0c;会自动连接iTunes&#xff0c;&#xff08;第一次连接的时候会提示是否信任此电脑&#xff0…

Git管理项目

大家好我是苏麟 , 今天和大家聊聊用Git管理项目 . 一步一步上传到Git仓库 . 1.找到VCS点击创建Git仓库 2.点击目录 3.点击绿色对号提交 4.点击提交 5.点击提交 6.成功提交到本地 7.打开GitLab 或 Gitee 或 GitHub 并创建项目 (注意 : 这里一定是什么都没有的) 否则一会上传是…

[足式机器人]Part2 Dr. CAN学习笔记-自动控制原理Ch1-9PID控制器

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记-自动控制原理Ch1-9PID控制器&#xff09; P —— Proportional I —— Integral D —— Derivative 当前误差/过去误差/误差的变化趋势 K p ⋅ e K_{\mathrm{p}}\cdot e Kp​⋅e&#xff1a;比…

挑战 ChatGPT 和 Google Bard 的防御

到目前为止&#xff0c;科学家已经创建了基于人工智能的聊天机器人&#xff0c;可以帮助内容生成。我们还看到人工智能被用来创建像 WormGPT 这样的恶意软件&#xff0c;尽管地下社区对此并不满意。但现在正在创建聊天机器人&#xff0c;可以使用生成人工智能通过即时注入活动来…

论文笔记:CellSense: Human Mobility Recovery via Cellular Network Data Enhancement

1 intro 1.1 背景 1.1.1 蜂窝计费记录&#xff08;CBR&#xff09; 人类移动性在蜂窝网络上的研究近些年得到了显著关注&#xff0c;这主要是因为手机的高渗透率和收集手机数据的边际成本低蜂窝服务提供商收集蜂窝计费记录&#xff08;CBR&#xff09;用于计费目的&#xf…

利用提示工程,提升LLM将自然语言转化为SQL的准确性

大型语言模型 (LLM) 已展现出理解自然语言提示并生成连贯响应的卓越能力。 这为将自然语言翻译成 SQL 等结构化查询语言开辟了新的可能性。 过去&#xff0c;编写 SQL 查询需要技术专业知识&#xff0c;而LLM允许任何人用简单的英语描述他们想要的内容&#xff0c;并自动生成相…

Github 2024-01-03 开源项目日报 Top10

根据Github Trendings的统计&#xff0c;今日(2024-01-03统计)共有10个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目3TypeScript项目3Jupyter Notebook项目1Dart项目1C项目1Rust项目1 系统设计指南 创建周期&#x…

STC进阶开发(三)蜂鸣器、RTC时钟、I2C总线、外部中断、RTC闹钟设置、RTC计时器设置

前言 这一期我们首先学习如何让蜂鸣器响起来&#xff0c;并且如何让蜂鸣器发出简单的歌曲&#xff0c;然后我们介绍RTC时钟&#xff0c;要想明白RTC时钟&#xff0c;我们还需要先介绍I2C总线和外部中断。接下来就开始这一期的学习吧&#xff01; 蜂鸣器 简单介绍 蜂鸣器是一种…

geemap学习笔记039:分析地理空间数据--合成无云影像

前言 本节介绍的内容是对于众多的原始Landsat数据&#xff0c;利用ee.Algorithms.Landsat.simpleComposite()将其处理为TOA数据&#xff0c;并且合成无云影像。 1 导入库并显示地图 import ee import geemap ee.Initialize()2 无云影像合成 Map geemap.Map()collection e…

王道考研计算机组成原理——数据的表示和运算

数制转换 任意进制》十进制&#xff1a;位权*位数即可 整数部分补0是补在头部&#xff0c;小数部分补0是补在尾部 一般都是先把十进制》二进制&#xff1b;然后二进制再转换成8/16进制这样子 一种更快的方法->拼凑法&#xff1a;小数部分整数部分都可以这样求 一般都是先…

TypeError: unsupported operand type(s) for +: ‘NoneType‘ and ‘str‘

报错 找到出错代码&#xff0c;发现默认值是None 解决 改为‘’即可

vba抓取网页数据

哈喽&#xff0c;哈喽&#xff0c;大家好&#xff01;大家2024发大财啦&#xff01; 不知道&#xff0c;平时大家爱不爱看电影呢&#xff1f;从今年的贺岁档的拍片来看&#xff0c;今年的电影还挺多&#xff0c;而且国产优秀电影居多&#xff0c;元旦假期期间我也去看了部喜剧…

【BCC动态跟踪PostgreSQL】

BPF Compiler Collection (BCC)是基于eBPF的Linux内核分析、跟踪、网络监控工具。其源码存放于GitCode - 开发者的代码家园 想要监控PostgreSQL数据库的相关SQL需要在编译PostgreSQL的时候开启dtrace。下文主要介绍几个和PostgreSQL相关的工具,其他工具可根据需求自行了解。 …

ChatGPT 进行 SEO的使用技巧

搜索引擎优化 (SEO) 是使网站对搜索引擎友好的一种不断发展的实践。 自搜索引擎和新兴技术的发展以来&#xff0c;它从未保持不变。 最近发布的 ChatGPT 是一种人工智能对话工具&#xff0c;似乎在搜索引擎优化方面有很好的应用。 从创建吸引人的标题到只需一个简短的提示就可…

Latex宏包gbt7714的格式问题:去掉OL

问题 采用bibtex来格式化文献&#xff0c;文献的格式采用gbt7714宏包来格式化。感谢宏包的作者和一种贡献者&#xff0c;效果非常好&#xff0c;用起来也很方便。 唯一一个我自己的问题&#xff0c;看不得文献索引后面[J/OL]中的OL。 网上搜索一圈&#xff0c;有一些办法&am…