【入门Flink】- 09Flink水位线Watermark

news2025/1/13 19:44:13

窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

什么是水位线

用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

有序流中水位线

(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线

(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线

乱序流中水位线

分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。

(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】

水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据

水位线与窗口配合,完成对乱序数据的正确处理

水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成策略

生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】

stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

image-20231111224149038

有序流中内置水位线设置

时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.
                <WaterSensor>forMonotonousTimestamps()
                // 指定时间戳分配器,从数据中提取 单位毫秒
                .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {
                    System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);
                    return element.getTs() * 1000L;
                });

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 乱序数据,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 指定时间戳分配器,从数据中提取 单位毫秒
                .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {
                    System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);
                    return element.getTs() * 1000L;
                });

自定义水位线生成器

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks

public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {

    private Long delayTime = 5000L; // 延迟时间
    private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 每来一条数据就调用一次
        maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发射水位线,默认 200ms 调用一次
        output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
    }
}

在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】

 env.getConfig().setAutoWatermarkInterval(400L);

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。

如下:只要有数据来就直接发射水位线

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 每来一条数据就调用一次
        maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳
        output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
    }

(3)在数据源中发送水位线

可以在自定义的数据源中抽取事件时间,然后发送水位线。

自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一

env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), 
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)

水位线的传递(空闲等待withIdleness)

一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟

如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟

        env.setParallelism(2);
		// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务
        SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777)
                .partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L)
                                .withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s
                );

	    // 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS
        .keyBy(r -> r % 2)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        ...

迟到数据的处理

1)推迟水印推进(设置延迟时间)

水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2)设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

3)使用侧流接收迟到的数据

最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。

完整方案:

public class WatermarkLateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
          		 // 1.设置迟到时间 3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)
                // .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;

        SingleOutputStreamOperator<WaterSensor>
                sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
        OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));
        SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(10))) 
                .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗
                .sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                long count = elements.spliterator().estimateSize();
                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);
                            }
                        }
                );
        process.print();
        // 从主流获取侧输出流,打印
        process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1198030.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计网:第一章 概述

目录 1.1计算机网络在信息时代作用 1.2因特网概述 1.3三种交换方式 1.4计算机网络的定义和分类 1.5计算机网络的性能指标 1.6计算机网络的体系结构 基于湖科大教书匠b站计算机网络教学视频以及本校课程老师ppt 整合出的计算机网络学习笔记 根据文章目录&#xff0c;具体内…

C语言概述

目录 ​编辑 1. C语言发展史 2. C语言特点 3. C语言标准 4. C语言编程机制 4.1 预处理(Preprocessing) 4.2 编译(Compilation) 4.3 汇编(Assemble) 4.4 链接(Linking) 结语 1. C语言发展史 C语言是由美国贝尔实验室的Dennis Ritchie于1972年设计开发的一种编…

keil和proteus联动要点

一、keil与proteus如何进行联动&#xff1f; 1.先安装vdmagdi.exe&#xff0c;这是驱动 2.要保证keil工程编译通过&#xff0c;左上角红色图标进行编译&#xff0c;黑色框图标进行链接。 3.生成hex文件 先点击这个图标 按照顺序点击&#xff0c;生成HEX文件。 4.在打开的prot…

Windows 11系统cmd终端美化、Vscode终端美化

win11美化cmd终端和vscode的终端 1. 修改终端背景2. oh-my-posh2.1 安装oh-my-posh2.2 安装Clink2.3 Clink配置oh-my-posh2.4 下载和配置Nerd字体2.5 修改美化主题 3. vscode终端美化 电脑默认的终端没有语法高亮这些&#xff0c;运行命令和代码输出字体一样&#xff0c;有时会…

【数据结构】归并排序

#include<iostream>using namespace std;void Merge(int* arr,int left,int right,int mid, int*& tmparr) {int begin1 left, end1 mid;int begin2 mid 1, end2 right;int tmpi left;//下面合并两个数组为一个有序数组&#xff08;升序&#xff09;&#xff1…

测试开源加解密库NETCore.Encrypt中的RSA加解密函数

微信公众号“dotNET跨平台”的文章《开箱即用&#xff0c;.NET最全的加解密开源库》介绍了开源通用工具库NETCore.Encrypt&#xff0c;其支持对称加密算法、非对称加密算法、摘要算法等多种常用算法&#xff0c;使用方便&#xff0c;不过目前仅支持.net core。本文主要测试调用…

vue-入门介绍

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;Vue篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来vue篇专栏内容,初识vue入门到项目实战详解 目录 一.Vue介绍 二.初识Vue 工具安装 创建项目 目录结构介绍 项…

主题模型LDA教程:一致性得分coherence score方法对比(umass、c_v、uci)

文章目录 主题建模潜在迪利克雷分配&#xff08;LDA&#xff09;一致性得分 coherence score1. CV 一致性得分2. UMass 一致性得分3. UCI 一致性得分4. Word2vec 一致性得分5. 选择最佳一致性得分 主题建模 主题建模是一种机器学习和自然语言处理技术&#xff0c;用于确定文档…

文心耀乌镇,“大模型之光”展现了什么?

“乌镇的小桥流水&#xff0c;能照见全球科技的风起云涌。” 多年以来&#xff0c;伴随着中国科技的腾飞&#xff0c;以及世界互联网大会乌镇峰会的连续成功举办&#xff0c;这句话已经成为全球科技产业的共识。乌镇是科技与互联网的风向标、晴雨表&#xff0c;也是无数新故事开…

2023-11-11 LeetCode每日一题(情侣牵手)

2023-11-11每日一题 一、题目编号 765. 情侣牵手二、题目链接 点击跳转到题目位置 三、题目描述 n 对情侣坐在连续排列的 2n 个座位上&#xff0c;想要牵到对方的手。 人和座位由一个整数数组 row 表示&#xff0c;其中 row[i] 是坐在第 i 个座位上的人的 ID。情侣们按顺…

SQL必知会(二)-SQL查询篇(4)-高级过滤

第5课、高级过滤 组合 WHERE 子句 AND OR&#xff1a;与条件、或条件 多个 WHERE 子句有两种使用方式&#xff1a;AND 子句 或 OR 子句。 1&#xff09;AND 操作符 AND 相当于编程语言中的与条件。 需求&#xff1a;如果某个产品由供应商 DLL01 制造&#xff0c;但价格不高…

java项目之共享充电宝管理系统(ssm框架)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的共享充电宝管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 管理员&#xff1a;首页、个…

Shell编程入门--概念、特性、bash配置文件

目录 一、Shell概念1.定义2.分类和使用场景2.1.分类和切换2.2.使用场景 3.特性3.1.文件描述符与输出重定向3.2.历史命令---history3.3.别名 --alias3.4.命令排序执行3.5.部分快捷键3.6.通配符置换 4.脚本规范5.脚本运行方式5.1.bash脚本执行5.2.bash脚本测试 二、bash配置文件1…

Oracle(18)Auditing

文章目录 一、基础知识1、审计介绍2、Auditing Types 审计类型3、Auditing Guidelines 审计准则4、Auditing Categories 审核类别5、Database Auditing 数据库审计6、Auditing User SYS 审计sys用户7、Getting Auditing Informatio 获取审计信息8、获取审计记录通知 二、基础操…

(三)、MySQL索引

一、索引基础 索引是存储引擎用于快速找到记录的一种数据结构。(可以理解为一本书的目录)。 例子&#xff1a;where id1 如果在id列上建有索引&#xff0c;则MySQL将使用该索引找到id为1的行&#xff0c;也就是说&#xff0c;MySQL先在索引上按值进行查找&#xff0c;然…

Java10新增特性

版本介绍 Java 10 的发布时间是2018年3月20日。这是在Java 9之后&#xff0c;采用了基于时间发布的策略&#xff0c;每6个月一个版本。这是采用新的发布策略之后的第一个版本。 Java 10 是Java版本历史上最快的一个版本。它打破了Java缓慢增长和进化的概念。它是一个具有许多…

【树与二叉树的转换,哈夫曼树的基本概念】

文章目录 树与二叉树的转换将二叉树转化为树森林与二叉树的转化&#xff08;二叉树与多棵树之间的关系&#xff09;二叉树转换为森林森林的先序遍历1&#xff09;先序遍历2&#xff09;后序遍历 哈夫曼树的基本概念森林转换成二叉树&#xff08;二叉树与多棵树的关系&#xff0…

Sensor 点亮出图后,颜色偏红或者偏绿是为什么?

这是因为 sensor balck level 的值配置的不正确导致&#xff0c;black level 的值一般在效果参数的 calibration 参数里面。 在驱动调试阶段&#xff0c;我们一般都是复用其他已调试好的&#xff0c;sensor 的驱动文件及效果文件&#xff0c; 而不同 sensor 的 balck level 的…

YOLOv8-Seg改进:分割注意力系列篇 | 高效的通道先验卷积注意力(CPCA) | 中科院 2023.6

🚀🚀🚀本文改进:高效的通道先验卷积注意力(CPCA)方法,支持注意力权重在通道和空间维度上的动态分布; 🚀🚀🚀CPCA 小目标分割检测&复杂场景首选,实现涨点 🚀🚀🚀YOLOv8-seg创新专栏:http://t.csdnimg.cn/KLSdv 学姐带你学习YOLOv8,从入门到创新…

奇舞周刊第510期:浏览器和图形引擎渲染对比

记得点击文章末尾的“ 阅读原文 ”查看哟~ 下面先一起看下本期周刊 摘要 吧~ 奇舞推荐 ■ ■ ■ 浏览器和图形引擎渲染对比 本文从介绍浏览器渲染引擎开始&#xff0c;逐渐引出和图形引擎的比较&#xff0c;尝试从图形视角探索和理解浏览器的渲染原理。 Next.js的崛起&#xff…