背景
AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现
AggregateFunction.merge方法调用时机
AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows =
windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
// if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window : elementWindows) {
// adding the new window might result in a merge, in that case the actualWindow
// is the merged window and we work with that. If we don't merge then
// actualWindow == window
W actualWindow =
mergingWindows.addWindow(
window,
new MergingWindowSet.MergeFunction<W>() {
@Override
public void merge(
W mergeResult,
Collection<W> mergedWindows,
W stateWindowResult,
Collection<W> mergedStateWindows)
throws Exception {
triggerContext.key = key;
triggerContext.window = mergeResult;
triggerContext.onMerge(mergedWindows);
for (W m : mergedWindows) {
triggerContext.window = m;
triggerContext.clear();
deleteCleanupTimer(m);
}
// 合并窗口的状态
windowMergingState.mergeNamespaces(
stateWindowResult, mergedStateWindows);
}
});
继续查看AbstractHeapMergingState.mergeNamespaces方法,
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
if (sources == null || sources.isEmpty()) {
return; // nothing to do
}
final StateTable<K, N, SV> map = stateTable;
SV merged = null;
// merge the sources
for (N source : sources) {
// get and remove the next source per namespace/key
SV sourceState = map.removeAndGetOld(source);
if (merged != null && sourceState != null) {
//此处合并状态并调用AggregateFunction.merge方法
merged = mergeState(merged, sourceState);
} else if (merged == null) {
merged = sourceState;
}
}
// merge into the target, if needed
if (merged != null) {
map.transform(target, merged, mergeTransformation);
}
}
//真正调用AggregateFunction.merge方法合并自定义的状态
@Override
protected ACC mergeState(ACC a, ACC b) {
return aggregateTransformation.aggFunction.merge(a, b);
}
这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错