Flink解决延迟数据问题

news2024/11/27 10:50:13

总结:

水印:对于迟到数据不长

allowedLateness: 迟到时间很长

侧道输出:对于迟到时间特别长

对于延迟数据的理解:

水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。

这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。

但是水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口

这个等待不能一直等,因为会一直缓着数据不计算。

一般水印也就是几秒钟最多几分钟而已(看业务)

那么,在现实世界中,延迟数据除了有短期延迟外,长期延迟也是很常见的。

比如:

l 客户端断网,等了好几个小时才恢复

l 车联网系统进入隧道后没有信号无法上报数据

l 手机欠费没有网

等等,这些场景下数据的迟到就不是简单的网络堵塞造成的几秒延迟了

而是小时、天级别的延迟

对于水印来说,这样的长期延迟数据是无法很好处理的。

那么有没有什么办法去处理这些长期延迟的数据呢?让其可以找到其所属的窗口正常完成计算,哪怕晚了几个小时。

这个场景的解决方式就是:延迟数据处理机制(allowedLateness方法)

水印:乱序数据处理(时间很短的延迟)

延迟处理:长期延迟数据的处理机制。

延迟数据的处理:

waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法,

主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据

设置允许延迟的时间是通过allowedLateness(lateness: Time)设置

保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存

获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取

1)allowedLateness(lateness: Time)

当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法

该方法传入一个Time值,设置允许的长期延迟(迟到)的时间。

和watermark不同。

未设置allowedLateness(为0),当watermark满足条件,会触发窗口的 执行 + 关闭

当设置了allowedLateness,当watermark满足条件后,只会触发窗口的执行,不会触发窗口关闭

也就是,watermark满足条件后会正常触发窗口计算,将已有的数据完成计算。

但是,不会关闭窗口。如果在allowedLateness允许的时间内仍有这个窗口的数据进来,那么每进来一条,会和已经计算过的(被watermark触发的)数据一起在计算一次

水印:短期延迟,达到条件后触发计算并且关闭窗口(触发+关闭同时进行)

水印+allowedLateness : 短期延迟+ 等待长期延迟效果, 达到水印条件后,会触发窗口计算,但是不关闭窗口。事件时间延迟达到水印+allowedLateness之和后会关闭窗口。

2) 侧道输出-SideOutput

Flink 通过watermark在短时间内允许了乱序到来的数据

通过延迟数据处理机制,可以处理长期迟到的数据。

但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。

对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?

不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来

保存成为一个DataStream,然后,交由开发人员自行处理。

那么这个机制就叫做侧输出机制(Side Output)

侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开发人员可以自定逻辑对这些超级迟到数据进行处理。

处理主要使用两个方式:

对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方

对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理的数据的DataStream

注意,取到的是一个DataStream,这意味着你可以对这些超级迟到数据继续写 如keyBy, window等处理逻辑。

sideOutputLateData方法:


使用方式:
先定义OutputTag对象(注意,必须new一个匿名内部类形式的OutputTag对象的实例)
然后调用sideOutputLateData方法
// side output   OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side output"){};
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> sideOutputLateData =
        allowedLateness.sideOutputLateData(outputTag);

DataStream.getSideOutput方法:

用法:
DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
// 对得到的保存超级迟到数据的DataStream进行处理
sideOutput.print("late>>>");

代码演示

使用Watermark + AllowedLateness + SideOutput ,即使用侧道输出机制来单独收集延迟/迟到/乱序严重的数据,避免数据丢失!

package com.bigdata.day05;


import com.bigdata.day04.OrderInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.UUID;


class MyOrderSource2 implements SourceFunction<OrderInfo> {

    @Override
    public void run(SourceContext<OrderInfo> ctx) throws Exception {


        Random random = new Random();
        while(true){
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderId(UUID.randomUUID().toString().replace("-",""));
            // 在这个地方可以模拟迟到数据
            long orderTime = System.currentTimeMillis() - 1000*random.nextInt(100);

            orderInfo.setOrdertime(orderTime);
            int money = random.nextInt(10);

            System.out.println("订单产生的时间:"+ DateFormatUtils.format(orderTime,"yyyy-MM-dd HH:mm:ss")+",金额:"+money);



            orderInfo.setMoney(money);

            orderInfo.setUserId(random.nextInt(2));
            ctx.collect(orderInfo);
            Thread.sleep(500);
        }

    }

    @Override
    public void cancel() {

    }
}
public class Demo01 {



    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // 每隔五秒统计每个用户的前面5秒的订单的总金额
        //2. source-加载数据
        DataStreamSource<OrderInfo> streamSource = env.addSource(new MyOrderSource2());

        //-2.告诉Flink最大允许的延迟时间/乱序时间为多少
        SingleOutputStreamOperator<OrderInfo> orderDSWithWatermark = streamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        //-3.告诉Flink哪一列是事件时间
                        .withTimestampAssigner((order, time) -> order.getOrdertime())
        );

        OutputTag<OrderInfo> outputTag = new OutputTag<OrderInfo>("side output"){};

        //3. transformation-数据处理转换
        SingleOutputStreamOperator<String> result = orderDSWithWatermark.keyBy(orderInfo -> orderInfo.getUserId()).
                window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(4))
                .sideOutputLateData(outputTag)
                .apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key,  // 代表分组key值     五旬老太守国门
                                      TimeWindow window, // 代表窗口对象
                                      Iterable<OrderInfo> input, // 分组过之后的数据 [1,1,1,1,1]
                                      Collector<String> out  // 用于输出的对象
                    ) throws Exception {
                        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long start = window.getStart();
                        long end = window.getEnd();
                        int sum = 0;
                        // 专门存放迟到的订单时间

                        for (OrderInfo orderInfo : input) {

                            sum += orderInfo.getMoney();
                        }

                        out.collect(key + ",窗口开始:" + dateFormat.format(new Date(start)) + ",结束时间:" + dateFormat.format(new Date(end)) + "," + sum);
                        //out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);

                    }
                });
        result.print("流中的数据,包含迟到的数据:");
        result.getSideOutput(outputTag).print("严重迟到的数据:");
        //4. sink-数据输出



        //5. execute-执行
        env.execute();
    }
}

虽然我们添加了延迟的效果,就是说侧道输出,侧道输出不能触发窗口的执行,窗口的执行只能通过水印时间触发 ,允许迟到的数据,不放入到当前窗口中,而是作为一个触发条件看到,它需要放入到它对应的窗口中。

只考虑 1 个并行度的问题

订单发生的真实事件:窗口5秒,间隔5秒,允许迟到 3秒 最晚允许迟到4秒

10:44:00 第一个区间就应该是10:44:00 10:44:05

10:44:01

10:44:02

10:44:03

10:44:04

10:44:05

10:44:07 第一个区间就应该是10:44:05 10:44:10

10:44:22 第一个区间就应该是10:44:20 10:44:25

10:44:30

10:44:28

10:44:20

通过上面这个图可以知道,44:07没有办法触发00~05的执行,但是07不放入00~05区间,而是放入10:44:05 10:44:10

44:22 一个数据触发了两个区间的执行 00~05 05~10

假如有一个订单时44:10产生的,放入哪个区间?应该放入10~15这个区间

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2248361.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计网】自定义协议与序列化(一) —— Socket封装于服务器端改写

&#x1f30e; 应用层自定义协议与序列化 文章目录&#xff1a; Tcp协议Socket编程 应用层简介 序列化和反序列化       重新理解read/write/recv/send及tcp的全双工       Socket封装       服务器端改写 &#x1f680;应用层简介 我们程序员写的一个个解决…

鸿蒙动画开发07——粒子动画

1、概 述 粒子动画是在一定范围内随机生成的大量粒子产生运动而组成的动画。 动画元素是一个个粒子&#xff0c;这些粒子可以是圆点、图片。我们可以通过对粒子在颜色、透明度、大小、速度、加速度、自旋角度等维度变化做动画&#xff0c;来营造一种氛围感&#xff0c;比如下…

C语言学习 12(指针学习1)

一.内存和地址 1.内存 在讲内存和地址之前&#xff0c;我们想有个⽣活中的案例&#xff1a; 假设有⼀栋宿舍楼&#xff0c;把你放在楼⾥&#xff0c;楼上有100个房间&#xff0c;但是房间没有编号&#xff0c;你的⼀个朋友来找你玩&#xff0c;如果想找到你&#xff0c;就得挨…

【pyspark学习从入门到精通19】机器学习库_2

目录 估计器 分类 回归 聚类 管道 估计器 估计器可以被看作是需要估算的统计模型&#xff0c;以便对您的观测值进行预测或分类。 如果从抽象的 Estimator 类派生&#xff0c;新模型必须实现 .fit(...) 方法&#xff0c;该方法根据在 DataFrame 中找到的数据以及一些默认或…

结构方程模型(SEM)入门到精通:lavaan VS piecewiseSEM、全局估计/局域估计;潜变量分析、复合变量分析、贝叶斯SEM在生态学领域应用

目录 第一章 夯实基础 R/Rstudio简介及入门 第二章 结构方程模型&#xff08;SEM&#xff09;介绍 第三章 R语言SEM分析入门&#xff1a;lavaan VS piecewiseSEM 第四章 SEM全局估计&#xff08;lavaan&#xff09;在生态学领域高阶应用 第五章 SEM潜变量分析在生态学领域…

JQuery -- 第九课

文章目录 前言一、JQuery是什么&#xff1f;二、JQuery的使用步骤1.引入2.书写位置3. 表示方法 三、JQuery选择器1.层级选择器2. 筛选选择器3. 排他思想4. 精品展示 四、jQuery样式操作1. 修改样式2.类操作1. 添加2. 移除3. 切换 五、jQuery动画1. 显示和隐藏2. 滑动1. slide2.…

无人机探测:光电侦测核心技术算法详解!

核心技术 双光谱探测跟踪&#xff1a; 可见光成像技术&#xff1a;利用无人机表面反射的自然光或主动光源照射下的反射光&#xff0c;通过高灵敏度相机捕捉图像。该技术适用于日间晴朗天气下的无人机探测&#xff0c;具有直观、易于识别目标的特点。 红外成像技术&#xff1…

Java使用replaceAll替换时不使用正则表达式

前言 public String replaceAll(String regex, String replacement) {return Pattern.compile(regex).matcher(this).replaceAll(replacement);}在使用String.replaceAll() 方法时&#xff0c;由于入参时regex &#xff0c;而入参刚好是正则表达式的字符该怎么办&#xff1f;我…

计算机毕业设计Hadoop+Spark音乐推荐系统 音乐预测系统 音乐可视化大屏 音乐爬虫 HDFS hive数据仓库 机器学习 深度学习 大数据毕业设计

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

途普科技企业知识中台完成华为昇思MindSpore技术认证

近日&#xff0c;北京途普科技有限公司&#xff08;以下简称“途普科技”&#xff09;作为华为昇腾大模型方向的应用软件伙伴&#xff0c;核心产品企业知识中台已成功与华为AI框架昇思MindSpore完成相互兼容性认证。这一成就标志着途普科技在AI领域与华为的合作进一步加深&…

自由学习记录(25)

只要有修改&#xff0c;子表就不用元表的参数了&#xff0c;用自己的参数&#xff08;只不过和元表里的那个同名&#xff09; 子表用__index“继承”了父表的值&#xff0c;此时子表仍然是空表 一定是创建这样一个同名的变量在原本空空的子表里&#xff0c; 传参要传具体的变…

【Nginx】核心概念与安装配置解释

文章目录 1. 概述2. 核心概念2.1.Http服务器2.2.反向代理2.3. 负载均衡 3. 安装与配置3.1.安装3.2.配置文件解释3.2.1.全局配置块3.2.2.HTTP 配置块3.2.3.Server 块3.2.4.Location 块3.2.5.upstream3.2.6. mine.type文件 3.3.多虚拟主机配置 4. 总结 1. 概述 Nginx是我们常用的…

AIGC-----AIGC在虚拟现实中的应用前景

AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容&#xff08;AIGC&#xff09;的快速发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性&#xff0c;这种组合不仅极大地降低了VR内容的…

学习笔记035——MySQL索引

数据库索引 索引是为了提高数据的查询速度&#xff0c;相当于给数据进行编号&#xff0c;在查找数据的时候就可以通过编号快速找到对应的数据。 索引内部数据结构&#xff1a;B Tree 主键自带索引。 如&#xff1a; insert into user (id, name) values (1,f); insert int…

C语言数据结构-链表

C语言数据结构-链表 1.单链表1.1概念与结构1.2结点3.2 链表性质1.3链表的打印1.4实现单链表1.4.1 插入1.4.2删除1.4.3查找1.4.4在指定位置之前插入或删除1.4.5在指定位置之后插入或删除1.4.6删除指定位置1.4.7销毁链表 2.链表的分类3.双向链表3.1实现双向链表3.1.1尾插3.1.2头插…

计算机网络 网络安全基础——针对实习面试

目录 网络安全基础你了解被动攻击吗&#xff1f;你了解主动攻击吗&#xff1f;你了解病毒吗&#xff1f;说说基本的防护措施和安全策略&#xff1f; 网络安全基础 网络安全威胁是指任何可能对网络系统造成损害的行为或事件。这些威胁可以是被动的&#xff0c;也可以是主动的。…

上海乐鑫科技一级代理商飞睿科技,ESP32-C61高性价比WiFi6芯片高性能、大容量

在当今快速发展的物联网市场中&#xff0c;无线连接技术的不断进步对智能设备的性能和能效提出了更高要求。为了满足这一需求&#xff0c;乐鑫科技推出了ESP32-C61——一款高性价比的Wi-Fi 6芯片&#xff0c;旨在为用户设备提供更出色的物联网性能&#xff0c;并满足智能设备连…

初识java(2)

大家好&#xff0c;今天我们来讲讲java中的数据类型。 java跟我们的c语言的数据类型有一些差别&#xff0c;那么接下来我们就来看看。 一.字面常量&#xff0c;其中&#xff1a;199&#xff0c;3.14&#xff0c;‘a’&#xff0c;true都是常量将其称为字面常量。&#xff08;…

MMCM DRP动态配置方法(超详细讲解)

一、MMCM 源语介绍 1、调用源语 2、调用Clocking Wizard IP 调用Clocking Wizard IP核选择使用MMCM资源时&#xff0c;IP内部也是调用的MMCM源语。 Clocking Wizard IP中启用MMCM DRP接口方法&#xff1a; 在Clocking Wizard IP中设置分频倍频系数方法&#xff1a; IP核中生…

对于GC方面,在使用Elasticsearch时要注意什么?

大家好&#xff0c;我是锋哥。今天分享关于【对于GC方面&#xff0c;在使用Elasticsearch时要注意什么&#xff1f;】面试题。希望对大家有帮助&#xff1b; 对于GC方面&#xff0c;在使用Elasticsearch时要注意什么&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java…