【API篇】十一、Flink水位线传递与迟到数据处理

news2025/1/14 1:00:07

文章目录

  • 1、水位线传递
  • 2、水位线设置空闲等待
  • 3、迟到数据处理:窗口允许迟到
  • 4、迟到数据处理:侧流输出
  • 5、问

1、水位线传递

上游task处理完水位线,时钟改变后,要把数据和当前水位线继续往下游算子的task发送。当一个任务接收到多个上游并行任务传递来的水位线时,以最小的那个作为当前任务的事件时钟。如图:上游算子并行度为4,:

- 第一波的2.4.3.6传递到下游task,取2
- 其中一个上游task的数据4到了,传递到下游,4.4.3.6,此时,水位线被更新为最小的3
- 其中一个上游task的7到了,下游task为4.7.3.6,最小仍为3,不更新
- 上游task的6到下游,下游为4.7.6.6,最小为4,水位线再更新

在这里插入图片描述

总结:

  • 接收到上游多个,取最小
  • 往下游多个发送,广播

使用上篇的乱序流来查看水位线的传递,这次把并行度不能再是1,设置为2

public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(2);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱序的,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                            return element.getTs() * 1000L;
                        });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}


执行:

在这里插入图片描述
画个示意图:

在这里插入图片描述

2、水位线设置空闲等待

结合上图,上面是并行度为2,数据进来了会轮询到两个上游task,如果此时一个上游task一直没有数据进来,而当前Task是以最小的那个作为当前任务的事件时钟,就会导致下游接收的Task时钟一直为起始值而无法推进,进而导致窗口无法触发。

public class WatermarkIdlenessDemo {

    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        //  MyPartitioner是自定义分区器:数据%分区数,只输入奇数,都只会去往map的一个子任务,余数总为1,0.1两个map的task总去1
        SingleOutputStreamOperator<Integer> socketDS = env
                .socketTextStream("hadoop102", 7777)
                .partitionCustom(new MyPartitioner(), r -> r)
                .map(r -> Integer.parseInt(r))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                
                );


        // 分成两组: 奇数一组,偶数一组 , 开10s的事件时间滚动窗口
        socketDS
                .keyBy(r -> r % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        long startTs = context.window().getStart();
                        long endTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                        long count = elements.spliterator().estimateSize();

                        out.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());

                    }
                })
                .print();


        env.execute();
    }
}

运行:

在这里插入图片描述

分析:以上demo中,为了实现数据总流向一个子task,用了自定义分区器:

在这里插入图片描述

.partitionCustom(new MyPartitioner(), r -> r)

以输出数据为key,key除以并行度2区域为分区逻辑,如果我一直输入奇数,分区值就一直为1,就可以实现数据只流向其中一个子task。流向下游算子时,一个task始终没数据,导致取小的时候一直取到了没数据的原始time,时钟无法更新,窗口无法触发。此时就需要设置最大空闲时间,太久没数据来时,就不让它参与比较。

.withIdleness(Duration.ofSeconds(5))  //空闲等待5s

在这里插入图片描述

此时,输入到9时,已到5s时间,不再比较另一个没数据的task,11一进来,立马触发窗口

在这里插入图片描述

3、迟到数据处理:窗口允许迟到

前面为了解决乱序流,提出了延迟的概念:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3));

以上,即窗口延迟触发3秒,即让水位线的推进值 = 当前值 - 3,以便争取为乱序数据更多的时间进入窗口。但当延迟完成,窗口触发计算和关闭后,再来的属于已关闭窗口的数据就不会被统计在内了,这些数据也成为迟到数据。(本来8.30上课,老师等等家远的学生,说8.40开始讲课,结果你却9.00才到,那就门口站着取,别听了,类比数据不会再被对应窗口统计)

Flink窗口允许迟到数据,即触发窗口后,会先计算当前结果,但不关闭窗口(触发计算和关窗是两个动作)。 以后每来一条迟到数据,就触发一次这条数据所在窗口的增量计算。直到水位线被推进到了窗口结束时间 + 推迟时间。

注意区分延迟和推迟,延迟是老师等你到8.40上课(触发计算时间延长了),推迟则是,8.40课开始上了(触发计算了),但教室门不关,你在开始上课后(开始上课类比触发计算)10分钟的铃声没响之前(类比推迟时间为10分钟),能到的话,你依旧可以进教室听课。如果过了推迟时间,你仍没有到,那就窗口关闭,教室关门,你去网吧游荡吧。总结就是:

  • 延迟时间,操作的是触发计算的时间,用来处理乱序问题
  • 推迟时间,操作的是触发关窗的时间,用来处理迟到数据
.window(TumblingEventTimeWindows.of(Time.seconds(10)))  //窗口10s
.allowedLateness(Time.seconds(3))  //触发关窗延迟3秒

还是乱序流的例子,多一个allowedLateness

public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱序的,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            return element.getTs() * 1000L;
                        });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(3))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}


此时窗口为10s,延迟3s触发计算,窗口结束时间 + 推迟时间才触发关闭,即水位线到达10+3=13s时,才触发关窗。在水位线未被推到13前,对于迟到的数据,会再次触发计算,且是来一条,触发一次计算。关窗后,再来迟到数据就在不管了,不会触发计算。

在这里插入图片描述

这也和前面整理的窗口生命周期对上了:计算和关窗实际是两个动作,窗口销毁的时机(关窗)是在时间进展 >= 窗口最大时间戳(end-1ms) + 允许迟到时间(默认0)

4、迟到数据处理:侧流输出

在上面的延迟关窗与允许迟到的基础上,肯定还是不能囊括所有数据,因为乱序程度理论上可以无限大,如上的例子,对于等了10分钟才开课,且到了关教室门的时间还没到的学生,让去网吧游荡也不合理(类比流中直接丢弃这个数据),可以考虑把严重迟到的学生领到保安室,对应到Flink,那就是把乱序极大的数据使用侧流输出。关键代码:

OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));  //侧流Tag对象
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateTag)  //迟到数据侧流输出

//主流
process.print();
// 从主流获取侧输出流,打印
process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");

完整demo:

public class WatermarkLateDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // 推迟2s关窗
                .sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );


        process.print();
        // 从主流获取侧输出流,打印
        process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");

        env.execute();
    }
}

执行:

在这里插入图片描述

5、问

如果watermark设置延时等待3s,窗口允许迟到2s,为什么不直接延时等待5s?

答:
  • 首先延时时间不能设置太大,因为这会导致计算延迟太大,失去结果的实时性
  • 其次,窗口允许迟到是对迟到数据的补偿处理,尽量让结果准确,修正结果的
  • 因此,一般延时时间不设置一个较大的值,常为秒级,而允许迟到时间则可以用来处理大部分迟到数据,极端迟到的数据,可使用侧流输出,获取后再做对应的处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1134367.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

对mysql的联合索引的深刻理解

背景 对mysql的联合索引的考察是Java程序员面试高频考点&#xff01;必须深刻理解掌握否则容易丢分非常可惜。 技术难点 考察对最左侧匹配原理理解。 原理 暂且不表。网上讲这非常多。我理解就是&#xff0c;B树每个非叶子节点的值都是有序存放索引的值。 比如对A、B、C …

unity 基于UGUI的无限动态滚动列表

基于UGUI的动态滚动列表&#xff0c;主要支持以下功能&#xff1a; 继承自UGUI的SrollRect&#xff0c;支持ScrollRect的所有功能&#xff1b; 使用对象池来管理列表元素&#xff0c;以实现列表元素的复用&#xff1b; 支持一行多个元素或一列多个元素&#xff1b; 可使用不…

漏洞复现--用友 畅捷通T+ .net反序列化RCE

免责声明&#xff1a; 文章中涉及的漏洞均已修复&#xff0c;敏感信息均已做打码处理&#xff0c;文章仅做经验分享用途&#xff0c;切勿当真&#xff0c;未授权的攻击属于非法行为&#xff01;文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直…

互联网Java工程师面试题·Spring篇·第五弹

目录 1、什么是 spring? 2、使用 Spring 框架的好处是什么&#xff1f; 3、Spring 由哪些模块组成? 4、核心容器&#xff08;应用上下文) 模块。 5、BeanFactory – BeanFactory 实现举例。 6、XMLBeanFactory 7、解释 AOP 模块 8、解释 JDBC 抽象和 DAO 模块。 9、…

嵌入式系统设计师考试笔记之操作系统基础复习笔记一

目录 1、嵌入式软件基础 &#xff08;1&#xff09;嵌入式软件的特点&#xff1a; &#xff08;2&#xff09;嵌入式软件分类&#xff1a; &#xff08;3&#xff09;无操作系统的嵌入式软件的两种实现方式&#xff1a; &#xff08;4&#xff09;有操作系统的三大优点&am…

【Java集合类面试二十一】、请介绍TreeMap的底层原理

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;请介绍TreeMap的底层原理…

33基于MATLAB的对RGB图像实现中值滤波,均值滤波,维纳滤波。程序已通过调试,可直接运行。

基于MATLAB的对RGB图像实现中值滤波&#xff0c;均值滤波&#xff0c;维纳滤波。程序已通过调试&#xff0c;可直接运行。 33 MATLAB、图像处理、维纳滤波 (xiaohongshu.com)

【数据挖掘 | 关联性分析】万字长文详解关联性分析,详解Apriori算法为例,确定不来看看?

&#x1f935;‍♂️ 个人主页: AI_magician &#x1f4e1;主页地址&#xff1a; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 &#x1f468;‍&#x1f4bb;景愿&#xff1a;旨在于能和更多的热爱计算机的伙伴一起成长&#xff01;&#xff01;&…

深入探究深度学习、神经网络与卷积神经网络以及它们在多个领域中的应用

目录 1、什么是深度学习&#xff1f; 2、深度学习的思想 3、深度学习与神经网络 4、深度学习训练过程 4.1、先使用自下上升非监督学习&#xff08;就是从底层开始&#xff0c;一层一层的往顶层训练&#xff09; 4.2、后自顶向下的监督学习&#xff08;就是通过带标签的数…

系统架构设计师之系统应用集成

应用集成是指两个或多个应用系统根据业务逻辑的需要而进行的功能之间的相互调用和互操作。应用集成需要在数据集成的基础上完成。应用集成在底层的网络集成和数据集成的基础上实现异构应用系统之间语用层次上的互操作。它们共同构成了实现企业集成化运行最顶层会聚臭成所需要的…

面试题之Vue和React的区别是什么?

一提到前端框架&#xff0c;相信大家都对Vue和React不陌生&#xff0c;这两个前端框架都是比较主流的&#xff0c;用户也都比较多&#xff0c;但是我们在使用这些框架的时候&#xff0c;是否对这两个框架之间的区别有所了解呢&#xff1f;接下来&#xff0c;让我们来一起的系统…

基于ubuntu20.04的 ros2(foxy版本)安装

建议最好参考官方的安装指南 Ubuntu (Debian) — ROS 2 Documentation: Foxy documentation 也可参考下面的步骤 1.安装ros2 &#xff08;1&#xff09;设置编码 sudo apt update sudo apt install locales sudo locale-gen en_US en_US.UTF-8 sudo update-locale LC_AL…

Git基本概念与使用

一、Git基本概念 git&#xff0c;是一种分布式版本控制软件&#xff0c;与CVS、Subversion这类的集中式版本控制工具不同&#xff0c;它采用了分布式版本库的作法&#xff0c;不需要服务器端软件&#xff0c;就可以运作版本控制&#xff0c;使得源代码的发布和交流极其方便。g…

SpringCloud 微服务全栈体系(四)

第六章 Nacos 配置管理 Nacos 除了可以做注册中心&#xff0c;同样可以做配置管理来使用。 一、统一配置管理 当微服务部署的实例越来越多&#xff0c;达到数十、数百时&#xff0c;逐个修改微服务配置就会让人抓狂&#xff0c;而且很容易出错。我们需要一种统一配置管理方案…

将自己本地项目上传到git,增加IDEA操作

文章目录 一、初始化git仓库二、gitee创建仓库三、输入自己仓库的地址四、在添加所修改的文件可能的错误 五、合并需上传文件六、上传参考文档 一、初始化git仓库 在自己的项目中&#xff0c;命令行中输入 git init二、gitee创建仓库 新建仓库 设置仓库参数&#xff0c;设置…

Openssl数据安全传输平台011:秘钥协商服务端

0. 代码仓库 https://github.com/Chufeng-Jiang/OpenSSL_Secure_Data_Transmission_Platform/tree/main/Preparation 编译protobuf类文件 VS2022 protobuf3.17 Message.proto protoc Message.proto --cpp_out./

【每日一题】统计能整除数字的位数

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;取模运算 其他语言python3 写在最后 Tag 【取模运算】【2023-10-26】 题目来源 2520. 统计能整除数字的位数 题目解读 统计整数的所有数位中&#xff0c;可以被该整数整除的数量总和。 解题思路 方法一&#xff1a;…

Vue图片路径问题(动态引入)

vue项目中我们经常会遇到动态路径的图片无法显示的问题&#xff0c;以下是静态路径和动态路径的常见使用方法。 1.静态路径 在日常的开发中&#xff0c;图片的静态路径通过相对路径和绝对路径的方式引入。 相对路径&#xff1a;以.开头的&#xff0c;例如./、../之类的。就是…

小学数学作业练习册出题网站源码_支持打印转成PDF

小学数学作业练习册出题网站源码_支持打印转成PDF 小学数学出题网页版源码&#xff0c;加减乘除混合运算&#xff0c;支持自定义数字、小数、混合运算&#xff0c;支持加减乘除运算混合多选&#xff08;一道题中同时随机出现加减乘除运算符&#xff09;支持自定义出题数量&…

NSS [SWPUCTF 2021 新生赛]sql

NSS [SWPUCTF 2021 新生赛]sql 很明显是sql&#xff0c;有waf。 参数是wllm get型传参&#xff0c;有回显&#xff0c;单引号闭合&#xff0c;回显位3 跑个fuzz看看waf 过滤了空格 and 报错注入 空格->%09 ->like and->&&爆库&#xff1a;test_db -1%27uni…