Time的分类 (时间语义)
EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间
IngestionTime:摄入时间,是事件/数据到达流处理系统的时间
ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间
EventTime的重要性
假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分(EventTime)。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点01分了(ProcessingTime)。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。
在上面这个场景中你可以看到,
支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分
问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。
还可以通过钉钉打卡、饭卡机 等 举例子。
一条错误日志的内容为:
2020-11-11 23:59:58 error NullPointExcep --事件时间
进入Flink的时间为2020-11-11 23:59:59 --摄入时间
到达Window的时间为2020-11-12 00:00:01 --处理时间
问题:
对于业务来说,要统计每天的的故障日志个数,哪个时间是最有意义的?
答案:
EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质!
总结:
1.事件时间确实重要, 因为它能够代表事件/数据的本质,是事件/数据真真正正发生/产生的时间
2.按照事件时间进去处理/计算,会存在一定的难度, 因为数据可能会因为网路延迟等原因, 发生乱序或延迟到达, 那么最后的计算结果就有可能错误或数据丢失
3.需要有技术来解决上面的问题,使用Watermark技术来解决!
Watermark是什么?-水印,水位线
为什么会有WaterMark?
当flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示
只要使用event time,就必须使用watermark,在上游指定,比如:source、map算子后.
Watermark的核心本质可以理解成一个延迟触发机制。
我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
Watermark就是给数据额外添加的一列时间戳!
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
假如明天出去玩,09:00集合,最多允许迟到10分钟。
08:50 胜赛来了 08:50 - 10 = 08:40
09:05 步迅来了 09:05 - 10 = 08:55
09:35 青林来了 watermark = 09:35 - 10 = 09:25
能否上到车上的条件是:watermark <= 时间点
Watermark能解决什么问题,如何解决的?
有了Watermark 就可以在一定程度上解决数据乱序或延迟达到问题!
不添加watermark ,窗口如何触发:
1)窗口有数据
2)窗口的结束时间到了。
班车:到了时间点立即发车,来了数据也不要。
有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发:
1.窗口有数据
2.Watermark >= 窗口的结束时间
满足以上条件则触发窗口计算!
以前窗口触发:系统时间到了窗口结束时间就触发
现在窗口触发:Watermark >= 窗口的结束时间
而Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
就意味着, 通过Watermark改变了窗口的触发时机了, 那么接下来我们看如何改变的/如何解决前面的问题的
需要记住:
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
窗口触发时机 : Watermark >= 窗口的结束时间
水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印,一般人为添加的消息的水印都会比当前消息的事件时间小一些。
窗口是否关闭,按照水印时间来判断,但原有事件时间不会被修改,窗口的边界依旧是事件时间来决定。
- 水印并不会影响原有Eventtime
- 当数据流添加水印后,会按照水印时间来触发窗口计算
- 一般会设置水印时间,比Eventtime小一些(一般几秒钟)
- 当接收到的水印时间>= 窗口的endTime且窗口内有数据,则触发计算
水印(水印时间)的计算:事件时间– 设置的水印长度 = 水印时间
比如,事件时间是10分30秒, 水印长度是2秒,那么水印时间就是10分28秒
Watermark图解原理
总结:
Watermark 是一个单独计算出来的时间戳
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(乱序度)
Watermark可以通过改变窗口的触发时机 在 一定程度上解决数据乱序或延迟达到的问题
Watermark >= 窗口结束时间 时 就会触发窗口计算(窗口中得有数据)
延迟或乱序严重的数据还是丢失, 但是可以通过调大 最大允许的延迟时间(乱序度) 来解决, 或 使用后面要学习的侧道输出流来单独收集延迟或乱序严重的数据,保证数据不丢失!
多并行度的水印触发
在多并行度下,每个并行有一个水印
比如并行度是6,那么程序中就有6个watermark
分别属于这6个并行度(线程)
那么,触发条件以6个水印中最小的那个为准
比如, 有个窗口是0-5
其中5个并行度的水印都超过了5
但有一个并行度的水印是3
那么,不管另外5个并行度中的水印达到了多大,都不会触发
因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件
在测试水印的时候,记得把并行度设置为1 ,好看结果,否则,结果不太容易看出来。
Watermark代码演示
需求
实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
不使用水印的时候【不能使用eventtime时间语义】,进行开发:
package com.bigdata.day05;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
/**
* @基本功能:
* @program:FlinkDemo
* @author: 闫哥
* @create:2023-11-23 10:07:21
*
* 实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
* 要求每隔5s,计算5秒内,每个用户的订单总金额
* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
**/
public class _01WatermarkDemo {
@Data // set get toString
@AllArgsConstructor
@NoArgsConstructor
public static class OrderInfo2{
private String orderId;
private int uid;
private int money;
private long timeStamp;
}
public static class MySource implements SourceFunction<OrderInfo2> {
boolean flag = true;
@Override
public void run(SourceContext ctx) throws Exception {
// 源源不断的产生数据
Random random = new Random();
while(flag){
OrderInfo2 orderInfo = new OrderInfo2();
orderInfo.setOrderId(UUID.randomUUID().toString());
orderInfo.setUid(random.nextInt(3));
orderInfo.setMoney(random.nextInt(101));
orderInfo.setTimeStamp(System.currentTimeMillis());
ctx.collect(orderInfo);
Thread.sleep(1000);// 间隔1s
}
}
// source 停止之前需要干点啥
@Override
public void cancel() {
flag = false;
}
}
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2. source-加载数据
DataStreamSource<OrderInfo2> orderSourceStream = env.addSource(new MySource());
//3. transformation-数据处理转换
// 每个用户的订单总额
KeyedStream<OrderInfo2, Integer> keyedStream = orderDSWithWatermark.keyBy(orderInfo2 -> orderInfo2.getUid());
SingleOutputStreamOperator<OrderInfo2> result1 = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");
result1.print();
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
假如出现如下错误:
Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.bigdata.day05.OrderInfo2>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:224)
at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:53)
at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:688)
at com.bigdata.day05._01WatermarkDemo.main(_01WatermarkDemo.java:103)
说明这个pojo 必须是public 的,否则不解析。
修正过的代码:
package com.bigdata.time;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data // set get toString
@AllArgsConstructor
@NoArgsConstructor
public class OrderInfo{
private String orderId;
private int uid;
private int money;
private long timeStamp;
}
package com.bigdata.time;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
* @基本功能:
* @program:FlinkDemo
* @author: 闫哥
* @create:2024-11-26 10:30:28
**/
class MySource extends RichSourceFunction<OrderInfo> {
boolean flag = true;
@Override
public void run(SourceContext<OrderInfo> ctx) throws Exception {
while(flag){
OrderInfo orderInfo = new OrderInfo();
Random random = new Random();
int userId = random.nextInt(10);
int money = random.nextInt(100);
long timeStamp = System.currentTimeMillis() - random.nextInt(3000);
orderInfo.setOrderId(UUID.randomUUID().toString());
orderInfo.setUid(userId);
orderInfo.setMoney(money);
orderInfo.setTimeStamp(timeStamp);
ctx.collect(orderInfo);
Thread.sleep(20);
}
}
@Override
public void cancel() {
flag = false;
}
}
public class _01_OrderDemo {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//2. source-加载数据
DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());
//3. transformation-数据处理转换
streamSource.keyBy(new KeySelector<OrderInfo, Integer>() {
@Override
public Integer getKey(OrderInfo orderInfo) throws Exception {
return orderInfo.getUid();
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//.sum("money").print();
.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
@Override
public void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");
String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");
int sum = 0;
for (OrderInfo orderInfo : input) {
sum += orderInfo.getMoney();
}
out.collect(beginTime+","+endTime+","+userId +","+sum);
}
}).print();
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
需求升级:
实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
* 要求每隔5s,计算5秒内,每个用户的订单总金额
* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
假如你添加了 eventTime 缺没有添加水印的代码,会报如下错误:
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:83)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:302)
代码演示-开发版
生成 Watermark | Apache Flink
package com.bigdata.time;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @基本功能:
* @program:FlinkDemo
* @author: 闫哥
* @create:2024-11-26 10:30:28
**/
public class _02_OrderDemoWithWaterMark {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//2. source-加载数据
DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());
//3. transformation-数据处理转换
streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(
new SerializableTimestampAssigner<OrderInfo>() {
// long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢?
@Override
public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
// 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒
return orderInfo.getTimeStamp();
}
}
))
.keyBy(new KeySelector<OrderInfo, Integer>() {
@Override
public Integer getKey(OrderInfo orderInfo) throws Exception {
return orderInfo.getUid();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(5)))
//.sum("money").print();
.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
@Override
public void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");
String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");
int sum = 0;
for (OrderInfo orderInfo : input) {
sum += orderInfo.getMoney();
}
out.collect(beginTime+","+endTime+","+userId +","+sum);
}
}).print();
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。
WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness) |
我们实现一个延迟3秒的固定延迟水印,可以这样做:
DataStream dataStream = ...... ; |
Flink对于迟到数据的处理
水印:对于迟到数据不长
allowedLateness: 迟到时间很长
侧道输出:对于迟到时间特别长
对于延迟数据的理解:
水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。
这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。
但是水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口
这个等待不能一直等,因为会一直缓着数据不计算。
一般水印也就是几秒钟最多几分钟而已(看业务)
那么,在现实世界中,延迟数据除了有短期延迟外,长期延迟也是很常见的。
比如:
l 客户端断网,等了好几个小时才恢复
l 车联网系统进入隧道后没有信号无法上报数据
l 手机欠费没有网
等等,这些场景下数据的迟到就不是简单的网络堵塞造成的几秒延迟了
而是小时、天级别的延迟
对于水印来说,这样的长期延迟数据是无法很好处理的。
那么有没有什么办法去处理这些长期延迟的数据呢?让其可以找到其所属的窗口正常完成计算,哪怕晚了几个小时。
这个场景的解决方式就是:延迟数据处理机制(allowedLateness方法)。
水印:乱序数据处理(时间很短的延迟)
延迟处理:长期延迟数据的处理机制。
延迟数据的处理:
waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法,
主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据
设置允许延迟的时间是通过allowedLateness(lateness: Time)设置
保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存
获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取
1)allowedLateness(lateness: Time)
当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法
该方法传入一个Time值,设置允许的长期延迟(迟到)的时间。
和watermark不同。
未设置allowedLateness(为0),当watermark满足条件,会触发窗口的 执行 + 关闭
当设置了allowedLateness,当watermark满足条件后,只会触发窗口的执行,不会触发窗口关闭
也就是,watermark满足条件后会正常触发窗口计算,将已有的数据完成计算。
但是,不会关闭窗口。如果在allowedLateness允许的时间内仍有这个窗口的数据进来,那么每进来一条,会和已经计算过的(被watermark触发的)数据一起在计算一次。
水印:短期延迟,达到条件后触发计算并且关闭窗口(触发+关闭同时进行)
水印+allowedLateness : 短期延迟+ 等待长期延迟效果, 达到水印条件后,会触发窗口计算,但是不关闭窗口。事件时间延迟达到水印+allowedLateness之和后会关闭窗口。
2) 侧输出-SideOutput
Flink 通过watermark在短时间内允许了乱序到来的数据
通过延迟数据处理机制,可以处理长期迟到的数据。
但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。
对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?
不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来
保存成为一个DataStream,然后,交由开发人员自行处理。
那么这个机制就叫做侧输出机制(Side Output)
侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开发人员可以自定逻辑对这些超级迟到数据进行处理。
处理主要使用两个方式:
对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方
对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理的数据的DataStream
注意,取到的是一个DataStream,这意味着你可以对这些超级迟到数据继续写 如keyBy, window等处理逻辑。
使用方式:
先定义OutputTag对象(注意,必须new一个匿名内部类形式的OutputTag对象的实例)
然后调用sideOutputLateData方法
// side output OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side output"){};
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> sideOutputLateData =
allowedLateness.sideOutputLateData(outputTag);
用法:
DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
// 对得到的保存超级迟到数据的DataStream进行处理
sideOutput.print("late>>>");
代码演示-完美版/企业版
前面的案例中已经可以使用Watermark 来解决一定程度上的数据延迟和数据乱序问题
但是对于延迟/迟到/乱序严重的数据还是会丢失,所以接下来
使用Watermark + AllowedLateness + SideOutput ,即使用侧道输出机制来单独收集延迟/迟到/乱序严重的数据,避免数据丢失!
package com.bigdata.day05;
import com.bigdata.day04.OrderInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
* @基本功能:
* @program:FlinkDemo
* @author: 闫哥
* @create:2024-05-15 17:06:04
**/
class MyOrderSource2 implements SourceFunction<OrderInfo> {
@Override
public void run(SourceContext<OrderInfo> ctx) throws Exception {
Random random = new Random();
while(true){
OrderInfo orderInfo = new OrderInfo();
orderInfo.setOrderId(UUID.randomUUID().toString().replace("-",""));
// 在这个地方可以模拟迟到数据
long orderTime = System.currentTimeMillis() - 1000*random.nextInt(100);
orderInfo.setOrdertime(orderTime);
int money = random.nextInt(10);
System.out.println("订单产生的时间:"+ DateFormatUtils.format(orderTime,"yyyy-MM-dd HH:mm:ss")+",金额:"+money);
orderInfo.setMoney(money);
orderInfo.setUserId(random.nextInt(2));
ctx.collect(orderInfo);
Thread.sleep(500);
}
}
@Override
public void cancel() {
}
}
public class Demo01 {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
// 每隔五秒统计每个用户的前面5秒的订单的总金额
//2. source-加载数据
DataStreamSource<OrderInfo> streamSource = env.addSource(new MyOrderSource2());
//-2.告诉Flink最大允许的延迟时间/乱序时间为多少
SingleOutputStreamOperator<OrderInfo> orderDSWithWatermark = streamSource.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
//-3.告诉Flink哪一列是事件时间
.withTimestampAssigner((order, time) -> order.getOrdertime())
);
OutputTag<OrderInfo> outputTag = new OutputTag<OrderInfo>("side output"){};
//3. transformation-数据处理转换
SingleOutputStreamOperator<String> result = orderDSWithWatermark.keyBy(orderInfo -> orderInfo.getUserId()).
window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(4))
.sideOutputLateData(outputTag)
.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
@Override
public void apply(Integer key, // 代表分组key值 五旬老太守国门
TimeWindow window, // 代表窗口对象
Iterable<OrderInfo> input, // 分组过之后的数据 [1,1,1,1,1]
Collector<String> out // 用于输出的对象
) throws Exception {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = window.getStart();
long end = window.getEnd();
int sum = 0;
// 专门存放迟到的订单时间
for (OrderInfo orderInfo : input) {
sum += orderInfo.getMoney();
}
out.collect(key + ",窗口开始:" + dateFormat.format(new Date(start)) + ",结束时间:" + dateFormat.format(new Date(end)) + "," + sum);
//out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);
}
});
result.print("流中的数据,包含迟到的数据:");
result.getSideOutput(outputTag).print("严重迟到的数据:");
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
订单产生的时间:2024-05-16 11:19:00,金额:1
订单产生的时间:2024-05-16 11:18:13,金额:3
严重迟到的数据:> f96d34c438ce400eb21a25328fe772ee,1,3,2024-05-16 11:18:13
订单产生的时间:2024-05-16 11:19:10,金额:3
2024-05-16 11:19:00
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:00,结束时间:2024-05-16 11:19:05,1,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:19,金额:8
严重迟到的数据:> cf85339600c647c99856841021466c5d,1,8,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:18:19,金额:9
严重迟到的数据:> f087ee9c9eaa4e3f9eac06a907b47fb4,1,9,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:17:48,金额:3
严重迟到的数据:> 22f48bb693874a99b80177f6764f0912,0,3,2024-05-16 11:17:48
订单产生的时间:2024-05-16 11:19:23,金额:1
2024-05-16 11:19:10
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:10,结束时间:2024-05-16 11:19:15,3,迟到的订单时间:
订单产生的时间:2024-05-16 11:19:19,金额:7
2024-05-16 11:19:19
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:15,结束时间:2024-05-16 11:19:20,7,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:25,金额:2
严重迟到的数据:> 9b91cfd413784e4da9fb7f16b68186d0,0,2,2024-05-16 11:18:25
订单产生的时间:2024-05-16 11:18:48,金额:3
严重迟到的数据:> 62bdd621dd4d43b2b9f401949c1c5686,1,3,2024-05-16 11:18:48
订单产生的时间:2024-05-16 11:18:38,金额:1
严重迟到的数据:> e6fe834c88d043ef94b66853ad67dc46,1,1,2024-05-16 11:18:38
订单产生的时间:2024-05-16 11:18:39,金额:5
严重迟到的数据:> 472b35fc32744c39990d13bd490ae131,1,5,2024-05-16 11:18:39
订单产生的时间:2024-05-16 11:18:19,金额:0
严重迟到的数据:> 49270e8cf00445f38a4fbcfebedfdc0a,0,0,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:19:07,金额:8
严重迟到的数据:> a337af0e6f1c46e48e2ed2ec99d6dc53,0,8,2024-05-16 11:19:07
订单产生的时间:2024-05-16 11:19:22,金额:8
订单产生的时间:2024-05-16 11:19:01,金额:3
严重迟到的数据:> 5bf845744c9d486f97fbd8106deb22bb,0,3,2024-05-16 11:19:01
订单产生的时间:2024-05-16 11:18:42,金额:2
严重迟到的数据:> f861897e49a54b589863efd6866ce61f,0,2,2024-05-16 11:18:42
订单产生的时间:2024-05-16 11:18:15,金额:7
严重迟到的数据:> 37a8d94dc8b94854963ddcbefa3d00dc,0,7,2024-05-16 11:18:15
订单产生的时间:2024-05-16 11:17:56,金额:6
严重迟到的数据:> ab81cf409c604bb398d43b15f59d7e53,1,6,2024-05-16 11:17:56
订单产生的时间:2024-05-16 11:18:24,金额:3
严重迟到的数据:> f85598988f4b43599e719b873d930440,1,3,2024-05-16 11:18:24
订单产生的时间:2024-05-16 11:17:52,金额:2
严重迟到的数据:> 03ade6f3972d441289bd745f979c961a,1,2,2024-05-16 11:17:52
订单产生的时间:2024-05-16 11:19:20,金额:9
订单产生的时间:2024-05-16 11:18:07,金额:1
严重迟到的数据:> d8b8992ea2b74a5fb10bffb198917c8f,0,1,2024-05-16 11:18:07
订单产生的时间:2024-05-16 11:18:34,金额:2
严重迟到的数据:> a8855d90701449588efc4dfd48df1dd3,0,2,2024-05-16 11:18:34
订单产生的时间:2024-05-16 11:19:11,金额:3
严重迟到的数据:> a7e245864b2c4c03b463b121277309eb,0,3,2024-05-16 11:19:11
订单产生的时间:2024-05-16 11:18:11,金额:9
严重迟到的数据:> cb99f1b5c1e24e5bb6ccacb9f0f9ea78,0,9,2024-05-16 11:18:11
订单产生的时间:2024-05-16 11:18:36,金额:7
Process finished with exit code 130
虽然我们添加了延迟的效果,就是说侧道输出,侧道输出不能触发窗口的执行,窗口的执行只能通过水印时间触发 ,允许迟到的数据,不放入到当前窗口中,而是作为一个触发条件看到,它需要放入到它对应的窗口中。
只考虑 1 个并行度的问题
订单发生的真实事件:窗口5秒,间隔5秒,允许迟到 3秒 最晚允许迟到4秒
10:44:00 第一个区间就应该是10:44:00 10:44:05
10:44:01
10:44:02
10:44:03
10:44:04
10:44:05
10:44:07 第一个区间就应该是10:44:05 10:44:10
10:44:22 第一个区间就应该是10:44:20 10:44:25
10:44:30
10:44:28
10:44:20
通过上面这个图可以知道,44:07没有办法触发00~05的执行,但是07不放入00~05区间,而是放入10:44:05 10:44:10
44:22 一个数据触发了两个区间的执行 00~05 05~10
假如有一个订单时44:10产生的,放入哪个区间?应该放入10~15这个区间