【Flink】复杂事件处理CEP底层实现(有限状态机)和应用

news2025/1/13 13:57:58

文章目录

  • 一 Flink CEP简介
    • 1 什么是复杂事件处理CEP
    • 2 Flink CEP
      • (1)导入依赖
      • (2)代码编写
      • (3)优化模板
    • 3 实现CEP底层 -- 有限状态机
    • 4 使用CEP处理超时事件

一 Flink CEP简介

1 什么是复杂事件处理CEP

一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。

特征有如下几点:

  • 目标:从有序的简单事件流中发现一些高阶特征。
  • 输入:一个或多个由简单事件构成的事件流。
  • 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件。
  • 输出:满足规则的复杂事件。

如下图中,将输入流中的元素,按照连续两个事件,且第一个元素为正方形,第二个元素为三角形进行过滤:

在这里插入图片描述

CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。

CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。

看起来很简单,但是它有很多不同的功能:

  • 输入的流数据,尽快产生结果
  • 在2个event流上,基于时间进行聚合类的计算
  • 提供实时/准实时的警告和通知
  • 在多样的数据源中产生关联并分析模式
  • 高吞吐、低延迟的处理

市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library。

2 Flink CEP

Flink为CEP提供了专门的Flink CEP library,它包含如下组件:

  • Event Stream
  • pattern定义
  • pattern检测
  • 生成Alert

在这里插入图片描述

首先,开发人员要在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警。

(1)导入依赖

为了使用Flink CEP,需要导入依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

(2)代码编写

使用API完成检测用户连续三次登录失败的需求

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SingleOutputStreamOperator<day06.Example3.Event> stream = env
            .fromElements(
                    new day06.Example3.Event("user-1", "log-fail", 1000L),
                    new day06.Example3.Event("user-1", "log-fail", 2000L),
                    new day06.Example3.Event("user-2", "log-succ", 3000L),
                    new day06.Example3.Event("user-1", "log-fail", 4000L),
                    new day06.Example3.Event("user-1", "log-fail", 5000L)
            )
            .assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps()
                    .withTimestampAssigner(new SerializableTimestampAssigner<day06.Example3.Event>() {
                        @Override
                        public long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    }));

    // 定义模板(org.apache.flink.cep.pattern.Pattern)
    Pattern<day06.Example3.Event, day06.Example3.Event> pattern = Pattern
            .<day06.Example3.Event>begin("first")   // 给第一个匹配事件起名
            .where(new SimpleCondition<day06.Example3.Event>() {
                @Override
                public boolean filter(day06.Example3.Event value) throws Exception {
                    return value.eventType.equals("log-fail");
                }
            })
            .next("second")  //next表示严格紧邻
            .where(new SimpleCondition<day06.Example3.Event>() {
                @Override
                public boolean filter(day06.Example3.Event value) throws Exception {
                    return value.eventType.equals("log-fail");
                }
            })
            .next("third")
            .where(new SimpleCondition<day06.Example3.Event>() {
                @Override
                public boolean filter(day06.Example3.Event value) throws Exception {
                    return value.eventType.equals("log-fail");
                }
            });
    
    // 在流上使用模板,参数为输入的流和要匹配的模板
    PatternStream<day06.Example3.Event> patternStream = CEP.pattern(stream.keyBy(r -> r.orderId), pattern);

    // 使用select方法将匹配到的事件取出
    patternStream
            .select(new PatternSelectFunction<day06.Example3.Event, String>() {
                @Override
                public String select(Map<String, List<day06.Example3.Event>> map) throws Exception {
                    // map的key是给时间起的名字,v为名字对应的事件列表
                    // 上例中事件只有一个,列表中只有一个元素
                    day06.Example3.Event first = map.get("first").get(0);
                    day06.Example3.Event second = map.get("second").get(0);
                    day06.Example3.Event third = map.get("third").get(0);
                    String result = "用户:" + first.orderId + "分别在以下三个时间:" + first.timestamp
                            + "、" + second.timestamp + "、" + third.timestamp + "登录失败了";
                    return result;
                }
            })
            .print();

    env.execute();
}

(3)优化模板

Pattern<Example3.Event, Example3.Event> pattern = Pattern
        .<day06.Example3.Event>begin("log-fail")   // 给第一个匹配事件起名
        .where(new SimpleCondition<Example3.Event>() {
            @Override
            public boolean filter(day06.Example3.Event value) throws Exception {
                return value.eventType.equals("log-fail");
            }
        })
        .times(3);
patternStream
        .select(new PatternSelectFunction<Example3.Event, String>() {
            @Override
            public String select(Map<String, List<Example3.Event>> map) throws Exception {
                // map的key是给时间起的名字,v为名字对应的事件列表
                // 上例中事件只有一个,列表中只有一个元素
                day06.Example3.Event first = map.get("log-fail").get(0);
                day06.Example3.Event second = map.get("log-fail").get(1);
                day06.Example3.Event third = map.get("log-fail").get(2);
                String result = "用户:" + first.orderId + "分别在以下三个时间:" + first.timestamp
                        + "、" + second.timestamp + "、" + third.timestamp + "登录失败了";
                return result;
            }
        })
        .print();

3 实现CEP底层 – 有限状态机

使用状态机实现检测连续三次登录失败,实现原理如下图:

在这里插入图片描述

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SingleOutputStreamOperator<Example3.Event> stream = env
            .fromElements(
                    new day06.Example3.Event("user-1", "log-fail", 1000L),
                    new day06.Example3.Event("user-1", "log-fail", 2000L),
                    new day06.Example3.Event("user-2", "log-succ", 3000L),
                    new day06.Example3.Event("user-1", "log-fail", 4000L),
                    new day06.Example3.Event("user-1", "log-fail", 5000L)
            )
            .assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps()
                    .withTimestampAssigner(new SerializableTimestampAssigner<Example3.Event>() {
                        @Override
                        public long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    }));

    stream
            .keyBy(r -> r.orderId)
            .process(new KeyedProcessFunction<String, Example3.Event, String>() {
                private HashMap<Tuple2<String,String>,String> stateMachine = new HashMap<>();
                private ValueState<String> currentState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    // 如果接收到初始登录状态为成功,返回SUCCESS
                    // 状态转移矩阵
                    // key:(状态,接收到事件的类型)
                    // value:(将要跳转到的状态)
                    stateMachine.put(Tuple2.of("INITIAL","log-succ"),"SUCCESS");
                    stateMachine.put(Tuple2.of("INITIAL","log-fail"),"s1");
                    stateMachine.put(Tuple2.of("s1","log-succ"),"SUCCESS");
                    stateMachine.put(Tuple2.of("s1","log-fail"),"s2");
                    stateMachine.put(Tuple2.of("s2","log-succ"),"SUCCESS");
                    stateMachine.put(Tuple2.of("s2","log-fail"),"FAIL");

                    currentState = getRuntimeContext().getState(
                            new ValueStateDescriptor<String>("current-state", Types.STRING)
                    );
                }

                @Override
                public void processElement(Example3.Event value, Context ctx, Collector<String> out) throws Exception {
                    if(currentState.value() == null){
                        currentState.update("INITIAL");
                    }

                    // 记录将要跳转到的状态
                    // 如initial状态,到来事件值为log-succ,那么nextState为SUCCESS
                    // 取状态机的value部分
                    String nextState = stateMachine.get(Tuple2.of(currentState.value(), value.eventType));

                    if(nextState.equals("FAIL")){
                        out.collect("用户" + value.orderId + "连续三次登录失败了");
                        currentState.update("s2");
                    } else if(nextState.equals("SUCCESS")){
                        currentState.clear();
                    } else {
                        currentState.update(nextState);
                    }

                }
            })
            .print();

    env.execute();
}

4 使用CEP处理超时事件

现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。

先将事件流按照订单号 orderId 分流,然后定义这样的一个事件模式:在 15 分钟内,事件“create”与“pay”严格紧邻:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SingleOutputStreamOperator<Example3.Event> stream = env
            .fromElements(
                    new day06.Example3.Event("order-1", "create", 1000L),
                    new day06.Example3.Event("order-2", "create", 2000L),
                    new day06.Example3.Event("order-1", "pay", 19000L)
            )
            .assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps()
                    .withTimestampAssigner(new SerializableTimestampAssigner<Example3.Event>() {
                        @Override
                        public long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    }));

    Pattern<Example3.Event, Example3.Event> pattren = Pattern
            .<Example3.Event>begin("create")
            .where(new SimpleCondition<Example3.Event>() {
                @Override
                public boolean filter(Example3.Event value) throws Exception {
                    return value.eventType.equals("create");
                }
            })
            .next("pay")
            .where(new SimpleCondition<Example3.Event>() {
                @Override
                public boolean filter(Example3.Event value) throws Exception {
                    return value.eventType.equals("pay");
                }
            })
            // 要求两事件的间隔时间不能超过15分钟
            .within(Time.minutes(15));

    PatternStream<Example3.Event> patternStream = CEP.pattern(stream.keyBy(r -> r.orderId), pattren);

    SingleOutputStreamOperator<String> result = patternStream
            // 第一个参数是侧输出标签
            // 第二个参数用于将超时事件发送到侧输出流
            // 第三个参数用于处理正常事件
            .flatSelect(
                    new OutputTag<String>("timeout") {
                    },
                    new PatternFlatTimeoutFunction<Example3.Event, String>() {
                        @Override
                        public void timeout(Map<String, List<Example3.Event>> map, long l, Collector<String> collector) throws Exception {
                            Example3.Event create = map.get("create").get(0);
                            collector.collect("订单:" + create.orderId + "超时了");
                        }
                    },
                    new PatternFlatSelectFunction<Example3.Event, String>() {
                        @Override
                        public void flatSelect(Map<String, List<Example3.Event>> map, Collector<String> collector) throws Exception {
                            Example3.Event pay = map.get("pay").get(0);
                            collector.collect("订单:" + pay.orderId + "已支付");
                        }
                    }
            );

    result.print();
    result.getSideOutput(new OutputTag<String>("timeout"){}).print();

    env.execute();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/70513.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

239页11万字新型智慧城市运营中心IOC大数据平台建设方案

目录 1 概述 1.1 建设目标 1.2 建设内容 1.3 建设步骤 2 项目建设方案 2.1 总体设计方案 2.2 支撑平台方案 2.2.1 数据治理平台 2.2.2 可视化平台 2.2.3 城市感知平台 2.3 应用系统方案 2.3.1 综合监测系统 2.3.2 事件管理系统 2.3.3 联动指挥系统 2.3.4 辅助决策…

自定义镜像上传阿里云

目录标题一、Docker制作jdk镜像1.jdkv.1.0的制作1.1创建文件夹上传jdk的安装包,和在同级目录下编写Dockerfile文件1.2.编写 Dockerfile 文件1.3.执行Dockerfile文件&#xff0c;初次依赖镜像的时候会下载相应镜像2.jdk2.0的制作3.jdk3.0的制作二、Docker镜像上传至阿里云前期准…

19.5 迭代器的概念和分类

一&#xff1a;迭代器基本概念&#xff1a;第十三章第九节 迭代器&#xff1a;是一个“可遍历STL容器全部或者部分元素”的对象&#xff08;行为类似于指针的对象&#xff09;&#xff1b; 迭代器用来表现容器中的某一位置&#xff1b;迭代器紧密依赖于容器&#xff0c;迭代器…

2023年天津仁爱学院专升本动画、化学工程与工艺专业介绍

2023年天津仁爱学院专升本专业课动画专业、化学工程与工艺专业介绍 &#xff08;一&#xff09;动画专业 动画专业以行业发展对应用型人才需求为导向&#xff0c;不断提高学生就业质量为目标&#xff0c;针对学生特点&#xff0c;积极拓展动画应用领域&#xff0c;设有影视后期…

JSP SSH超市管理统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 JSP 超市管理统是一套完善的web设计系统&#xff08;系统采用ssh框架进行设计开发&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发 JSP SSH超市管理统myeclipse开发…

【Leetcode每日一题】子序列宽度之和,匹配子序列的单词数,最大平均值和的分组

891. 子序列宽度之和 计算的是【贡献】。 首先观察发现&#xff0c;顺序不影响结果。然后比如1&#xff0c;作为最大元素贡献为0&#xff0c;而作为最小元素贡献为每个子序列的【最大-1】&#xff0c;一共有多少个作为最小元素的子序列&#xff0c;对答案的贡献就是-1*(个数)。…

【✨十五天搞定电工基础】正弦交流电路的分析(下)

目录 五、复杂正弦交流电路的分析&#xff08;下&#xff09; 六、功率因数的提高 七、谐振电路 1、串联谐振 2、并联谐振 八、课后习题 1、正弦量三要素&#xff0c;相位 2、RLC串联问题 3、复杂正弦交流电路问题 4、谐振问题 五、复杂正弦交流电路的分析&#x…

Metabase学习教程:权限-4

高级数据沙盒&#xff1a;限制对列的访问 了解如何使用已保存的SQL查询对表进行沙盒处理&#xff0c;并根据用户属性限制用户可以查看的列。 我们的文章行权限涵盖了沙盒&#xff08;商业版本). 我们将沙盒定义为一种根据用户身份指定用户可以访问哪些数据的方法&#xff0c;…

【TOTP】基于时间的动态密码及其工程实践

探究了常见的动态密码的实现方式及其底层原理&#xff0c;并基于java做出了工程实践。 文章目录A.来源于一个现象的好奇B.2FAC.TOTP1.什么是TOTP2.原理详解&#xff08;基于java-totp项目分析&#xff09;3.这样真的安全吗4.常见的支持TOTP的软件1.Google Authenticator2.Micro…

RCNN学习笔记-MobileNet3

更新Block(bneck倒残差结构) 1.加入SE&#xff08;自注意力模块squeeze-and-excite bottleneck&#xff09;模块。当stride1&#xff08;高和宽是不会变化的&#xff09;且inputc outputc才有shortcut连接。 相反&#xff0c;我们将它们全部替换为扩展层中通道数量的1/4。我…

功率放大器输出阻抗的影响因素有哪些原因

关于功率放大器的疑问有很多&#xff0c;前阵子有人咨询影响功率放大器输出阻抗的有哪些因素&#xff0c;今天就请安泰电子来为大家解释&#xff0c;同时再为大家科普一下功率放大器的知识。 图&#xff1a;信号源和负载的放大器的简化模型 在搞清楚影响功率放大器输出阻抗因素…

速锐得解码本田雅阁混动版整车网关CAN总线通信协议DBC控制策略

本田汽车增城工厂就在附近50多公里的地方&#xff0c;和比亚迪汽车差不多&#xff0c;无论怎么跑都得1个多小时&#xff0c;也因为近水楼台的天然优势&#xff0c;而我们也与本田安全驾驶中心有多次深度的合作。碗里的肉&#xff0c;基本上都是上过了速锐得砧板。 近&#xff0…

m基于FPGA的半带滤波器verilog设计,对比普通结构以及乘法器复用结构

目录 1.算法描述 2.仿真效果预览 3.verilog核心程序 4.完整FPGA 1.算法描述 HBF模块由半带滤波器&#xff08;HBF&#xff09;和抽取模块组成。该模块的任务是实现2倍抽取进一步降低信号采样速率。由于HBF的冲激响应h(k)除零点外其余偶数点均为零&#xff0c;所以用HBF实现…

5G+无人驾驶融合创新,赋能港口智能化发展!

导语 | 在新一轮科技革命的时代背景下&#xff0c;5G 技术和无人驾驶的创新融合&#xff0c;使得我国当前港口的智慧化建设走在了世界的前列&#xff0c;智慧港口的发展不断深入。此次&#xff0c;我们邀请到了飞步科技的联合创始人兼 CTO、腾讯云 TVP 杨政老师&#xff0c;他将…

【LeetCode】No.116. Populating Next Right Pointers in Each Node -- Java Version

题目链接&#xff1a;https://leetcode.com/problems/populating-next-right-pointers-in-each-node/description/ 1. 题目介绍&#xff08;&#xff09; You are given a perfect binary tree where all leaves are on the same level, and every parent has two children. T…

安卓APP源码和设计报告——小说阅读器

班级 姓名 学号 答辩情况 考核项满分成绩得分掌握计算机系统软硬件资源管理的原理&#xff0c;能够设计针对计算机领域复杂工程问题的解决方案&#xff0c;设计满足特定需求的软硬件系统&#xff0c;并具有对解决方案在特定约束条件下进行工程设计和开发的能力。30能够针对计…

Excel 函数大全之 INTERCEPT function 获取线性回归线的截距

描述 使用现有的 x 值和 y 值计算直线与 y 轴相交的点。截点基于通过已知 x 值和已知 y 值绘制的最佳拟合回归线。当您想要在自变量为 0(零)时确定因变量的值时,请使用 INTERCEPT 函数。例如,当您的数据点是在室温或更高温度下获取的时,您可以使用 INTERCEPT 函数预测金属…

BIGEMAP APP导入/导出文件\照片(kml\shp\cad(dxf)\txt\excel)

APP数据导入&#xff1a; 1、kml\bmv文件通过QQ、微信等发送到手机端&#xff0c;在手机端下载文件&#xff0c;然后选择其他应用打开&#xff0c;选择bigemap打开就可以了。 2、其他数据导入&#xff08;其他数据包括&#xff1a;shp、kml\kmz、CAD的dxf、txt、excel、csv等…

java通过idea进行远程调试

1&#xff0c;基于SpringBoot使用IDEA工具 在pom.xml中配置 里配置jvmArguments参数 -Xdebug -Xrunjdwp:transportdt_socket,address8008,servery,suspendn&#xff1a; <build><plugins><plugin><groupId>org.springframework.boot</groupId>…

MySQL是怎样加锁的

是不是很多人都对 MySQL 加行级锁的规则搞的迷迷糊糊&#xff0c;对记录一会加的是 next-key 锁&#xff0c;一会加是间隙锁&#xff0c;一会又是记录锁。这次就带大家浅浅地聊一下MySQL是怎样加锁的。 什么 SQL 语句会加行级锁&#xff1f; InnoDB 引擎是支持行级锁的&#…