一、保留消息简介
官方文档:https://www.emqx.io/docs/zh/v5.1/messaging/mqtt-retained-message.html
1、为什么需要保留消息
不考虑持久会话的因素,MQTT订阅只有在客户端连接之后才能创建主题。所以当消息到达服务端之后,服务端只会把消息分发给当前已经存在的订阅者,分发完成消息就会从服务端中删除。
假设下面两个场景:
- 如果当前没有任何订阅者,消息就会立即丢弃。
- 如果订阅端在消息到达服务端之后才上线订阅该主题,此时会错过这个消息。
为了解决该场景的问题,MQTT提供了保留消息的功能。
2、什么是 MQTT保留消息
MQTT 客户端向服务器发布 (PUBLISH) 消息时,可以设置保留消息 (Retained Messages) 标志,默认为false。
- 如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。
- 如果 Retained 标记被设置为 false,则该消息就是普通消息。
每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换。
即 MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
3、如何判断一条消息是否是保留消息
当客户端订阅了有保留消息的主题后,即会收到该主题的保留消息,可通过消息中的保留标志位判断是否是保留消息。
需要注意:
- 在保留消息发布前订阅主题,将不会收到保留消息,而会立即收到一条消息(普通消息)。
- 订阅主题断开或者删除,在保留消息发布后,重新订阅该主题,才会收到保留消息(标志值为true)。
通过 MQTTX工具模拟发布保留消息-官方演示:https://www.emqx.io/docs/zh/latest/messaging/mqtt-retained-message.html
我们可以通过 EMQX Dashboard 界面查保留消息。
4、保留消息将保存多久?如何删除?
服务器只会为每个主题保存最新一条保留消息,保留消息的保存时间与服务器的设置有关。
- 若服务器设置保留消息存储在内存,则 MQTT 服务器重启后消息即会丢失;
- 若存储在磁盘,则服务器重启后保留消息仍然存在。
保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话已结束,保留消息也不会被删除。
删除保留消息有以下几种方式:
- 客户端往某个主题发送一个 Payload 为空的保留消息,服务端就会删除这个主题下的保留消息;
- 在 MQTT 服务器上删除,比如 EMQX MQTT 服务器提供了在 Dashboard 上删除保留消息的功能;
- MQTT 5.0 新增了消息过期间隔属性,发布时可使用该属性设置消息的过期时间,不管消息是否为保留消息,都将会在过期时间后自动被删除。
下图是在 Dashboard 上操作和查看保留信息界面。
二、Java操作保留消息
在前面文章的基础上,发布保留消息。
SpringBoot整合MQTT(MqttClient):https://blog.csdn.net/qq_42402854/article/details/132791347
1、发布保留消息
只需要在发布消息时,设置保留消息标志就可以了。
/**
* 发布保持消息
*
* @param pushMessage
* @param topic
*/
public void publishRetainedMsg(String pushMessage, String topic) {
publish(pushMessage, topic, 1, true);
}
/**
* 发布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(String pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
if (null == mqttTopic) {
log.error("== MyMqttClient ==> topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,
try {
token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
token.waitForCompletion(1000L);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
业务这边直接使用它发布即可。
@Override
public void publishRetainedMsg(String msgContent, String topic) {
//MyXxxMqttMsg 转Json
MyXxxMqttMsg myXxxMqttMsg = new MyXxxMqttMsg();
myXxxMqttMsg.setContent(msgContent);
myXxxMqttMsg.setTimestamp(System.currentTimeMillis());
// TODO Md5值
myXxxMqttMsg.setMd5(UUID.randomUUID().toString());
String msgJson = JSON.toJSONString(myXxxMqttMsg);
//发布保留消息
myMqttClient.publishRetainedMsg(msgJson, topic);
}
2、测试发布
代码日志截图:
MQTTX截图:
Dashboard界面截图:
操作验证保留消息,结论总结:
- 每个主题只会存储最新一条保留消息,即使所有订阅者取消该主题订阅,保留消息也不回删除,除非人为在Dashboard界面删除它。
- 一旦主题存储了保留消息,只要是订阅了(包含新订阅或者断开后又重新订阅)该主题,都会收到保留消息推送的最新数据。
- MQTT发布消息是一种广播模式。
– 求知若饥,虚心若愚。