flink学习之水位线

news2025/1/19 2:31:27

什么是水位线

在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,
用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数
据的时间戳来驱动的。
我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟
的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标
记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以
更新自己的时钟了。在 Flink 中,数据流中用来做时间标记的记号就叫做水位线。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,
主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个
数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

水位线的分类

有序的水位线

在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;而在实际应用中,
如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间
戳、插入水位线就做了大量的无用功。所以为了提高效率,一般会每隔一段时间生成一个水位
线,这个水位线的时间戳,就是当前最新数据的时间戳,所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
在这里插入图片描述

无序的水位线

在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改
变,这就是所谓的“乱序数据”。
在这里插入图片描述
对于连续数据流,我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就
不再生成新的水位线,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
在这里插入图片描述

如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需
要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新
的水位线,。所以我们可以试着多等几秒,也就是把时钟调得更慢一些。最终的目的,就是要让窗口能够把所有迟到数据都收进来,得到正确的计算结果。对应到水位线上,其实就是要保证,当前时间已经进展到了这个时间戳,在这之后不可能再有迟到数据来了(延迟设的足够长)。
在这里插入图片描述

如何生成水位线

1.水位线的生成时机

水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,可以考虑在生成水位线之前使用。

2.水位线生成策略

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法:assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指
示事件时间。

val stream: DataStream[ClickEvent] = env.addSource(new ClickSource())  
val withTimestampsAndWatermarks: DataStream[ClickEvent] = stream.assignTimestampsAndWatermarks(watermarkStrategy)

assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是
所谓的“水位线生成策略”。WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner
和一个“水位线生成器”WatermarkGenerator。

trait WatermarkStrategy[T] extends TimestampAssignerSupplier[T] with WatermarkGeneratorSupplier[T] {  
  def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[T]  
  def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[T]  
}

TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给
元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在
WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,
以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间
为处理时间,可以调用环境配置的 setAutoWatermarkInterval()方法来设置,默认为
200ms。

env.getConfig.setAutoWatermarkInterval(60 * 1000L)
3. flink内置水位线生成器
  1. 有序流
val stream: DataStream[Event] = env.addSource(new ClickSource())  
val withTimestampsAndWatermarks: DataStream[Event] = stream.assignTimestampsAndWatermarks(  
  WatermarkStrategy  
    .forMonotonousTimestamps[Event]()  
    .withTimestampAssigner { (event, timestamp) => event.timestamp }  
)
  1. 无序流
import java.time.Duration  
import org.apache.flink.streaming.api.scala._  
import org.apache.flink.streaming.api.windowing.time.Time  
import org.apache.flink.util.Collector  
  
object OutOfOrdernessTest {  
  def main(args: Array[String]): Unit = {  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
    val clickSource = new ClickSource()  
    val stream = env.addSource(clickSource)  
  
    // 插入水位线的逻辑  
    val watermarkedStream = stream  
      .assignTimestampsAndWatermarks(  
        WatermarkStrategy  
          .forBoundedOutOfOrderness(Time.seconds(5))  
          .withTimestampAssigner(new SerializableTimestampAssigner[Event] {  
            override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp  
          })  
      )  
  
    watermarkedStream.print()  
    env.execute("OutOfOrdernessTest")  
  }  
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1398291.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【想要安利给所有人的开发工具】一款写笔记的工具——语雀

目录 📕开篇 ✍使用感受 👍语雀的常用功能 1、导出成图片 2、导出为PDF 3、代码的模块 4、流程图 ​5、画板类 6、程序员专用区 ​7、布局和样式 8、菜单栏的功能 9、其余功能(很多) 🚗为什么推荐语雀 &…

(二)CarPlay集成开发之苹果的iAP协议

文章目录 概要协议格式鉴权流程CarPlay中的iAP2协议应用小结 概要 iAP2协议是由苹果公司定义的一种数据通信协议,主要用于苹果设备认证外设,以及与外设数据交换的一种协议 协议格式 协议格式一共分为三种类型,分别为握手包,链路…

lattice Diamond Programmer程序下载

Lattice Diamond Programmer Diamond Programmer程序下载1 Diamond Programmer启动2 Diamond Programmer程序烧写3 Cannot Identify Device错误解决 Diamond Programmer程序下载 Diamond Programmer适用于Lattice公司的FPGA器件与CPLD器件的程序下载,其下载步骤如下…

如何才能拥有比特币 - 01 ?

如何才能拥有BTC 在拥有 BTC 之前我们要先搞明白 BTC到底保存在哪里?我的钱是存在银行卡里的,那我的BTC是存在哪里的呢? BTC到底在哪里? 一句话概括,BTC是存储在BTC地址中,而且地址是公开的,…

Python项目——搞怪小程序(PySide6+Pyinstaller)

1、介绍 使用python编写一个小程序,回答你是猪吗。 点击“是”提交,弹窗并退出。 点击“不是”提交,等待5秒,重新选择。 并且隐藏了关闭按钮。 2、实现 新建一个项目。 2.1、设计UI 使用Qt designer设计一个UI界面&#xff0c…

android 开发 W/TextToSpeech: speak failed: not bound to TTS engine

问题 笔者使用TTS(TextToSpeech)对于文本内容进行语音播报,控制台报错 android 开发 speak failed:not bound to TTS engine详细问题 笔者核心代码: import android.os.Bundle; import android.speech.tts.TextToSpeech; import android.speech.tts.…

react native Gradle的原国外地址、本地下载、国内阿里腾讯镜像三种下载配置

一、国外地址:(初始项目默认) 下载地址:https://services.gradle.org/distributions/ 文件地址见下图: 注意:这个地址下载十次就有九次是连接超时,建议换另外两种方法 二、下载到本地&#x…

LLM:ALiBi - 给注意力加上线性偏置

论文:https://arxiv.org/pdf/2108.12409.pdf 代码:https://github.com/ofirpress/attention_with_linear_biases 发表:2021 长度外推 参考:https://spaces.ac.cn/archives/9431#ALIBI 长度外推性是一个训练和预测的长度不一致…

tomcat原理模拟和tomcat优化

1、tomcat实现原理 servlet 没有主方法main,依赖tomcat才能运行,因为tomcat 有主方法main,由java编写 servlet中doGet和doPost方法属于非静态方法,只能依托new对象存在,tomcat无法new出来对象,因此tomcat…

手机与电脑更改IP地址怎么使用代理IP?

在现代互联网时代,代理IP已成为许多人日常生活和工作中不可或缺的一部分。通过代理IP,用户可以隐藏自己的真实IP地址,并获得更好的网络体验。本文将详细介绍如何在手机和电脑上更改IP地址并使用代理IP。 一、手机使用代理IP 1. 打开手机设置&…

1.C语言——基础知识

C语言基础知识 1.第一个C语言程序2.注释3.标识符4.关键字5.数据类型6.变量7.常量8.运算符9.输入输出输入输出 1.第一个C语言程序 C语言的编程框架 #include <stdio.h> int main() {/* 我的第一个 C 程序 */printf("Hello, World! \n");return 0; }2.注释 单行…

MySQL面试题 | 18.精选MySQL面试题

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

Vue3前端开发,如何获取组件内dom对象以及子组件的属性和方法

Vue3前端开发,借助Ref来获取组件内dom对象&#xff0c;借助defineExpose编译宏可以获取到子组件的属性和方法。 <script setup> import {onMounted, ref} from vue import Base from ./components/Base.vue import SetupDemo from ./components/SetupDemo.vue import Rea…

探索C++中std::string的弱点:你可能未曾注意到的缺点

C中std::string的弱点&#xff1a;你可能未曾注意到的缺点 一、背景二、性能方面的局限三、可变性带来的问题四、内存管理和指针操作五、Unicode和多字节字符集的支持六、其他替代方案七、总结 一、背景 C中std::string是一个非常重要的类&#xff0c;用于表示和处理字符串数据…

无偿分享一个很有用的看源码小技巧

怎么在 idea 里面查看 git 提交记录呢&#xff1f;这个界面是藏在哪里的呢&#xff0c;我的 idea 里面怎么没有呢&#xff1f; 好的&#xff0c;是我疏忽了&#xff0c;我先入为主的认为这个大家应该都知道是怎么来的。 但是确实是有一些同学是不太清楚的&#xff0c;那我这篇…

Java设计模式-单例模式(2)

大家好&#xff0c;我是馆长&#xff01;从今天开始馆长开始对java设计模式的创建型模式中的单例、原型、工厂方法、抽象工厂、建造者的单例模式进行讲解和说明。 单例模式&#xff08;Singleton&#xff09; 定义 某个类只能生成一个实例&#xff0c;该类提供了一个全局访问…

Docker技巧汇总

Docker技巧汇总 前言使用流程安装配置镜像管理创建并运行容器使用容器/常用命令导出和导入查看元数据挂载数据卷端口映射/转发VS Code连接Docker 前言 Docker 是一个开源的应用容器引擎&#xff0c;可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xf…

2024年第十五届机械与智能制造技术国际会议(ICMIMT 2024)

2024年第十五届机械与智能制造技术国际会议(ICMIMT 2024)2024年5月17-19日 南非 开普敦会议官网&#xff1a; 15TH IEEE-ICMIMT 2024http://www.mimt.us/ 近年来&#xff0c;机械和智能制造技术取得了重大进展。先进计算和传感技术的集成带来了更精确、更高效和自动化的制造过…

Pymol-电子密度图展示方法-PDB数据库已发表结构和自己晶体解析得到的结构密度图

简单来说&#xff0c;想要用PyMol展示电子密度图可以归为以下两种&#xff1a; 一是展示PDB数据库中已发表数据的结构和Map的方式 以6sps.pdb为例&#xff0c;在pymol中导入该数据密度图时&#xff0c;可以无需下载对应的密度文件&#xff0c;直接用fetch即可&#xff1a; Py…

Spring 事务管理 @Transactional

事务 Spring 的声明式事务是采用声明的方式来处理事务。这里所说的声明&#xff0c;就是指在配置文件中声明&#xff0c;用在 Spring 配置文件中声明式的处理事务来代替代码式的处理事务。 事务管理不侵入开发的组件。具体来说&#xff0c;业务逻辑对象就不会意识到正在事务管…