使用MASA全家桶从零开始搭建IoT平台(三)管理设备的连接状态

news2025/1/23 5:01:23

文章目录

  • 前言
  • 分析
  • 方案1:遗嘱消息
    • 演示遗嘱消息的使用
    • 实施流程
  • 方案2:使用WebHook
    • 开启WebHook
    • 演示Webhook
    • 编写代码


前言

获取一个设备的在线和离线状态,是一个很关键的功能。我们对设备下发的控制指令,设备处于在线状态才能及时给我们反馈。这里的在线和离线,我们可以简单的理解为设备与MQTT的连接状态。

分析

我们打电话的时候经常能听到:"您拨打的用户已关机“和”用户不在服务区或暂时无法接通“,这两种的区别是什么?


1、当用户开机时,会自动向最近的移动基站注册,基站标记该用户为"attach"(在线)状态。
2、当用户关机时,手机会发起datach流程,告知基站自己关机了,基站标记该用户为"detach"(离线)状态。这样再次拨打就可以节省寻呼资源,直接提示用户关机。
3、当用户忽然进入无网络的环境,或者手机故障,导致来不及发起datach流程,基站还认为用户"在线",当有人拨打用户号码时,基站测会对用户进行寻呼,但是超时得不到回应后,就会提示"不在服务区"或者"暂时无法接通" 的语音。

其实这个方案在IoT上也是可行的,我们可以让设备在线和离线的过程中向特定Topic发送状态消息,但是存在问题,我们需要一个单独的Broker去订阅这个Topic,但是这个单独的Broker很容易成为单点故障点。而且如果设备数量很大,这种意外离线的设备也很难及时发现,需要下发指令后等待设备响应超时才能发现。

方案1:遗嘱消息

MQTT 遗嘱消息可以在客户端意外断线时将“遗嘱”优雅地发送给第三方订阅者,以实现离线通知、设备状态更新等业务。其中意外断线指客户端断开前未向服务器发送 DISCONNECT 消息,比如:

因网络故障或网络波动,设备在保持连接周期内未能通讯,连接被服务端关闭
设备意外掉电
设备尝试进行不被允许的操作而被服务端关闭连接,例如订阅自身权限以外的主题等
遗嘱消息在 MQTT 客户端向服务器端 CONNECT 请求时设置,可选属性包括是否发送遗嘱消息 (Will Message)标志,和遗嘱消息主题 (Topic) 与内容(Payload) 以及 Properties。

值得一提的,遗嘱消息发布的时间可能会有延迟:通常意外断线时,服务器无法立即检测到断线行为,需要通过连接保活心跳机制并经过一定周期后才会触发;MQTT 5.0 提供的遗嘱延迟间隔(Will Delay Interval)属性也会影响发布时间。

演示遗嘱消息的使用

我们使用A、B两台电脑使用MQTT X来演示。
我们在A电脑的 MQTT X 中新建一个名为 Test 的连接,Host 修改为 修改为我们的MQTT地址(192.120.5.204),并输入账号密码,在 Advanced 部分选择 MQTT Version 为 5.0,并且将 Session Expiry Interval 设置为 10,确保会话不会在遗嘱消息发布前过期。

然后在 Lass Will and Testament 部分将 Last-Will Topic 设置为 offline,Last-Will Payload 设置为 I’m offline,Will Delay Interval (s) 设置为 5。


完成以上设置后,我们点击右上角的 Connect 按钮以建立连接。

我们在B电脑的MQTTX中新建一个连接Sub,mqtt地址同样指向我们的mqtt服务器(192.120.5.204)

并订阅offline主题

我们用任务管理器直接结束A电脑的MQTTX进程,这是连接会被直接断开,模拟了设备断电的场景,在5s之后,在B电脑的MQTTX订阅中收到了一条内容为 I‘m offline 的遗嘱消息。

实施流程

1、设备遗嘱消息内容设置为offline,该遗嘱主题与一个普通发送状态的主题设定成同一个 {设备名称}/status。例如 284202304230001/status
2、当设备连接时,向主题 {设备名称}/status 发送内容为 online 的Retained消息,其它客户端订阅主题 {设备名称}/status 的时候,将获取到 Retained 消息为 online。

保留消息(Retain )
MQTT 服务端收到 Retain 标志为 1 的 PUBLISH 报文时,会将该报文视为保留消息,除了被正常转发以外, 保留消息会被存储在服务端,每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换。
当客户端建立订阅时,如果服务端存在主题匹配的保留消息,则这些保留消息将被立即发送给该客户端。 借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下是非常重要的。
EMQX 默认开启保留消息的能力和服务,可以在 etc/emqx.conf 中修改 mqtt.retain_available 为 false 来关闭保留消息的能力, 这样客户端将被禁止发送 Retain 标志为 1 的 PUBLISH 报文,否则,客户端将会收到原因码为 0x9A(不支持保留消息)的 DISCONNECT 报文。
保留消息的服务会存储和管理客户端发送的保留消息,并发送给相应的订阅者。

3、当客户端异常断开时,系统自动向主题 {设备名称}/status 发送内容为 offline 的消息,其它订阅了此主题的客户端会马上收到 offline 消息;如果遗嘱消息设置了 Will Retain,那么此时如果有新的订阅 A/status 主题的客户端上线,也将获取到内容为 offline 的遗嘱消息。

方案2:使用WebHook

方案1需要设备主动设置遗嘱消息才能实现,那么有没有更简单的方式,直接通过设备与Mqtt的连接事件来获取连接状态呢。
EMQX 设计了一套WebHook系统,可以通过这个自带的WebHook系统获取内部的事件并进行处理。WebHook的原理很简单,当设备与mqtt建立连接或者断开连接时,EMQX会把事件的信息通过我们的配置调用特定的URL上的接口,实现通知。
使用WebHook还可以有限避免单点故障。所以本项目会采用WebHook的方式来实现对设备在线和离线的管理。

开启WebHook

数据集成 -> 数据桥接 中创建一个Webhook

名称设置为ConnectedEvent,URL 中填写我们的Webhook地址,也就是触发事件之后的调用接口地址,这里我们填:

http://192.120.5.204:5000/api/Device/ConnectedEvent

请求方式为Post,其他内容保持默认不变

这里注意URL可以通过${field}的方式拼接,请求体也可以自己指定,如果留空会原样转发消息,我们这里请求体留空
设备在线和离线的事件转发的消息格式如下

{
	"username": "284202304230001",
	"timestamp": 1682652598840,
	"sockname": "172.17.0.5:1883",
	"receive_maximum": 32,
	"proto_ver": 5,
	"proto_name": "MQTT",
	"peername": "172.17.0.1:48524",
	"node": "emqx@172.17.0.5",
	"mountpoint": "undefined",
	"metadata": {
		"rule_id": "rule_3hsx"
	},
	"keepalive": 60,
	"is_bridge": false,
	"expiry_interval": 10,
	"event": "client.connected",
	"connected_at": 1682652598840,
	"conn_props": {
		"User-Property": {},
		"Session-Expiry-Interval": 10
	},
	"clientid": "mqttx_c4491df0",
	"clean_start": false
}

我们点击 创建 ,并继续点击 创建规则

我们在创建规则中指定新的规则名称 rule_client_connected,并在SQL编辑器复制以下内容

SELECT
  *
FROM
  "$events/client_connected",
  "$events/client_disconnected"

在右侧的事件中,我们可以看到所有可用的事件,我们选择了连接和断开两个事件,在这两个事件触发时会通过Webhook调用我们配置的接口,这样我们就能获取到设备的在线、离线状态了。

我们点击 创建按钮 完成规则的创建
我们可以看见我们创建好的规则

点击规则ID,还可以看到统计数据

在FLows中还可以看到整个工作流程

演示Webhook

我们使用MQTTX模拟一次设备连接和断开动作,可以在规则统计界面看到我们的操作已经被记录。

编写代码

我们这里采用方案2。
我们需要实现之前配置的ConnectedEvent接口

    /// <summary>
    /// 连接事件请求
    /// </summary>
    public class ConnectedEventRequest
    {
        /// <summary>
        /// 设备名称
        /// </summary>
        public string Username { get; set; }
        /// <summary>
        /// 时间戳
        /// </summary>
        public long Timestamp { get; set; }

        /// <summary>
        /// 事件(连接/断开)
        /// </summary>
        public string Event { get; set; }
        /// <summary>
        /// 连接时间(断开事件中为0)
        /// </summary>
        public long Connected_at { get; set; }

        /// <summary>
        /// Client ID
        /// </summary>
        public string Clientid { get; set; }
    }
        /// <summary>
        /// 更新设备在线状态
        /// </summary>
        /// <param name="deviceName"></param>
        /// <param name="onlineStatus"></param>
        /// <returns></returns>
        public async Task UpdateDeviceOnlineStatusAsync(string deviceName, OnLineStates onlineStatus)
        {
            var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.IoTDeviceExtend).AsNoTracking()
                .FirstOrDefaultAsync(o => o.DeviceName == deviceName);
            if (device == null)
            {
                return;
            }
            else
            {
                if (device.IoTDeviceExtend == null) //扩展表为空
                {
                    device.IoTDeviceExtend = new IoTDeviceExtend
                    {
                        DeviceInfoId = device.Id,
                        OnLineStates = (int)onlineStatus,
                    };
                    _ioTDbContext.Attach(device.IoTDeviceExtend);
                    
                    _ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Added;
                    _ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;
                    await _ioTDbContext.SaveChangesAsync();
                }
                if (device.IoTDeviceExtend.OnLineStates != (int)onlineStatus)         //在线状态不一致
                {
                    device.IoTDeviceExtend.OnLineStates = (int)onlineStatus;

                    _ioTDbContext.Attach(device.IoTDeviceExtend);
                    //防止更新其他字段
                    _ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Unchanged;
                    _ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;
                    await _ioTDbContext.SaveChangesAsync();
                }
            }
        }

我们根据Event中的内容来判断是 连接(client.connected)/断开(client.disconnected) 的事件

        /// <summary>
        /// 连接、断开事件
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task ConnectedEventAsync([FromBody] ConnectedEventRequest request)
        {
            var onlineStatus = request.Event switch
            {
                "client.connected" => OnLineStates.OnLine,
                _ => OnLineStates.OffLine
            };

            await _deviceHandler.UpdateDeviceOnlineStatusAsync(request.Username, onlineStatus);
        }

#总结
以上就是本文要讲的内容,我们可以通过MQTTX来测试我们的代码有效性。
该方案还存在部分缺点,例如:
1、每次设备上下线会导致频繁的请求接口,在大量设备接入的场景中需要考虑接口性能。
2、由于网络等问题,Web调用顺序可能不能完全保证,也许离线会比在线事件更早处理,从而导致状态不一致。我们后面会尝试用其他方案来替代WebHook,尝试解决上述问题,在此之前我们都会继续使用WebHook进行功能演示。

完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos

如果你对我们的 MASA 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们

WeChat:MasaStackTechOps
QQ:7424099

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/508730.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SOLIDWORKS钣金折弯参数设置技巧

折弯系数早期是没有计算方法的&#xff0c;工厂都是根据实际经验确定下来的经验公式。 记录下来一个经验数据表或简单的经验公式。后来才出现的中性层概念&#xff0c;即既不伸长也不压缩的那一层为中性层。可以用来计算展开长度。SOLIDWORKS钣金折弯参数也是整合了所有的计算…

【C++】类和对象(中)---构造函数和析构函数

个人主页&#xff1a;平行线也会相交&#x1f4aa; 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 平行线也会相交 原创 收录于专栏【C之路】&#x1f48c; 本专栏旨在记录C的学习路线&#xff0c;望对大家有所帮助&#x1f647;‍ 希望我们一起努力、成长&…

【AI大模型】讯飞星火大模型能否超越chatgpt?我们拭目以待!

文章目录 前言你有使用过这种对话式AI吗&#xff1f;你对这类型AI有什么看法或感受&#xff1f;对于“讯飞星火大模型将超越chatgpt&#xff1f;”这个命题你的态度是什么&#xff1f;简要说说原因你认为这类型的人工智能对于现在的社会有哪些意义&#xff1f;对于这类型的人工…

GEE开发之MODIS_MCD12Q1数据分析和获取

GEE开发之MODIS_土地类型分类 0.MCD12Q1介绍1.遥感影像查看2.MCD12Q1分类介绍3.年数据下载&#xff08;LC_Type1/year/500m&#xff09; 前言&#xff1a;主要介绍MODIS的MCD12Q1产品&#xff0c;包括MCD12Q1产品的介绍、使用和下载。 0.MCD12Q1介绍 Terra 和 Aqua 组合的中分…

AI 大底座,大模型时代的答卷

文心一言的诞生 “文心一言就是在这个全国 AI 领域规模最大的高性能 GPU 集群上完成训练的。” 早在 2021 年 6 月&#xff0c;为了满足未来的大模型训练任务&#xff0c;百度智能云开始规划全新的高性能 GPU 集群的建设&#xff0c;联合 NVIDIA 共同完成了可以容纳万卡以上规…

华人再次突破芯片技术瓶颈,然而成果归美芯,中国芯可能输了?

外媒报道指华人研发团队已突破芯片的极限&#xff0c;晶体管厚度只有三个原子那么大&#xff0c;达到0.3纳米&#xff0c;打破了此前芯片业界认为的1纳米是硅基芯片技术极限的概念&#xff0c;然而这项技术却是归美国所有。 这次取得技术突破的研发团队是麻省理工研究所&#x…

云服务器安装宝塔Linux面板命令脚本大全

阿里云服务器安装宝塔Linux面板&#xff0c;操作系统不同安装命令脚本也不同&#xff0c;支持CentOS、Alibaba Cloud Linux、Ubuntu/Deepin等Linux系统&#xff0c;阿里云服务器网分享阿里云服务器安装宝塔Linux面板命令脚本大全&#xff1a; 云服务器安装宝塔Linux面板命令 …

无公网IP,公网SSH远程访问家中的树莓派

文章目录 前言如何通过 SSH 连接到树莓派步骤1. 在 Raspberry Pi 上启用 SSH步骤2. 查找树莓派的 IP 地址步骤3. SSH 到你的树莓派步骤 4. 在任何地点访问家中的树莓派4.1 安装 Cpolar内网穿透4.2 cpolar进行token认证4.3 配置cpolar服务开机自启动4.4 查看映射到公网的隧道地址…

多时相遥感深度学习作物提取方法综述(万字长文)

一、 引言 本人研究的方向是遥感作物分布提取,想基于多时相、深度学习和GEE平台展开研究,现对相关文献进行资料调研。深度学习和遥感原本属于2个研究方向,现在已经有越来越多的研究开始将深度学习算法应用到遥感领域。 传统遥感研究人员重视遥感数据本身,善于分析作物生长…

Vector - CAPL - CANoe硬件配置函数 - 03

目录 canFlushTxQueue -- 刷新已定义的Tx队列 代码示例 canSetChannelAcc -- CANoe接收过滤器设置 代码示例 canSetChannelMode -- CAN控制器Tx使能/失能 代码示例 canSetChannelOutput -- Ack自应答使能/失能 代码示例 getCardTypeEx -- CAN控制器类型 canFlushTxQue…

Node.js 与 TypeScript

目录 1、什么是 TypeScript 2、运行TypeScript 3、TypeScript 在Node.js 生态中的情况 1、什么是 TypeScript TypeScript是一种流行的开源语言&#xff0c;由微软维护和开发。它受到了世界各地许多软件开发人员的喜爱和使用。 基本上&#xff0c;它是JavaScript的超集&…

信号完整性基础03:反射与阻抗匹配(1)

说在开头&#xff1a;关于“惰性气体” 英国剑桥大学当时的校长是&#xff1a;威廉.卡文迪许公爵&#xff0c;他的祖上有一位“科学怪人”&#xff1a;亨利.卡文迪许&#xff0c;他一辈子深居简出&#xff0c;淡泊名利&#xff0c;从不靠刷论文来体现自己的学术水平&#xff0…

VMware NSX Advanced Load Balancer (NSX ALB) 22.1.3 - 负载均衡平台

请访问原文链接&#xff1a;https://sysin.org/blog/vmware-nsx-alb-22/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;sysin.org 负载均衡平台 NSX Advanced Load Balancer NSX Advanced Load Balancer (Avi) 可简化应用交付&#xff…

关于儿童急性感染性腹泻

腹泻是一种常见的症状&#xff0c;可导致儿童生长发育迟滞和营养不良。根据世界卫生组织&#xff08;WHO&#xff09;发布的数据显示&#xff0c;急性腹泻在5岁以下儿童死亡原因中排第二位&#xff0c;仅次于肺炎。引起儿童腹泻的原因包括感染和非感染因素&#xff0c;后者主要…

C语言CRC-16 XMODEM格式校验函数

C语言CRC-16 XMODEM格式校验函数 CRC-16校验产生2个字节长度的数据校验码&#xff0c;通过计算得到的校验码和获得的校验码比较&#xff0c;用于验证获得的数据的正确性。基本的CRC-16校验算法实现&#xff0c;参考&#xff1a; C语言标准CRC-16校验函数。 不同应用规范通过对…

一文快速入门体验 Hibernate

前言 Hibernate 是一个优秀的持久层的框架&#xff0c;当然&#xff0c;虽然现在说用得比较多的是 MyBaits&#xff0c;但是我工作中也不得不接触 Hibernate&#xff0c;特别是一些老项目需要你维护的时候。所以&#xff0c;在此写下这篇文章&#xff0c;方便自己回顾&#xf…

vue+Nodejs+Koa搭建前后端系统(五)--Nodejs中使用数据库

连接数据库 1.开启mysql服务 以管理员身份运行cmd&#xff0c;输入&#xff1a; net start mysql2.登录 root用户、创建新用户、赋予新用户权限 如果你用root用户作为node的连接用户&#xff0c;这一步可以略过。 &#xff08;1&#xff09;登录root&#xff1a; mysql -…

多功能文档应用程序Codex Docs

什么是 Codex Docs &#xff1f; CodeX Docs 是一个简单的免费应用程序&#xff0c;适用于您的内部、公共或个人文档。它基于Editor.js&#xff0c;允许使用漂亮干净的 UI 处理内容。 官方提供了演示站点&#xff1a;https://docs-demo.codex.so/about-this-demo 安装 在群晖…

CIAA 网络安全模型 — 数据传输安全

目录 文章目录 目录网络传输 CIAA 安全模型机密性&#xff08;Confidentiality&#xff09;对称加密非对称加密混合加密 完整性&#xff08;Integrity&#xff09;L2 数据链路层的 CRC 强校验L3 网络层的 Checksum 弱校验L4 传输层的 Checksum 弱校验安全层的 Checksum 强校验 …

解决:component COMDLG32.OCX or one of…和 MSCOMCTL.OCX or one of...的解决方法

遇到的问题&#xff1a; 在做CTF题目 使用16进制转图片工具 出现了两个报错&#xff01; 解决方法&#xff1a; 第一步&#xff1a;下载COMDLG32.OCX 程序&#xff08;可以去官网&#xff09;也可也使用我的百度网盘 http://链接&#xff1a;https://pan.baidu.com/s/1-1KNg…