Table of contents
- 1. Code template
- 1.1 pom.xml
- 1.2 log4j.properties
- 1.3 Java template
- 2. Keyed and Non-Keyed windows
- 2.1 Keyed
- 2.2 Non-Keyed
- 3. Window types
- 3.1 Time-based windows
- 3.2 Count-based windows
- 4. Window functions
- 5. Example code
- 5.1 ReduceFunction
- 5.2 AggregateFunction
- 5.3 ProcessWindowFunction
1. Code template
Local development environment: Windows 10 + IntelliJ IDEA
Only the code between the two "business logic" marker comments in the Java template needs to change.
1.1 pom.xml
<!-- Properties -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
1.2 log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
1.3 Java template
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Scanner;

public class Hello {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Add the custom source
        DataStreamSource<String> dss = env.addSource(new MySource());
        //################################### business logic ########################################
        dss.print();
        //################################### business logic ########################################
        env.execute();
    }

    /** Reads lines from standard input; the line STOP ends the source. */
    public static class MySource implements SourceFunction<String> {
        // Cleared by cancel() so the read loop stops once the pending read returns
        private volatile boolean running = true;

        public MySource() {}

        @Override
        public void run(SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (running && scanner.hasNextLine()) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) {break;}
                // Skip blank lines
                if (!str.isEmpty()) {sc.collect(str);}
            }
            scanner.close();
        }

        @Override
        public void cancel() {running = false;}
    }
}
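2. Keyed and Non-Keyed windows
A Non-Keyed window always runs with parallelism 1, because every element of the stream passes through a single window operator instance.
2.1 Keyed
Time-based window
.keyBy(...)
.window(...)
Count-based window
.keyBy(...)
.countWindow(...)
2.2 Non-Keyed
Time-based window
.windowAll(...)
Count-based window
.countWindowAll(...)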
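To make the difference between the two variants concrete, here is a minimal plain-Java simulation of a tumbling count window. It does not use Flink; the class `CountWindowSketch` and its methods are hypothetical names made up for this illustration, and the logic only approximates what `.keyBy(...).countWindow(n)` and `.countWindowAll(n)` do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java simulation (no Flink) of tumbling count windows, keyed and non-keyed. */
final class CountWindowSketch {

    /** Keyed: one buffer per key; a buffer is emitted and cleared once it holds `size` elements. */
    static <T, K> List<List<T>> keyed(List<T> input, Function<T, K> keyOf, int size) {
        Map<K, List<T>> buffers = new HashMap<>();
        List<List<T>> fired = new ArrayList<>();
        for (T element : input) {
            List<T> buffer = buffers.computeIfAbsent(keyOf.apply(element), k -> new ArrayList<>());
            buffer.add(element);
            if (buffer.size() == size) {
                fired.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        return fired;
    }

    /** Non-keyed: a single shared buffer, modelled here as one constant key. */
    static <T> List<List<T>> nonKeyed(List<T> input, int size) {
        return CountWindowSketch.<T, String>keyed(input, element -> "all", size);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a1", "b1", "a2", "b2", "a3");
        // Key = first character of each element
        System.out.println(keyed(input, s -> s.substring(0, 1), 2)); // [[a1, a2], [b1, b2]]
        System.out.println(nonKeyed(input, 2));                      // [[a1, b1], [a2, b2]]
    }
}
```

In the keyed case each key fills its own window, so the buffers could be spread over several parallel tasks. In the non-keyed case there is only one buffer, which matches the parallelism of 1 noted above.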
3. Window types
- Windows split an unbounded stream into bounded chunks of data
- https://yellow520.blog.csdn.net/article/details/121288240
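3.1 Time-based windows
Sliding time window (6-second windows that start every 3 seconds)
.window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(3)))
Tumbling time window (3 seconds)
.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
Session time window (closes after a 2-second gap without elements)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
Global window (not bounded by time: all elements with the same key go into one window, which never fires unless a custom trigger is set)
.window(GlobalWindows.create())
3.2 Count-based windows
Sliding count window (fires every 3 elements with at most the 4 most recent elements)
.countWindow(4,3)
Tumbling count window (4 elements)
.countWindow(4)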
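The arithmetic behind tumbling and sliding time windows can be sketched in plain Java. This is a simplified illustration that assumes windows are aligned to the epoch (offset 0), which is my understanding of the default alignment. `WindowAssignSketch` and its methods are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Flink) of how a timestamp maps to tumbling and sliding windows. */
final class WindowAssignSketch {

    /** Start of the tumbling window of length `size` that contains `timestamp` (both in ms). */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Starts of all sliding windows [start, start + size) that contain `timestamp`, newest first. */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element arriving at t = 7000 ms
        System.out.println(tumblingStart(7000L, 3000L));        // 6000
        System.out.println(slidingStarts(7000L, 6000L, 3000L)); // [6000, 3000]
    }
}
```

With a 6-second size and a 3-second slide, every element belongs to size / slide = 2 windows, so it is processed twice. A tumbling window is the special case where the slide equals the size.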
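The sliding count window is the least intuitive of these, so here is a small plain-Java simulation of `.countWindow(4,3)`. It assumes the behaviour "fire every `slide` elements, keeping at most the last `size` elements", which is how I understand the trigger and evictor behind it. `SlidingCountSketch` is a hypothetical name and the code does not use Flink.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Plain-Java simulation (no Flink) of a sliding count window for a single key. */
final class SlidingCountSketch {

    /** Every `slide` elements, emit the most recent `size` elements (fewer at the very start). */
    static <T> List<List<T>> slidingCount(List<T> input, int size, int slide) {
        Deque<T> recent = new ArrayDeque<>();
        List<List<T>> fired = new ArrayList<>();
        int sinceLastFire = 0;
        for (T element : input) {
            recent.addLast(element);
            if (recent.size() > size) {
                recent.removeFirst(); // keep only the last `size` elements
            }
            if (++sinceLastFire == slide) {
                fired.add(new ArrayList<>(recent));
                sinceLastFire = 0;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(slidingCount(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 4, 3));
        // [[1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]]
    }
}
```

Note that the first result holds only 3 elements: the window fires on the slide count, not on the size.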
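4. Window functions
A window function defines the computation applied to the elements of a window; the result is emitted when the window fires.
Window function | How it handles the window's elements |
---|---|
ReduceFunction | Incremental: each element is folded into the running result as it arrives; efficient |
AggregateFunction | Incremental: each element is folded into an accumulator as it arrives; efficient |
ProcessWindowFunction | Buffers all elements of the window internally before the function runs; less efficient |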
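The efficiency difference in the table comes down to what the window has to keep in state. The plain-Java sketch below contrasts the two styles without using Flink; `WindowStateSketch`, `incremental` and `buffered` are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Plain-Java sketch (no Flink) of incremental versus buffering window state. */
final class WindowStateSketch {

    /** Incremental style: the state is a single running value, updated per arriving element. */
    static <T> T incremental(List<T> arrivals, BinaryOperator<T> reducer) {
        T state = null;
        for (T element : arrivals) {
            state = (state == null) ? element : reducer.apply(state, element);
        }
        return state; // emitted when the window fires
    }

    /** Buffering style: the state is every element seen; the function runs once at the end. */
    static <T, R> R buffered(List<T> arrivals, Function<List<T>, R> processor) {
        List<T> state = new ArrayList<>();
        for (T element : arrivals) {
            state.add(element);
        }
        return processor.apply(state); // sees the whole window at once
    }

    public static void main(String[] args) {
        List<Integer> arrivals = Arrays.asList(3, 1, 4);
        System.out.println(incremental(arrivals, Integer::sum));                  // 8
        System.out.println(buffered(arrivals, all -> all.size() + " buffered"));  // 3 buffered
    }
}
```

The incremental style holds one value however many elements arrive, while the buffering style grows with the window. In exchange, the buffering style can compute things that need the full window, such as a median.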
5. Example code
Replace the code between the two "business logic" marker comments in the code template.
5.1 ReduceFunction
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
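// Key by the string itself, so identical strings end up in the same window
dss.keyBy(s -> s)
        // 5-second tumbling processing-time window
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        // Join the elements of each key with commas, two at a time
        .reduce((ReduceFunction<String>) (v1, v2) -> v1 + "," + v2)
        .print("output");
This example uses a time-based tumbling window.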
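To see what this job emits for one window, the grouping and reducing can be replayed in plain Java. This is only a sketch of the per-window result, with no Flink and no timing; `KeyedReduceSketch` and `reduceWindow` are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java replay (no Flink) of keyBy(s -> s) followed by a comma-joining reduce. */
final class KeyedReduceSketch {

    /** Groups the elements of one window by key (the element itself) and joins each group. */
    static Map<String, String> reduceWindow(List<String> windowElements) {
        Map<String, String> perKey = new LinkedHashMap<>();
        for (String s : windowElements) {
            // First element of a key is stored as-is; later ones are folded in pairwise
            perKey.merge(s, s, (v1, v2) -> v1 + "," + v2);
        }
        return perKey;
    }

    public static void main(String[] args) {
        // Lines typed within the same 5-second window
        System.out.println(reduceWindow(Arrays.asList("a", "b", "a"))); // {a=a,a, b=b}
    }
}
```

Because the key is the string itself, typing `a`, `b`, `a` within one window yields two results, one per distinct string, rather than a single `a,b,a`.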
5.2 AggregateFunction
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
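// Non-Keyed sliding window: 10 seconds long, starting every 5 seconds
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        // AggregateFunction<IN, ACC, OUT>: counts the elements in each window
        .aggregate(new AggregateFunction<String, Long, Long>() {
            // Create the accumulator
            @Override
            public Long createAccumulator() {return 0L;}
            // Add one element to the accumulator
            @Override
            public Long add(String in, Long acc) {return acc + 1L;}
            // Read the result from the accumulator
            @Override
            public Long getResult(Long acc) {return acc;}
            // Merge two accumulators
            @Override
            public Long merge(Long a1, Long a2) {return a1 + a2;}
        })
        .print("output");
This example uses a time-based sliding window.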
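The count above uses the same type for the accumulator and the output. An average shows why the three type parameters are separate. The sketch below runs without Flink, so it declares a hypothetical `Aggregator` interface that only mirrors the four method names of `AggregateFunction<IN, ACC, OUT>`; `AverageSketch`, `AVERAGE` and `run` are likewise made up for this illustration.

```java
import java.util.Arrays;

/** Plain-Java sketch (no Flink) of the createAccumulator / add / getResult / merge contract. */
final class AverageSketch {

    /** Hypothetical stand-in that mirrors the four methods of AggregateFunction<IN, ACC, OUT>. */
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Average of integers; the accumulator is the pair {sum, count}. */
    static final Aggregator<Integer, long[], Double> AVERAGE =
            new Aggregator<Integer, long[], Double>() {
                @Override
                public long[] createAccumulator() {return new long[] {0L, 0L};}

                @Override
                public long[] add(Integer value, long[] acc) {
                    return new long[] {acc[0] + value, acc[1] + 1};
                }

                @Override
                public Double getResult(long[] acc) {
                    // An empty window yields 0.0 instead of dividing by zero
                    return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
                }

                @Override
                public long[] merge(long[] a, long[] b) {
                    return new long[] {a[0] + b[0], a[1] + b[1]};
                }
            };

    /** Feeds every element of one window through the aggregator and returns the result. */
    static <IN, ACC, OUT> OUT run(Aggregator<IN, ACC, OUT> agg, Iterable<IN> window) {
        ACC acc = agg.createAccumulator();
        for (IN value : window) {
            acc = agg.add(value, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(run(AVERAGE, Arrays.asList(2, 4, 9))); // 5.0
    }
}
```

Here IN is `Integer`, ACC is a sum-and-count pair, and OUT is `Double`. The window state stays at two numbers regardless of how many elements arrive.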
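5.3 ProcessWindowFunction
The code in this section applies the window with windowAll, so it uses ProcessAllWindowFunction, the Non-Keyed counterpart of ProcessWindowFunction.
Source excerpt (abridged)
abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> {
    abstract void process(
            ProcessAllWindowFunction<IN, OUT, W>.Context var1, // context object
            Iterable<IN> var2, // all inputs in the window
            Collector<OUT> var3 // collector
    );
}
Code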
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
// Non-Keyed sliding window: 10 seconds long, starting every 5 seconds
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<String> in, Collector<String> out) {
                // Print the window range
                System.out.println(context.window().toString());
                // Emit the window's elements as one string (relies on the Iterable's toString())
                out.collect(String.valueOf(in));
            }
        })
        .print("output");