目录
一、MQ协议
MQTT 特点
MQTT 工作原理
MQTT 主要应用场景
MQTT 配置与注意事项
二、MQTT服务器搭建
三、参考案例
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模型的轻量级消息传输协议,常用于物联网(IoT)场景中。它设计简洁、带宽占用少,非常适合资源受限的设备和网络环境。
一、MQ协议
MQTT 特点
-
轻量级协议:
- 设计简单,占用带宽少,特别适合嵌入式设备和不稳定的网络环境。
-
发布/订阅模型:
- 客户端通过主题(Topic)发布消息,订阅者通过主题接收消息,彼此不直接通信。
-
可靠性保障:
- 提供三种服务质量(QoS)等级,确保消息可靠传输:
- QoS 0:至多一次(不确认,可能丢失)。
- QoS 1:至少一次(需要确认,但可能重复)。
- QoS 2:仅一次(确保消息不丢失且不重复)。
- 提供三种服务质量(QoS)等级,确保消息可靠传输:
-
持续连接:
- 使用 TCP/IP 连接,通过心跳包(Keep-Alive)保持连接稳定。
-
支持离线消息:
- 使用“保留消息”和“持久会话”功能,实现离线设备接收消息。
-
安全性:
- 支持 SSL/TLS 加密,结合用户名和密码进行身份验证。
MQTT 工作原理
-
连接:
- 客户端通过
CONNECT
消息向服务器建立连接,服务器返回CONNACK
消息。
- 客户端通过
-
发布:
- 客户端通过
PUBLISH
消息向服务器发布消息,指定消息的主题。
- 客户端通过
-
订阅:
- 客户端通过
SUBSCRIBE
消息订阅一个或多个主题,服务器将匹配主题的消息推送给客户端。
- 客户端通过
-
心跳:
- 客户端和服务器定期发送心跳包(PINGREQ 和 PINGRESP),确保连接有效。
-
断开:
- 客户端通过
DISCONNECT
消息通知服务器主动断开连接。
- 客户端通过
MQTT 主要应用场景
-
物联网(IoT):
- 设备状态监控、数据收集和远程控制。
-
智能家居:
- 控制家电、监控传感器数据。
-
车联网:
- 实时车辆数据传输、位置追踪。
-
移动应用:
- 消息推送、实时聊天。
-
工业领域:
- 设备数据采集和分析。
MQTT 配置与注意事项
-
主题命名:
- 使用层级结构(如
/iot/device/status
),便于管理。 - 避免过于复杂的主题结构。
- 使用层级结构(如
-
QoS 选择:
- 根据应用需求选择适合的 QoS 等级,平衡可靠性和性能。
-
安全措施:
- 启用 SSL/TLS 加密。
- 配置用户名和密码,限制匿名连接。
- 控制主题的访问权限。
-
性能优化:
- 控制消息大小,减少带宽占用。
- 调整心跳时间,优化连接稳定性。
二、MQTT服务器搭建
1、在springboot项目工程pom文件下引入相关依赖
<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2、修改application.yml配置文件
spring:
application:
name: provider
#MQTT配置信息
mqtt:
#MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
url: tcp://127.0.0.1:1883
#用户名
username: guest
#密码
password: guest
#客户端id(不能重复)
client:
id: provider-id
#MQTT默认的消息推送主题,实际可在调用接口是指定
default:
topic: topic
server:
port: 8080
3、消息发布者客户端配置
?
package com.three.demo.mqtt.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
@Configuration
@Slf4j
public class MqttClientConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Autowired
private MqttClientCallBack mqttClientCallBack;
/**
* 客户端对象
*/
private MqttAsyncClient client;
/**
* 在bean初始化后连接到服务器
*/
@PostConstruct
public void init() {
connect();
}
/**
* 客户端连接服务端
*/
public void connect() {
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接服务器都是以新的身份
options.setCleanSession(false);
//设置连接用户名
options.setUserName(username);
//设置连接密码
options.setPassword(password.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(60);
//设置心跳时间 单位为秒,表示服务器每隔 1.5*10秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
// 开启自动重连
options.setAutomaticReconnect(true);
// 设置最大重连时间间隔 (可选),单位是毫秒,设置为 5000 表示最多等待 5 秒再尝试重连
options.setMaxReconnectDelay(5000);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);
try {
//创建MQTT客户端对象
client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());
//设置回调
client.setCallback(mqttClientCallBack);
// 使用异步连接
client.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
log.info("MQTT连接成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("MQTT连接失败:" + exception.getMessage());
}
});
} catch (MqttException e) {
log.error("mqtt连接失败。。" + e.getMessage());
}
}
public void publish(int qos, boolean retained) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(pushLog.getData().getBytes());
try {
// 使用异步客户端发布消息,并处理结果
client.publish(pushLog.getTopic(), mqttMessage, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
System.out.println("发送成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("发送失败:" + exception.getMessage());
}
});
} catch (MqttException e) {
log.error("发送失败:" + e.getMessage());
}
}
/**
* 断开连接
*/
public void disConnect() {
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
?
4、消息发布客户端回调
package com.three.demo.mqtt.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 与服务器断开的回调
*/
@Override
public void connectionLost(Throwable cause) {
log.error(clientId + "与服务器断开连接!!" + cause.getMessage());
}
/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
System.out.println(client.getClientId()+"发布消息成功!");
}
}
5、创建控制器测试发布信息
package com.three.demo.mqtt.controller;
import com.three.demo.mqtt.config.MqttClientConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SendController {
@Autowired
private MqttClientConfig client;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(int qos,boolean retained,String topic,String message){
try {
client.publish(qos, retained, topic, message);
return "发送成功";
} catch (Exception e) {
e.printStackTrace();
return "发送失败";
}
}
}
6、消息接收者配置
这里我对之前的代码进行改造
/**
* 客户端连接服务端
*/
public void connect() {
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接服务器都是以新的身份
options.setCleanSession(false);
//设置连接用户名
options.setUserName(username);
//设置连接密码
options.setPassword(password.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(60);
//设置心跳时间 单位为秒,表示服务器每隔 1.5*10秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
// 开启自动重连
options.setAutomaticReconnect(true);
// 设置最大重连时间间隔 (可选),单位是毫秒,设置为 5000 表示最多等待 5 秒再尝试重连
options.setMaxReconnectDelay(5000);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);
try {
//创建MQTT客户端对象
client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());
//设置回调
client.setCallback(mqttClientCallBack);
// 使用异步连接
client.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
log.info("MQTT连接成功");
// 连接成功后订阅主题
try {
//订阅主题
//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
int[] qos = {2, 2};
String[] topics = {
"/iot/msg/topic1",
"/iot/msg/topic2"
};
client.subscribe(topics, qos);
log.info("订阅主题成功");
} catch (MqttException e) {
log.error("订阅主题失败:" + e.getMessage());
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("MQTT连接失败:" + exception.getMessage());
}
});
} catch (MqttException e) {
e.printStackTrace();
log.error("mqtt连接失败。。" + e.getMessage());
}
}
然后在消息客户端回调类这里
package com.ruoyi.yyt.mqtt.config;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 客户端断开连接的回调
*/
@Override
public void connectionLost(Throwable throwable) {
log.error(clientId + "与服务器断开连接!!" + cause.getMessage());
}
/**
* 消息到达的回调
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主题 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
System.out.println(client.getClientId() + "发布消息成功!");
}
}
这个时候我们启动服务,调用测试接口
就可以看到接口返回发布成功,并且能看到后台服务的打印日志了
至此大功告成了!
三、参考案例
参考