7 处理函数
7.1 概述
更底层的操作,直接对流进行操作,直接调用处理函数
7.2 基本处理函数ProcessFunction
- 分析
- ProcessFunction的来源
处理函数继承了AbstractRichFunction富函数抽象类,因此就具有访问状态(state)和其他运行时环境
例如AbstractRichFunction类中有getRuntimeContext()这个方法返回的是RuntimeContext类
- 内部分析
里面有个抽象方法processElements()处理元素,参数分别是:输入,上下文,用于输出的collector
以及onTimer()方法类似一个触发回调机制,主要用于定时器的回调,就是TimerService使用registerProcessingTimeTime()注册定时器[详见下面,有写],仅仅是这次,如果需要调用,那么调用就是onTimer()方法调用,但是仅仅限于keyby后的KeyedStream的定时器操作
上下文Context就有可以获取时间戳timestamp(),可以做测输出流output(),以及TimerService接口中有获取处理时间currentProcessingTime()的方法以及获取时间/水位线时间的currentWatermark()方法,以及注册定时器的registerProcessingTimeTime()的方法这边有问题
- 关系图
- 使用
stream.process(new MyProcessFunction())
- 分类
- 初步分析
stream.keyBy得到KeyedStream后调用.process函数,传进去的就是KeyedProcessFunction
ProcessFuntion只是一个,还有KeyedProcessFunction,两者没有继承关系,两者都继承了AbstractRichFunction富函数抽象类
stream.keyBy()
.window()
.process()
亦或者stream.keyBy得到KeyedStream后调用window方法,得到WindowedStream,再调用.process方法,此时传入的就是ProcessWindowFunction函数
- 分类
Flink 提供了 8 个不同的处理函数:
(1)ProcessFunction 最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入。
(2)KeyedProcessFunction 对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用 定时器,比如基于 KeyedStream。
(3)ProcessWindowFunction 开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作 为参数传入。
(4)ProcessAllWindowFunction 同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
(5)CoProcessFunction 188 合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参 数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
(6)ProcessJoinFunction 间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为 参数传入。 (7)BroadcastProcessFunction 广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这 里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广 播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后 续章节详细介绍。
(8)KeyedBroadcastProcessFunction 按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时 作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream 与广播流(BroadcastStream)做连接之后的产物。
- 代码
public class ProcessFunctionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream.process(new ProcessFunction<Event, String>() {
@Override
//onTimer()方法不一定实现了,参数输入,上下文,输出
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
if(value.user.equals("Mary")){
out.collect(value.user+"clicks"+value.url);
}else if(value.user.equals("Bob")){
//可以输出多条
out.collect(value.user);
out.collect(value.user);
}
out.collect(value.toString());
//通过上下文获取当前的timestamp
System.out.println("timestamp"+ctx.timestamp());
System.out.println("watermark"+ctx.timerService().currentWatermark());
//使用富函数
System.out.println(getRuntimeContext().getIndexOfThisSubtask());
//getRuntimeContext().getState()获取状态
}
}).print();
env.execute();
}
}
输出
Bob
Bob
Event{user='Bob', url='./prod?id=100', timestamp=2022-11-23 21:56:12.444}
timestamp1669211772444
watermark-9223372036854775808
0
Event{user='Alice', url='./home', timestamp=2022-11-23 21:56:13.46}
timestamp1669211773460
watermark1669211772443
0
Maryclicks./cart
Event{user='Mary', url='./cart', timestamp=2022-11-23 21:56:14.474}
timestamp1669211774474
watermark1669211773459
0
Event{user='Alice', url='./fav', timestamp=2022-11-23 21:56:15.487}
timestamp1669211775487
watermark1669211774473
0
7.3 按键分区处理函数(KeyedProcessFunction)
7.3.1 概述
主要有以下几个方法
7.3.2 使用
stream.keyBy(t->t.f0)
.process(new MyKeyedProcessFunction())
- 处理时间定时器
- 代码
public class ProcessingTimeTimerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.addSource(new ClickSource());
stream.keyBy(data -> data.user)
//参数是<K, I, O>
.process(new KeyedProcessFunction<String, Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) {
Long currTs = ctx.timerService().currentProcessingTime();//获取系统时间
out.collect(ctx.getCurrentKey()+ "数据到达,到达时间:"+new Timestamp(currTs));
//注册一个10秒后的定时器,传入的是毫秒数
ctx.timerService().registerProcessingTimeTimer(currTs+10*1000L);
}
@Override
//参数:定时器时间,OnTimerContext继承Context,
public void onTimer(long timestamp,OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey()+"定时器触发,触发时间:"+new Timestamp(timestamp));
}
}).print();
env.execute();
}
}
- 结果
- 事件时间定时器
- 代码
public class EventTimeTimerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
stream.keyBy(data -> data.user)
//参数是<K, I, O>
.process(new KeyedProcessFunction<String, Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
Long currTs = ctx.timestamp();//获取事件时间,直接掉ctx的timestamp(),不掉service中的了
out.collect(ctx.getCurrentKey()+ "数据到达,时间戳:"+new Timestamp(currTs)+"watermark:"+ctx.timerService().currentWatermark());
//注册一个10秒后的定时器,传入的是毫秒数
ctx.timerService().registerEventTimeTimer(currTs+10*1000L);
}
@Override
//参数:定时器时间,OnTimerContext继承Context,
public void onTimer(long timestamp,OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey()+"定时器触发,触发时间:"+new Timestamp(timestamp)+ctx.timerService().currentWatermark());
}
}).print();
env.execute();
}
}
- 结果
结果分析
-
时间戳带来watermark数据的改变,当定时器2021-08-20 16:50:23.427触发的时候定时器才到22秒512,因此定时器在16:50:23,524之后,触发在水位线之后
-
代码
public class EventTimeTimerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
stream.keyBy(data -> data.user)
//参数是<K, I, O>
.process(new KeyedProcessFunction<String, Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
Long currTs = ctx.timestamp();//获取事件时间,直接掉ctx的timestamp(),不掉service中的了
out.collect(ctx.getCurrentKey()+ "数据到达,时间戳:"+new Timestamp(currTs)+"watermark:"+ctx.timerService().currentWatermark());
//注册一个10秒后的定时器,传入的是毫秒数
ctx.timerService().registerEventTimeTimer(currTs+10*1000L);
}
@Override
//参数:定时器时间,OnTimerContext继承Context,
public void onTimer(long timestamp,OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey()+"定时器触发,触发时间:"+new Timestamp(timestamp)+ctx.timerService().currentWatermark());
}
}).print();
env.execute();
}
//自定义测试数据源
public static class CustomSource implements SourceFunction<Event>{
@Override
public void run(SourceContext<Event> ctx) throws Exception {
//直接发出测试数据
ctx.collect(new Event("Mary","./homne",1000L));
Thread.sleep(5000L);
ctx.collect(new Event( "Alice","./homne",11000L));
Thread.sleep(5000L);
}
@Override
public void cancel() {
}
}
}
- 结果
Mary数据到达,时间戳:1970-01-01 08:00:01.0watermark:-9223372036854775808
Alice数据到达,时间戳:1970-01-01 08:00:11.0watermark:999
Mary定时器触发,触发时间:1970-01-01 08:00:11.09223372036854775807
Alice定时器触发,触发时间:1970-01-01 08:00:21.09223372036854775807
结果分析
如果输入数据已经结束,数据集处理完毕,那么事件时间语义会把watermark推到最大,因此定时器会被全部触发
7.4 窗口处理函数(ProcessWindowFunction )
- 分析
继承的富函数类
里面是process方法,参数分别是key,上下文,收集数据的集合,输出
Context中可以获取窗口,当前处理时间,当前的watermark,以及测输出流,发现跟之前相比没有TimeService,就不能注册定时器,但是可以通过定时器获取
7.5 应用案例-TopN
7.5.1 使用ProcessAllWindowFunction
- 场景
例如,需要统计最近10秒内最热门的两个url链接,并且每5秒
- 思路
- 使用全窗口函数ProcessAllWindowFunction开窗处理,使用HashMap来保存每个url的访问次数(通过遍历)
- 然后转成ArrayList,然后进行排序,取前两名输出即可
- 代码
- 代码
public class TopNExample_ProcessAllWindowFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//读取数据
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
//乱序种延迟0,相当于-1毫秒而已
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
//直接开窗,收集数据排序
stream.map(data->data.url)//得到String类型的Stream
.windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))//直接开窗
.aggregate(new UrlHashMapCountAgg(),new UrlAllWindowResult())
.print();
env.execute();
}
//实现自定义的增量聚合函数
public static class UrlHashMapCountAgg implements AggregateFunction<String, HashMap<String,Long>, ArrayList<Tuple2<String,Long>>> {
@Override
public HashMap<String, Long> createAccumulator() {
return new HashMap<>();
}
@Override
public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
if(accumulator.containsKey(value)){
Long count = accumulator.get(value);
accumulator.put(value,count+1);
}else {
accumulator.put(value,1L);
}
return accumulator;
}
//就HashMap转成ArrayList<Tuple2<String, Long>>的操作
@Override
public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
ArrayList<Tuple2<String, Long>> result = new ArrayList<>();
for(String key:accumulator.keySet()){
result.add(Tuple2.of(key,accumulator.get(key)));
}
//排序
result.sort(new Comparator<Tuple2<String, Long>>() {
@Override
//降序,后减前
public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
return o2.f1.intValue()-o1.f1.intValue();
}
});
return result;
}
@Override
public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
return null;
}
}
//实现自定义全窗口函数,包装信息输出结果
public static class UrlAllWindowResult extends ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow> {
@Override
public void process(Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
//先拿出来
ArrayList<Tuple2<String, Long>> list = elements.iterator().next();
StringBuilder result = new StringBuilder();
result.append("---------------\n");
//获取窗口信息
result.append("窗口结束时间:"+new Timestamp(context.window().getEnd())+"\n");
//取List排过序后的前两个,包装信息输出
for(int i = 0;i<2;i++){
Tuple2<String, Long> currTuple = list.get(i);
String info = "No."+(i+1)+" "
+"url:"+currTuple.f0+" "
+"访问量"+currTuple.f1+"\n ";
result.append(info);
}
result.append("--------------------\n");
out.collect(result.toString());
}
}
}
- 结果
窗口结束时间:2022-11-25 21:58:35.0
No.1 url:./fav 访问量1
No.2 url:./home 访问量1
--------------------
---------------
窗口结束时间:2022-11-25 21:58:40.0
No.1 url:./home 访问量3
No.2 url:./prod?id=100 访问量3
--------------------
---------------
窗口结束时间:2022-11-25 21:58:45.0
No.1 url:./prod?id=100 访问量4
No.2 url:./cart 访问量2
--------------------
---------------
窗口结束时间:2022-11-25 21:58:50.0
No.1 url:./prod?id=100 访问量4
No.2 url:./fav 访问量3
--------------------
- 评价
用这个方法思路易懂,但是使用了windowAll的全窗口函数,stream直接开窗,所有数据收集到窗口中,导致无分区也就是并行度会变成1大数据场景下内存估计会炸掉,OOM警告
7.5.2 使用 KeyedProcessFunction
- 场景
例如,需要统计最近10秒内最热门的两个url链接,并且每5秒
- 思路
-
触发
-
参照窗口的流式处理原理,将数据汇聚一段时间后输出,就可以使用定时器
-
窗口结束时间+1豪秒使得watermark触发,即数据到齐
-
-
收集
- 定义一个列表把所有数据保存下来
- 使用状态,根据之前keyby按键分组的状态
-
输出
- 排序
- 输出
- 代码
跟上面差不多,多了状态设置,可以理解urlViewCountListState这个就是用来存有状态的数据的
- 代码
public class TopNExample {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//读取数据
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
//乱序种延迟0,相当于-1毫秒而已
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
//1.按照url分组,统计窗口内每个url的访问量
SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream
.keyBy(data -> data.url)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());
urlCountStream.print("url count");
//2.对一同一窗口统计出的访问量,进行手机和排序(以聚合过的结果按照窗口间隔不间断流式输出)
urlCountStream.keyBy(data->data.windowEnd)
.process(new TopNProcessResult(2))
.print();
env.execute();
}
//实现自定义的KeyProcessFunction
public static class TopNProcessResult extends KeyedProcessFunction<Long,UrlViewCount,String> {
//定义一个属性n
private Integer n;
//1.定义列表状态
private ListState<UrlViewCount> urlViewCountListState;
public TopNProcessResult(Integer n) {
this.n = n;
}
//2.管理状态,在环境中获取状态,使用生命周期方法获取
@Override
public void open(Configuration parameters) throws Exception {
urlViewCountListState= getRuntimeContext().getListState(//传入描述器
//两个参数:一个名字,一个类型
new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class)));
}
@Override
public void processElement(UrlViewCount value,Context ctx, Collector<String> out) throws Exception {
//3.将数据保存到状态中
urlViewCountListState.add(value);
//4.注册windowEnd+1ms的定时器
ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey()+1);
}
//5.用来触发定时器
//将状态拿出来,保存成ArrayList
//输出包装
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
for(UrlViewCount urlViewCount:urlViewCountListState.get())//得到OUT是一个iterable类型
urlViewCountArrayList.add(urlViewCount);
//排序
urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
@Override
public int compare(UrlViewCount o1, UrlViewCount o2) {
return o2.count.intValue()-o1.count.intValue();
}
});
//6.包装信息打印输出
StringBuilder result = new StringBuilder();
result.append("---------------\n");
//获取窗口信息
result.append("窗口结束时间:"+new Timestamp(ctx.getCurrentKey())+"\n");
//包装信息输出
for(int i = 0;i<2;i++){
UrlViewCount currTuple = urlViewCountArrayList.get(i);
String info = "No."+(i+1)+" "
+"url:"+currTuple.url+" "
+"访问量"+currTuple.count+"\n ";
result.append(info);
}
result.append("--------------------\n");
out.collect(result.toString());
}
}
}
- 结果
url count> UrlViewCount{url='./home', count=1, windowStart=2022-11-25 22:42:30.0, windowEnd=2022-11-25 22:42:40.0}
url count> UrlViewCount{url='./cart', count=2, windowStart=2022-11-25 22:42:30.0, windowEnd=2022-11-25 22:42:40.0}
---------------
窗口结束时间:2022-11-25 22:42:40.0
No.1 url:./cart 访问量2
No.2 url:./home 访问量1
--------------------
url count> UrlViewCount{url='./home', count=2, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
url count> UrlViewCount{url='./prod?id=100', count=2, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
url count> UrlViewCount{url='./cart', count=4, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
---------------
窗口结束时间:2022-11-25 22:42:45.0
No.1 url:./cart 访问量4
No.2 url:./home 访问量2
--------------------
- 评价
可以做并行计算