MQTT 持久会话与 Clean Session

news2025/1/22 21:54:29

1. 会话(session)

我们将从客户端向服务端发起 MQTT 连接请求开始,到连接中断直到会话过期为止的消息收发序列称之为会话。会话是服务端和客户端的一个连接,进行消息交互前必须先建立会话。

2. 会话的生命周期

MQTT v3.1.1会话的生命周期由CONNECT报文里的Clean Session标志位和控制:

  • 为 1 表示客户端和服务器必须丢弃任何先前的会话并创建一个新的会话,且这个会话的生命周期与网络连接保持一致;
  • 为 0 则表示服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信(除非会话不存在),客户端和服务器在断开连接后必须存储会话的状态。

MQTT v3.1.1 没有规定持久会话应该在什么时候过期,如果仅从协议层面理解的话,这个持久会话应该永久存在。但在实际场景中这并不现实,因为它会非常占用服务端的资源,所以服务端通常不会遵循协议来实现,而是向用户提供一个全局配置来限制会话过期时间。

而到了 MQTT 5.0,这个问题得到了妥善的解决,Clean Session 字段被拆分成了 Clean Start 字段与 Session Expiry Interval 字段。Clean Start 字段指定是否需要全新的会话,Session Expiry Interval 字段指定会话过期时间,它们在连接时指定,但 Session Expiry Interval 字段可以在客户端断开连接时被更新。因此我们可以很轻易地实现客户端网络连接异常断开时会话被保留,客户端正常下线时会话则随着连接关闭而终结的功能。

客户端如何知道这是被恢复的会话?

显而易见的是,当客户端以期望从先前建立的会话恢复状态的方式发起连接,它需要知道服务端是否存在相应的会话,才能决定在连接建立后是否需要重复一遍订阅操作。关于这一点,MQTT 协议从 v3.1.1 开始,就为 CONNACK 报文设计了 Session Present 字段,用于表示当前连接使用的是否是一个全新会话,客户端可以根据这个字段的值进行判断。

3. 会话的标识 Client ID

Client ID是用来识别不同会话的唯一标识。认证时使用的用户名(User Name)和密码(Password)则是完全独立的。两个会话可以使用相同的用户名,也可以使用不同的用户名。

注:如果两个会话使用相同的Client ID,那么后一个会话CONNECT时会导致前一个会话的网络连接断开。

Client ID可以设为空,表明本次会话是匿名的临时访问,网络断开时会话立即结束。匿名访问只允许Clean Session = 1

MQTT 持久会话

不稳定的网络及有限的硬件资源是物联网应用需要面对的两大难题,MQTT 客户端与服务器的连接可能随时会因为网络波动及资源限制而异常断开。为了解决网络连接断开对通信造成的影响,MQTT 协议提供了持久会话功能。

MQTT 客户端在发起到服务器的连接时,可以设置是否创建一个持久会话。持久会话会保存一些重要的数据,以使会话能在多个网络连接中继续。持久会话主要有以下三个作用:

  • 避免因网络中断导致需要反复订阅带来的额外开销。

  • 避免错过离线期间的消息。

  • 确保 QoS 1 和 QoS 2 的消息质量保证不被网络中断影响。

持久会话需要存储哪些数据?

在协议规范中,QoS 1 和 QoS 2 消息首先会在客户端与 Broker 存储起来,在最终确认抵达订阅端后才会被删除,此过程需要 Broker 将状态与客户端相关联,这称为会话状态。除了消息存储外,订阅信息(客户端订阅的主题列表)也是会话状态的一部分。

客户端中的会话状态包括:

  • 已发送到服务器,但尚未完全确认的 QoS 1 和 QoS 2 消息
  • 已从服务器收到但尚未完全确认的 QoS 2 消息

服务器中的会话状态包括:

  • 会话的存在状态,即使会话为空
  • 客户订阅信息
  • 已发送到客户端,但尚未完全确认的 QoS 1 和 QoS 2 消息
  • 等待传输到客户端的 QoS 0(可选)、QoS 1 和 QoS 2 消息
  • 已从客户端收到但尚未完全确认的 QoS 2 消息,Will Message(遗嘱消息)和 Will Delay Interval(遗嘱延时间隔

保留消息

什么是 MQTT 保留消息?

发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。

如下图,当客户端订阅主题时,如果服务端存在该主题匹配的保留消息,则该保留消息将被立即发送给该客户端。

注意:保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话终结,保留消息也不会被删除。不管“clean_session”标志如何设置,保留消息都被保留。

保留消息的常见应用场景:

  • 传感器上报数据的间隔太长,但是订阅者需要在订阅后立即获取到最新的数据;
  • 传感器的版本号、序列号等不会经常变更的属性,可在上线后发布一条保留消息告知后续的所有订阅者。

保留消息的使用方法:

1、发布保留消息

在发布保留消息时,MQTT设备需要将PUBLISH报文中retainFlag设置为true

2、修改保留消息

每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换。

’retain’标志设置为0的消息不会替换相同主题的保留消息,只有保留的消息替换保留的消息

3、删除保留消息

服务器只会为每个主题保存最新一条保留消息,保留消息的保存时间与服务器的设置有关。若服务器设置保留消息存储在内存,则 MQTT 服务器重启后消息即会丢失;若存储在磁盘,则服务器重启后保留消息仍然存在。

保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话已结束,保留消息也不会被删除。删除保留消息有以下几种方式:

  • 客户端往某个主题发送一个 Payload 为空的保留消息,服务端就会删除这个主题下的保留消息;如果服务器接收到终端publish某主题的消息,payload为空且retain值是false,则不会删除这条持久化的消息。
  • 在 MQTT 服务器上删除,比如 EMQX MQTT 服务器提供了在 Dashboard 上删除保留消息的功能;
  • MQTT 5.0 新增了消息过期间隔属性,发布时可使用该属性设置消息的过期时间,不管消息是否为保留消息,都将会在过期时间后自动被删除。

4、如何判断一条消息是否是保留消息?

当客户端订阅了有保留消息的主题后,即会收到该主题的保留消息,可通过消息中的保留标志位判断是否是保留消息。需要注意的是,在保留消息发布前订阅主题,将不会收到保留消息。需要待保留消息发布后,重新订阅该主题,才会收到保留消息。

如下图,我们先订阅主题 sensor/t2,然后向该主题发布一条保留消息,该订阅会立即收到一条消息,但是该消息并不是保留消息。当我们删除该订阅,再次重新订阅 sensor/t2 主题时,立即收到了刚刚发布的保留消息。

ref:

保留消息 | EMQX 5.0 文档

MQTT 保留消息是什么?如何使用? | EMQ

持久性发布设置

解释:publish不成功,一直发布,直到断开连接

  • 开启条件:使用MQTTAsync_createWithOptions而不是MQTTAsync_create来创建客户端对象。将MQTTAsync_createOptions::sendWhileDisconnected设置为非零,并设置MQTTAsync_createOptions::maxBufferedMessages(默认值100)。
  • 可以调用MQTTAsync_getPendingTokens来返回等待发送的消息的id,或者返回发送过程尚未完成的消息的id。

 Publish While Disconnected

This feature was not originally available because with persistence enabled, messages could be stored locally without ever knowing if they could be sent. The client application could have created the client with an erroneous broker address or port for instance.

To enable messages to be published when the application is disconnected MQTTAsync_createWithOptions must be used instead of MQTTAsync_create to create the client object. The ::createOptions field sendWhileDisconnected must be set to non-zero, and the maxBufferedMessages field set as required - the default being 100.

MQTTAsync_getPendingTokens can be called to return the ids of the messages waiting to be sent, or for which the sending process has not completed.

/** Options for the ::MQTTAsync_createWithOptions call */
typedef struct
{
	/** The eyecatcher for this structure.  must be MQCO. */
	char struct_id[4];
	/** The version number of this structure.  Must be 0, 1, 2 or 3
	 * 0 means no MQTTVersion
	 * 1 means no allowDisconnectedSendAtAnyTime, deleteOldestMessages, restoreMessages
	 * 2 means no persistQoS0
	 */
	int struct_version;
	/** Whether to allow messages to be sent when the client library is not connected. */
	int sendWhileDisconnected;
	/** The maximum number of messages allowed to be buffered. This is intended to be used to
	 * limit the number of messages queued while the client is not connected. It also applies
	 * when the client is connected, however, so has to be greater than 0. */
	int maxBufferedMessages;
	/** Whether the MQTT version is 3.1, 3.1.1, or 5.  To use V5, this must be set.
	 *  MQTT V5 has to be chosen here, because during the create call the message persistence
	 *  is initialized, and we want to know whether the format of any persisted messages
	 *  is appropriate for the MQTT version we are going to connect with.  Selecting 3.1 or
	 *  3.1.1 and attempting to read 5.0 persisted messages will result in an error on create.  */
	int MQTTVersion;
	/**
	 * Allow sending of messages while disconnected before a first successful connect.
	 */
	int allowDisconnectedSendAtAnyTime;
	/*
	 * When the maximum number of buffered messages is reached, delete the oldest rather than the newest.
	 */
	int deleteOldestMessages;
	/*
	 * Restore messages from persistence on create - or clear it.
	 */
	int restoreMessages;
	/*
	 * Persist QoS0 publish commands - an option to not persist them.
	 */
	int persistQoS0;
} MQTTAsync_createOptions;

#define MQTTAsync_createOptions_initializer  { {'M', 'Q', 'C', 'O'}, 2, 0, 100, MQTTVERSION_DEFAULT, 0, 0, 1, 1}

#define MQTTAsync_createOptions_initializer5 { {'M', 'Q', 'C', 'O'}, 2, 0, 100, MQTTVERSION_5, 0, 0, 1, 1}


LIBMQTT_API int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context, MQTTAsync_createOptions* options);

ref:

Paho Asynchronous MQTT C Client Library: Publish While Disconnected

paho.mqtt.c/MQTTAsync.h at 0995176412616ae8f1cba51c8d609fe4b69687ea · eclipse/paho.mqtt.c · GitHub

MQTT 持久会话与 Clean Session 详解 | EMQ

会话与消息过期 | EMQX 5.0 文档

MQTT协议学习(三):会话(session) - 简书

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/418262.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

003_螺旋矩阵

力扣54和59题 54.顺时针打印矩阵 题目: 思路:将矩阵分为若干层,首先打印最外层的元素,然后一直往里打印 对于每层,从左上方开始以顺时针的顺序遍历所有元素。假设当前层的左上角位于(top,left),右下角位于…

Axios请求(对于ajax的二次封装)——Axios取消请求、请求体编码

Axios请求(对于ajax的二次封装)——Axios取消请求、请求体编码知识回调(不懂就看这儿!)场景复现核心干货axios取消请求AbortControllerCancelToken deprecated请求体编码浏览器qs库编码数据ES6库方法node.jsQuery stri…

【神经网络】tensorflow实验3--NumPy科学计算库

目录 1. 实验目的 2. 实验内容 3. 实验过程 题目一: ① 代码 ② 实验结果 题目二: ① 代码 ② 实验结果 题目三: ​编辑 ① 代码 ② 实验结果 5. 实验小结 ① 实验过程中遇到了哪些问题,你是如何解决的? …

Android引入Apollo(阿波罗)

程序猿日常 记Android项目引入Apollo(阿波罗)上源码 apollo开发分支 应用 Apollo(阿波罗)客户端会管理好应用的后台GraphQL数据 之前网络请求使用RetrofitOkHttp 改成使用ApolloOkHttp 引入 1.对应的module的build.gradle中添加 id("com.apollographql.apollo3&qu…

【计算机系统概论Yale.patt】第一章

文章目录1. 计算机是简单部件的系统组合1.1 计算机组成1.1.1 编码体系1.1.2 晶体管构建微处理器1.1.3 冯诺依曼机1.1.4 LC-3机(冯诺依曼机实现)1.1.5 LC-3编程机器语言编程汇编语言编程输入输出信息问题两个重要机制栈和数据转换示例:计算器1.2 两个重要理念1.2.1 抽…

Linux基础篇(三)常见指令

目录 一、创建文件和目录 二、命令详解 0. 命令和选项 1. ls命令 2. cd命令 3. touch命令 4. mkdir命令 5. tree命令 6. rmdir命令 7. rm命令 8. man 9. nano 10. cat命令 11. cp 命令 12. mv 命令 13. echo命令 14. more命令 15. less命令 16. Ctrl C 17. head 命令 18. tail…

台灯的种类有哪些?国内热门护眼灯品牌推荐

台灯是我们日常生活中常见的电器之一,台灯不仅可以为人们照明,还可以用来家居装饰,根据人们不用的需求,台灯的种类也很多,有书房台灯、读写台灯、工艺台灯。 书房台灯:灯光的局部照明效果,以书写…

禅道OpenAI更新至1.2版本,超多实用功能惊喜上线!

广受欢迎的禅道OpenAI插件近日成功发布,截至目前已更新至1.2版本。 截至本版本发布,禅道OpenAI已经拥有了神奇海螺(ChatGPT聊天)、需求润色、任务润色、Bug润色及本次的需求一键生成用例功能,仍有更多实用的新功能正在…

Sentinel 工作主流程

Overview 在 Sentinel 里面,所有的资源都对应一个资源名称以及一个 Entry。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显式创建;每一个 Entry 创建的时候,同时也会创建一系列功能插槽(…

2023年第1季社区Task挑战赛开启,等你来战!

社区Task挑战赛是面向社区开发者开展的代码或教程征集活动。该挑战赛为社区中热爱FISCO BCOS及周边组件的开发者提供了探索区块链技术、挑战技术难题的舞台。该挑战赛去年在社区成功举办了3季,共吸引了数百名开发者报名。 前3季都有哪些有趣的作品? 在…

【图数据挖掘】— 子图同构问题、单射函数和双射函数、同构(isomorphic)和同态(homomorphism)

子图同构问题 子图同构(Subgraph Isomorphism)是指在图论中,两个图之间是否存在一种关系,使得其中一个图的顶点集合和边集合可以通过对应的方式映射到另一个图的顶点集合和边集合上,且保持原来的边和顶点的关系不变。…

如何设计一个安全的对外接口?

对外接口安全措施的作用主要体现在两个方面,一方面是如何保证数据在传输过程中的安全性,另一方面是数据已经到达服务器端,服务器端如何识别数据。 1. 数据加密 数据在传输过程中是很容易被抓包的,如果直接传输,数据可…

elasticsearch 核心概念

1.近实时(Near Real Time,NRT) elasticsearch 是一个近实时的搜索和分析平台,这意味着从索引文档到可搜索文档都会有一段微小的延迟(通常是1s以内)。这种延迟主要是因为 elasticsearch 需要进行数据刷新和索引更新。 …

远程代码执行渗透与防御

远程代码执行渗透与防御1.简介2.PHP RCE常见函数3.靶场练习4.防御姿势1.简介 远程代码执行漏洞又叫命令注入漏洞 命令注入是一种攻击,其目标是通过易受攻击的应用程序在主机操作系统上执行任意命令。 当应用程序将不安全的用户提供的数据(表单、cookie…

jQuery 基础入门速成上篇

jQuery 是目前使用最广泛的 javascript 函数库。提到 jQuery 你可能知道这句英文 : ———— Write Less,Do More ( 写的少,做的多 ) 引入jQuery jQuery是一个函数库,一个 js 文件,页面可以使用 script标签 引入使用&a…

JVM 类加载器

文章目录1 类加载器1.1 类加载器介绍1.2 类加载器的加载规则1.2 类加载器类型总结2 双亲委派模型2.1 双亲委派模型介绍2.2 双亲委派模型的执行流程2.3 双亲委派模型的好处回顾一下类加载过程:加载->连接->初始化。 其中连接又分为:验证->准备-&…

指针太难?手把手教你理解指针(传参、函数指针)

目录 前言 一、数组和指针的参数 1.一维数组传参 2.二维数组传参 3.一级指针传参 4.二级指针传参 二、函数指针 1.函数的地址 2.函数指针的形式 3.函数指针的使用 三、加深理解,两段有趣的代码 前言 之前的一篇文章讲到了指针的概念、指针和数组的关系&am…

【机器学习】随机森林预测泰坦尼克号生还概率

目录 前言: 【一】数据清洗及可视化 介绍 知识点 环境准备 数据特征介绍 检查数据 相关系数 缺失值 偏态分布 数值化和标准化 离群点 实验总结一 【二】分类模型训练及评价 介绍 环境准备 模型评估 模型选择 性能度量 实验总结二 【三】随机森…

机器学习入门(全连接神经网络-1)

机器学习入门(全连接神经网络-1) 目录 机器学习入门(全连接神经网络-1)一、神经元简介1.概念2.例子二、常见的激活函数从神经元开始进行讲述,从零开始搭建全连接神经网络。 一、神经元简介 1.概念 神经元是神经网络的基本组成单位。 神经元接受输入,对它们进行一些数学运…

自主机器人运动规划|地图相关概念总结

自主机器人运动规划|地图相关概念总结地图表示占用栅格地图八叉树地图(Octo-map)Voxel hashing(哈希表地图)点云地图TSDF mapESDF map地图表示 地图分成两个模块: 地图装在数据的数据结构地图信息融合方法 占用栅格地图 使用最为广范的是 …