7.2、如何理解Flink中的水位线(Watermark)

news2025/1/13 13:32:07

目录

0、版本说明

1、什么是水位线?

2、水位线使用场景?

3、设计水位线主要为了解决什么问题?

4、怎样在flink中生成水位线?

4.1、自定义标记 Watermark 生成器

4.2、自定义周期性 Watermark 生成器

4.3、内置Watermark生成器 - 有序流水位线生成器

4.4、内置Watermark生成器 - 乱序流水位线生成器

4.5、在 读取数据源时 添加水位线

5、水位线和窗口的关系?

6、水位线在各个算子间的传递

6.1、测试用例 - 不设置 withIdleness 超时时间

6.2、测试用例 - 设置 withIdleness 超时时间


0、版本说明

        开发语言:java1.8

        Flink版本:1.17

        官网链接:官网链接

1、什么是水位线?

        Flink中水位线是一条特殊的数据(long timestamp)

        它会以时间戳的形式作为一条标识数据插入到数据流中


2、水位线使用场景?

        使用事件时间(EventTime)做流式计算任务时,需要根据事件时间生成水位线(Watermark)

        通过水位线来触发窗口计算,水位线作为衡量事件时间(EventTime)进展的标识


3、设计水位线主要为了解决什么问题?

        设计水位线主要是为了解决实时流中数据乱序和迟到的问题

        思考:什么原因造成了数据流的乱序呢?

                如今数据采集、数据传输大多都在分布式系统中完成

                各个机器节点因为网络和自身性能的原因 导致了数据的乱序和迟到


4、怎样在flink中生成水位线?

        Flink中支持在 数据源和普通DataStream上添加水位线生成策略(WatermarkStrategy)

4.1、自定义标记 Watermark 生成器

标记 Watermark 生成器特点:

        每条数据到来后,都会为其生成一条 Watermark

适用场景:

        数据量小且数据有序

代码示例:        

Step1:自定义 标记水位线生成器 实现类

// 自定义 标记水位线生成器 实现类
public class PeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {

    // 每进入一条数据,都会调用一次 onEvent 方法
    @Override
    /*
     * 参数说明:
     *   @event : 进入到该方法的事件数据
     *   @eventTimestamp : 时间戳提取器提取的时间戳
     * */
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        //发射水位线
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    // 不需要实现
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}

Step2:自定义 标记性水位线生成策略 实现类

// TODO 自定义 标记性水位线生成策略
public class PeriodWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
    // TODO 实例化一个 事件时间提取器
    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };
        return timestampAssigner;
    }

    // TODO 实例化一个 watermark 生成器
    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PeriodWatermarkGenerator<>();
    }
}

Step3:使用 标记性水位线生成策略

// TODO 使用 自定义标记 Watermark 生成器
public class UserPeriodWatermarkStrategy {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // 3.为 DataStream 添加水位线生成策略 (使用 自定义WatermarkStrategy 实现类)
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 5.触发程序执行
        env.execute();
    }
}

查看运行结果:


4.2、自定义周期性 Watermark 生成器

标记 Watermark 生成器特点:

        基于处理时间,周期性生成 Watermark

适用场景:

        数据量大且可能存在一定程度数据延迟(乱序)

代码示例:        

Step1:自定义 周期性水位线生成器 实现类

// 自定义 周期性水位线生成器
public class PunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
    // 设置变量,用来保存 当前最大的事件时间
    private long currentMaxTimestamp;
    // 设置变量,指定最大的乱序时间(等待时间)
    private final long maxOutOfOrderness = 0000; // 3 秒

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 只更新当前最大时间戳,不再发生水位线
        if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;
    }

    // 周期性 生成水位线
    // 每个 setAutoWatermarkInterval 时间,调用一次该方法
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
    }
}

Setp2:自定义 周期性水位线生成策略 实现类

// 自定义 周期性水位线生成策略
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
    // TODO 实例化一个 事件时间提取器
    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };

        return timestampAssigner;
    }

    // TODO 实例化一个 watermark 生成器
    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedWatermarkGenerator<>();
    }

}

Step3:周期性水位线生成策略

// TODO 使用 自定义周期性 Watermark 生成器
public class UserPunctuatedWatermarkStrategy {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 获取 WatermarkStrategy实例 (方式1:通过 WatermarkStrategy实现类获取)
        PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();

        // TODO 获取 WatermarkStrategy实例 (方式2:通过 WatermarkStrategy工具类获取) 推荐
        WatermarkStrategy<Tuple2<String, Long>> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.<Tuple2<String, Long>>forGenerator(context -> new PunctuatedWatermarkGenerator<>())
                .withTimestampAssigner((event, timestamp) -> event.f1);

        // 3.使用 自定义水位线策略实例 来提取时间戳&生成水位线
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:


4.3、内置Watermark生成器 - 有序流水位线生成器

有序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,最大乱序时间为0

适用场景:

        大数量有序流

代码示例:

// TODO 内置Watermark生成器 - 有序流水位线生成器
public class UserForMonotonousTimestamps {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 创建 内置水位线生成策略
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

        // 3.使用 内置水位线生成策略
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:


4.4、内置Watermark生成器 - 乱序流水位线生成器

乱序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,可以这是最大乱序时间

适用场景:

        大数量乱序流

代码示例:

// TODO 内置Watermark生成器 - 乱序流水位线生成器
public class UserForBoundedOutOfOrderness {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
        env.getConfig().setAutoWatermarkInterval(3 * 1000L);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                         @Override
                         public Tuple2 map(String value) throws Exception {
                             return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                         }
                     }
                );

        // TODO 获取 WatermarkStrategy实例
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s
                .withTimestampAssigner((element,recordTimestamp) -> element.f1);

        // 3.使用 内置水位线生成策略
        SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);

        // 4.通过 processFunction实例 查看生成的水位线
        SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
        process.print();

        // 3.触发程序执行
        env.execute();
    }
}

查看运行结果:


4.5、在 读取数据源时 添加水位线

// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2.创建 Source 对象
Source source = DataGeneratorSource、KafkaSource...

// 3.读取 source时添加水位线
env
        .fromSource(source, WatermarkStrategy实例, "source name")   
        .print()
;

// 4.触发程序执行
env.execute();

5、水位线和窗口的关系?

窗口什么时候创建?

        当窗口内的第一条数据到达时

窗口什么时候触发计算?

        当阈值水位线到达窗口时


6、水位线在各个算子间的传递

        下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值

测试代码:

// TODO 测试水位线的传递
public class TransmitWaterMark {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3); 

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        source
                .partitionCustom(
                        new Partitioner<String>() {
                            @Override
                            public int partition(String key, int numPartitions) {
                                if (key.equals("a")) {
                                    return 0;
                                } else if (key.equals("b")) {
                                    return 1;
                                } else {
                                    return 2;
                                }
                            }
                        }, value -> value.split(",")[0]
                )
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2 map(String value) throws Exception {
                        return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                //.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy())
                                .withTimestampAssigner((element,recordTimestamp) -> element.f1)
                                .withIdleness(Duration.ofSeconds(5))  //空闲等待5s
                )
                .process(new ShowProcessFunction()).setParallelism(1)
                .print();
        
        env.execute();
    }
}

6.1、测试用例 - 不设置 withIdleness 超时时间

现象:如果上游某一个子任务一直没有数据更新,下游算子的水位线一直不会变化


6.2、测试用例 - 设置 withIdleness 超时时间

现象:如果上游某一个子任务`在指定时间内`数据更新,下游算子的水位线将不受该子任务最小值的影响

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1032076.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java 基础篇】Java线程异常处理详解

在多线程编程中&#xff0c;异常处理是一个至关重要的方面&#xff0c;它决定了你的多线程应用程序的稳定性和可靠性。在本篇博客中&#xff0c;我们将深入探讨Java中的线程异常处理&#xff0c;包括线程抛出的异常类型、如何捕获和处理异常以及最佳实践。 异常类型 在多线程…

ps打开找不到MSVCP140.dll重新安装方法,安装ps出现msvcp140.dll缺失解决方法

在计算机中&#xff0c;我们可能会遇到许多问题&#xff0c;其中之一就是找不到msvcp140.dll文件。msvcp140.dll是一个动态链接库文件&#xff0c;它是Microsoft Visual C 2015 Redistributable的一部分。当计算机找不到这个文件时&#xff0c;可能会导致程序无法正常运行。本文…

PostgreSQL如何支持PL/Python过程语言

瀚高数据库 目录 环境 文档用途 详细信息 环境 系统平台&#xff1a;Linux x86-64 Red Hat Enterprise Linux 7 版本&#xff1a;10.4 文档用途 本文档主要介绍PostgreSQL如何支持PL/Python过程语言&#xff0c;如何创建plpython扩展。 详细信息 一、PostgreSQL支持python语言…

创建双向循环链表(不带头节点+插入删除操作)

#include<iostream> using namespace std; typedef struct list {int data;list* prior;list* next; }list,*linklist; void Createlist(linklist& l,int n)//创建&#xff08;不带头节点&#xff09;双向链表 {l new list;l->prior NULL;l->next NULL;link…

注册苹果开发者账号步骤揭秘,创建证书全攻略

​ 目录 转载&#xff1a;注册苹果开发者账号的方法 转载&#xff1a;注册苹果开发者账号的方法 在2020年以前&#xff0c;注册苹果开发者账号后&#xff0c;就可以生成证书。 但2020年后&#xff0c;因为注册苹果开发者账号需要使用Apple Developer app注册开发者账号&…

Cortex-M4之FPU单元

最近在学习实时操作系统&#xff0c;以下是我对学习实时操作系统过程中做的一些笔记。 一、FPU单元 在 Coretex-M4 处理器中有一个可选的单精度 FPU 单元&#xff0c;一般 STM32F429 就有 FPU 单元&#xff0c; 如果使能了 FPU 单元的话就可以使用它来对单精度浮点数进行计算…

Mac配置iTerm样式终端

一、MacOs系统 MacOs中终端使用iTerm2 1. 配置oh-my-zsh oh my zsh 的地址&#xff1a; https//github.com/ohmyzsh/ohmyzsh 插件存放位置&#xff1a;~/.oh-my-zsh/plugins 下载常用的插件 git clone http://github.com/zsh-users/zsh-syntax-highlighting.git 修改配…

APP广告变现策略:如何实现盈利与用户体验的平衡?

如何把握流量变现和用户体验的平衡呢&#xff1f;这是许多 APP 开发者在商业化进程中都会关心并迫切需要解决的问题。以下是总结的一些方法和策略。 一、深入了解用户需求 在开发产品或进行商业决策时&#xff0c;我们要始终以用户需求为出发点。通过市场调研、用户调研和数据…

iterm2免密码连接远程服务器教程

iterm2免密码连接远程服务器 commandO&#xff1a;打开iterm的Profiles。如下图 打开&#xff1a;Edit Profiles&#xff0c;页面如下图 点击左侧底部➕ Send text at start位置写&#xff1a;/Users/XXX/iterm_login.sh 22 username 服务器ip password demo&#xff1a;/User…

01Redis的安装和开机自启的配置

安装Redis 单机安装Redis 大多数企业都是基于Linux服务器来部署项目&#xff0c;而且Redis官方也没有提供Windows版本的安装包(此处选择的Linux版本的CentOS 7) Windows版直接下载对应版本的.zip压缩包解压即可使用 第一步: Redis是基于C语言编写的&#xff0c;因此首先需要…

9.21数电(加法器状态机独热编码)

加法 半加器 一位&#xff0c;分为两部分&#xff0c;一个是进位&#xff0c;一个是单位上的和 进位采取与门&#xff0c;单位上的和用异或门 全加器 进位数就是三个数进行加和&#xff0c;通过与门&#xff0c;就是两两过与门 描述每位的和项就是&#xff0c;只去描述那个…

【js】navigator.mediaDevices.getDisplayMedia实现屏幕共享:

文章目录 一、效果图:二、实现思路:三、实现代码: 一、效果图: 二、实现思路: 文档&#xff1a; 【MDN】https://developer.mozilla.org/zh-CN/docs/Web/API/Navigator/mediaDevices web技术分享| WebRTC 实现屏幕共享 面试官&#xff1a;纯前端如何实现录屏并保存视频到本地&a…

Unity中Shader通道ColorMask

文章目录 [TOC](文章目录) 前言一、ColorMask是用来干什么的二、怎么做到和 Unity UI 中的 Shader 一样根据UI层级自动适配Shader中模板测试值1、借鉴Unity官方的 UI Shader 前言 Unity中Shader通道ColorMask 一、ColorMask是用来干什么的 ColorMask RGB | A | 0 | R、G、B、…

无法解析插件 org.apache.maven.plugins:maven-clean-plugin:3.2.0 尝试使用 -U

无法解析插件 org.apache.maven.plugins:maven-clean-plugin:3.2.0 尝试使用 -U 报错如下&#xff1a; 解决方案&#xff1a;在文件夹里面找到报错的文件&#xff0c;删除&#xff0c;然后刷新.pom文件&#xff0c;让maven重新下载即可

共享WIFI项目新时代:代理商如何玩转下半场?

在21世纪的信息化时代&#xff0c;科技的发展日新月异&#xff0c;人们的生活方式也随之发生了翻天覆地的变化。其中&#xff0c;共享经济作为一种新型的经济形态&#xff0c;正在全球范围内迅速发展。共享WiFi作为共享经济的一个重要组成部分&#xff0c;也在下半场的竞争中&a…

最佳开源DEM全国、省、市、县DEM数据分享

数据简介 哥白尼数字高程模型(Copernicus DEM, COP-DEM)由欧洲航天局发布&#xff0c;被行业公认为是目前最佳开源DEM&#xff0c;分辨率有30米和90米两种&#xff0c;该数据来源于WorldDEM&#xff0c;WorldDEM产品是基于0.4弧秒(对应分辨率10-12米)的TanDEM-X DEM处理后得到…

亚马逊,沃尔玛,塔吉特测评补单,撸卡撸货采退高成功率的技巧

做撸的只有在安全稳定的环境下才能不被平台检测&#xff0c;造成被砍单或F号&#xff0c;所以在没有专业团队指导下&#xff0c;建议大家不要轻易尝试&#xff0c;毕竟试错和时间成本才是最大的 亚马逊风控点很多&#xff0c;卖家和工作室想要实现伪装度足够高的环境&#xff…

qt creator创建项目和添加图片资源

目录 一、创建项目 二、放上需要的图片 三、再次打开项目 四、运行项目 一、创建项目 二、放上需要的图片 在创建的项目里新建一个文件夹&#xff0c;里面放上需要的图片 右击项目点击add new 去第一步加的图片的文件夹&#xff0c;把所有图片都加载进来 然后去ui文件里就可…

网络电视机顶盒怎么样?百元价位最佳网络机顶盒排名

网络电视机顶盒无疑是家家户户必备的&#xff0c;用网络机顶盒可以看电视&#xff0c;玩游戏&#xff0c;上网课&#xff0c;K歌&#xff0c;购物等&#xff0c;你认为网络电视机顶盒怎么样&#xff1f;值不值得买&#xff1f;本期我盘点了百元价位段最佳网络机顶盒排名&#x…

基于HTML5架构的综合管廊系统网络结构设计

摘 要&#xff1a;从网络拓扑结构、开放式实时以太网协议、控制层系统配置方面介绍了综合管廊的系统网络架构设计&#xff0c;分析了无线网络特性&#xff0c;阐述了基于HTML5架构所能实现的功能的初步构想&#xff0c;以便于综合管廊运维人员巡检&#xff0c;确保管廊本体安全…