SpringBoot 实现EMQ设备的上下线告警

news2025/1/26 15:54:31

前言

上下线通知

我遇到了一个难题,即在使用EMQ X 4.4.10的开源版本时,我需要实现设备的上下线状态监控,但该4.4.10开源版本并未内置设备上下线提醒模块,只有企业版才内置了该模块。这为我带来了一些技术上的难题,迫使我必须另辟蹊径,通过其他方法来监听设备的上下线状态。为了解决这个问题,我采取了以下步骤:

首先,我修改了EMQ Xacl.config文件,添加了以下规则:

{allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.

图示:

这个规则允许订阅$SYS/brokers/+/clients/#主题的所有客户端。

接下来,我使用Spring Boot创建了一个应用程序,其中我设置了与EMQ X代理的连接。在这个应用程序中,我创建了一个监听器,用于订阅$SYS/brokers/+/clients/#主题,以感知设备的上下线信息。

通过这个解决方案,我能够实时监控设备的连接和断开事件,而不受EMQ X开源版本的功能限制。这使我能够根据设备状态采取适当的措施,比如发送通知或执行其他自定义操作。这个方法允许我灵活地定制解决方案,以满足我的特定需求,而无需依赖EMQ X的功能。

EMQ简介


EMQErlang MQTT Broker)是一种基于Erlang编程语言开发的开源消息传递代理(MQTT broker),专门用于支持MQTTMessage Queuing Telemetry Transport)协议。MQTT是一种轻量级、高效的消息传递协议,最初设计用于连接受限的设备,如嵌入式系统和物联网设备。EMQ作为MQTT broker,提供了可靠的消息传递机制,使设备能够相互通信,同时也可与后端应用程序集成,使其成为物联网生态系统中的重要组成部分。

环境

  • EMQX安装方式:Docker
  • EMQX版本:4.4.10开源版本
  • 操作系统:CentOS 7

EMQX4.4版本官方文档

下载

下载 EMQX

准备工作

安装EMQX

修改EMQX的ACL规则内容

EMQX的Docker容器配置文件所在目录

# 进入EMQX容器
docker exec -it emqx /bin/sh
# 进入配置文件目录
cd /opt/emqx/etc
# 查看acl配置文件
cat acl.conf
# 编辑acl配置文件
vi acl.conf

非Docker容器配置文件所在目录

# 进入配置文件目录
cd /etc/emqx
# 查看acl配置文件
cat acl.conf
# 编辑acl配置文件
vi acl.conf

acl的默认文件内容

%%--------------------------------------------------------------------
%% [ACL](https://docs.emqx.io/broker/v3/en/config.html)
%%
%% -type(who() :: all | binary() |
%%                {ipaddr, esockd_access:cidr()} |
%%                {ipaddrs, [esockd_access:cidr()]} |
%%                {client, binary()} |
%%                {user, binary()}).
%%
%% -type(access() :: subscribe | publish | pubsub).
%%
%% -type(topic() :: binary()).
%%
%% -type(rule() :: {allow, all} |
%%                 {allow, who(), access(), list(topic())} |
%%                 {deny, all} |
%%                 {deny, who(), access(), list(topic())}).
%%--------------------------------------------------------------------

{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.

{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.

{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.

{allow, all}.

新增一条ACL规则

allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
  • allow: 表示这是一个允许(allow)访问的规则。这意味着与此规则匹配的操作将被允许。
  • all: 表示这个规则适用于所有的客户端。
  • subscribe: 表示这个规则定义了对主题的订阅权限。
  • $SYS/brokers/+/clients/#: 这是一个主题过滤器,它指定了匹配的主题模式。在这里,$SYS/brokers/+/clients/# 表示以 $SYS/brokers/ 开头,然后是一个通配符 +,它可以匹配单个层级的任何名称,接着是 clients/,最后又有一个 # 通配符,它可以匹配零个或多个层级的名称。因此,这个主题过滤器匹配了所有以 $SYS/brokers/ 开头,然后紧跟着 clients/ 的主题。

综合起来,这个规则允许所有的客户端订阅以 $SYS/brokers/ 开头,然后跟着 clients/的所有主题。通常,这种规则用于允许所有客户端订阅系统级别的信息或监控数据,如经纪人(Broker)的客户端连接状态等。这可以用于监视和诊断MQTT Broker 的性能和状态。

注意:修改完毕之后使用emqx_ctl reload_acl重新加载acl规则或者直接重启emqx服务

搭建一个能够监听EMQX主题的Spring Boot应用程序

MQTT相关依赖

<!-- MQTT相关依赖 -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

MQTT接受订阅的主题

$SYS/brokers/+/clients/#

处理设备上下线事件

获取EMQX消息主题

// 从消息请求头中获取消息主题topic
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));

获取topic最后的节点字符串

以下方式通过主题topic来获取ClientId

// topic最后的节点字符串
String lastPart = extractLastPart(topic);
// 获取消息内容
String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8");
// 判断设备是上线或下线消息
if ("connected".equals(lastPart)) {
    // 设备上线消息
    clientId = extractClientIdFromTopic(topic);
    log.info("设备上线提醒 -> IMEI:{}", clientId);
} else if ("disconnected".equals(lastPart)) {
    // 设备下线消息
    clientId = extractClientIdFromTopic(topic);
    log.info("设备下线警告 -> IMEI:{}", clientId);
}

/**
 * 获取最后一个节点
 *
 * @param topic 主题
 * @return 节点内容
 */
public static String extractLastPart(String topic) {
    // 使用split方法将字符串根据'/'分割成数组
    String[] parts = topic.split("/");
    // 获取最后一个元素
    String lastPart = parts[parts.length - 1];
    return lastPart;
}

/**
 * 从Topic中提取ClientId
 *
 * @param topic 主题
 * @return ClientId
 */
public static String extractClientIdFromTopic(String topic) {
    // 使用正则表达式匹配主题中的ClientId
    String regex = "\\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected)";
    Pattern pattern = Pattern.compile(regex);
    Matcher matcher = pattern.matcher(topic);
    // matcher.groupCount() 是一个方法,用于获取正则表达式中定义的组数。在正则表达式中,使用括号 () 来定义捕获组。在这个情况下,正则表达式 \\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected) 中有两组,分别是括号内的 ([^/]+) 部分和 (connected|disconnected) 部分。matcher.groupCount() 返回的是正则表达式中捕获组的数量
    if (matcher.matches() && matcher.groupCount() == 2) {
        // 如果正则匹配成功,提取ClientId并返回
        return matcher.group(1);
    } else {
        // 如果匹配失败,返回null或者抛出异常,视情况而定
        return null;
}

当然你也可以通过解析payload来获取更多详细信息,可参照官方文档:客户端上下线事件

主题 (Topic)说明
${clientid}/connected上线事件。当任意客户端上线时,EMQX 就会发布该主题的消息
${clientid}/disconnected下线事件。当任意客户端下线时,EMQX 就会发布该主题的消息

connected 事件消息的 Payload 解析成 JSON 格式如下:

{
    "username": "foo",
    "ts": 1625572213873,
    "sockport": 1883,
    "proto_ver": 4,
    "proto_name": "MQTT",
    "keepalive": 60,
    "ipaddress": "127.0.0.1",
    "expiry_interval": 0,
    "connected_at": 1625572213873,
    "connack": 0,
    "clientid": "emqtt-8348fe27a87976ad4db3",
    "clean_start": true
}

disconnected 事件消息的 Payload 解析成 JSON 格式如下:

{
    "username": "foo",
    "ts": 1625572213873,
    "sockport": 1883,
    "reason": "tcp_closed",
    "proto_ver": 4,
    "proto_name": "MQTT",
    "ipaddress": "127.0.0.1",
    "disconnected_at": 1625572213873,
    "clientid": "emqtt-8348fe27a87976ad4db3"
}

可以解析JOSN数据拿到clientid,注意此处使用的JSON解析工具是Hutool的。

// 获取消息内容
String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8");
// 解析JSON字符串
JSONObject payloadJsonObject = JSONUtil.parseObj(payload);
// 获取ClientId
String clientid = payloadJsonObject.getStr("clientid");

实现效果

总结

  1. 修改 EMQ X ACL 配置: 你在 EMQ X 中修改了 acl.config 文件,添加了相应的 ACL 规则,允许订阅 $SYS/brokers/+/clients/# 主题的所有客户端。这个步骤允许你在开源版本中访问关键的设备连接信息。

  2. 创建 Spring Boot 应用程序: 通过创建一个 Spring Boot 应用程序,你建立了一个连接到 EMQ X 代理的桥梁。这个应用程序充当了监听器,用于订阅 $SYS/brokers/+/clients/# 主题,以实时感知设备的连接和断开事件。

  3. 实时设备监控: 你的解决方案允许你实时监控设备的连接状态,而无需依赖 EMQ X 企业版的内置功能。这使你能够快速响应设备状态的变化,并采取相应的行动,如发送通知或执行自定义操作。

  4. 定制性: 通过这个方法,你能够灵活地定制解决方案,以满足你的特定需求。你可以根据设备状态采取不同的操作,因此具有更大的灵活性。

  5. 避免功能限制: 尽管 EMQ X 4.4.10 开源版本没有内置设备上下线提醒模块,但你成功地绕过了这个限制,通过自定义配置和应用程序开发来实现了所需的功能。

  6. 无需升级或许可证: 你的解决方案不需要升级到 EMQ X 企业版或购买额外的许可证。这使得你可以在开源版本的基础上构建所需的功能。

  7. 总而言之,你的解决方案是一种巧妙的方式,通过修改配置和创建一个自定义的 Spring Boot 应用程序,实现了设备上下线状态的监控和管理。这个方法不仅解决了技术挑战,还提供了灵活性和定制性,以满足你的特定需求。这是一个创造性的方法,适用于需要在 EMQ X 开源版本中实现设备监控的情况。

参考文献

EMQX4.4官方文档
SpringBoot整合MQTT(EMQ)设备上下线告警
【EMQX 5.0】 Spring Cloud 集成MQTT并异步入库 + 客户端上报数据 + 上下线主题订阅

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1077913.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

远程办公软件的未来趋势:预测2023年及以后的发展方向

随着科技的迅速发展&#xff0c;远程办公已经成为现代工作方式的重要组成部分。远程办公软件在过去几年中取得了巨大的进步&#xff0c;并且在全球范围内被广泛使用。本文将探讨远程办公软件在2023年及以后可能的发展方向&#xff0c;包括增强的协作功能、智能化的辅助工具、改…

坦克 400 Hi4-T:用产品诠释越野新能源

9 月 25 日&#xff0c;坦克 400 Hi4-T 正式上市&#xff0c;新车共推出两款车型配置&#xff0c;售价区间 27.98-28.98 万元。同时&#xff0c;坦克 400 Hi4-T 将上市及即交付。 权益方面&#xff0c;坦克 400 Hi4-T 共有七重好礼&#xff1a; 质保无忧&#xff1a;整车 5 年…

02 认识Verilog HDL

02 认识Verilog HDL ‍ 对于Verilog的语言的学习&#xff0c;我认为没必要一开始就从头到尾认真的学习这个语言&#xff0c;把这个语言所有细节都搞清楚也不现实&#xff0c;我们能够看懂当前FPGA的代码的程度就可以了&#xff0c;随着学习FPGA深度的增加&#xff0c;再不断的…

Autosar诊断实战系列24-0x2E服务代码级分析及ECU-Pending期间的处理

本文框架 前言1. UDS-0x2E服务逻辑整理2. Pending期间ECU的处理3. 相关工程问题思考前言 开始本篇讲述前,先抛出几个问题,UDS 2E服务在执行过程中进行了哪些操作?在2E写期间由于要操作NvM,会执行时间较长导致ECU先回复NRC 0x78,这期间ECU在进行哪些处理?ECU是如何判断2E…

单目标应用:蚁群算法(Ant Colony Optimization,ACO)求解微电网优化MATLAB

一、微网系统运行优化模型 微电网优化模型介绍&#xff1a; 微电网多目标优化调度模型简介_IT猿手的博客-CSDN博客 二、蚁群算法ACO 蚁群算法&#xff08;Ant Clony Optimization&#xff0c; ACO&#xff09;由意大利学者Colorni A., Dorigo M. 等于1991年提出&#xff0c…

开啥玩笑?一个SSD硬盘可以使用100多年?MTBF正解

在之前文章中&#xff0c;有一个参数“平均无故障时间”&#xff0c;对应的参数是MTBF&#xff0c;比如这个盘MTBF150万小时。 小编发现有一些朋友对这个参数还有误解。大家看到这个参数误认为盘可以使用150万小时都没有发生故障。如果真的是这样&#xff0c;那么这盘的质量简直…

基于springboot实现家具销售电商平台管理系统项目【项目源码+论文说明】

基于springboot实现家具销售电商平台管理系统演示 摘要 社会的发展和科学技术的进步&#xff0c;互联网技术越来越受欢迎。网络计算机的交易方式逐渐受到广大人民群众的喜爱&#xff0c;也逐渐进入了每个用户的使用。互联网具有便利性&#xff0c;速度快&#xff0c;效率高&am…

【OSPF宣告——network命令与多区域配置实验案例】

个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名大二在校生&#xff0c;喜欢编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;小新爱学习. &#x1f43c;个人WeChat&#xff1a;hmmwx53 &#x1f54a;️系列专栏&#xff1a;&#x1f5bc…

ChatGLM2-6B微调实践-P-Tuning方案

ChatGLM2-6B微调实践 环境准备安装部署1、安装 Anaconda2、安装CUDA3、安装PyTorch4、安装 ChatGLM2-6B 微调实践1、准备数据集2、安装python依赖3、微调并训练新模型4、微调后模型的推理与评估5、验证与评估微调后的模型6、微调模型优化7、P-Tuning微调灾难性遗忘问题 微调过程…

vulnhub_Inferno靶机渗透测试

Inferno靶机 靶机地址&#xff1a;https://www.vulnhub.com/entry/inferno-11,603/ 文章目录 Inferno靶机信息收集web渗透获取权限横向移动权限提升靶机总结 信息收集 1.通过nmap扫描得到靶机开放22和80端口&#xff0c;看来是主web端渗透了 使用dirsearch目录扫描没得到结果…

【Mybatis】动态 SQL

动态 SQL \<if>标签\<trim>标签\<where>标签\<set>标签\<foreach>标签 动态 sql 是 Mybatis 的强⼤特性之⼀&#xff0c;能够完成不同条件下不同的 sql 拼接。 <if>标签 前端用户输入时有些选项是非必填的, 那么此时传到后端的参数是不确…

面试总结(mysql定精度/oom排查/spring三级缓存/stream流)

Mysql数据类型上的一个把握 1、MySQL Decimal为什么不会丢失精度 DECIMAL的存储方式和其他数据类型都不同&#xff0c;它是以字符串形式存储的。假设一个字段为DECIMAL(3,0)&#xff0c;当我们存入100时&#xff0c;实际上存入的1、0、0这三个字符拼接而成的字符串的二进制值&…

Linux 基本指令(下)

时间相关的指令 date显示 date 指定格式显示时间&#xff1a; date %Y:%m:%d date 用法&#xff1a;date [OPTION]... [FORMAT] 1.在显示方面&#xff0c;使用者可以设定欲显示的格式&#xff0c;格式设定为一个加号后接数个标记&#xff0c;其中常用的标记列表如下 %…

1010hw

using namespace std; class Per {friend const Per operator-(const Per &L,const Per &R);friend bool operator<(const Per &L,const Per &R);int a;int b; public://有参构造Per(int a,int b):a(a),b(b){}//无参构造Per(){}//运算符重载const Per opera…

MyBatis注解开发实现学生管理页面(分页pagehelper,多条件搜索,查看课程信息)

pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 …

【论文阅读】面向抽取和理解基于Transformer的自动作文评分模型的隐式评价标准(实验结果部分)

方法 结果 在这一部分&#xff0c;我们展示对于每个模型比较的聚合的统计分析当涉及到计算特征和独立的特征组&#xff08;表格1&#xff09;&#xff0c;抽取功能组和对齐重要功能组&#xff08;表格2&#xff09;&#xff0c;并且最后&#xff0c;我们提供从模型比较&#x…

【前端工程化】配置React+ts企业级代码规范及样式格式和git提交规范

目录 前言 代码规范技术栈 创建react18vite2ts项目 editorconfig统一编辑器配置 prettier自动格式化代码 eslintlint-staged检测代码 使用tsc检测类型和报错 代码提交时使用husky检测代码语法规范 代码提交时使用husky检测commit备注规范 配置commitizen方便添加commi…

VMware centos7虚拟机修改静态IP

一、修改网络适配器 1、打开 2、使用管理员权限修改 3、按照图中步骤修改为 4、设置网关为10.0.0.2后保存即可 二、修改配置文件 1、输入下面代码进入修改&#xff08;网卡这里网卡名字为ens33&#xff0c;可使用ifcfig或ip a查看&#xff09; vi /etc/sysconfig/netwo…

【Java 进阶篇】CSS 选择器详解

CSS&#xff08;层叠样式表&#xff09;是一种用于描述网页上元素样式的语言。要想有效地使用CSS&#xff0c;了解CSS选择器是至关重要的&#xff0c;因为它们允许你选择要应用样式的HTML元素。在本文中&#xff0c;我们将详细介绍CSS选择器的各种类型和用法&#xff0c;以便你…

并不止于表面理论和简单示例——《Python数据科学项目实战》

Python 现在可以说是运用最广泛的编程语言之一&#xff0c;使用 Python 的人不只局限在计算机相关专业的从业者,很多来自金融领域、医疗领域以及其他我们无法想象的领域的人,每天都在使用 Python处理各种数据、使用机器学习进行预测以及完成各种有趣的工作。 长久以来&#xff…