Flink实现异常登陆监控(两秒内多次登陆失败进行异常行为标记)
在大数据处理领域,Apache Flink 是一个流行的开源流处理框架,能够高效处理实时数据流。在这篇博客中,我们将展示如何使用 Apache Flink 从 MySQL 中读取数据并进行实时异常监控处理,最终将结果写回到 MySQL 数据库中的err_login表中。
项目概述
我们的示例程序将会执行以下任务:
从 MySQL 数据库读取用户登录数据。
过滤出特定状态的登录记录。
对这些记录进行时间窗口处理。
将处理结果写回 MySQL 数据库。
依赖环境
在开始之前,请确保你已经安装了以下环境:
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>EastMoney</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>central</id>
<name>Maven Central Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
</repository>
</repositories>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
</dependencies>
</project>
MySQL 数据库
CREATE TABLE `login_detail` (
`id` int NOT NULL AUTO_INCREMENT,
`username` varchar(255) DEFAULT NULL,
`password` varchar(255) DEFAULT NULL,
`time` varchar(255) DEFAULT NULL,
`status` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=127 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
CREATE TABLE `err_login` (
`id` int NOT NULL AUTO_INCREMENT,
`username` varchar(255) DEFAULT NULL,
`status` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=74 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
1. 数据模型定义
首先,我们定义了一个简单的 User case class,用于表示从 MySQL 中读取的用户数据。
case class User(id: Int, username: String, password: String, time: String, status: Int)
2.自定义 MySQL 数据源
我们实现了一个自定义的 RichSourceFunction,从 MySQL 数据库中读取数据。该函数会不断地查询数据库,并将新数据发送到 Flink 流中。
class MySQLInsertSource(jdbcUrl: String, username: String, password: String, tableName: String) extends RichSourceFunction[User] {
@volatile private var isRunning = true
private var connection: Connection = _
private var lastMaxTime: String = _
override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
super.open(parameters)
connection = DriverManager.getConnection(jdbcUrl, username, password)
// Initial load
val statement = connection.createStatement()
val resultSet = statement.executeQuery(s"SELECT * FROM $tableName")
while (resultSet.next()) {
val user = User(
resultSet.getInt("id"),
resultSet.getString("username"),
resultSet.getString("password"),
resultSet.getString("time"),
resultSet.getInt("status")
)
// Update lastMaxTime
if (lastMaxTime == null || user.time > lastMaxTime) {
lastMaxTime = user.time
}
}
}
override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
val statement = connection.createStatement()
while (isRunning) {
val query = s"SELECT * FROM $tableName WHERE time > '$lastMaxTime'"
val resultSet = statement.executeQuery(query)
while (resultSet.next()) {
val user = User(
resultSet.getInt("id"),
resultSet.getString("username"),
resultSet.getString("password"),
resultSet.getString("time"),
resultSet.getInt("status")
)
ctx.collect(user)
// Update lastMaxTime
if (user.time > lastMaxTime) {
lastMaxTime = user.time
}
}
Thread.sleep(2000) // sleep for 2 seconds
}
}
override def cancel(): Unit = {
isRunning = false
if (connection != null) {
connection.close()
}
}
}
变量声明:
isRunning: 用于控制数据源是否继续运行。
connection: 用于连接 MySQL 数据库的 Connection 对象。
lastMaxTime: 记录上次读取数据的最大时间戳,用于增量查询。
open 方法:在数据源启动时初始化数据库连接并进行初始加载,读取全部数据,更新 lastMaxTime。
run 方法:在数据源运行时不断查询数据库,获取新数据并发送到 Flink 流中。每隔2秒执行一次查询,并更新 lastMaxTime。
cancel 方法:在数据源取消时关闭数据库连接。
3. 时间戳分配器和水位线
为了确保事件按时间顺序处理,我们为数据流分配时间戳并生成水位线。
val userStreamWithTimestamps = userStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[User](Duration.ofSeconds(1))
.withTimestampAssigner(new SerializableTimestampAssigner[User] {
override def extractTimestamp(element: User, recordTimestamp: Long): Long = {
val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = format.parse(element.time)
date.getTime
}
})
)
WatermarkStrategy:定义了水位线生成策略。forBoundedOutOfOrderness 表示允许事件在1秒的乱序范围内到达。
SerializableTimestampAssigner:定义了时间戳提取器,从 User 对象的 time 字段提取时间戳。
4. 数据过滤和窗口处理
我们过滤出 status 为 0 的记录,并对这些记录进行2秒的窗口处理。
val filteredStream = userStreamWithTimestamps.filter(_.status == 0)
val windowedStream = filteredStream
.keyBy(_.username)
.timeWindow(Time.seconds(2))
.process(new WriteToDatabaseFunction(jdbcUrl, username, password))
过滤:filter 操作保留 status 为 0 的记录。(0为登陆失败)
窗口处理:对每个 username 进行2秒的时间窗口处理,并使用自定义的 WriteToDatabaseFunction 进行处理。
5. 窗口处理函数
我们实现了一个 ProcessWindowFunction,在窗口结束时将获取到的异常登陆用户写入 MySQL 数据库。
class WriteToDatabaseFunction(url: String, username: String, password: String) extends ProcessWindowFunction[User, String, String, TimeWindow] {
val insertSql = "INSERT INTO err_login (username, status) VALUES (?, ?)"
override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
val allStatusOne = elements.forall(_.status == 0)
if (allStatusOne) {
out.collect(s"Username: $key had status 1 for 2 seconds")
val connection = DriverManager.getConnection(url, username, password)
val preparedStatement = connection.prepareStatement(insertSql)
try {
for (user <- elements) {
preparedStatement.setString(1, user.username)
preparedStatement.setInt(2, user.status)
preparedStatement.addBatch()
}
preparedStatement.executeBatch()
} finally {
preparedStatement.close()
connection.close()
}
}
}
}
变量声明:insertSql 为插入错误登录记录的 SQL 语句。
process 方法:
检查窗口内的所有记录 status 是否都为 0。
如果是,打印日志并将记录写入 err_login 表中。
使用批量插入提高效率。
6. 主函数
最后,我们将所有部分组装在一起,并执行 Flink 作业。
object FlinkMySQLExample {
val jdbcUrl = "jdbc:mysql://localhost:3306/big_data"
val username = "root"
val password = "12345678"
val tableName = "login_detail"
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val mySQLSource = new MySQLInsertSource(jdbcUrl, username, password, tableName)
val userStream = env.addSource(mySQLSource)
userStream.print()
val userStreamWithTimestamps = userStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[User](Duration.ofSeconds(1))
.withTimestampAssigner(new SerializableTimestampAssigner[User] {
override def extractTimestamp(element: User, recordTimestamp: Long): Long = {
val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = format.parse(element.time)
date.getTime
}
})
)
val filteredStream = userStreamWithTimestamps.filter(_.status == 0)
val windowedStream = filteredStream
.keyBy(_.username)
.timeWindow(Time.seconds(2))
.process(new WriteToDatabaseFunction(jdbcUrl, username, password))
windowedStream.print()
env.execute("Flink MySQL Example")
}
}
主函数:
获取 Flink 的执行环境。
添加自定义数据源 MySQLInsertSource,从 MySQL 数据库中读取数据。
将数据流赋予时间戳和水位线。
过滤出 status 为 0 的记录。
对过滤后的记录进行2秒的窗口处理,并将结果写入 MySQL 数据库。
执行 Flink 作业。
7.总结
这段代码展示了如何使用 Apache Flink 处理实时数据流,并与 MySQL 数据库进行交互。通过自定义数据源、时间戳和水位线分配、窗口处理和自定义窗口函数,我们可以构建强大的流处理应用程序。
如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于MapReduce, MySQL, python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等