MQTT的连接配置以及重连机制和遇到的问题--------求如何修改更加好

news2025/2/22 11:10:14

今天遇到了一个mqtt的问题,虽然解决了,但是感觉不是很好,希望大家多指点

这是配置文件

customer:
  mqtt:
    broker: tcp://ip:1883
    clientList:
      - clientId: nays_service
        subscribeTopic: xxxxxx
      - clientId: receive_service
        subscribeTopic: xxxxxx

MqttConfig 读取配置文件的

@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * 需要创建的MQTT客户端
     */
    List<MqttClient> clientList;
}

一个MqttClient类用来构造配置文件中的数据对象

@Data
public class MqttClient {
    /**
     * 客户端ID
     */
    private String clientId;
    /**
     * 监听主题
     */
    private String subscribeTopic;
    /**
     * 用户名
     */
    private String userName;
    /**
     * 密码
     */
    private String password;
}

服务运行的时候进行mqtt客户端创建,创建的数据从配置文件中读取

/**
 * MQTT客户端创建
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Resource
    private MqttClientManager mqttClientManager;
    @Resource
    private MqttConfig mqttConfig;

    /**
     * 创建MQTT客户端
     */
    @PostConstruct
    public void createMqttClient() {

        // 会读取配置文件中的clientList
        List<MqttClient> mqttClientList = mqttConfig.getClientList();

        // 遍历去创建
        for (MqttClient mqttClient : mqttClientList) {
            log.info("{}", mqttClient);
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic());
        }
    }
}

这是创建的代码,问题很多(请看代码的注释部分)


```java
@Slf4j
@Component
public class MqttClientManager {

    @Value("${customer.mqtt.broker}")
    private String mqttBroker;

    @Resource
    private MqttCallBackContext mqttCallBackContext;
    /**
     * 存储MQTT客户端
     */
    public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();

    public MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }

    /**
     * 创建mqtt客户端
     * @param clientId       客户端ID
     * @param subscribeTopic 订阅主题,可为空
     */
    public void createMqttClient(String clientId, String subscribeTopic) {

        // 它将消息存储在内存中,而不是持久存储到文件或其他存储介质中
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();

            // 客户端每次连接到 MQTT 服务器时都会被视为一个全新的会话。
            connOpts.setCleanSession(true);

            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {

                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);

                // 这里的default就是DefaultMqttCallBack, 一开始创建的时候走的就是这个
                // 问题最大的地方在这,通过这样方式拿回来的是同一个对象,hashCode也相同
                // 现在想到的做法是深拷贝,有没有什么好的做法,比如通过构造方法
                if (null == callBack) {
					
					// 一开始这里的操作直接是, 当创建多个客户端的时候拿到的对象都是同一个
					// callback = mqttCallBackContext.getCallBack("default");

                    AbsMqttCallBack original  = mqttCallBackContext.getCallBack("default");
                    callBack = original.deepCopy();

                }
                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);

            }

            //连接mqtt服务端broker
            client.connect(connOpts);
            log.info("客户端 {} 连接成功状态 {}", clientId, client.isConnected());

            // 订阅主题
            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                if (subscribeTopic.contains("-")) {
                    client.subscribe(subscribeTopic.split("-"));
                }
                else {
                    client.subscribe(subscribeTopic);
                }
            }

            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);



        } catch (MqttException e) {
            log.error("创建mqttClient失败!", e);
        }
    }
}

这是用于存储每个mqtt客户端的回调方法类

/**
 * MQTT订阅回调环境类
 */
@Component
@Slf4j
public class MqttCallBackContext {

    // 在 Spring 中,当你注入一个 Map<String, AbsMqttCallBack> 类型的字段时,
    // Spring 会自动将所有实现了 AbsMqttCallBack 接口的 Bean 收集起来,
    // 并将它们的名称作为键值。因此,DefaultMqttCallBack 会被注入到 callBackMap 中,键值为 "default"。
    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * 默认构造函数
     *
     * @param callBackMap 回调集合
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.putAll(callBackMap);
    }

    /**
     * 获取MQTT回调类
     *
     * @param clientId 客户端ID
     * @return MQTT回调类
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}

这里遇到的问题就是mqtt断了之后进行重新连接的机制,在MqttClientManager这个代码中之前的回调类是callback = mqttCallBackContext.getCallBack(“default”);这样拿的,通过hashCode来看,都一样,说明每次创建都会对这个对象进行修改,那么这里赋值的clientId就会变成最后一个创建的mqtt对象id,所以在重连代码中,每次进来的对象虽然是另外一个mqtt客户端,但是拿到的clientid都是同一个,没有办法进行获取和其它的操作

/**
 * MQTT回调抽象类
 */
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {

    private String clientId;
    private MqttConnectOptions connectOptions;

    @Resource
    private MqttConfig mqttConfig;

    @Resource
    private MqttClientManager mqttClientManager;


    /**
     * 失去连接操作,进行重连
     *
     * @param throwable 异常
     */
    @Override
    public void connectionLost(Throwable throwable) {

        log.info("{}失去连接,进行尝试重连", this.clientId);

        MqttClient mqttClient = MqttClientManager.MQTT_CLIENT_MAP.get(clientId);

        String subscribeTopic = mqttConfig.getClientList().stream()
                .filter(item -> item.getClientId().equals(clientId))
                .map(com.ruoyi.web.core.mottconfig.MqttClient::getSubscribeTopic)
                .findFirst()
                .orElse(null);

        if (mqttClient != null) {
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true); // 可以根据实际需求配置

            // 重连的尝试
            int retryCount = 0;
            int maxRetries = 10; // 最大重连次数
            while (retryCount < maxRetries) {
                try {

                    if (mqttClient.isConnected()) {
                        log.info("{} 重连成功", clientId);
                        return;
                    }

                    // 重新连接
                    mqttClient.connect(connOpts);

                    log.info("{} 重连成功", clientId);

                    if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                        if (subscribeTopic.contains("-")) {
                            mqttClient.subscribe(subscribeTopic.split("-"));
                        }
                        else {
                            mqttClient.subscribe(subscribeTopic);
                        }
                    }

                    break;

                } catch (MqttException e) {
                    retryCount++;
                    log.error("{} 重连失败,尝试第 {} 次重连", clientId, retryCount, e);

                    // 可设置重连间隔,比如等待2秒钟后再尝试重连
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }

            if (retryCount == maxRetries) {
                log.error("{} 超过最大重连次数,重连失败", clientId);
            }
        }
    }


    /**
     * 接收订阅消息
     * @param topic    主题
     * @param mqttMessage 接收消息
     * @throws Exception 异常
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
     	handleReceiveMessage(topic, content);
    }

    /**
     * 消息发送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发送成功");
    }

    /**
     * 处理接收的消息
     * @param topic   主题
     * @param message 消息内容
     */
    protected abstract void handleReceiveMessage(String topic, String message);

    // 深拷贝方法
    public abstract AbsMqttCallBack deepCopy();

}


这里就是 最后的callback实现类进行业务处理

/**
 * 默认回调
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {

    @Autowired
    private AlarmListService alarmListService;

    @Autowired
    private OperateService operateService;

    @Autowired
    private INrDeviceService iNrDeviceService;

    //private static final String TOPIC1 = 1;
    /**
     * @param topic   主题
     * @param message 消息内容
     */
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        log.info("订阅的主题---{}", topic);
        log.info("接收到消息---{}", message);
        // 业务操作
    }

    @Override
    public AbsMqttCallBack deepCopy() {
        DefaultMqttCallBack copy = new DefaultMqttCallBack();
        copy.setClientId(this.getClientId());
        copy.setConnectOptions(this.getConnectOptions()); 
        copy.setMqttConfig(this.getMqttConfig());
        copy.setMqttClientManager(this.getMqttClientManager());
        return copy;
    }
}

我是通过深拷贝来做的,应该是可以通过构造方法来,但是对这个整体的代码还是不够熟悉,想看看应该如何优化,还请指点,最好笑的是:这段代码是公司一直使用的,用在了好几个项目上,我真是服了!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2303378.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据学习之任务流调度系统Azkaban、Superset可视化系统

一.任务流调度系统Azkaban 1.课程介绍 2.为什么需要工作流调度系统 3.AZKABAN是什么 4.AZKABAN下载 5.制作安装包 6.tar包准备 7.MYSQL配置AZKABAN 8.配置EXECUTOR SERVER 9.配置WEBSERVER 10.单作业实战_yaml语言(今天稍晚更新) 11.单作业实战 12.多作业依赖实战 13.失败自动重…

在VS-qt的程序中,后期增加PCH预编译功能,提高编译速度

由于前期创建qt程序的时候未勾选pch功能,导致没有启动预编译的功能. 这种情况下需要增加pch功能应该怎么做? 在项目中增加2个文件 stdafx.h和stdafx.cpp文件 stdafx.h增加qt常用头文件 #pragma once //windows #include <windows.h>//qt常用 #include <QObject&g…

蓝桥云客 路径之谜

11.路径之谜 - 蓝桥云课 路径之谜 题目描述 小明冒充X星球的骑士&#xff0c;进入了一个奇怪的城堡。 城堡里边什么都没有&#xff0c;只有方形石头铺成的地面。 假设城堡地面是nn个方格。如下图所示。 按习俗&#xff0c;骑士要从西北角走到东南角。可以横向或纵向移动&…

【Python项目】基于Python的语音数据及标注核对审核系统

【Python项目】基于Python的语音数据及标注核对审核系统 技术简介&#xff1a; 采用Python技术、MySQL数据库、Django框架等实现。 系统简介&#xff1a; 语音数据及标注核对审核系统是一个基于B/S架构的语音数据处理平台&#xff0c;旨在通过自动化的方式对语音数据进行标…

深入解析BFS算法:C++实现无权图最短路径的高效解决方案

在无权图中&#xff0c;广度优先搜索&#xff08;BFS&#xff09;是解决最短路径问题的高效算法。接下来博主从专业角度深入探讨其实现细节&#xff0c;并给出C代码示例&#xff1a; 目录 一、核心原理 二、算法步骤 三、C实现关键点 1. 数据结构 2. 边界检查 3. 路径回溯…

LeetCode刷题---二分查找---441

排列硬币 441. 排列硬币 - 力扣&#xff08;LeetCode&#xff09; 题目 你总共有 n 枚硬币&#xff0c;并计划将它们按阶梯状排列。对于一个由 k 行组成的阶梯&#xff0c;其第 i 行必须正好有 i 枚硬币。阶梯的最后一行 可能 是不完整的。 给你一个数字 n &#xff0c;计算…

Unity结合Vuforia虚拟按键实现AR机械仿真动画效果

零、最终效果 待上传 一、资源准备 1、Vuforia Vuforia版本不能高于10.17.4&#xff08;往上的版本虚拟按键功能被删除&#xff09; 2、Unity Unity版本必须要高于2022.3.x&#xff0c;不然使用Vuforia插件时会出现bug 二、主要内容 1、添加虚拟按钮 2、为虚拟按钮设置…

网络安全 linux学习计划 linux网络安全精要

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 2.使用命令行 文件系统层次标准&#xff08;FHS&#xff09;是一个文件和目录在Unix和Linux操作系统上面应该如何存储的定义。 /bin 重要的二进制可执行程序/bo…

为AI聊天工具添加一个知识系统 之115 详细设计之56 知识表征 之2

本文要点 要点 知识表征的顶级范畴中最好是先将九个原语primitive T, ⊥, Independent, Relative, Mediating, Physical, Abstract, Continuant,和 Occurrent 进行分组&#xff08;分成2大组 和 4个小组&#xff09;并写出它们的满足公司&#xff0c;然后将它们和三种设计&am…

C#初级教程(1)——C# 与.NET 框架:探索微软平台编程的强大组合

图片来源&#xff1a; https://www.lvhang.site/docs/dotnettimeline 即梦AI - 一站式AI创作平台 一、历史发展脉络 在早期的微软平台编程中&#xff0c;常用的编程语言有 Visual Basic、C、C。到了 20 世纪 90 年代末&#xff0c;Win32 API、MFC&#xff08;Microsoft Found…

Mac m1 连接公司内网

1、创建VPN 1、在系统偏好设置 2、选择网络 3、进行添加 2、添加设置 1、选择VPN 2、类型选择L2TP/IPSec 3、填写服务器IP和账号 4、点击认证设置-填写密码 。然后应用 3、进行特殊配置 网上说苹果系统的问题。 1、创建命令 sudo vim /etc/ppp/options 2、添加内容-主要别…

C++:类与对象,定义类和构造函数

#define _CRT_SECURE_NO_WARNINGS 1 #include <iostream> using namespace std; //如何让定义一个类 // 封装 // 1、将数据和方法定义到一起。 // 2、把想给你看的数据给你看&#xff0c;不想给你看的封装起来。 通过访问限定符来实现 class Stack { public: //1.成…

Nginx环境安装

一、官网地址 Nginx官网&#xff1a;http://nginx.org/ Nginx中文网&#xff1a;https://nginx.p2hp.com/ 二、Nginx版本 mainline version 开发版本stableversion 稳定版本legacy version 历史版本 三、Windows系统安装Nginx 第一步&#xff1a;选择Windows版本&#xff0c;…

Spring AI + Ollama 实现调用DeepSeek-R1模型API

一、前言 随着人工智能技术的飞速发展&#xff0c;大语言模型&#xff08;LLM&#xff09;在各个领域的应用越来越广泛。DeepSeek 作为一款备受瞩目的国产大语言模型&#xff0c;凭借其强大的自然语言处理能力和丰富的知识储备&#xff0c;迅速成为业界关注的焦点。无论是文本生…

android系统SystemServer进程启动流程分析

目录 一,SystemServer整体框架 二,SystemServer启动源码分析 2.1,重要的概念 2.2,启动入口 2.3,创建对应进程的binder 三,binder驱动和binder线程池 四,SystemServer真正启动方法 4.1 SystemServer main方法里面主要做了几件事情 1)创建SystemServiceManager管理所有的…

Oracle 深入理解Lock和Latch ,解析访问数据块全流程

Oracle 锁机制介绍 根据保护对象的不同&#xff0c;单实例Oracle数据库锁可以分为以下几大类&#xff1a; DML lock&#xff08;data locks&#xff0c;数据锁&#xff09;&#xff1a;用于保护数据的完整性&#xff1b; DDL lock&#xff08;dictionary locks&#xff0c;字典…

如何基于transformers库通过训练Qwen/DeepSeek模型的传统分类能力实现文本分类任务

文章目录 模型与环境准备文档分析源码解读模型训练及推理方式进阶:CPU与显存的切换进阶:多卡数据并行训练🔑 DDP 训练过程核心步骤🚫 DDP 不适用于模型并行⚖️ DDP vs. Model Parallelism⚙️ 解决大模型训练的推荐方法🎉进入大模型应用与实战专栏 | 🚀查看更多专栏…

Unity中一个节点实现植物动态(Shader)

1 . 核心思路就操作顶点作往复运动&#xff1b; 核心代码&#xff1a; half stage1 dot(positionOS, float3(0, 1, 0)) * _Strength; half stage2 sin(dot(positionOS, float3(1, 0, 0)) * _Strength _Time.y * _Speed); half stage3 stage1 * stage2 * float3(0.001,…

PrimeTime:工具简介

相关阅读 PrimeTimehttps://blog.csdn.net/weixin_45791458/category_12900271.html?spm1001.2014.3001.5482 PrimeTime是PrimeTime Suite中的一个工具&#xff0c;能够执行全芯片级、门级的静态时序分析&#xff0c;这是芯片设计和分析流程中的一个关键部分。该工具通过检查…

【拜读】Tensor Product Attention Is All You Need姚期智团队开源兼容RoPE位置编码

姚期智团队开源新型注意力&#xff1a;张量积注意力&#xff08;Tensor Product Attention&#xff0c;TPA&#xff09;。有点像一种「动态的LoRA」&#xff0c;核心思路在于利用张量分解来压缩注意力机制中的 Q、K、V 表示&#xff0c;同时保留上下文信息&#xff0c;减少内存…