前言
Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。
WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。
当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。
WindowAssigner
先看一下 WindowAssigner 抽象类的定义:
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;
public WindowAssigner() {
}
public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);
public Trigger<T, W> getDefaultTrigger() {
return this.getDefaultTrigger(new StreamExecutionEnvironment());
}
/** @deprecated */
@Deprecated
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);
public abstract boolean isEventTime();
@PublicEvolving
public abstract static class WindowAssignerContext {
public WindowAssignerContext() {
}
public abstract long getCurrentProcessingTime();
}
}
四个方法,作用如下:
- assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
- getDefaultTrigger 返回默认的窗口触发器 Trigger
- getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
- isEventTime 是否基于事件时间语义
Flink 内置的 WindowAssigner 实现类关系图如下:
首先,可以按照基于何种时间语义划分出三大类:
- 基于事件时间语义
- 基于处理时间语义
- 不基于时间语义 --> GlobalWindows
在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:
- 滚动窗口分配算法 tumbling windows
- 滑动窗口分配算法 sliding windows
- 会话窗口分配算法 session windows
定义窗口Window
窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window
,Flink 内置了两种实现,分别是:
- TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
- GlobalWindow 全局窗口,与时间无关的窗口
如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。
如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。
public class NumberWindow extends Window {
private final int min;
private final int max;
public NumberWindow(int min, int max) {
this.min = min;
this.max = max;
}
public int getMin() {
return min;
}
public int getMax() {
return max;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NumberWindow that = (NumberWindow) o;
return min == that.min && max == that.max;
}
@Override
public int hashCode() {
return Objects.hash(min, max);
}
@Override
public long maxTimestamp() {
return Long.MAX_VALUE;
}
}
Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。
public static class Serializer extends TypeSerializerSingleton<NumberWindow> {
@Override
public boolean isImmutableType() {
return true;
}
@Override
public NumberWindow createInstance() {
return new NumberWindow(0, 0);
}
@Override
public NumberWindow copy(NumberWindow numberWindow) {
return numberWindow;
}
@Override
public NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {
return numberWindow;
}
@Override
public int getLength() {
return 8;
}
@Override
public void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {
dataOutputView.writeInt(numberWindow.getMin());
dataOutputView.writeInt(numberWindow.getMax());
}
@Override
public NumberWindow deserialize(DataInputView dataInputView) throws IOException {
return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());
}
@Override
public NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {
return this.deserialize(dataInputView);
}
@Override
public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
dataOutputView.writeInt(dataInputView.readInt());
dataOutputView.writeInt(dataInputView.readInt());
}
@Override
public TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {
return new TimeWindowSerializerSnapshot();
}
public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {
public TimeWindowSerializerSnapshot() {
super(Serializer::new);
}
}
}
自定义WindowAssigner
窗口对象定义好了,接下来就是定义窗口分配对象。
简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。
public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {
private final int startingMedian;
private final int startingLarge;
public MyWindowAssigner(int startingMedian, int startingLarge) {
this.startingMedian = startingMedian;
this.startingLarge = startingLarge;
}
@Override
public Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {
// 将数字划分到 小数、中位数、大数 窗口
NumberWindow window;
if (element < startingMedian) {
window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);
} else if (element < startingLarge) {
window = new NumberWindow(startingMedian, startingLarge - 1);
} else {
window = new NumberWindow(startingLarge, Integer.MAX_VALUE);
}
return List.of(window);
}
@Override
public Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {
return null;
}
@Override
public TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new NumberWindow.Serializer();
}
@Override
public boolean isEventTime() {
return false;
}
}
把流程串起来
窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。
如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
while (true) {
Threads.sleep(100);
sourceContext.collect(ThreadLocalRandom.current().nextInt(100));
}
}
@Override
public void cancel() {
}
}).keyBy(i -> "all")
.window(new MyWindowAssigner(20, 80))
.trigger(new Trigger<Integer, NumberWindow>() {
@Override
public TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));
Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;
if (count < 10) {
countState.update(count);
return TriggerResult.CONTINUE;
}
countState.update(0);
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
return null;
}
@Override
public TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
return null;
}
@Override
public void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
}
}).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {
@Override
public void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {
StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");
int sum = 0;
for (Integer value : iterable) {
builder.append(value + ",");
sum += value;
}
builder.append("] sum=" + sum);
System.err.println(builder.toString());
}
});
environment.execute();
}
运行 Flink 作业,控制台输出:
[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344
尾巴
Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。