forBoundedOutOfOrderness 和 TumblingEventTimeWindows
forBoundedOutOfOrderness(M)
TumblingEventTimeWindows(N)
第一条数据的时间TS1
第一个窗口期公式:
窗口开始时间:
win_start = ((TS1-M)/N) * N (其中除法为整数除法,即向下取整)
窗口结束时间:
win_end = win_start+N
数据过期:
凡是事件时间 < win_start 的数据都是过期(迟到)数据;随着已收到的最大事件时间变大,这个界限会按同一公式向后推移(本例 allowedLateness 为 0)。
第一个窗口汇总计算触发:
与数据到达的时间间隔无关,与程序运行的总时长也无关,
只与接收到的数据的事件时间 TS2 有关。
当 TS2>=win_end+M 时,会将事件时间 >= win_start && < win_end 的数据(窗口是左闭右开区间)交给 apply 计算。
TS2>=win_end+M 是唯一条件。
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Small Flink demo showing how {@code WatermarkStrategy.forBoundedOutOfOrderness}
 * interacts with {@code TumblingEventTimeWindows}.
 *
 * <p>A fixed sequence of {@code "key,eventTimeMillis"} records is emitted by an in-process
 * source. Records are stamped with their embedded event time (3 s bounded out-of-orderness),
 * keyed by the first field and grouped into 5 s tumbling event-time windows with an allowed
 * lateness of 0. Late records are routed to a side output and printed.
 *
 * <p>Every step also appends a line to a shared log, which {@link #main(String[])} prints
 * once the job has finished.
 */
public class WaterMarkTest {

    /**
     * Guards every access to {@link #sb}. The source/timestamp-assigner code, the window
     * function and {@code main} all write to the log, and Flink can run them on different
     * threads, so an unsynchronised {@code StringBuilder} is not safe here.
     */
    private static final Object LOG_LOCK = new Object();

    /** Number of leading events that are followed by a pause of {@link #sleeps} ms. */
    private static final int FAST_EVENT_COUNT = 6;

    /** Pause in milliseconds after each of the remaining events (except the last one). */
    private static final long SLOW_PAUSE_MS = 1000L;

    /**
     * Records emitted by the demo source, in emission order. Each record is
     * {@code "key,eventTimeMillis"}. Entries marked "late" were reported as expired data in
     * the sample run recorded at the end of {@link #run()}.
     */
    private static final String[] EVENTS = {
        "hello,1553503188000",
        "hello,1553503186000",
        "hello,1553503183000", // late
        "hello,1553503180000", // late
        "hello,1553503185000",
        "hello,1553503188000",
        "hello,1553503189000",
        "hello,1553503188000",
        "hello,1553503189000",
        "hello,1553503190000",
        "hello,1553503191000",
        "hello,1553503186000",
        "hello,1553503187000",
        "hello,1553503185000",
        "hello,1553503184000", // late (dropped)
        "hello,1553503183000", // late (dropped)
        "hello,1553503190000",
        "hello,1553503192000",
        "hello,1553503193000",
        "hello,1553503194000",
        "hello,1553503195000",
        "hello,1553503196000",
        "hello,1553503197000",
        "hello,1553503198000",
        "hello,1553503199000",
        "hello,1553503200000",
        "hello,1553503201000",
        "hello,1553503202000",
    };

    /** Shared run log; append through {@link #appendLog(String)} only. */
    static StringBuilder sb = new StringBuilder();

    /** Wall-clock time (ms) at which {@link #run()} started. */
    static volatile long sts = 0L;

    /** Wall-clock time (ms) at which the most recent window evaluation finished. */
    static volatile long ets = 0L;

    /** Pause in ms after each of the first {@link #FAST_EVENT_COUNT} events. */
    static long sleeps = 500;

    public WaterMarkTest() {
    }

    /** Appends {@code text} verbatim to the shared log under {@link #LOG_LOCK}. */
    private static void appendLog(String text) {
        synchronized (LOG_LOCK) {
            sb.append(text);
        }
    }

    /**
     * Builds the demo pipeline on a local environment and blocks until the job finishes.
     *
     * <p>Only the late-data side output is attached to a sink ({@code print()}); the regular
     * window results are visible through the {@code println} calls inside the window function.
     *
     * @throws Exception if the Flink job fails
     */
    public void run() throws Exception {
        sts = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Anonymous subclass so that the generic element type of the tag is retained.
        OutputTag<Tuple2<String, String>> lateOutputTag =
            new OutputTag<Tuple2<String, String>>("late-data-lx") {
                private static final long serialVersionUID = 154621L;
            };

        DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1134546L;

            /** Cleared by {@link #cancel()} so the emit loop stops at the next record. */
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                for (int i = 0; i < EVENTS.length && running; i++) {
                    ctx.collect(EVENTS[i]);
                    if (i < EVENTS.length - 1) {
                        // Watermarks are generated periodically (every 200 ms), so the
                        // pause between records should not go below that interval.
                        Thread.sleep(i < FAST_EVENT_COUNT ? sleeps : SLOW_PAUSE_MS);
                    }
                }
                System.out.println("1 ============================================================");
                long lastWindowDone = ets;
                appendLog("time use 1=" + (lastWindowDone - sts) + ", ets=" + lastWindowDone + "\n");
            }

            @Override
            public void cancel() {
                running = false;
            }
        }, "source1")
        // assignTimestampsAndWatermarks (from its Javadoc): assigns timestamps to the
        // elements in the data stream and generates watermarks to signal event time
        // progress. The given WatermarkStrategy is used to create a TimestampAssigner and
        // a WatermarkGenerator. For each event, TimestampAssigner.extractTimestamp(Object,
        // long) is called to assign an event timestamp, and
        // WatermarkGenerator.onEvent(Object, long, WatermarkOutput) is called.
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // Stamp every record with the event time embedded in its second field.
                .withTimestampAssigner(
                    new SerializableTimestampAssigner<String>() {
                        private static final long serialVersionUID = 134231L;

                        /** Largest event time seen before the current record. */
                        private long maxEventTime = 0L;

                        /** Wall-clock time (ms) at which the first record was seen. */
                        private long firstSeenAt = 0L;

                        /**
                         * Parses the event time out of {@code "key,eventTimeMillis"} and logs it.
                         *
                         * @param element the raw record
                         * @param recordTimestamp the timestamp previously attached to the element
                         * @return the event time in epoch milliseconds
                         */
                        @Override
                        public long extractTimestamp(String element, long recordTimestamp) {
                            String[] fields = element.split(",");
                            long eventTime = Long.parseLong(fields[1]);
                            long now = System.currentTimeMillis();
                            long sinceFirst = firstSeenAt > 0 ? (now - firstSeenAt) : 0;
                            String msg = now + "[" + sinceFirst + "]: Key-> " + fields[0]
                                + ",EventTime:" + eventTime + ", recordTimestamp=" + maxEventTime;
                            System.out.println(msg);
                            appendLog(msg + "\n");
                            if (firstSeenAt == 0) {
                                firstSeenAt = now;
                            }
                            maxEventTime = Math.max(eventTime, maxEventTime);
                            return eventTime;
                        }
                    }
                )
        );

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
            private static final long serialVersionUID = 12342L;

            /** Splits {@code "key,eventTimeMillis"} into a (key, eventTimeMillis) pair. */
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Tuple2<String, String>(fields[0], fields[1]);
            }
        }).keyBy(f -> f.f0)
            // 5-second tumbling event-time windows.
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            // Author's observation: with a value greater than 0 the same window emits
            // results repeatedly, because every late record that still arrives within the
            // allowed lateness re-fires its window.
            .allowedLateness(Time.seconds(0))
            .sideOutputLateData(lateOutputTag)
            .apply(new WindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
                private static final long serialVersionUID = 1112151L;

                /** Wall-clock time (ms) of the previous window evaluation; 0 before the first. */
                private long lastFiredAt = 0L;

                /**
                 * Logs the window bounds, emits every element (ordered by its timestamp
                 * string) and records the first and last element in the shared log.
                 */
                @Override
                public void apply(String key, TimeWindow window,
                        Iterable<Tuple2<String, String>> input, Collector<String> out) throws Exception {
                    long now = System.currentTimeMillis();
                    if (lastFiredAt == 0) {
                        // First evaluation: measure from the start of the whole run.
                        lastFiredAt = sts;
                    }
                    String elapsed = lastFiredAt > 0 ? String.valueOf(now - lastFiredAt) : "-";
                    String msg = now + "[" + elapsed + "]" + " 当前窗口开始时间[" + window.getStart()
                        + ",结束时间" + window.getEnd() + ")";
                    appendLog(msg + "\n");
                    System.out.println(msg);
                    lastFiredAt = now;

                    // Ascending by the timestamp string (second tuple field).
                    List<Tuple2<String, String>> sorted = new ArrayList<>();
                    input.forEach(sorted::add);
                    sorted.sort((a, b) -> a.f1.compareTo(b.f1));

                    for (Tuple2<String, String> element : sorted) {
                        out.collect(" - " + element.f1);
                        System.out.println("> " + element.f1);
                    }

                    // The shared log only keeps the first and (if different) last element.
                    StringBuilder summary = new StringBuilder();
                    if (!sorted.isEmpty()) {
                        summary.append("> ").append(sorted.get(0).f1).append("\n");
                    }
                    if (sorted.size() > 1) {
                        summary.append("> ").append(sorted.get(sorted.size() - 1).f1).append("\n");
                    }
                    appendLog(summary.toString());
                    ets = System.currentTimeMillis();
                }
            })
            .getSideOutput(lateOutputTag).map(new MapFunction<Tuple2<String, String>, String>() {
                private static final long serialVersionUID = 341902L;

                /** Formats a late record and appends it to the shared log. */
                @Override
                public String map(Tuple2<String, String> value) throws Exception {
                    String msg = "[Expire Data]> " + value.f0 + "->" + value.f1;
                    appendLog(msg + "\n");
                    return msg;
                }
            }).print();

        env.execute("Flink WaterMark Test1");
        /*
        Sample output of one run:

        2 ============================================================
        1674029100314[0]: Key-> hello,EventTime:1553503188000, recordTimestamp=0
        1674029100832[518]: Key-> hello,EventTime:1553503186000, recordTimestamp=1553503188000
        1674029101341[1027]: Key-> hello,EventTime:1553503183000, recordTimestamp=1553503188000
        [Expire Data]> hello->1553503183000
        1674029101850[1536]: Key-> hello,EventTime:1553503180000, recordTimestamp=1553503188000
        [Expire Data]> hello->1553503180000
        1674029102362[2048]: Key-> hello,EventTime:1553503185000, recordTimestamp=1553503188000
        1674029102872[2558]: Key-> hello,EventTime:1553503188000, recordTimestamp=1553503188000
        1674029103384[3070]: Key-> hello,EventTime:1553503189000, recordTimestamp=1553503188000
        1674029104392[4078]: Key-> hello,EventTime:1553503188000, recordTimestamp=1553503189000
        1674029105400[5086]: Key-> hello,EventTime:1553503189000, recordTimestamp=1553503189000
        1674029106404[6090]: Key-> hello,EventTime:1553503190000, recordTimestamp=1553503189000
        1674029107408[7094]: Key-> hello,EventTime:1553503191000, recordTimestamp=1553503190000
        1674029108420[8106]: Key-> hello,EventTime:1553503186000, recordTimestamp=1553503191000
        1674029109426[9112]: Key-> hello,EventTime:1553503187000, recordTimestamp=1553503191000
        1674029110433[10119]: Key-> hello,EventTime:1553503185000, recordTimestamp=1553503191000
        1674029111438[11124]: Key-> hello,EventTime:1553503184000, recordTimestamp=1553503191000
        [Expire Data]> hello->1553503184000
        1674029112444[12130]: Key-> hello,EventTime:1553503183000, recordTimestamp=1553503191000
        [Expire Data]> hello->1553503183000
        1674029113450[13136]: Key-> hello,EventTime:1553503190000, recordTimestamp=1553503191000
        1674029114464[14150]: Key-> hello,EventTime:1553503192000, recordTimestamp=1553503191000
        1674029115467[15153]: Key-> hello,EventTime:1553503193000, recordTimestamp=1553503192000
        1674029115625[19265] 当前窗口开始时间[1553503185000,结束时间1553503190000)
        > 1553503185000
        > 1553503189000
        1674029116473[16159]: Key-> hello,EventTime:1553503194000, recordTimestamp=1553503193000
        1674029117488[17174]: Key-> hello,EventTime:1553503195000, recordTimestamp=1553503194000
        1674029118489[18175]: Key-> hello,EventTime:1553503196000, recordTimestamp=1553503195000
        1674029119489[19175]: Key-> hello,EventTime:1553503197000, recordTimestamp=1553503196000
        1674029120496[20182]: Key-> hello,EventTime:1553503198000, recordTimestamp=1553503197000
        1674029120716[5091] 当前窗口开始时间[1553503190000,结束时间1553503195000)
        > 1553503190000
        > 1553503194000
        1674029121508[21194]: Key-> hello,EventTime:1553503199000, recordTimestamp=1553503198000
        1674029122516[22202]: Key-> hello,EventTime:1553503200000, recordTimestamp=1553503199000
        1674029123517[23203]: Key-> hello,EventTime:1553503201000, recordTimestamp=1553503200000
        1674029124533[24219]: Key-> hello,EventTime:1553503202000, recordTimestamp=1553503201000
        time use 1=24357, ets=1674029120717
        1674029124553[3837] 当前窗口开始时间[1553503195000,结束时间1553503200000)
        > 1553503195000
        > 1553503199000
        1674029124554[1] 当前窗口开始时间[1553503200000,结束时间1553503205000)
        > 1553503200000
        > 1553503202000
        time use2=28256, sts=1674029096360
        */
    }

    /**
     * Runs the demo job, then prints the log collected during the run.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        WaterMarkTest test = new WaterMarkTest();
        test.run();
        System.out.println("2 ============================================================");
        appendLog("time use2=" + (System.currentTimeMillis() - sts) + ", sts=" + sts + "\n");
        synchronized (LOG_LOCK) {
            System.out.println(sb.toString());
        }
    }
}
参考:
https://blog.csdn.net/Vector97/article/details/110150925
https://blog.csdn.net/RonieWhite/article/details/114386907