【Flink】时间语义和水位线的概念和使用

news2024/11/27 21:07:07

文章目录

  • 一 时间语义与Wartermark
    • 1 Flink中的时间语义
    • 2 EventTime的引入
    • 3 Watermark(水位线)
      • (1)基本概念
      • (2)水位线测试
        • a 代码编写
        • b 计算水位线
        • c 计算结果
        • d 深入分析
      • (3)水位线时间测试
        • a 代码编写
        • b 结果分析
      • (4)水位线插入时间测试(调优)

一 时间语义与Wartermark

1 Flink中的时间语义

在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:

在这里插入图片描述

Event Time(事件时间):是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

Ingestion Time(摄入时间):是数据进入Flink的时间。

Processing Time(处理时间):是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

例如电影《星球大战》的例子:

在这里插入图片描述

时间时间为上面的1-5,处理时间为下面的年份。

再例如,一条日志进入Flink的时间为2022-11-12 10:00:00.123,到达Window的系统时间为2022-11-12 10:00:01.234,日志的内容如下:

2022-11-02 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计1min内的故障日志个数,eventTime 时间是最有意义,因此要根据日志的生成时间进行统计。Flink 1.12 默认使用事件时间,无需设置。

2 EventTime的引入

在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:

但是使用事件时间会带来一个问题,我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的,如下图:

理想情况:希望12345依次按序到达。

在这里插入图片描述

实际情况:145先到达,23然后到达,这时如果一个0-5S的窗口,在接收到数据5时就不能够关闭窗口,因为在其后面还有数据。

在这里插入图片描述

乱序事件的影响:

  • 当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。
  • 由于网络、分布式等原因,会导致乱序数据的产生。
  • 乱序数据会让窗口计算不准确。

那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。

3 Watermark(水位线)

使用水位线需要考虑以下问题:

  • 怎样避免乱序数据带来计算不正确?
  • 遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
  • 要等多长时间?碰到含有 10000s 时间戳的事件,是否可以闭合 0s - 5s 滚动窗口吗?

(1)基本概念

  • Watermark是一种衡量Event Time进展的机制。
  • Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。
  • 数据流中的Watermark用于表示timestamp小于等于Watermark的数据,Flink认为其都已经到达了,因此,window的执行也是由Watermark触发的(水位线 >= 窗口结束时间)。
  • Watermark可以理解成一个延迟触发机制,用来让程序自己平衡延迟和结果正确性,可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t -1的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime - t - 1,那么这个窗口被触发执行。
  • 水位线由程序员编程插入到数据流中,是一种逻辑时钟,对于分布式系统来讲,最重要的一个概念就是逻辑时钟。
  • 水位线是一种特殊的事件。
  • 在事件时间的世界里,水位线就是时间。
  • 水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒【因为精度在1ms,所以不会出现0.999毫秒】。

下述文字抽象度较高,如果难以理解,请看关于水位线的测试,然后再回来理解这段文字。

有序流的Watermarker如下图所示(Watermark设置为0):

在这里插入图片描述

乱序流的Watermarker如下图所示(Watermark设置为2):

当Flink接收到数据时,会按照一定的规则去生成Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 最大延迟时长 -1ms,也就是说,Watermark是基于数据携带的时间戳生成的,一旦Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于event time是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是7s - 2s - 1ms = 4999ms,时间戳为12s的事件的Watermark是12s - 2s - 1ms = 9999ms,如果窗口1是0s~5s,窗口2是5s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。

Watermark 就是触发前一窗口的“关窗时间”,一旦触发关窗那么以当前时刻为准,在窗口范围内的所有数据都会收入窗中。

只要没有达到水位线,那么不管现实中的时间推进了多久都不会触发关窗,从这里可以看到,水位线只是在一定程度上解决了数据延迟问题,并不能全部解决乱序问题,那么Flink针对迟到数据也会进行处理,如何处理请见下文。

(2)水位线测试

a 代码编写

为每个用户的PV定义水位线:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            // 输入的内容格式类似:'a 1'
            .socketTextStream("localhost",9999)
            // 需要将原始数据变为元组(a,1000L)
            // value为事件时间
            .map(new MapFunction<String, Tuple2<String,Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(" ");
                    return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);
                }
            })
            // 抽取时间戳,设置水位线
            // 默认每隔200ms的机器时间插入一次水位线
            .assignTimestampsAndWatermarks(
                    // 最大延迟时间设置为5S,并设置数据流中的泛型
                    WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                        @Override
                        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                            // 告诉Flink,事件时间是数据源中的哪一个字段
                            return element.f1;
                        }
                    })
            )
            .keyBy(r ->r.f0)
            // 开启5S的事件时间滚动窗口
            .window(
                    TumblingEventTimeWindows.of(Time.seconds(5))
            )
            // 使用全窗口函数进行聚合
            .process(
                    new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                        @Override
                        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                            long windowStart = context.window().getStart();
                            long windowEnd = context.window().getEnd();
                            // 获取迭代器里面的元素个数
                            long count = elements.spliterator().getExactSizeIfKnown();
                            out.collect("用户【" + key +"】在窗口" + new Timestamp(windowStart) +
                                    " -- " + new Timestamp(windowEnd) + "中的pv次数是" + count);
                        }
                    })
            .print();
    env.execute();
}

b 计算水位线

水位线计算方法:

  • 首先在没有任何时间来临之前Flink会在程序中插入一个负无穷大的水位线
  • 这时输入一个元素a 1,每隔200ms向数据流中插入一个水位线,这时的水位线等于1s - 5s - 1ms = -4001ms,此时数据流中插入一个-4001ms的水位线【逻辑时钟,可以为负】;
  • 再输入一个a 2,间隔200ms,再插入一个水位线,大小等于 2s -5s -1ms = -3001ms
  • 再输入一个a 5,此时水位线等于5s - 5s - 1ms = -1ms
  • 再输入一个a 3,此时水位线等于5s - 5s - 1ms = -1ms
  • 窗口大小为左闭右开,此时0-5的窗口包含三条元素a 1,a 2,a 3,其中a 5被分配到了5 - 10的窗口中,但因为此时逻辑时钟处于-1ms,窗口不会关闭;
  • 再输入一个a 10,此时水位线等于10s - 5s - 1ms = 4999ms,在事件时间的世界中,当前时钟达到定时器时间,关闭0-5的窗口

c 计算结果

运行结果如下:

在这里插入图片描述

d 深入分析

水位线是事件时间中唯一的时钟,当水位线到达4999ms,就认为4999ms之前的数据全部到达,但是4999ms之前的数据真的全部到来了吗?答案显然是否定的,如此时再输入a 4,就进不去0-5的窗口中,那么下面会介绍如何处理迟到数据。

水位线插入的位置:在map操作之前,map操作之后会输出元组,插入的位置就在map的数据流中。这时,给予我们以下启示

  • 在做任何分组(keyBy)之前,先插入水位线
  • 在插入水位线前,数据流的并行度最好为1,否则随机将数据读取到任务槽,再插入水位线,时钟会乱掉

水位线会插入到数据流中,并跟随着数据进入算子内部的这样一个流动状态,这样不同的算子看到的水位线可能不一样,如在Q算子内部,数据进行了无穷大次数的循环,在其后面有一个ProcessWindowFunction函数,那么Q算子就会将这个数据流阻塞,如b中a 1的-4001ms的水位线,其不能够跳过这个算子向前传送。这样Q算子看到的时间就是-4001ms,其后的函数看到的时间就是负无穷大,只有当Q算子计算完成,数据流时钟状态才会更新。

(3)水位线时间测试

a 代码编写

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            .socketTextStream("localhost",9999)
            .map(r -> Tuple2.of(r.split(" ")[0],Long.parseLong(r.split(" ")[1]) * 1000L))
            .returns(Types.TUPLE(Types.STRING,Types.LONG))
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                        @Override
                        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                            return element.f1;
                        }
                    })
            )
            .keyBy(r -> r.f0)
            .process(new KeyedProcessFunction<String,Tuple2<String,Long>,String>(){
                        @Override
                        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
                            out.collect("当前的水位线是:" + ctx.timerService().currentWatermark());
                            ctx.timerService().registerEventTimeTimer(value.f1 + 5000L);
                            out.collect("注册了一个时间戳是:【" + new Timestamp(value.f1 + 5000L) + "】的定时器!");
                        }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                    super.onTimer(timestamp, ctx, out);
                    out.collect("定时器触发了!");
                }
            })
            .print();

    env.execute();
}

b 结果分析

  • 输入a 1,立即执行processElement,由于a 1前面插入了一个负无穷大的水位线,输出

    当前的水位线是:-9223372036854775808
    注册了一个时间戳是:【1970-01-01 08:00:06.0】的定时器!
    
  • 输入‘a 2’,输出1 - 5 - 1ms

    当前的水位线是:-4001
    注册了一个时间戳是:【1970-01-01 08:00:07.0】的定时器!
    
  • 输入a 11,输出 2 - 5 - 1ms

    当前的水位线是:-3001
    注册了一个时间戳是:【1970-01-01 08:00:16.0】的定时器!
    
  • 输入 a 12,输出11 -5 - 1ms

    当前的水位线是:5999
    注册了一个时间戳是:【1970-01-01 08:00:17.0】的定时器!
    定时器触发了!
    
  • 输入a 23,输出12 - 5 - 1ms

    当前的水位线是:6999
    注册了一个时间戳是:【1970-01-01 08:00:28.0】的定时器!
    定时器触发了!
    

(4)水位线插入时间测试(调优)

将默认插入水位线的时间改为1分钟,同时将最大延迟时间设置为0s。。

        // 每隔6分钟,插入一条水位线
        env.getConfig().setAutoWatermarkInterval(60 * 1000L);

快速输入三个a 1,一个a 5,两个a 1,确保输入时间不超过1分钟,最终输出结果a的pv为6,因为输入a 5将要触发关闭窗口操作时,水位线还没有插入,所以不会关闭,直到1分钟之后,才会关闭。

所以水位线的插入时间和最大延迟时间需要根据一些经验和指标来设置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/42100.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【web前端期末大作业】html网上在线书城大学生静态网页 大学生html当当书城仿站 网上书城购物网页作业HTML

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

智慧城市解决方案典型应用

4.2.智慧城市建设目标 4.2.1.高标准的智慧城市基础设施 智慧城市的基础设施主要包括城市信息基础设施和城市空间数据基础设施两个方面。智慧城市建设的首要目标是要建立起完善的、高标准的智慧城市基础设施&#xff0c;并在此基础上建立完备的城市基础信息资源。高标准的城市…

微软文本转语音「免费网页版」

网站地址&#xff1a;Text To Speech - 在线文本转语音 大家好&#xff5e;今天给小伙伴们安利一个AI配音小工具:TTS-文本转语音 【闲话】 疫情三年&#xff0c;很多人都失去工作&#xff0c;有的也是断断续续。很多人负债累累&#xff0c;在全球形势严峻&#xff0c;经济下滑…

【FreeRTOS(三)】任务状态

文章目录任务状态任务挂起 vTaskSuspend取消任务挂起 vTaskResume挂起任务调度器 vTaskSuspendAll取消挂起任务调度器 xTaskResumeAll代码示例&#xff1a;任务挂起、取消任务挂起代码示例&#xff1a;挂起任务调度器、取消挂起任务调度器任务状态 freeRTOS任务的状态有四种&am…

【POJ No. 3321】 子树查询 Apple Tree

【POJ No. 3321】 子树查询 Apple Tree 北大OJ 题目地址 【题意】 在卡卡的房子外面有一棵苹果树&#xff0c;树上有N 个叉&#xff08;编号为1&#xff5e;N &#xff0c;根为1&#xff09;&#xff0c;它们通过分支连接。苹果在叉上生长&#xff0c;两个苹果不会在同一个叉…

1000道最新高频Java面试题,覆盖25个技术栈,从底层原理到架构

最近感慨面试难的人越来越多了&#xff0c;一方面是市场环境&#xff0c;更重要的一方面是企业对Java的人才要求越来越高了。 基本上这样感慨的分为两类人&#xff0c;第一&#xff0c;虽然挂着3、5年经验&#xff0c;但肚子里货少&#xff0c;也没啥拿得出手的项目&#xff0c…

【外卖项目实战开发四】

文章目录菜品管理业务开发文件上传下载文件上传介绍文件下载介绍文件上传代码实现文件下载代码实现新增菜品需求分析数据模型代码开发-准备工作代码开发-梳理交互过程菜品信息分页查询需求分析代码开发-梳理交互过程修改菜品需求分析代码开发-梳理交互过程停售/起售菜品&#x…

阿里P8大牛总结的Java锁机制入门笔记,堪称教科书式天花板

前言 锁机制无处不在&#xff0c;锁机制是实现线程同步的基础&#xff0c;锁机制并不是Java锁独有的&#xff0c;其他各种计算机语言中也有着锁机制相关的实现&#xff0c;数据库中也有锁的相关内容。这篇文章就是从Java入手&#xff0c;深入学习、理解Java中的锁机制&#xf…

【Android App】实战项目之实现你问我答的智能语音机器人(超详细 附源码)

需要全部代码请点赞关注收藏后评论区留言私信~~~ 一、需求描述 想必大家都见过商场里的智能语音机器人&#xff0c;你对它提问时它可以自动回答你的问题&#xff0c;接下来我们也实现这样一个机器人&#xff0c;它依靠语音技术完成问询服务 基本功能如下 1&#xff1a;接收人们…

智能家居—— 树莓派摄像头捕捉人脸并识别

文章目录下载安装mjpg-streamer树莓派安装libcurl库树莓派安装openssl库语音控制开启摄像头线程拍照代码及步骤语音控制摄像头拍照camera.ccontrolDevice.h下载安装mjpg-streamer 参考博文&#xff1a;智能家居 —— 树莓派下载安装mjpg-streamer&#xff08;完成拍照录像监控…

市面上真正的全光谱灯品牌有哪些?全光谱护眼照明灯的作用很明显

众所周知&#xff0c;人眼感知任何事物都离不开光线的照射&#xff0c;但很多人可能不知道&#xff0c;光线不仅可以使我们“看得见”&#xff0c;还可以决定我们是否看得“真实”&#xff0c;这是怎么回事呢&#xff1f;其实这就是光线的色谱丰富度的问题。 人眼感知最舒适的光…

堆、堆排序、堆应用

一、概述 “堆”&#xff08;Heap&#xff09;&#xff0c;原地排序、时间复杂度O(nlogn)的排序算法。 堆是一个完全二叉树&#xff1b;堆中每一个节点的值都必须大于等于&#xff08;或者小于等于&#xff09;其子树中每个节点的值&#xff1b; 二、如何实现一个堆 使用数…

第2-4-7章 docker安装WorkBench-规则引擎Drools-业务规则管理系统-组件化-中台

文章目录8. WorkBench8.1 WorkBench简介8.2 安装方式8.2.1 传统方式安装8.2.2 docker安装drools workbench8.3 使用方式8.3.1 创建空间、项目8.3.2 创建数据对象8.3.3 创建DRL规则文件8.3.4 创建测试场景8.3.5 设置KieBase和KieSession8.3.6 编译、构建、部署8.3.7 在项目中使用…

Intel PAUSE 指令变化如何影响 MySQL 的性能

导读 x86、arm指令都很多&#xff0c;无论是应用程序员还是数据库内核研发大多时候都不需要对这些指令深入理解&#xff0c;但是 Pause 指令和数据库操作太紧密了&#xff0c;本文通过一次非常有趣的性能优化来引入对 Pause 指令的理解&#xff0c;期望可以事半功倍地搞清楚 C…

微服务线上问题排查困难?不知道问题出在哪一环?那是你还不会分布式链路追踪

咱们以前单体应用里面有很多的应用和功能&#xff0c;依赖各个功能之间相互调用&#xff0c;使用公共的代码包等等&#xff0c;排查问题&#xff0c;使用类似于 gdb/dlv 工具或者直接查看代码日志&#xff0c;进行定位和分析 但是现在我们基本上都是微服务架构了&#xff0c;将…

Node.js 入门教程 20 查看 npm 包安装的版本 21 安装 npm 包的旧版本

Node.js 入门教程 Node.js官方入门教程 Node.js中文网 本文仅用于学习记录&#xff0c;不存在任何商业用途&#xff0c;如侵删 文章目录Node.js 入门教程20 查看 npm 包安装的版本21 安装 npm 包的旧版本20 查看 npm 包安装的版本 若要查看所有已安装的 npm 软件包&#xff08…

JetpackCompose Navigation导航快速上手

Navigation 快速上手 下面案例简要展示使用 Compose 版本的 Navigation 库来实现两个页面之间的跳转 这是完整的结构&#xff08;忽略掉红线划过的那个包&#xff09; 编写欲跳转的两个页面 编写 Demo1 页面 子页面使用多个 composable 组件相组合的方法一一装配起来 Demo1m…

【博客543】golang pprof性能调试:寻找cpu瓶颈

golang pprof性能调试&#xff1a;寻找cpu瓶颈 1、引入pprof进行性能调试 在代码中加入&#xff1a; import _ "net/http/pprof"go func() {http.ListenAndServe("0.0.0.0:8899", nil) }()示例&#xff1a;为冒泡排序加入pprof debug package mainimpo…

月薪2万的大数据职位,为什么必须学习Python?

前言 马云说&#xff1a;“未来最大的资源就是数据&#xff0c;不参与大数据十年后一定会后悔。”毕竟出自wuli马大大之口&#xff0c;今年二月份我开始了学习大数据的道路&#xff0c;直到现在对大数据的学习脉络和方法也渐渐清晰。 我们先来看一下数据分析相关职位现在的薪…

Cys(Npys)-(Arg)₉,H2N-C(Npys)-RRRRRRRRR-OH

可渗透细胞的非精氨酸酰胺&#xff0c;可以很容易地偶联到负载分子上&#xff0c;例如通过马来酰亚胺-硫醇偶联。 编号: 126721中文名称: Cys(Npys)-(Arg)₉英文名: Cys(Npys)-(Arg)₉单字母: H2N-C(Npys)-RRRRRRRRR-OH三字母: H2N-Cys(Npys)-Arg-Arg-Arg-Arg-Arg-Arg-Arg-Arg-…