Flink CEP实现10秒内连续登录失败用户分析

news2024/11/13 12:46:59

1、什么是CEP?

Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。

在这里插入图片描述
上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。

2、代码实现

2.1 引入maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roy</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0</version>

    <properties>
        <flink.version>1.12.5</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- CEP主要是下面这个依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

2.2 基本流程

//1、获取原始事件流
DataStream<Event> input = ......; 
//2、定义匹配器
Pattern<Event,?> pattern = .......; 
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(
	new PatternProcessFunction<Event, Result>() {
	@Override
	public void processMatch(
		Map<String, List<Event>> pattern,
		Context ctx,
		Collector<Result> out) throws Exception {
	}
});

2.3 完整代码

注意:代码运行前,先启动2.4 nlk socket服务

package com.roy.flink.project.userlogin;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配
 */
public class MyUserLoginAna {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔
        env.getConfig().setAutoWatermarkInterval(1000L);

        // 使用Socket测试
        env.setParallelism(1);
        // 1、获取原始事件流(10.86.97.206改为实际地址)
        final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);

        final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {
            @Override
            public UserLoginRecord map(String s) throws Exception {
                final String[] splitVal = s.split(",");
                return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));
            }
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间
                        .withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime())
        );

        // 2、定义匹配器
        // 2.1:10秒内出现3次登录失败的记录(不一定连续)
        // Flink CEP定义消息匹配器。
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));

        // 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followBy
        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).next("two").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).next("three").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).within(Time.seconds(10));

        // 3、获取匹配流
        final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);

        final MyProcessFunction myProcessFunction = new MyProcessFunction();
        // 4、将匹配流中的数据处理成结果数据流
        final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);
        badUserStream.print("badUser");
        env.execute("UserLoginAna");

    }// main

    public static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{

        @Override
        public void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {
            // 针对2.1 连续3次登录失败
//            final List<UserLoginRecord> records = match.get("start");
//            for(UserLoginRecord record : records){
//                out.collect(record);
//            }

            // 针对2.2 非连续3次登录失败
            final List<UserLoginRecord> records = match.get("three");
            for(UserLoginRecord record : records){
                out.collect(record);
            }

        }// processMarch
    }// MyProcessFunction
}

UserLoginRecord对象,如下:


public class UserLoginRecord {
    private String userId;
    private int loginRes; // 0-成功, 1-失败
    private long loginTime;

    public UserLoginRecord() {
    }

    public UserLoginRecord(String userId, int loginRes, long loginTime) {
        this.userId = userId;
        this.loginRes = loginRes;
        this.loginTime = loginTime;
    }

    @Override
    public String toString() {
        return "UserLoginRecord{" +
                "userId='" + userId + '\'' +
                ", loginRes=" + loginRes +
                ", loginTime=" + loginTime +
                '}';
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public int getLoginRes() {
        return loginRes;
    }

    public void setLoginRes(int loginRes) {
        this.loginRes = loginRes;
    }

    public long getLoginTime() {
        return loginTime;
    }

    public void setLoginTime(long loginTime) {
        this.loginTime = loginTime;
    }
}

2.4 nlk模拟socket服务端

在这里插入图片描述

2.5 IDEA控制台打印

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1421971.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#,欧拉数(Eulerian Number)的算法与源代码

1 欧拉数 欧拉数特指 Eulerian Number&#xff0c;不同于 Euler numbers&#xff0c;Eulers number 哦。 组合数学中&#xff0c;欧拉数&#xff08;Eulerian Number&#xff09;是从1到n中正好满足m个元素大于前一个元素&#xff08;具有m个“上升”的排列&#xff09;条件的…

C++ 之LeetCode刷题记录(二十三)

&#x1f604;&#x1f60a;&#x1f606;&#x1f603;&#x1f604;&#x1f60a;&#x1f606;&#x1f603; 开始cpp刷题之旅。 目标&#xff1a;执行用时击败90%以上使用 C 的用户。 118. 杨辉三角 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows …

力扣hot100 括号生成 递归回溯 超简洁版

Problem: 22. 括号生成 Code 使用 static 会被复用 class Solution {List<String> ans new ArrayList<>();public List<String> generateParenthesis(int n){dfs(n, n, "");return ans;}/*** param l 左括号待补个数* param r 右括号待补个数*…

flask+django基于python的网上美食订餐系统_3lyq1

设计旨在提高顾客就餐效率、优化餐厅管理、提高订单准确性和客户的满意度。本系统采用 Python 语言作为开发语言&#xff0c;采用Django框架及其第三方库和第三方工具来进行开发。该方案分为管理员功能模块&#xff0c;商家功能模块以及用户前后功能模块三部分。开发前期根据用…

方法阻塞的解决方案之一

1、简单使用 一个h一个cpp文件 #pragma once #include <iostream> #include <thread> #include <atomic> #include <chrono> #include <string>class Person {public:struct dog {std::string name;int age;};public:void a(std::atomic<bo…

系统架构设计师-22年-下午题目

系统架构设计师-22年-下午题目 更多软考知识请访问 https://ruankao.blog.csdn.net/ 试题一必答&#xff0c;二、三、四、五题中任选两题作答 试题一 (25分) 说明 某电子商务公司拟升级其会员与促销管理系统&#xff0c;向用户提供个性化服务&#xff0c;提高用户的粘性。…

在线版XD,免费使用,功能全面,设计神器!

Adobe XD是什么软件&#xff1f; Adobe XD软件是一个结合设计和建立原型功能的一站式UX/UI设计平台。在XD软件中&#xff0c;数字团队可以进行移动应用、网页设计和原型制作。与此同时&#xff0c;Adobe XD软件也是一种跨平台设计产品&#xff0c;结合设计和建立原型功能&…

探索世界的奇妙之旅——Google Earth Pro(谷歌地球)软件介绍

Google Earth Pro&#xff08;谷歌地球&#xff09;是一款由谷歌公司开发的地图浏览和虚拟地球软件&#xff0c;为用户提供了全球范围内精确的地理浏览和探索功能。该软件整合了卫星图像、地图、地形、建筑物和3D视图等多种信息&#xff0c;让用户可以深入了解世界各地的地理特…

SQL注入:宽字节注入

SQL注入系列文章&#xff1a; 初识SQL注入-CSDN博客 SQL注入&#xff1a;联合查询的三个绕过技巧-CSDN博客 SQL注入&#xff1a;报错注入-CSDN博客 SQL注入&#xff1a;盲注-CSDN博客 SQL注入&#xff1a;二次注入-CSDN博客 ​SQL注入&#xff1a;order by注入-CSDN博客 …

nacos启动成功,程序连接失败

问题&#xff1a;nacos服务器启动成功后可以访问&#xff0c;但是程序连接却超时 解决&#xff1a;检查端口&#xff0c;2.0以上的版本需要开放的端口一共是三个&#xff01;&#xff01; 8848 9848 9849 找了很久是因为后面两个端口没有开放&#xff0c;原因是因为2.0以上…

STM32F407移植OpenHarmony笔记3

接上一篇&#xff0c;搭建完环境&#xff0c;找个DEMO能跑&#xff0c;现在我准备尝试从0开始搬砖。 首先把/device和/vendor之前的代码全删除&#xff0c;这个时候用hb set命令看不到任何项目了。 /device目录是硬件设备目录&#xff0c;包括soc芯片厂商和board板级支持代码…

Redis内存设置

通过redis-cli进入Redis命令行 redis权限认证命令&#xff1a;auth 查看redis内存使用情况的命令&#xff1a;info memory 查看最大内存命令&#xff1a;config get maxmemory 设置最大内存命令&#xff1a;config set maxmemory 也可以通过redis.conf配置文件修改最大内存…

腾讯云SDK并发调用优化方案

目录 一、概述 二、 网关的使用 2.1 核心代码 三、腾讯云SDK依赖包的改造 一、概述 此网关主要用于协调腾讯云SDK调用的QPS消耗&#xff0c;使得多个腾讯云用户资源能得到最大限度的利用。避免直接使用腾讯云SDK 时&#xff0c;在较大并发情况下导致接口调用异常。网关的工…

【前端-VUE+TS】Vue3组件化-知识补充(六)

一. 动态组件 比如我们现在想要实现了一个功能&#xff1a; 点击一个tab-bar&#xff0c;切换不同的组件显示&#xff1b; 案例截图 这个案例我们可以通过两种不同的实现思路来实现&#xff1a; 方式一&#xff1a;通过v-if来判断&#xff0c;显示不同的组件&#xff1b;方式二…

Flink中StateBackend(工作状态)与Checkpoint(状态快照)的关系

State Backends 由 Flink 管理的 keyed state 是一种分片的键/值存储&#xff0c;每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外&#xff0c;Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照&#xff0c;并将这些快照复制到持…

力扣349两个数的交集

题目连接&#xff1a;349. 两个数组的交集 - 力扣&#xff08;LeetCode&#xff09; 给定两个数组 nums1 和 nums2 &#xff0c;返回 它们的交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 示例 1&#xff1a; 输入&#xff1a; nums1 [1,2,2…

学习嵌入式的第十二天-------二维数组函数的调用和指针的运算

二维数组函数调用 输入设备-------cpu------输出设备 | V 存储器 总线&#xff1a; 总线宽度&#xff1a;32位或64位 &#xff08;1.数据总线2.控制总线3.地址总线&#xff09; 练习&#xff1a; 定义一个二维整型数组&#xff0c;实现一个函数…

风水+起名测算小程序源码系统 带完整的安装代码包以及搭建教程

风水学是中国古老的一门学问&#xff0c;讲究人与自然的和谐&#xff0c;通过调整环境气场来影响人的运势和发展。而起名测算则是根据个人五行属性和命理特点&#xff0c;为其起一个好名字&#xff0c;以助其运势和发展。这两者结合&#xff0c;可以帮助用户更好地了解自己的命…

为什么每天上班明明没做什么体力活,却仍感觉到身体好累?

​为什么每天上班明明没做什么体力活&#xff0c;却仍感觉到身体好累&#xff1f; 在现代社会&#xff0c;许多人在工作中并不需要从事繁重的体力劳动&#xff0c;然而&#xff0c;他们却常常感到身体疲惫不堪。这种情况不仅发生在办公室工作的人群中&#xff0c;也普遍存在于…

前端Web开发

安装flask框架 pip install flask 导入flask模块 from flask import Flask 【可能遇到的问题】 出现了如下警告&#xff1a; WARNING: You are using pip version 21.2.4; however, version 22.0.4 is available.You should consider upgrading via the D:\Python\python…