【API篇】十、生成Flink水位线

news2025/1/15 6:34:36

文章目录

  • 1、水位线的生成原则
  • 2、有序流内置水位线
  • 3、乱序流内置水位线
  • 4、自定义周期性水位线生成器
  • 5、自定义断点式水位线生成器
  • 6、从数据源中发送水位线

1、水位线的生成原则

水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数据了。参考前面的乱序流,可以得出:

  • 想要保证数据绝对正确,就得加足够大的延迟,但实时性就没保障了
  • 想要实时性强,就得把延迟设置小,但此时迟到数据可能遗漏,准确性降低

水位线的定义,是对低延迟和结果准确性的一个权衡。Flink生成水位线的方法是.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

DataStream<Event> stream = env.addSource(xxx);

DataStream withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(WatermarkStrategy对象);

WatermarkStrategy是一个接口,包含了一个时间戳分配器TimestampAssigner和一个水位线生成WatermarkGenerator:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{

    // 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    // 主要负责按照既定的方式,基于时间戳生成水位线
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

2、有序流内置水位线

有序流的时间戳全部单调递增,没有迟到数据,直接WatermarkStrategy.forMonotonousTimestamps()就可以拿到WatermarkStrategy对象

public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:升序的watermark,没有等待时间
                .<WaterSensor>forMonotonousTimestamps()
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        // 返回的时间戳,要毫秒,这里拿自定义对象的ts属性做为时间戳
                        return element.getTs() * 1000L;
                    }
                });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用事件时间语义的窗口,别再用处理时间TumblingProcessTime
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}

执行下,输入10时,逻辑时钟被推到了10s,到达区间,触发窗口,执行全窗口函数的process,输出当前窗口的数据:

在这里插入图片描述

3、乱序流内置水位线

调用WatermarkStrategy. forBoundedOutOfOrderness(),传入延迟时间:

public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱序的,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                            return element.getTs() * 1000L;
                        });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}

执行:

在这里插入图片描述

简单分析下结果:

  • 第一条数据s1,1,1进来,创建窗口,水位线为1s-3s(延迟3s)
  • s1,10,10进来,水位线为10-3 =7s,还未到达10,窗口不触发(若是有序流,无等待下,此时窗口已被触发了)
  • 此时进来一条乱序数据,比如s1,6,6,6-3=3s,水位线保持上面的7不变,watermark不会推进,且6这条数据也会被统计在[0,10)的区间内
  • s1,11,11进来,11-3=8,也不会触发,但这条数据是属于[10,20)区间的那个桶的
  • s1,13,13进来,达到10,窗口触发

4、自定义周期性水位线生成器

上面只是定义了时间戳的提取逻辑,水位线的生成采用的默认内置策略。接下来自定义水位线生成器:周期性水位生成器。

周期性生成器是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发射生成的水位线

// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        // 定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成器
                .<WaterSensor>forGenerator(context -> MyPeriodWatermarkGenerator<>(3000L))
                // 1.2 指定时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                            return element.getTs() * 1000L;
                        });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }

   
}

模仿前面的内置生成器,定义自己的水位线生成器:

public class MyPeroidWatermarkGenerator implements WatermarkGenerator<Event> {

     private Long delayTime = 5000L; // 延迟时间
     private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳
	
	//构造方法,传入延迟时间,构造水位线生成器对象
	public MyPeroidWatermarkGenerator(long delayTime){
		this.delayTime = delayTime;
		this.maxTs = Long.MIN_VALUE + this.delayTime + 1;
	}
	
	/**
	* 每条数据进来都调用一次,用来提取最大的事件事件
	*/
     @Override
     public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {
         // 每来一条数据就调用一次
         maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳
         System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp);
     }

	/**
	* 周期性调用,默认20ms
	*/
     @Override
     public void onPeriodicEmit(WatermarkOutput output) {
         // 发射水位线,默认200ms调用一次
         output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
         System,out,println("调用了onPeriodicEmit方法,生成watermark==" + (maxTimestamp - delayTs - 1) );
     }
 }

核心部分,指定水位线生成器的Lamdba表达式展开就是:

在这里插入图片描述

运行:

  • 数据没进来前,每200ms调用一次发射水位线的方法,此时的水位线是构造方法里Long.MIN_VALUE那个
  • 进来一条数据,调用onEvent,最大时间戳被更新,到周期后再发射水位线maxTs-delayTs-1
  • 继续周期性调用onPeriodicEmit方法

在这里插入图片描述

onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了,这个方法由系统框架周期性地调用,默认200ms一次

修改默认的周期,比如改为400ms:

env.getConfig().setAutoWatermarkInterval(400L);

5、自定义断点式水位线生成器

断点式生成器会不停地检测onEvent()中的事件,发现带有水位线信息的当事件时,就立即发出水位线。改下代码,定义水位线生成器:

public class PointWatermarkGenerator implements WatermarkGenerator<Event> {

     private Long delayTime = 5000L; // 延迟时间
     private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳
	
	//构造方法,传入延迟时间,构造水位线生成器对象
	public MyPeroidWatermarkGenerator(long delayTime){
		this.delayTime = delayTime;
		this.maxTs = Long.MIN_VALUE + this.delayTime + 1;
	}
	
	/**
	* 每条数据进来都调用一次,用来提取最大的事件事件
	*/
     @Override
     public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {
         // 每来一条数据就调用一次
         maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳
         // 发射水位线
         output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
         System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp + ",生成watermark==" + (maxTimestamp - delayTs - 1));
     }

	/**
	* 周期性调用,默认20ms
	*/
     @Override
     public void onPeriodicEmit(WatermarkOutput output) {
         
     }
 }

周期性代码改为:

//...
		// 定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成器
                .<WaterSensor>forGenerator(context -> PointWatermarkGenerator<>(3000L))
                // 1.2 指定时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            return element.getTs() * 1000L;
                        });

运行:此时不再周期性的发射水位线

在这里插入图片描述

6、从数据源中发送水位线

在自定义的数据源中抽取事件时间,然后发送水位线:

env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource" 
)

//注意fromSorce方法的第二个传参,之前用的WatermarkStrategy.noWatermark()

注意此时不用再assignTimestampsAndWatermarks了,在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1131808.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

具有独特底部轮廓的剥离光刻胶的开发

引言 金属图案的剥离方法已广泛应用于各种电子器件的制造过程中&#xff0c;如半导体封装、MEMS和LED的制造。与传统的金属刻蚀方法不同的是&#xff0c;采用剥离法的优点是节省成本和工艺简化。在剥离过程中&#xff0c;经过涂层、曝光和开发过程后&#xff0c;光刻胶会在晶片…

最简单的Ubuntu更新Nvidia驱动的方法 解决nvidia-smi 不是内部或外部命令

在装有Ubuntu/Linux的服务器上遇到nvidia-smi不是内部或外部命令的问题&#xff0c;解决方法&#xff1a;更新英伟达驱动。 Ubuntu自带的“软件和更新”里可以方便地更新驱动&#xff0c;不需要从英伟达官网下载驱动。 我把它改成了第一项&#xff08;原来是一个名为nouveau的…

点成案例 | 点成生物干浴器的多领域应用

前沿 在前面的文章中&#xff0c;我们给大家分享了点成生物水浴设备在食品行业的食物特性和生物活性测定的应用案例&#xff0c;本期文章将为大家详细介绍一下点成生物的干浴设备及其在多种领域中的应用。 一、干浴器简介 干浴器&#xff0c;又名恒温金属浴或金属浴。与水浴设…

好用的Visio绘图文件工具 VSD Viewer最新 for mac

VSD Viewer是一款可以查看Microsoft Visio绘图文件的工具&#xff0c;适用于Windows和macOS操作系统。它具有以下优点&#xff1a; 直观易用&#xff1a;VSD Viewer的用户界面非常简单直观&#xff0c;易于使用。支持多种文件格式&#xff1a;VSD Viewer支持多种Visio文件格式…

JavaScript函数:入门指南

目录 函数的定义和调用 函数参数 返回值 匿名函数 回调函数 箭头函数的定义和语法 箭头函数的参数和返回值 箭头函数的this绑定 递归函数简介 基本情况 递归调用 注意事项 递归的应用 JavaScript是一种广泛使用的编程语言&#xff0c;拥有强大的函数功能。函数是J…

Java基础——流程控制

文章目录 顺序结构分支结构ifif elseswitch 循环结构forwhiledo while嵌套循环 流程控制语句breakcontinuereturn 流程控制语句是用来控制程序中各语句执行顺序&#xff0c;可以把语句组合成能完成一定功能的小逻辑模块。 顺序结构 程序从上到下逐行执行&#xff0c;中间没有…

系统优化-异步化

目录 1:异步化 1.1:什么是同步?什么是异步? 1.2:异步化业务流程分析 1.3:异步化的问题 2:线程池的理论和实战 2.1:为啥需要线程池? 2.2:线程池的实现 2.2.1:线程池参数 2.2.2:线程池的工作机制 1:异步化 什么时候使用异步化? 调用的服务处理能力有限&#xff0c…

酷开科技 | 酷开系统大屏电视,打造精彩家庭场景

在信息资讯不发达的年代&#xff0c;电视机一直都是个人及家庭重要的信息获取渠道和家庭娱乐中心&#xff0c;是每个家庭必不可少的大家电之一&#xff01;在快节奏的现代生活中&#xff0c;受手机和平板的冲击&#xff0c;电视机这个曾经的客厅“霸主”一度失去了“主角光环”…

Haproxy 服务

Haproxy&#xff1a;他也是常用的负载均衡软件 nginx 支持四层转发&#xff0c;七层转发 haproxy 也是四层和七层转发 LVS的DR和NAT都是基于四层转发 都是基于流量的转发。 tun:四层和七层都有。 基于四层的转发&#xff1a; 1&#xff0c;lvs 2&#xff0c;nginx 3&…

Go语言入门心法(十六):Go远程过程调用框架GRPC实战

Go语言入门心法(一): 基础语法 Go语言入门心法(二): 结构体 Go语言入门心法(三): 接口 Go语言入门心法(四): 异常体系 Go语言入门心法(五): 函数 Go语言入门心法(六): HTTP面向客户端|服务端编程 Go语言入门心法(七): 并发与通道 Go语言入门心法(八): mysql驱动安装报错o…

vue-admin相关问题记录

编辑器设置自定义高度 base.component.js内关于tinymce内容初始化设置&#xff0c;增加高度自定义接收并初始化 <tinymce :content.sync"form.article_content" :height"400"></tinymce> textarea自定义高度 tooltip备注

Python数据结构(树)

Python数据结构&#xff08;树&#xff09; 树的概念 树(英语: tree)是一种抽象数据类型ADT) 或是实作这种抽象数据类型的数据结构&#xff0c;用来模拟具有树状结构性质的数据集合。它是由n(n>1)个有限节点组成一个具有层次关系的集合。把它叫做“树”是因为它看起来像一…

Elasticsearch核心技术与实战-05-elasticsearch的安装与简单配置-Windows

首先下载elasticsearch的zip包&#xff1a;下载地址 网络不通的解决方法&#xff1a;国内镜像站 es、kibana、logstash均可在华为云开元镜像站自行选择版本下载&#xff1a;下载地址 下载插件包&#xff1a; .\bin\elasticsearch-plugin install analysis-icu .\bin\elasti…

ORB-SLAM系列算法相关介绍(综合版)

一、参考资料 ORB-SLAM2详解&#xff08;一&#xff09;简介 &#xff08;公开课&#xff09;视觉SLAM原理与ORB-SLAM3系列算法 二、相关介绍 1. ORB简介 ORB指的是一种旋转不变性特征。 2. ORB-SLAM系列算法的演进 三、ORB-SLAM 论文&#xff1a;ORB-SLAM: A Versatile …

php使用lunar实现农历、阳历、节日等功能

lunar是一个支持阳历、阴历、佛历和道历的日历工具库&#xff0c;它开源免费&#xff0c;有多种开发语言的版本&#xff0c;不依赖第三方&#xff0c;支持阳历、阴历、佛历、道历、儒略日的相互转换&#xff0c;还支持星座、干支、生肖等。仅供参考&#xff0c;切勿迷信。 官…

相机卡格式化了还能恢复吗?答案在这!(附带恢复教程)

“天啊&#xff01;和朋友出去旅行拍了好多美美的照片&#xff0c;在传照片的时候不小心点到了格式化&#xff0c;相机里所有的照片都被清空了&#xff01;这可怎么办呀&#xff1f;相机卡被格式化了还有机会恢复吗&#xff1f;” 相机的存储卡通常会保存我们很多美好的记忆&am…

某网站互动数据采集

1&#xff0c;网址 aHR0cHM6Ly9uZXdzLmZ1dHVubi5jb20vcG9zdC8zMzE4MzE1OQ2&#xff0c;找到返回互动数的请求包 3&#xff0c;采集互动数据加密信息如下 4&#xff0c;察看抓到的包&#xff0c;不难发现futu-offline-csrf-v2和futu-x-csrf-token-v2这两个参数在首页的请求中有…

vue2.0项目中组件和iframe之间如何传值

vue2.0项目中组件和iframe之间如何传值 一、vue组件二、iframe组件 一、vue组件 mounted() {// 注册 message 事件监听器&#xff0c;只注册一次window.addEventListener(message, this.handleFromIframeMessage) }, beforeDestroy() {// 移除事件监听器window.removeEventList…

测开( 进阶篇)

目录 按测试对象进行划分 界面测试 可靠性测试 容错性 灾难恢复性测试 - 了解即可 文档测试 兼容性测试 易用性测试 安装卸载测试 安全测试 性能测试 内存泄漏测试 实战 - 微信发红包的测试用例 按是否查看代码划分 黑盒测试(Black-box Testing) 白盒测试(Whit…

02333软件工程串讲

完整笔记在语雀 https://www.yuque.com/huangzhanqi/nrt1l4/zoa0g0osnrmog0xdhttps://www.yuque.com/huangzhanqi/nrt1l4/zoa0g0osnrmog0xd 《软件工程》串讲讲义 应考指导 一、课程介绍 1、课程性质 《软件工程》是全国高等教育自学考试计算机及应用&#xff08;独立本科…