Flink第九章:Flink CEP

news2025/1/15 20:41:31

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口
Flink第五章:处理函数
Flink第六章:多流操作
Flink第七章:状态编程
Flink第八章:FlinkSQL
Flink第九章:Flink CEP


文章目录

  • 系列文章目录
  • 前言
  • 一、简单案例
    • 1.LoginFailDetect.scala
    • 2.LoginFailDetectpro.scala
    • 3.OrderTimeoutDetect.scala
    • 3.状态机实现
  • 总结


前言

这次是Flink的最后一次内容,终于还是在放假前啃完了.

FlinkCEP是在Flink上层实现的复杂事件处理库。 它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。

这是官方的介绍,看看就行了.
先引入需要的依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

在这里插入图片描述


一、简单案例

1.LoginFailDetect.scala

检测连续三次登录失败的用户

package com.atguigu.chapter08


import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

import java.util

object LoginFailDetect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.数据源
    val loginEventStream: DataStream[LoginEvent] = env.fromElements(
      LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
      LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
      LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
      LoginEvent("user_2", "192.168.1.29", "success", 6000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
    ).assignAscendingTimestamps(_.timestamp)

    // 2.定义Pattern,检测连续三次登录失败时间
    val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("firstFail").where(_.eventType == "fail")
      .next("secondFail").where(_.eventType == "fail")
      .next("thirdFail").where(_.eventType == "fail")

    // 3. 将Pattern 检测应用到事件流
    val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), pattern)

    // 4.定义处理规则,精检测到的匹配事件报警输出
    val resultStream: DataStream[String] = patternStream.select(new PatternSelectFunction[LoginEvent, String] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
        // 获取匹配到的复杂时间
        val firstFail: LoginEvent = map.get("firstFail").get(0)
        val secondFail: LoginEvent = map.get("secondFail").get(0)
        val thirdFail: LoginEvent = map.get("thirdFail").get(0)

        // 包装报警信息 输出
        s"${firstFail.userId} 连续三次登录失败! 登录时间:${firstFail.timestamp},${secondFail.timestamp},${thirdFail.timestamp}"
      }
    })
    resultStream.print()
    env.execute()
  }

}

case class LoginEvent(userId: String, ipAddr: String, eventType: String, timestamp: Long)

在这里插入图片描述
在这里插入图片描述

2.LoginFailDetectpro.scala

使用(Pattern API)中的量词改进代码

package com.atguigu.chapter08

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.util

object LoginFailDetectpro {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.数据源
    val loginEventStream: DataStream[LoginEvent] = env.fromElements(
      LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
      LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
      LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
      LoginEvent("user_2", "192.168.1.29", "success", 6000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
    ).assignAscendingTimestamps(_.timestamp)

    // 2.定义Pattern,检测连续三次登录失败时间
    val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("Fail").where(_.eventType=="fail").times(3).consecutive()

    // 3. 将Pattern 检测应用到事件流
    val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), pattern)

    // 4.定义处理规则,精检测到的匹配事件报警输出
    val resultStream: DataStream[String] = patternStream.process(new PatternProcessFunction[LoginEvent,String] {
      override def processMatch(map: util.Map[String, util.List[LoginEvent]], context: PatternProcessFunction.Context, collector: Collector[String]): Unit = {
        val firstFail: LoginEvent = map.get("Fail").get(0)
        val secondFail: LoginEvent = map.get("Fail").get(1)
        val thirdFail: LoginEvent = map.get("Fail").get(2)

        // 包装报警信息 输出
        collector.collect(s"${firstFail.userId} 连续三次登录失败! 登录时间:${firstFail.timestamp},${secondFail.timestamp},${thirdFail.timestamp}")
      }
    })
    resultStream.print()
    env.execute()
  }


}

在这里插入图片描述

3.OrderTimeoutDetect.scala

处理超时事件

package com.atguigu.chapter08

import org.apache.flink.cep.functions.{PatternProcessFunction, TimedOutPartialMatchHandler}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import java.util

// 定义订单事件样例类
case class OrderEvent(userId: String, orderId: String, eventType: String, timestamp: Long)

object OrderTimeoutDetect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.读取源
    val orderEventStream: DataStream[OrderEvent] = env.fromElements(
      OrderEvent("user_1", "order_1", "create", 1000L),
      OrderEvent("user_2", "order_2", "create", 2000L),
      OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
      OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
      OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
      OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
    ).assignAscendingTimestamps(_.timestamp)
      .keyBy(_.orderId)

    // 2.定义检测模式
    val pattern: Pattern[OrderEvent, OrderEvent] = Pattern.begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))

    // 3.应用到事件流
    val patternStream: PatternStream[OrderEvent] = CEP.pattern(orderEventStream, pattern)

    // 4.检测匹配事件和部分匹配的超时事件
    val payedOrderStream: DataStream[String] = patternStream.process(new OrderPayDetect())

    payedOrderStream.getSideOutput(new OutputTag[String]("timeout")).print("timeout")
    payedOrderStream.print("payed")

    env.execute()
  }


  class OrderPayDetect() extends PatternProcessFunction[OrderEvent,String] with TimedOutPartialMatchHandler[OrderEvent] {
    override def processMatch(map: util.Map[String, util.List[OrderEvent]], context: PatternProcessFunction.Context, collector: Collector[String]): Unit = {
      // 正常支付事件
      val payEvent: OrderEvent = map.get("pay").get(0)
      collector.collect(s"订单${payEvent.orderId}已成功支付")
    }

    override def processTimedOutMatch(map: util.Map[String, util.List[OrderEvent]], context: PatternProcessFunction.Context): Unit = {
      // 超时事件
      val createEvent: OrderEvent = map.get("create").get(0)
      context.output(new OutputTag[String]("timeout"),s"订单${createEvent.orderId}超时未支付! 用户${createEvent.userId}")
    }
  }
}

在这里插入图片描述

3.状态机实现

package com.atguigu.chapter08

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object NFAExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.数据源
    val loginEventStream: DataStream[LoginEvent] = env.fromElements(
      LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
      LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
      LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
      LoginEvent("user_2", "192.168.1.29", "success", 6000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
    ).assignAscendingTimestamps(_.timestamp)

    val resultStream: DataStream[String] = loginEventStream.keyBy(_.userId).flatMap(new StateMachineMapper())

    resultStream.print()
    env.execute()
  }

  // 实现自定义的RichFlatmapFunction
  class StateMachineMapper() extends RichFlatMapFunction[LoginEvent, String] {
    lazy val currentState: ValueState[State] = getRuntimeContext.getState(new ValueStateDescriptor[State]("state", classOf[State]))

    override def flatMap(in: LoginEvent, collector: Collector[String]): Unit = {
      // 定义一个状态机的状态
      if (currentState.value() == null) {
        currentState.update(Initial)
      }

      val nextState: State = transition(currentState.value(), in.eventType)

      nextState match {
        case Matched => collector.collect(s"${in.userId}连续三次登录失败")
        case Terminal => currentState.update(Initial)
        case _ => currentState.update(nextState)
      }

    }
  }

  // 将状态state定义为封闭的特征
  sealed trait State

  case object Initial extends State

  case object Terminal extends State

  case object Matched extends State

  case object S1 extends State

  case object S2 extends State

  // 定义状态转移函数
  def transition(state: State, eventType: String): State = {
    (state, eventType) match {
      case (Initial, "success") => Terminal
      case (Initial, "fail") => S1
      case (S1, "success") => Terminal
      case (S1, "fail") => S2
      case (S2, "success") => Terminal
      case (S2, "fail") => Matched
    }
  }
}

在这里插入图片描述


总结

最后的CEP有点抽象,我也没完全理解,有机会在巩固巩固吧.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/607606.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux高性能服务器编程|阅读笔记:第11章 - 定时器

目录 简介系列笔记11.1 socket选项SO_RCVTIMEO和SO_SNDTIMEO11.2 SIGALRM信号11.2.1 基于升序链表的定时器11.2.2 处理非活动连接11.3 I/O复用系统调用的超时参数11.4 高性能定时器11.4.1 时间轮11.4.2 时间堆结语简介 Hello! 非常感谢您阅读海轰的文章,倘若文中有错误的地方…

光储存技术及原理

描述 信息资料迅速增长是当今社会的一大特点。有人统计&#xff0c;科技文献数量大约每7年增加1倍&#xff0c;而一般的情报资料则以每2年~3年翻一番的速度增加。大量资料的存储、分析、检索和传播&#xff0c;迫切需要高密度、大容量的存储介质和管理系统。 1898年荷兰的Vald…

Jenkins+Docker 实现一键自动化部署项目!步骤齐全,少走坑路

本文章实现最简单全面的Jenkinsdockerspringboot 一键自动部署项目&#xff0c;步骤齐全&#xff0c;少走坑路。 环境&#xff1a;centos7git(gitee) 简述实现步骤&#xff1a;在docker安装jenkins&#xff0c;配置jenkins基本信息&#xff0c;利用Dockerfile和shell脚本实现…

javaScript蓝桥杯-----芝麻开门

目录 一、介绍二、准备三、目标四、代码五、完成 一、介绍 在阿里巴巴和四十大盗的故事中&#xff0c;阿里巴巴因为无意中知道了开门的咒语人生发生了翻天覆地的变化&#xff0c;四十大盗也因为咒语的泄露最终丧命。芝麻开门的咒语作为重要的信息推动着故事的发展。下面由你来…

初识网络之TCP网络套接字

目录 一、TCP常用网络接口 1. 监听服务器 2. 接收链接 3. 发起连接 二、实现一个简单的tcp程序 1. 日志函数 2. 服务端文件 2.1 .hpp文件 2.2 .cpp文件 3. 客户端文件 3.1 .hpp文件 3.2 .cpp文件 4. 程序测试 三、实现支持多个用户并发访问的tcp程序 1. 当前程序…

解决Dockerfile错误: ERROR [3/3] RUN yum install -y wget vim net-tools

RUN yum install -y wget vim net-tools该命令是在使用容器编排工具&#xff08;如Docker&#xff09;的Dockerfile文件中执行的。它尝试安装一些常用的软件包&#xff0c;如wget、vim、net-tools。根据错误消息&#xff0c;执行此命令时发生了3个错误。 可能的原因包括&#…

chatgpt赋能python:Python去除非法字符:让你的数据更干净、更有效

Python去除非法字符&#xff1a;让你的数据更干净、更有效 在处理数据时&#xff0c;非法字符是常见的问题。它们可能是一些无意义的符号、特殊字符或非常规字符等等。如果不被正确处理&#xff0c;这些非法字符可能会给你带来麻烦&#xff0c;如导致脚本失败、破坏数据完整性…

LeetCode中等题合集 python

目录 3. 无重复字符的最长子串53. 最大子数组和80. 删除有序数组中的重复项 II442. 数组中重复的数据209. 长度最小的子数组59. 螺旋矩阵 II24. 两两交换链表中的节点19. 删除链表的倒数第 N 个结点142. 环形链表 II 3. 无重复字符的最长子串 滑动窗口&#xff0c;类似的题目还…

Biological Psychiatry:利用TMS-EEG识别难治性抑郁症间歇性θ脉冲刺激的神经生理标志物

前言 难治性抑郁症(TRD)是指在经过足够疗程的药物治疗或心理治疗后&#xff0c;症状没有得到改善的重度抑郁。大约有30%的重度抑郁症(MDD)属于难治性抑郁症。重复经颅磁刺激(rTMS)可引起大脑皮层兴奋或抑制性的改变&#xff0c;是TRD的有效干预措施。间歇性θ脉冲刺激(iTBS)一…

ClickHouse 基础

ClickHouse是2016年开源的列式存储数据库&#xff08;DBMS&#xff09;&#xff0c;使用C语言编写&#xff0c;主要用于在线分析处理查询OLAP&#xff0c;能够使用SQL查询实时生成分析数据报告。 一、列式存储 以下面的Tabel为例 IDNameGender1吴彦祖男2刘亦菲女3陈冠希男 …

INT8 中的稀疏性:加速的训练工作流程和NVIDIA TensorRT 最佳实践

INT8 中的稀疏性&#xff1a;加速的训练工作流程和NVIDIA TensorRT 最佳实践 文章目录 INT8 中的稀疏性&#xff1a;加速的训练工作流程和NVIDIA TensorRT 最佳实践结构稀疏量化在 TensorRT 中部署稀疏量化模型的工作流程案例研究&#xff1a;ResNet-34要求第 1 步&#xff1a;…

JDK17新特性 即将成为主流的JDK 深入了解

文章目录 Switch 语句增强优化字符串拼接代码instanceof增强密封类&#xff08;限制继承类&#xff09;ZGC垃圾收集器 对于JDK17比较明显的特性 总结 Switch 语句增强 首先就是简化了Switch语句&#xff1a; 去掉了break及可以直接给返回值赋值。并且再匹配多个值的时候也做了…

AI 导致留学中介文书老师痛失万元月收入?是真的吗?

近日&#xff0c;总部位于伦敦&#xff0c;但在国内多个城市设有分公司的留学服务机构老板张冶告诉在接受记者采访时&#xff0c;他们对留学英国的人士提供的本硕博申请、论文润色、挂科申诉等业务都受到了ChatGPT的影响&#xff0c;甚至有业务线直接萎缩60%以上&#xff0c;有…

【Python】Python系列教程-- Python3 列表(十三)

文章目录 前言访问元组修改元组删除元组元组运算符元组索引&#xff0c;截取元组内置函数关于元组是不可变的 前言 往期回顾&#xff1a; Python系列教程–Python3介绍&#xff08;一&#xff09;Python系列教程–Python3 环境搭建&#xff08;二&#xff09;Python系列教程–…

如何打造高效的IT服务中心

官方网站 www.itilzj.com 文档资料: wenku.itilzj.com 引言 在当今数字时代&#xff0c;IT服务越来越成为企业发展的关键和优势。高效的IT服务中心能够提高员工生产力&#xff0c;改善客户体验&#xff0c;降低IT成本&#xff0c;并为企业数字化转型奠定良好的基础。本文将介绍…

Unity制作二次元卡通渲染角色材质

Unity制作二次元材质角色 大家好&#xff0c;我是阿赵。接下来准备开一个系列&#xff0c;讲一下二次元卡通角色的渲染。   先来看看成品&#xff0c;我从网上下载了著名游戏《罪恶装备》里面的一个角色模型。在没有做材质之前&#xff0c;把贴图赋予上去&#xff0c;给一个U…

3、数据库:Oracle部署 - 系统部署系列文章

Oracle数据库的安装&#xff0c;以前写过一篇&#xff0c;这次将新版的安装再记录一次&#xff0c;让读者能够有所了解&#xff0c;笔者也能够记录下最新版的安装过程。 一、数据库下载&#xff1b; Oracle最新版目前在官网是19c&#xff0c;从下面这个链接进去下载便可。 http…

java的UDP(二)

文章目录 1. DatagramSocket类2. 简单的UDP客户端3. DatagramChannel 1. DatagramSocket类 要收发DatagramPacket&#xff0c;必须打开一个数据报Socket。在java中&#xff0c;数据报Socekt通过DatagramSocekt类创建和访问。服务器Socket需要指定绑定端口&#xff0c;而用户端…

【react框架】结合antd做表单组件的一些心得记录

文章目录 前言功能的实现尽量先看看antd上是否已经提供当一个页面有多个表单组件时&#xff0c;就要优先考虑把值存在状态管理中如果一些表单比较简单且能确保后续不会有功能上的拓展&#xff0c;可以使用业务组件推荐其他的表单库 前言 因为最近在学其他东西&#xff0c;今天…

华中科技大学计算机考研分析

关注我们的微信公众号 姚哥计算机考研 更多详情欢迎咨询 华中科技大学计算机考研招生学院是计算机科学与技术学院、软件学院和网络空间安全学院。目前均已出拟录取名单。 华中科技大学计算机科学与技术学院成立于1997年&#xff0c;其前身是原华中理工大学&#xff08;即华中…