若依前后分离版框架下Springboot java引入Mqtt接受发送消息

news2025/1/10 23:29:51

**这只是其中一种而且是粗浅的接、发消息。
同步机制还要跟搞物联网的同事沟通确认去看看能不能实现 或者是设备比较多的情况下 不会去使用同步机制
首先pom文件 引入依赖
**

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

其次配置文件mqtt配置,我这是yml,其他配置文件写法需要改动下

mqtt:
    username: ****** # 用户名
    password: ****** # 密码
    hostUrl: tcp://******:1883 # tcp://ip:端口
    clientId: clientId # 客户端id
    defaultTopic: electric/#,test # 订阅主题  electric/#表示以electric/开头的主题都可以接受到
    timeout: 100 # 超时时间 (单位:秒)
    keepalive: 60 # 心跳 (单位:秒)
    enabled: true # 是否使用mqtt功能

**接下来到了代码层面了
先创建一个yml文件的实体类 MqttConfig
prefix = 这里地址看你自己的配置
@ConfigurationProperties(prefix = “mqtt”)
**



import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接地址
     */
    private String hostUrl;
    /**
     * 客户Id
     */
    private String clientId;
    /**
     * 默认连接话题
     */
    private String defaultTopic;
    /**
     * 超时时间
     */
    private int timeout;
    /**
     * 保持连接数
     */
    private int keepalive;
    /**
     * mqtt功能使能
     */
    private boolean enabled;
    private boolean retained;
    /**
     * qos
     */
    private int qos;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getHostUrl() {
        return hostUrl;
    }

    public void setHostUrl(String hostUrl) {
        this.hostUrl = hostUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getDefaultTopic() {
        return defaultTopic;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getKeepalive() {
        return keepalive;
    }

    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
    public int getQos() {
        return qos;
    }

    public void setQos(int qos) {
        this.qos = qos;
    }



    @Bean
    public MqttPushClient getMqttPushClient() {
        if(enabled == true){
            String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接
            for(int i=0; i<mqtt_topic.length; i++){
                mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题
            }
        }
        return mqttPushClient;
    }
}

**这里在创建 MqttPushClient 文件
去链接客户端、发消息、订阅主题 功能都在这里
**

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientID  客户端Id
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keepalive 保留数
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布消息
     *
     * @param pubTopic 主题
     * @param message 内容
     * @param qos   连接方式
     */
    public  static void publishMessage(String pubTopic, String message, int qos) {
            System.out.println("发布消息   "+client.isConnected());
            System.out.println("id:"+client.getClientId());
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(message.getBytes());

            MqttTopic topic = client.getTopic(pubTopic);

            if(null != topic) {
                try {
                    MqttDeliveryToken publish = topic.publish(mqttMessage);
                    if(!publish.isComplete()) {
                        logger.info("发布消息成功");
                    }
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public static void subscribe(String topic, int qos) {
        logger.info("开始订阅主题" + topic);
        try {
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}

**再创建一个继承回调方法的接口 PushCallback
**

package com.ruoyi.util.mqttUtil;

import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;

    private static String _topic;
    private static String _qos;
    private static String _msg;

    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,一般在这里面进行重连
        logger.info("连接断开,可以做重连");
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // subscribe后得到的消息会执行到这里面
        logger.info("接收消息主题 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));

        _topic = topic;
        _qos = mqttMessage.getQos()+"";
        _msg = new String(mqttMessage.getPayload());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("发布消息成功");
        //发布消息成功之后 才会调用这里 大家可以仔细看看token里面 后续同步机制也是利用这个token去完成
        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    //别的Controller层会调用这个方法来  获取  接收到的硬件数据
    public String receive() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topic", _topic);
        jsonObject.put("qos", _qos);
        jsonObject.put("msg", _msg);
        return jsonObject.toString();
    }

}

到这就需要去下载个 MQTTX 跟服务器直接互相接发消息了

下图红框内的随意填写 服务器地址、端口、用户名、密码使用java代码配置文件里面的
在这里插入图片描述

在这里插入图片描述

往下就是连接上 添加一个订阅,记得 这个订阅要在你在配置文件里面哦 什么名字都ok /#相当于模糊查询
在这里插入图片描述
好了 这里可以启动项目了 控制台会打印咱们订阅的主题的,也就是说这些主题给咱们发消息 会直接被咱们接受的
在这里插入图片描述

启动项目 由于咱们的配置文件里订阅了test这个主题 我在mqttx里面直接给 test这个主题发送信息

框住的地方是什么就是给那个主题发消息

控制台自动打印 订阅的test主题信息
在这里插入图片描述
**到这里的话 接受消息就完事了 就要搞下发消息了
随便找个controller弄个请求搞一下 **

    @RequestMapping("/send")
    @ResponseBody
    private ResponseEntity<String> send() throws MqttException {
       System.out.println("我是springboot发送的数据");
       //三个参数 第一个是什么主题,第二个发送内容,第三个是
        MqttPushClient.publishMessage("clientId1","-===============",1);
        return new ResponseEntity<>("OK", HttpStatus.OK);
    }

在这里插入图片描述

在这里插入图片描述

已分享完毕,只是很基础的应用 另过几天如果项目有需求会在这继续完善同步mqtt请求的后续 如果接受不到消息 一定要看看订阅的主题对应起来没

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1208441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring cloud微服务中多线程下,子线程通过feign调用其它服务,请求头token等丢失

在线程池中&#xff0c;子线程调用其他服务&#xff0c;请求头丢失&#xff0c;token为空的情况 看了很多篇文章的处理方法和在自己亲测的情况下做出说明&#xff1a; 第一种&#xff1a; 这种方式只支持在主线程情况下&#xff0c;能够处理&#xff0c;在多线程情况下&#…

基于Python实现汽车销售数据可视化【500010086】

导入模块 import numpy as np import pandas as pd import plotly.graph_objects as go import plotly.express as px获取数据 df1 pd.read_excel(r"./data/中国汽车总体销量.xlsx") print(df1.head(5))df1.info()df1[年份] df1[时间].dt.year df1[月份] df1[时…

【论文阅读】GAIN: Missing Data Imputation using Generative Adversarial Nets

论文地址&#xff1a;[1806.02920] GAIN: Missing Data Imputation using Generative Adversarial Nets (arxiv.org)

【ML】欠拟合和过拟合的一些判别和优化方法(吴恩达机器学习笔记)

吴恩达老师的机器学习教程笔记 减少误差的一些方法 获得更多的训练实例——解决高方差尝试减少特征的数量——解决高方差尝试获得更多的特征——解决高偏差尝试增加多项式特征——解决高偏差尝试减少正则化程度 λ——解决高偏差尝试增加正则化程度 λ——解决高方差 什么是…

【Linux】Ubuntu16.04配置repo

Ubuntu16.04配置repo失败 在学习韦东山Linux嵌入式开发过程中&#xff0c;使用repo获取内核及工具链: git clone https://e.coding.net/codebug8/repo.gitmkdir -p 100ask_imx6ull-sdk && cd 100ask_imx6ull-sdk../repo/repo init -u https://gitee.com/weidongshan/m…

【Linux】gitee仓库的注册使用以及在Linux上远程把代码上传到gitee上的方法

君兮_的个人主页 即使走的再远&#xff0c;也勿忘启程时的初心 C/C 游戏开发 Hello,米娜桑们&#xff0c;这里是君兮_&#xff0c;今天为大家介绍一个在实际工作以及项目开发过程中非常实用的网站gitee&#xff0c;并教如何正确的使用这个网站以及常见问题的解决方案&#xf…

流量分析(信息安全铁人三项赛分区赛2-5.18)

题目描述 目录 题目描述 黑客的IP是多少 服务器1.99的web服务器使用的CMS及其版本号(请直接复制) 服务器拿到的webshell的网址(请输入url解码后的网址) 服务器1.99的主机名 网站根目录的绝对路径(注意最后加斜杠) 黑客上传的第一个文件名称是什么 黑客进行内网扫描&am…

实体门店创新神器曝光,拓世法宝AI智能直播一体机助力商家快速惊艳逆袭

在这个飞速变革的时代&#xff0c;传统实体门店面临着多重挑战。为了迎接市场的巨大变化&#xff0c;许多实体门店迫切寻求创新的方法来吸引顾客的眼球。数字化手段和新技术的引入成为实体门店应对市场需求的重要选择之一&#xff0c;是应对激烈竞争和不断变化的消费者行为的有…

《008.SpringBoot之教务系统》【界面简洁功能简单】

《008.SpringBoot之教务系统》【界面简洁功能简单】 项目简介 [1]本系统涉及到的技术主要如下&#xff1a; 推荐环境配置&#xff1a;DEA jdk1.8 Maven MySQL 前后端分离; 后台&#xff1a;SpringBootMybatis; 前台&#xff1a;JSPBootStrap; [2]功能模块展示&#xff1a; 管…

【云原生进阶之PaaS中间件】第三章Kafka-1-综述

1 Kafka简介 Kafka是最初由Linkedin公司开发&#xff0c;是一个分布式、支持分区的&#xff08;partition&#xff09;、多副本的&#xff08;replica&#xff09;&#xff0c;基于zookeeper协调的分布式消息系统&#xff0c;它的最大的特性就是可以实时的处理大量数据以满足各…

适合孩子写作业的台灯?精选专业的读写台灯

要说现在孩子学习必不可少的一件物品&#xff0c;那一定是台灯。因为台灯可以在夜晚的时候给孩子提供充足、舒适的光源环境&#xff0c;避免光线不足导致伤眼。不过随着孩子年龄的增长&#xff0c;作业的增加用眼需求会越来越大&#xff0c;导致了很多孩子早早就出现的视力问题…

μC/OS-II---消息邮箱管理1(os_flag.c)

目录 消息邮箱创建消息邮箱删除等待邮箱中的消息向邮箱发送一则消息 消息邮箱创建 OS_EVENT *OSMboxCreate (void *pmsg) {OS_EVENT *pevent; #if OS_CRITICAL_METHOD 3u /* Allocate storage for CPU status register */OS_CPU_SR cpu_sr …

探讨计算机内存管理:分页与分段的地址空间维度差异(为什么分页机制中逻辑地址空间是一维的,而分段机制中逻辑地址空间是二维的?)

在计算机系统中&#xff0c;内存管理是一个至关重要的组成部分&#xff0c;而分页机制和分段机制是两种常见的内存管理方式。一个引人疑惑的问题是&#xff1a;为什么分页机制中逻辑地址空间是一维的&#xff0c;而分段机制中逻辑地址空间是二维的呢&#xff1f;在本文中&#…

Nacos漏洞复现合集

本文主要复现nacos的一些经典漏洞&#xff0c;既是分享也是为了记录自己的成长&#xff0c;近期会持续更新。 1. QVD-2023-6271 Nacos身份绕过漏洞 1.1 漏洞级别 &#xff1a;高危 1.2 漏洞描述&#xff1a;低版本的Nacos存在默认的scertkey在未更换的情况下可以生成任意的可…

部署百川大语言模型Baichuan2

Baichuan2是百川智能推出的新一代开源大语言模型&#xff0c;采用 2.6 万亿 Tokens 的高质量语料训练。在多个权威的中文、英文和多语言的通用、领域 benchmark 上取得同尺寸最佳的效果。包含有 7B、13B 的 Base 和 Chat 版本&#xff0c;并提供了 Chat 版本的 4bits 量化。 模…

C++——内存管理(new/delete使用详解)

C内存管理 本章思维导图&#xff1a; 注&#xff1a;本章思维导图对应的xmind文件和.png文件已同步导入至资源 1. C/C内存区域的划分 在C/C中&#xff0c;内存区域主要划分为&#xff1a;内核区域、栈区、内存映射段、堆区、数据段、代码段等区域&#xff0c;如图&#xff1…

mysql数据库报错:1166-Incorrect column name ‘xxx‘

如图&#xff0c;我的报错是&#xff1a;1166-Incorrect column name ‘book_date’&#xff0c;很奇怪&#xff0c;其它的字段都没有报错&#xff0c;但是book_date报错了 报错原因&#xff1a;引入了空字符 可以看到我的鼠标和book_date中间还有一个空格。所以导致该行创建失…

ERP是什么意思?看这一篇就够了!

如果你身在制造业&#xff0c;那么一定对ERP不陌生。天天把ERP挂在嘴边&#xff0c;但你真的了解什么是ERP吗&#xff1f;本篇文章将介绍以下几点&#xff1a;1.ERP是什么意思&#xff1b;2.ERP的功能&#xff1b;3.ERP的落地案例。 一、ERP是什么意思 ERP是企业资源计划&…

5、鸿蒙项目远程调试

一、注册华为账号&#xff0c; 如果是华为手机&#xff0c;并注册了账号可能跳过此步骤&#xff0c;如果使用邮箱注册&#xff0c;此邮箱一定是要正确的邮箱&#xff0c;此处需要使用邮箱获取验证码 注册地址&#xff1a;‎ 1、进入注册页面&#xff0c;输入手机号等信息后点…

Python 爬虫之scrapy 库

文章目录 总的介绍相关模块 总的介绍 Scrapy是一个用于爬取网站数据的开源Python框架。它提供了一套强大而灵活的工具&#xff0c;用于从网站上提取所需的数据。Scrapy是基于Twisted异步网络库构建的&#xff0c;因此可以高效地处理大量的并发请求。以下是Scrapy的一些主要特点…