Flink学习连载文章8--时间语义

news2025/1/15 10:29:15

Time的分类 (时间语义)

EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间

IngestionTime:摄入时间,是事件/数据到达流处理系统的时间

ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间

EventTime的重要性

假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分(EventTime)。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点01分了(ProcessingTime)。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。

在上面这个场景中你可以看到,
支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分

问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。

还可以通过钉钉打卡、饭卡机 等 举例子。

一条错误日志的内容为:
2020-11-11 23:59:58 error NullPointExcep --事件时间
进入Flink的时间为2020-11-11 23:59:59      --摄入时间
到达Window的时间为2020-11-12 00:00:01     --处理时间
问题:
对于业务来说,要统计每天的的故障日志个数,哪个时间是最有意义的?
答案:
EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质! 

总结:
1.事件时间确实重要, 因为它能够代表事件/数据的本质,是事件/数据真真正正发生/产生的时间
2.按照事件时间进去处理/计算,会存在一定的难度, 因为数据可能会因为网路延迟等原因, 发生乱序或延迟到达, 那么最后的计算结果就有可能错误或数据丢失
3.需要有技术来解决上面的问题,使用Watermark技术来解决! 

Watermark是什么?-水印,水位线

为什么会有WaterMark?

当flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示

只要使用event time,就必须使用watermark,在上游指定,比如:source、map算子后.

Watermark的核心本质可以理解成一个延迟触发机制。

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

Watermark就是给数据额外添加的一列时间戳!

Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

假如明天出去玩,09:00集合,最多允许迟到10分钟。
08:50 胜赛来了    08:50 - 10 = 08:40 
09:05 步迅来了    09:05 - 10 = 08:55 
09:35 青林来了    watermark = 09:35 - 10 = 09:25
能否上到车上的条件是:watermark <= 时间点

Watermark能解决什么问题,如何解决的?

有了Watermark 就可以在一定程度上解决数据乱序或延迟达到问题!

不添加watermark ,窗口如何触发:

1)窗口有数据

2)窗口的结束时间到了。

班车:到了时间点立即发车,来了数据也不要。

有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发:

1.窗口有数据

2.Watermark >= 窗口的结束时间

满足以上条件则触发窗口计算!

以前窗口触发:系统时间到了窗口结束时间就触发

现在窗口触发:Watermark >= 窗口的结束时间

而Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

就意味着, 通过Watermark改变了窗口的触发时机了, 那么接下来我们看如何改变的/如何解决前面的问题的

需要记住:

Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

窗口触发时机 : Watermark >= 窗口的结束时间

水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印,一般人为添加的消息的水印都会比当前消息的事件时间一些

窗口是否关闭,按照水印时间来判断,但原有事件时间不会被修改,窗口的边界依旧是事件时间来决定。

  • 水印并不会影响原有Eventtime
  • 当数据流添加水印后,会按照水印时间来触发窗口计算
  • 一般会设置水印时间,比Eventtime小一些(一般几秒钟)
  • 当接收到的水印时间>= 窗口的endTime且窗口内有数据,则触发计算

水印(水印时间)的计算:事件时间– 设置的水印长度 = 水印时间

比如,事件时间是10分30秒, 水印长度是2秒,那么水印时间就是10分28秒

Watermark图解原理

总结:

Watermark 是一个单独计算出来的时间戳
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(乱序度)
Watermark可以通过改变窗口的触发时机 在 一定程度上解决数据乱序或延迟达到的问题
Watermark >= 窗口结束时间 时 就会触发窗口计算(窗口中得有数据)
延迟或乱序严重的数据还是丢失, 但是可以通过调大 最大允许的延迟时间(乱序度) 来解决, 或 使用后面要学习的侧道输出流来单独收集延迟或乱序严重的数据,保证数据不丢失!

多并行度的水印触发

在多并行度下,每个并行有一个水印

比如并行度是6,那么程序中就有6个watermark

分别属于这6个并行度(线程)

那么,触发条件以6个水印中最小的那个为准

比如, 有个窗口是0-5

其中5个并行度的水印都超过了5

但有一个并行度的水印是3

那么,不管另外5个并行度中的水印达到了多大,都不会触发

因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件

在测试水印的时候,记得把并行度设置为1 ,好看结果,否则,结果不太容易看出来。

Watermark代码演示

需求

实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s,计算5秒内,每个用户的订单总金额

并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

不使用水印的时候【不能使用eventtime时间语义】,进行开发:

package com.bigdata.day05;



import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-23 10:07:21
 *
 * 实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
 * 要求每隔5s,计算5秒内,每个用户的订单总金额
 * 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
 **/


public class _01WatermarkDemo {

    @Data  // set get toString
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo2{
        private String orderId;
        private int uid;
        private int money;
        private long timeStamp;
    }
    public static class MySource implements SourceFunction<OrderInfo2> {
        boolean flag = true;

        @Override
        public void run(SourceContext ctx) throws Exception {
            // 源源不断的产生数据
            Random random = new Random();
            while(flag){
                OrderInfo2 orderInfo = new OrderInfo2();
                orderInfo.setOrderId(UUID.randomUUID().toString());
                orderInfo.setUid(random.nextInt(3));
                orderInfo.setMoney(random.nextInt(101));
                orderInfo.setTimeStamp(System.currentTimeMillis());
                ctx.collect(orderInfo);
                Thread.sleep(1000);// 间隔1s
            }
        }

        // source 停止之前需要干点啥
        @Override
        public void cancel() {
            flag = false;
        }
    }

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<OrderInfo2> orderSourceStream = env.addSource(new MySource());

        //3. transformation-数据处理转换
        // 每个用户的订单总额
        KeyedStream<OrderInfo2, Integer> keyedStream = orderDSWithWatermark.keyBy(orderInfo2 -> orderInfo2.getUid());
        
        SingleOutputStreamOperator<OrderInfo2> result1 = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");

        result1.print();
        //4. sink-数据输出
        //5. execute-执行
        env.execute();
    }
}

假如出现如下错误:

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.bigdata.day05.OrderInfo2>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
	at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:224)
	at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:53)
	at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:688)
	at com.bigdata.day05._01WatermarkDemo.main(_01WatermarkDemo.java:103)

说明这个pojo 必须是public 的,否则不解析。

修正过的代码:

package com.bigdata.time;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
public class OrderInfo{
    private String orderId;
    private int uid;
    private int money;
    private long timeStamp;
}
package com.bigdata.time;


import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.Random;
import java.util.UUID;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-11-26 10:30:28
 **/

class MySource extends RichSourceFunction<OrderInfo> {

    boolean flag = true;
    @Override
    public void run(SourceContext<OrderInfo> ctx) throws Exception {

        while(flag){
            OrderInfo orderInfo = new OrderInfo();

            Random random = new Random();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            long timeStamp = System.currentTimeMillis() - random.nextInt(3000);

            orderInfo.setOrderId(UUID.randomUUID().toString());
            orderInfo.setUid(userId);
            orderInfo.setMoney(money);
            orderInfo.setTimeStamp(timeStamp);

            ctx.collect(orderInfo);
            Thread.sleep(20);
        }


    }

    @Override
    public void cancel() {
        flag = false;
    }
}
public class _01_OrderDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据
        DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());

        //3. transformation-数据处理转换
        streamSource.keyBy(new KeySelector<OrderInfo, Integer>() {
            @Override
            public Integer getKey(OrderInfo orderInfo) throws Exception {
                return orderInfo.getUid();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                //.sum("money").print();
                        .apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
                            @Override
                            public void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
                                String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");
                                String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");
                                int sum = 0;
                                for (OrderInfo orderInfo : input) {
                                    sum += orderInfo.getMoney();
                                }
                                out.collect(beginTime+","+endTime+","+userId +","+sum);
                            }
                        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

需求升级:

实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

* 要求每隔5s,计算5秒内,每个用户的订单总金额

* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

假如你添加了 eventTime 缺没有添加水印的代码,会报如下错误:

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
	at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:83)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:302)

代码演示-开发版

生成 Watermark | Apache Flink

package com.bigdata.time;


import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-11-26 10:30:28
 **/

public class _02_OrderDemoWithWaterMark {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据
        DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());

        //3. transformation-数据处理转换
        streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(
                        new SerializableTimestampAssigner<OrderInfo>() {
                            // long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢?
                            @Override
                            public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                                // 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒
                                return orderInfo.getTimeStamp();
                            }
                        }
                ))
                .keyBy(new KeySelector<OrderInfo, Integer>() {
            @Override
            public Integer getKey(OrderInfo orderInfo) throws Exception {
                return orderInfo.getUid();
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //.sum("money").print();
                        .apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
                            @Override
                            public void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
                                String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");
                                String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");
                                int sum = 0;
                                for (OrderInfo orderInfo : input) {
                                    sum += orderInfo.getMoney();
                                }
                                out.collect(beginTime+","+endTime+","+userId +","+sum);
                            }
                        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)

我们实现一个延迟3秒的固定延迟水印,可以这样做:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

Flink对于迟到数据的处理

水印:对于迟到数据不长

allowedLateness: 迟到时间很长

侧道输出:对于迟到时间特别长

对于延迟数据的理解:

水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。

这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。

但是水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口

这个等待不能一直等,因为会一直缓着数据不计算。

一般水印也就是几秒钟最多几分钟而已(看业务)

那么,在现实世界中,延迟数据除了有短期延迟外,长期延迟也是很常见的。

比如:

l 客户端断网,等了好几个小时才恢复

l 车联网系统进入隧道后没有信号无法上报数据

l 手机欠费没有网

等等,这些场景下数据的迟到就不是简单的网络堵塞造成的几秒延迟了

而是小时、天级别的延迟

对于水印来说,这样的长期延迟数据是无法很好处理的。

那么有没有什么办法去处理这些长期延迟的数据呢?让其可以找到其所属的窗口正常完成计算,哪怕晚了几个小时。

这个场景的解决方式就是:延迟数据处理机制(allowedLateness方法)

水印:乱序数据处理(时间很短的延迟)

延迟处理:长期延迟数据的处理机制。

延迟数据的处理:

waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法,

主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据

设置允许延迟的时间是通过allowedLateness(lateness: Time)设置

保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存

获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取

1)allowedLateness(lateness: Time)

当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法

该方法传入一个Time值,设置允许的长期延迟(迟到)的时间。

和watermark不同。

未设置allowedLateness(为0),当watermark满足条件,会触发窗口的 执行 + 关闭

当设置了allowedLateness,当watermark满足条件后,只会触发窗口的执行,不会触发窗口关闭

也就是,watermark满足条件后会正常触发窗口计算,将已有的数据完成计算。

但是,不会关闭窗口。如果在allowedLateness允许的时间内仍有这个窗口的数据进来,那么每进来一条,会和已经计算过的(被watermark触发的)数据一起在计算一次

水印:短期延迟,达到条件后触发计算并且关闭窗口(触发+关闭同时进行)

水印+allowedLateness : 短期延迟+ 等待长期延迟效果, 达到水印条件后,会触发窗口计算,但是不关闭窗口。事件时间延迟达到水印+allowedLateness之和后会关闭窗口。

2) 侧输出-SideOutput

Flink 通过watermark在短时间内允许了乱序到来的数据

通过延迟数据处理机制,可以处理长期迟到的数据。

但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。

对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?

不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来

保存成为一个DataStream,然后,交由开发人员自行处理。

那么这个机制就叫做侧输出机制(Side Output)

侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开发人员可以自定逻辑对这些超级迟到数据进行处理。

处理主要使用两个方式:

对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方

对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理的数据的DataStream

注意,取到的是一个DataStream,这意味着你可以对这些超级迟到数据继续写 如keyBy, window等处理逻辑。


使用方式:
先定义OutputTag对象(注意,必须new一个匿名内部类形式的OutputTag对象的实例)
然后调用sideOutputLateData方法
// side output   OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side output"){};
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> sideOutputLateData =
        allowedLateness.sideOutputLateData(outputTag);
用法:
DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
// 对得到的保存超级迟到数据的DataStream进行处理
sideOutput.print("late>>>");

代码演示-完美版/企业版

前面的案例中已经可以使用Watermark 来解决一定程度上的数据延迟和数据乱序问题

但是对于延迟/迟到/乱序严重的数据还是会丢失,所以接下来

使用Watermark + AllowedLateness + SideOutput ,即使用侧道输出机制来单独收集延迟/迟到/乱序严重的数据,避免数据丢失!

package com.bigdata.day05;


import com.bigdata.day04.OrderInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-15 17:06:04
 **/

class MyOrderSource2 implements SourceFunction<OrderInfo> {

    @Override
    public void run(SourceContext<OrderInfo> ctx) throws Exception {


        Random random = new Random();
        while(true){
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderId(UUID.randomUUID().toString().replace("-",""));
            // 在这个地方可以模拟迟到数据
            long orderTime = System.currentTimeMillis() - 1000*random.nextInt(100);

            orderInfo.setOrdertime(orderTime);
            int money = random.nextInt(10);

            System.out.println("订单产生的时间:"+ DateFormatUtils.format(orderTime,"yyyy-MM-dd HH:mm:ss")+",金额:"+money);



            orderInfo.setMoney(money);

            orderInfo.setUserId(random.nextInt(2));
            ctx.collect(orderInfo);
            Thread.sleep(500);
        }

    }

    @Override
    public void cancel() {

    }
}
public class Demo01 {



    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // 每隔五秒统计每个用户的前面5秒的订单的总金额
        //2. source-加载数据
        DataStreamSource<OrderInfo> streamSource = env.addSource(new MyOrderSource2());

        //-2.告诉Flink最大允许的延迟时间/乱序时间为多少
        SingleOutputStreamOperator<OrderInfo> orderDSWithWatermark = streamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        //-3.告诉Flink哪一列是事件时间
                        .withTimestampAssigner((order, time) -> order.getOrdertime())
        );

        OutputTag<OrderInfo> outputTag = new OutputTag<OrderInfo>("side output"){};

        //3. transformation-数据处理转换
        SingleOutputStreamOperator<String> result = orderDSWithWatermark.keyBy(orderInfo -> orderInfo.getUserId()).
                window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(4))
                .sideOutputLateData(outputTag)
                .apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key,  // 代表分组key值     五旬老太守国门
                                      TimeWindow window, // 代表窗口对象
                                      Iterable<OrderInfo> input, // 分组过之后的数据 [1,1,1,1,1]
                                      Collector<String> out  // 用于输出的对象
                    ) throws Exception {
                        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long start = window.getStart();
                        long end = window.getEnd();
                        int sum = 0;
                        // 专门存放迟到的订单时间

                        for (OrderInfo orderInfo : input) {

                            sum += orderInfo.getMoney();
                        }

                        out.collect(key + ",窗口开始:" + dateFormat.format(new Date(start)) + ",结束时间:" + dateFormat.format(new Date(end)) + "," + sum);
                        //out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);

                    }
                });
        result.print("流中的数据,包含迟到的数据:");
        result.getSideOutput(outputTag).print("严重迟到的数据:");
        //4. sink-数据输出



        //5. execute-执行
        env.execute();
    }
}
订单产生的时间:2024-05-16 11:19:00,金额:1
订单产生的时间:2024-05-16 11:18:13,金额:3
严重迟到的数据:> f96d34c438ce400eb21a25328fe772ee,1,3,2024-05-16 11:18:13
订单产生的时间:2024-05-16 11:19:10,金额:3
2024-05-16 11:19:00
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:00,结束时间:2024-05-16 11:19:05,1,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:19,金额:8
严重迟到的数据:> cf85339600c647c99856841021466c5d,1,8,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:18:19,金额:9
严重迟到的数据:> f087ee9c9eaa4e3f9eac06a907b47fb4,1,9,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:17:48,金额:3
严重迟到的数据:> 22f48bb693874a99b80177f6764f0912,0,3,2024-05-16 11:17:48
订单产生的时间:2024-05-16 11:19:23,金额:1
2024-05-16 11:19:10
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:10,结束时间:2024-05-16 11:19:15,3,迟到的订单时间:
订单产生的时间:2024-05-16 11:19:19,金额:7
2024-05-16 11:19:19
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:15,结束时间:2024-05-16 11:19:20,7,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:25,金额:2
严重迟到的数据:> 9b91cfd413784e4da9fb7f16b68186d0,0,2,2024-05-16 11:18:25
订单产生的时间:2024-05-16 11:18:48,金额:3
严重迟到的数据:> 62bdd621dd4d43b2b9f401949c1c5686,1,3,2024-05-16 11:18:48
订单产生的时间:2024-05-16 11:18:38,金额:1
严重迟到的数据:> e6fe834c88d043ef94b66853ad67dc46,1,1,2024-05-16 11:18:38
订单产生的时间:2024-05-16 11:18:39,金额:5
严重迟到的数据:> 472b35fc32744c39990d13bd490ae131,1,5,2024-05-16 11:18:39
订单产生的时间:2024-05-16 11:18:19,金额:0
严重迟到的数据:> 49270e8cf00445f38a4fbcfebedfdc0a,0,0,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:19:07,金额:8
严重迟到的数据:> a337af0e6f1c46e48e2ed2ec99d6dc53,0,8,2024-05-16 11:19:07
订单产生的时间:2024-05-16 11:19:22,金额:8
订单产生的时间:2024-05-16 11:19:01,金额:3
严重迟到的数据:> 5bf845744c9d486f97fbd8106deb22bb,0,3,2024-05-16 11:19:01
订单产生的时间:2024-05-16 11:18:42,金额:2
严重迟到的数据:> f861897e49a54b589863efd6866ce61f,0,2,2024-05-16 11:18:42
订单产生的时间:2024-05-16 11:18:15,金额:7
严重迟到的数据:> 37a8d94dc8b94854963ddcbefa3d00dc,0,7,2024-05-16 11:18:15
订单产生的时间:2024-05-16 11:17:56,金额:6
严重迟到的数据:> ab81cf409c604bb398d43b15f59d7e53,1,6,2024-05-16 11:17:56
订单产生的时间:2024-05-16 11:18:24,金额:3
严重迟到的数据:> f85598988f4b43599e719b873d930440,1,3,2024-05-16 11:18:24
订单产生的时间:2024-05-16 11:17:52,金额:2
严重迟到的数据:> 03ade6f3972d441289bd745f979c961a,1,2,2024-05-16 11:17:52
订单产生的时间:2024-05-16 11:19:20,金额:9
订单产生的时间:2024-05-16 11:18:07,金额:1
严重迟到的数据:> d8b8992ea2b74a5fb10bffb198917c8f,0,1,2024-05-16 11:18:07
订单产生的时间:2024-05-16 11:18:34,金额:2
严重迟到的数据:> a8855d90701449588efc4dfd48df1dd3,0,2,2024-05-16 11:18:34
订单产生的时间:2024-05-16 11:19:11,金额:3
严重迟到的数据:> a7e245864b2c4c03b463b121277309eb,0,3,2024-05-16 11:19:11
订单产生的时间:2024-05-16 11:18:11,金额:9
严重迟到的数据:> cb99f1b5c1e24e5bb6ccacb9f0f9ea78,0,9,2024-05-16 11:18:11
订单产生的时间:2024-05-16 11:18:36,金额:7

Process finished with exit code 130

虽然我们添加了延迟的效果,就是说侧道输出,侧道输出不能触发窗口的执行,窗口的执行只能通过水印时间触发 ,允许迟到的数据,不放入到当前窗口中,而是作为一个触发条件看到,它需要放入到它对应的窗口中。

只考虑 1 个并行度的问题

订单发生的真实事件:窗口5秒,间隔5秒,允许迟到 3秒 最晚允许迟到4秒

10:44:00 第一个区间就应该是10:44:00 10:44:05

10:44:01

10:44:02

10:44:03

10:44:04

10:44:05

10:44:07 第一个区间就应该是10:44:05 10:44:10

10:44:22 第一个区间就应该是10:44:20 10:44:25

10:44:30

10:44:28

10:44:20

通过上面这个图可以知道,44:07没有办法触发00~05的执行,但是07不放入00~05区间,而是放入10:44:05 10:44:10

44:22 一个数据触发了两个区间的执行 00~05 05~10

假如有一个订单时44:10产生的,放入哪个区间?应该放入10~15这个区间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250858.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Docker】部署nginx

docker部署nginx docker部署nginx镜像加速器1、拉取nginx镜像2、创建nginx容器3、浏览器访问 docker部署nginx 镜像加速器 备注&#xff1a;阿里云镜像加速地址 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors可用的镜像源&#xff1a; https://https://reg…

github webhooks 实现网站自动更新

本文目录 Github Webhooks 介绍Webhooks 工作原理配置与验证应用云服务器通过 Webhook 自动部署网站实现复制私钥编写 webhook 接口Github 仓库配置 webhook以服务的形式运行 app.py Github Webhooks 介绍 Webhooks是GitHub提供的一种通知方式&#xff0c;当GitHub上发生特定事…

Rust SQLx CLI 同步迁移数据库

上文我们介绍了SQLx及SQLite&#xff0c;并介绍了如何使用代码同步迁移数据库。本文介绍Sqlx cli 命令行工具&#xff0c;介绍如何安装、使用&#xff0c;利用其提供的命令实现数据表同步迁移。Java生态中有flyway, sqlx cli 功能类似&#xff0c;利用命令行工具可以和其他语言…

论文笔记-WWW2024-ClickPrompt

论文笔记-WWW2024-ClickPrompt: CTR Models are Strong Prompt Generators for Adapting Language Models to CTR Prediction ClickPrompt: CTR模型是大模型适配CTR预测任务的强大提示生成器摘要1.引言2.预备知识2.1传统CTR预测2.2基于PLM的CTR预测 3.方法3.1概述3.2模态转换3.…

解析客服知识库搭建的五个必要性

在当今竞争激烈的商业环境中&#xff0c;客服知识库的搭建已成为企业提升服务质量、优化客户体验的重要手段。一个完善的客服知识库不仅能帮助企业高效管理客户服务流程&#xff0c;还能显著提升客户满意度和忠诚度。以下是搭建客服知识库的五个必要性&#xff1a; 1. 提升服务…

css—轮播图实现

一、背景 最近和朋友在一起讨论的时候&#xff0c;我们提出了这样的一个提问&#xff0c;难道轮播图的效果只能通过js来实现吗&#xff1f;经过我们的一系列的争论&#xff0c;发现了这是可以通过纯css来实现这一效果的&#xff0c;CSS轮播图也是一种常见的网页展示方式&#x…

40分钟学 Go 语言高并发:【实战课程】工作池(Worker Pool)实现

工作池&#xff08;Worker Pool&#xff09;实战实现 一、知识要点概述 模块核心功能实现难点重要程度池化设计管理协程生命周期并发安全、资源控制⭐⭐⭐⭐⭐动态扩缩容根据负载调整池大小平滑扩缩、性能优化⭐⭐⭐⭐任务分发合理分配任务到worker负载均衡、任务优先级⭐⭐⭐…

深度学习3:数据预处理使用Pandas与PyTorch的实践

文章目录 导读一、主题与提纲1.1. 读取数据集1.2. 处理缺失值1.3. 转换为张量格式二、结论本文是经过严格查阅相关权威文献和资料,形成的专业的可靠的内容。全文数据都有据可依,可回溯。特别申明:数据和资料已获得授权。本文内容,不涉及任何偏颇观点,用中立态度客观事实描…

004 MATLAB数值微积分

01 函数的极值点 求解一元函数在区间(x1,x2)中极小值点&#xff1a; xfminbnd(fun,x1,x2)求解初始向量为x0的多元函数极小值点x和对应的极值y [x,y]fminsearch(fun,x0)02 微积分 1.数值微分&#xff1a; 一次微分&#xff1a; diff(x) 若x是一个向量&#xff0c;则返回[x(…

重塑用户体验!快手电商智能巡检平台的实践与探索

导读&#xff1a;随着科技的飞速发展&#xff0c;人工智能&#xff08;AI&#xff09;已经成为推动各行各业创新的重要力量。特别是在用户体验方面&#xff0c;AI 技术的应用不仅解决了许多传统问题&#xff0c;还带来了全新的交互方式和更高的用户满意度。本文将从快手电商B端…

C# 结构体

文章目录 前言一、结构体的定义与基本使用&#xff08;一&#xff09;定义结构体&#xff08;二&#xff09;结构体的使用示例 二、C# 结构的特点&#xff08;一&#xff09;丰富的成员类型&#xff08;二&#xff09;构造函数相关限制与特性&#xff08;三&#xff09;继承方面…

实现Linux平台自定义协议族

一 简介 我们常常在Linux系统中编写socket接收TCP/UDP协议数据&#xff0c;大家有没有想过它怎么实现的&#xff0c;如果我们要实现socket接收自定义的协议数据又该怎么做呢&#xff1f;带着这个疑问&#xff0c;我们一起往下看吧~~ 二 Linux内核函数简介 在Linux系统中要想…

Asp.net core Autofac 案例 注入、AOP 启用接口代理拦截 启用 类代理拦截=== 只会拦截虚方法

资料 core 实现autofac 》》》 安装 如下工具包 安装之后 如出现 这种 》》》编写 AOP类 using Castle.DynamicProxy; using System.Diagnostics;namespace Web01.AOP {/// <summary>/// 日志记录/// </summary>public class LoggingInterceptor : IInterc…

【深度学习】各种卷积—卷积、反卷积、空洞卷积、可分离卷积、分组卷积

在全连接神经网络中&#xff0c;每个神经元都和上一层的所有神经元彼此连接&#xff0c;这会导致网络的参数量非常大&#xff0c;难以实现复杂数据的处理。为了改善这种情况&#xff0c;卷积神经网络应运而生。 一、卷积 在信号处理中&#xff0c;卷积被定义为一个函数经过翻转…

VOLO实战:使用VOLO实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整策略设置混合精度&#xff0c;DP多卡&#xff0c;EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试完整的代码 在上…

GPU 服务器厂家:怎样铸就卓越 AI 算力?

文章来源于百家号&#xff1a;GPU服务器厂家 今天咱来聊聊 GPU 服务器厂家那些事儿&#xff0c;而这其中衡量 AI 算力的因素可是关键所在哦。 先讲讲计算速度这一块。咱都知道 AI 那复杂的活儿&#xff0c;像训练超厉害的图像识别模型&#xff0c;得处理海量图像数据&#x…

DroneCAN 最新开发进展,Andrew在Ardupilot开发者大会2024的演讲

本文是Andrew演讲的中文翻译&#xff0c;你可以直接观看视频了解演讲的全部内容&#xff0c;此演讲视频的中文版本已经发布在Ardupilot社区的Blog板块&#xff0c;你可以在 Arudpilot官网&#xff08;https://ardupilot.org) 获取该视频&#xff1a; 你也可以直接通过Bilibili链…

USB Type-C一线通扩展屏:多场景应用,重塑高效办公与极致娱乐体验

在追求高效与便捷的时代&#xff0c;启明智显USB Type-C一线通扩展屏方案正以其独特的优势&#xff0c;成为众多职场人士、娱乐爱好者和游戏玩家的首选。这款扩展屏不仅具备卓越的性能和广泛的兼容性&#xff0c;更能在多个应用场景中发挥出其独特的价值。 USB2.0显卡&#xff…

Android 混淆问题

我的安卓混淆只需要在gradle里面开启就行了。 buildTypes {release {minifyEnabled trueshrinkResources truezipAlignEnabled trueproguardFiles getDefaultProguardFile(proguard-android-optimize.txt), proguard-rules.pro}} minifyEnabled true 这个就是开启方法&#xf…

《硬件架构的艺术》笔记(九):电磁兼容性能设计指南

简介 电子线路易于接收来自其他发射器的辐射信号&#xff0c;这些EMI&#xff08;电磁干扰&#xff09;使得设备内毗邻的元件不能同时工作。这就有必要进行电磁兼容设计以避免系统内有害的电磁干扰。 确保设备不产生多余的辐射&#xff0c;设备也不易受到射频辐射的干扰&…