1. MQTT简介
MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。
相关概念
三种身份:
- 客户端(Client):MQTT 客户端是发送和接收消息的应用程序。
- 服务器(Broker):也叫“代理”,服务器是处理消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
- 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。
MQTT 消息
MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分
- 主题,可以理解为消息的类型
- 负载,可以理解为消息的内容
消息服务质量QoS(Quality of Service)
Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级
- 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
- 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
- 2 消息仅传送一次,确保消息到达一次
2. SpringBoot集成Mqtt
Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho
,另外一种是spring integration mqtt
第一种:使用Mqtt客户端库
依赖引入:org.eclipse.paho.client.mqttv3
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
服务端配置
public class MqttSendMsgService {
private static String clientId = "test";
private static String username = "admin";
private static String password = "xxxxxx";
private static String broker = "tcp://xxxxx:1883";
public ReturnT<String> mqttSend(String param) {
MqttClient client;
try {
client = new MqttClient(broker, clientId, new MemoryPersistence());
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("Message arrived: " + mqttMessage.getPayload());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Delivery complete");
}
});
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
client.connect(connOpts);
log.info("Connected to MQTT Broker!");
//主题
String topic="test/simple";
//消息
String content="发送测试";
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(false);
message.setPayload(content.getBytes());
//消息发送
client.publish(topic,message);
} catch (MqttException e) {
e.printStackTrace();
}
return ReturnT.SUCCESS;
}
}
上面这种使用起来比较简单,生产环境使用最多的还是下面这种
第二种:使用 Spring integration进行集成,这里以发送消息为例
依赖引入
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.14</version>
</dependency>
添加yaml配置
mqtt.url = tcp://xxxxx:1883
mqtt.username = admin
mqtt.password = 123456
mqtt.clientId = test
mqtt.defaultTopic = /test/send
mqtt.keepAliveInterval = 60
mqtt.automaticReconnect = true
mqtt.cleanSession = false
mqtt.connectionTimeout = 30
mqtt.maxInflight = 1024
添加对应的属性配置类
@Component
public class MqttConfigProperties {
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.defaultTopic}")
private String defaultTopic;
@Value("${mqtt.keepAliveInterval}")
private Integer keepAliveInterval;
@Value("${mqtt.automaticReconnect}")
private Boolean automaticReconnect;
@Value("${mqtt.cleanSession}")
private Boolean cleanSession;
@Value("${mqtt.connectionTimeout}")
private Integer connectionTimeout;
@Value("${mqtt.maxInflight}")
private Integer maxInflight;
}
创建客户端配置类
@Configuration
@IntegrationComponentScan
public class MqttConfig {
@Autowired
private MqttConfigProperties mqttConfigProperties;
@Bean
public MqttConnectOptions mqttConnectOptions() {
log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttConfigProperties.getUsername());
options.setPassword(mqttConfigProperties.getPassword().toCharArray());
options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());
options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());
options.setCleanSession(mqttConfigProperties.getCleanSession());
options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());
options.setMaxInflight(mqttConfigProperties.getMaxInflight());
return options;
}
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}
// 推送通道
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(1);
messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());
log.info("初始化mqttOutputChannel...");
return messageHandler;
}
}
发送网关接口
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
/**
* 发送消息
*
* @param topic
* @param data
*/
void send(@Header(MqttHeaders.TOPIC) String topic, String data);
}
这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了
mqttGateway.send(topic, JSONObject.toJSONString(msg));
参考:
https://mqtt.org/