Flink-水位线的设置以及传递

news2025/1/16 20:06:02

6.2 水位线

6.2.1 概述

  1. 分类
  • 有序流

image.png

  • 无序流
    image.png
    判断的时间延迟
  1. 延迟时间判定

6.2.2 水位线的设置

  1. 分析

image.png
DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本质还是个算子,传入的参数是WatermarkStrategy的生成策略

image.png
但是WatermarkStrategy是一个接口

  • 有序流

image.png

因此调用静态方法forMonotonousTimeStamps后new AscendingTimestampsWatermarks返回WatermarkGernerator
image.png

AscendingTimestampsWatermarks这个继承自BoundOutOfOrdernessWatermarks

image.png
image.png
image.png

BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口

image.png

然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy,参数是new SerializableTimestampAssigner的对象,重写extractTimestamp方法,这个方法作用是怎么样从数据里面提取时间戳

image.png

  • 乱序流

image.png
因此调用静态方法forBoundedOutOfOrderness(参数为最大乱序程度,也就是延迟时间)后new BoundOutOfOrdernessWatermarks返回WatermarkGernerator

image.png

BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口(跟上面一样了)

image.png

后面也跟有序一样,然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy

  1. 完整代码
public class WatermarkTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.输入
        SingleOutputStreamOperator<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L))
            
//                //有序流的watermark生成
//                //forMonotonousTimestamps前指定泛型
//                .assignTimestampsAndWatermarks(WatermarkStrategy
//                        .<Event>forMonotonousTimestamps()//得到WatermarkGenerator
//                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {//返回WatermarkStrategy
//                            @Override
//                            //参数是当前传过来的数据element,另一个传出的recordTimestamp是时间戳
//                            public long extractTimestamp(Event element, long recordTimestamp) {
//                                return element.timestamp;
//                            }
//                        })
//                )
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    //forMonotonousTimestamps前指定泛型
                    //forMonotonousTimestamps参数是最大乱序时间
                    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator
                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                        @Override
                        public long extractTimestamp(Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    })
            );
        env.execute();
    }
}

6.2.3 自定义水位线

  1. 分析

image.png

或者直接new 一个接口WatermarkStrategy重写createWatermarkGenerator的watermark生成器的方法(生成WatermarkGenerator)以及createTimeStampAssigner提取时间戳分配器的方法(生成TimeStampAssigner)创建watermark

image.png

image.png

image.png

image.png

WatermarkGenerator是个接口,有两个方法分别是onEvent方法,主要目标是要发出一个WatermarkOutput,另一个是onperiodicEmit方法,表示周期性的生成,周期性生成时间默认是2秒,env调用getConfig后调用setAutoWatermarkInterval后可以更改周期性生成时间

image.png
image.png

WatermarkOutput也是一个接口,调用emitWatermark就能发出一个watermark,

image.png

image.png

除了WatermarkGenerator接口还有TimeStampAssigner也是个接口,里面只有一个方法叫做extractTimestamp,目的是从当前数据提取时间戳,同时也会作为WatermarkGenerator这个接口中onEvent方法中传入的参数eventTimestamp时间戳(见上上上上上上图)

  1. 代码
  • 正常水位线
// 自定义水位线的产生
public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();
        env.execute();
    }
    //内部静态类
    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        //createTimestampAssigner方法生成TimeStampAssigner
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                //extractTimestamp,目的是从当前数据提取时间戳
                public long extractTimestamp(Event element, long recordTimestamp)
                {
                    return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
                }
            };
        }
        @Override
        //createWatermarkGenerator生成WatermarkGenerator
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }
    }
    //CustomPeriodicGenerator实现WatermarkGenerator接口,并重写方法
    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
        private Long delayTime = 5000L; // 延迟时间
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
        @Override
        //更新当前时间戳,这边不发送水位线,目的是保存时间戳
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput
                output) {
            // 每来一条数据就调用一次
            maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
        }
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 发射水位线,默认 200ms 调用一次
            //-1毫秒都是为了贴切窗口闭合的时候左闭右开设计
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}

  • 断点水位线

在onevent根据条件触发,onPeriodicEmit这个方法中就不用做了

    public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
        @Override
        public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
        // 只有在遇到特定的 itemId 时,才发出水位线
            if (r.user.equals("Mary")) {
                output.emitWatermark(new Watermark(r.timestamp - 1));
            }
        }
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
        }
    }

  • 在自定义数据源中发送水位线

使用 collectWithTimestamp 方法将数据发送出去,原来直接out.collect()的

image.png

参数是当前数据还有当前数据的时间戳,跟水位线生成中extractTimestamp(Event element, long recordTimestamp)这个类似,也是一个数据是什么,一个时间戳是啥

然后发送水位线,用emitWatermark方法生成

public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSourceWithWatermark()).print();
        env.execute();
    }
    // 泛型是数据源中的类型
    public static class ClickSourceWithWatermark implements SourceFunction<Event>
    {
        private boolean running = true;
        @Override
        public void run(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                String username = userArr[random.nextInt(userArr.length)];
                String url = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 发送水位线
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    }
}

6.2.4 水位线的传递

针对多个分区,上游需要告诉下游水位线情况,采用的是广播的方式给所有下游子任务

但是上游如果也是并行的,向下传输的水位线可能有多个,以上游发过来最小的时钟为准,并且下游会有一个分区专门保存上游发过来的水位线最小的数据

image.png

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/18900.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C51 - 中断系统

Contents1> 定义2> 作用3> 组成3.1> 中断系统结构3.2> 8个中断源3.3> 中断向量3.4> 中断优先级4> 原理5> 应用1> 定义 中断&#xff08;interrupt&#xff09;是指&#xff1a; CPU执行某一程序过程中&#xff0c;由于系统内&#xff0c;或外部某…

【操作文件的系统调用】

目录文件操作系统调用的基本库函数打开文件读取文件写入文件关闭文件应用文件操作代码举例文件操作与进程复制的结合先打开文件再复制进程先进程复制&#xff0c;再进行打开文件缓冲区的知识回顾在上一篇讲述僵尸进程的文章中对文件的系统调用做了一点点的代码讲述&#xff0c;…

day08 微服务保护

1、JMeter压力测试 1.1、安装启动 JMeter 依赖于JDK&#xff0c;所以必须确保当前计算机上已经安装了 JDK&#xff0c;并且配置了环境变量。 Apache Jmeter官网下载&#xff0c;地址&#xff1a;http://jmeter.apache.org/download_jmeter.cgi 解压缩即可使用&#xff0c;目…

【Acwing—单源最短路:建图】

y总说&#xff0c;图论题的难点不在于打板子&#xff0c;而是建图的过程 个人觉得&#xff0c;建图的过程分成以下阶段&#xff1a; 1.确定结点的意义 2.确定边权的意义 结点一般都很显然&#xff0c;但是边权的意义我们一般把它设成对答案&#xff08;或需要维护的东西&am…

C++入门知识(二)

最近太忙了&#xff0c;发论文写开题&#xff0c;有两周时间没有学习C了&#xff0c;因为都是抽时间来学习&#xff0c;所以本篇博客也是零零散散的&#xff0c;接下来尽量抽时间吧 目录 六、引用 6.1 引用概念 6.2 引用特性 6.3 常引用 6.4 使用场景 6.5 传值、传引用…

并发编程(一)可见性

【并发编程三大特性】&#xff1a; 可见性 有序性 原子性&#xff08; 较复杂 &#xff09; 【线程的可见性】&#xff1a; 【一个例子认识线程的可见性】&#xff1a; import Utils.SleepHelper; import java.io.IOException;public class T01_HelloVolatile {private sta…

Go中的泛型和反射以及序列化

嗨喽,小伙伴们,好几没有更新了,最近在搞一些云原生的东西,docker , k8s 搞得我暂时迷失了方向,不过我举得搞IT吗,就是在不断尝试,搞一下当下最新的技术,不然 … GO中的泛型与继承 搞过java的都知道泛型与继承,在go中也开始搞泛型与继承了(在go1.8之后) 先看代码–>> p…

【记录】PyCharm 安装 preprocess 模块(库)|| 在 PyCharm 中安装 preprocess 失败,故而在 终端 安装

preprocess.py 针对的是处理许多 文件类型。它工作的语言包括&#xff1a;C、Python、 perl、tcl、xml、javascript、css、idl、tex、fortran、php、java、shell 脚本&#xff08;bash、csh等&#xff09;和c。预处理可以作为 命令行应用程序和作为python 模块。 目录一、在 Py…

矩阵论复习提纲

矩阵论复习提纲 第一章 矩阵相似变化 1、特征值与特征向量 A ∈ Cnxn 若存在 λ ∈ C 满足 Ax λx 则 λ 为 A 的特征值 可转换为 &#xff08;λI - A&#xff09;x 0 特征多项式 &#xff1a;det(λI - A) 特征矩阵&#xff1a; λI - A 2、相似对角化 1. 判断可对角化…

VMware Fusion 13 正式版终于来了

千呼万唤&#xff0c;经历两年之久&#xff0c;VMware终于在Fusion 13正式版中支持了Apple Silicon 版Mac&#xff0c;此次发布的Fusion是Universal版本&#xff0c;也就是一个安装包同时适配Intel Mac及Apple Silicon &#xff08;M1&#xff0c;M2&#xff09;Mac。想起我两年…

疑难杂症集合(备忘)

sshd&#xff1a;no hostkeys available 解决过程: #ssh-keygen -t dsa -f /etc/ssh/ssh_host_dsa_key #ssh-keygen -t rsa -f /etc/ssh/ssh_host_rsa_key #/usr/sbin/sshd 如果上述两个文件存在&#xff0c;仍然出现这个错误&#xff0c;那么试试 chmod 600 上述两个文件。之…

01-Python的基本概念

01-Python的基本概念 Python是一种直译式&#xff08;Interpreted&#xff09;、面向对象&#xff08;Object Oriented&#xff09;的程序语言’它拥有完整的函数库’可以协助轻松地完成许多常见的工作。 所谓的直译式语言是指’直译器&#xff08;InteIpretor&#xff09;会将…

诊断故障码(Diagnostic Trouble Code-DTC)

诊断协议那些事儿 诊断协议那些事儿专栏系列文章&#xff0c;本文将由浅入深的介绍DTC&#xff08;Diagnostic Trouble Code&#xff09;。 关联文章&#xff1a; $19服务:DTCStatusMask和statusofDTC bit 定义 19服务List 文章目录诊断协议那些事儿DTC&#xff08;Diagnos…

Python pyenv install 下载安装慢(失败)完美解决

pyenv 下载原理就是将例如 Python-3.10.3.tar.xz 这样的压缩文件下载到本地缓存文件或临时文件&#xff0c;然后解压出来使用。 由于下载速度或者网不行&#xff0c;那么就跳过 pyenv 下载&#xff0c;自己下载&#xff0c;然后放到它的缓存文件&#xff0c;这样不就行了。 1…

3.np.random

1. np.random.seed np.random.seed()函数用于生成指定随机数。 seed()被设置了之后&#xff0c;np.random.random()可以按顺序产生一组固定的数组&#xff0c;如果使用相同的seed()值&#xff0c;则每次生成的随机数都相同。 如果不设置这个值&#xff0c;那么每次生成的随机数…

[附源码]计算机毕业设计JAVA基于Java的护肤品网站

[附源码]计算机毕业设计JAVA基于Java的护肤品网站 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM my…

【MySQL进阶】表的增删改查操作(CRUD)+(SQL执行顺序)

1. 新增(复制数据)2. 查询 - 进阶2.1 聚合查询2.2 group by2.3 having2.4 联合查询2.4.1 内连接2.4.2 外连接2.4.3 自连接2.4.4 子查询2.4.5 合并查询3 SQL的执行顺序(where...)1. 新增(复制数据) 语法 -- 字段名 列名 -- 将表2的数据复制到表1中 -- 两张表的结构要一样 i…

前端知识点

1.HTML 2.CSS 3.js 4.VUE 5.vUE的基本指令 6.VUE案例 7.ELEMENT HTML 设置图片 <img src"图片地址">让图片居中显示<center><img src"图片地址" width"270" height"900"></center>有序列表 <!--有序 ty…

Spring框架技术的核心与设计思想

目录 1. Spring 是什么? 1.1 什么是容器? 1.2 什么是 IoC ? 2. 传统式开发 3. 控制(权)反转式开发 4. 理解Spring 核心 - IoC 1. Spring 是什么? Spring 的全称是 Spring Framework, 它是一种开源框架, 2002 年, Rod Jahnson 首次推出了 Spring 框雏形 interface21…

[UE笔记]客户端服务器时间同步

内容系看教程所做的笔记 时间 往返时间&#xff08;RTT, Round-Trip Time&#xff09;&#xff1a;数据从客户端通过网络发送到服务器&#xff0c;再从服务器返回到客户端所需的时间。 首先客户端应当知道服务端的当前时间。 服务器启动时间总是先于客户端的&#xff0c;客户…