增量聚合函数
——指窗口每进入一条数据就计算一次
例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27
reduce
aggregate(aggregateFunction)
package com.bigdata.day04;
public class _04_agg函数 {
public static final Tuple3[] ENGLISH = new Tuple3[] {
Tuple3.of("class1", "张三", 100L),
Tuple3.of("class1", "李四", 40L),
Tuple3.of("class1", "王五", 60L),
Tuple3.of("class2", "赵六", 20L),
Tuple3.of("class2", "小七", 30L),
Tuple3.of("class2", "小八", 50L),
};
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2. source-加载数据
DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);
// 此时我要获取每个班级的平均成绩
// 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)
// IN——Tuple3<String, String, Long>
// ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩
// OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩
dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {
// 初始化一个 累加器
@Override
public Tuple3<String, Integer, Long> createAccumulator() {
return Tuple3.of(null,0,0L);
}
// 累加器和输入的值进行累加
// Tuple3<String, String, Long> value 第一个是传入的值
// Tuple3<String, Integer, Long> accumulator 第二个是累加器的值
@Override
public Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {
return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);
}
// 获取结果——在不同节点的结果进行汇总后实现
@Override
public Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {
return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);
}
// 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总
// 即累加器之间的累加
@Override
public Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {
return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
}
}).print();
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
sum()
min()
max()
全量聚合函数
指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)
全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间
apply
package com.bigdata.day04;
public class _05_app函数 {
public static final Tuple3[] ENGLISH = new Tuple3[] {
Tuple3.of("class1", "张三", 100L),
Tuple3.of("class1", "李四", 40L),
Tuple3.of("class1", "王五", 60L),
Tuple3.of("class2", "赵六", 20L),
Tuple3.of("class2", "小七", 30L),
Tuple3.of("class2", "小八", 50L),
};
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);
//2. source-加载数据
dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {
Long sum = 0L;
int length = 0;
String key = null;
for (Tuple3<String, String, Long> value : values) {
sum += value.f2;
length++;
key = value.f0;
}
out.collect(Tuple2.of(key,(double) sum/length));
}
}).print();
env.execute();
}
}
// 总结
// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象 Tuple3<String,String,Long> 传入的值 Tuple2<String,Double> 结果
// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out)
Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象
//使用窗口对象我们可以拿到窗口的起始时间
long start = window.getStart();
long end = window.getEnd();
process
使用方式一:在connect合流之后对两个类型不同的流进行处理
使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签
使用方式一
new CoProcessFunction<Long, String, String>()
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型 第三个泛型是合并后流的类型
@Override
public void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
// Long 是数据类型 结果使用collector中的collect 收集
collector.collect(String.valueOf(l));
}
@Override
public void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
// String 是数据类型 结果使用collector中的collect 收集
collector.collect(s);
}
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素
OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));