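MQTT session: read ECONNRESET
Troubleshooting READ ECONNRESET when connecting to an MQTT server with MQTTX
Some time ago our company set up a new MQTT server. Our initial tests showed no problems. As the number of connections grew, however, READ ECONNRESET started appearing intermittently over the next several days and the project stopped working properly, so I investigated.
Searching online, I found the following article: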
https://blog.csdn.net/slxz001/article/details/123368088
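- One possible cause is that the MQTT session queue was full. I changed the MQTT session queue parameters accordingly.
- Another possible cause is that the project code opens too many connections. In that case the connection logic in the code has to change: connect with a single MQTT client and subscribe to multiple topics on it. This keeps the number of connections to the MQTT server small and so avoids the problem.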
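A minimal sketch of the second point, assuming one shared Paho `MqttClient`: subscribe once to all topics (for example with `subscribe(String[] topicFilters, int[] qos)`), and let the single callback's `messageArrived(String topic, MqttMessage message)` pass the topic and payload to a router like the one below. `TopicRouter` and its methods are hypothetical helpers written for illustration, not part of Paho. The wildcard handling follows the MQTT topic-filter rules: `+` matches one level, and `#` matches the remaining levels.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical helper: routes messages from one shared MQTT client to per-topic handlers. */
class TopicRouter {
    private final Map<String, BiConsumer<String, byte[]>> handlers = new LinkedHashMap<>();

    /** Registers a handler for a topic filter ('+' and '#' wildcards allowed). */
    void on(String topicFilter, BiConsumer<String, byte[]> handler) {
        handlers.put(topicFilter, handler);
    }

    /** Calls every handler whose filter matches the topic and returns how many were called. */
    int dispatch(String topic, byte[] payload) {
        int called = 0;
        for (Map.Entry<String, BiConsumer<String, byte[]>> entry : handlers.entrySet()) {
            if (matches(entry.getKey(), topic)) {
                entry.getValue().accept(topic, payload);
                called++;
            }
        }
        return called;
    }

    /** MQTT filter matching: '+' matches exactly one level, '#' matches the rest (including none). */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        // Wildcards in the first level must not match topics starting with '$' (e.g. $SYS/...)
        if (topic.startsWith("$") && (f[0].equals("+") || f[0].equals("#"))) {
            return false;
        }
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;                       // this level and everything below it
            }
            if (i >= t.length || (!f[i].equals("+") && !f[i].equals(t[i]))) {
                return false;                      // topic too short, or a literal level differs
            }
        }
        return f.length == t.length;               // no unmatched trailing topic levels
    }
}
```

With this layout the broker sees one connection for all topics instead of one connection per topic.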
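I recommend testing with a realistic number of connections before deploying to production, so that problems like this surface early.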
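What such a test detects can be sketched locally without a broker. `ConnectionLimitProbe` below is hypothetical: a local `ServerSocket` stands in for the server, keeps the first `maxConnections` connections open, and closes every later one immediately. Each new client socket then calls `DataInputStream.readByte()`, the same call Paho uses to read MQTT packets. A read timeout means the connection is still open; an `EOFException` means the server closed it. The limit is arbitrary and says nothing about any real broker. If a server reset the connection instead of closing it cleanly, Java would typically report a `SocketException` ("Connection reset"), the counterpart of the ECONNRESET that MQTTX shows.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical reproduction: a fake server that closes every connection beyond a fixed limit. */
class ConnectionLimitProbe {

    /** Opens up to attempts connections; returns how many stayed open before the first EOF. */
    static int probe(int maxConnections, int attempts) {
        List<Socket> held = new CopyOnWriteArrayList<>();   // server-side sockets kept open
        List<Socket> clients = new ArrayList<>();           // client-side sockets
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> {
                try {
                    while (true) {
                        Socket accepted = server.accept();
                        if (held.size() < maxConnections) {
                            held.add(accepted);             // under the limit: keep it open
                        } else {
                            accepted.close();               // over the limit: clean close (FIN)
                        }
                    }
                } catch (IOException stopped) {
                    // the server socket was closed; stop accepting
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            for (int i = 1; i <= attempts; i++) {
                Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                clients.add(client);
                client.setSoTimeout(500);                   // wait at most 500 ms for a reaction
                try {
                    new DataInputStream(client.getInputStream()).readByte();
                } catch (SocketTimeoutException stillOpen) {
                    continue;                               // no data and no close: still alive
                } catch (EOFException closedByServer) {
                    return i - 1;                           // the server closed this connection
                }
            }
            return attempts;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            closeAll(clients);
            closeAll(held);
        }
    }

    private static void closeAll(List<Socket> sockets) {
        for (Socket s : sockets) {
            try {
                s.close();
            } catch (IOException ignored) {
                // best-effort cleanup
            }
        }
    }
}
```

`ConnectionLimitProbe.probe(3, 10)` returns 3: the fourth connection is the first one the fake server closes.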
Here is some test code:
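import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * MQTT connection settings, bound from application.yml under the "mqtt" prefix
 * (mqtt.uri, mqtt.username, mqtt.password, mqtt.topic). Add these entries yourself.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ConfigurationProperties(prefix = "mqtt")
@Component
public class MqttProperties {
    // MQTT server URI
    private String uri;
    // login username
    private String username;
    // password
    private String password;
    // currently unused: topics are generated in the test code
    private String topic;
}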
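import lombok.extern.java.Log;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@RunWith(SpringRunner.class)
@SpringBootTest
@Log
public class MqttProducerTest {
    @Autowired
    private MqttProperties mqttProperties;

    private static final String WATCH_PREFIX = "TEST";

    /**
     * Device heartbeat clients, keyed by topic
     */
    static Map<String, MqttClient> clientMap;

    // thread pool (not used by testConnect)
    private static final ThreadPoolExecutor POOL_EXECUTOR;

    static {
        POOL_EXECUTOR = new ThreadPoolExecutor(2, 2, 30 * 1000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10000),
                new CustomizableThreadFactory("pool"));
        clientMap = new HashMap<>();
    }

    @Test
    public void testConnect() {
        int count = 10000;
        int success = 0;
        AtomicInteger fail = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            log.info(String.format("Preparing to create client #%d", i + 1));
            // MQTT
            String urlFrontSuffix = mqttProperties.getUri();
            // Include the loop index: the timestamp fragment alone can repeat, and the broker
            // disconnects an existing client when another one connects with the same client ID
            String clientId = "CLIENT-" + i + "-" + (System.currentTimeMillis() + "").substring(6);
            /* TOPIC:TEST */
            String watchTopic = WATCH_PREFIX + (i * 100);
            System.out.println("clientId: " + clientId + " watchTopic: " + watchTopic);
            try {
                MemoryPersistence memoryPersistence = new MemoryPersistence();
                MqttClient client = new MqttClient(urlFrontSuffix, clientId, memoryPersistence);
                MqttConnectOptions options = getOptions();
                // Register callbacks; messageArrived is invoked for messages on subscribed topics
                client.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable throwable) {
                        System.out.println(watchTopic + " Connection Lost.");
                        fail.incrementAndGet();
                    }

                    @Override
                    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                        // called when a message arrives on the subscribed topic
                        String payload = new String(mqttMessage.getPayload()); // message content
                        log.info(String.format("accepted: %s channel:%s", payload, watchTopic));
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        System.out.println("deliveryComplete" + iMqttDeliveryToken);
                    }
                });
                client.connect(options);
                client.subscribe(watchTopic);
                log.info("connected the mqtt client " + clientId);
                // keep a reference to the MQTT client
                clientMap.put(watchTopic, client);
                if (client.isConnected()) {
                    success++;
                    log.info(String.format("client #%d connected", success));
                }
            } catch (Exception e) {
                // failures inside connect()/subscribe() land here, not in connectionLost, so count them too
                fail.incrementAndGet();
                e.printStackTrace();
            }
        }
        log.info(String.format("Total %d, succeeded %d, failed %d", count, success, fail.get()));
    }

    private MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // cleanSession = true (also Paho's default): the broker keeps no session state
        // for this client across disconnects and reconnects
        options.setCleanSession(true);
        // connection timeout, in seconds
        options.setConnectionTimeout(10);
        // keep-alive (heartbeat) interval, in seconds
        options.setKeepAliveInterval(20);
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        return options;
    }
}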
Output:
Console- 10:35:55.099 [main] INFO c.f.a.c.m.MqttProducerTest - [testConnect,58] -Preparing to create client #700
clientId: JAVA-CLIENT-4155099 watchTopic: xxx
Connection lost (32109) - java.io.EOFException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137)
... 1 more
Console- 10:35:55.119 [main] INFO c.f.a.c.m.MqttProducerTest - [testConnect,58] -Preparing to create client #701
clientId: JAVA-CLIENT-4155119 watchTopic: xxx
Connection lost (32109) - java.io.EOFException
I wrote this test case in Java and ran it. Once about 700 connections had been established, the problem appeared and a java.io.EOFException was thrown.
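Before attributing the drop-off to a server-side limit, it helps to rule out duplicate client IDs. MQTT 3.1.1 requires the server to disconnect an already-connected client when a new connection uses the same ClientId, and an ID built only from a timestamp fragment can repeat. Below is a minimal sketch of a collision-free generator, assuming a random per-run tag plus a counter suits the broker in use. `ClientIdGenerator` is a hypothetical helper, not part of Paho. The IDs use only `0-9a-zA-Z` and stay under 23 characters, the range every MQTT 3.1.1 server must accept.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: client IDs that never repeat within one generator. */
class ClientIdGenerator {
    private static final int TAG_SPACE = 36 * 36 * 36 * 36;  // 4 base-36 characters

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    /** base should contain only 0-9a-zA-Z. */
    ClientIdGenerator(String base) {
        // Random tag per generator, so two test runs started together very likely differ
        String tag = Integer.toString(ThreadLocalRandom.current().nextInt(TAG_SPACE), 36);
        // Zero-pad to a fixed width so the counter that follows can never blur into the tag
        this.prefix = base + "0000".substring(tag.length()) + tag;
    }

    /** Returns the next ID: base + 4-character tag + increasing counter. */
    String next() {
        return prefix + counter.incrementAndGet();
    }
}
```

Created once before the loop, a generator like this could supply `clientId` in `testConnect` via `ids.next()`.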
java.io.EOFException
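EOFException: signals that an end of file or end of stream has been reached unexpectedly during input.
This exception is mainly used by data input streams to signal end of stream. Note that many other input operations return a special value on end of stream instead of throwing an exception.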
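The difference the Javadoc describes can be shown with an in-memory stream (`EofDemo` is a hypothetical name). `read()` reports the end of the stream by returning -1, while `readByte()` throws `EOFException`. This is why Paho, which reads packets through `DataInputStream.readByte` according to the stack trace above, surfaces a closed socket as this exception.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

class EofDemo {
    /** Reads a one-byte stream twice with read(); the second call returns -1 instead of throwing. */
    static int readAfterEnd() {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[] {42}))) {
            in.read();          // consumes the only byte
            return in.read();   // end of stream: special value -1
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the same stream twice with readByte(); returns true if the second call throws EOFException. */
    static boolean readByteAfterEndThrows() {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[] {42}))) {
            in.readByte();      // consumes the only byte
            try {
                in.readByte();  // nothing left: throws instead of returning a value
                return false;
            } catch (EOFException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```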
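Causes:
- Data is read from the stream in a different order (or with different types) than it was written, so a read asks for more bytes than remain.
- Data written with writeChars is read back with readUTF. writeChars writes each char as two bytes with no length prefix, while readUTF expects the format produced by writeUTF: a two-byte length followed by modified UTF-8 bytes. readUTF therefore takes the first two bytes as a length, runs past the end of the stream, and throws EOFException.
- On a network connection, as in the stack trace above, DataInputStream.readByte reaches end of stream when the peer closes the TCP connection. Here the EOFException therefore means the MQTT server closed the connection from its side.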
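The writeChars/readUTF case can be reproduced in memory. This is a hypothetical sketch, and `StreamMismatchDemo` is a made-up name. `writeChars("ab")` produces the four bytes `00 61 00 62`; `readUTF` reads `0x0061` = 97 as the length, finds only two bytes left, and throws `EOFException`. Writing with `writeUTF` instead makes the round trip work.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

class StreamMismatchDemo {
    /** Encodes text with writeUTF (useUtf = true) or writeChars (useUtf = false). */
    private static byte[] write(String text, boolean useUtf) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            if (useUtf) {
                out.writeUTF(text);    // 2-byte length prefix + modified UTF-8 bytes
            } else {
                out.writeChars(text);  // 2 bytes per char, no length prefix
            }
        }
        return buffer.toByteArray();
    }

    /** Reads the bytes back with readUTF; returns the text, or "EOFException" if data runs out. */
    static String readBackWithReadUtf(String text, boolean useUtf) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(write(text, useUtf)))) {
            return in.readUTF();
        } catch (EOFException e) {
            return "EOFException";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```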