0基础学习PyFlink——水位线(watermark)触发计算

news2025/1/10 17:12:01

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》和《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》中,我们发现如果窗口中元素个数没有把窗口填满,则不会触发计算。
在这里插入图片描述

为了解决长期不计算的问题,我们引入了在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》和《0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)》的方案。但是这个方案引入另外一个问题,就是每次处理数据可能不尽相同。这是因为它们使用了“处理时间”(Processing Time)来作为窗口划分的参考系,而每次程序处理时间会根据当前负载情况有很大的不同。这样我们对同一批数据做处理时,可能会得出不同的Window切分方案。
在这里插入图片描述
于是我们引入《0基础学习PyFlink——事件时间和运行时间的窗口》方案。它可以使用源自数据本身的“事件时间”(Event Time)作为Time Window的参考系,这样在不同负载、不同时间,相同数据的时间参考系是一样的,进而可以得出一致的结果。
在这里插入图片描述
但是现实中,我们没法保证上述数据是按照上面的顺序到达Flink的。
比如下面这个例子,红色部分都是乱序的,那么Flink如何处理这些数据呢?
在这里插入图片描述
只有两种可能性:

  1. 直接抛弃;
  2. 等待一段时间统一处理,超过等待的时间直接抛弃。因为不可能一直等下去,否则什么时候处理呢?

这些即有别于Count Window,也有别于Time Window。这个时候就要引入水位线(watermark)技术来解决这个问题。
在详细讲解之前,我们需要明确一些基本知识:

  1. EventTime就是Timestamp,即我们可以通过制定Timestamp函数设定元素的EventTime。
  2. EventTime从属于元素。
  3. Watermark源于EventTime和max_out_of_orderness(等待无序数据的时间),即Watermark=EventTime-max_out_of_orderness。
  4. Watermark从属于流。
  5. Window的Start源于EventTime;End源于Start和窗口时间,即End=Start+WindowTme;这是一个左闭右开的区间,即[Start, End)。
  6. Window从属于流,只有Watermark>=Window End时才会触发计算(且窗口中要有元素)。
  7. Window在单向递增前进,比如从[0,10)变成[10,20)、[20,30)……[90,100)。
  8. Wartermark单向递增前进,它不会因为新进入的元素EventTime较小,而导致Wartermark向变小的趋势发展。
    在这里插入图片描述
    上图中,第一个元素(A,1)的EventTime通过自定义公式可以得到101,于是初始的Window的Start值是该值向下取可以被Window Size整除的最大值,即100;这个进一步确认了第一个窗口是[100,105)。
    watermark是通过eventtime计算出来的,上例中我们希望如果事件在窗口时间之外到来则抛弃,即不等待任何时间,即Window End+0,即Eventtime-0。
    (A,0)数据来到的时候,watermark不会因为其Eventtime为100,比流中的watermark值(101)小而改变,依然维持watermark单调递增。这个在(A,2)和(A,5)到来时也是如此。
    (A,8)元素的到来,会让流的watermark变成108。这个值会越过当前窗口[100,105),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,1)、(A,0)、(A,3)和(A,4);
    (A,10)元素的到来,会让流的watermark变成110。这个值会越过当前窗口[100,110),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,8)、(A,6)、(A,7)和(A,9);而(A,2)因为不在这个区间内,就被抛弃了。我们也可以认为(A,2)迟到而被抛弃。
    为了更好讲述原理,上述例子存在一个假设:watertime更新是随着元素一个个进入而改变的。而实际元素进入个数不太确定,比如可能会两个两个进入,那么就会变成如下。主要区别就是(A,5)参与了第二次窗口计算,虽然它迟到了,而且watermark计算方法也不打算等待任何一个迟到的数据,但是它和(A,10)一起进入时间戳计算逻辑,导致触发的时机被滞后,从而“幸运”的赶上了最后一轮窗口计算。如果它稍微再晚一点到来,它也会被抛弃。
    在这里插入图片描述

测试代码

import time
from pyflink.common import Duration, WatermarkStrategy, Time, Types
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
from pyflink.datastream.functions import AllWindowFunction, ProcessFunction, ProcessAllWindowFunction, KeyedProcessFunction
from pyflink.table.expressions import lit, col
from pyflink.table.window import Tumble
from pyflink.common.time import Instant
from pyflink.table.udf import udf
from pyflink.common import Row

            
class WindowFunc(AllWindowFunction[tuple, tuple, TimeWindow]):
    def apply(self, window, inputs):
        out = "**************************WindowFunc**************************" \
                "\nwindow: start:{} end:{} \ninputs: {}" \
                "\n**************************WindowFunc**************************" \
                    .format(Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end), inputs)
        print(out)
      
        for value in inputs:
            yield (value, Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end))

class TimestampAssignerAdapter(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp: int):
        return value[1] * 1000
    
class TimestampAssignerProcessFunctionAdapter(ProcessFunction):
    def process_element(self, value, ctx: 'ProcessFunction.Context'):
        out_put = "-----------------------TimestampAssignerProcessFunctionAdapter {}-----------------------" \
                    "\nvalue: {} \ttimestamp: {} \tcurrent_processing_time: {} \tcurrent_watermark: {}" \
                    "\n-----------------------TimestampAssignerProcessFunctionAdapter-----------------------" \
                        .format(int(time.time()), value, Instant.of_epoch_milli(ctx.timestamp()),
                                Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),
                                Instant.of_epoch_milli(ctx.timer_service().current_watermark()))
                        
        print(out_put)
        
        yield (value, Instant.of_epoch_milli(ctx.timestamp()), 
               Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),
               Instant.of_epoch_milli(ctx.timer_service().current_watermark()))

def gen_random_int_and_timestamp():
    stream_execute_env = StreamExecutionEnvironment.get_execution_environment()
    # stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    stream_execute_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    stream_execute_env.set_parallelism(1)
    stream_execute_env.get_config().set_auto_watermark_interval(2)
    
    stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)
    ordinal_num_start = 0
    ordinal_num_end = 10
    rows_per_second = 1
    
    schame = Schema.new_builder().column('in_ord', DataTypes.INT()) \
                                .build()
                                
    table_descriptor = TableDescriptor.for_connector('datagen') \
                        .schema(schame) \
                        .option('fields.in_ord.kind', 'sequence') \
                        .option('fields.in_ord.start', str(ordinal_num_start)) \
                        .option('fields.in_ord.end', str(ordinal_num_end)) \
                        .option('rows-per-second', str(rows_per_second)) \
                        .build()
          
    stream_table_env.create_temporary_table('source', table_descriptor)
    
    table = stream_table_env.from_path('source')
    
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("in_ord", DataTypes.INT()), DataTypes.FIELD("calc_order", DataTypes.INT())]), input_types=[DataTypes.INT()])
    def colFunc(oneCol):
        ordinal_num_data_map = {0: 1, 1: 0, 2: 3, 3: 4, 4: 8, 5: 6, 6: 7, 7: 2, 8: 9, 9: 10, 10: 5}
        # ordinal_num_data_map = {0: 16, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9,
        #                       10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 0, 17: 17, 18: 18, 19: 19,
        #                       20: 20, 21: 121, 22: 122, 23: 123, 24: 124, 25: 125, 26: 126, 27: 127, 28: 128, 29: 129,}
        data = ordinal_num_data_map[oneCol] + 100
        return Row(oneCol, data)
    
    input_table=table.map(colFunc(col('in_ord')))
    
    datastream = stream_table_env.to_data_stream(input_table)
    
    ###############################################################################################    
    # datastream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(10))) \
    #                     .apply(WindowFunc())
    ###############################################################################################
    # watermark_strategy = WatermarkStrategy.no_watermarks().with_timestamp_assigner(TimestampAssignerAdapter())
    # datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)
    # datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())
    
    # datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.milliseconds(10))) \
    #                     .apply(WindowFunc())        
    ###############################################################################################
    # watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(TimestampAssignerAdapter())
    watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(0)).with_timestamp_assigner(TimestampAssignerAdapter())
    datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)
    
    datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())
    
    
    datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \
                        .apply(WindowFunc())
    ###############################################################################################

    stream_execute_env.execute()
    
if __name__ == '__main__':
    gen_random_int_and_timestamp()

-----------------------TimestampAssignerProcessFunctionAdapter 1699856800-----------------------
value: Row(in_ord=0, calc_order=101) timestamp: Instant<101, 0> current_processing_time: Instant<1699856800, 705000000> current_watermark: Instant<-9223372036854776, 192000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=1, calc_order=100) timestamp: Instant<100, 0> current_processing_time: Instant<1699856802, 700000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=2, calc_order=103) timestamp: Instant<103, 0> current_processing_time: Instant<1699856802, 702000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=3, calc_order=104) timestamp: Instant<104, 0> current_processing_time: Instant<1699856804, 700000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=4, calc_order=108) timestamp: Instant<108, 0> current_processing_time: Instant<1699856804, 709000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<100, 0> end:Instant<105, 0>
inputs: [Row(in_ord=0, calc_order=101), Row(in_ord=1, calc_order=100), Row(in_ord=2, calc_order=103), Row(in_ord=3, calc_order=104)]
WindowFunc
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=5, calc_order=106) timestamp: Instant<106, 0> current_processing_time: Instant<1699856806, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=6, calc_order=107) timestamp: Instant<107, 0> current_processing_time: Instant<1699856806, 705000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=7, calc_order=102) timestamp: Instant<102, 0> current_processing_time: Instant<1699856808, 700000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=8, calc_order=109) timestamp: Instant<109, 0> current_processing_time: Instant<1699856808, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=9, calc_order=110) timestamp: Instant<110, 0> current_processing_time: Instant<1699856809, 440000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=10, calc_order=105) timestamp: Instant<105, 0> current_processing_time: Instant<1699856809, 441000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<105, 0> end:Instant<110, 0>
inputs: [Row(in_ord=4, calc_order=108), Row(in_ord=5, calc_order=106), Row(in_ord=6, calc_order=107), Row(in_ord=8, calc_order=109), Row(in_ord=10, calc_order=105)]
WindowFunc
WindowFunc
window: start:Instant<110, 0> end:Instant<115, 0>
inputs: [Row(in_ord=9, calc_order=110)]

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1202572.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

日志及其框架

日志技术的概述 日志 生活中的日志&#xff1a; 生活中的日志就好比日记&#xff0c;可以记录你生活的点点滴滴。 程序中的日志&#xff1a; 程序中的日志可以用来记录程序运行过程中的信息&#xff0c;并可以进行永久存储。 以前记录日志的方式&#xff08;输出语句&#…

设置专属链接的这些作用你知道吗?

专属链接作为一种个性化的链接&#xff0c;用于为特定的客户或群体提供定制化的体验或服务。对于企业来说&#xff0c;每个渠道或者每个客户都能拥有一个专属链接是无比便利的事情。企业可以将这个链接嵌入到各种宣传物料中&#xff0c;让客户通过输入链接即可进入与客服的交流…

thinkphp5 连接多个服务器数据库

如果你的database.php 是这样&#xff0c; 这是默认的db连接配置 如果还想连接其他服务器&#xff0c;或数据库 在config.php中追加数据库配置&#xff0c; 在使用的地方调用&#xff1a; use think\Db;public function test(){$db3Db::connect(config(db3));$result $db3…

使用Python的requests库模拟爬取地图商铺信息

目录 引言 一、了解目标网站 二、安装requests库 三、发送GET请求 四、解析响应内容 五、处理异常和数据清洗 六、数据存储和分析 七、数据分析和可视化 八、注意事项和最佳实践 总结 引言 随着互联网的快速发展&#xff0c;网络爬虫技术已经成为获取数据的重要手段…

Leetcode-104 二叉树的最大深度

递归实现 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNode right) {* …

谈谈steam游戏搬砖的收益与风险,以及注意事项

11月CSGO市场行情分析&#xff0c;是否到了该抄底的时候了&#xff1f; 今天&#xff0c;要跟大家分享的Steam平台——全球最大的游戏平台&#xff0c;现在给大家介绍下steam搬砖项目&#xff0c;这个项目既小众又稳定。 先了解一下 steam这个平台是做什么的&#xff0c;steam…

navicat创建MySql定时任务

navicat创建MySql定时任务 前提 需要root用户权限 需要开启定时任务 1、开启定时任务 1.1 查看定时任务是否开启 mysql> show variables like event_scheduler;1.2 临时开启定时任务(下次重启后失效) set global event_scheduler on;1.3 设置永久开启定时任务 查看my…

c语言-数据结构-带头双向循环链表

目录 1、双向循环链表的结构 2、双向循环链表的结构体创建 3、双向循环链表的初始化 3.1 双向链表的打印 4、双向循环链表的头插 5、双向循环链表的尾插 6、双向循环链表的删除 6.1 尾删 6.2 头删 6.3 小节结论 7、查找 8、在pos位置前插入数据 9、删除pos位…

Scala---介绍及安装使用

一、Scala介绍 1. 为什么学习Scala语言 Scala是基于JVM的语言&#xff0c;与java语言类似&#xff0c;Java语言是基于JVM的面向对象的语言。Scala也是基于JVM&#xff0c;同时支持面向对象和面向函数的编程语言。这里学习Scala语言的原因是后期我们会学习一个优秀的计算框架S…

单链表(7)

插入函数——插入数据&#xff0c;在链表plist的pos位置插入val数据元素 由图知&#xff0c;poslength时&#xff0c;是可以插入的 在大多数情况下&#xff0c;说位置的时候&#xff0c;从0开始计数&#xff1b;说第几个数据的时候&#xff0c;从1开始计数 现在来测试一下 这就…

CSDN的规范、检测文章质量、博客等级好处等等(我也是意外发现的,我相信很多人还不知道,使用分享给大家!)

前言 都是整理官方的文档&#xff0c;方便自己查看和检查使用&#xff0c;以前我也不知道。后来巧合下发现的&#xff0c;所以分享给大家&#xff01; 下面都有官方的链接&#xff0c;详情去看官方的文档。 大家严格按照官方的规范去记录自己工作生活中的文章&#xff0c;很快…

MacOS Ventura 13 优化配置(ARM架构新手向导)

一、系统配置 1、About My MacBook Pro 2、在当前标签打开新窗口 桌面上创建目录的文件夹&#xff0c;每次新打开一个目录&#xff0c;就会创建一个窗口&#xff0c;这就造成窗口太多&#xff0c;不太好查看和管理&#xff0c;我们可以改成在新标签处打开新目录。需要在&…

电动自动换刀高速电主轴的技术优势浅析

在制造业中&#xff0c;自动化技术的发展一直是一个重要的话题。其中&#xff0c;电动自动换刀被认为是一项高效、智能、先进的技术&#xff0c;在高速电主轴中使用电动自动换刀这一技术&#xff0c;不仅能够缩短换刀时间&#xff0c;还能减少换刀失误&#xff0c;本文将探讨Sy…

光计算1周2篇Nature,英伟达的时代彻底结束!

近期&#xff0c;光计算领域连续发出重量级文章&#xff0c;刊登在学术界的顶级期刊上。一时间&#xff0c;各大媒体纷纷转发&#xff0c;读者们也纷纷感叹&#xff1a;中国芯片取代英伟达的机会来了&#xff01;今天&#xff0c;光子盒用这篇万字长文为大家梳理光计算的背景、…

指标类型(一):北极星指标、虚荣指标

每个产品都有很多指标&#xff0c;每个指标都反映了对应业务的经营情况。但是在实际业务经营中&#xff0c;却要求我们在不同的产品阶段寻找到合适的指标&#xff0c;让这个指标可以代表当前产品阶段的方向和目标&#xff0c;让这个指标不仅对业务经营团队&#xff0c;而且对产…

双十一网络电视盒子哪个品牌好?内行分享权威电视盒子排行榜

双十一大促正如火如荼进行中&#xff0c;因为我从事的工作和电视盒子有关&#xff0c;身边的朋友们在选购电视盒子时不知道从何下手就会问我的意见&#xff0c;本期将盘点业内公认的电视盒子排行榜&#xff0c;给双十一想买电视盒子的朋友们做个参考。 排行一&#xff1a;泰捷W…

【C++】非类型模板参数 | array容器 | 模板特化 | 模板为什么不能分离编译

目录 一、非类型模板参数 二、array容器 三、模板特化 为什么要对模板进行特化 函数模板特化 补充一个问题 类模板特化 全特化与偏特化 全特化 偏特化 四、模板为什么不能分离编译 为什么 怎么办 五、总结模板的优缺点 一、非类型模板参数 模板参数分两类&#x…

MVVM框架:图片加载有问题

一、前言&#xff1a;在我使用ImageView加载图片的时候添加如下代码发现报错 app:imageUrl"{viewModel.observableField.assetImg}"报错如下错误 二、原因&#xff1a;是啥我不太清楚好像是没有imageView的适配器&#xff0c;后来我看了一下确实没有 public class I…

Java中所有的运算符,以及运算符优先级(总结)

运算法是一种特殊的符号&#xff0c;用于表示数据的运算、复制、比较等。 1、算数运算符 // % 取余运算&#xff1a;结果的符号和被模数的符号一致 12 % 5 2 -12 % 5 -2 12 % -5 2 -12 % -5 -2int a1 10; int b1 a1; // a111, b111 int a2 10; int b2 a2; // a211, …

keras转onnx,TensorFlow转tf.keras.models.load_model,onnx精度转换

参考&#xff1a; https://blog.csdn.net/Deaohst/article/details/126864267 转onnx 别直接转onnx。 先转PB&#xff1a; import tensorflow as tfmodel_path ./models/model.h5 # 模型文件 model tf.keras.models.load_model(model_path) model.sa…