flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。简单来说事件时间就相当于人出生的时间,一般数据生成的时候也会有创建时间。而事件处理时间则相当于人具体做某件事的时间,一条数据可能是2023年生成的,但是到2024年才被处理,这个2024年便被称为这个事件的处理时间。
一、事件时间定时器(event time),这是基于事件时间来触发的,这里面有一个小坑。当第一个事件到的时候创建一个定时器10秒后触发。对我们大部分人来说我既然已经创建了这个定时器,那么10秒后,他就会自动触发。但事实上他10秒后如果没有事件到来他并不会触发。大概意思就是前一个事件创建的定时器需要后一个事件的时间来触发。下面是事件时间定时器的一种实现方式。
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;
public class EventTime {
public static void main(String[] args) throws Exception {
SourceTemperature mySourceTemperature = new SourceTemperature();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
WatermarkStrategy<Temperature> twsDS
= WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());
SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);
KeyedStream<Temperature, String> keyByDS = tSSODS.keyBy(temperature -> temperature.getDay());
SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {
ListState<Temperature> temperatureListState;
ValueState<Temperature> temperatureState;
ValueState<Integer> size;
ValueState<Long> temperature;
@Override
public void open(OpenContext openContext) throws Exception {
ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);
temperatureListState = getRuntimeContext().getListState(listStateDescriptor);
temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));
size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));
temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));
}
@Override
public void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {
Temperature value1 = temperatureState.value();
//System.out.println(ctx.timestamp());
if(value1 == null){
temperatureState.update(value);
temperatureListState.add(value);
size.update(1);
//System.out.printf("当前事件处理:"+DateFormat.getDateTime(ctx.timestamp()));
//System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));
temperature.update(value.getTimestamp());
ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);
}else{
if(value1.getTemperature() < value.getTemperature()){
temperatureState.update(value);
temperatureListState.add(value);
size.update(size.value()+1);
//System.out.println(size.value());
if(size.value()>= 3){
System.out.printf("警告警告:");
Iterator<Temperature> iterator = temperatureListState.get().iterator();
while(iterator.hasNext()){
out.collect(iterator.next());
}
temperatureListState.clear();
temperatureState.clear();
size.clear();
ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);
}
}else{
System.out.println("温度降低了");
temperatureState.update(value);
temperatureListState.clear();
temperatureListState.add(value);
size.update(1);
ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);
temperature.update(value.getTimestamp());
ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);
}
}
}
@Override
public void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {
System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));
temperatureListState.clear();
temperatureState.clear();
size.clear();
if(temperature.value() != null)
ctx.timerService().deleteEventTimeTimer(temperature.value() + 10*1000);
}
});
process.print("当前警告温度为:");
env.execute();
}
}
//自己定义数据源
class SourceTemperature extends RichSourceFunction<Temperature> {
@Override
public void run(SourceContext<Temperature> ctx) throws Exception {
Scanner scanner = new Scanner(System.in);
while (true) {
Temperature temperature = new Temperature();
System.out.print("请输入温度: ");
//double temp = Math.random()*40;
double temp = scanner.nextDouble();
//System.out.println(temp);
temperature.setTemperature(temp);
temperature.setTimestamp(new Date().getTime());
ctx.collect(temperature);
//Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
}
//自定义实体类
class Temperature1 {
public Temperature1(double temperature, long timestamp) {
this.temperature = temperature;
this.timestamp = timestamp;
}
public Temperature1(){};
//温度
private double temperature;
//时间
private long timestamp;
//id
private String day = "2024-12-24";
public double getTemperature() {
return temperature;
}
public void setTemperature(double temperature) {
this.temperature = temperature;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getDay() {
return day;
}
public void setDay(String day) {
this.day = day;
}
@Override
public String toString() {
return "Temperature1{" +
"temperature=" + temperature +
", timestamp=" + timestamp +
", day='" + day + '\'' +
'}';
}
}
下面我们做一个测试,来验证一下这个解释:前一个事件创建的定时器需要后一个事件的时间来触发。他们的时间间隔超过了10秒钟,但是时间并没有触发,而是下一个事件到的时候才触发的。
二、事件处理时间,事件处理时间触发有系统时间有关
package com.xcj;
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;
public class ProcessTime {
public static void main(String[] args) throws Exception {
SourceTemperature mySourceTemperature = new SourceTemperature();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
// WatermarkStrategy<Temperature> twsDS
// = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
// .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());
//
// SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);
KeyedStream<Temperature, String> keyByDS = tDSSource.keyBy(temperature -> temperature.getDay());
SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {
ListState<Temperature> temperatureListState;
ValueState<Temperature> temperatureState;
ValueState<Integer> size;
ValueState<Long> temperature;
@Override
public void open(OpenContext openContext) throws Exception {
ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);
temperatureListState = getRuntimeContext().getListState(listStateDescriptor);
temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));
size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));
temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));
}
@Override
public void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {
Temperature value1 = temperatureState.value();
//System.out.println(ctx.timestamp());
System.out.printf("当前事件时间:"+DateFormat.getDateTime(value.getTimestamp()));
System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));
if(value1 == null){
temperatureState.update(value);
temperatureListState.add(value);
size.update(1);
temperature.update(ctx.timerService().currentProcessingTime());
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
}else{
if(value1.getTemperature() < value.getTemperature()){
temperatureState.update(value);
temperatureListState.add(value);
size.update(size.value()+1);
//System.out.println(size.value());
if(size.value()>= 3){
System.out.printf("警告警告:");
Iterator<Temperature> iterator = temperatureListState.get().iterator();
while(iterator.hasNext()){
out.collect(iterator.next());
}
temperatureListState.clear();
temperatureState.clear();
size.clear();
ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);
}
}else{
System.out.println("温度降低了");
temperatureState.update(value);
temperatureListState.clear();
temperatureListState.add(value);
size.update(1);
ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);
temperature.update(value.getTimestamp());
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
}
}
}
@Override
public void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {
System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));
temperatureListState.clear();
temperatureState.clear();
size.clear();
if(temperature.value() != null)
ctx.timerService().deleteProcessingTimeTimer(temperature.value() + 10*1000);
}
});
process.print("当前警告温度为:");
env.execute();
}
}
//自己定义数据源
class SourceTemperature extends RichSourceFunction<Temperature> {
@Override
public void run(SourceContext<Temperature> ctx) throws Exception {
Scanner scanner = new Scanner(System.in);
while (true) {
Temperature temperature = new Temperature();
System.out.print("请输入温度: ");
//double temp = Math.random()*40;
double temp = scanner.nextDouble();
//System.out.println(temp);
temperature.setTemperature(temp);
temperature.setTimestamp(new Date().getTime());
ctx.collect(temperature);
//Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
}
//自定义实体类
class Temperature1 {
public Temperature1(double temperature, long timestamp) {
this.temperature = temperature;
this.timestamp = timestamp;
}
public Temperature1(){};
//温度
private double temperature;
//时间
private long timestamp;
//id
private String day = "2024-12-24";
public double getTemperature() {
return temperature;
}
public void setTemperature(double temperature) {
this.temperature = temperature;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getDay() {
return day;
}
public void setDay(String day) {
this.day = day;
}
@Override
public String toString() {
return "Temperature1{" +
"temperature=" + temperature +
", timestamp=" + timestamp +
", day='" + day + '\'' +
'}';
}
}
事件处理时间是不需要下一个事件触发的
三、总结
事件时间(event time) 与事件处理时间(process time)定时器整体代码其实差不多,主要是在注册定时器的时候选择的方法
//事件时间
ctx.timerService().registerEventTimeTimer(value.getTimestamp());
//事件处理事件
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
和不同定时器的逻辑。注意:事件时间定时器是需要下一个事件来触发上一个事件的定时任务,但是事件处理时间定时器是不需要下一个事件来触发的,他是根据注册时间和系统时间的差值来触发的。
上面我把注册时间改为了过去很久的时间,来一个就触发一次定时任务,因为注册时间与当前系统时间相差>10秒,所以会直接触发。