11、水位线
11.1、水位线概念
一般实时流处理场景中,事件时间基本与处理时间保持同步,可能会略微延迟。
flink中用来衡量事件时间进展的标记就是水位线(WaterMark)。水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容是一个时间戳,用来指示当前的事件时间。一般使用某个数据的时间戳作为水位线的时间戳。
水位线特性:
- 水位线是插入到数据流中的一个标记
- 水位线主要内容是一个时间戳用来表示当前事件时间的进展
- 水位线是基于数据的时间戳生成的
- 水位线时间戳单调递增
- 水位线可通过设置延迟正确处理乱序数据
- 一个水位线WaterMark(t)表示在当前流中事件时间已经达到了时间戳t,代表t之前的所有数据都到齐了,之后流中不会出现时间戳小于或等于t的数据
以WaterMark等2s为例:
**注意:**flink窗口并不是静态准备好的,而是动态创建的,当有罗在这个窗口区间范围的数据达到时才创建对应的窗口。当到达窗口结束时间后窗口就触发计算并关闭,触发计算和窗口关闭两个行为也是分开的。
11.2、生成水位线
11.2.1、原则
要性能就设置低水位线或不设置水位线,直接使用处理时间语义可得到最低的延迟,但有可能遗漏数据。
如要保证数据全部到齐可以设置高水位线,但会影响性能,计算会有延迟。
11.2.2、内置水位线
1、有序流中内置水位线设置
直接调用
package waterMark;
import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WatermarkMonoDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();;
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} )
.assignTimestampsAndWatermarks(
//指定WaterMark策略
WatermarkStrategy
//升序的WaterMark,没有等待时间
.<WaterSensor>forMonotonousTimestamps()
//指定时间戳分配器
.withTimestampAssigner(
new SerializableTimestampAssigner<WaterSensor>(){
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
//返回的时间戳为毫秒
System.out.println("数据="+element+"recordTS="+recordTimestamp);
return element.getTs()*1000L;
}
}
)
);
SingleOutputStreamOperator<String> process = dataStreamSource
.keyBy(value -> value.getId())
//要使用事件语义的窗口TumblingEventTimeWindows,WaterMark才能起作用,不能用TumblingProcessingTimeWindows
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long l = elements.spliterator().estimateSize();
out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
}
});
process.print();
env.execute();
}
}
2、乱序流中内置水位线设置
设置等待时间为2秒,即12秒时触发窗口关闭
package waterMark;
import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title:
* @Author lizhe
* @Package sink
* @Date 2024/6/5 21:57
* @description:
*/
public class WatermarkOutOfOrdernessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();;
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} )
.assignTimestampsAndWatermarks(
//指定WaterMark策略
WatermarkStrategy
//乱序的WaterMark,有等待时间
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//指定时间戳分配器
.withTimestampAssigner(
new SerializableTimestampAssigner<WaterSensor>(){
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
//返回的时间戳为毫秒
System.out.println("数据="+element+"recordTS="+recordTimestamp);
return element.getTs()*1000L;
}
}
)
);
SingleOutputStreamOperator<String> process = dataStreamSource
.keyBy(value -> value.getId())
//要使用事件语义的窗口TumblingEventTimeWindows,WaterMark才能起作用,不能用TumblingProcessingTimeWindows
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long l = elements.spliterator().estimateSize();
out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
}
});
process.print();
env.execute();
}
}
结果:
可见当发送数据=WaterSensor{id=‘s1’, ts=12, vc=12}recordTS=-9223372036854775808时使得[0,10)窗口关闭,但是WaterSensor{id=‘s1’, ts=12, vc=12}不会在[0,10)窗口中,而是在[10,20)窗口中。
11.2.3、内置WaterMark生成原理
- 都是周期性生成的,默认是200ms
- 有序流:WaterMark=当前最大的事件时间-1ms
- 乱序流:WaterMark=当前最大的事件时间-延迟时间-1ms
11.3、水位线的传递
11.3.1、多并行度下水位线传递
水位线传递以最小的WaterMark为准,否则提前触发关窗造成数据丢失。
演示WaterMark多并行度下的传递
package waterMark;
import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title:
* @Author lizhe
* @Package sink
* @Date 2024/6/5 21:57
* @description:
*/
public class WatermarkOutOfOrdernessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//演示WaterMark多并行度下的传递
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} )
.assignTimestampsAndWatermarks(
//指定WaterMark策略
WatermarkStrategy
//乱序的WaterMark,有等待时间
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//指定时间戳分配器
.withTimestampAssigner(
new SerializableTimestampAssigner<WaterSensor>(){
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
//返回的时间戳为毫秒
System.out.println("数据="+element+"recordTS="+recordTimestamp);
return element.getTs()*1000L;
}
}
)
);
SingleOutputStreamOperator<String> process = dataStreamSource
.keyBy(value -> value.getId())
//要使用事件语义的窗口TumblingEventTimeWindows,WaterMark才能起作用,不能用TumblingProcessingTimeWindows
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long l = elements.spliterator().estimateSize();
out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
}
});
process.print();
env.execute();
}
}
结果:
在多并行度下,增加了一个WaterMark的更新操作。当数据WaterSensor{id=‘s1’, ts=12, vc=12}到来时,一个WaterMark,5-2=3,一个WaterMark是12-2=10,因WaterMark取小原则WaterMark是3未更新为10。当数据WaterSensor{id=‘s1’, ts=13, vc=13}到来,WaterMark更新为10,进而触发窗口关闭。
结论:在多并行度下,当触发WaterMark的下一条数据到来时才能进行关窗操作。
11.3.2、水位线空闲等待设置
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个座位当前任务的事件时钟,就会导致当前Task的水位线无法推进,从而导致窗口无法触发。这时候可以设置空闲等待。
package waterMark;
import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title:
* @Author lizhe
* @Package
* @Date 2024/6/5 21:57
* @description:
*/
public class WatermarkIdlenessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
SingleOutputStreamOperator<Integer> streamOperator = env
.socketTextStream("192.168.132.101", 7777)
//自定义分区器,数据%分区数,只输入奇数,都只会去往一个子任务
.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
return Integer.parseInt(key) % numPartitions;
}
}, value -> value)
.map(value -> Integer.parseInt(value))
.assignTimestampsAndWatermarks(
//指定WaterMark策略
WatermarkStrategy
.<Integer>forMonotonousTimestamps()
.withTimestampAssigner((r, ts) -> r * 1000
)
//空闲等待5s
.withIdleness(Duration.ofSeconds(5))
);
//分成两组:奇数一组,偶数一组,开10s的事件时间滚动窗口
streamOperator
.keyBy(value -> value%2)
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
@Override
public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long l = elements.spliterator().estimateSize();
out.collect("key=" + integer + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
}
}).print();
env.execute();
}
}
11.4、迟到数据处理
11.4.1、推迟WaterMark推进
在WaterMark产生时设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多时间进入窗口。
forBoundedOutOfOrderness(Duration.ofSeconds(2))
11.4.2、设置窗口延迟关闭
flink的窗口允许迟到数据。当触发窗口计算后会先计算当前结果,但此时并不会关闭窗口。以后每来一条数据就触发一次窗口计算(增量计算)。直到WaterMark超过了窗口结束时间+推迟时间,窗口才会关闭。
.allowedLateness(Time.seconds(2))
package waterMark;
import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title:
* @Author lizhe
* @Package
* @Date 2024/6/5 21:57
* @description:
*/
public class WatermarkAllowLatenessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} )
.assignTimestampsAndWatermarks(
//指定WaterMark策略
WatermarkStrategy
//乱序的WaterMark,有等待时间
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//指定时间戳分配器
.withTimestampAssigner(
new SerializableTimestampAssigner<WaterSensor>(){
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
//返回的时间戳为毫秒
System.out.println("数据="+element+"recordTS="+recordTimestamp);
return element.getTs()*1000L;
}
}
)
);
SingleOutputStreamOperator<String> process = dataStreamSource
.keyBy(value -> value.getId())
//要使用事件语义的窗口TumblingEventTimeWindows,WaterMark才能起作用,不能用TumblingProcessingTimeWindows
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//推迟2秒关窗
.allowedLateness(Time.seconds(2))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long l = elements.spliterator().estimateSize();
out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
}
});
process.print();
env.execute();
}
}
11.4.3、使用侧流接收迟到数据
使用.sideOutputLateData()函数将迟到数据放到侧输出流
package waterMark;
import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.lang.reflect.Type;
import java.time.Duration;
/**
* @Title:
* @Author lizhe
* @Package sink
* @Date 2024/6/5 21:57
* @description:
*/
public class WatermarkAllowLatenessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} )
.assignTimestampsAndWatermarks(
//指定WaterMark策略
WatermarkStrategy
//乱序的WaterMark,有等待时间
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//指定时间戳分配器
.withTimestampAssigner(
new SerializableTimestampAssigner<WaterSensor>(){
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
//返回的时间戳为毫秒
System.out.println("数据="+element+"recordTS="+recordTimestamp);
return element.getTs()*1000L;
}
}
)
);
OutputTag<WaterSensor> outputTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));
SingleOutputStreamOperator<String> process = dataStreamSource
.keyBy(value -> value.getId())
//要使用事件语义的窗口TumblingEventTimeWindows,WaterMark才能起作用,不能用TumblingProcessingTimeWindows
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//推迟2秒关窗
.allowedLateness(Time.seconds(2))
//关窗后的迟到数据放到侧输出流
.sideOutputLateData(outputTag)
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long l = elements.spliterator().estimateSize();
out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
}
});
process.print();
process.getSideOutput(outputTag).print("侧输出流");
env.execute();
}
}
11.4.4、总结
-
乱序与迟到的区别:
**乱序:**数据的顺序乱了,出现时间早的比时间晚的晚来
**迟到:**数据的时间戳<当前的WaterMark
-
乱序与迟到数据的处理
- 在WaterMark中指定乱序等待时间
- 如果开窗设置窗口允许迟到
- 关窗后的迟到数据放入侧输出流
-
WaterMark等待时间与窗口允许迟到时间并不能等同和替换
WaterMark涉及到窗口第一次计算时间,WaterMark等待时间过长会导致计算延迟变大。
窗口允许迟到时间只是要保证计算结果更加准确,但不应影响数据计算延迟。
所以二者不能等价代替。
-
WaterMark等待时间与窗口允许迟到时间设置经验
WaterMark等待时间不能设置过大,一般秒级。窗口允许迟到时间只考虑大部分的迟到数据。极端情况小部分迟到数据使用侧输出流。
12、基于时间的合流
上面提到的connect合流可满足大部分需求。但统计固定时间内两条流数据的匹配情况,对于connect要使用自定义,但可以使用更简单的Window来表示,flink 内置了API。
12.1、窗口联结Window Join
- 落在同一个时间窗口范围内才能匹配
- 根据keyby的key来进行匹配关联
- 只能拿到匹配上的数据,类似有固定时间范围的inner join
package waterMark;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @Title: WindowJoinDemo
* @Author lizhe
* @Package Window Join
* @Date 2024/6/8 21:11
* @description:
*/
public class WindowJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
.fromElements(
Tuple2.of("a", 1),
Tuple2.of("a", 2),
Tuple2.of("b", 3),
Tuple2.of("c", 4)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
return element.f1 * 1000L;
}
})
);
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
.fromElements(
Tuple3.of("a", 1,1),
Tuple3.of("a", 11,1),
Tuple3.of("b", 2,1),
Tuple3.of("b", 12,1),
Tuple3.of("c", 14,1),
Tuple3.of("d", 15,1)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
@Override
public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
return element.f1 * 1000L;
}
})
);
DataStream<String> join = ds1.join(ds2)
.where(r1 -> r1.f0)
.equalTo(r2 -> r2.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
* 关联上的数据调用join方法
* @param first ds1的数据
* @param second ds2的数据
* @return
* @throws Exception
*/
@Override
public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
return first + "----" + second;
}
});
join.print();
env.execute();
}
}
12.2、间隔联结Interval Join
有时要处理的时间间隔并不固定。要匹配的数据可能刚开卡在窗口边缘两侧造成匹配失败。所有窗口联结并不能满足要求。
间隔联结的思路是针对一条流的每个数据开辟出其时间戳前后的一段时间间隔指定,上下界的偏移,负号代表时间往前,正号代表时间往后,看这期间是否有来自另一条流的匹配。(只支持事件时间语义)
package waterMark;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* @Title:
* @Author lizhe
* @Package
* @Date 2024/6/8 21:11
* @description:
*/
public class IntervalJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
.fromElements(
Tuple2.of("a", 1),
Tuple2.of("a", 2),
Tuple2.of("b", 3),
Tuple2.of("c", 4)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
return element.f1 * 1000L;
}
})
);
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
.fromElements(
Tuple3.of("a", 1,1),
Tuple3.of("a", 11,1),
Tuple3.of("b", 2,1),
Tuple3.of("b", 12,1),
Tuple3.of("c", 14,1),
Tuple3.of("d", 15,1)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
@Override
public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
return element.f1 * 1000L;
}
})
);
KeyedStream<Tuple2<String, Integer>, String> stream1 = ds1.keyBy(value -> value.f0);
KeyedStream<Tuple3<String, Integer, Integer>, String> stream2 = ds2.keyBy(value -> value.f0);
stream1.intervalJoin(stream2)
.between(Time.seconds(-2),Time.seconds(2))
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
* 两条流的数据匹配上才会调用方法
* @param left stream1的数据
* @param right stream2的数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
//进入这个方法是关联上的数据
out.collect(left+"----"+right);
}
})
.print();
env.execute();
}
}
1.17版本支持将该匹配上的迟到数据通过侧输出流输出
如果当前数据的事件时间<当前的WaterMark就是迟到数据,主流的process不处理。
但在between后使用SideOutputLeftLateData(),SideOutputRightLateData()函数将迟到数据放到侧输出流
13、处理函数
DataStream更下层的API,统一称为process算子,接口就是process function(处理函数)
13.1、基本处理函数
处理函数提供一个定时服务(TimeService),可以通过它访问流中的事件、时间戳、水位线,甚至可以注册定时事件。处理函数集成了AbstractRichFunction,拥有富函数类的所有特性,可以访问状态和其他运行时信息。处理函数可以直接将数据输出的侧输出流。处理函数是最为灵活的处理方法,可实现各种自定义逻辑。
分类:
- ProcessFunction
- KeyedProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
13.2、按键分区处理函数KeyedProcessFunction
只有在KeyedStream才支持使用TimeService设置定时器。
13.2.1、定时器和定时服务
keyedStream.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
/**
* 来一条数据处理一次
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//数据中提取出来的事件时间,如果没有则为null
Long timestamp = ctx.timestamp();
//定时器
TimerService timerService = ctx.timerService();
//注册定时器:处理时间
timerService.registerEventTimeTimer(10L);
//注册定时器:事件时间
timerService.currentProcessingTime();
//删除定时器:事件时间
timerService.deleteEventTimeTimer(10L);
//删除定时器:处理时间
timerService.deleteProcessingTimeTimer(10L);
//获取当前处理时间,即系统时间
timerService.currentProcessingTime();
//获取当前WaterMark
timerService.currentWatermark();
}
}
);
事件时间定时器:
package process;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title: KeyedProcessTimerDemo
* @Author lizhe
* @Package process
* @Date 2024/6/9 12:29
* @description:
*/
public class KeyedProcessTimerDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
);
WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
);
SingleOutputStreamOperator<WaterSensor> operator = singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());
keyedStream.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
/**
* 来一条数据处理一次
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//数据中提取出来的事件时间,如果没有则为null
Long timestamp = ctx.timestamp();
//定时器
TimerService timerService = ctx.timerService();
//注册定时器:处理时间
timerService.registerEventTimeTimer(5000L);
System.out.println("当前时间"+timestamp+",注册了一个5s的定时器");
}
/**
* 时间进展到定时器注册的时间,调用该方法
* @param timestamp 当前时间进展
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
System.out.println("现在时间"+timestamp + "定时器触发,key为"+ctx.getCurrentKey());
}
}
)
.print();
env.execute();
}
}
输出:
TimeService会以key和时间戳作为标准,对定时器去重;即对每个key和时间戳最多只有一个定时器,如果注册了多次,onTimer()方法也将被调用一次。
处理时间定时器:
package process;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title: KeyedProcessTimerDemo
* @Author lizhe
* @Package process
* @Date 2024/6/9 12:29
* @description:
*/
public class KeyedProcessTimerDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
);
WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
);
SingleOutputStreamOperator<WaterSensor> operator = singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());
keyedStream.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
/**
* 来一条数据处理一次
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//数据中提取出来的事件时间,如果没有则为null
Long timestamp = ctx.timestamp();
//定时器
TimerService timerService = ctx.timerService();
long currentProcessingTime = timerService.currentProcessingTime();
timerService.registerProcessingTimeTimer(currentProcessingTime+5000L);
System.out.println("当前时间"+currentProcessingTime+",注册了一个5后的定时器,key为"+ctx.getCurrentKey() );
}
/**
* 时间进展到定时器注册的时间,调用该方法
* @param timestamp 当前时间进展
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
System.out.println("现在时间"+timestamp + "定时器触发,key为"+ctx.getCurrentKey());
}
}
)
.print();
env.execute();
}
}
总结:
-
事件时间定时器通过WaterMark来触发的,WaterMark>=注册时间。
注意:
WaterMark=当前最大事件时间-等待时间-1ms,因为-1ms会推迟一条数据。比如5s的定时器,如果等待=3s,WaterMark=8s-3s-1ms=4999ms,不会触发5s的定时器。需要WaterMark=9s-3s-1ms=5999ms才能触发5s的定时器
-
在Process中获取当前的WaterMark显示的是上一次的的WaterMark(因为Process还没接收到这条数据对应生成的新WaterMark)
13.3、应用案例
统计一段时间内出现次数最多的水位。统计10s内出现次数最多的两个水位,这两个水位每5s更新一次。
可使用滑动窗口实现按照不同水位进行统计
后面仔细看吧,可能有问题!!!
package process;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.*;
/**
* @Title:
* @Author lizhe
* @Package process
* @Date 2024/6/9 12:29
* @description:
*/
public class TopNDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> operator = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
)
);
//1、按照vc分组,开窗聚合(增量计算+全量打标签)
//开窗聚合后就是普通的流,丢失了窗口信息需要自己打窗口标签(WindowEnd)
SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> aggregate = operator.keyBy(value -> value.getVc())
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(new VcCountAgg(), new WindowResult());
//2、按照窗口标签keyby,保证同一个窗口时间范围的结果到一起去。排序去TopN
aggregate.keyBy(value -> value.f2)
.process(new TopN(2))
.print();
env.execute();
}
public static class VcCountAgg implements AggregateFunction<WaterSensor,Integer,Integer>{
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(WaterSensor value, Integer accumulator) {
return ++accumulator;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return null;
}
}
/**
* 泛型如下:
* 第一个:输入类型=增量函数的输出 count值
* 第二个:输出类型=Tuple(vc,count,WindowEnd)带上窗口结束时间的标签
* 第三个:key类型,vc,Integer
* 第四个:窗口类型
*/
public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer,Integer,Long>,Integer, TimeWindow>{
@Override
public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
Integer count = elements.iterator().next();
long windowsEnd = context.window().getEnd();
out.collect(Tuple3.of(key,count,windowsEnd));
}
}
public static class TopN extends KeyedProcessFunction<Long,Tuple3<Integer, Integer, Long>,String>{
//存不同窗口的统计结果 key=windowEnd value=list数据
private Map<Long, List< Tuple3<Integer,Integer,Long>>> dataListMap;
//要取的Top的数量
private int threshold;
public TopN(int threshold) {
dataListMap = new HashMap<>();
this.threshold = threshold;
}
@Override
public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {
//进入这个方法只是一条数据,要排序,要存起来,不同的窗口要分开存
//1、存到HashMap中
Long windowEnd = value.f2;
if (dataListMap.containsKey(windowEnd)){
//1.1 包含vc 不是该vc的第一条,直接加到list中
List<Tuple3<Integer, Integer, Long>> tuple3List = dataListMap.get(windowEnd);
tuple3List.add(value);
}else {
//1.1 不包含vc是该vc的第一条,需要初始化list
List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
dataList.add(value);
dataListMap.put(windowEnd,dataList);
}
//2、注册一个定时器,WindowsEnd+1ms即可(同一个窗口范围应该同时输出的,只不过是一条条调用ProcessElement方法,只需延迟1ms)
ctx.timerService().registerProcessingTimeTimer(windowEnd+1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
//定时器触发,同一个窗口范围的计算结果攒齐了,开始、排序、取TopN
Long windowEnd = ctx.getCurrentKey();
//1、排序
List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
dataList.sort(
new Comparator<Tuple3<Integer, Integer, Long>>() {
@Override
public int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) {
return o2.f1-o1.f1;
}
}
);
//2、取TopN
StringBuilder outStr = new StringBuilder();
outStr.append("==========\n");
//遍历 排序后的list,取出前threshold个,dataList要是不够dataList个取dataList.size()
for (int i = 0; i < Math.min(threshold,dataList.size()); i++) {
Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
outStr.append("Top"+(i+1)+"\n");
outStr.append("vc="+vcCount.f0+"\n");
outStr.append("count="+vcCount.f1 + "\n");
outStr.append("窗口结束时间"+ vcCount.f2 + "\n");
}
//用完的list及时清理
dataList.clear();
out.collect(outStr.toString());
}
}
}
13.4、侧输出流
使用侧输出流实现水位告警
package process;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
/**
* @Title:
* @Author lizhe
* @Package process
* @Date 2024/6/9 12:29
* @description:
*/
public class SideOutputDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
);
SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
)
.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
final OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);
SingleOutputStreamOperator<WaterSensor> process = singleOutputStreamOperator.keyBy(value -> value.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
//使用侧输出流告警
if (value.getVc() > 10) {
ctx.output(warnTag, "当前水位=" + value.getVc() + ",大于阈值10!");
}
out.collect(value);
}
}
);
process.print();
process.getSideOutput(warnTag).printToErr("warn");
env.execute();
}
}
14、状态管理
14.1、flink的状态
分为有状态和无状态两种。
无状态的算子任务只要观察每个独立事件,根据当前输入的数据直接转换输出结果。如:map、filter、flatMap。
有状态算子任务除当前数据外还要其他数据来得到计算结果。“其他数据”就是状态。如:聚合算子、窗口算子。
状态的分类:
-
托管状态和原始状态
托管状态:由flink统一管理使用时只需要调用相应接口。
原始状态:自定义的相当于开辟了一块内存自己管理,自己实现状态的序列化和故障恢复。
通常采用flink托管状态(重点)
-
算子状态和按键分区状态
通过keyby()函数的称为按键分区状态,其他为算子状态
14.2、算子状态
对于一个并行子任务,处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。
算子状态可以用在所有算子上,类似本地变量。
14.3、按键分区状态
状态根据输入流中定义的键来维护和访问,也就keyby后能用。
14.3.1、值状态
状态只保存一个值。
水位相差10则报警
package state;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.time.Duration;
/**
* @Title: KeyedValueStateDemo
* @Author lizhe
* @Package state
* @Date 2024/6/11 20:54
* @description:水位相差10则报警
*/
public class KeyedValueStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
)
);
sensorDS.keyBy(value -> value.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
ValueState<Integer> lastVcState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor lastVc = new ValueStateDescriptor<Integer>("lastVc", Integer.class);
lastVcState=getRuntimeContext().getState(lastVc);
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
if (Math.abs(value.getVc()-lastVc)>10) {
out.collect("传感器id="+value.getId()+",当前水位值="+value.getVc()+",上一条水水位值="+lastVc+"相差超过10");
}
lastVcState.update(value.getVc());
}
}
).print();
env.execute();
}
}
14.3.2、列表状态
将要保存的数据以列表形式进行保存
针对每种传感器输出最高的三个水位值
package state;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
/**
* @Title:
* @Author lizhe
* @Package state
* @Date 2024/6/11 20:54
* @description:针对每种传感器输出最高的三个水位值
*/
public class KeyedListStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
)
);
sensorDS.keyBy(value -> value.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
ListState<Integer> vcListState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Integer.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
vcListState.add(value.getVc());
ArrayList<Integer> arrayList = new ArrayList<>();
for (Integer vc : vcListState.get()) {
arrayList.add(vc);
}
arrayList.sort((o1,o2)->{
return o2-o1;
});
if (arrayList.size() > 3) {
arrayList.remove(3);
}
out.collect("传感器id="+value.getId()+",最大三个水位值="+arrayList.toString());
vcListState.update(arrayList);
}
}
).print();
env.execute();
}
}
14.3.3、map状态
把键值对最为状态保存起来
统计每种传感器每种水位值出现的次数
package state;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
/**
* @Title:
* @Author lizhe
* @Package state
* @Date 2024/6/11 20:54
* @description:统计每种传感器每种水位值出现的次数
*/
public class KeyedMapStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
)
);
sensorDS.keyBy(value -> value.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
MapState<Integer,Integer> vcCountMapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Integer.class, Integer.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
Integer vc = value.getVc();
if (vcCountMapState.contains(vc)){
int count = vcCountMapState.get(vc) ;
vcCountMapState.put(vc,++count);
}else {
vcCountMapState.put(vc, 1);
}
StringBuilder outStr = new StringBuilder();
outStr.append("传感器id为"+value.getId()+"\n");
for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
outStr.append(vcCount.toString()+"\n");
}
outStr.append("==============");
out.collect(outStr.toString());
}
}
).print();
env.execute();
}
}
14.3.4、规约状态
对添加的数据进行规约,将规约聚合后的值作为状态保存
计算每种传感器的水位和
package state;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title:
* @Author lizhe
* @Package state
* @Date 2024/6/11 20:54
* @description:计算每种传感器的水位和
*/
public class KeyedReduceStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
)
);
sensorDS.keyBy(value -> value.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
ReducingState<Integer> vcSum;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
vcSum = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("vcSum", new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
}, Integer.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
vcSum.add(value.getVc());
out.collect("传感器id="+value.getId()+"水位值和="+vcSum.get());
}
}
).print();
env.execute();
}
}
14.3.5、聚合状态
类似规约状态,相比于规约状态,聚合里有个累加器来表示状态,聚合的状态类型可与输入数据类型不同
计算水位平均值
package state;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title:
* @Author lizhe
* @Package state
* @Date 2024/6/11 20:54
* @description:计算水位平均值
*/
public class KeyedAggregateStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
)
);
sensorDS.keyBy(value -> value.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
AggregatingState<Integer,Double> vcAggregatingState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
vcAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
"vcAggregatingState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0,0);
}
@Override
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
return Tuple2.of(accumulator.f0+value,accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
return accumulator.f0*1D/accumulator.f1;
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return null;
}
}
, Types.TUPLE(Types.INT, Types.INT)));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
vcAggregatingState.add(value.getVc());
Double vcAvg = vcAggregatingState.get();
out.collect("传感器id="+value.getId()+"平均水位="+vcAvg);
}
}
).print();
env.execute();
}
}
14.3.6、状态生存时间TTL
状态创建时候,失效时间=当前时间+TTL。可对时效时间进行更新,创建配置对象,调用状态描述器启动TTL
package state;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Title: KeyedValueStateDemo
* @Author lizhe
* @Package state
* @Date 2024/6/11 20:54
* @description:
*/
public class KeyedStateTTLDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(
(element, recordTime) -> {
return element.getTs() * 1000L;
}
)
);
sensorDS.keyBy(value -> value.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
ValueState<Integer> lastVcState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StateTtlConfig stateTtlConfig = StateTtlConfig
.newBuilder(Time.seconds(5))//过期时间5s
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//状态的创建和写入会刷新过期时间
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//不返回过期的状态值
.build();
ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
lastVc.enableTimeToLive(stateTtlConfig);
lastVcState=getRuntimeContext().getState(lastVc);
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
Integer value1 = lastVcState.value();
out.collect("key="+value.getId()+"状态值"+ value1);
lastVcState.update(value.getVc());
}
}
).print();
env.execute();
}
}
14.4、算子状态
状态分为:列表状态ListState、联合列表状态ListUnionState、广播状态BroadcastState
算子并行实例上定义的状态,作用范围被限定为当前算子任务。
package state;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.util.List;
/**
* @Title: OperatorListDemo
* @Author lizhe
* @Package state
* @Date 2024/6/13 21:50
* @description:在map算子中计算数据个数
*/
public class OperatorListDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("192.168.132.101", 7777)
.map(new MyCountMapFunction())
.print();
env.execute();
}
public static class MyCountMapFunction implements MapFunction<String,Long>, CheckpointedFunction{
private long count =0L;
private ListState<Long> state;
@Override
public Long map(String value) throws Exception {
return ++count;
}
/**
* 本地变量持久化:将本地变量拷贝到算子状态
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
System.out.println("snapshotState");
state.clear();
state.add(count);
}
/**
* 初始化本地变量:程序恢复时,从状态中将数据添加到本地变量,每个子任务调用一次
* @param context
* @throws Exception
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
System.out.println("initializeState");
state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state", Long.class));
if (context.isRestored()){
for (Long aLong : state.get()) {
count+=aLong;
}
}
}
}
}
算子状态List与UnionList区别:
- list状态:轮询均分给新的子任务
- UnionList状态:将原先多个子任务状态的合并成一份完整的。给新的并行子任务每人一份完整的
广播状态:算子并行子任务都保持同一份全局状态。
水位超过指定的阈值发送告警,阈值可以动态修改
package state;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
/**
* @Title: OperatorListDemo
* @Author lizhe
* @Package state
* @Date 2024/6/13 21:50
* @description:水位超过指定的阈值发送告警,阈值可以动态修改
*/
public class OperatorBroadcastStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
.map(
value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
}
);
//配置流用来广播配置
DataStreamSource<String> configDS = env.socketTextStream("192.168.132.101", 8888);
final MapStateDescriptor<String, Integer> descriptor = new MapStateDescriptor<>("configDS", String.class, Integer.class);
BroadcastStream<String> broadcastStream = configDS.broadcast(descriptor);
BroadcastConnectedStream<WaterSensor, String> connect = sensorDS.connect(broadcastStream);
connect.process(
new BroadcastProcessFunction<WaterSensor, String, String>() {
/**
* 数据流的处理方法
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(descriptor);
Integer integer = broadcastState.get("threshold");
//如果数据流先来,广播流为空,要判空
integer=integer==null ?0:integer;
if (value.getVc()>integer){
out.collect("超过阈值,阈值="+integer);
}
}
/**
* 广播后的配置流处理方法
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, Integer> state = ctx.getBroadcastState(descriptor);
state.put("threshold",Integer.valueOf(value));
}
}
).print();
env.execute();
}
}
14.5、状态后端
状态的存储、访问以及维护都是由一个可插拔的组件决定的,这个组件为状态后端,主要负责管理本地状态的存储方式和位置。
14.5.1、状态后端分类
状态后端开箱即用,可不改变程序逻辑独立配置。有两种,一种为哈希表状态后端(默认),一种为内嵌RocksDB状态后端。
- 哈希表状态后端:状态存在内存,直接把状态当对象,存在TaskManager的JVM堆上,以键值对方式存储。
- RocksDB状态后端:RocksDB是kv型数据库,将数据存到硬盘。