目录
一、前言
二、MQTT协议概述
概念
基本原理
MQTT协议的结构
MQTT的QoS机制
QoS 0:最多一次传输
QoS 1:至少一次传输
QoS 2:恰好一次传输
三、MQTT的应用场景
四、MQTT的优点和缺点
五、MQTT协议的实现
六、实战体验MQTT
七、MQTT协议的未来发展
一、前言
随着物联网和智能化应用的快速发展,对于通信协议的需求越来越多样化和复杂化,对于物联网应用来说,基于TCP/IP的协议MQTT(Message Queuing Telemetry Transport)正逐渐成为主流的协议之一。本文将对MQTT协议的相关概念、基本原理、应用场景等进行介绍和分析。
二、MQTT协议概述
概念
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,它被设计用于低带宽和不稳定的网络环境中,比如远程传感器和移动设备等。它支持一对多的消息传输,可以被广泛应用于物联网、移动应用程序、工业自动化、金融服务等领域。
基本原理
MQTT使用基于TCP/IP协议的可靠传输机制来实现消息传输,并采用发布/订阅模式来处理消息。在MQTT中,发布者将消息发布到一个主题(Topic)上,订阅者则可以订阅该主题,以接收发布者发布的消息。MQTT Broker充当一个中间件来传递这些消息。MQTT支持三种QoS(服务质量)等级,从而实现不同的消息传输可靠性和实时性。此外,MQTT还支持各种各样的客户端,包括C语言、Java、Python等等,因此它被广泛用于物联网、移动应用程序、工业自动化等领域。
MQTT协议的结构
MQTT协议主要由三部分组成,分别是固定头部、可变头部和消息体,其中固定头部和可变头部的长度是固定的,消息体的长度是可变的。
固定头部:固定头部由两个部分组成,第一个字节的前四位表示消息类型(MessageType),剩下的四位是标识符标志(Flag)。前四位二进制可表示16种消息类型,如下图所示:
第一个字节后四位中,DUP表示发布消息的副本,用来保证消息的可靠传输。消息重传时,DUP标志被设置为1。而两位的Qos则表示消息的服务质量(以二进制表示):
- 00:最多一次传输
- 01:至少一次传输
- 10:恰好一次传输
- 11:预留标识
RETAIN作为发布保留标识,发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。
第二个字节为剩余长度,用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。
可变头部:可变头部长度根据消息类型不同而不同,一般包括报文标识符(Packet Identifier)和主题(Topic)等信息。
消息体:消息体的长度也根据消息类型不同而不同,主要包括消息正文和可选的附加属性等信息。
对于MQTT协议而言,有以下四种类型的消息体:
- CONNECT:消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码。
- SUBSCRIBE:消息体内容是一系列的要订阅的主题以及QoS。
- SUBACK:消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。
- UNSUBSCRIBE:消息体内容是要订阅的主题。
MQTT的QoS机制
MQTT协议支持三种不同的QoS(服务质量)等级,分别为0、1和2,它们分别代表不同的消息传输可靠性和实时性。下面是三种QoS等级的详细说明:
QoS 0:最多一次传输
消息发布者只发送一次消息,无论它是否被接收。此时消息传输的效率最高,但消息的可靠性最低。
消息丢失情况:当我们使用 QoS 0 传递消息时,消息的可靠性完全依赖于底层的 TCP 协议。而 TCP 只能保证在连接稳定不关闭的情况下消息的可靠到达,一旦出现连接关闭、重置,仍有可能丢失当前处于网络链路或操作系统底层缓冲区中的消息。这也是 QoS 0 消息最主要的丢失场景。
QoS 1:至少一次传输
为了保证消息到达,QoS 1加入了应答与重传机制,发送方只有在收到接收方的 PUBACK 报文以后,才能认为消息投递成功,在此之前,发送方需要存储该 PUBLISH 报文以便下次重传。此时消息传输的效率较高,且消息的可靠性较高。
消息重复情况:对于发送方而言,没有收到回传的PUBACK报文有一下两种情况。
- 第一种情况:接收方未收到消息。
- 第二种情况:接收方收到了消息,回传的PUBACK报文未到达发送方。
如果是第一种情况,发送方将会重传报文,接收方相当于只收到了一次消息。如果是第二种情况,发送方重传报文,但此时接收方已经收到了这个消息,这就导致接收方收到了重复的消息。虽然在重传时,PUBLUSH报文的DUP标志会被设置为1.表示这是一个重传的报文、但是接收方不能假定自己曾经接受过这个消息,仍然需要将其视作一个全新的消息。
QoS 2:恰好一次传输
发送方和接收方之间进行一个消息确认(PUBREC、PUBREL、PUBCOMP)的三步握手,以确保消息的可靠传输。每一次的QoS 2消息投递,都要求发送方与接收方进行至少两次请求/响应流程。此时消息传输的效率较低,但消息的可靠性最高。
下面是设置QoS为2时的消息发送流程:
- 首先,发送方存储并发送QoS 为 2的 PUBLISH 报文以启动一次QoS 2消息的传输,然后等待接收方回复 PUBREC 报文。这一部分与QoS 1基本一致,只是响应报文从 PUBACK 变成了 PUBREC。
- 当发送方收到 PUBREC 报文,即可确认对端已经收到了 PUBLISH 报文,发送方将不再需要重传这个报文,并且也不能再重传这个报文。所以此时发送方可以删除本地存储的 PUBLISH 报文,然后发送一个 PUBREL 报文,通知对端自己准备将本次使用的 Packet ID 标记为可用了。与 PUBLISH 报文一样,我们需要确保 PUBREL 报文到达对端,所以也需要一个响应报文,并且这个 PUBREL 报文需要被存储下来以便后续重传。
- 当接收方收到 PUBREL 报文,也可以确认在这一次的传输流程中不会再有重传的 PUBLISH 报文到达,因此回复 PUBCOMP 报文表示自己也准备好将当前的 Packet ID 用于新的消息了。
- 当发送方收到 PUBCOMP 报文,这一次的 QoS 2 消息传输就算正式完成了。在这之后,发送方可以再次使用当前的 Packet ID 发送新的消息,而接收方再次收到使用这个 Packet ID 的 PUBLISH 报文时,也会将它视为一个全新的消息。
消息去重原理:QoS 2 规定,发送方只有在收到 PUBREC 报文之前可以重传 PUBLISH 报文。一旦收到 PUBREC 报文并发出 PUBREL 报文,发送方就进入了 Packet ID 释放流程,不可以再使用当前 Packet ID 重传 PUBLISH 报文。同时,在收到对端回复的 PUBCOMP 报文确认双方都完成 Packet ID 释放之前,也不可以使用当前 Packet ID 发送新的消息。因此,对于接收方来说,能够以 PUBREL 报文为界限,凡是在 PUBREL 报文之前到达的 PUBLISH 报文,都必然是重复的消息;而凡是在 PUBREL 报文之后到达的 PUBLISH 报文,都必然是全新的消息。一旦有了这个前提,我们就能够在协议层面完成 QoS 2 消息的去重。
三、MQTT的应用场景
MQTT协议被广泛应用于各种领域,如下所示:
物联网应用:MQTT是物联网应用中最常用的协议之一,它支持低带宽、低功耗设备,并且可以轻松地扩展到海量设备。
移动应用程序:MQTT是移动应用程序中使用最广泛的协议之一,它可以为移动应用程序提供可靠的消息传输机制,而且消耗的带宽和电量很低。
工业自动化:MQTT可以被广泛应用于工业自动化领域,用于实时监控和控制生产过程中的各种设备。
金融服务:MQTT可以为金融服务提供可靠的消息传输机制,以保证消息的安全性和可靠性。
四、MQTT的优点和缺点
MQTT协议有以下优点:
- 灵活性高。MQTT协议是基于发布/订阅模式来处理消息的,可以很方便地支持一对多的通信模式。
- 可扩展性强。MQTT协议支持可靠的消息传输机制,并且可以轻松地扩展到海量设备。
- 轻量级。MQTT协议是一种轻量级的协议,适用于低带宽和不稳定的网络环境中,消耗的带宽和电量很低。
- 易于实现。MQTT协议具有简单的结构和易于实现的特点,可以在各种硬件平台上运行。
MQTT协议的缺点包括以下几点:
- 安全性较低。MQTT协议没有内置的安全机制,需要使用TLS/SSL等加密协议来保证通信的安全性。
- 可靠性较低。QoS等级为0的消息只能保证最多一次传输,可能会出现消息丢失的情况。
- 消息头较大。MQTT协议的消息头比较大,占用了较大的带宽资源。
- 无序传输。MQTT协议的消息传输是无序的,不能保证消息的顺序性。
五、MQTT协议的实现
MQTT协议的实现需要以下几个组件:
MQTT客户端:MQTT客户端可以是任何设备,包括PC、手机、传感器等,它用于连接到MQTT Broker,并发布或订阅消息。
MQTT Broker:MQTT Broker是消息代理服务器,负责接收、存储和转发MQTT消息。
MQTT订阅者:MQTT订阅者用于订阅消息,并接收发布者发布的消息。
MQTT发布者:MQTT发布者用于发布消息,并将消息发送给MQTT Broker。
MQTT协议的实现可以使用多种编程语言,如Java、Python、C++等,还可以使用各种MQTT客户端库和MQTT Broker实现,如Eclipse Mosquitto、EMQ X等。
六、实战体验MQTT
实战体验MQTT协议,我们首先需要一个MQTT Broker,建议使用EMQX作为MQTT Broker,具体的安装方式请自行搜索(建议通过docker安装)。安装完成后,浏览器访问localhost:18083,登录之后出现如下界面,表示安装成功。
可在右上角设置中设置中文。
Maven依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
想要发送消息,我们需要用到一个消息发送者,一个消息接受者,一个消息代理服务器。消息发送者和接受者代码如下:
ClientMQTT代码:
@Data
public class ClientMQTT {
private String topic;
private String clientId;
private MqttClient mqttClient;
private MqttConnectOptions mqttConnectOptions;
private static String host = "tcp://127.0.0.1:1883";// 协议地址
private static String userName = "admin";// 账号
private static String passWord = "123456";// 密码
public static void main(String[] args) throws MqttException {
ClientMQTT oneClientMQTT = ClientMQTT.getOneClientMQTT("topic123", "clientId123");
}
public ClientMQTT() {
this.topic = "test";//设置默认的topic 和 clientId;
this.clientId = "testClientId-003";
}
public ClientMQTT(String topic, String clientId) throws MqttException {
this.topic = topic;
this.clientId = clientId;
}
/**
* 连接到服务器
*/
private void start() {
try {
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
// MQTT的连接设置
mqttConnectOptions = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
// 设置连接的用户名
mqttConnectOptions.setUserName(userName);
// 设置连接的密码
mqttConnectOptions.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
mqttConnectOptions.setKeepAliveInterval(20);
mqttConnectOptions.setAutomaticReconnect(true);
// 设置回调
mqttClient.setCallback(new PushCallback());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
mqttConnectOptions.setWill(mqttTopic, "close".getBytes(), 2, true);
mqttClient.connect(mqttConnectOptions);
//订阅消息
int[] Qos = {2};
String[] topic1 = {topic};
mqttClient.subscribe(topic1, Qos);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 返回一个Client
*/
public static ClientMQTT getOneClientMQTT(String topic, String clientId) throws MqttException {
ClientMQTT client = new ClientMQTT(topic, clientId);
client.start();
return client;
}
}
ServerMQTT代码:
@Data
public class ServerMQTT {
private static Scanner input = new Scanner(new BufferedInputStream(System.in));
private static PrintWriter pw = new PrintWriter(System.out);
private String topic;
private String clientId;
private MqttClient mqttClient;
private MqttTopic mqttTopic;
private MqttMessage mqttMessage;
private static String host = "tcp://127.0.0.1:1883";// 协议地址
private static String userName = "admin";// 账号
private static String passWord = "123456";// 密码
public static void main(String[] args) throws MqttException {
ServerMQTT oneServerMQTT = ServerMQTT.getOneServerMQTT("topic123", "client321");
while (true) {
String str = input.next();
int Qos = input.nextInt();
ServerMQTT.sendMsg(oneServerMQTT, str, Qos);
}
}
public ServerMQTT() throws MqttException {
this.topic = "test";//设置默认的topic 和 clientId;
this.clientId = "testClientId-001";
mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
connect();
}
public ServerMQTT(String topic, String clientId) throws MqttException {
this.topic = topic;
this.clientId = clientId;
mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
connect();
}
/**
* 用来连接服务器
*/
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
options.setUserName(userName);// 设置用户密码
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);// 设置超时时间
options.setKeepAliveInterval(20);// 设置会话心跳时间
options.setAutomaticReconnect(true);
try {
mqttClient.setCallback(new PushCallback());
mqttClient.connect(options);
mqttTopic = mqttClient.getTopic(topic);
} catch (Exception e) {
e.printStackTrace();
}
}
public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
}
/**
* 返回一个Server
*/
public static ServerMQTT getOneServerMQTT(String topic, String clientId) throws MqttException {
ServerMQTT server = new ServerMQTT(topic, clientId);
return server;
}
/**
* 发送消息
*/
public static boolean sendMsg(ServerMQTT server, String msg, int qos) throws MqttException {
server.mqttMessage = new MqttMessage();
server.mqttMessage.setQos(qos);
server.mqttMessage.setRetained(true);
server.mqttMessage.setPayload(msg.getBytes());
server.publish(server.mqttTopic, server.mqttMessage);
return server.mqttMessage.isRetained();
}
}
PushBack代码:
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
System.out.println("连接断开,可以做重连");
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收到topic为:" + topic + " Qos = " + message.getQos() + " 内容为:" + new String(message.getPayload()));
}
}
首先启动ClientMQTT,然后启动ServerMQTT,可直接通过控制台输入消息和Qos等级发送消息。
七、MQTT协议的未来发展
随着物联网技术的不断发展和应用,MQTT协议将继续发挥重要作用。未来,MQTT协议可能会有以下几个发展趋势:
- 安全性和可靠性的提升。MQTT协议的未来发展方向之一是提高安全性和可靠性,例如加强数据加密、身份认证和消息确认等功能。
- 实时性的提高。MQTT协议将会逐步优化,提高消息传输的实时性,适用于更多实时应用场景。
- 扩展性的增强。MQTT协议将会逐步增强其可扩展性,以满足海量设备的连接和通信需求。
- 集成性的提高。MQTT协议将会逐步与其他物联网技术集成,例如传感器、无线网络、云计算等,以支持更多的应用场景。
参考资料
MQTT QoS 0, 1, 2 介绍 | EMQ (emqx.com)
MQTT 协议快速体验 | EMQ (emqx.com)