【FLink】水位线(Watermark)

news2025/1/16 1:41:47

目录

1、关于时间语义

1.1事件时间

1.2处理时间​编辑

2、什么是水位线

2.1 顺序流和乱序流

2.2乱序数据的处理

2.3 水位线的特性

3 、水位线的生成

3.1 生成水位线的总体原则

3.2 水位线生成策略

3.3 Flink内置水位线

3.3.1 有序流中内置水位线设置

3.4.2 断点式水位线生成器(Punctuated Generator)

3.4.3 在数据源中发送水位线

4、水位线的传递

5、迟到数据的处理


1、关于时间语义

1.1事件时间

        一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

1.2处理时间

2、什么是水位线

在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。说白了就是事件时间戳。

2.1 顺序流和乱序流

有序流就是指数据按照生成的先后顺序,每条数据产生一个有先后顺序的水位线

这是一种理想的状态(数据量较小),而在实际中,我们产生的数据量往往非常庞大,而数据之间的时间间隔非常之小,所以为了提高效率,一般会每隔一段时间生成一个水位线

在实际生产中,由于多服务之间网络传输等的因素,往往我们的数据流,并不是我们所想的顺序结果,而是数据先后错乱,这就是乱序流

2.2乱序数据的处理

由于数据是乱序的,我们无法正确处理“迟到”的数据,为了让窗口能够正确的收集到迟到的数据,我们也可以让窗口等上一段时间,比如2秒。也就是说,我们可以在数据的时间戳基础上加上一些延迟来尽量保证不丢数据。

2.3 水位线的特性

3

3 、水位线的生成

3.1 生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。

所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

3.2 水位线生成策略

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。

DataStream<Event> stream = env.addSource(new ClickSource());

DataStream<Event> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>,
            WatermarkGeneratorSupplier<T>{

    // 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    // 主要负责按照既定的方式,基于时间戳生成水位线
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

3.3 Flink内置水位线

3.3.1 有序流中内置水位线设置

对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:升序的watermark,没有等待时间
                .<WaterSensor>forMonotonousTimestamps()
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        // 返回的时间戳,要 毫秒
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        return element.getTs() * 1000L;
                    }
                });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}

3.3.2 乱序流中内置水位线设置

调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。

这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值

public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());


        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱序的,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                            return element.getTs() * 1000L;
                        });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}

3.4 自定义水位线生成器

3.4.1 周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。

import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();

        env.execute();
    }

    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {

        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {

            return new SerializableTimestampAssigner<Event>() {

                @Override
                public long extractTimestamp(Event element,long recordTimestamp) {
                    return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
                }
            };
        }

        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomBoundedOutOfOrdernessGenerator();
        }
    }

    public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {

        private Long delayTime = 5000L; // 延迟时间
        private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳

        @Override
        public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {
            // 每来一条数据就调用一次
            maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 发射水位线,默认200ms调用一次
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}

如果想修改默认周期时间,可以通过下面方法修改。

//修改默认周期为400ms
env.getConfig().setAutoWatermarkInterval(400L);

3.4.2 断点式水位线生成器(Punctuated Generator

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。

3.4.3 在数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)

4、水位线的传递

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。

水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟

也就是说:水位线的传递是以最小事件时间为准则。

5、迟到数据的处理

5.1 推迟水印推进

在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

5.2 设置窗口延迟关闭

当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

5.3 使用侧流接收迟到的数据

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)

完整示例:

public class WatermarkLateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // 推迟2s关窗
                .sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );


        process.print();
        // 从主流获取侧输出流,打印
        process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");

        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1235458.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

A____Z____RECOVER____DATA勒索恢复---惜分飞

有客户MySQL数据库被黑,业务库中表被删除,并创建A____Z____RECOVER____DATA库,里面有一张readme表,内容为: mysql> select * from readme \G; *************************** 1. row *************************** zh_content: 请尽快与我们取得联系&#xff0c;否则我们将会公…

C#期末速成推荐看的知识和免费视频

【拯救者】C#期末速成 (基础讲解整套题讲解文档下载) 4K ​ 这里讲的是【 &#x1f337;速成&#x1f337; 速成&#x1f337; 速成】版本&#xff0c;按课本章节来&#xff0c; 抽取重点&#xff0c;翻译为人话&#xff01;&#xff01;&#xff01;&#x1f49d; 文末附上 免…

UE5和UE4版本更新重大改变汇总。

转载&#xff1a;UE5和UE4版本更新重大改变汇总。 - 知乎 (zhihu.com) 用户界面变化&#xff1a; 1&#xff0c;原先拖动给放置Actor的place actors&#xff0c;世界大纲&#xff0c;Level等都可以通过右击隐藏到侧边栏&#xff1b; 2&#xff0c;Command命令窗口和ContentBr…

设计模式-命令模式-笔记

“行为变化”模式 在组件的构建过程中&#xff0c;组件行为的变化经常导致组件本身剧烈的变化。“行为变化”模式组件的行为和组件本身进行解耦&#xff0c;从而支持组件行为的变化&#xff0c;实现两者之间的松耦合。 经典模式&#xff1a;Command、Visitor 动机&#xff0…

【LeetCode刷题-链表】--23.合并K个升序链表

23.合并K个升序链表 方法&#xff1a;顺序合并 在前面已经知道合并两个升序链表的前提下&#xff0c;用一个变量ans来维护以及合并的链表&#xff0c;第i次循环把第i个链表和ans合并&#xff0c;答案保存到ans中 /*** Definition for singly-linked list.* public class List…

算法学习 day27

第二十七天 美化数组的最少删除数 2216. 美化数组的最少删除数 - 力扣&#xff08;LeetCode&#xff09; class Solution { public:int minDeletion(vector<int>& nums) {int len nums.size();if(len 0) return 0;int res 0,cur 0;for(int i 1;i < len;i)…

spring boot项目未将resource目录标志为资源目录导致配置文件无效因而运行报错问题

能编译&#xff0c;但不能运行。感觉配置文件没有生效。 将程序代码发给同事&#xff0c;我自己能跑&#xff0c;他不能跑&#xff0c;提示无法构造redis对象。redis的链接写在配置文件里&#xff0c;其实是可以连接的。然后从GIT库下载代码&#xff0c;也同样不能跑。同事的操…

OpenCV快速入门:目标检测——轮廓检测、轮廓的距、点集拟合和二维码检测

文章目录 前言一、轮廓检测1.1 图像轮廓的概念1.2 轮廓检测算法简介1.3 轮廓检测基本步骤1.4 轮廓检测函数说明1.4.1 轮廓发现1.4.2 轮廓面积1.4.3 轮廓周长1.4.4 轮廓外接多边形1.4.5 点到轮廓距离1.4.6 凸包检测 1.5 轮廓检测代码实现 二、轮廓的距2.1 几何距2.2 中心距2.3 H…

springcloud医院挂号预约系统源码

开发技术&#xff1a; jdk1.8&#xff0c;mysql5.7&#xff0c;nodejs&#xff0c;idea&#xff0c;vscode springcloud springboot mybatis vue elementui 功能介绍&#xff1a; 用户端&#xff1a; 登录注册 首页显示医生列表&#xff0c;医院简介&#xff0c;点击医生…

P1141 01迷宫(dfs+染色联通块)

染色联通块&#xff1a; 一个格联通的所有格 每个对应的最大可联通格子的个数均相同 分析&#xff1a; 1.只需要计算每个块里的元素个数 2.元素标记对应某个块 3.查找元素时&#xff1a; 由 &#xff08;1&#xff09;元素坐标-> &#xff08;2&#xff09;查找…

Qt程序的自定义安装卸载方案

前言 NSIS 是一个 Open Source 的 Windows 系统下安装程序制作程序&#xff1b; NSIS-UI-Plugin 是一个开源的NSIS UI插件&#xff1b; 0x0 环境搭建 https://www.cnblogs.com/NSIS/p/16581122.html https://github.com/sway913/NSIS-UI-Plugin 0x1 类图 0x2 二次开发 自定…

图形编辑器开发:自定义光标管理

大家好&#xff0c;我是前端西瓜哥。 今天来讲讲如何在图形编辑器中使用自定义光标&#xff0c;并对光标其进行管理。 编辑器 github 地址&#xff1a; https://github.com/F-star/suika 线上体验&#xff1a; https://blog.fstars.wang/app/suika/ 自定义光标的意义是什么&am…

JavaScript的学习之BOM和DOM,就这一篇就OK了!(超详细)

目录 Day28 JavaScript(2) 1、BOM对象 1.1 window对象 1.2 Location ( 地址栏)对象 1.3 本地存储对象 2、DOM对象(JS核心) 2.1 查找标签 2.2 绑定事件 2.3 操作标签 2.4 常用事件 Day28 JavaScript(2) 1、BOM对象 BOM:Broswer object model,即浏览器提供我们开发者…

ssh远程连接不了虚拟机ubuntu

直奔主题 1. 确保linux安装了ssh2.查看网络适配器是否启用3.连接成功 1. 确保linux安装了ssh sudo apt-get install openssh-server2.查看网络适配器是否启用 3.连接成功

“AI在未来”公益计划,亚马逊云科技将教育资源带到更多中西部学校

亚马逊云科技宣布携手中国光华科技基金会启动“AI在未来”公益计划2023至2024学年项目&#xff0c;预计本学年内在内蒙古、江西、湖南和广西四个省份开展该项目&#xff0c;并完成三年内为中西部地区一百所学校的一万名学生提供免费人工智能教育资源及实践机会的目标。 此外&am…

基于像素特征的kmeas聚类的图像分割方案

kmeans聚类代码 将像素进行聚类&#xff0c;得到每个像素的聚类标签&#xff0c;默认聚类簇数为3 def seg_kmeans(img,clusters3):img_flatimg.reshape((-1,3))# print(img_flat.shape)img_flatnp.float32(img_flat)criteria(cv.TERM_CRITERIA_MAX_ITERcv.TERM_CRITERIA_EPS,2…

《白帽子讲web安全》

第十四章 PHP安全 文件包含漏洞是“代码注入”的一种。“代码注入”这种攻击&#xff0c;其原理就是注入一段用户能控制的脚本或代码&#xff0c;并让服务器端执行。“代码注入”的典型代表就是文件包含&#xff08;File Inclusion&#xff09;。文件包含可能会出现在JSP、PHP…

如何有效的禁止Google Chrome自动更新?

禁止Chrome自动更新 1、背景2、操作步骤 1、背景 众所周知&#xff0c;当我们在使用Selenium进行Web自动化操作&#xff08;如爬虫&#xff09;时&#xff0c;一般会用到ChromeDriver。然而Driver的更新速度明显跟不上Chrome的自动更新。导致我们在使用Selenium进行一些操作时就…

新加坡服务器托管-金融企业的选择

新加坡作为一个亚洲金融中心&#xff0c;其优越的地理位置和先进的信息通信技术基础设施&#xff0c;使得其成为了众多金融机构企业选择服务器机房托管的理想地点。金融行业对于服务器的安全性和可靠性要求很高&#xff0c;而将服务器托管在新加坡有许多好处。 首先&#xff0c…

新零售数字化系统提供商怎么选择?2023十大收银系统排行榜-亿发

随着零售业务的日益繁荣和电子商务的迅猛发展&#xff0c;零售收银系统已成为各类商家提高效率、管理库存、提供更好服务的不可或缺的工具。然而&#xff0c;在众多的收银系统中&#xff0c;如何选择一款适合自己的&#xff0c;一直是许多商家头疼的问题。今天我们就来盘点一下…