简述
之前写过一篇SpringBoot通过Netty实现TCP服务的文章,本篇与之前那篇实现的场景类似,都是服务器与客户端之间双向交互,但个人觉得MQTT的方式实现更好,优雅。
基础
MQTT协议是通过MQTT服务器转发消息,MQTT服务器作为三方,为每个客户端转发消息。与TCP不同的是,TCP编码,java大部分场景是作为服务端,设备作为客户端,而MQTT是一台单独的服务器,java跟设备都作为客户端与之保持长连接。
准备
下载EMQX,对应有windows、mac、linux的版本
下载网址:EMQX下载网址
解压,启动
bin文件夹下执行:(稍等五秒钟,出现两行英文时表示启动完成)
emqx start
浏览器打开
http://localhost:18083/
启动成功。
账号密码:admin/public
若代码里连接MQTT服务时带上用户名密码需要如下操作:
创建客户端认证
编码
maven依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
监听器
package com.dpkj.mqtt.mqtt;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dpkj.mqtt.service.DpDeviceService;
import com.mdd.common.entity.DpDevice;
import com.mdd.common.enums.CtrlTypeEnum;
import com.mdd.common.enums.MqttTopicEnum;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* 项目启动 连接mqtt服务器 订阅指定主题
* 此处为项目启动就订阅主题,这种情况是主题已经明确的情况,
* 若项目启动时主题不明确,需要在代码里动态订阅,
可直接在类中注入MQTTConnect server,之后执行server.sub("xxxx")
*/
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
private final MQTTConnect server;
@Autowired
public MQTTListener(MQTTConnect server) {
this.server = server;
}
@Override
public void onApplicationEvent(@NotNull ContextRefreshedEvent contextRefreshedEvent) {
try {
server.setMqttClient(new Callback());
//订阅主题
server.sub(“test/test/test”);
} catch (MqttException e) {
log.error(e.getMessage(), e);
}
}
}
发布及订阅主题
package com.dpkj.mqtt.mqtt;
import com.dpkj.mqtt.config.async.AsyncTaskService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
/**
* WQ
* 2023/2/14 17:10
* MQTT工具类操作
*/
@Slf4j
@Component
public class MQTTConnect {
@Value("${mqtt.host}")
private String mqttHost;
@Value("${mqtt.port}")
private String mqttPort;
@Value("${mqtt.username}")
private String mqttUsername;
@Value("${mqtt.password}")
private String mqttPassword;
//唯一标识
private final String clientId = "MqttClient" + (int) (Math.random() * 100000000);
private MqttClient mqttClient;
@Resource
private AsyncTaskService asyncTaskService;
/**
* 客户端connect连接mqtt服务器
**/
public void setMqttClient(MqttCallback mqttCallback) {
MqttConnectOptions options = mqttConnectOptions();
if (mqttCallback == null) {
mqttClient.setCallback(new Callback());
} else {
mqttClient.setCallback(mqttCallback);
}
try {
mqttClient.connect(options);
log.error("MQTT服务连接成功。tcp://{}:{},clientId:{},username:{},password:{}", mqttHost, mqttPort, clientId, mqttUsername, mqttPassword);
} catch (MqttException e) {
e.printStackTrace();
log.error("MQTT服务连接失败,{}。tcp://{}:{},clientId:{},username:{},password:{}", e.getMessage(), mqttHost, mqttPort, clientId, mqttUsername, mqttPassword);
}
}
/**
* MQTT连接参数设置
*/
public MqttConnectOptions mqttConnectOptions() {
try {
mqttClient = new MqttClient("tcp://" + mqttHost + ":" + mqttPort, clientId, new MemoryPersistence());
} catch (MqttException e) {
log.error("建立mqtt客户端出错。{}", e.getMessage());
e.printStackTrace();
}
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttUsername);
options.setPassword(mqttPassword.toCharArray());
options.setConnectionTimeout(0);
options.setAutomaticReconnect(true);
options.setKeepAliveInterval(90);
return options;
}
/**
* 关闭MQTT连接
*/
public void close() throws MqttException {
mqttClient.disconnect();
mqttClient.close();
}
/**
* 向某个主题发布消息 默认qos:1
*
* @param sn:道品云泊控制卡设备的唯一标识
* @param msg:发布的消息
* @param msgId:发布的消息唯一标识
*/
public void pub(String sn, String msg, String msgId) {
MqttMessage mqttMessage = new MqttMessage();
try {
//GBK格式下发
mqttMessage.setPayload(msg.getBytes("GBK"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String topic = "/test/" + sn + "/demo";
log.error("向主题[{}],发送消息 {}", topic, msg);
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token;
try {
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅某一个主题 ,此方法默认的的Qos等级为:1
*
* @param topic 主题
*/
public void sub(String topic) throws MqttException {
log.error("主题订阅:{}", topic);
mqttClient.subscribe(topic);
}
// /**
// * 订阅某一个主题,可携带Qos
// *
// * @param topic 所要订阅的主题
// * @param qos 消息质量:0、1、2
// */
// public void sub(String topic, int qos) throws MqttException {
// mqttClient.subscribe(topic, qos);
// }
}
yml
mqtt:
host: 127.0.0.1
port: 1883
username: xxx
password: 'xxx'