1.为什么定时器的时间设置为,窗口的end值+1ms就可以呢?
因为定时器是下游,水位线是取的多个上游的最小的, 水位线是跟在数据后面的,所以当定时器的时间到达时,上游一定计算完成了,并且数据已经在水位线之前到下游了,所以可以触发计算,就是当前窗口所有的数据,比如窗口是[8:00~9:00)
2.为什么要用定时器呢?
不用也可以计算,但是是来一条计算一条,假如有10万条,效率低,用定时器计算,可以在数据到齐时,一起计算,效率高。
3.不用window,但是一定要keyBy
不用window的原因:不是取有限的数据,而是取所有end是9:00的数据
一定要keyBy的原因:因为上游计算完成的有可能有[8:05~9:05)的数据,所以需要根据end分组
4.为什么读取文件,没到5分钟就触发计算了?
因为用的是事件时间
5.定义的flag变量,计算完需要置null吗?
不需要。每个key都有自己的ValueState
6.定时器触发的时候,上游一定都计算完了吗?
一定计算完了。
因为上游是先keyBy,再window,计算的是A商品在 [8:00~9:00)时间段内的数据,B商品在 [8:00~9:00)时间段内的数据,当A的水位线到达9:00的时候,触发了计算,但是B的水位线才到8:30,这时候定时器会取上游最小的8:30,所以不会触发,当B的水位线推进到9:00的时候,现在最小的就是9:00,所以定时器会触发计算,这样,A和B都被计算了,没有丢失数据。
7.flink的定时器 如果重复注册相同的 会触发多次吗?
不会
“答案是不会,应为Flink内部使用的HeapPriorityQueueSet来存储定时器,一个注册请求到来时,其add()方法会检查是否已经存在,如果存在则不会加入。 ”
但是最好在外面手动控制,比如用一个Boolean值,只在第一个时注册。
8.下游定时器需要等所有上游时间都到达后计算,等的是哪些上游?
可以通过webui界面看上下游
对于topn,下游是process算子, 上游是aggregate算子,当A、B、C三个商品都完成之后,下游定时器计算。
9.对于水位线
源头是周期性产生的,但是之后是:水位线是跟在数据屁股后面的,所以等aggregate算子计算完后,定时器再计算。
package com.atguigu.flink.state;
import com.atguigu.flink.func.WaterSensorMapFunction;
import com.atguigu.flink.pojo.UserBehavior;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* Created by Smexy on 2023/6/21
*
* 需求: 每隔5min输出最近1h内点击量(pv)最多的前3个商品
*
* 数据: 543462,1715,1464116,pv,1511658000
* userId,商品id,商品类别id,行为类型,ts
*
* 输入: 543462,1715,1464116,pv,1511658000
* 粒度: 一个用户点击一个商品的一次是一行
*
*
* 推理计算过程: 聚合,keyBy 商品id
* 第一次聚合: 统计最近1h(窗口)内,各个商品的点击总次数
* size: 范围,1h
* slide: 计算时机,5min
* 滑动的时间窗口。
*
* 输入: 543462,1715,1464116,pv,1511658000
*
* 输出:
* [8:00,9:00):
* A---120
* [8:05,9:05)
* A---150
* [8:00,9:00):
* B---130
* [8:00,9:00):
* C---132
* [8:00,9:00):
* D---131
*
* 第二次聚合: 将每个时间段窗口中各个商品的点击量,排序再取前3
* 用不用开窗? 不用
* 需要keyBy,按照窗口的统计的时间范围keyBy
*
* 等同一个窗口的所有数据全部到达后,再一次性计算。
* 如何知道当前要计算的数据已经全部到达,可以触发运算?
* 使用定时器,将窗口的endTime作为触发时间,只要下游的时间到了endTime证明上游endTime之前的所有数据都已经到达了下游,可以进行运算。
*
*
*
* 输出:
* [8:00,9:00):
* A--120
* B--119
* C--118
* [8:05,9:05):
* E--120
* B--119
* C--118
*/
public class Flink12_TopN
{
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
FileSource<String> fileSource = FileSource.forRecordStreamFormat(
new TextLineInputFormat(StandardCharsets.UTF_8.name())
,
new Path("input/UserBehavior.csv")
).build();
WatermarkStrategy<UserBehavior> watermarkStrategy = WatermarkStrategy
.<UserBehavior>forMonotonousTimestamps()
.withTimestampAssigner( (e, ts) -> e.getTimestamp() * 1000);
//1.读数据,封装bean,过滤pv,生成水印
SingleOutputStreamOperator<UserBehavior> ds = env
.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "source")
.map(line -> {
String[] words = line.split(",");
return new UserBehavior(
Long.valueOf(words[0]),
Long.valueOf(words[1]),
Integer.valueOf(words[2]),
words[3],
Long.valueOf(words[4])
);
})
.filter(bean -> "pv".equals(bean.getBehavior()))
.assignTimestampsAndWatermarks(watermarkStrategy);
/*
2.开窗,统计每种商品的点击次数
*/
SingleOutputStreamOperator<HotItem> ds1 = ds
.keyBy(UserBehavior::getItemId)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.aggregate(new AggregateFunction<UserBehavior, Long, HotItem>()
{
@Override
public Long createAccumulator() {
return 0l;
}
@Override
public Long add(UserBehavior value, Long accumulator) {
return accumulator + 1;
}
@Override
public HotItem getResult(Long accumulator) {
return new HotItem(null, null, null, accumulator);
}
@Override
public Long merge(Long a, Long b) {
return null;
}
}, new ProcessWindowFunction<HotItem, HotItem, Long, TimeWindow>()
{
@Override
public void process(Long key, ProcessWindowFunction<HotItem, HotItem, Long, TimeWindow>.Context context, Iterable<HotItem> iterable, Collector<HotItem> collector) throws Exception {
HotItem hotItem = iterable.iterator().next();
TimeWindow window = context.window();
//赋值
hotItem.setStart(window.getStart());
hotItem.setEnd(window.getEnd());
hotItem.setItemId(key);
collector.collect(hotItem);
}
});
//3.在下游按照窗口的时间范围分组,top3统计。使用定时器触发运算。
ds1
.keyBy(HotItem::getStart)
.process(new KeyedProcessFunction<Long, HotItem, String>()
{
private ValueState<Boolean> flag;
private ListState<HotItem> listState;
/*
没来一条数据,先存起来,等定时器到点了,再触发top3
*/
@Override
public void open(Configuration parameters) throws Exception {
listState = getRuntimeContext().getListState(new ListStateDescriptor<>("hot3", HotItem.class));
flag = getRuntimeContext().getState(new ValueStateDescriptor<>("flag", Boolean.class));
}
//进行top3计算
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, HotItem, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
List<HotItem> top3 = StreamSupport.stream(listState.get().spliterator(), true)
.sorted((h1, h2) -> -h1.getCount().compareTo(h2.getCount()))
.limit(3)
.collect(Collectors.toList());
//整理数据的格式
String resultStr = top3.stream().map(item -> item.getItemId() + ":" + item.getCount()).collect(Collectors.joining(","));
String timeStr = MyUtil.parseTimeWindow(new TimeWindow(top3.get(0).getStart(), top3.get(0).getEnd()));
out.collect(timeStr + ": top3 : " + resultStr);
}
@Override
public void processElement(HotItem hotItem, KeyedProcessFunction<Long, HotItem, String>.Context context, Collector<String> collector) throws Exception {
listState.add(hotItem);
//在当前组中第一条数据来的时候,定定时器
if (flag.value() == null){
//定定时器
context.timerService().registerEventTimeTimer(hotItem.getEnd());
flag.update(false);
}
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class HotItem{
//定义窗口范围 时间窗口
private Long start;
private Long end;
//定义统计的指标
private Long itemId;
private Long count;
}
}