Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)

news2024/12/26 14:35:04

Flink实现异常登陆监控(两秒内多次登陆失败进行异常行为标记)

在大数据处理领域,Apache Flink 是一个流行的开源流处理框架,能够高效处理实时数据流。在这篇博客中,我们将展示如何使用 Apache Flink 从 MySQL 中读取数据并进行实时异常监控处理,最终将结果写回到 MySQL 数据库中的err_login表中。

项目概述

我们的示例程序将会执行以下任务:

从 MySQL 数据库读取用户登录数据。
过滤出特定状态的登录记录。
对这些记录进行时间窗口处理。
将处理结果写回 MySQL 数据库。

依赖环境

在开始之前,请确保你已经安装了以下环境:
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>central</id>
            <name>Maven Central Repository</name>
            <url>https://repo.maven.apache.org/maven2</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
            <!-- Apache Flink dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                <version>1.14.6</version>
            </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

</project>

MySQL 数据库

CREATE TABLE `login_detail` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  `time` varchar(255) DEFAULT NULL,
  `status` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=127 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 

CREATE TABLE `err_login` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `status` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=74 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 

1. 数据模型定义

首先,我们定义了一个简单的 User case class,用于表示从 MySQL 中读取的用户数据。

case class User(id: Int, username: String, password: String, time: String, status: Int)

2.自定义 MySQL 数据源

我们实现了一个自定义的 RichSourceFunction,从 MySQL 数据库中读取数据。该函数会不断地查询数据库,并将新数据发送到 Flink 流中。

class MySQLInsertSource(jdbcUrl: String, username: String, password: String, tableName: String) extends RichSourceFunction[User] {
  @volatile private var isRunning = true
  private var connection: Connection = _
  private var lastMaxTime: String = _

  override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
    super.open(parameters)
    connection = DriverManager.getConnection(jdbcUrl, username, password)
    // Initial load
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(s"SELECT * FROM $tableName")
    while (resultSet.next()) {
      val user = User(
        resultSet.getInt("id"),
        resultSet.getString("username"),
        resultSet.getString("password"),
        resultSet.getString("time"),
        resultSet.getInt("status")
      )
      // Update lastMaxTime
      if (lastMaxTime == null || user.time > lastMaxTime) {
        lastMaxTime = user.time
      }
    }
  }

  override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
    val statement = connection.createStatement()
    while (isRunning) {
      val query = s"SELECT * FROM $tableName WHERE time > '$lastMaxTime'"
      val resultSet = statement.executeQuery(query)
      while (resultSet.next()) {
        val user = User(
          resultSet.getInt("id"),
          resultSet.getString("username"),
          resultSet.getString("password"),
          resultSet.getString("time"),
          resultSet.getInt("status")
        )
        ctx.collect(user)
        // Update lastMaxTime
        if (user.time > lastMaxTime) {
          lastMaxTime = user.time
        }
      }
      Thread.sleep(2000) // sleep for 2 seconds
    }
  }

  override def cancel(): Unit = {
    isRunning = false
    if (connection != null) {
      connection.close()
    }
  }
}

变量声明:
isRunning: 用于控制数据源是否继续运行。
connection: 用于连接 MySQL 数据库的 Connection 对象。
lastMaxTime: 记录上次读取数据的最大时间戳,用于增量查询。
open 方法:在数据源启动时初始化数据库连接并进行初始加载,读取全部数据,更新 lastMaxTime。
run 方法:在数据源运行时不断查询数据库,获取新数据并发送到 Flink 流中。每隔2秒执行一次查询,并更新 lastMaxTime。
cancel 方法:在数据源取消时关闭数据库连接。

3. 时间戳分配器和水位线

为了确保事件按时间顺序处理,我们为数据流分配时间戳并生成水位线。

val userStreamWithTimestamps = userStream
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[User](Duration.ofSeconds(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[User] {
        override def extractTimestamp(element: User, recordTimestamp: Long): Long = {
          val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          val date = format.parse(element.time)
          date.getTime
        }
      })
  )

WatermarkStrategy:定义了水位线生成策略。forBoundedOutOfOrderness 表示允许事件在1秒的乱序范围内到达。

SerializableTimestampAssigner:定义了时间戳提取器,从 User 对象的 time 字段提取时间戳。

4. 数据过滤和窗口处理

我们过滤出 status 为 0 的记录,并对这些记录进行2秒的窗口处理。

val filteredStream = userStreamWithTimestamps.filter(_.status == 0)

val windowedStream = filteredStream
  .keyBy(_.username)
  .timeWindow(Time.seconds(2))
  .process(new WriteToDatabaseFunction(jdbcUrl, username, password))

过滤:filter 操作保留 status 为 0 的记录。(0为登陆失败)
窗口处理:对每个 username 进行2秒的时间窗口处理,并使用自定义的 WriteToDatabaseFunction 进行处理。

5. 窗口处理函数

我们实现了一个 ProcessWindowFunction,在窗口结束时将获取到的异常登陆用户写入 MySQL 数据库。

class WriteToDatabaseFunction(url: String, username: String, password: String) extends ProcessWindowFunction[User, String, String, TimeWindow] {
  val insertSql = "INSERT INTO err_login (username, status) VALUES (?, ?)"

  override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
    val allStatusOne = elements.forall(_.status == 0)
    if (allStatusOne) {
      out.collect(s"Username: $key had status 1 for 2 seconds")
      val connection = DriverManager.getConnection(url, username, password)
      val preparedStatement = connection.prepareStatement(insertSql)
      try {
        for (user <- elements) {
          preparedStatement.setString(1, user.username)
          preparedStatement.setInt(2, user.status)
          preparedStatement.addBatch()
        }
        preparedStatement.executeBatch()
      } finally {
        preparedStatement.close()
        connection.close()
      }
    }
  }
}

变量声明:insertSql 为插入错误登录记录的 SQL 语句。
process 方法:
检查窗口内的所有记录 status 是否都为 0。
如果是,打印日志并将记录写入 err_login 表中。
使用批量插入提高效率。

6. 主函数

最后,我们将所有部分组装在一起,并执行 Flink 作业。

object FlinkMySQLExample {
  val jdbcUrl = "jdbc:mysql://localhost:3306/big_data"
  val username = "root"
  val password = "12345678"
  val tableName = "login_detail"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val mySQLSource = new MySQLInsertSource(jdbcUrl, username, password, tableName)
    val userStream = env.addSource(mySQLSource)

    userStream.print()

    val userStreamWithTimestamps = userStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[User](Duration.ofSeconds(1))
          .withTimestampAssigner(new SerializableTimestampAssigner[User] {
            override def extractTimestamp(element: User, recordTimestamp: Long): Long = {
              val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
              val date = format.parse(element.time)
              date.getTime
            }
          })
      )

    val filteredStream = userStreamWithTimestamps.filter(_.status == 0)

    val windowedStream = filteredStream
      .keyBy(_.username)
      .timeWindow(Time.seconds(2))
      .process(new WriteToDatabaseFunction(jdbcUrl, username, password))

    windowedStream.print()

    env.execute("Flink MySQL Example")
  }
}

主函数:
获取 Flink 的执行环境。
添加自定义数据源 MySQLInsertSource,从 MySQL 数据库中读取数据。
将数据流赋予时间戳和水位线。
过滤出 status 为 0 的记录。
对过滤后的记录进行2秒的窗口处理,并将结果写入 MySQL 数据库。
执行 Flink 作业。
在这里插入图片描述
在这里插入图片描述

7.总结

这段代码展示了如何使用 Apache Flink 处理实时数据流,并与 MySQL 数据库进行交互。通过自定义数据源、时间戳和水位线分配、窗口处理和自定义窗口函数,我们可以构建强大的流处理应用程序。

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于MapReduce, MySQL, python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1718523.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LIO-EKF: 运行数据UrbanNav与mid360设备详细教程

一、代码连接 代码下载连接&#xff1a; YibinWu/LIO-EKF: Maybe the simplest LiDAR-inertial odometry that one can have. (github.com) 编译步骤&#xff1a; cd srcgit clone gitgithub.com:YibinWu/LIO-EKF.gitcatkin_makesource devel/setup.bash 运行步骤&#xff1a; …

opencv进阶 ——(八)图像处理之RMBG模型AI抠图

BRIA.AI团队于HuggingFace开源了一个基于ISNet背景移除模型RMBG-1.4&#xff0c;它可以有效对前景与背景进行分离。RMBG-1.4在精心构建的数据集上训练而来&#xff0c;该数据包含常规图像、电商、游戏以及广告内容&#xff0c;该方案达到了商业级性能&#xff0c;但仅限于非商业…

React-组件通信

组件通信 概念&#xff1a;组件通信就是组件之间的数据传递&#xff0c;根据组件嵌套关系的不同&#xff0c;有不同的通信方法 父传子 基础实现 实现步骤&#xff1a; 1.父组件传递数据-在子组件标签上绑定属性 2.子组件接收数据-子组件通过props参数接收数据 props说明 1.…

AI科技,赋能企业财务管理

AI技术已深入千行百业&#xff0c;其实际任务解决能力愈发凸显和强劲。正如乔布斯所强调“技术不是为工程师而生&#xff0c;而是为应用而生”。 胜意科技深度集成业内领先技术&#xff0c;将AI融入到实际的财务工作流中&#xff0c;与OCR、RPA等智能技术组合式输出&#xff0c…

面试后总没回音,要去问面试结果吗?

知识星球&#xff08;星球名&#xff1a;芯片制造与封测技术社区&#xff0c;星球号&#xff1a;63559049&#xff09;里的学员问&#xff1a;面试一家公司的PIE&#xff0c;这家公司各方面我都很满意&#xff0c;但是面试后到现在都一周了&#xff0c;也没回音&#xff0c;要微…

@Value 读取环境变量配置

在项目开发过程中&#xff0c;有必要使用一些灰色规则&#xff08;即仅用于开发使用过程中的逻辑控制变量&#xff09;。 比如&#xff0c;本地开发中&#xff0c;一些业务逻辑需要调用第三方代码&#xff0c;但又在本地调不通&#xff0c;怎么办。只能通过 if(本地开发) {mock…

Facebook的创新实验室:人工智能与新技术探索

Facebook作为全球领先的社交媒体平台之一&#xff0c;一直在不断探索和应用最新的技术来改善用户体验、推动创新和拓展业务边界。其创新实验室更是探索人工智能&#xff08;AI&#xff09;和新技术的前沿&#xff0c;为未来的社交媒体发展开辟了新的可能性。本文将深入探讨Face…

《广告数据定量分析》第3版读书笔记之统计原理

1.点估计与区间估计:可用于求指标误差区间;(不常用) (1)总体比例的置信区间: 通过样本数据计算的比例,估计总体的对应比例的取值范围。主要适用于用户转化漏斗各环节的转化率估计,比如点击率、点击下载率、下载安装率、安装激活率等。 我们可以得到总体百分比的一个…

iOS组件化 方案 实现

iOS组件化 组件化的原因现在流行的组件化方案方案一、url-block &#xff08;基于 URL Router&#xff09;方案二、protocol调用方式解读 方案三、target-action调用方式解读 gitHub代码链接参考 组件化的原因 模块间解耦模块重用提高团队协作开发效率单元测试 当项目App处于…

例子:Triton + TensorRT-LLM

Deploy an AI Coding Assistant with NVIDIA TensorRT-LLM and NVIDIA Triton | NVIDIA Technical Blog https://github.com/triton-inference-server/tutorials/blob/main/Conceptual_Guide/Part_1-model_deployment/README.md 1. 想用onnx-runtime来做推理backend&#xff1…

React + Taro 项目 实际书写 感受

之前我总结了部分react 基础 根据官网的内容 以及Taro 框架的内容 今天我试着开始写了一下页面和开发 说一下我的感受 我之前写的是vue3 今天是第一次真正根据需求做页面开发 和逻辑功能 代码的书写 主体就是开发了这个页面 虽说这个页面 很简单 但是如果你要是第一次写 难说…

Facebook的算法揭秘:如何塑造我们的信息

在当今数字化时代&#xff0c;Facebook已经成为人们日常生活中不可或缺的一部分。其信息流算法不仅决定着我们在平台上看到的内容&#xff0c;还对我们的观点、行为和体验产生了深远的影响。本文将深入探讨Facebook的算法运作方式&#xff0c;以及它对我们信息获取和社交行为的…

神器!!Python热重载调试【送源码】

在 Python 开发的路上&#xff0c;调试是我们不可避免的一环。 而今天推荐的开源项目Reloadium &#xff0c;让你在不重启程序的情况下实现代码的即时更新和调试。 &#x1f504; Reloadium 功能亮点&#xff1a; 1. 热重载魔法&#xff1a; Reloadium 不仅仅能够实现代码的…

Android高通 12/13 录屏流程代码位置

需求如下图 实现系统录屏功能 frameworks/base/packages/SystemUI/src/com/android/systemui/screenrecord 涉及代码 ScreenRecordDialog # startBtn RecordingService # startRecording# stopRecording ScreenMediaRecorder # start # end #save 1、点击开始录屏framewo…

停车场车位引导系统方案升级实施步骤流程是什么,有什么注意事项

停车场车位引导系统是一种现代化的停车管理系统&#xff0c;它通过实时监测车位占用情况&#xff0c;并向驾驶员提供准确的空闲车位导航信息&#xff0c;从而提高停车场的使用效率和用户体验。随着城市交通的快速发展和车辆数量的不断增加&#xff0c;停车场车位引导系统已成为…

【数据分享】中国科技统计年鉴Excel版(1991-2023年)

大家好&#xff01;今天我要向大家介绍一份重要的中国科技统计数据资源——《中国科技统计年鉴》。这份年鉴涵盖了从1991年到2023年中国科技统计全面数据&#xff0c;并提供限时免费下载。 数据介绍 在数字化时代的浪潮中&#xff0c;数据的重要性日益凸显。对于研究人员、政…

AutoSQT 2024汽车软件质量与测试峰会开启注册 | 智能汽车软件如何卷出差异化?

在汽车行业向智能化、网联化转型的大趋势下&#xff0c;软件在汽车系统中扮演着越来越核心的角色。基于“软件定义汽车”的行业共识&#xff0c;各主机厂正在不断押注软件开发&#xff0c;以实现品牌差异化竞争。 例如&#xff0c;大众集团正在开发下一代汽车操作系统和应用软…

Spring MVC 应⽤分层

什么是应用分层 引用分层是一种软件开发思想 将应用程序分为N个层次每个层次负责各个职责 其中MVC是常见的设计模式这就是应用分层的具体体现 目前主流的开发方式是前后段分离后端开发工程师不再需要关注前端的实现,对此就需要分为表现层&#xff0c;数据层&#xff0c;业务逻…

LM2733升压芯片

具有 40V 内部 FET 开关且采用 SOT-23 封装的 LM2733 0.6MHz 和 1.6MHz 升压转换器 外观 参考价格 1 特性 电路原理图 基于LM2733升压电路设计-CSDN博客https://blog.csdn.net/qq_31251431/article/details/107479885 特此记录 anlog 2024年5月31日 高压方案 此方案经过更多…

数据结构 | 二叉树(基本概念、性质、遍历、C代码实现)

1.树的基本概念 树是一种 非线性 的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。 把它叫做树是因 为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。 有一个特殊的结点&#xff0c;称为根…