Apache Flink 水印的工作机制详解与源码阅读

news2024/10/5 20:43:34

一、时间长河谁能解

在人类生存的地球上,存在着一种很神秘的东西:时间,它看不见摸不着,但速度恒定,单调递增且永无止境的往前推进,人类的历史被淹没在茫茫的时间长河中。
同时在地球附近,一个星球叫做:Flink 星球。
如同太阳需要氢原子作为燃料,Flink 星球的燃料就是地球提供的数据,它的使命就是勤勤恳恳的为地球计算数据,为了满足地球人蛮荒发展催生的海量数据和复杂业务,Flink 星球也在不断迭代不断发展。

Flink 星球最神秘的地方在于,它可以自由调整时间的流动方式,既可以把时间更改为和地球一样的处理时间(ProcessingTime),也可以把它的时间更改为事件时间(EventTime),给多少数据,就流动多少时间。
随着 Flink 星球不断的进化,从 1.15 版本开始,为了更好的符合这个星球的计算使命,把事件时间作为默认时间。也就是在默认情况下,提供多少数据,就流动多少时间。
但是凡事都要遵守原则,Flink 星球时间的原则就是:可以停滞,也可以速度不恒定,但一定是单调递增,誓不回退。

Flink 星球吸收了地球的很多重要理念,其中一条就是:分工合作,不同的事情交给不同的团队完成。
由于地球的业务太多,有太多工厂在生产数据,比如文件系统、 Socket,Kafka,JDBC,于是 Flink 星球成立了专业的团队:连接器,这个团队只做一件事,收数据。
另外内部还成立了算子团队来和连接器对接,比如:

  • 业务专一的 Source 算子,也就是连接器;
  • 老实忠厚的 Map 算子,进来一条数据处理一条数据输出一条数据;
  • 高标准强要求的 Filter 算子,对于不符合条件的数据,一律拦截;
  • 能力强业务精的 Process 算子,可以对任意数据做任意计算。

另外,由于地球的某些工厂严重腐化,爱睡懒觉,磨洋工,运送数据的飞船经常未按约定时间到达 Flink 星球,这让 Flink 某些团队特别头疼。
比如如下工作场景该如何解决:
场景一如上图,有四条数据(3、6、9、7),当3、6、9 分别来了之后,Flink 的时间跟随数据一直递增到 9 。
如果这时候来了一条数据为7,此时 Flink 中的时间会推进到多少?

场景二

算子1 有两个实例在处理数据,某一个时刻,下游算子2同时接收到了这两个实例发送过来的数据 3 和数据 5 ,此刻算子 2 中的时间是多少?

二、水印机制解难题

在没有发明水印之前,Flink 星球的工人使用了一种比较极端的数据处理方式:延迟的数据直接丢掉。这直接导致了计算结果异常,外交事件频出,疲于奔命。
之后一段时间,在不断的和地球的交涉中,Flink 星球交付了一种水印机制,暂时平息了风波。

(1)水印的产生

水印(Watermark)也是一种数据,不同于地球人给过来的数据,水印是算子团队内部产生的一种特殊的数据,只附带了时间属性。
水印可以由任意算子产生,一般由 Source 算子产生的。算子中有一个定时器,每隔 200ms 产生一个水印数据。
如下图就是 Source 算子的内部结构,里面维护了一个最大时间戳,每来一条数据,如果数据的时间比这个最大时间戳大,就更新这个时间戳。
另外还有一个定时器,默认情况下,每隔 200 ms 工作一次,每次工作,都会使用这个最大的时间戳的时间值,封装成一条水印数据,发送到数据的流水线上,和数据一同流向下一个算子。

如下图,是一个流水线上,Source 算子和 Map 算子努力工作的场景,传送带上有一些数据在流动。蓝色的是地球人发过来的要处理的数据,橙色的是 Source 算子自己产生的数据。
Source 算子产生水印的策略是,数据的时间是多少,水印的时间就是多少。

当数据 15 经过 Source 算子时,会把数据中的时间拿出来和 Source 算子内部的最大时间戳比对,如果比这个时间戳大,则把 内部的 maxTimestamp 赋值为 15。
定时器每隔 200ms 触发一次,每次到点了,就会用这个最大时间戳生成一个 watermark,发送到数据流中。

(2)水印的第一个作用:触发计算

交待了水印的背景以及初步的产生方式后,还需要介绍一下水印到底是如何在地球人延迟交付数据后,还能相对准确的算准数据的。
在 Flink 世界中,水印的作用只有两大场景:窗口计算和定时器,这里使用一个窗口计算的例子来说明。
首先为何要有窗口计算,因为数据是源源不断产生,没有一个终点,计算永远也不会停止,所以在无限的数据集上永远也无法得到最终结果。
如果把这无限的数据,切分成一段一段的有限数据集合,就可以做计算了,如下图:

Source 算子读取数据,发送数据到 Map 算子,Map算子在转换之后,根据数据的事件时间不同,把数据划分到三个窗口中,Process 算子来计算每个窗口中的数据总和。
那么问题来了,Process 算子在什么时候触发每个窗口的计算呢?

流水线上有数据5,水印5,数据8,数据10,水印10,依次经过 process 算子
当 数据5和水印5 经过 process 算子后的情况是:

数据5 被放到窗口1中,水印5 经过 Process 算子后,会判断水印值是否大于等于窗口结束时间 10,发现没有,则不触发计算。
等到了数据10和水印10过来之后:

数据8 被放到第一个窗口中。由于窗口是左闭右开,数据10被放到了第二个窗口中。
同时水印10由于等于第一个窗口的结束时间,第一个窗口被触发了计算。如果计算逻辑是求和,则第一个窗口的结果是 13.

(3)水印的第二个作用:处理延时数据

上面的例子展示了,如果水印时间和数据时间相等的情况。如果需要考虑数据延迟,那么需要调整水印生成策略,让水印的生成落后于数据的产生,
比如:水印时间 = 数据时间 - 3
比如有如下初始数据,假设 Process 算子后面有这些数据待处理。
每个数据时间后,紧跟着水印时间,水印时间为数据时间 - 3.
如下图,数据5、数据10、数据13是正常数据,但是数据6是迟到数据:

如下图,数据 8 被放到第一个窗口中,然后 process 算子收到水印5,比较是否到了窗口的结束时间,还未达到。继续处理下面的数据。

如下图,当数据 10 到达 process 算子时,被放到第二个窗口。此时虽然数据已经等于窗口结束时间了,但是此时水印才为7,仍然未达到窗口触发时间。

如下图,这条延时的数据6 还是被放到了第一个窗口,并没有被丢弃。

如下图, 13 这条数据被放到了 窗口2,此时水印10到达了 process 算子,就会触发窗口1 的计算,得到一个统计结果。

这样就很好的处理了延时数据。

三、Api里面见玄机

具体如何在代码中实现,下面展示了一段代码:
在代码中,source 算子从 一个 Socket 中读取数据,交由 map 算子处理
map 算子把字符串切割成字段,封装成对象返回
然后定义了水印的生成方式:

Configuration config = new Configuration();
config.setInteger("rest.port", 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
DataStreamSource<String> source = env.socketTextStream("192.168.10.100", 9999);

SingleOutputStreamOperator<EventBean2> beanStream = source.map(s -> {
    String[] split = s.split(",");
    return new EventBean2(Long.parseLong(split[0]),Long.parseLong(split[1]) ,Integer.parseInt(split[2]));
}).returns(EventBean2.class);

SingleOutputStreamOperator<EventBean2> watermarkedStream = beanStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<EventBean2>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<EventBean2>() {
                    @Override
                    public long extractTimestamp(EventBean2 eventBean, long l) {
                        return eventBean.getTimestamp();
                    }
                })
);

这个 WatermarkStrategy 中有两种生成方式:

  • forMonotonousTimestamps
  • forBoundedOutOfOrderness

第一种就是乱序时间为0的情况,也就是紧跟着数据的时间。第二种可以设置一个延迟的时间,比如上面的案例中,可以延迟3s

另外既然水印的时间从数据中来,就需要告诉 Flink,如何从数据中抽取时间出来
于是还需要指定 withTimestampAssigner。

当前指定水印可以从任意算子开始。

四、源码里面找真理

Flink 源码是相当庞大的,一层层的往下点,很容易会迷路失去方向,那么为了了解核心源码,可以采用 debug 的方式,先在业务代码上打断点,等到了断点处,会显示调用栈,然后阅读调用栈上的核心代码就可以。

(1)当数据到达算子的时候,如何处理的

我们在业务代码抽取时间戳的地方打一个断点,然后在 Idea 中看调用栈:

跑起来,在 Socket 中输入一行数据后,就可以在 Idea 中就可以看到长长的调用栈:

点到第三个调用栈,可以看到到了这个类中:TimestampsAndWatermarksOperator
这也是一个单独的算子,它的职责就是从事件中提取时间,并生成水印

最后一行:watermarkGenerator.onEvent(event, newTimestamp, wmOutput); 点进去的逻辑是:
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
也就是用事件时间和当前的算子的记录时间比较,如果事件时间比当前时间大,则更新当前的时间。

(2)定时器如何周期性发射水印

从业务代码中往里面点几步

可以看到这个类,集成了 WatermarkGenerator 接口

定义了两个方法,事件到来的触发和周期性发射方法

我们可以在这个地方打一个断点,看是如何周期性发射水印的

重新启动,这时候还没有数据,已经到断点处来
可以点到第二个调用栈,看看

来到 onProcessingTime
image.png
第一行的逻辑就是:
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
使用最大的时间戳减去定义的延时时间减去1,构造一个水印发送出去。
第二行得到当前的系统时间,注册了一个定时器,定时器的触发时间是,当前时间+水印发射的间隔时间(200ms)
也就是当前时间过了 200ms,又会触发这个 onProcessingTime,又会重复当前的逻辑。
不是我们想象中的使用一个 while 循环。

五、到了再见时间

本次,我们在 Flink 星球上,鉴赏了 Flink 星球为了处理延时数据而做出的努力,创造了水印这样的一个神奇的机制,来处理流式计算业务的延时问题。
相信下一次碰到类似问题,会有所启发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/78135.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【自动化测试】如何平衡手工和自动化测试

作为一名测试人员&#xff0c;如何平衡手工和自动化测试&#xff0c;是一道绕不过去的课题。不可否认&#xff0c;自动化测试具有提高效率&#xff0c;加快回归速度并因此有助于及时交付项目的好处。但是&#xff0c;在考虑自动化之前&#xff0c;我们应该评估一些要点&#xf…

2023最新SSM计算机毕业设计选题大全(附源码+LW)之java毕业生就业管理系统243xa

首先选择计算机题目的时候先看定什么主题&#xff0c;一般的话都选择当年最热门的话题进行组题&#xff0c;就比如说&#xff0c;今年的热门话题有奥运会&#xff0c;全运会&#xff0c;残运会&#xff0c;或者疫情相关的&#xff0c;这些都是热门话题&#xff0c;所以你就可以…

vscode 关闭/忽略/ignore 单个/指定 git 仓库/repository 提示

文章目录1. 问题2. 解决方法2.1 只追踪 打开文件 所在的仓库2.2 忽略指定的仓库3. 参考1. 问题 vscode 当打开的项目中有多个 git 仓库时&#xff0c;默认会显示所有仓库的 status。 有些已经不再使用的仓库可能有多处更改&#xff0c;但我们并不想去处理它。 如果直接关闭 vs…

gcc 4.8.5 的string问题

gcc 4.8.5及一下版本 的string 因为内部内存管理采用了COW的方式&#xff0c;导致了string在多线程的状态下容易崩溃。崩的很没有规律&#xff0c;表现就是string在赋值的时候就莫名其妙的崩了&#xff0c;查看堆栈&#xff0c;你也看不出啥。崩溃的很随机。 为了避免崩溃&…

文本纠错--文本分割N-gram--Macbert模型的调用以及对返回结果的处理

文本根据词典进行纠错 输入一段可能带有错误信息的文字&#xff0c; 通过词典来检测其中可能错误的词。 例如&#xff1a;有句子如下&#xff1a;中央人民政府驻澳门特别行政区联络办公室1日在机关大楼设灵堂    有词典如下&#xff1a;中国人民&#xff0c;中央人民&#x…

漏斗分析 - AARRR模型案例分析

漏斗分析是一套流程式的数据分析方法&#xff0c;能够科学地反映各阶段用户转化情况。漏斗分析模型已经广泛应用于用户行为分析类产品&#xff0c;且功能十分强大&#xff1a;它可以评估总体或各个环节的转化情况、促销活动效果&#xff1b;也可以与其他数据分析模型结合进行深…

【三维目标检测】SSN(二)

SSN数据和源码配置调试过程请参考上一篇博文&#xff1a;【三维目标检测】SSN&#xff08;一&#xff09;_Coding的叶子的博客-CSDN博客。本文主要详细介绍SSN网络结构及其运行中间状态。 1 模型总体过程 SSN主要结构如下图所示&#xff0c;其核心在于提出了shape-aware heads…

认识分布式锁、使用分布式锁 Redission、实现秒杀案例

分布式锁 基本原理 分布式锁&#xff1a;满足分布式系统或集群模式下多进程可见并且互斥的锁。 分布式锁的核心思想就是让大家都使用同一把锁&#xff0c;只要大家使用的是同一把锁&#xff0c;那么我们就能锁住线程&#xff0c;不让线程进行&#xff0c;让程序串行执行&…

[附源码]计算机毕业设计基于Vue的社区拼购商城Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

node.js-fs模块学习

目录 1.使用fs.readFile方法读取文件 2.使用fs.writeFile方法写入文件 3.fs小案例-整理成绩并写入到新文件中 4.fs模块-路径动态拼接的问题 1.使用fs.readFile方法读取文件 //导入fs模块 const fs require(fs)//调用fs读取文件 //参数1&#xff1a;读取文件的存放路径 //…

spring——Spring Bean属性注入——短命名空间注入——p 命名空间注入(setter注入)...

短命名空间注入 我们在通过构造函数或 setter 方法进行属性注入时&#xff0c;通常是在 <bean> 元素中嵌套 <property> 和 <constructor-arg> 元素来实现的。这种方式虽然结构清晰&#xff0c;但书写较繁琐。Spring 框架提供了 2 种短命名空间&#xff0c;可…

java基于Springboot的论坛管理系统-计算机毕业设计

项目介绍 在社会快速发展的影响下&#xff0c;论坛管理系统继续发展&#xff0c;使论坛管理系统的管理和运营比过去十年更加信息化。依照这一现实为基础&#xff0c;设计一个快捷而又方便的网上论坛管理系统是一项十分重要并且有价值的事情。对于传统的论坛管理系统控制模型来…

【大数据入门核心技术-Azkaban】(一)Azkaban介绍

目录 一、Azkaban 是什么 二、Azkaban 特点 三、Azkaban 与 Oozie 对比 一、Azkaban 是什么 Azkaban 是由 Linkedin 公司推出的一个批量工作流任务调度器&#xff0c;Azkaban 使用 job 文件建立任务之间的依赖关系&#xff0c;并提供 Web 界面供用户管理和调度工作流 官方网…

最新、最全面的Java面试题及答案(212道)

文章目录一. Java 基础1. JDK 和 JRE 有什么区别&#xff1f;2. 和 equals 的区别是什么&#xff1f;3. 两个对象的 hashCode() 相同&#xff0c;则equals() 也一定为true&#xff0c;对吗&#xff1f;4. final 在 Java 中有什么作用&#xff1f;5. Java 中的 Math. round(-1.…

服务器编译spark3.3.1源码支持CDH6.3.2

1、一定要注意编译环境的配置 mvn:3.6.3 scala:2.12.17 JDK:1.8 spark:3.3.1 服务器内存至少需要 8G 重点2、下载连接 wget https://dlcdn.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip wget https://downloads.lightbend.com/scala/2.12.17/scala-2…

大量域名数据下载地址收集整理

说明地址中文网站排行榜——网站排名大全http://www.alexa.cn/siterankAlexa排名(70万的数据)top-1m.csv.zipCisco Umbrellahttp://s3-us-west-1.amazonaws.com/umbrella-static/index.htmlMajestic Millionhttps://majestic.com/reports/majestic-millionTranco-List.euhttps:…

这回稳了,蓝桥杯才是yyds!

为了提高自己的编程能力&#xff0c;很多人会选择参加比赛。而这些比赛中&#xff0c;连续三年入选“全国普通高校学科竞赛排行榜”的蓝桥杯大赛&#xff0c;可以综合测评加分&#xff0c;优先奖学金评定、升学考研加分&#xff0c;yyds !! 蓝桥杯大赛报名截止至12月16日&…

Win 11 安装pycolmap

1.vcpkg安装 1.1 下载vcpkg源码 >> git clone gitgithub.com:microsoft/vcpkg.git1.2 编译生成vcpkg工具 >> .\vcpkg\bootstrap-vcpkg.bat这样在vcpkg文件夹下就会生成vcpkg.exe文件 2.colmap安装 2.1 命令安装colmap的gpu版本 >> .\vcpkg install col…

矩阵 行列式的计算

行列式要求 要计算行列式&#xff0c;那么这个矩阵一定是一个方阵 行列式性质 行列式转置后值不变互换行列式中两行&#xff0c;值变为相反数行列式中两行成比例&#xff0c;行列式为0行列式中一行所有元素乘以一个数后加到另一行&#xff0c;行列式值不变 行列式的计算有很…

【编译原理】第二章部分课后题答案

《编译原理&#xff08;第三版&#xff09;》陈意云著 第 二 章 课 后 习 题 T 2.3 叙述由下列正规式描述的语言 0(0∣1)∗0\space\space0\space\space(\space\space 0\space\space |\space\space 1\space\space)^{\space*\space\space}00 ( 0 ∣ 1 ) ∗ 0 正规式规定开…