系列文章目录
Flink第一章:环境搭建
Flink第二章:基本操作
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口
Flink第五章:处理函数
Flink第六章:多流操作
Flink第七章:状态编程
Flink第八章:FlinkSQL
Flink第九章:Flink CEP
文章目录
- 系列文章目录
- 前言
- 一、简单案例
- 1.LoginFailDetect.scala
- 2.LoginFailDetectpro.scala
- 3.OrderTimeoutDetect.scala
- 4.状态机实现
- 总结
前言
这次是Flink的最后一次内容,终于还是在放假前啃完了.
FlinkCEP是在Flink上层实现的复杂事件处理库。 它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。
这是官方的介绍,看看就行了.
先引入需要的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
一、简单案例
1.LoginFailDetect.scala
检测连续三次登录失败的用户
package com.atguigu.chapter08
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import java.util
/** Detects users whose login attempts fail three times in a row.
  *
  * Uses a three-stage CEP pattern ("firstFail" -> "secondFail" -> "thirdFail") joined with
  * `next`, i.e. strict contiguity: the three "fail" events must directly follow one another
  * within a user's keyed stream.
  */
object LoginFailDetect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Source: a fixed sample of login events with ascending timestamps
    val events: DataStream[LoginEvent] = env
      .fromElements(
        LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
        LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
        LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
        LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
        LoginEvent("user_2", "192.168.1.29", "success", 6000L),
        LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
        LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
      )
      .assignAscendingTimestamps(event => event.timestamp)

    // 2. Pattern: three "fail" events back to back; every stage shares one predicate
    val isFailure: LoginEvent => Boolean = event => event.eventType == "fail"
    val threeFailures: Pattern[LoginEvent, LoginEvent] =
      Pattern
        .begin[LoginEvent]("firstFail").where(isFailure)
        .next("secondFail").where(isFailure)
        .next("thirdFail").where(isFailure)

    // 3. Apply the pattern to the stream keyed by user
    val byUser = events.keyBy(event => event.userId)
    val matches: PatternStream[LoginEvent] = CEP.pattern(byUser, threeFailures)

    // 4. Render each complete match as an alert line
    val alerts: DataStream[String] = matches.select(new PatternSelectFunction[LoginEvent, String] {
      override def select(matched: util.Map[String, util.List[LoginEvent]]): String = {
        // Each stage is a single-event stage, so its list is read at index 0
        def stage(name: String): LoginEvent = matched.get(name).get(0)
        val (first, second, third) = (stage("firstFail"), stage("secondFail"), stage("thirdFail"))
        s"${first.userId} 连续三次登录失败! 登录时间:${first.timestamp},${second.timestamp},${third.timestamp}"
      }
    })

    alerts.print()
    env.execute()
  }
}
/** A single login attempt.
  *
  * @param userId    id of the user who attempted to log in (the examples key streams by it)
  * @param ipAddr    source IP address of the attempt
  * @param eventType outcome of the attempt; the examples use "fail" and "success"
  * @param timestamp event timestamp, used as event time via `assignAscendingTimestamps`
  */
case class LoginEvent(
    userId: String,
    ipAddr: String,
    eventType: String,
    timestamp: Long
)
2.LoginFailDetectpro.scala
使用Pattern API中的量词(times、consecutive)改进代码
package com.atguigu.chapter08
import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import java.util
/** Same detection as the three-stage version, expressed with a quantifier instead.
  *
  * A single "Fail" stage must occur exactly three times (`times(3)`) with no other event
  * in between (`consecutive()`), per user.
  */
object LoginFailDetectpro {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Source: a fixed sample of login events with ascending timestamps
    val events: DataStream[LoginEvent] = env
      .fromElements(
        LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
        LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
        LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
        LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
        LoginEvent("user_2", "192.168.1.29", "success", 6000L),
        LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
        LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
      )
      .assignAscendingTimestamps(event => event.timestamp)

    // 2. Pattern: one quantified stage of three consecutive "fail" events
    val threeFailures: Pattern[LoginEvent, LoginEvent] =
      Pattern
        .begin[LoginEvent]("Fail")
        .where(event => event.eventType == "fail")
        .times(3)
        .consecutive()

    // 3. Apply the pattern to the stream keyed by user
    val matches: PatternStream[LoginEvent] =
      CEP.pattern(events.keyBy(event => event.userId), threeFailures)

    // 4. Emit one alert line per match
    val alerts: DataStream[String] = matches.process(new PatternProcessFunction[LoginEvent, String] {
      override def processMatch(
          matched: util.Map[String, util.List[LoginEvent]],
          ctx: PatternProcessFunction.Context,
          out: Collector[String]
      ): Unit = {
        // All three events sit under the single "Fail" stage; indices 0..2 are read
        // as the first, second and third failure
        val failures: util.List[LoginEvent] = matched.get("Fail")
        val first: LoginEvent = failures.get(0)
        val second: LoginEvent = failures.get(1)
        val third: LoginEvent = failures.get(2)
        out.collect(s"${first.userId} 连续三次登录失败! 登录时间:${first.timestamp},${second.timestamp},${third.timestamp}")
      }
    })

    alerts.print()
    env.execute()
  }
}
3.OrderTimeoutDetect.scala
处理超时事件:检测下单(create)后15分钟内未支付(pay)的订单
package com.atguigu.chapter08
import org.apache.flink.cep.functions.{PatternProcessFunction, TimedOutPartialMatchHandler}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import java.util
/** An order lifecycle event.
  *
  * @param userId    id of the user who owns the order
  * @param orderId   id of the order (the example keys its stream by this field)
  * @param eventType event kind; the example uses "create", "modify" and "pay"
  * @param timestamp event timestamp, used as event time via `assignAscendingTimestamps`
  */
case class OrderEvent(
    userId: String,
    orderId: String,
    eventType: String,
    timestamp: Long
)
/** Matches an order's "create" event followed by its "pay" event within 15 minutes.
  *
  * Complete matches are printed as paid orders. Partial matches (a "create" with no "pay"
  * inside the window) are reported on the "timeout" side output.
  */
object OrderTimeoutDetect {

  /** Side-output tag for timeout alerts.
    *
    * Defined once so that the producer ([[OrderPayDetect]]) and the consumer
    * (`getSideOutput` in `main`) cannot disagree on the tag id. A mismatch would silently
    * drop every timeout alert. It also avoids allocating a new tag per timed-out order.
    */
  private val timeoutTag: OutputTag[String] = new OutputTag[String]("timeout")

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 1. Source: sample order events, keyed by order id so CEP matches per order
    val orderEventStream: KeyedStream[OrderEvent, String] = env.fromElements(
      OrderEvent("user_1", "order_1", "create", 1000L),
      OrderEvent("user_2", "order_2", "create", 2000L),
      OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
      OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
      OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
      OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
    ).assignAscendingTimestamps(_.timestamp)
      .keyBy(_.orderId)
    // 2. Pattern: "create" eventually followed by "pay" (relaxed contiguity), within 15 minutes
    val pattern: Pattern[OrderEvent, OrderEvent] = Pattern.begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))
    // 3. Apply the pattern to the keyed stream
    val patternStream: PatternStream[OrderEvent] = CEP.pattern(orderEventStream, pattern)
    // 4. Handle complete matches (main output) and timed-out partial matches (side output)
    val payedOrderStream: DataStream[String] = patternStream.process(new OrderPayDetect())
    payedOrderStream.getSideOutput(timeoutTag).print("timeout")
    payedOrderStream.print("payed")
    env.execute()
  }

  /** Emits a "paid" line for each complete create -> pay match. It emits a timeout line on
    * the "timeout" side output for each partial match that expired before "pay" arrived.
    */
  class OrderPayDetect() extends PatternProcessFunction[OrderEvent, String] with TimedOutPartialMatchHandler[OrderEvent] {
    override def processMatch(map: util.Map[String, util.List[OrderEvent]], context: PatternProcessFunction.Context, collector: Collector[String]): Unit = {
      // Complete match: the order was paid in time
      val payEvent: OrderEvent = map.get("pay").get(0)
      collector.collect(s"订单${payEvent.orderId}已成功支付")
    }

    override def processTimedOutMatch(map: util.Map[String, util.List[OrderEvent]], context: PatternProcessFunction.Context): Unit = {
      // Timed-out partial match: only the "create" stage was reached
      val createEvent: OrderEvent = map.get("create").get(0)
      context.output(timeoutTag, s"订单${createEvent.orderId}超时未支付! 用户${createEvent.userId}")
    }
  }
}
4.状态机实现
package com.atguigu.chapter08
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/** Detects three consecutive failed logins per user with a hand-written state machine
  * (the same logic CEP builds internally as an NFA), driven by keyed `ValueState`.
  */
object NFAExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 1. Source: a fixed sample of login events with ascending timestamps
    val loginEventStream: DataStream[LoginEvent] = env.fromElements(
      LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
      LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
      LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
      LoginEvent("user_2", "192.168.1.29", "success", 6000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
    ).assignAscendingTimestamps(_.timestamp)
    // 2. Run the state machine independently for each user
    val resultStream: DataStream[String] = loginEventStream.keyBy(_.userId).flatMap(new StateMachineMapper())
    resultStream.print()
    env.execute()
  }

  /** Keyed flat-map that advances one state machine per key (userId) and emits an alert
    * whenever the machine reaches [[Matched]].
    */
  class StateMachineMapper() extends RichFlatMapFunction[LoginEvent, String] {
    // Per-key current state. It is lazy so the runtime context is only touched once the
    // function is running.
    // NOTE(review): State is a trait, so Flink presumably falls back to a generic (Kryo)
    // serializer for it. With a serializing state backend, confirm the case-object
    // singletons survive a round trip, because the matches below compare against them.
    lazy val currentState: ValueState[State] = getRuntimeContext.getState(new ValueStateDescriptor[State]("state", classOf[State]))

    override def flatMap(in: LoginEvent, collector: Collector[String]): Unit = {
      // A key with no stored state yet starts from Initial
      val state: State = Option(currentState.value()).getOrElse(Initial)
      transition(state, in.eventType) match {
        case Matched =>
          // The stored state is not updated here, so it stays at S2. A further consecutive
          // "fail" therefore matches (and alerts) again.
          collector.collect(s"${in.userId}连续三次登录失败")
        case Terminal => currentState.update(Initial)
        case next => currentState.update(next)
      }
    }
  }

  /** States of the login-failure state machine (sealed so matches are exhaustiveness-checked). */
  sealed trait State
  case object Initial extends State
  case object Terminal extends State
  case object Matched extends State
  case object S1 extends State
  case object S2 extends State

  /** State-transition function.
    *
    * Each "fail" advances Initial -> S1 -> S2 -> Matched. Any other event type ("success"
    * or an unrecognised value) breaks the run and yields [[Terminal]]. Unknown event types
    * used to throw a `MatchError` here.
    *
    * @param state     current state
    * @param eventType event type of the incoming login event
    * @return the next state
    */
  def transition(state: State, eventType: String): State =
    (state, eventType) match {
      case (Initial, "fail") => S1
      case (S1, "fail") => S2
      case (S2, "fail") => Matched
      // "success" or any other event type interrupts the run of failures
      case (Initial | S1 | S2, _) => Terminal
      // StateMachineMapper never stores Matched/Terminal; treat them as sink states
      case (Matched | Terminal, _) => Terminal
    }
}
总结
最后的CEP有点抽象,我也没完全理解,有机会再巩固巩固吧.