mqttx read econnreset异常排查

news2025/1/13 10:32:54

mqtt 会话 read econnreset
在这里插入图片描述

使用mqttx连接mqtt服务器时出现READ ECONNRESET的排查
前段时间公司新增了mqtt服务器,在我们初步测试的时候没有问题,但是随着连接数量增多,后续几天连续间隔出现READ ECONNRESET,导致项目无法正常使用,于是排查了一下问题,
根据网上的答案,找到以下文章
https://blog.csdn.net/slxz001/article/details/123368088

  1. 分析可能是mqtt会话队列满了,然后修改了mqtt会话队列参数
  2. 也有可能是项目代码中的连接太多,这就要修改项目代码的连接逻辑了,用同一个mqtt client连接mqtt,然后同时监听多个主题,这样可以避免出现太多连接数连接到mqtt,从而避免造成这个问题
    建议在正式部署之前进行一定连接数量测试,避免出现问题

以下为一点测试代码

/**
* 需要自己添加application.yml配置
*/
@AllArgsConstructor
@NoArgsConstructor
@Data
@ConfigurationProperties(prefix = "mqtt")
@Component
public class MqttProperties {
	// mqtt服务器url
    private String uri;
	//登录用户名
    private String username;
	// 密码
    private String password;
	// 暂时没用,主题会在代码里动态获取
    private String topic;

}
@RunWith(SpringRunner.class)
@SpringBootTest
@Log
public class MqttProducerTest {

    @Autowired
    private MqttProperties mqttProperties;

    private static final String WATCH_PREFIX = "TEST";

    /**
     * 设备心跳客户端列表
     */
    static Map<String, MqttClient> clientMap;

    private static final ThreadPoolExecutor POOL_EXECUTOR;

    static {
        POOL_EXECUTOR = new ThreadPoolExecutor(2, 2, 30 * 1000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10000),
                new CustomizableThreadFactory("pool"));
        clientMap = new HashMap<>();
    }

    @Test
    public void testConnect() {
        int count = 10000;
        int success = 0;
        AtomicInteger fail = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            log.info(String.format("准备创建第%d个client", i + 1));
            // MQTT
            String urlFrontSuffix = mqttProperties.getUri();
            String clientId = "CLIENT-" + (System.currentTimeMillis() + "").substring(6);
            /* TOPIC:TEST */
            String watchTopic = WATCH_PREFIX + (i * 100);
            System.out.println("clientId: " + clientId + " watchTopic: " + watchTopic);
            try {
                MemoryPersistence memoryPersistence = new MemoryPersistence();
                MqttClient client = new MqttClient(urlFrontSuffix, clientId, memoryPersistence);

                MqttConnectOptions options = getOptions();
                // 设置回调函数,当订阅到信息时调用此方法
                client.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable throwable) {
                        System.out.println(watchTopic + " Connection Lost.");
                        fail.incrementAndGet();
                    }

                    @Override
                    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                        // 订阅成功,并接受信息时调用
                        String payload = new String(mqttMessage.getPayload()); // 获取消息内容
                        log.info(String.format("accepted: %s channel:%s", payload, watchTopic));
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        System.out.println("deliveryComplete" + iMqttDeliveryToken);
                    }
                });
                client.connect(options);
                client.subscribe(watchTopic);

                log.info("connected the mqtt client" + clientId);

                // mqtt客户端
                clientMap.put(watchTopic, client);

                if (client.isConnected()) {
                    success++;
                    log.info(String.format("第%d个client连接成功", success));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        log.info(String.format("总共启动%d, 成功%d, 失败%d", count, success, fail.get()));
    }

    private MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置客户端和服务器是否应在重新启动和重新连接期间记住状态 默认false
        options.setCleanSession(true);
        // 设置超时时间
        options.setConnectionTimeout(10);
        // 设置会话心跳时间
        options.setKeepAliveInterval(20);
        // 设置超时时间
        options.setConnectionTimeout(10);
        // 设置会话心跳时间
        options.setKeepAliveInterval(20);
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        return options;
    }
}

运行结果:

控制台- 10:35:55.099 [main]  INFO   c.f.a.c.m.MqttProducerTest - [testConnect,58] -准备创建第700个client
clientId: JAVA-CLIENT-4155099 watchTopic: xxx
已断开连接 (32109) - java.io.EOFException
	at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readByte(DataInputStream.java:267)
	at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
	at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137)
	... 1 more
控制台- 10:35:55.119 [main]  INFO   c.f.a.c.m.MqttProducerTest - [testConnect,58] -准备创建第701个client
clientId: JAVA-CLIENT-4155119 watchTopic: xxx
已断开连接 (32109) - java.io.EOFException

使用Java代码创建了测试用例,运行测试发现,当连接数建立到700左右时,该问题出现,并抛出了java.io.EOFException异常错误信息
java.io.EOFException

EOFException:当输入过程中意外到达文件或流的末尾时,抛出此异常。

此异常主要被数据输入流用来表明到达流的末尾。注意,其他许多输入操作返回一个特殊值表示到达流的末尾,而不是抛出异常。

产生原因:

  1. 数据流中写入数据时的顺序和读取时的顺序不一致
  2. UTF是双字节编码,而writeChars方法写入的是按照字符格式写入的,在文件中的占位要小于以Unicode编码的同样字符串,所以,使用readUTF方法读取时,会出现EOF错误

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/416084.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity Batching 批处理

Unity Batching 批处理 了解批处理前先了解下 Draw Call 要了解 Draw Call 需要先了解游戏引擎是如何把物体图像绘制到屏幕上的。 (1)&#xff1a;渲染流水线 渲染流水线的任务为从一个 二维、三维场景开始&#xff0c;最终渲染为一张二维图像&#xff0c;显示在屏幕上。 计算…

使用Unit Scaling进行FP16 和 FP8 训练

Unit Scaling 是一种新的低精度机器学习方法&#xff0c;能够在没有损失缩放的情况下训练 FP16 和 FP8 中的语言模型。 使用FP16和BFLOAT16替代FP32可以将内存、带宽和计算需求的大幅减少&#xff0c;这也是目前越来越大的模型所需要的。 背景介绍 随着支持fp8的硬件的发展&…

webrtc入门系列(三)云服务器coturn环境搭建

《webrtc入门系列&#xff08;一&#xff09;easy_webrtc_server 入门环境搭建》 《webrtc入门系列&#xff08;二&#xff09;easy_webrtc_server 入门example测试》 《webrtc入门系列&#xff08;三&#xff09;云服务器coturn环境搭建》 《webrtc入门系列&#xff08;四&…

测试题目气死人

服了差不多每一题都要错几个案例我真的服了wok&#xff0c;什么鬼东西&#xff01;&#xff01;&#xff01; lx学长的羊圈 Description lx学长是一个养羊大户&#xff0c;有成千上百个羊圈。可是却一次也没来羊圈帮过忙&#xff0c;今天他被叫来羊圈给羊羊们施展成双成对大法…

力扣算法系统刷题题解记录

力扣算法系统刷题题解记录 文章目录力扣算法系统刷题题解记录前言一、数组704二分查找示意图&#xff1a;解题思路代码27.移除元素示意图解题思路代码前言 参考顺序和资料&#xff1a;《代码随想录》 二刷要认真做笔记啦&#xff0c;加油&#xff01; 一、数组 704二分查找 …

2023-04-12 面试中常见的数组题目

数组中的问题其实最常见 通过基础问题&#xff0c;掌握写出正确算法的“秘诀”巧妙使用双索引技术&#xff0c;解决复杂问题对撞指针- 滑动窗口 1 从二分查找法看如何写出正确的程序 本节学习重点&#xff1a;处理边界问题&#xff01; 1.确定边界范围方法&#xff0c;先用区…

13、Qt生成dll-QLibrary方式使用

Qt创建dll&#xff0c;使用QLibrary类方式调用dll 一、创建项目 1、新建项目->其他项目->Empty qmake Project->Choose 2、输入项目名&#xff0c;选择项目位置&#xff0c;下一步 3、选择MinGW&#xff0c;下一步 4、完成 5、.pro中添加TEMPLATE subdirs&#xff…

定时任务之时间轮算法

初识时间轮 我们先来考虑一个简单的情况&#xff0c;目前有三个任务A、B、C&#xff0c;分别需要在3点钟&#xff0c;4点钟和9点钟执行&#xff0c;可以把时间想象成一个钟表。 如上图中所示&#xff0c;我只需要把任务放到它需要被执行的时刻&#xff0c;然后等着时针转到这个…

IP协议(网络层重点协议)

目录 一、IP协议报头格式 二、地址选择 1、IP地址 &#xff08;1&#xff09;格式 &#xff08;2&#xff09;组成 &#xff08;3&#xff09;分类 &#xff08;4&#xff09;子网掩码 三、路由选择 IP协议是网络层的协议&#xff0c;它主要完成两个方面的任务&#xf…

4.16--设计模式之创建型之代理模式(总复习版本)---脚踏实地,一步一个脚印

1.代理对象 定义&#xff1a;代理模式给某一个对象提供一个代理对象&#xff0c;并由代理对象控制对原对象的引用&#xff0c;从而实现对真实对象的操作。 通俗的来讲代理模式就是我们生活中常见的中介。 在代理模式中&#xff0c;代理对象主要起到一个中介的作用&#xff0c;…

初识Docker并在linux完成安装

文章目录一、 初识Docker1.1 简介1.2 Docker和虚拟机的异同1.3 Docker架构二、 DockerHub三、Docker的安装一、 初识Docker 1.1 简介 Docker是一种开源的容器化平台&#xff0c;可以让开发者在容器中打包、发布、运行和管理应用程序。它使用轻量级的容器来隔离应用程序和它们的…

Scrapy爬虫基本使用与股票数据Scrapy爬虫

Scrapy爬虫的常用命令 scrapy命令行格式 红色是常用的三种命令 为什么Scrapy采用命令行创建和运行爬虫&#xff1f; 命令行&#xff08;不是图形界面&#xff09;更容易自动化&#xff0c;适合脚本控制 本质上&#xff0c;Scrapy是给程序员用的&#xff0c;功能&#xff08…

vue打包之后,可以进行修改配置后端地址、端口等信息方法

前言 用vue-cli构建的项目通常是采用前后端分离的开发模式&#xff0c;也就是前端与后台完全分离&#xff0c;此时就需要将后台接口地址打包进项目中&#xff0c;但是&#xff0c;难道我们只是改个接口地址也要重新打包吗&#xff1f;当然不行了&#xff0c;那就太麻烦了&#…

支付宝沙箱环境+SpringBoot+内网穿透整合开发

目录 1.查看沙箱账号 2.内网穿透 3.沙箱环境整合SpringBoot开发 下面我将以实际案例详细介绍如何使用沙箱环境进行支付宝支付对接的开发 1.查看沙箱账号 首先什么是沙箱账号&#xff1f; 沙箱账号是指在支付宝沙箱环境中创建的测试账户&#xff0c;用于模拟真实的支付流程…

The 2022 ICPC Asia Xian Regional Contest

题目顺序大致按照难度排序。 F. Hotel 现在酒店中有单人间和双人间&#xff0c;价格分别是c1&#xff0c;c2&#xff0c;现在有n个队&#xff0c;每队三个人&#xff0c;性别分别用字母表示&#xff0c;当两个人性别相同且在同一个队时&#xff0c;他们可以住在双人间中。求最…

【跑跑Github开源项目系列】基于YOLO和Streamlit的车辆识别系统demo

【跑跑Github开源项目系列】基于YOLO和Streamlit的车辆识别系统demo写在前面环境配置创建虚拟环境安装库项目运行写在前面 相信很多朋友跟我一样在github等平台上偷代码 (读书人的事怎么能叫偷呢) 的时候会发现伟大且无私的作者虽然开源了代码但是readme文件该写的没写&#x…

2023TYUT移动应用软件开发程序设计和填空

目录 程序设计 程序设计1&#xff1a;根据要求设计UI,补充相应布局文件&#xff0c;即.xml文件 程序设计2&#xff1a;根据要求,补充Activity.java文件 程序填空 说明&#xff1a; 程序设计 程序设计1&#xff1a;根据要求设计UI,补充相应布局文件&#xff0c;即.xml文件…

【C++初阶】第十篇:list模拟实现

文章目录一、list的模拟实现三个类及其成员函数接口总览结点类的模拟实现迭代器类的模拟实现迭代器类的模板参数说明迭代器operator->的重载迭代器模拟实现代码list的模拟实现无参构造函数带参构造拷贝构造函数赋值运算符重载函数析构函数begin和endinserteraselist的迭代器…

WordPress添加阿里云OSS对象云储存配置教程

背景&#xff1a;随着页面文章增多&#xff0c;内置图片存储拖连网站响应速度&#xff0c;这里对我来说主要是想提升速度 目的&#xff1a;使用第三方云存储作为图片外存储(图床)&#xff0c;这样处理可以为服务器节省很多磁盘空间&#xff0c;在网站搬家的时候减少文件迁移的工…

【数据结构】堆(笔记总结)

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习C和算法 ✈️专栏&#xff1a;数据结构 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&…