工业—使用Flink处理Kafka中的数据_ChangeRecord2

news2024/12/27 14:12:54

使用 Flink 消费 Kafka ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” value 值为 窗口结束时间,设备id” (窗口结束时间格式: yyyy-MM-dd HH:mm:ss )。使用 redis cli HGETALL key方式获取 warning_last3min_everymin_out值。
注:时间语义使用 Processing Time
  1. Kafka Source

    • 从 Kafka 中读取实时的设备预警数据,数据内容应当包括设备 ID 和预警状态等信息。
    • 数据通过 SimpleStringSchema 反序列化为字符串格式,再由 parseMessage 进行解析和提取。
  2. 流处理与窗口

    • Flink 使用滑动时间窗口 (SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) 来计算每 1 分钟内过去 3 分钟内的设备预警数据。
    • 这意味着每 1 分钟计算一次,在每次计算中,会考虑过去 3 分钟内的数据,因此具有滑动窗口的特点。
  3. 窗口函数

    • 在 MaxNumWarnMachineID 中,窗口内的数据按设备 ID 分组,统计每个设备的预警次数,并选出预警次数最多的设备 ID。
    • apply 方法处理窗口内的数据后,输出一个包含时间戳(窗口结束时间)和设备 ID 的元组。
  4. Redis Sink

    • 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为 HSET
    • Redis 中的键为 warning_last3min_everymin_out,值为设备 ID。

 

package flink.calculate.ChangeRecord

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable

// 定义常量
object Constants {
  val TOPIC_NAME = "ChangeRecord"
  val BOOTSTRAP_SERVERS = "192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092"
  val REDIS_HOST = "192.168.222.101"
}

// 主程序逻辑
object WarningLast3MinEveryMinOut {
  def main(args: Array[String]): Unit = {
    // 创建流执行环境并配置
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // 设置作业并行度

    // 构建Kafka数据源
    val kafkaSource = buildKafkaSource()

    // 从Kafka读取数据并处理
    val dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME)
      .map(parseMessage) // 解析消息为 (标识符, 设备ID, 状态)
      .filter(_._3 == "预警") // 过滤非预警状态的数据
      .keyBy(_._1) // 按标识符分组
      .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑动窗口
      .apply(new MaxNumWarnMachineID) // 应用窗口函数计算每分钟内过去3分钟的最多预警设备

    // 输出到控制台和Redis
    dataStream.print("Result =>")
    dataStream.addSink(buildRedisSink())

    // 执行Flink作业
    env.execute("WarningLast3MinEveryMinOut Job")
  }

  // 构建Kafka数据源
  private def buildKafkaSource(): KafkaSource[String] = {
    KafkaSource.builder[String]()
      .setTopics(Constants.TOPIC_NAME)
      .setBootstrapServers(Constants.BOOTSTRAP_SERVERS)
      .setStartingOffsets(OffsetsInitializer.latest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()
  }

  // 解析来自Kafka的消息为元组
  private def parseMessage(message: String): (String, String, String) = {
    val fields = message.split(",")
    ("warning_last3min_everymin_out", fields(1), fields(3))
  }

  // 构建Redis Sink
  private def buildRedisSink(): ConnRedis.RedisSink[(String, String)] = {
    new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)
  }
}

// 预警设备计数窗口函数
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit = {
    // 统计每个设备ID的预警次数
    val machineCounts = input.groupBy(_._2).view.mapValues(_.size)

    // 获取窗口结束时间
    val windowEndTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getEnd))

    // 获取预警次数最多的设备ID
    if (machineCounts.nonEmpty) {
      val maxMachineId = machineCounts.maxBy(_._2)._1
      out.collect((windowEndTime, maxMachineId))
    }
  }
}

// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning_last3min_everymin_out")

  override def getKeyFromData(data: (String, String)): String = data._1

  override def getValueFromData(data: (String, String)): String = data._2
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android 消息队列之MQTT的使用:物联网通讯,HTTP太重了,使用MQTT;断网重连、注册、订阅、发送数据和接受数据,实现双向通讯。

目录: 问题MQTT是什么以及为什么使用如何使用:第一阶段、基础功能如何使用:第二阶段、增加断网重连如何使用:第三阶段、封装 一、问题 在开发的时候,我们一般都使用Http和后台进行通讯,比如我们是开发物联…

项目-02-数学学院后台项目开发过程中的问题总结

目录 一、后台(pc端,vue2)1. dialog对话框被黑色蒙层盖住2. 将前端表格导出为word文档3. 在线查看、下载 .docx、.doc、.pdf文档 一、后台(pc端,vue2) 1. dialog对话框被黑色蒙层盖住 问题: d…

大数据实验E5HBase:安装配置,shell 命令和Java API使用

实验目的 熟悉HBase操作常用的shell 命令和Java API使用; 实验要求 掌握HBase的基本操作命令和函数接口的使用; 实验平台 操作系统:Linux(建议Ubuntu16.04或者CentOS 7 以上);Hadoop版本:3…

使用Tomcat搭建简易文件服务器

创建服务器 1. 复制一个tomcat服务器,并命名为file-service(好区分即可) 2.在webapp里面新建一个文件夹 uploadfiles ,用于存储上传的文件 3. 修改conf/service.xml,配置文件服务器的端口与上传文件夹的访问 在Host标签之间加入一个Context标签 docBase"uploa…

【算法】位运算合集

阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 零:位运算基础公式 零:五道基础题 1:位1的个数 2:比…

【NLP高频面题 - LLM架构篇】旋转位置编码RoPE相对正弦位置编码有哪些优势?

【NLP高频面题 - LLM架构篇】旋转位置编码RoPE相对正弦位置编码有哪些优势? 重要性:⭐⭐⭐ 💯 NLP Github 项目: NLP 项目实践:fasterai/nlp-project-practice 介绍:该仓库围绕着 NLP 任务模型的设计、训练…

《Vue零基础教程》(5)计算属性和侦听器好讲解

1 计算属性 1) 什么是计算属性 计算属性就是基于现有属性计算后的属性 2) 计算属性的作用 计算属性用于对原始数据的再次加工 3) 案例 需求 实现如下效果 使用表达式实现 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF…

Narya.ai正在寻找iOS工程师!#Mixlab内推

如果你对AI技术和iOS开发充满热情&#xff0c;这里有一个绝佳的机会加入一家专注于AI应用创新的初创公司。Narya.ai正在招聘iOS工程师&#xff0c;帮助他们开发下一代效率工具&#xff0c;旨在提升用户的日常生活效率与幸福感。 关于Narya.ai&#xff1a; 专注于AI应用层创新&a…

【开源免费】基于SpringBoot+Vue.JS课程答疑系统(JAVA毕业设计)

博主说明&#xff1a;本文项目编号 T 070 &#xff0c;文末自助获取源码 \color{red}{T070&#xff0c;文末自助获取源码} T070&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析…

FPGA实战篇(触摸按键控制LED灯)

1.触摸按键简介 触摸按键主要可分为四大类&#xff1a;电阻式、电容式、红外感应式以及表面声波式。根据其属性的不同&#xff0c;每种触摸按键都有其合适的使用领域。 电阻式触摸按键由多块导电薄膜按照按键的位置印制而成&#xff0c;但由于耐用性较差且维护复杂&#xff0c…

VSCode如何关闭Vite项目本地自启动

某些情况下VSCode打开Vite项目不需要自动启动&#xff0c;那么如何关闭该功能 文件>首选项>设置 搜索vite 将Vite:Auto Start 勾选取消即可

重生之我在异世界学编程之C语言:深入指针篇(上)

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文&#xff08;1&#xff09;内置数…

TypeScript (一)运行环境配置,数据类型,可选类型,联合类型,type与interface,交叉类型,断言as,字面量类型,类型缩小

文章目录 一、认识TS1.1 JS 存在的问题1.2 TS的出现1.3 TS运行环境运行ts的三种方式 1.4 变量声明1.5 类型推断 二、数据类型2.1 JS数据类型(1) 数组Array(2) 对象Object(3) 其他类型 2.2 TS特有数据类型(1) any类型(2) unknown类型(3) void类型(4) never (了解)(5) tuple类型 …

【Leetcode 每日一题】3274. 检查棋盘方格颜色是否相同

问题背景 给你两个字符串 c o o r d i n a t e 1 coordinate1 coordinate1 和 c o o r d i n a t e 2 coordinate2 coordinate2&#xff0c;代表 8 8 8 \times 8 88 国际象棋棋盘上的两个方格的坐标。 以下是棋盘的参考图。 如果这两个方格颜色相同&#xff0c;返回 t …

【Dubbo03】消息队列与微服务之dubbo-admin 二进制与编译安装

实战案例&#xff1a;二进制安装 dubbo-admin 新版用Golang重构&#xff0c;提供了二进制包&#xff0c;可以直接部署 #下载二进制包 [rootubuntu2204 ~]#wget https://github.com/apache/dubbo-admin/releases/download/0.5.0/apache-dubbo-admin-0.5.0-bin-release.tar.gz …

Kylin Server V10 下 Kafka 集群部署

一、ZooKeeper 集群部署 1、主机规划 主机名 IP 地址 myid 10.8.3.35 1 10.8.3.36 2 10.8.3.37 3 2、拓扑结构 3、部署 (1) 下载Zookeeper [root@localhost ~]# cd /usr/local [root@localhost local]# wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-…

redis的应用----缓存

redis的应用----缓存 一、缓存的概念二、使用redis作为缓存2.1使用redis作为缓存的原因2.2缓存机制的访问步骤 三、缓存的更新策略3.1定期更新3.2实时更新3.3淘汰策略 四、缓存常见的问题4.1缓存预热(Cache preheating)4.2缓存穿透(Cache penetration)4.3缓存雪崩(Cache avalan…

用于LiDAR测量的1.58um单芯片MOPA(一)

--翻译自M. Faugeron、M. Krakowski1等人2014年的文章 1.简介 如今&#xff0c;人们对高功率半导体器件的兴趣日益浓厚&#xff0c;这些器件主要用于遥测、激光雷达系统或自由空间通信等应用。与固态激光器相比&#xff0c;半导体器件更紧凑且功耗更低&#xff0c;这在低功率供…

SpringBoot两天

SpringBoot讲义 什么是SpringBoot&#xff1f; Spring Boot是由Pivotal团队提供的全新框架&#xff0c;其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置&#xff0c;从而使开发人员不再需要定义样板化的配置。通过这种方式&#xf…

vue3项目最新eslint9+prettier+husky+stylelint+vscode配置

一、eslint9和prettier通用配置 安装必装插件 ESlint9.x pnpm add eslintlatest -DESlint配置 vue 规则 , typescript解析器 pnpm add eslint-plugin-vue typescript-eslint -DESlint配置 JavaScript 规则 pnpm add eslint/js -D配置所有全局变量 globals pnpm add globa…