文章目录
- 一 Flink CEP简介
- 1 什么是复杂事件处理CEP
- 2 Flink CEP
- (1)导入依赖
- (2)代码编写
- (3)优化模板
- 3 实现CEP底层 -- 有限状态机
- 4 使用CEP处理超时事件
一 Flink CEP简介
1 什么是复杂事件处理CEP
一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。
特征有如下几点:
- 目标:从有序的简单事件流中发现一些高阶特征。
- 输入:一个或多个由简单事件构成的事件流。
- 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件。
- 输出:满足规则的复杂事件。
如下图中,将输入流中的元素,按照连续两个事件,且第一个元素为正方形,第二个元素为三角形进行过滤:
CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。
CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。
看起来很简单,但是它有很多不同的功能:
- 输入的流数据,尽快产生结果
- 在2个event流上,基于时间进行聚合类的计算
- 提供实时/准实时的警告和通知
- 在多样的数据源中产生关联并分析模式
- 高吞吐、低延迟的处理
市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library。
2 Flink CEP
Flink为CEP提供了专门的Flink CEP library,它包含如下组件:
- Event Stream
- pattern定义
- pattern检测
- 生成Alert
首先,开发人员要在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警。
(1)导入依赖
为了使用Flink CEP,需要导入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
(2)代码编写
使用API完成检测用户连续三次登录失败的需求
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<day06.Example3.Event> stream = env
.fromElements(
new day06.Example3.Event("user-1", "log-fail", 1000L),
new day06.Example3.Event("user-1", "log-fail", 2000L),
new day06.Example3.Event("user-2", "log-succ", 3000L),
new day06.Example3.Event("user-1", "log-fail", 4000L),
new day06.Example3.Event("user-1", "log-fail", 5000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<day06.Example3.Event>() {
@Override
public long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {
return element.timestamp;
}
}));
// 定义模板(org.apache.flink.cep.pattern.Pattern)
Pattern<day06.Example3.Event, day06.Example3.Event> pattern = Pattern
.<day06.Example3.Event>begin("first") // 给第一个匹配事件起名
.where(new SimpleCondition<day06.Example3.Event>() {
@Override
public boolean filter(day06.Example3.Event value) throws Exception {
return value.eventType.equals("log-fail");
}
})
.next("second") //next表示严格紧邻
.where(new SimpleCondition<day06.Example3.Event>() {
@Override
public boolean filter(day06.Example3.Event value) throws Exception {
return value.eventType.equals("log-fail");
}
})
.next("third")
.where(new SimpleCondition<day06.Example3.Event>() {
@Override
public boolean filter(day06.Example3.Event value) throws Exception {
return value.eventType.equals("log-fail");
}
});
// 在流上使用模板,参数为输入的流和要匹配的模板
PatternStream<day06.Example3.Event> patternStream = CEP.pattern(stream.keyBy(r -> r.orderId), pattern);
// 使用select方法将匹配到的事件取出
patternStream
.select(new PatternSelectFunction<day06.Example3.Event, String>() {
@Override
public String select(Map<String, List<day06.Example3.Event>> map) throws Exception {
// map的key是给时间起的名字,v为名字对应的事件列表
// 上例中事件只有一个,列表中只有一个元素
day06.Example3.Event first = map.get("first").get(0);
day06.Example3.Event second = map.get("second").get(0);
day06.Example3.Event third = map.get("third").get(0);
String result = "用户:" + first.orderId + "分别在以下三个时间:" + first.timestamp
+ "、" + second.timestamp + "、" + third.timestamp + "登录失败了";
return result;
}
})
.print();
env.execute();
}
(3)优化模板
Pattern<Example3.Event, Example3.Event> pattern = Pattern
.<day06.Example3.Event>begin("log-fail") // 给第一个匹配事件起名
.where(new SimpleCondition<Example3.Event>() {
@Override
public boolean filter(day06.Example3.Event value) throws Exception {
return value.eventType.equals("log-fail");
}
})
.times(3);
patternStream
.select(new PatternSelectFunction<Example3.Event, String>() {
@Override
public String select(Map<String, List<Example3.Event>> map) throws Exception {
// map的key是给时间起的名字,v为名字对应的事件列表
// 上例中事件只有一个,列表中只有一个元素
day06.Example3.Event first = map.get("log-fail").get(0);
day06.Example3.Event second = map.get("log-fail").get(1);
day06.Example3.Event third = map.get("log-fail").get(2);
String result = "用户:" + first.orderId + "分别在以下三个时间:" + first.timestamp
+ "、" + second.timestamp + "、" + third.timestamp + "登录失败了";
return result;
}
})
.print();
3 实现CEP底层 – 有限状态机
使用状态机实现检测连续三次登录失败,实现原理如下图:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Example3.Event> stream = env
.fromElements(
new day06.Example3.Event("user-1", "log-fail", 1000L),
new day06.Example3.Event("user-1", "log-fail", 2000L),
new day06.Example3.Event("user-2", "log-succ", 3000L),
new day06.Example3.Event("user-1", "log-fail", 4000L),
new day06.Example3.Event("user-1", "log-fail", 5000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Example3.Event>() {
@Override
public long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {
return element.timestamp;
}
}));
stream
.keyBy(r -> r.orderId)
.process(new KeyedProcessFunction<String, Example3.Event, String>() {
private HashMap<Tuple2<String,String>,String> stateMachine = new HashMap<>();
private ValueState<String> currentState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 如果接收到初始登录状态为成功,返回SUCCESS
// 状态转移矩阵
// key:(状态,接收到事件的类型)
// value:(将要跳转到的状态)
stateMachine.put(Tuple2.of("INITIAL","log-succ"),"SUCCESS");
stateMachine.put(Tuple2.of("INITIAL","log-fail"),"s1");
stateMachine.put(Tuple2.of("s1","log-succ"),"SUCCESS");
stateMachine.put(Tuple2.of("s1","log-fail"),"s2");
stateMachine.put(Tuple2.of("s2","log-succ"),"SUCCESS");
stateMachine.put(Tuple2.of("s2","log-fail"),"FAIL");
currentState = getRuntimeContext().getState(
new ValueStateDescriptor<String>("current-state", Types.STRING)
);
}
@Override
public void processElement(Example3.Event value, Context ctx, Collector<String> out) throws Exception {
if(currentState.value() == null){
currentState.update("INITIAL");
}
// 记录将要跳转到的状态
// 如initial状态,到来事件值为log-succ,那么nextState为SUCCESS
// 取状态机的value部分
String nextState = stateMachine.get(Tuple2.of(currentState.value(), value.eventType));
if(nextState.equals("FAIL")){
out.collect("用户" + value.orderId + "连续三次登录失败了");
currentState.update("s2");
} else if(nextState.equals("SUCCESS")){
currentState.clear();
} else {
currentState.update(nextState);
}
}
})
.print();
env.execute();
}
4 使用CEP处理超时事件
现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。
先将事件流按照订单号 orderId 分流,然后定义这样的一个事件模式:在 15 分钟内,事件“create”与“pay”严格紧邻:
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Example3.Event> stream = env
.fromElements(
new day06.Example3.Event("order-1", "create", 1000L),
new day06.Example3.Event("order-2", "create", 2000L),
new day06.Example3.Event("order-1", "pay", 19000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Example3.Event>() {
@Override
public long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {
return element.timestamp;
}
}));
Pattern<Example3.Event, Example3.Event> pattren = Pattern
.<Example3.Event>begin("create")
.where(new SimpleCondition<Example3.Event>() {
@Override
public boolean filter(Example3.Event value) throws Exception {
return value.eventType.equals("create");
}
})
.next("pay")
.where(new SimpleCondition<Example3.Event>() {
@Override
public boolean filter(Example3.Event value) throws Exception {
return value.eventType.equals("pay");
}
})
// 要求两事件的间隔时间不能超过15分钟
.within(Time.minutes(15));
PatternStream<Example3.Event> patternStream = CEP.pattern(stream.keyBy(r -> r.orderId), pattren);
SingleOutputStreamOperator<String> result = patternStream
// 第一个参数是侧输出标签
// 第二个参数用于将超时事件发送到侧输出流
// 第三个参数用于处理正常事件
.flatSelect(
new OutputTag<String>("timeout") {
},
new PatternFlatTimeoutFunction<Example3.Event, String>() {
@Override
public void timeout(Map<String, List<Example3.Event>> map, long l, Collector<String> collector) throws Exception {
Example3.Event create = map.get("create").get(0);
collector.collect("订单:" + create.orderId + "超时了");
}
},
new PatternFlatSelectFunction<Example3.Event, String>() {
@Override
public void flatSelect(Map<String, List<Example3.Event>> map, Collector<String> collector) throws Exception {
Example3.Event pay = map.get("pay").get(0);
collector.collect("订单:" + pay.orderId + "已支付");
}
}
);
result.print();
result.getSideOutput(new OutputTag<String>("timeout"){}).print();
env.execute();
}