MQTT是一种基于发布/订阅模式的轻量协议,该协议基于TCP/IP协议上,由IBM在1999年发布。
流程理解:订阅者在订阅时会选择主题(Topic)和服务质量(QoS),然后发布者发布消息,代理就会把不同的消息根据Topic推送给相关订阅者。
Topic:每个人的喜好,以订报纸为例,就是军事、财经等主题。
QoS:传输质量(消息的发布者和订阅者约定的),QoS0(发布完就不管了,最多一次)、QoS1(发送之后根据规范,是否启动重传,所以至少一次)、QoS2(确保只有一次)
应用场景:这个协议主要还是在物联网应用比较多,因为开销小对网络要求不高,我这次的使用场景就是和安卓系统的之间的通讯,回传定位信息。
1.MQTT依赖
这里是springboot整合mqtt,所以要在pom.xml里增加
<!--Mqtt依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.MQTT客户端
setCleanSession表示会话生命周期,默认为True。为 true 时表示创建一个新的会话,在客户端断开连接时,会话将自动销毁。为 false 时表示创建一个持久会话,在客户端断开连接后会话仍然保持(前提是ClientID没变),直到会话超时注销。
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
/**
* 类描述:Mqtt连接工具类
*
* @ClassName MQTTConnect
* @Author yeapt
* @Date 2023-07-20 12:22
*/
@Component
public class MQTTConnect {
//mqtt服务器的地址和端口号
private String HOST = "tcp://192.168.1.113:123";
private final String clientId = "DC" + (int) (Math.random() * 100000000);
private MqttClient mqttClient;
/**
* 客户端connect连接mqtt服务器
*
* @param userName 用户名
* @param passWord 密码
* @param mqttCallback 回调函数
**/
public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
MqttConnectOptions options = mqttConnectOptions(userName, passWord);
if (mqttCallback == null) {
mqttClient.setCallback(new Callback());
} else {
mqttClient.setCallback(mqttCallback);
}
mqttClient.connect(options);
}
/**
* MQTT连接参数设置
*/
private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
//连接超时时间,默认:30
options.setConnectionTimeout(10);
//重连配置,默认:false
options.setAutomaticReconnect(true);
//会话生命周期,默认:true
options.setCleanSession(true);
//默认:60
options.setKeepAliveInterval(60);
return options;
}
/**
* 关闭MQTT连接
*/
public void close() throws MqttException {
mqttClient.disconnect();
mqttClient.close();
}
/**
* 向某个主题发布消息 默认qos:1
*
* @param topic:发布的主题
* @param msg:发布的消息
*/
public void pub(String topic, String msg) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
//Qos2:保证消息至少传输成功1次
mqttMessage.setQos(2);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 向某个主题发布消息
*
* @param topic: 发布的主题
* @param msg: 发布的消息
* @param qos: 消息质量 Qos:0、1、2
*/
public void pub(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 订阅某一个主题 ,此方法默认的的Qos等级为:1
*
* @param topic 主题
*/
public void sub(String topic) throws MqttException {
mqttClient.subscribe(topic);
}
/**
* 订阅某一个主题,可携带Qos
*
* @param topic 所要订阅的主题
* @param qos 消息质量:0、1、2
*/
public void sub(String topic, int qos) throws MqttException {
mqttClient.subscribe(topic, qos);
}
}
3.MQTT回调函数
用来处理业务,获取到信息后如何处理
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 类描述:MQTT回调
*
* @ClassName Callback
* @Author yeapt
* @Date 2023-07-20 12:24
*/
@Slf4j
public class Callback implements MqttCallback {
/**
* MQTT 断开连接会执行此方法
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("断开了MQTT连接 :{}", throwable.getMessage());
log.error(throwable.getMessage(), throwable);
}
/**
* publish发布成功后会执行到这里
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("发布消息成功");
}
/**
* subscribe订阅后得到的消息会执行到这里
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO 此处可以将订阅得到的消息进行业务处理、数据存储
log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
}}
4.调试和使用
开发阶段使用main方法进行测试,但是在实际使用中一般在程序启动就会开启监听。
/**
* main函数自己测试用
*/
public static void main(String[] args) throws MqttException {
MQTTConnect mqttConnect = new MQTTConnect();
mqttConnect.setMqttClient("admin", "password", new Callback());
mqttConnect.sub("/abcd-trace/#",2);
mqttConnect.pub("/abcd-trace/#", "Mr.Y" + (int) (Math.random() * 100000000));
}
import com.stylefeng.guns.modular.xujianapi.util.mqtt.Callback;
import com.stylefeng.guns.modular.xujianapi.util.mqtt.MQTTConnect;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
* 类描述:MQTT监听类
*
* @ClassName MQTTListener
* @Author yeapt
* @Date 2023-07-20 12:27
*/
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
private final MQTTConnect server;
@Autowired
public MQTTListener(MQTTConnect server) {
this.server = server;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
server.setMqttClient("admin", "password", new Callback());
server.sub("/abcd-trace/#");
} catch (MqttException e) {
log.error(e.getMessage(), e);
}
}
}
这里只是讲述了客户端,后续如果有机会再研究发布者和代理的写法。