4.时间与窗口

news2025/1/23 3:25:28

4.1 时间类型

在Flink中定义了3种时间类型:

  • 事件时间(Event Time):事件的发生事件,数据本身自带时间字段。
  • 处理时间(Processing Time):计算引擎处理时的系统时间。
  • 和摄取时间(Ingestion Time):指事件进入流处理系统的时间。

4.2 窗口类型

  • 计数窗口(Count Window):滚动/滑动
  • 时间窗口(Time Window):滚动/滑动
  • 会话窗口(Session Window):当超过一段时间,该窗口没有收到新的数据元素,则视为该窗口结束。

4.3 窗口原理与机制

  1. 数据流进入算子前,被提交给WindowAssigner,决定元素被放到哪个或哪些窗口,同时可能会创建新窗口或者合并旧的窗口。
  2. 每一个窗口都拥有一个属于自己的触发器Trigger,每当有元素被分配到该窗口,或者之前注册的定时器超时时,Trigger都会被调用。
  3. Trigger被触发后,窗口中的元素集合就会交给Evictor(如果指定了),遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。
  4. 窗口函数计算结果值,发送给下游。

PS:Flink对一些聚合类的窗口计算(如sum和min)做了优化,因为只需要保存一个中间结果值。每个进入窗口的元素都会执行一次聚合函数并修改中间结果值。

(1)WindowAssigner:决定某个元素被分配到哪个/哪些窗口中去。

(2)WindowTrigger:拥有定时器,决定窗口何时触发/清除。处理时间和计数窗口的实现基于触发器完成。(事件时间窗口触发:watermark ≥ 窗口endTime

(3)WindowEvictor:窗口数据的过滤器,可在WindowFunction执行前或后,从Window中过滤元素。

1)CountEvictor: 计数过滤器。在Window中保留指定数量的元素,并从窗口头部开始丢弃其余元素。 2)DeltaEvictor: 阈值过滤器。本质上来说就是一个自定义规则,计算窗口中每个数据记录,然后与一个事先定义好的阈值做比较,丢弃超过阈值的数据记录。 3)TimeEvictor: 时间过滤器。保留Window中最近一段时间内的元素,并丢弃其余元素。

(4)Window函数:

1)增量计算函数:数据到达后立即计算,窗口只保存中间结果。效率高,性能好,但不够灵活。

2)全量计算函数:缓存窗口的所有元素,触发后统一计算,效率低,但计算灵活。

4.4 水印

水印(Watermark)用于处理乱序事件。(也就是迟到数据)

4.4.1 watermark生成

  1. Datastream watermark生成
    1. Source function中生成
    2. DataStream API中生成
      1. AssignerWithPeriodicWatermarks:系统周期性的调用getCurrentWatermark()来获取当前的Watermark,它返回的Watermark仅在大于上一次返回的Watermark情况下有效
        1. BoundedOutOfOrdernessTimestampExtractor:初始化Watermark = Long.MIN_VALUE,对每条数据,根据extractTimestamp获取最大时间戳currentMaxTimestamp。周期性的调用getCurrentWatermark获取当前最新的Watermark。Watermark=当前收到的数据元素的最大 时间戳-固定延迟
        2. AscendingTimestampExtractor:默认是顺序数据,Watermark=当前收到的数据元素的时间戳-1。减1的目的是确保有最大时间戳的事件不会被当做迟到数据丢弃(书上说 -1 是为了确保有最大时间戳的事件不会被当做迟到数据丢弃,私认为不对,窗口是左闭右开的,最大时间戳的事件会被分配给下一个窗口,此时上一个窗口触发,不代表会丢弃这条数据,因为是在下一个窗口触发时计算
        3. IngestionTimeExtractor:周期性调用getCurrentWatermark() 获取当前机器时间作为当前的Watermark。
      2. AssignerWithPunctuatedWatermarks:对每一条数据生成一个watermark,它返回的Watermark仅在大于上一次返回的Watermark情况下有效
  2. Flink SQL:与Datastream类似,主要是在TableSource中完成。

在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark,会对下游算子造成一定计算压力,适用于实时性要求较高和TPS低的场景。

周期性适用于实时性要求不高,和TPS高的场景。

4.4.2 多流watermark

Apache Flink 内部要保证Watermark保持单调递增。存在多source watermark不一致的问题。

 

经过keyBy、partition之后,在Flink的底层执行模型上,多流输入会被分解为多个双流输入。下游watermark 取所有上游的最小值

4.5 时间服务

4.5.1 定时器服务

定时器服务在Flink中叫作TimerService,TimeService是在算子中提供定时器的管理行为, 包含定时器的注册和删除。在算子中使用时间服务来创建定时器(Timer),并且 在Timer触发的时候进行回调,从而进行业务逻辑处理。

4.5.2 定时器

定时器在Flink中叫作Timer。注册Timer然后重写其onTimer()方法,在Watermark超过Timer的时间点之后,触发回调onTimer()。

  • 对于事件时间,会根据Watermark,从事件时间的定时器队列中找到比给定时间小的所有定时器,触发该Timer所在的算子,然后由算子去调用UDF中的onTime()方法
  • 处理时间是从处理时间 Timer优先级队列中找到Timer。处理时间因为依赖于当前系统,所以其使用的是周期性调度。

4.5.3 优先队列

Flink自己实现了优先级队列来管理Timer,共有2种实现。

  • 基于堆内存的优先级队列HeapPriorityQueueSet:基于Java堆 内存的优先级队列,其实现思路与Java的PriorityQueue类似,使用了二叉树。
  • 基于RocksDB的优先级队列:分为Cache+RocksDB量级,Cache 中保存了前N个元素,其余的保存在RocksDB中。写入的时候采用 Write-through策略,即写入Cache的同时要更新RocksDB中的数据,可 能需要访问磁盘。

基于堆内存的优先级队列比基于RocksDB的优先级队列性能好,但 是受限于内存大小,无法容纳太多的数据;基于RocksDB的优先级队列 牺牲了部分性能,可以容纳大量的数据。

4.6 窗口实现

在 Flink 中 有 3 类 窗 口 : CountWindow 、 TimeWindow 、 SessionWindow,其执行时的算子是WindowOperator。

事件窗口用的比较少。在Flink中提供了4种Session Window的默认实现。

  1. ProcessingTimeSessionWindows:处理时间会话窗口,使用固定会话间隔时长。
  2. DynamicProcessingTimeSessionWindows : 处理时间会话窗口,使用自定义会话间隔时长。
  3. EventTimeSessionWindows:事件时间会话窗口,使用固定会话间隔时长。
  4. DynamicEventTimeSessionWindows:事件时间会话窗口,使用自定义会话间隔时长。

对于会话窗口,因为无法事先确定窗口的长度,也不知道该将数据元素放到哪个窗口,所以对于每一个事件分配一个SessionWindow。然后判断窗口是否需要与已有的窗口进行合并。窗口合并时按照窗口的起始时间进行排序,然后判断窗口之间是否存在时间重叠,重叠的窗口进行合并。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/850803.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

golang协程池(goroutine池)ants库实践

golang中goroutine由运行时管理,使用go关键字就可以方便快捷的创建一个goroutine,受限于服务器硬件内存大小,如果不对goroutine数量进行限制,会出现Out of Memory错误。但是goroutine泄漏引发的血案,想必各位gopher都经历过&#…

分布式任务调度平台XXL-JOB学习笔记-helloworld运行

环境:win10 eclipse java17 mysql8.0.17 xxl-job 2.4 源码:https://github.com/xuxueli/xxl-job/ 导入时按Existing Maven Projects导入,先导入xxl-job-admin(管理平台)和xxl-job-executor-sample-springboot&#x…

帆软设计器大坑:导出的模板会改变数据集的类型

今天早上在调试一个帆软决策报表(*.frm)中的可视化图表。无意中发现之前自己做的数据源变成了内置数据源(ps.不会更新的静态数据)。 查看了原来复制用的模板: 另存为模板(含数据),放…

LVS-DR模式集群构建过程演示

一、工作原理 LVS的工作原理 1.当用户向负载均衡调度器(Director Server)发起请求,调度器将请求发往至内核空间 2.PREROUTING链首先会接收到用户请求,判断目标IP确定是本机IP,将数据包发往INPUT链 3.IPVS是工作在IN…

THS4301 振荡问题排查及解决过程

项目背景简介: 本项目是基于一款微弱信号处理前级模拟电路设计方案。 问题描述: 在生产标定中,发现以前的程序在小量程标定后,切换到差分和单端后,两者的直流偏置不一样,且切换到差分输入时,能发现有振荡现象(有设备单端输入也有振荡); 排查分析过程: 1)首先可以…

车云一体化系统基础理论

车云一体化系统基础理论 介绍目标正文 参考文档 介绍 最近在调研车云链路一体化的整套解决方案,涉及分布式消息队列(RocketMQ)、分布式存储(Doris)、离线数据处理(Spark)、用户行为日志分析&am…

二级python和二级c哪个简单,二级c语言和二级python

大家好,小编为大家解答二级c语言和二级office一起报可以吗的问题。很多人还不知道计算机二级c语言和python哪个好考,现在让我们一起来看看吧! 介绍Python有很多库和使用Qt编写的接口,这自然创建c调用Python的需求。一路摸索,充满艰辛的添加头…

机器学习笔记:李宏毅ChatGPT:生成式学习的两种策略

1 策略1 “各个击破”——autoregressive model “各个击破”——一个一个生成出来 2 策略2 : “一次到位”——non-autoregressve model 一步到位,全部生成出来 2.1 non-autoregressive model 如何确定长度? 两种策略 策略1:始…

Android OkHttp源码分析--分发器

OkHttp是当下Android使用最频繁的网络请求框架,由Square公司开源。Google在Android4.4以后开始将源码中 的HttpURLConnection底层实现替换为OKHttp,同时现在流行的Retrofit框架底层同样是使用OKHttp的。 OKHttp优点: 1、支持Http1、Http2、Quic以及Web…

MySQL多实例下安装不同的版本

MySQL多版本安装 主要步骤: 1. 在/etc/my.cnf 配置中,更改对应配置。相对于同一版本多实例需要配置的参数,不同版本多实例需要多配置basedir参数,指向mysql的解压目录。 2. 初始化数据目录。进入对应解压的MySQL目录&#xff…

Transformer理论学习

Transformer出自于论文《attention is all you need》。 一些主流的序列模型主要依赖于复杂的循环结构或者CNN,这里面包含了编解码器等。而Transformer主要的结构是基于注意力机制,而且是用多头注意力机制去替换网络中的循环或者CNN(换言之就是transfor…

一篇文章带你彻底了解Java Object类

一篇文章带你彻底了解Java Object类 ​ 在Java的世界中,有一个神秘的存在,它是所有类的根基,无所不在,无所不知。它就是——Object类。本文将带你深入探索Java中这个神秘之源,解密Object类的奥秘,让你更好…

粒子群算法运行时间太长怎么办?—教你一招降低94%的运行时间

不管是初学者还是精通智能优化算法(粒子群算法,遗传算法等)的朋友,相信你们都对智能优化算法运行之慢深有体会,对于比较复杂的问题,经常出现运行一次几小时,调试一次几小时的情况。调试了这么多年代码,智能…

数仓架构模型设计参考

1、数据技术架构 1.1、技术架构 1.2、数据分层 将数据仓库分为三层,自下而上为:数据引入层(ODS,Operation Data Store)、数据公共层(CDM,Common Data Model)和数据应用层&#xff…

IoTDB原理剖析

一、介绍 IoTDB(物联网数据库)是一体化收集、存储、管理与分析物联网时序数据的软件系统。 Apache IoTDB采用轻量式架构,具有高性能和丰富的功能。 IoTDB从存储上对时间序列进行排序,索引和chunk块存储,大大的提升时序…

wireshark 安装和使用

wireshark,世界上最受欢迎的网络协议分析器。是一个网络流量分析器,或“嗅探器”,适用于Linux、macOS、*BSD和其他Unix和类Unix操作系统以及Windows。它使用图形用户界面库Qt以及libpcap和npcap作为数据包捕获和过滤库。 wireshark&#xff…

MyBatis 缓存机制复习及项目中的应用经历

背景 想起前两年工作中因为二级缓存默认开启导致的问题,完整的看了一个介绍 MyBatis 缓存机制的视频《MyBatis 缓存基础知识讲解》。 总计知识点: 缓存的类型及开关这是个形同虚设的功能,线上环境应该禁用缓存 MyBatis 缓存分类 MyBasit…

AWD攻防学习总结(草稿状态,待陆续补充)

AWD攻防学习总结 防守端1、修改密码2、备份网站3、备份数据库4、部署WAF5、部署文件监控脚本6、部署流量监控脚本/工具7、D盾扫描,删除预留webshell8、代码审计,seay/fortify扫描,漏洞修复及利用9、时刻关注流量和积分信息,掉分时…

yolov2检测网数据集标注_labelme使用_json2txt格式转换

yolov2检测网数据集标注_labelme使用_json2txt格式转换 一、安装Anaconda二、创建labelme虚拟环境三、使用labelme标注健康非健康猫狗数据3.1 打开数据集所在文件夹3.2 进行标注数据集3.3 json2txt3.4 按文件目录和训练测试数据集重分配 四、数据喂给服务器网络参考链接 一、安…

容器安装Nginx

文章目录 容器安装nginx下载安装容器1、安装docker容器2、安装nginx3、容器运行nginx结果 容器安装nginx 下载安装容器 1、安装docker容器 yum makecache fast # 更新yum缓存 yum-config-manager \--add-repo \http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.…