前言
在flink api中,聚合算子是非常常用的。所谓的聚合就是在分组的基础上做比较计算的操作。下面通过几个简单案例来说明聚合算子的用法和注意事项。
聚合算子案例
因为flink的api操作流程比较固定,从获取执行环境==》获取数据源==》执行数据转换操作==》输出结果。为了复用代码,参考代码使用了一个模板设计模式。
先定义一个Stream的泛型接口
package com.tml.common;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public interface StreamService<T> {
StreamExecutionEnvironment getEnv();
DataStream<T> getSource(StreamExecutionEnvironment env);
}
抽象一个模板
package com.tml.common;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public abstract class AbsStreamCommonService<T> implements StreamService<T> {
public void processStream(Integer parallelism) throws Exception {
StreamExecutionEnvironment env = getEnv();
env.setParallelism(parallelism);
DataStream<T> stream = getSource(env);
handle(stream);
env.execute();
}
public abstract void handle(DataStream<T> source);
@Override
public StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
}
public DataStream<String> getSourceFromSocket(StreamExecutionEnvironment environment) {
return environment.socketTextStream("43.139.114.233", 9999);
}
public DataStream<CommonMsg> getSourceFromCollection(StreamExecutionEnvironment environment) {
DataStreamSource<CommonMsg> source = environment.fromElements(
new CommonMsg("11", "hello world", 11L),
new CommonMsg("11", "hello flink", 3L),
new CommonMsg("12", "hello kitty", 13L),
new CommonMsg("13", "hello world", 12L),
new CommonMsg("11", "hello java", 23L));
return source;
}
public DataStream<Long> getSourceFromDataGenerator(StreamExecutionEnvironment environment) {
DataGeneratorSource<Long> dataGeneratorSource =
new DataGeneratorSource<>((GeneratorFunction<Long, Long>) o -> o, 100000L,RateLimiterStrategy.perSecond(2), Types.LONG);
return environment.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource", Types.LONG);
}
}
注:使用
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())可以在控制台看到flink的web-ui界面,默认是http://localhost:8081,方便看到flink job的执行参数,这种方式适用于本地调试和学习
比如这样
对应的pom文件依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tml</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>flink-demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.18.0</flink.version> <!-- 根据你的 Flink 版本进行调整 -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Streaming API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API and SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
</dependencies>
</project>
keyBy
package com.tml.operator.aggregation;
import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyByDemo extends AbsStreamCommonService<CommonMsg> {
public static void main(String[] args) throws Exception {
new KeyByDemo().processStream(4);
}
@Override
public void handle(DataStream<CommonMsg> stream) {
/**
* keyby算子返回的是一个keyedStream
* 1.keyby不是一个转换算子,只对数据进行了重分区,另外还不能设置并行度
* 2.keyby分组和分区的概念
* keyby是对数据进行分组,保证同一个分组的数据会落到同一个数据分区内
* 分区:一个子任务可以理解为一个分区,一个分区可以包含有多个分组的数据
*/
KeyedStream<CommonMsg, String> keyBy = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));
keyBy.print();
}
@Override
public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
return super.getSourceFromCollection(env);
}
}
数据源是一个有界的数组,对应的数据是程序中自己new出来的,执行结果如下
2> CommonMsg(id=11, msg=hello world, time=11)
2> CommonMsg(id=11, msg=hello flink, time=3)
2> CommonMsg(id=11, msg=hello java, time=23)
1> CommonMsg(id=12, msg=hello kitty, time=13)
3> CommonMsg(id=13, msg=hello world, time=12)
可以看到,通过keyBy的分组操作,相同的数据放在了同一个分区去执行。
sum/min/minBy/max/maxBy
这几个是最基本的聚合算子。
package com.tml.operator.aggregation;
import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleAggregateDemo extends AbsStreamCommonService<CommonMsg> {
public static void main(String[] args) throws Exception {
new SimpleAggregateDemo().processStream(1);
}
@Override
public void handle(DataStream<CommonMsg> stream) {
KeyedStream<CommonMsg, String> keyStream = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));
//使用sum聚合
//SingleOutputStreamOperator<CommonMsg> time = stream.sum("time");
//SingleOutputStreamOperator<CommonMsg> min = stream.min("time");
/**
* max、maxyBy的区别在于
* max不会对非比较字段重新赋值,而maxBy会更新非比较字段的值
*/
SingleOutputStreamOperator<CommonMsg> minBy = keyStream.minBy("time");
//min.print();
minBy.print();
}
@Override
public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
return super.getSourceFromCollection(env);
}
}
先看一下minBy这个算子结果输出
CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello flink, time=3)
将聚合操作的api换成min(),对比一下程序的输出
CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello world, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello world, time=3)
两个对比输出可以发现,min、minBy的区别在于
min不会对非比较字段重新赋值,而minBy会更新非比较字段的值
当然max、maxBy也是一样
reduce
package com.tml.operator.aggregation;
import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReduceDemo extends AbsStreamCommonService<CommonMsg> {
public static void main(String[] args) throws Exception {
new ReduceDemo().processStream(1);
}
@Override
public void handle(DataStream<CommonMsg> source) {
KeyedStream<CommonMsg, String> stream = source.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));
/**
* reduce函数是非常灵活的,可以根据业务需求,非常灵活的进行聚合计算
* 当每个分组中只有一条数据的时候,是不会进行reduce的,因为只有一条数据,没有比较的数据,进行reduce没有必要
*/
SingleOutputStreamOperator<CommonMsg> reduce = stream.reduce((t1, t2) -> {
System.out.println("t1==>" + t1);
System.out.println("t2==>" + t2);
CommonMsg commonMsg = new CommonMsg(t1.getId(), t2.getMsg(), t1.getTime() + t2.getTime());
return commonMsg;
});
reduce.print();
}
@Override
public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
return super.getSourceFromCollection(env);
}
}
看一下运行结果
CommonMsg(id=11, msg=hello world, time=11)
t1==>CommonMsg(id=11, msg=hello world, time=11)
t2==>CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=11, msg=hello flink, time=14)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
t1==>CommonMsg(id=11, msg=hello flink, time=14)
t2==>CommonMsg(id=11, msg=hello java, time=23)
CommonMsg(id=11, msg=hello java, time=37)
通过运行结果可以看到,reduce算子是非常灵活的,可以在两个数据之间做非常灵活的操作,当然,如果对应的分组中只有一条数据,自然是 不会触发reduce函数的执行了。
richFunction
package com.tml.operator.aggregation;
import com.tml.common.AbsStreamCommonService;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* richfunction添加了一些额外的功能
* 提供了一些生命周期的管理方法,比如open()\close()
* open() 在每个子任务启动的时候调用一次
* close() 在每个任务结束的时候调用一次,如果是flink程序挂掉,不会调用这个close方法,在控制台上点击cancel任务,这个close方法也是可以额正常调用的
*
* 另外多了一些运行时上下文,可以通过getRuntimeContext() 来获取上下文中的一些关键信息
* 在close方法中可以做一些释放资源的操作,回调通知操作等一些hook函数
*/
public class RichFunctionDemo extends AbsStreamCommonService<String> {
public static void main(String[] args) throws Exception {
new RichFunctionDemo().processStream(1);
}
@Override
public void handle(DataStream<String> stream) {
SingleOutputStreamOperator<String> map = stream.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext context = getRuntimeContext();
String taskName = context.getTaskName();
int subtasks = context.getNumberOfParallelSubtasks();
System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call open()");
}
@Override
public void close() throws Exception {
super.close();
RuntimeContext context = getRuntimeContext();
String taskName = context.getTaskName();
int subtasks = context.getNumberOfParallelSubtasks();
System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call close()");
}
@Override
public String map(String value) throws Exception {
return "(" + value + ")";
}
}, TypeInformation.of(String.class));
map.print();
}
@Override
public DataStream<String> getSource(StreamExecutionEnvironment env) {
return super.getSourceFromSocket(env);
}
}
运行程序前需要先运行socket,这里使用了nc,详细可以参考Flink实时统计单词【入门】
看一下运行结果
taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call open()
(hello kitty)
(hello flink)
taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call close()
总结
- 这些聚合算子的基础实在keyBy之后,只有对数据进行了分组之后,才能执行后面的聚合操作。
- min、minBy和max、maxBy之间有细微的区别,前者不会对非比较字段重新赋值,而后者会更新非比较字段的值
- reduce算子是在两个数据之间进行操作的,可以非常灵活
- richFunction不算聚合函数,这里写进来是富函数可以做非常多的额外功能,open()方法是对应的子任务启动时调用一下,close()方法是在对应的子任务结束的时候调用一次,通过这个可以做一些监控或者hook通知的操作
代码案例已经上传到了github,欢迎前来围观!