大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现

news2025/1/23 9:26:27

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink CEP 核心组件
  • CEP 的应用场景
  • CEP 的优势

在这里插入图片描述

超时事件提取

当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃,为了能够处理这些超时的部分匹配,select和flatSelectAPI调用允许制定超时处理程序。

FlinkCEP开发流程

  • DataSource中的数据转换为DataStream
  • 定义Pattern,并将DataStream和Pattern组合转换为PatternStream。
  • PatternStream 经过 Select、Process 等算子转换为 DataStream
  • 再次转换为 DataStream 经过处理后,Sink到目标库。

SELECT 方法:

SingleOutputStreamOperator<PayEvent> result =
    patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<PayEvent, PayEvent>() {
    @Override
    public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {
        return map.get("begin").get(0);
    }
}, new PatternSelectFunction<PayEvent, PayEvent>() {
    @Override
    public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
        return map.get("pay").get(0);
    }
});

对检测到的序列模式序列应用选择函数,对于每个模式序列,调用提供的 PatternSelectFunction,模式选择函数只能产生一个结果元素。
对超时的部分模式序列应用超时函数,对于每个部分模式序列,调用提供的 PatternTimeoutFunction,模式超时函数只能产生一个结果元素。
你可以在使用相同 OutputTag 进行 Select 操作 SingleOutputStreamOperator上获得SingleOutputStreamOperator生成的超时数据流。

非确定有限自动机

FlinkCEP 在运行时会将用户的逻辑转换为这样一个 NFA Graph(NFA对象)
所以有限状态机的工作过程,就是从开始状态,根据不同的输入,自动进行转换的过程。
在这里插入图片描述

上图中的状态机的功能,是检测二进制数是否含有偶数个0。从图上可以看出,输入只有1和0两种。
从S1状态开始,只有输入0才会转换到S2状态,同样S2状态下只有输入0才会转换到S1。所以,二进制输入完毕,如果满足最终状态,也就是最后停在S1状态,那么输入的二进制数就含有偶数个0。

CEP开发流程

FlinkCEP开发流程:

  • DataSource中数据转换为DataStream、Watermark、keyby
  • 定义Pattern,并将DataStream和Pattern组合转换为PatternStream
  • PatternStream经过select、process等算子转换为 DataStream
  • 再次转换为 DataStream 经过处理后,Sink到目标库

添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

案例1:恶意登录检测

找出5秒内,连续登录失败的账号
以下是数据:

new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)

整体思路

  • 获取到数据
  • 在数据源上做Watermark
  • 在Watermark上根据ID分组keyBy
  • 做出模式Pattern
  • 在数据流上进行模式匹配
  • 提取匹配成功的数据

编写代码

package icu.wzk;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;


public class FlinkCepLoginTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStreamSource<CepLoginBean> data = env.fromElements(
                new CepLoginBean(1L, "fail", 1597905234000L),
                new CepLoginBean(1L, "success", 1597905235000L),
                new CepLoginBean(2L, "fail", 1597905236000L),
                new CepLoginBean(2L, "fail", 1597905237000L),
                new CepLoginBean(2L, "fail", 1597905238000L),
                new CepLoginBean(3L, "fail", 1597905239000L),
                new CepLoginBean(3L, "success", 1597905240000L)
        );
        SingleOutputStreamOperator<CepLoginBean> watermarks = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepLoginBean>() {

                    @Override
                    public WatermarkGenerator<CepLoginBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepLoginBean>() {

                            long maxTimestamp = Long.MAX_VALUE;
                            long maxOutOfOrderness = 500L;

                            @Override
                            public void onEvent(CepLoginBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );
        KeyedStream<CepLoginBean, Long> keyed = watermarks
                .keyBy(new KeySelector<CepLoginBean, Long>() {
                    @Override
                    public Long getKey(CepLoginBean value) throws Exception {
                        return value.getUserId();
                    }
                });
        Pattern<CepLoginBean, CepLoginBean> pattern = Pattern
                .<CepLoginBean>begin("start")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .within(Time.seconds(5));
        PatternStream<CepLoginBean> patternStream = CEP.pattern(keyed, pattern);
        SingleOutputStreamOperator<CepLoginBean> process = patternStream
                .process(new PatternProcessFunction<CepLoginBean, CepLoginBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepLoginBean>> map, Context context, Collector<CepLoginBean> collector) throws Exception {
                        System.out.println("map: " + map);
                        List<CepLoginBean> start = map.get("start");
                        collector.collect(start.get(0));
                    }
                });
        process.print();
        env.execute("FlinkCepLoginTest");
    }

}


class CepLoginBean {


    private Long userId;

    private String operation;

    private Long timestamp;

    public CepLoginBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getOperation() {
        return operation;
    }

    public void setOperation(String operation) {
        this.operation = operation;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "CepLoginBean{" +
                "userId=" + userId +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

运行结果

可以看到程序输出:

map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905238000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}

Process finished with exit code 0

运行截图如下所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2124329.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数字IC自整资料】笔试相关

目录 1、使用连续赋值来模拟buffer惯性延时的写法正确的是?&#xff08;A&#xff09; 2、AXI总线位宽32bit&#xff0c;频率500MHZ,假设AXI cmd是基于burst8传输的&#xff0c;第一笔数据返回的延迟是200ns。那么outstanding 数量至少(13)才能满带宽传输 3、一个线性反馈移…

ICETEK-DM6437-AICOM—— DMA直接存储器访问设计

#一、设计目的&#xff1a; 1 进一步了解 ICETEK-DM6437-AF 的内部存储器空间的分配及指令寻址方式&#xff1a; 内部存储器空间分配&#xff1a;研究 ICETEK-DM6437-AF 的存储器架构&#xff0c;包括但不限于片内 SRAM、片外 DRAM 和其他存储器模块。了解这些存储器的大小、起…

Pytorch深度学习快速入门笔记【小土堆】

目录 1 Python学习中两大重要函数 2 Python代码编辑的三种方式 3 Pytorch学习 3.1 Dataset和DataLoader 3.1.1 Dataset 3.1.2 DataLoader 3.2 TensorBoard 3.2.1 add_scalar 3.2.2 add_image 3.3 Transforms 3.3.1 ToTensor 3.3.2 Normalize 3.3.3 Resize 3.3.4 C…

数据场景练习

1 行列转换 (1) 列拆分为多行 把指定字段按指定分隔符进行拆分为多行,然后其它字段直接复制. select字段列,hobby2 from tbl -- lateral view udtf(expression) tablealias as columnalias (‘,’ columnalias) lateral view explode(split(hobby,;)) temp as hobby2 (2) 行扁…

uniapp小程序,使用腾讯地图获取定位

本篇文章分享一下在实际开发小程序时遇到的需要获取用户当前位置的问题&#xff0c;在小程序开发过程中经常使用到获取定位功能。uniapp官方也提供了相应的API供我们使用。 官网地址&#xff1a;uni.getLocation(OBJECT)) 官网获取位置的详细介绍这里就不再讲述了&#xff0c;大…

区块链的可伸缩性以及面临的挑战

1. 可伸缩性 在过去的几年中&#xff0c;可伸缩性&#xff08;Scalability,也称为可扩展性) 问题一直是激烈辩论、严格研究和媒体关注的焦点。 这是一个至关重要的问题&#xff0c;因为它可能意味着区块链不适于广泛应用&#xff0c;而仅限于联盟许可的私有网络。在经过对该领域…

C++设计模式——Mediator中介者模式

一&#xff0c;中介者模式的定义 中介者模式是一种行为型设计模式。它通过一个中介者对象将多个对象之间的交互关系进行封装&#xff0c;使得对象之间的交互需要通过中介者对象来完成。该设计模式的结构很容易理解&#xff0c;以中介者为中心。 中介者模式的设计思想侧重于在…

遗传算法与深度学习实战(12)——粒子群优化详解与实现

遗传算法与深度学习实战&#xff08;12&#xff09;——粒子群优化详解与实现 0. 前言1. 粒子群优化1.1 粒子群优化原理1.2 算法流程 2. 实现 PSO 解决方程2.1 问题描述2.2 代码实现 小结系列链接 0. 前言 粒子群优化 (Particle Swarm Optimization, PSO) 是一种借鉴适者生存和…

医疗行业怎么节约和管理能源

医院建筑能耗平台 医院智能照明平台 医院能源综合管理平台 目前&#xff0c;能源短缺已成为一个全球性问题。在建筑业的发展中&#xff0c;建筑电气照明系统的节能水平与中国的能源利用率有关。照明系统中的低功率因数和高电压波动将导致较大的功率损失。因此&#xff0c;要认…

计算机网络——ARP篇(二)

上一次学习了ARP的基本概念&#xff0c;ARP缓存&#xff0c;ARP类型&#xff0c;以及ARP协议在网络中是如何工作的。这一次&#xff0c;我又深入的了解了ARP协议的工作原理&#xff0c;下面是我的学习笔记&#xff1a; 在学习之前&#xff0c;首先提出三个问题&#xff1a;ARP协…

BizDevOps落地实践

我理解BizDevOps就是端到端&#xff0c;从战略业务机会到开发上线 参考资料 十六年所思所感&#xff0c;聊聊这些年我所经历的 DevOps 系统 必致&#xff08;BizDevOps&#xff09;白皮书2022免费下载_在线阅读_藏经阁-阿里云开发者社区 具体落地实践 战略规划 战略&…

C#使用TCP-S7协议读写西门子PLC(一)

之前本人发布西门子S7协议的报文 西门子PLC的S7协议报文解析说明_西门子报文详解-CSDN博客 西门子PLC的S7协议是西门子公司在ModbusTcp协议的基础上自定义的一种协议,仅支持西门子PLC,S7协议本质仍然属于TCP协议的一种自定义具体实现 第一步,准备工作。VS2022中新建窗体应…

动态规划及其MATLAB实现

目录 引言 动态规划的基本原理 动态规划的常见应用 动态规划的求解步骤 动态规划的复杂度分析 表格总结&#xff1a;动态规划常见问题及其复杂度 结论 引言 动态规划&#xff08;Dynamic Programming, DP&#xff09;是一种求解最优化问题的有效方法&#xff0c;特别适合…

华为 HCIP-Datacom H12-821 题库 (16)

1.需要题库的小伙伴至博客最下方添加微信公众号关注后回复题库 2.有兴趣交流IT问题的小伙伴微信公众号回复交流群&#xff0c;加入微信IT交流群 1. OSPF 邻居关系建立出现故障&#xff0c;通过 display ospf error 命令来检查&#xff0c;输出结果如图所示&#xff0c;根据图中…

从零开始配置 TypeScript 项目

ESLint 配置 从背景的介绍中可以理解&#xff0c;对于全新的 TypeScript 项目&#xff08;直接抛弃 TSLint&#xff09;需要包含解析 AST 的解析器 typescript-eslint/parser 和使用校验规则的插件 typescript-eslint/eslint-plugin&#xff0c;这里需要在项目中进行安装&…

CentOS 安装Squid代理

环境&#xff1a; 华为云服务器一台&#xff1a;123.60.53.69&#xff0c;放行3128端口 Windows 11 电脑&#xff1a;动态IP 需求&#xff1a; 客户端电脑通过华为云服务器实现代理上网 一、服务器设置 1、安装 yum install squid httpd-tools -y 2、创建用户&#x…

word文档转换为PPT文档最佳方案

目前&#xff0c;笔者发现word文档转换为ppt最好的解决方案。 注&#xff1a;目前AI生成PPT&#xff0c;一般是给定一个标题&#xff0c;直接生成PPT文档内容&#xff0c;属于AI原创&#xff1b;另外&#xff0c;还有一些在线编辑、生成PPT工具&#xff0c;需要付费&#xff0c…

MySQL数据库SQL语句和常用函数大全

前言 MySQL 8数据库提供了丰富的SQL语句操作功能以及一系列高级特性&#xff0c;这些功能使得数据库的管理、查询、更新和维护变得更加高效和灵活。以下是对MySQL 8数据库SQL语句操作大全及高级特性的详细概述&#xff1a; 一、SQL语句操作大全 1. 数据定义语言&#xff08…

【雅特力AT32】 MCU CAN入门指南(超详细)

通信协议与接口知识参考文章&#xff1a; 【通信理论知识】数据传送的方式&#xff1a;串/并行&#xff1b;传输方向&#xff1a;单工、半/全双工&#xff1b;传输方式&#xff1a;同步/异步 【串口通信详解】USART/UART、RS232、RS485标准接口与协议特点解析 【同步串行通信接…

重拾精髓:go doc -http让离线包文档浏览更便捷

Go语言团队近期接受了Go团队成员、Go圣经《The Go Programming Language[1]》合著者Alan Donovan[2]的新提案[3]&#xff0c;旨在进一步提升开发者体验。这个提案为go doc命令[4]的离线文档展示形式&#xff0c;同时增强了查看本地文档的交叉引用功能。看到这个提案功能&#x…