# 测试程序
【pom.xml】
<dependencies>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.3.12.RELEASE</version>
    </dependency>
    <!-- Eclipse Paho MQTT v5 client used by all demo classes -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
        <version>1.2.5</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.49</version>
    </dependency>
    <!-- SLF4J backend for the log statements in the demo classes -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.11</version>
    </dependency>
    <!-- The demo classes are annotated with lombok.extern.slf4j.Slf4j; Lombok is
         only needed at compile time, hence the provided scope. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
【MyDemo3MqttV5Server1.java】模拟一个正常的消息接收服务
package com.chz.myMqttV5.demo3;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import java.util.concurrent.ThreadLocalRandom;
@Slf4j
public class MyDemo3MqttV5Server1
{
public static void main(String[] args) throws InterruptedException
{
String broker = "tcp://192.168.44.228:1883";
String clientId = "MyDemo3MqttV5Server1";
int subQos = 1;
try {
MqttClient client = new MqttClient(broker, clientId);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setAutomaticReconnect(true);
client.setCallback(new MyDemo3Server1Callback(clientId));
client.connect(options);
client.subscribe("$share/demo3/device/#", subQos);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Slf4j
public static class MyDemo3Server1Callback implements MqttCallback
{
private String clientId;
public MyDemo3Server1Callback(String clientId)
{
this.clientId = clientId;
}
public void connectComplete(boolean reconnect, String serverURI) {
log.info("{}::connectComplete, reconnect={}, serverURI={}", clientId, reconnect, serverURI);
}
public void disconnected(MqttDisconnectResponse disconnectResponse) {
log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());
}
public void deliveryComplete(IMqttToken token) {
log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
}
public void mqttErrorOccurred(MqttException exception) {
log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());
}
public void authPacketArrived(int reasonCode, MqttProperties properties) {
log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);
}
}
}
【MyDemo3MqttV5Server2.java】模拟一个工作不正常的消息接收服务
package com.chz.myMqttV5.demo3;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import java.util.concurrent.ThreadLocalRandom;
@Slf4j
public class MyDemo3MqttV5Server2
{
private static int subQos = 1;
public static void main(String[] args) throws InterruptedException
{
String broker = "tcp://192.168.44.228:1883";
String clientId = "MyDemo3MqttV5Server2";
try {
MqttClient client = new MqttClient(broker, clientId);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setAutomaticReconnect(true);
options.setKeepAliveInterval(3); // keepAliveInterval设置成3秒
client.setCallback(new MyDemo3Server2Callback(clientId, client));
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Slf4j
public static class MyDemo3Server2Callback implements MqttCallback
{
private String clientId;
private MqttClient client;
public MyDemo3Server2Callback(String clientId, MqttClient client)
{
this.clientId = clientId;
this.client = client;
}
public void connectComplete(boolean reconnect, String serverURI) {
log.info("{}::connectComplete, reconnect={}, serverURI={}", clientId, reconnect, serverURI);
try {
if( client.isConnected() ){
client.subscribe("$share/demo3/device/#", subQos);
}
} catch (MqttException e) {
log.error("err", e);
}
}
public void disconnected(MqttDisconnectResponse disconnectResponse) {
log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());
}
public void deliveryComplete(IMqttToken token) {
log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("{}::messageArrived start, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
if( ThreadLocalRandom.current().nextInt() % 20 ==0 ){
try {
log.info("{}::messageArrived ------------error1, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
// 休眠10秒,因为前面设置了keepAliveInterval为3秒,所以一定会导致连接断开
Thread.sleep(10000);
log.info("{}::messageArrived ------------error2, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
} catch (Exception e) {
log.error("err", e);
}
}
}
public void mqttErrorOccurred(MqttException exception) {
log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());
}
public void authPacketArrived(int reasonCode, MqttProperties properties) {
log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);
}
}
}
【MyDemo3MqttV5Sender.java】模拟一个发消息出来的设备
package com.chz.myMqttV5.demo3;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
@Slf4j
public class MyDemo3MqttV5Sender {
private static String clientId = MyDemo3MqttV5Sender.class.getSimpleName();
public static void main(String[] args) throws InterruptedException {
String broker = "tcp://192.168.44.228:1883";
int subQos = 1;
int pubQos = 1;
String msg;
try {
MqttClient client = new MqttClient(broker, clientId);
MqttConnectionOptions options = new MqttConnectionOptions();
client.setCallback(new MyDemo3SenderCallback(clientId));
client.connect(options);
client.subscribe("device/#", subQos);
for(int i=0; i<200; i++){
int id = i;
msg = id+"";
MqttMessage message = new MqttMessage(msg.getBytes());
message.setId(id); // 这个id很重要,否则qos=1不会生效
message.setQos(pubQos); // 设置qos=1
client.publish("device/1", message);
Thread.sleep(1L);
}
} catch (MqttException e) {
e.printStackTrace();
}
}
@Slf4j
public static class MyDemo3SenderCallback implements MqttCallback
{
private String clientId;
public MyDemo3SenderCallback(String clientId)
{
this.clientId = clientId;
}
public void connectComplete(boolean reconnect, String serverURI) {
log.info("{}::MyMqttCallback, reconnect={}, serverURI={}", clientId, reconnect, serverURI);
}
public void disconnected(MqttDisconnectResponse disconnectResponse) {
log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());
}
public void deliveryComplete(IMqttToken token) {
log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
}
public void mqttErrorOccurred(MqttException exception) {
log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());
}
public void authPacketArrived(int reasonCode, MqttProperties properties) {
log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);
}
}
}
# 开始测试
启动【MyDemo3MqttV5Server1、MyDemo3MqttV5Server2】
然后启动【MyDemo3MqttV5Sender】,输出日志如下:
可以看到【MyDemo3MqttV5Server2】接收到消息【74】的时候卡了10秒,然后连接就断开了。
可以看到【MyDemo3MqttV5Server2】断开连接之后,消息【74】之后的后续消息都被【MyDemo3MqttV5Server1】接收到了。换句话说,不会因为某个消息接收服务出现问题而导致消息丢失。
# 参考资料
mqtt服务emqx的安装可以参考https://blog.csdn.net/chenhz2284/article/details/139411874