SpringBoot(Java)实现MQTT连接(本地Mosquitto)通讯调试

news2025/1/4 21:18:33

1.工作及使用背景

        工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析,如电表、流量计、泵、控制器等物联网设备。

        目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储(也会涉及收费问题),通过组态软件自带的转储工具将数据转储到关系型数据库,如MySQL、sqlLite、Postgresql等。然后在BS架构后台程序中通过定时刷数据或者查询时计算的方式进行统计分析计算。

        但上述解决方案实际上是实现简单,但是数据统计时机有潜在的偏差风险,且逻辑设计非常别扭,数据库压力大等问题,理论上应该通过消息队列来接收实时数据参与计算的方式,Web系统只负责展示计算统计之后的结果,这样无论是时效还是数据准确性更容易保证,实时数据存储的数据库压力也不存在(可做数据校验用,也可不用),逻辑也不显别扭。

2.开发环境及工具

JDK1.8、maven、Mosquitto、IDEA、postman

3.框架结构及文件声明

因为我用的现成的框架,所以启动模块和业务模块分开了。实际开发调试中完全可以放一起也没关系。

MqttClientConnectorPool对外提供一个初始化的Mqtt客户端,在服务启动时初始化
MqttMsgSender对外提供一个可以执行消息发送的方法
MqttMsgSubscriber初始化一个Mqtt客户端,并根据配置订阅topic
TestController接收web请求的调用消息发送,用于测试
BusinessApplicationStartup服务启动时执行,调用MqttClientConnectorPool初始化一个客户端并调起MqttMsgSubscriber的监听等待
BusinessApplicationShutdown服务正常终止时调用,关闭服务启动默认创建的Mqtt客户端
MqttBrokerServerSpringBoot服务启动类

4.具体实现逻辑及代码

4.1 maven依赖

<properties>
	<MQTTv3.version>1.2.5</MQTTv3.version>
</properties>

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.eclipse.paho</groupId>
			<artifactId>org.eclipse.paho.client.MQTTv3</artifactId>
			<version>${MQTTv3.version}</version>
		</dependency>
	</dependencies>
</dependencyManagement>

或者直接使用
<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.MQTTv3</artifactId>
	<version>1.2.5</version>
</dependency>

4.2 MqttClientConnectorPool

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Slf4j
public class MqttClientConnectorPool {

    public static MqttClient mqttClient;

    /**
     * 连接MQTT客户端
     * @return 获取MQTT连队对象
     */
    public static MqttClient connectMQTT() {

        if (mqttClient != null){
            log.info("已存在,我深深的脑海!");
            return mqttClient;
        }

        try {
            // broker及连接信息
            String broker = "tcp://127.0.0.1:1883";
            String username = "admin";
            String password = "123456";
            String clientId = System.currentTimeMillis() + "";

            //创建MQTT客户端(指定broker、客户端id、消息持久策略)
            mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());

            //创建连接参数配置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            //是否清除会话
            options.setCleanSession(true);
            //连接超时时间
            options.setKeepAliveInterval(20);
            //是否自动重连
            options.setAutomaticReconnect(true);
            mqttClient.connect(options);
            log.info("MqttClient 服务启动broker初始化!");
        } catch (MqttException e){
            log.error("MqttClient connect Error:{}", e.getMessage());
            e.printStackTrace();
        }

        return mqttClient;
    }

    /**
     * 关闭MQTT客户端
     * @param client client
     */
    public static void closeClient(MqttClient client){
        try {
            // 断开连接
            client.disconnect();
            // 关闭客户端
            client.close();
        } catch (MqttException e){
            log.error("MqttClient disconnect or close Error:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 关闭MQTT客户端
     */
    public static void closeStaticClient(){
        try {
            if (mqttClient != null){
                // 断开连接
                mqttClient.disconnect();
                // 关闭客户端
                mqttClient.close();
            }
        } catch (MqttException e){
            log.error("MqttClient disconnect or close Error:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

4.3 MqttMsgSender

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@Slf4j
public class MqttMsgSender {

    public void sendMessage(MqttClient client,String topic,String content,int qos){
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        try{
            client.publish(topic,message);
        } catch (MqttException e){
            log.error("MqttClient publish text info Error:{}!", e.getMessage());
            e.printStackTrace();
        }
    }
}

4.4 MqttMsgSubscriber

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Slf4j
public class MqttMsgSubscriber {

    String broker = "tcp://127.0.0.1:1883";
    String topic = "/deviceUp";
    String username = "admin";
    String password = "123456";
    String clientId = System.currentTimeMillis() + "";
    int qos = 1;
    
    public void readSubscribeTopicMessage(){
        try {
            MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());

            // 连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            //是否清除会话
            options.setCleanSession(true);
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);
            client.setCallback(new MqttCallback() {

                @Override
                public void connectionLost(Throwable throwable) {
                    log.error("连接丢失");
                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    log.info("topic为: " + topic);
                    log.info("qos为: " + mqttMessage.getQos());
                    log.info("消息内容为: " + new String(mqttMessage.getPayload()));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    // 当消息被完全传送出去后调用
                    log.info("交付完成 ---Delivery complete!");
                    // 可以在这里处理一些发送完成后的清理工作
                }
            });

            client.connect(options);
            client.subscribe(topic, qos);
        } catch (MqttException e){
            log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());
        } catch (Exception e){
            log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());
        }
    }
    
}

4.5 TestController

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@Slf4j
@RestController
@RequestMapping()
public class TestController {

    @GetMapping("/test/mqtt/{msg}")
    public String testSendMqttMsg(@PathVariable("msg") String msg){
        log.info("消息内容:{}.", msg);

        MqttClient mqttClient = MqttClientConnectorPool.connectMQTT();
        MqttMsgSender sender = new MqttMsgSender();

        String content = "{" + " \"deviceNo\": \"" + msg + "\"," + " \"val\": 232.5" + "}";

        String topic = "/deviceUp";
        int qos = 1;

        if (null != mqttClient){
            sender.sendMessage(mqttClient, topic, content, qos);
        } else {
            log.info("MqttClient为空,无法发送!");
            return "失败!";
        }

        return "成功!";
    }

}

4.6 BusinessApplicationStartup

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import 包路径(可以删掉这一行手动导入).MqttMsgSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Slf4j
@Order(10)
@Component
public class BusinessApplicationStartup implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("MqttClientConnectorPool ===================== Startup");
        MqttClientConnectorPool.connectMQTT();
        log.info("MqttClientConnectorPool ===================== recoveryAllJob Over !");

        log.info("MqttMsgSubscriber ===================== Startup");
        // 先订阅等待
        MqttMsgSubscriber subscriber = new MqttMsgSubscriber();
        subscriber.readSubscribeTopicMessage();
    }
}

4.7 BusinessApplicationShutdown

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class BusinessApplicationShutdown implements ApplicationListener<ContextClosedEvent> {

    @Override
    public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
        log.info("服务终止! shutdown hook, ContextClosedEvent");
        MqttClientConnectorPool.closeStaticClient();
    }

}

4.8 MqttBrokerServer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class MqttBrokerServer {

    public static void main(String[] args) {
        SpringApplication.run(MqttBrokerServer.class, args);
    }

}

5.其他备注

5.1 需要Mqtt(Broker)服务器

        如果是直接使用示例代码的Mqtt服务器(Broker)配置,需要在自己电脑上安装Mqtt服务器,如mosquitto、EMQX等,具体自行搜索,或者使用公用的Mqtt服务器(我没测试试过

// 📢注意,当前Broker本人未测试
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";

5.2 调试地址

如果配置文件没配置[server.servlet.context-path],就不需要我自己/backend这一段

6.参考文章

MQTT协议介绍及Java教程

https://baijiahao.baidu.com/s?id=1801542244354727565&wfr=spider&for=pc

7.喜欢作者

暂无

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2169975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python语言初识(五)】

一、文件和异常 在Python中实现文件的读写操作其实非常简单&#xff0c;通过Python内置的open函数&#xff0c;我们可以指定文件名、操作模式、编码信息等来获得操作文件的对象&#xff0c;接下来就可以对文件进行读写操作了。这里所说的操作模式是指要打开什么样的文件&#…

软件测评CNAS认可实验室程序文件之检测报告的编制和交付程序

软件测评实验室在申请CNAS认可时&#xff0c;需要根据相关准则文件的要求&#xff0c;建立质量管理体系&#xff0c;其中程序文件是质量管理体系中非常重要的一环。在前面的文章中&#xff0c;我们为大家整体介绍了CNAS软件测评实验室程序文件主要都有哪些&#xff0c;以及对部…

尾巴生活彩虹泥餐盒怎么样?测评10元的国产猫罐头:高爷家、希喂、尾巴生活

我家迎来了一位缅因猫成员&#xff0c;这家伙体型魁梧&#xff0c;颜值爆表&#xff0c;走起路来自带王者风范。说到食量&#xff0c;简直是猫咪界的“大胃王”&#xff0c;一顿饭顶得上四只小猫咪的总和。二三十元一罐的进口罐被它光速炫完&#xff0c;简直是给家里的钱包“瘦…

spring boot文件上传之x-file-storage

spring boot文件上传之x-file-storage 今天看到一个文件上传的开源组件x-file-storage&#xff0c;官方地址如下&#xff1a; https://x-file-storage.xuyanwu.cn/#/ 该组件官网是这样介绍的&#xff0c;如下&#xff1a; 一行代码将文件存储到本地、FTP、SFTP、WebDAV、阿…

细讲 Java 的父子继承、方法的重写与super关键字

&#x1f680; 个人简介&#xff1a;某大型国企资深软件开发工程师&#xff0c;信息系统项目管理师、CSDN优质创作者、阿里云专家博主&#xff0c;华为云云享专家&#xff0c;分享前端后端相关技术与工作常见问题~ &#x1f49f; 作 者&#xff1a;码喽的自我修养&#x1f9…

基于Java开发的(控制台)模拟的多用户多级目录的文件系统

多级文件系统 1 设计目的 为了加深对文件系统内部功能和实现过程的理解&#xff0c;设计一个模拟的多用户多级目录的文件系统&#xff0c;并实现具体的文件物理结构、目录结构以及较为完善的文件操作命令集。 2 设计内容 2.1系统操作 操作命令风格&#xff1a;本文件系统的…

基于单片机的温湿度检测判断系统

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于STC89C52单片机&#xff0c;采用dht11温湿度传感器检测温湿度&#xff0c; 通过lcd1602显示屏各个参数&#xff0c;四个按键分别可以增加温湿度的阈值&#xff0c; 如果超过阈值&#xff0c;则…

BaseCTF2024 web

Web [Week1] HTTP 是什么呀 GET: ?basectf%77%65%31%63%25%30%30%6d%65POST: BaseflgX-Forwarded-For:127.0.0.1Referer: BaseCookie: c00k13i cant eat itUser-Agent: Base有Location跳转, 抓包得到flag: QmFzZUNURntkZGUzZjA0Yy1hMDg5LTQwNGMtOTFjNi01ODZjMzAxMzM3Y2J9Cg…

解锁创意新纪元:Stable Diffusion绘画技术的非凡优势

Stable Difusion 是一款从文本到图像的潜在扩散模型&#xff0c;其操作界面如图所示。该模型由初创公司Stabiity A1、慕尼黑大学机器视觉与学习小组以及神经网络视频公司Runway 合作研发&#xff0c;首次发布于2022年8月&#xff0c;而在同年11月更新的2.0版本更是给用户带来了…

“给领导买饭”,刺痛打工人

帮领导办私事&#xff0c;你会接受还是拒绝&#xff1f; 转载&#xff1a;定焦&#xff08;dingjiaoone&#xff09;原创 作者 | 艾乐伊 郑浩钧 苏琦 王璐 编辑 | 苏琦 打工人最讨厌的事&#xff0c;领导喊你帮他带饭带娃&#xff0c;算一件。 近日&#xff0c;上海某教培公司…

fuzzer实战-magma-模糊测试

Getting Started | magma首先打开这个链接&#xff0c;跟着官网指导做&#xff1a; 并且参考Titan的官网使用方法&#xff1a;GitHub - 5hadowblad3/Titan: Research artifact for Oakland (S&P) 2024, "Titan: Efficient Multi-target Directed Greybox Fuzzing&quo…

Rce脚本自动化amp;批量

这里放上一篇我学生的投稿文章 0x00 前言 在现代网络安全领域&#xff0c;远程代码执行&#xff08;RCE&#xff09;漏洞的发现与利用成为了重要的研究课题。随着攻击手段的不断演进&#xff0c;安全专业人士面临着日益复杂的威胁环境。为应对这一挑战&#xff0c;自动化和批…

ChatGPT Sidebar 浏览器插件配置指南

随着聊天机器人技术的不断进步&#xff0c;越来越多的人开始依赖这些强大的工具来提高工作效率、获取信息和解决问题。OpenAI 的 ChatGPT 是其中最受欢迎的聊天机器人之一。为了方便用户在浏览网页时随时与 ChatGPT 互动&#xff0c;开发者们设计了一款名为 ChatGPT Sidebar 的…

30+程序员顶着被裁员的压力,为什么选择从零开始:转行大模型?

在当今这个科技进步迅速的时代&#xff0c;程序员作为引领技术革新的关键角色&#xff0c;正处于一个既充满机会又面临挑战的关键时刻。随着人工智能、大数据处理、云服务等领域的迅猛发展&#xff0c;大型模型&#xff08;例如GPT系列、BERT等&#xff09;已经成为行业内的热议…

【07】纯血鸿蒙HarmonyOS NEXT星河版开发0基础学习笔记-Swiper轮播组件与样式结构重用

序言&#xff1a; 本文详细讲解了关于我们在页面上经常看到的轮播图在鸿蒙开发中如何用Swiper实现&#xff0c;介绍了Swiper的基本用法与属性&#xff0c;及如何面对大段的重复代码进行封装和重用&#xff08;Extend、Styles、Builder&#xff09;&#xff0c;使代码更加简洁易…

HarmonyOS鸿蒙开发实战( Beta5.0)标题下拉缩放实践案例

鸿蒙HarmonyOS NEXT开发实战往期文章必看&#xff08;持续更新......&#xff09; HarmonyOS NEXT应用开发性能实践总结 HarmonyOS NEXT应用开发案例实践总结合集 最新版&#xff01;“非常详细的” 鸿蒙HarmonyOS Next应用开发学习路线&#xff01;&#xff08;从零基础入门…

耦合微带线单元的网络参量和等效电路公式推导

文档下载链接&#xff1a;耦合微带线单元的网络参量和等效电路资源-CSDN文库https://download.csdn.net/download/lu2289504634/89583027笔者水平有限&#xff0c;错误之处欢迎留言&#xff01; 一、耦合微带线奇偶模详细推导过程 二、2,4端口开路 三、2端口短路、3端口开路 四…

护眼台灯哪个品牌更好?五款由专业眼科医生推荐的护眼台灯

台灯是每个家庭中不可或缺的照明设备&#xff0c;尤其是对于有学龄儿童的家庭来说&#xff0c;孩子们每天在家学习和做作业时&#xff0c;一款优秀的护眼台灯显得尤为重要。如果长期使用的台灯是不合格&#xff0c;不能给孩子提供一个好的光照环境&#xff0c;那么孩子们的视力…

VUE.js笔记

1.介绍vue Vue 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建&#xff0c;并提供了一套声明式的、组件化的编程模型&#xff0c;帮助你高效地开发用户界面。无论是简单还是复杂的界面&#xff0c;Vue 都可以胜任。 Vue 应用程序的基本…

镭射限高防外破预警装置-线路防外破可视化监控,安全尽在掌握中

镭射限高防外破预警装置-线路防外破可视化监控&#xff0c;安全尽在掌握中 在城市化浪潮的汹涌推进中&#xff0c;电力如同现代社会的生命之脉&#xff0c;其安全稳定运行直接关系到每一个人的生活质量和社会的整体发展。然而&#xff0c;随着建设的加速&#xff0c;电力设施通…