版本说明
本文中以Flink 1.16.1 版本讲解说明
Note:Flink1.16.1版本相较于之前版本增强的within函数,
支持模式序列中相邻事件间的超时定义,以前版本只支持模式序列中第一个事件到最后一个事件之间的最大时间间隔。
快速开始
基于Kafka connecter 流处理job还需要引入:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
基于Flink DataStream作业添加CEP Maven依赖
<properties>
<flink.version>1.16.1</flink.version>
</properties>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
使用Pattern API编写CEP例子
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where( //"start" 为模式的名称,模式名称规则查看模式名称约束条件
new SimpleCondition<Event>() { // 定义了一个简单条件
@Override
public boolean filter(Event event) {
return event.getId() == 42; //开始事件为id=42的事件,配到一个(未设置times时默认为匹配到1次,进入下一个模式)
}
}
).next("middle").subtype(SubEvent.class).where( // subtype方法限定middle对应事件子类为SubEvent
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0; // 事件 volume属性大于等于10.0
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end"); // 最后一个事件
}
}
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
// 对于匹配成功的事件存储在 Map<String, List<Event>> pattern的Map中,key为模式的名称,这里key值可能为start,middle,end,因为要支持循环模式的量词,所以模式对应的事件定义为List
// 对应匹配超时(使用了within条件)可以使用PatternTimeoutFunction来处理,下文中再讲解。
out.collect(createAlertFrom(pattern));
}
});
模式API
模式API可以实现在连续的事件中,匹配到符合指定特征的子序列。每个复杂的模式序列由多个简单的模式组合而成。
每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。
模式的名字不能包含字符":".
Flink CEP API中对模式划分为三种:单个模式,组合模式、模式组,下面对三种模式详细讲解
单个模式
单个模式,又分为单例模式、循环模式,单例模式只接受一个事件,循环模式可以接受多个事件。 在模式匹配表达式中,模式"a b+ c? d"(或者"a",后面跟着一个或者多个"b",再往后可选择的跟着一个"c",最后跟着一个"d"), a,c?,和 d都是单例模式,b+是一个循环模式。默认情况下,模式都是单例的,你可以通过使用量词(Quantifier)把它们转换成循环模式。 每个模式可以有一个或者多个条件来决定它接受哪些事件。
量词
在FlinkCEP中,你可以通过这些方法指定循环模式:pattern.oneOrMore(),指定期望一个给定事件出现一次或者多次的模式(例如前面提到的b+模式); pattern.times(#ofTimes),指定期望一个给定事件出现特定次数的模式,例如出现4次a; pattern.times(#fromTimes, #toTimes),指定期望一个给定事件出现次数在一个最小值和最大值中间的模式,比如出现2-4次a。
你可以使用pattern.greedy()方法让循环模式变成贪心的,但现在还不能让模式组贪心。 你可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式。
例如:对一个名称为start的模式,可以指定如下量词;
// 期望出现4次
start.times(4);
// 期望出现0或者4次
start.times(4).optional();
// 期望出现2、3或者4次
start.times(2, 4);
// 期望出现2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).greedy();
// 期望出现0、2、3或者4次
start.times(2, 4).optional();
// 期望出现0、2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).optional().greedy();
// 期望出现1到多次
start.oneOrMore();
// 期望出现1到多次,并且尽可能的重复次数多
start.oneOrMore().greedy();
// 期望出现0到多次
start.oneOrMore().optional();
// 期望出现0到多次,并且尽可能的重复次数多
start.oneOrMore().optional().greedy();
// 期望出现2到多次
start.timesOrMore(2);
// 期望出现2到多次,并且尽可能的重复次数多
start.timesOrMore(2).greedy();
// 期望出现0、2或多次
start.timesOrMore(2).optional();
// 期望出现0、2或多次,并且尽可能的重复次数多
start.timesOrMore(2).optional().greedy();
特别需要注意的是:
1. optional不能应用于 not 的pattern
2. optional不能应用于 GroupPattern
3. 前一个Pattern如果设置了greedy(),后一个Pattern不能使用optional
条件
条件即Condition,API已实现了一些基础的Condition:
条件用于判定事件是否被当前这个模式接收,可通过pattern.where()、pattern.or()或者pattern.until()方法指定条件,条件可使用已实现的Condition也可以集成IterativeCondition或者SimpleCondition自定义条件。
迭代条件(IterativeCondition): 这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。
下面是一个迭代条件的代码,它接受"middle"模式下一个事件的名称开头是"foo", 并且前面已经匹配到的事件加上这个事件的价格小于5.0。 迭代条件非常强大,尤其是跟循环模式结合使用时。
middle.oneOrMore()
.subtype(SubEvent.class)
.where(new IterativeCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
if (!value.getName().startsWith("foo")) {
return false;
}
double sum = value.getPrice();
// ctx.getEventsForPattern还可以获取前面的pattern,比如 ctx.getEventsForPattern("start")
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});
调用ctx.getEventsForPattern(…)可以获得所有前面已经接受作为可能匹配的事件。 调用这个操作的代价可能很小也可能很大,所以在实现你的条件时,尽量少使用它。
简单条件:和它的名称一样,实现非常简单,只是判断单个事件本身的属性是否满足某些条件,类似list.stream.filter() 的功能。
组合条件:和SQL中的 and 、or一样, and条件使用多个where()方法连续调用,or,调用or()方法即可,示例如下
pattern.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // 一些判断条件
}
}).where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // AND的另一些判断条件
}
}).or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // OR的另一些判断条件
}
});
停止条件: 如果使用循环模式(oneOrMore()和oneOrMore().optional()),你可以指定一个停止条件,例如,接受事件的值大于5直到值的和小于50。方法名:until(condition);
条件API使用实例汇总:
// where()、 or()使用
pattern.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ...; // 一些判断条件
}
}).or(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ...; // 替代条件
}
});
// until 使用方法
pattern.oneOrMore().until(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ...; // 替代条件
}
});
// subtype(clazz)
pattern.subtype(SubEvent.class);
//默认内部为松散连续, 推荐使用 until()或者 within()来清理状态。
pattern.oneOrMore();
//timesOrMore(n),匹配结果: >=n 次
pattern.timesOrMore(2);
//timesOrMore(n),匹配结果: =n 次
pattern.times(2);
//optional(), 匹配结果 >=0次
pattern.oneOrMore().optional();
// 尽可能多低匹配,n?次,建议结合within来清理状态,和within结合后,匹配结果:>=1;
pattern.oneOrMore().greedy();
组合模式
上文中已讲解了单个模式,下面对组合模式进行说明。
定义多个模式,按连续性语义设定,组合完整的模式序列。
先定义一个初始模式作为开头:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
接下来,你可以增加更多的模式到模式序列中并指定它们之间所需的连续条件。FlinkCEP支持事件之间如下形式的连续策略:
严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
松散连续: 忽略匹配的事件之间的不匹配的事件。
不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。
可以使用下面的方法来指定模式之间的连续策略:
next(),指定严格连续,
followedBy(),指定松散连续,
followedByAny(),指定不确定的松散连续。
或者
notNext(),如果不想后面直接连着一个特定事件
notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。
- 如果模式序列没有定义时间约束,则不能以 notFollowedBy() 结尾。
- 一个 NOT 模式前面不能是可选的模式。
使用方法入下所示
// 严格连续
Pattern<Event, ?> strict = start.next("middle").where(...);
// 松散连续
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// 不确定的松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// 严格连续的NOT模式
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// 松散连续的NOT模式
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连接情况下,有着同样起始的多个匹配会被输出。 举例来说,模式"a b",给定事件序列"a",“c”,“b1”,“b2”,会产生如下的结果:
"a"和"b"之间严格连续: {} (没有匹配),"a"之后的"c"导致"a"被丢弃。
“a"和"b"之间松散连续: {a b1},松散连续会"跳过不匹配的事件直到匹配上的事件”。
"a"和"b"之间不确定的松散连续: {a b1}, {a b2},这是最常见的情况。
也可以为模式定义一个有效时间约束。 例如,你可以通过pattern.within()方法指定一个模式应该在10秒内发生。 这种时间模式支持处理时间和事件时间.
一个模式序列只能有一个时间限制。如果限制了多个时间在不同的单个模式上,会使用最小的那个时间限制。
注意定义过时间约束的模式允许以 notFollowedBy() 结尾。 例如,可以定义如下的模式:
Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).notFollowedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
}).within(Time.seconds(10));
循环模式中的连续性
你可以在循环模式中使用和前面章节讲过的同样的连续性。 连续性会被运用在被接受进入模式的事件之间。 用这个例子来说明上面所说的连续性,一个模式序列"a b+ c"(“a"后面跟着一个或者多个(不确定连续的)“b”,然后跟着一个"c”) 输入为"a",“b1”,“d1”,“b2”,“d2”,“b3”,“c”,输出结果如下:
严格连续: {a b1 c}, {a b2 c}, {a b3 c} - 没有相邻的 “b” 。
松散连续: {a b1 c},{a b1 b2 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c} - "d"都被忽略了。
不确定松散连续: {a b1 c},{a b1 b2 c},{a b1 b3 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c} - 注意{a b1 b3 c},这是因为"b"之间是不确定松散连续产生的。
对于循环模式(例如oneOrMore()和times())),默认是松散连续。如果想使用严格连续,你需要使用consecutive()方法明确指定, 如果想使用不确定松散连续,你可以使用allowCombinations()方法。
// 循环模式,模式内部指定为间隔连续
pattern.oneOrMore().consecutive()
pattern.times().consecutive()
// 循环模式,模式内部指定为不确定松散连续
pattern.oneOrMore().allowCombinations()
pattern.times().allowCombinations()
模式组
也可以定义一个模式序列作为begin,followedBy,followedByAny和next的条件。这个模式序列在逻辑上会被当作匹配的条件, 并且返回一个GroupPattern,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。
模式组:GroupPattern,
模式组使用示例:
GroupPattern<Event, ?> start = GroupPattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// 严格连续
GroupPattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// 松散连续
GroupPattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// 不确定松散连续
GroupPattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
需要注意的是模式组不支持一下操作:
GroupPattern 不支持 where()
GroupPattern 不支持 or()
GroupPattern 不支持 subtype()
匹配后跳过策略
对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:
NO_SKIP: 每个成功的匹配都会被输出。
SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。
注意当使用SKIP_TO_FIRST和SKIP_TO_LAST策略时,需要指定一个合法的PatternName.
例如,给定一个模式b+ c和一个数据流b1 b2 b3 c,不同跳过策略之间的不同如下:
想指定要使用的跳过策略,只需要调用下面的方法创建AfterMatchSkipStrategy:
AfterMatchSkipStrategy skipStrategy = ...;
Pattern.begin("patternName", skipStrategy);
使用SKIP_TO_FIRST/LAST时,有两个选项可以用来处理没有事件可以映射到对应模式名上的情况。
默认情况下会使用NO_SKIP策略,另外一个选项是抛出异常。 可以使用如下的选项:
AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss();
检测模式
在指定Pattern之后,还需要将其应用于数据流之上,生成一个PatternStream,用于处理匹配结果数据自定义输出(sink流的数据结构,数据内容),对于事件时间戳(或处理时间,取决于是采用时间时间还是处理时间进行处理)一样时还可以自定义一个比较器对事件进行排序。
代码实现模式如下:
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // 可选的
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
输入流根据你的使用场景可以是keyed或者non-keyed。
处理模式序列匹配结果
在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。官网推荐使用PatternProcessFunction。
PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()和times())时, 对一个模式会有不止一个事件被接受。
patternStream .process(new PatternProcessFunction<ObjectNode, Object>() {
@Override
public void processMatch(Map<String, List<ObjectNode>> map, Context context, Collector<Object> collector) throws Exception {
// 自定义处理
// context.output(outputTag, ***) 可以定义侧输出
// 输出结果
collector.out(new Alert(map));
}
})
CEP中提供process和select 相关的PatternFunction,其中select主要是兼容早期版本保留的Function
处理超时的部分匹配
当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
...
}
@Override
public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
IN startEvent = match.get("start").get(0);
ctx.output(outputTag, T(startEvent));
}
}
processTimedOutMatch不能访问主输出。 但可以通过Context对象把结果输出到侧输出。
除了使用PatternProcessFunction,还可以使用platSelect函数实现主流和超时事件侧输出
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// 定义侧流输出
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
outputTag, // 这里设定timeout的事件输出的侧流
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() { //超时部分匹配的事件输出到侧流
public void timeout(
Map<String, List<Event>> pattern,
long timeoutTimestamp,
Collector<TimeoutEvent> out) throws Exception {
out.collect(new TimeoutEvent());
}
},
new PatternFlatSelectFunction<Event, ComplexEvent>() { //匹配事件结果处理输出到主流
public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception {
out.collect(new ComplexEvent());
}
}
);
DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
Flink CEP目前的局限
Flink CEP的实现目前局限性比较大,主要是以下几点:
1.只支持单个规则(转换为一个Pattern序列)
2.不支持规则的动态更新,规则的动态更新需要自行扩展,网上有小伙伴提供了扩展思路,可以参考:https://mp.weixin.qq.com/s/mh–wQvAWQq2tDPKq0-m8Q;后续我也会按这个思路实现一个简易版本提供修改源码的路径。
3.阿里云实实时算实现了动态多规则更新,github项目:ververica-cep-demo,不过这个版本需要在阿里云Flink实时计算托管环境才能使用,属于商用性质的。
本文主要参考Flink官网1.16.0中文文档,https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/libs/cep/,部分功能是基于1.16.1新增特性进行说明的。
若想获取最新消息可以关注Flink 中文社区,GitHub flink
传送门:
Flink CEP概念熟悉:https://mp.weixin.qq.com/s/2vxjhh-h4JvHq6HrSY8VAA
Flink CEP单个规则动态更新规则:https://mp.weixin.qq.com/s/mh–wQvAWQq2tDPKq0-m8Q
Flink CEP官网文档:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/cep/
Flink CEP阈值类事件检测阈值动态更新:待续写