使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” , value 值为 “ 窗口结束时间,设备id” (窗口结束时间格式: yyyy-MM-dd HH:mm:ss )。使用 redis cli 以 HGETALL key方式获取 warning_last3min_everymin_out值。注:时间语义使用 Processing Time 。
-
Kafka Source
- 从 Kafka 中读取实时的设备预警数据,数据内容应当包括设备 ID 和预警状态等信息。
- 数据通过
SimpleStringSchema
反序列化为字符串格式,再由parseMessage
进行解析和提取。
-
流处理与窗口
- Flink 使用滑动时间窗口 (
SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))
) 来计算每 1 分钟内过去 3 分钟内的设备预警数据。 - 这意味着每 1 分钟计算一次,在每次计算中,会考虑过去 3 分钟内的数据,因此具有滑动窗口的特点。
- Flink 使用滑动时间窗口 (
-
窗口函数
- 在
MaxNumWarnMachineID
中,窗口内的数据按设备 ID 分组,统计每个设备的预警次数,并选出预警次数最多的设备 ID。 apply
方法处理窗口内的数据后,输出一个包含时间戳(窗口结束时间)和设备 ID 的元组。
- 在
-
Redis Sink
- 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为
HSET
。 - Redis 中的键为
warning_last3min_everymin_out
,值为设备 ID。
- 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为
package flink.calculate.ChangeRecord
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable
// 定义常量
object Constants {
val TOPIC_NAME = "ChangeRecord"
val BOOTSTRAP_SERVERS = "192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092"
val REDIS_HOST = "192.168.222.101"
}
// 主程序逻辑
object WarningLast3MinEveryMinOut {
def main(args: Array[String]): Unit = {
// 创建流执行环境并配置
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置作业并行度
// 构建Kafka数据源
val kafkaSource = buildKafkaSource()
// 从Kafka读取数据并处理
val dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME)
.map(parseMessage) // 解析消息为 (标识符, 设备ID, 状态)
.filter(_._3 == "预警") // 过滤非预警状态的数据
.keyBy(_._1) // 按标识符分组
.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑动窗口
.apply(new MaxNumWarnMachineID) // 应用窗口函数计算每分钟内过去3分钟的最多预警设备
// 输出到控制台和Redis
dataStream.print("Result =>")
dataStream.addSink(buildRedisSink())
// 执行Flink作业
env.execute("WarningLast3MinEveryMinOut Job")
}
// 构建Kafka数据源
private def buildKafkaSource(): KafkaSource[String] = {
KafkaSource.builder[String]()
.setTopics(Constants.TOPIC_NAME)
.setBootstrapServers(Constants.BOOTSTRAP_SERVERS)
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
}
// 解析来自Kafka的消息为元组
private def parseMessage(message: String): (String, String, String) = {
val fields = message.split(",")
("warning_last3min_everymin_out", fields(1), fields(3))
}
// 构建Redis Sink
private def buildRedisSink(): ConnRedis.RedisSink[(String, String)] = {
new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)
}
}
// 预警设备计数窗口函数
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {
override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit = {
// 统计每个设备ID的预警次数
val machineCounts = input.groupBy(_._2).view.mapValues(_.size)
// 获取窗口结束时间
val windowEndTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getEnd))
// 获取预警次数最多的设备ID
if (machineCounts.nonEmpty) {
val maxMachineId = machineCounts.maxBy(_._2)._1
out.collect((windowEndTime, maxMachineId))
}
}
}
// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {
override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning_last3min_everymin_out")
override def getKeyFromData(data: (String, String)): String = data._1
override def getValueFromData(data: (String, String)): String = data._2
}