1. Flink Window Functions
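After specifying the window assigner, we need to specify the computation to perform on each window. This is the job of the window function, which processes the elements of each window and emits its result when the window fires (for example, when a tumbling window closes).
A window function can be a ReduceFunction, an AggregateFunction, or a ProcessWindowFunction.
ReduceFunction and AggregateFunction are more efficient because Flink can aggregate elements incrementally as they arrive in the window. A ProcessWindowFunction instead receives an Iterable over all elements of the window, together with metadata about the window those elements belong to (such as its start and end).
ProcessWindowFunction cannot be executed as efficiently because Flink has to buffer all elements of the window internally before invoking the function.
Besides simple built-in aggregations such as sum, max, min, maxBy and minBy, the following window functions are available.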
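To illustrate the efficiency difference described above, here is a minimal plain-Java model of a single window for a single key. It does not use Flink, and the class names (WindowStateModel, IncrementalWindow, BufferedWindow) are hypothetical: the incremental variant keeps one running value, while the buffered variant keeps every element until the window fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model of one window for one key; not Flink's actual implementation.
class WindowStateModel {

    // Incremental style (like ReduceFunction/AggregateFunction):
    // state is a single running value, updated as each element arrives.
    static final class IncrementalWindow<T> {
        private final BinaryOperator<T> reducer;
        private T state; // null until the first element arrives

        IncrementalWindow(BinaryOperator<T> reducer) { this.reducer = reducer; }

        void add(T value) { state = (state == null) ? value : reducer.apply(state, value); }

        int storedElements() { return state == null ? 0 : 1; }

        T fire() { return state; }
    }

    // Full-window style (like ProcessWindowFunction):
    // every element is buffered until the window fires.
    static final class BufferedWindow<T> {
        private final List<T> buffer = new ArrayList<>();

        void add(T value) { buffer.add(value); }

        int storedElements() { return buffer.size(); }

        // The whole buffer is handed over at once, like the Iterable in process().
        List<T> fire() { return buffer; }
    }

    public static void main(String[] args) {
        IncrementalWindow<Integer> inc = new IncrementalWindow<>(Integer::sum);
        BufferedWindow<Integer> buf = new BufferedWindow<>();
        for (int vc : new int[] {10, 20, 30, 40}) {
            inc.add(vc);
            buf.add(vc);
        }
        System.out.println(inc.storedElements() + " vs " + buf.storedElements()); // 1 vs 4
        System.out.println(inc.fire()); // 100
        System.out.println(buf.fire().stream().mapToInt(Integer::intValue).sum()); // 100
    }
}
```

Both variants produce the same sum, but the buffered one holds state proportional to the number of elements in the window.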
2. ReduceFunction (incremental aggregation)
The input and output types must be the same.
package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Window_s_function {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Each input line has the form "id,ts,vc"
        env.socketTextStream("hadoop100", 9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 5-second tumbling windows based on processing time
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // For each new element in a window, Flink combines it with the result so far:
                // value1 is the accumulated value, value2 is the new element
                .reduce(new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1,
                                              WaterSensor value2) throws Exception {
                        System.out.println("Window_s_function.reduce");
                        // Sum vc into the accumulated value; input and output are both WaterSensor
                        value1.setVc(value1.getVc() + value2.getVc());
                        return value1;
                    }
                })
                .print();

        env.execute();
    }
}
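As a rough check of what this job prints, the sketch below replays keyBy plus reduce for the elements of one window in plain Java. It assumes WaterSensor holds the three fields parsed in map() (id, ts, vc); the Sensor class, the PerKeyReduceSketch helper and the sample lines are hypothetical stand-ins for the real bean and the socket input.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for com.lyh.bean.WaterSensor (fields assumed: id, ts, vc).
class Sensor {
    final String id;
    final long ts;
    final int vc;

    Sensor(String id, long ts, int vc) { this.id = id; this.ts = ts; this.vc = vc; }

    // Same parsing as the map() step: "id,ts,vc"
    static Sensor parse(String line) {
        String[] data = line.split(",");
        return new Sensor(data[0], Long.parseLong(data[1]), Integer.parseInt(data[2]));
    }
}

class PerKeyReduceSketch {
    // Models keyBy(id) + reduce(...) for the elements of ONE window:
    // each key keeps one Sensor, and every new element is folded into it.
    static Map<String, Sensor> reduceWindow(List<String> lines) {
        Map<String, Sensor> perKey = new LinkedHashMap<>();
        for (String line : lines) {
            Sensor next = Sensor.parse(line);
            // merge() applies the reducer only when the key already has a value;
            // like the job's reduce, it keeps the first element's id/ts and sums vc.
            perKey.merge(next.id, next, (acc, v) -> new Sensor(acc.id, acc.ts, acc.vc + v.vc));
        }
        return perKey;
    }

    public static void main(String[] args) {
        Map<String, Sensor> out = reduceWindow(List.of("s1,1,10", "s1,2,20", "s2,3,5"));
        out.values().forEach(s -> System.out.println(s.id + " " + s.ts + " " + s.vc));
        // s1 1 30
        // s2 3 5
    }
}
```

Like the reduce in the job, the first element of each key supplies id and ts, and only vc is accumulated.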
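3. AggregateFunction (incremental aggregation)
The input and output types can differ. For example, to compute the average vc per sensor, the input is WaterSensor, the accumulator is an Avg holding a sum and a count, and the output is a Double. Passing a ProcessWindowFunction as the second argument of aggregate() lets each result be enriched with the window's start and end.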
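The four methods of an AggregateFunction form a small lifecycle: create an empty accumulator, add each element, read the result when the window fires, and merge two accumulators when windows are combined. Here is a minimal plain-Java sketch of that contract for an average; MiniAggregate, SumCount and AverageAggregate are hypothetical names, and MiniAggregate only mirrors the shape of Flink's interface.

```java
import java.util.List;

// Hypothetical interface mirroring the four methods of Flink's AggregateFunction.
interface MiniAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Accumulator type (ACC) differs from both the input (Integer) and the output (Double).
class SumCount {
    long sum;
    long count;
}

class AverageAggregate implements MiniAggregate<Integer, SumCount, Double> {
    @Override public SumCount createAccumulator() { return new SumCount(); }

    @Override public SumCount add(Integer value, SumCount acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    @Override public Double getResult(SumCount acc) {
        return (double) acc.sum / acc.count;
    }

    // Combine two partial accumulators instead of returning null.
    @Override public SumCount merge(SumCount a, SumCount b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    // Feeds values one by one, the way elements arrive in a window.
    static SumCount accumulate(AverageAggregate f, List<Integer> values) {
        SumCount acc = f.createAccumulator();
        for (int v : values) acc = f.add(v, acc);
        return acc;
    }

    public static void main(String[] args) {
        AverageAggregate f = new AverageAggregate();
        SumCount whole = accumulate(f, List.of(10, 20, 30, 40));
        SumCount merged = f.merge(accumulate(f, List.of(10, 20)), accumulate(f, List.of(30, 40)));
        System.out.println(f.getResult(whole));  // 25.0
        System.out.println(f.getResult(merged)); // 25.0
    }
}
```

Merging two partial accumulators gives the same average as a single pass, whereas returning null from merge() would drop both partial results. Implementing it properly is the safer choice even though non-merging windows such as the tumbling window used here are not expected to call it.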
package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Window_s_function_2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.socketTextStream("hadoop100", 9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(
                        // Incremental part: IN = WaterSensor, ACC = Avg, OUT = Double (average vc)
                        new AggregateFunction<WaterSensor, Avg, Double>() {
                            // Creates an empty accumulator for each key and window
                            @Override
                            public Avg createAccumulator() {
                                return new Avg();
                            }

                            // Called for every incoming element
                            @Override
                            public Avg add(WaterSensor value, Avg acc) {
                                acc.sum += value.getVc();
                                acc.count++;
                                return acc;
                            }

                            // Called when the window fires
                            @Override
                            public Double getResult(Avg acc) {
                                return (double) acc.sum / acc.count;
                            }

                            // Combines two accumulators (used e.g. by merging session windows)
                            @Override
                            public Avg merge(Avg a, Avg b) {
                                a.sum += b.sum;
                                a.count += b.count;
                                return a;
                            }
                        },
                        // Full-window part: receives the single pre-aggregated result plus window metadata
                        new ProcessWindowFunction<Double, String, String, TimeWindow>() {
                            @Override
                            public void process(String key,
                                                Context ctx,
                                                Iterable<Double> elements,
                                                Collector<String> out) throws Exception {
                                Double result = elements.iterator().next();
                                long startTime = ctx.window().getStart();
                                long endTime = ctx.window().getEnd();
                                out.collect("window: " + startTime + " " + endTime + " key: " + key + " result: " + result);
                            }
                        }
                )
                .print();

        env.execute();
    }

    // Accumulator: running sum and count of vc
    public static class Avg {
        public long sum = 0L;
        public long count = 0L;
    }
}
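The ProcessWindowFunction above prints the window's start and end. For a tumbling window, the start is the element's timestamp rounded down to a multiple of the window size (shifted by an optional offset), and the end is start plus size, exclusive. The plain-Java sketch below reproduces that arithmetic; it is modeled on the rounding that Flink's TimeWindow.getWindowStartWithOffset performs, but the TumblingBounds class and its methods are hypothetical.

```java
// Hypothetical helper reproducing tumbling-window bounds [start, end).
class TumblingBounds {
    // Rounds the timestamp down to the nearest window boundary (offset shifts the grid).
    // floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // End is exclusive: an element at exactly `end` belongs to the next window.
    static long windowEnd(long start, long size) {
        return start + size;
    }

    public static void main(String[] args) {
        long size = 5_000L; // Time.seconds(5) in milliseconds
        for (long ts : new long[] {1_000L, 4_999L, 5_000L, 12_345L}) {
            long start = windowStart(ts, 0L, size);
            System.out.println(ts + " -> [" + start + ", " + windowEnd(start, size) + ")");
        }
        // 1000 -> [0, 5000)
        // 4999 -> [0, 5000)
        // 5000 -> [5000, 10000)
        // 12345 -> [10000, 15000)
    }
}
```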
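4. ProcessWindowFunction (full-window function)
This function was already used in the example above. Passed to process() on its own, it receives every element of the window; passed as the second argument of aggregate(), as above, its Iterable contains only the single pre-aggregated result.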
new ProcessWindowFunction<Double, String, String, TimeWindow>() {
    @Override
    public void process(String key,
                        Context ctx,
                        Iterable<Double> elements,
                        Collector<String> out) throws Exception {
        // With a preceding AggregateFunction, elements holds exactly one value: the aggregated result
        Double result = elements.iterator().next();
        // Window metadata from the context: start (inclusive) and end (exclusive), in milliseconds
        long startTime = ctx.window().getStart();
        long endTime = ctx.window().getEnd();
        out.collect("window: " + startTime + " " + endTime + " key: " + key + " result: " + result);
    }
}
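Because a ProcessWindowFunction sees every buffered element, it suits results that need the whole window at once, such as a median, which cannot be derived from a running sum and count. Below is a plain-Java sketch of the body such a process() could run; the FullWindowMedian class is hypothetical, and the key and window bounds are passed in as plain values instead of through Context.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FullWindowMedian {
    // Mirrors the shape of process(key, ctx, elements, out): all elements of the
    // window are available at once, so an order statistic like the median can be computed.
    // Assumes a non-empty window, as is the case whenever a window fires.
    static String process(String key, long start, long end, Iterable<Integer> elements) {
        List<Integer> sorted = new ArrayList<>();
        for (Integer e : elements) sorted.add(e);
        Collections.sort(sorted);
        int n = sorted.size();
        double median = (n % 2 == 1)
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
        return "window: " + start + " " + end + " key: " + key + " median: " + median;
    }

    public static void main(String[] args) {
        System.out.println(process("s1", 0L, 5_000L, List.of(30, 10, 40, 20)));
        // window: 0 5000 key: s1 median: 25.0
    }
}
```

Sorting requires the full list, which is exactly the buffering cost described at the start of this article.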