Window functions define the computation to be performed on the data collected in a window. Based on how the data is processed, they fall into two categories: incremental aggregate functions and full window functions.
Contents
- 1. Incremental Aggregate Functions
- 1.1 ReduceFunction
- 1.2 AggregateFunction
- 2. Full Window Functions
- 2.1 WindowFunction
- 2.2 ProcessWindowFunction
- 3. Combining Incremental Aggregation with Full Window Functions
1. Incremental Aggregate Functions
- Reduce function: ReduceFunction
- Aggregate function: AggregateFunction
1.1 ReduceFunction
public class WindowTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
environment.getConfig().setAutoWatermarkInterval(100);
DataStream<Event> dataStreamSource = environment.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./fav", 3000L),
new Event("Mary", "./fav", 2000L),
new Event("Bob", "./fav", 3000L),
new Event("Alice", "./fav", 3000L),
new Event("Bob", "./prod?id=1", 4000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
dataStreamSource.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event event) throws Exception {
return Tuple2.of(event.user, 1L);
}
})
.keyBy(data -> data.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> stringLongTuple2, Tuple2<String, Long> t1) throws Exception {
return Tuple2.of(stringLongTuple2.f0, stringLongTuple2.f1 + t1.f1);
}
})
.print();
environment.execute();
}
}
The pattern is to call .reduce() on the WindowedStream and pass in a ReduceFunction as the argument; the data collected in the window is then reduced pairwise.
(Mary,2)
(Alice,2)
(Bob,3)
Every time a record arrives, the reduce method is called to fold the new record's count into the state, and the updated state is stored. When the 10-second window reaches its end time, the reduced state is emitted directly.
1.2 AggregateFunction
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}
AggregateFunction can be seen as a generalized version of ReduceFunction, with three type parameters: the input type (IN), the accumulator type (ACC), and the output type (OUT). The accumulator type ACC is the intermediate state type of the aggregation. Its four methods are listed below, followed by a minimal sketch.
- createAccumulator: creates an accumulator, i.e. the initial state of the aggregation
- add: folds an input element into the accumulator; called once for every incoming record
- getResult: extracts the final output from the accumulator
- merge: merges two accumulators (only needed for merging windows, such as session windows)
On an e-commerce site, PV (page views) and UV (unique visitors) are two key traffic metrics.
- PV counts every click
- UV counts the number of distinct user ids
- PV/UV is the average number of visits per user (for example, 100 page views by 20 distinct users gives PV/UV = 5)
public class WindowAggregateTest_PvUv {
public static void main(String[] args) throws Exception {
// PV: +1 per click
// UV: number of distinct users
// PV/UV: average number of visits per user, a rough measure of site engagement
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
dataStream.print("data: ");
dataStream
.keyBy(data -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new AvgPv())
.print();
environment.execute();
}
// accumulator: f0 = PV count, f1 = set of user ids for UV
public static class AvgPv implements AggregateFunction<Event, Tuple2<Long, HashSet<String>>, Double> {
@Override
public Tuple2<Long, HashSet<String>> createAccumulator() {
return Tuple2.of(0L, new HashSet<>());
}
@Override
public Tuple2<Long, HashSet<String>> add(Event event, Tuple2<Long, HashSet<String>> longHashSetTuple2) {
longHashSetTuple2.f1.add(event.user);
return Tuple2.of(longHashSetTuple2.f0 + 1, longHashSetTuple2.f1);
}
@Override
public Double getResult(Tuple2<Long, HashSet<String>> longHashSetTuple2) {
return (double) longHashSetTuple2.f0 / longHashSetTuple2.f1.size(); // cast first, otherwise integer division truncates the average
}
@Override
public Tuple2<Long, HashSet<String>> merge(Tuple2<Long, HashSet<String>> longHashSetTuple2, Tuple2<Long, HashSet<String>> acc1) {
return null; // merge is only called for merging (session) windows; not needed for tumbling windows
}
}
}
data: > Event{user='Mary', url='./fav', timestamp=2022-12-13 18:58:58.794}
data: > Event{user='Bob', url='./fav', timestamp=2022-12-13 18:58:59.8}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 18:59:00.809}
1.0
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 18:59:01.825}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 18:59:02.84}
data: > Event{user='Bob', url='./cart', timestamp=2022-12-13 18:59:03.853}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 18:59:04.865}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 18:59:05.914}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 18:59:06.922}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 18:59:07.936}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 18:59:08.951}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 18:59:09.965}
data: > Event{user='Alice', url='./cart', timestamp=2022-12-13 18:59:10.973}
3.3333333333333335
From ReduceFunction and AggregateFunction we can see that incremental aggregate functions essentially apply stream-processing thinking to a bounded set of window data: the core idea is to keep an aggregation state and keep updating it as records arrive. This is what Flink calls "stateful stream processing"; it greatly improves runtime efficiency, which is why it is the most common approach in practice.
2. Full Window Functions
- Window function: WindowFunction
- Process window function: ProcessWindowFunction
2.1 WindowFunction
Call .apply() on the WindowedStream and pass in an implementation of WindowFunction:
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
A WindowFunction receives an iterable over the collected elements as well as information about the window itself:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
However, the information a WindowFunction can provide is fairly limited; ProcessWindowFunction covers everything it offers and more.
2.2 ProcessWindowFunction
ProcessWindowFunction has access to a context object, which includes, among other things, the processing time and the event-time watermark.
Example: counting an e-commerce site's hourly UV (the code below uses 10-second tumbling windows for demonstration).
public class WindowProcessTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
dataStream.print("data: ");
dataStream.keyBy(data -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UvCountProcess())
.print();
environment.execute();
}
public static class UvCountProcess extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {
@Override
public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
HashSet<String> hashSet = new HashSet<>();
for (Event element : elements) {
hashSet.add(element.user);
}
int size = hashSet.size();
long start = context.window().getStart();
long end = context.window().getEnd();
out.collect("窗口 " + new Timestamp(start) + " ~ " + new Timestamp(end) + " Uv: " + size);
}
}
}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 19:15:45.944}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 19:15:46.956}
data: > Event{user='Bob', url='./cart', timestamp=2022-12-13 19:15:47.971}
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 19:15:48.973}
data: > Event{user='Cary', url='./home', timestamp=2022-12-13 19:15:49.986}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 19:15:51.001}
window 2022-12-13 19:15:40.0 ~ 2022-12-13 19:15:50.0 Uv: 2
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 19:15:52.014}
data: > Event{user='Cary', url='./home', timestamp=2022-12-13 19:15:53.018}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 19:15:54.034}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 19:15:55.038}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 19:15:56.051}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 19:15:57.068}
data: > Event{user='Bob', url='./fav', timestamp=2022-12-13 19:15:58.084}
data: > Event{user='Mary', url='./fav', timestamp=2022-12-13 19:15:59.087}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 19:16:00.104}
window 2022-12-13 19:15:50.0 ~ 2022-12-13 19:16:00.0 Uv: 4
The number of elements in the HashSet is the UV value.
3. Combining Incremental Aggregation with Full Window Functions
Incremental aggregate functions compute more efficiently, while full window functions provide richer information.
The .reduce() and .aggregate() methods of WindowedStream can each take a second argument:
the first parameter is the incremental aggregate function (a ReduceFunction or AggregateFunction, respectively), and the second is a WindowFunction or ProcessWindowFunction.
The window data is processed by the first parameter, aggregating once per incoming record; when the window fires, the second parameter (the full window function) is called to wrap up and emit the result, following the call pattern sketched below.
UV here means the site-wide UV: keyBy(data -> true) routes every record to a single key.
public class UvCountExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
dataStream.print("data: ");
dataStream.keyBy(data -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new UvAgg(), new UvCountResult())
.print();
environment.execute();
}
public static class UvAgg implements AggregateFunction<Event, HashSet<String>, Long> {
@Override
public HashSet<String> createAccumulator() {
return new HashSet<>();
}
@Override
public HashSet<String> add(Event event, HashSet<String> hashSet) {
hashSet.add(event.user);
return hashSet;
}
@Override
public Long getResult(HashSet<String> events) {
return Long.valueOf(events.size());
}
@Override
public HashSet<String> merge(HashSet<String> events, HashSet<String> acc1) {
return null; // merge is only needed for merging (session) windows; unused here
}
}
public static class UvCountResult extends ProcessWindowFunction<Long, String, Boolean, TimeWindow> {
@Override
public void process(Boolean aBoolean, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
Long uv = elements.iterator().next();
long start = context.window().getStart();
long end = context.window().getEnd();
out.collect("窗口: " + new Timestamp(start) + " ~ " + new Timestamp(end) + " Uv: " + uv);
}
}
}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 20:54:39.832}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 20:54:40.837}
window: 2022-12-13 20:54:30.0 ~ 2022-12-13 20:54:40.0 Uv: 1
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 20:54:41.84}
data: > Event{user='Alice', url='./fav', timestamp=2022-12-13 20:54:42.853}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 20:54:43.855}
data: > Event{user='Alice', url='./cart', timestamp=2022-12-13 20:54:44.863}
data: > Event{user='Bob', url='./fav', timestamp=2022-12-13 20:54:45.875}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 20:54:46.89}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 20:54:47.891}
data: > Event{user='Mary', url='./fav', timestamp=2022-12-13 20:54:48.905}
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 20:54:49.909}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 20:54:50.916}
window: 2022-12-13 20:54:40.0 ~ 2022-12-13 20:54:50.0 Uv: 4
Next, the number of views under each different URL (the stream is keyed by url, and each event adds 1 to the count for its URL).
The POJO UrlCountView:
public class UrlCountView {
public String url;
public Long count;
public Long start;
public Long end;
@Override
public String toString() {
return "UrlCountView{" +
"url='" + url + '\'' +
", count=" + count +
", start=" + new Timestamp(start) +
", end=" + new Timestamp(end) +
'}';
}
public UrlCountView() {
}
public UrlCountView(String url, Long count, Long start, Long end) {
this.url = url;
this.count = count;
this.start = start;
this.end = end;
}
}
public class UvCountViewExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
dataStream.print("data: ");
dataStream.keyBy(data -> data.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new UvCountViewAgg(), new UvCountViewResult())
.print();
environment.execute();
}
public static class UvCountViewAgg implements AggregateFunction<Event, Long, Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event event, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return null; // merge is only needed for merging (session) windows; unused here
}
}
public static class UvCountViewResult extends ProcessWindowFunction<Long, UrlCountView, String, TimeWindow>{
// key
@Override
public void process(String s, Context context, Iterable<Long> elements, Collector<UrlCountView> out) throws Exception {
Long uv = elements.iterator().next();
long start = context.window().getStart();
long end = context.window().getEnd();
out.collect(new UrlCountView(s, uv, start, end));
}
}
}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 20:56:18.065}
data: > Event{user='Alice', url='./fav', timestamp=2022-12-13 20:56:19.077}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 20:56:20.091}
UrlCountView{url='./cart', count=1, start=2022-12-13 20:56:10.0, end=2022-12-13 20:56:20.0}
UrlCountView{url='./fav', count=1, start=2022-12-13 20:56:10.0, end=2022-12-13 20:56:20.0}
data: > Event{user='Alice', url='./cart', timestamp=2022-12-13 20:56:21.105}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 20:56:22.111}
data: > Event{user='Alice', url='./cart', timestamp=2022-12-13 20:56:23.115}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 20:56:24.119}
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 20:56:25.125}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 20:56:26.129}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 20:56:27.14}
data: > Event{user='Cary', url='./home', timestamp=2022-12-13 20:56:28.145}
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 20:56:29.155}
data: > Event{user='Bob', url='./cart', timestamp=2022-12-13 20:56:30.171}
UrlCountView{url='./fav', count=1, start=2022-12-13 20:56:20.0, end=2022-12-13 20:56:30.0}
UrlCountView{url='./cart', count=4, start=2022-12-13 20:56:20.0, end=2022-12-13 20:56:30.0}
UrlCountView{url='./home', count=5, start=2022-12-13 20:56:20.0, end=2022-12-13 20:56:30.0}
The bulk of the window processing is still done by the incremental aggregation, while the full window function adds richer information when wrapping the output. This combination has the advantages of both kinds of window functions: it preserves processing performance and low latency while supporting a wider range of application scenarios.