今天遇到了一个mqtt的问题,虽然解决了,但是感觉不是很好,希望大家多指点
这是配置文件
customer:
mqtt:
broker: tcp://ip:1883
clientList:
- clientId: nays_service
subscribeTopic: xxxxxx
- clientId: receive_service
subscribeTopic: xxxxxx
MqttConfig 读取配置文件的
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
/**
* mqtt broker地址
*/
String broker;
/**
* 需要创建的MQTT客户端
*/
List<MqttClient> clientList;
}
一个MqttClient类用来构造配置文件中的数据对象
@Data
public class MqttClient {
/**
* 客户端ID
*/
private String clientId;
/**
* 监听主题
*/
private String subscribeTopic;
/**
* 用户名
*/
private String userName;
/**
* 密码
*/
private String password;
}
服务运行的时候进行mqtt客户端创建,创建的数据从配置文件中读取
/**
* MQTT客户端创建
*/
@Component
@Slf4j
public class MqttClientCreate {
@Resource
private MqttClientManager mqttClientManager;
@Resource
private MqttConfig mqttConfig;
/**
* 创建MQTT客户端
*/
@PostConstruct
public void createMqttClient() {
// 会读取配置文件中的clientList
List<MqttClient> mqttClientList = mqttConfig.getClientList();
// 遍历去创建
for (MqttClient mqttClient : mqttClientList) {
log.info("{}", mqttClient);
mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic());
}
}
}
这是创建的代码,问题很多(请看代码的注释部分)
```java
@Slf4j
@Component
public class MqttClientManager {
@Value("${customer.mqtt.broker}")
private String mqttBroker;
@Resource
private MqttCallBackContext mqttCallBackContext;
/**
* 存储MQTT客户端
*/
public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();
public MqttClient getMqttClientById(String clientId) {
return MQTT_CLIENT_MAP.get(clientId);
}
/**
* 创建mqtt客户端
* @param clientId 客户端ID
* @param subscribeTopic 订阅主题,可为空
*/
public void createMqttClient(String clientId, String subscribeTopic) {
// 它将消息存储在内存中,而不是持久存储到文件或其他存储介质中
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// 客户端每次连接到 MQTT 服务器时都会被视为一个全新的会话。
connOpts.setCleanSession(true);
if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
// 这里的default就是DefaultMqttCallBack, 一开始创建的时候走的就是这个
// 问题最大的地方在这,通过这样方式拿回来的是同一个对象,hashCode也相同
// 现在想到的做法是深拷贝,有没有什么好的做法,比如通过构造方法
if (null == callBack) {
// 一开始这里的操作直接是, 当创建多个客户端的时候拿到的对象都是同一个
// callback = mqttCallBackContext.getCallBack("default");
AbsMqttCallBack original = mqttCallBackContext.getCallBack("default");
callBack = original.deepCopy();
}
callBack.setClientId(clientId);
callBack.setConnectOptions(connOpts);
client.setCallback(callBack);
}
//连接mqtt服务端broker
client.connect(connOpts);
log.info("客户端 {} 连接成功状态 {}", clientId, client.isConnected());
// 订阅主题
if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
if (subscribeTopic.contains("-")) {
client.subscribe(subscribeTopic.split("-"));
}
else {
client.subscribe(subscribeTopic);
}
}
MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
} catch (MqttException e) {
log.error("创建mqttClient失败!", e);
}
}
}
这是用于存储每个mqtt客户端的回调方法类
/**
* MQTT订阅回调环境类
*/
@Component
@Slf4j
public class MqttCallBackContext {
// 在 Spring 中,当你注入一个 Map<String, AbsMqttCallBack> 类型的字段时,
// Spring 会自动将所有实现了 AbsMqttCallBack 接口的 Bean 收集起来,
// 并将它们的名称作为键值。因此,DefaultMqttCallBack 会被注入到 callBackMap 中,键值为 "default"。
private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();
/**
* 默认构造函数
*
* @param callBackMap 回调集合
*/
public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
this.callBackMap.putAll(callBackMap);
}
/**
* 获取MQTT回调类
*
* @param clientId 客户端ID
* @return MQTT回调类
*/
public AbsMqttCallBack getCallBack(String clientId) {
return this.callBackMap.get(clientId);
}
}
这里遇到的问题就是mqtt断了之后进行重新连接的机制,在MqttClientManager这个代码中之前的回调类是callback = mqttCallBackContext.getCallBack(“default”);这样拿的,通过hashCode来看,都一样,说明每次创建都会对这个对象进行修改,那么这里赋值的clientId就会变成最后一个创建的mqtt对象id,所以在重连代码中,每次进来的对象虽然是另外一个mqtt客户端,但是拿到的clientid都是同一个,没有办法进行获取和其它的操作
/**
* MQTT回调抽象类
*/
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {
private String clientId;
private MqttConnectOptions connectOptions;
@Resource
private MqttConfig mqttConfig;
@Resource
private MqttClientManager mqttClientManager;
/**
* 失去连接操作,进行重连
*
* @param throwable 异常
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("{}失去连接,进行尝试重连", this.clientId);
MqttClient mqttClient = MqttClientManager.MQTT_CLIENT_MAP.get(clientId);
String subscribeTopic = mqttConfig.getClientList().stream()
.filter(item -> item.getClientId().equals(clientId))
.map(com.ruoyi.web.core.mottconfig.MqttClient::getSubscribeTopic)
.findFirst()
.orElse(null);
if (mqttClient != null) {
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true); // 可以根据实际需求配置
// 重连的尝试
int retryCount = 0;
int maxRetries = 10; // 最大重连次数
while (retryCount < maxRetries) {
try {
if (mqttClient.isConnected()) {
log.info("{} 重连成功", clientId);
return;
}
// 重新连接
mqttClient.connect(connOpts);
log.info("{} 重连成功", clientId);
if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
if (subscribeTopic.contains("-")) {
mqttClient.subscribe(subscribeTopic.split("-"));
}
else {
mqttClient.subscribe(subscribeTopic);
}
}
break;
} catch (MqttException e) {
retryCount++;
log.error("{} 重连失败,尝试第 {} 次重连", clientId, retryCount, e);
// 可设置重连间隔,比如等待2秒钟后再尝试重连
try {
Thread.sleep(2000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
if (retryCount == maxRetries) {
log.error("{} 超过最大重连次数,重连失败", clientId);
}
}
}
/**
* 接收订阅消息
* @param topic 主题
* @param mqttMessage 接收消息
* @throws Exception 异常
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
handleReceiveMessage(topic, content);
}
/**
* 消息发送成功
*
* @param iMqttDeliveryToken toke
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("消息发送成功");
}
/**
* 处理接收的消息
* @param topic 主题
* @param message 消息内容
*/
protected abstract void handleReceiveMessage(String topic, String message);
// 深拷贝方法
public abstract AbsMqttCallBack deepCopy();
}
这里就是 最后的callback实现类进行业务处理
/**
* 默认回调
*/
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {
@Autowired
private AlarmListService alarmListService;
@Autowired
private OperateService operateService;
@Autowired
private INrDeviceService iNrDeviceService;
//private static final String TOPIC1 = 1;
/**
* @param topic 主题
* @param message 消息内容
*/
@Override
protected void handleReceiveMessage(String topic, String message) {
log.info("订阅的主题---{}", topic);
log.info("接收到消息---{}", message);
// 业务操作
}
@Override
public AbsMqttCallBack deepCopy() {
DefaultMqttCallBack copy = new DefaultMqttCallBack();
copy.setClientId(this.getClientId());
copy.setConnectOptions(this.getConnectOptions());
copy.setMqttConfig(this.getMqttConfig());
copy.setMqttClientManager(this.getMqttClientManager());
return copy;
}
}
我是通过深拷贝来做的,应该是可以通过构造方法来,但是对这个整体的代码还是不够熟悉,想看看应该如何优化,还请指点,最好笑的是:这段代码是公司一直使用的,用在了好几个项目上,我真是服了!!!!