文章目录
- 一 WaterMark
- 1 水位线特点总结
- 2 实时热门商品【重点】
- (1)数据集
- (2)实现思路
- a 分流 - 开窗 - 聚合
- 分流:
- 开窗:
- 聚合:
- b 再分流 -- 统计
- 再分流:
- 统计:
- (3)代码编写
- 二 处理迟到元素
- 1 什么是迟到元素
- (1)代码编写
- (2)测试
- 2 处理策略
- (1)抛弃迟到元素
- (2)重定向迟到元素
- a 将迟到数据发送到侧输出流中(不开窗口)
- b 将迟到数据发送到侧输出流中(开窗口)
一 WaterMark
1 水位线特点总结
-
Flink 认为时间戳小于水位线的事件都已到达。
-
水位线是一种逻辑时钟。
-
水位线由程序员编程插入到数据流中。
-
水位线是一种特殊的事件。
-
在事件时间的世界里,水位线就是时间。
-
水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒。
-
水位线超过窗口结束时间,窗口闭合,默认情况下,迟到元素被抛弃。
-
Flink 会在流的最开始插入一个时间戳为负无穷大的水位线。
-
Flink 会在流的最末尾插入一个时间戳为正无穷大的水位线。
-
Watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退,(Watermark 就是当前数据流的逻辑时钟)。
水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒,显然单调递增。
-
Watermark 与数据的时间戳相关。
-
在 Flink 中,Watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
-
如果 Watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
-
而如果 Watermark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题。
2 实时热门商品【重点】
求每个窗口中最热门的商品。
(1)数据集
数据集基本结构如下:用户id,商品id,所属品类id,数据类型(pv或buy),秒级时间戳。
(2)实现思路
a 分流 - 开窗 - 聚合
统计的结果是每一个窗口里面的每一个商品的pv次数。
- 分流后变为键控流,键控流比DataStream多了一个泛型key,所以KeyedProcessFunction有三个泛型【key、输入、输出】。
- 开窗后变为了WindowedStream,WindowedStream比键控流多个一个泛型window,所以ProcessWindowFunction中有四个泛型【输入、输出、key、窗口】。
- 聚合后又变成了DateStream。
分流:
开窗:
聚合:
b 再分流 – 统计
再分流:
想统计每个窗口中的实时热门商品,再使用窗口结束时间(开始时间也可以)进行分流,效果是每一个时间窗口中不同商品id的浏览次数。
统计:
将ItemViewCount存放到列表状态变量中,使用ArrayList进行排序,最终输出。
(3)代码编写
public class Example7 {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv")
// 数据流ETL
.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String value) throws Exception {
String[] arr = value.split(",");
return new UserBehavior(
arr[0],arr[1],arr[2],arr[3],
Long.parseLong(arr[4]) * 1000L
);
}
})
// 获取pv数据
.filter(r -> r.behavior.equals("pv"))
// 分配水位线
.assignTimestampsAndWatermarks(
// 此数据集针对时间戳已经做了ETL,所以设置延迟时间为0
WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
@Override
public long extractTimestamp(UserBehavior element, long recordTimestamp) {
return element.timestamp;
}
})
)
// 使用商品id对数据进行分流【分流】
.keyBy(r -> r.itemId)
// 每隔5分钟想查看过去一小时最热门的商品(滑动窗口)【开窗】
.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5)))
// 增量聚合与全窗口聚合结合使用【聚合】
.aggregate(new CountAgg(),new WindowResult())
// 按照ItemViewCount的windowEnd进行分流,获取同一个窗口的统计信息【分流】
.keyBy(r -> r.windowEnd)
// 取前3名
.process(new TopN(3))
.print();
env.execute();
}
// 将每一个到来的ItemViewCount统计信息存储到列表状态中,设置定时器,进行排序
public static class TopN extends KeyedProcessFunction<Long,ItemViewCount,String>{
private ListState<ItemViewCount> listState;
private Integer n;
public TopN(Integer n) {
this.n = n;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
listState = getRuntimeContext().getListState(
new ListStateDescriptor<ItemViewCount>("list-state", Types.POJO(ItemViewCount.class))
);
}
@Override
public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
// 每来一条数据直接存储到列表状态中
listState.add(value);
// 一条流上元素的windowEnd相同,定时器只会注册一次
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1L);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
// 排序逻辑
// 列表状态变量在底层是序列化过的,所以无法针对其直接排序
ArrayList<ItemViewCount> itemViewCountArrayList = new ArrayList<>();
for(ItemViewCount ivc : listState.get()) itemViewCountArrayList.add(ivc);
// 手动对列表状态变量进行GC
listState.clear();
itemViewCountArrayList.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
// 降序排列
return o2.count.intValue() - o1.count.intValue();
}
});
// 取出窗口结束时间
StringBuilder result = new StringBuilder();
result
.append("======================================\n")
.append("窗口结束时间:" + new Timestamp(timestamp - 1L))
.append("\n");
// 取出前几名,对数据ETL
for(int i = 0; i < n; i++){
ItemViewCount curr = itemViewCountArrayList.get(i);
result
.append("第" + (i + 1) + "名的商品id是:【" + curr.itemId)
.append("】,浏览次数是:【" + curr.count +"】")
.append("\n");
}
result
.append("======================================\n");
out.collect(result.toString());
}
}
public static class WindowResult extends ProcessWindowFunction<Long,ItemViewCount,String, TimeWindow>{
@Override
public void process(String s, Context context, Iterable<Long> elements, Collector<ItemViewCount> out) throws Exception {
out.collect(new ItemViewCount(s,elements.iterator().next(),context.window().getStart(),context.window().getEnd()));
}
}
public static class CountAgg implements AggregateFunction<UserBehavior,Long,Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(UserBehavior value, Long accumulator) {
return accumulator + 1L;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return null;
}
}
//聚合结果,每一个商品在每一个窗口中的浏览量
public static class ItemViewCount{
public String itemId;
public Long count;
public Long windowStart;
public Long windowEnd;
public ItemViewCount() {
}
public ItemViewCount(String itemId, Long count, Long windowStart, Long windowEnd) {
this.itemId = itemId;
this.count = count;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
@Override
public String toString() {
return "ItemViewCount{" +
"itemId='" + itemId + '\'' +
", count=" + count +
", windowStart=" + new Timestamp(windowStart) +
", windowEnd=" + new Timestamp(windowEnd) +
'}';
}
}
public static class UserBehavior{
public String userId;
public String itemId;
public String categoryId;
public String behavior;
public Long timestamp;
public UserBehavior() {
}
public UserBehavior(String userId, String itemId, String categoryId, String behavior, Long timestamp) {
this.userId = userId;
this.itemId = itemId;
this.categoryId = categoryId;
this.behavior = behavior;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "UserBehavior{" +
"userId='" + userId + '\'' +
", itemId='" + itemId + '\'' +
", categoryId='" + categoryId + '\'' +
", behavior='" + behavior + '\'' +
", timestamp=" + new Timestamp(timestamp) +
'}';
}
}
}
二 处理迟到元素
1 什么是迟到元素
(1)代码编写
迟到元素:到来的数据包含的时间戳小于当前水位线。
具体实例见以下代码:
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("localhost",9999)
.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(" ");
return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);
}
})
.assignTimestampsAndWatermarks(
// 使用延迟时间为0可以使用另一种写法(forBoundedOutOfOrderness(Duration.ofSeconds(0)))
WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
}
)
)
// 使用ProcessFunction实现:流处理,且处理的是没有经过keyBy的流,所以只能使用processElement
// 不能使用状态变量和定时器
// 其中定时器只能在运行时才会报错,即使在编译期写出来编译器也不会报错
.process(new ProcessFunction<Tuple2<String, Long>, String>() {
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
if(value.f1 < ctx.timerService().currentWatermark()){
out.collect("【" + value + "】元素迟到了");
}else{
out.collect("【" + value + "】元素没有迟到");
}
}
})
.print();
env.execute();
}
(2)测试
依次输入
水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒,显然单调递增。
a 1 【水位线:1 - 0 -1ms = 9999ms】 元素携带时间戳为1s,1s > 负无穷大(当前水位线),没有迟到
a 2 【水位线:2 - 0 -1ms = 19999ms】 元素携带时间戳为2s, 2s > 9999ms(当前水位线),没有迟到
a 1 【水位线:2 - 0 -1ms = 19999ms】 元素携带时间戳为1s, 1s < 19999ms(当前水位线),迟到
结果如下图:
由上可以看出,机器时间、处理时间是不存在迟到元素的,只有在事件时间中,才会存在迟到元素。对于事件时间来说,水位线就是它的逻辑时钟。
水位线可以用来平衡计算的完整性和延迟两方面。除非选择一种非常保守的水位线策略 (最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则总需要处理迟到的元素。
迟到的元素是指当这个元素来到时,这个元素所对应的窗口已经计算完毕了 (也就是说水位线已经没过窗口结束时间了)。这说明迟到这个特性只针对事件时间。
2 处理策略
DataStream API 提供了三种策略来处理迟到元素
- 直接抛弃迟到的元素。
- 将迟到的元素发送到另一条流中去。
- 可以更新窗口已经计算完的结果,并发出计算结果。
(1)抛弃迟到元素
抛弃迟到的元素是事件时间窗口操作符的默认行为。也就是说一个迟到的元素不会创建一个新的窗口。
process function 可以通过比较迟到元素的时间戳和当前水位线的大小来很轻易的过滤掉迟到元素。
(2)重定向迟到元素
迟到的元素也可以使用旁路输出 (side output) 特性(侧输出流)被重定向到另外的一(或n)条流中去。迟到元素所组成的旁路输出流可以继续处理或者 sink 到持久化设施中去。
a 将迟到数据发送到侧输出流中(不开窗口)
// 定义侧输出流的名字(标签)
private static OutputTag<String> lateElement = new OutputTag<String>("late-element"){};
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<String> result = env
// 自定义数据源
.addSource(new SourceFunction<Tuple2<String, Long>>() {
@Override
public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
// 指定时间戳并发送数据,如果两个时间戳不同,以指定的时间戳为准
ctx.collectWithTimestamp(Tuple2.of("hello word", 1000L), 1000L);
// 发送水位线
ctx.emitWatermark(new Watermark(999L));
ctx.collectWithTimestamp(Tuple2.of("hello flink", 2000L), 2000L);
ctx.emitWatermark(new Watermark(1999L));
ctx.collectWithTimestamp(Tuple2.of("hello late", 1000L), 1000L);
}
@Override
public void cancel() {
}
})
.process(new ProcessFunction<Tuple2<String, Long>, String>() {
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
if (value.f1 < ctx.timerService().currentWatermark()) {
// 发送到测输出流
ctx.output(lateElement, "迟到元素【" + value + "】已发送到侧输出流:");
} else {
out.collect("元素【" + value + "】正常到达!");
}
}
});
result.print("正常到达的元素:");
result.getSideOutput(lateElement).print("侧输出流中的迟到元素:");
env.execute();
}
b 将迟到数据发送到侧输出流中(开窗口)
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<String> result = env
.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collectWithTimestamp("a", 1000L);
ctx.emitWatermark(new Watermark(999L));
ctx.collectWithTimestamp("a", 2000L);
ctx.emitWatermark(new Watermark(999L));
ctx.collectWithTimestamp("a", 4000L);
// 关闭 0-5 秒的窗口:必须销毁窗口,迟到数据才会被发送到侧输出流
ctx.emitWatermark(new Watermark(4999L));
ctx.collectWithTimestamp("a late", 3000L);
}
@Override
public void cancel() {
}
})
.keyBy(r -> 1)
// 开窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 将迟到元素输出到测输出流
.sideOutputLateData(new OutputTag<String>("late") {
}
)
.process(new ProcessWindowFunction<String, String, Integer, TimeWindow>() {
@Override
public void process(Integer integer, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
out.collect("窗口中共有:" + elements.spliterator().getExactSizeIfKnown() + "个元素");
}
});
// 获取正常到达数据
result.print("正常元素:");
// 获取迟到元素,根据字符串id识别侧输出标签,可以保证其实单例
result.getSideOutput(new OutputTag<String>("late"){}).print("迟到元素:");
env.execute();
}