SpringBoot集成MQTT实现交互服务通信

news2024/9/22 1:44:23

引言

本文是springboot集成mqtt的一个实战案例。
gitee代码库地址:源码地址

一、什么是MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 于 1999 年发明。MQTT 协议的主要特征是开放、简单、轻量级和易于实现,这些特征使得它适用于受约束的应用环境,如:

网络受限:网络带宽较低且传输不可靠
终端受限:协议运行在嵌入式设备上,嵌入式终端的处理器、内存等是受限的

MQTT 非常适用于物联网领域,如传感器与服务器的通信、传感器信息采集等。

二、发布/订阅模式

发布/订阅模式(Publish/Subscribe Pattern,简称Pub/Sub)是一种消息通信模式,在这种模式下,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者)。而是将代表消息内容的通知(事件)发布到一个特定的主题或频道上,而订阅了这个主题的接收者会收到所有在这个主题上发布的通知。这种模式解耦了消息的发送者和接收者,使得系统更加灵活和可扩展。

主要组成部分

  1. 发布者(Publisher):负责生成消息并将其发布到特定的主题或频道。

  2. 订阅者(Subscriber):注册对特定主题的兴趣,并接收该主题上的所有消息。

  3. 消息代理(Message Broker):作为中间件,它接收来自发布者的消息,并将这些消息传递给所有相关的订阅者。

优点

  • 解耦:发布者和订阅者之间不需要直接交互,这降低了系统的耦合度。

  • 灵活性:可以动态添加或删除订阅者,不影响其他组件。

  • 可扩展性:系统容易扩展,可以轻松增加新的发布者或订阅者。

缺点

  • 复杂性:引入了额外的组件(如消息代理),增加了系统的复杂性和管理成本。

  • 性能开销:消息的传递需要通过中间件,可能会有延迟和性能损失。

应用场景

  • 事件驱动架构:在微服务架构中,不同的服务通过发布/订阅模式进行异步通信。

  • 数据流处理:如实时数据分析,多个组件可以订阅数据流并进行处理。

  • 分布式系统:用于跨系统或跨服务的消息传递。

发布/订阅模式并不是 MQTT 协议特有的模式,很多消息中间件都有使用发布/订阅模式,有同学可能认为这就是观察者模式,还真不是,这两个模式很容易混淆。观察者模式只有观察者 + 被观察者两个角色,而发布/订阅模式还有一个经纪人 Broker;往更深层次的讲观察者和被观察者,是松耦合的关系,而发布者和订阅者,则完全不存在耦合。

三、Windows下安装MQTT消息服务器

非常遗憾,EMQ X Broker 在 5.4.0 版本的发行版中已不支持 windows 版本的安装包了,笔者从网上找了一个最后支持版本的压缩包,已上传资源。

  • 解压后,在bin文件下,使用cmd执行运行命令 .\emqx console
  • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码 admin/public

如果报错缺少Erlang环境,需要自行安装下该环境

在这里插入图片描述
在这里插入图片描述

浏览器访问:http://localhost:18083/#,输入账号密码进入,会要求你修改密码,可以暂时跳过

在这里插入图片描述

四、Windows安装MQTT消息代理客户端MQTTX

下载地址:MQTTX下载地址

点击免费下载
在这里插入图片描述

选择64位版本

在这里插入图片描述
下好后点击安装,启动运行界面如下:
在这里插入图片描述
语言是英文,可以在设置按钮里调成中文。这个客户端代理主要是进行消息发送的测试服务。

五、新建MQTT集成项目

随便新建了一个springboot应用,用的是JDK17,在pom文件中引入如下依赖:

        <!-- MQTT -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

5.1 yml配置

server:
  port: 8081

#允许循环依赖
spring:
  main:
    allow-circular-references: true

customer:
  mqtt:
    broker: tcp://localhost:1883
    clientList:
      #发布客户端ID
      - clientId: nays_service
        #监听主题 同时订阅多个主题 使用 - 分割开
        subscribeTopic: mqtt/publish
        #用户名
        userName: admin
        #密码
        password: public
      #接收客户端ID
      - clientId: receive_service
        #监听主题 同时订阅多个主题 使用 - 分割开
        subscribeTopic: mqtt/receive
        #用户名
        userName: admin
        #密码
        password: public



5.2 Mqtt配置类

package com.hulei.mqttproject.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Mqtt配置类
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * 需要创建的MQTT客户端
     */
    List<MqttClient> clientList;
}

5.3 MQTT客户端

package com.hulei.mqttproject.config;

import lombok.Data;


/**
 * MQTT客户端
 */
@Data
public class MqttClient {
    /**
     * 客户端ID
     */
    private String clientId;
    /**
     * 监听主题
     */
    private String subscribeTopic;
    /**
     * 用户名
     */
    private String userName;
    /**
     * 密码
     */
    private String password;
}

5.4 MQTT客户端管理类

package com.hulei.mqttproject.config;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQTT客户端管理类,如果客户端非常多后续可入redis缓存
 */
@Slf4j
@Component
public class MqttClientManager {

    @Value("${customer.mqtt.broker}")
    private String mqttBroker;

    @Resource
    private MqttCallBackContext mqttCallBackContext;
    /**
     * 存储MQTT客户端
     */
    public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();

    public MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }

    /**
     * 创建mqtt客户端
     *
     * @param clientId       客户端ID
     * @param subscribeTopic 订阅主题,可为空
     * @param userName       用户名,可为空
     * @param password       密码,可为空
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (null != userName && !userName.isEmpty()) {
                connOpts.setUserName(userName);
            }

            if (null != password && !password.isEmpty()) {
                connOpts.setPassword(password.toCharArray());
            }

            connOpts.setCleanSession(true);

            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);

                if (null == callBack) {
                    callBack = mqttCallBackContext.getCallBack("default");
                }

                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);
            }

            //连接mqtt服务端broker
            client.connect(connOpts);
            // 订阅主题
            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                if (subscribeTopic.contains("-"))
                    client.subscribe(subscribeTopic.split("-"));
                else {
                    client.subscribe(subscribeTopic);
                }
            }

            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
        }
    }
}

5.5 MQTT客户端创建

package com.hulei.mqttproject.config;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * MQTT客户端创建
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Resource
    private MqttClientManager mqttClientManager;
    @Resource
    private MqttConfig mqttConfig;

    /**
     * 创建MQTT客户端
     */
    @PostConstruct
    public void createMqttClient() {
        List<MqttClient> mqttClientList = mqttConfig.getClientList();

        for (MqttClient mqttClient : mqttClientList) {
            log.info("{}", mqttClient);
            //创建客户端,客户端ID:demo,回调类跟客户端ID一致
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
        }
    }
}

5.6 MQTT回调抽象类

package com.hulei.mqttproject.config;

import jakarta.annotation.Resource;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * MQTT回调抽象类
 */
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {

    private String clientId;

    private MqttConnectOptions connectOptions;

    @Resource
    MqttClientManager mqttClientManager;

    /**
     * 失去连接操作,进行重连
     *
     * @param throwable 异常
     */
    @Override
    public void connectionLost(Throwable throwable) {
        try {
            if (null != clientId) {
                if (null != connectOptions) {
                    mqttClientManager.getMqttClientById(clientId).connect(connectOptions);
                } else {
                    mqttClientManager.getMqttClientById(clientId).connect();
                }
            }

        } catch (Exception e) {
            log.error("{} reconnect failed!", e.getMessage(), e);
        }
    }

    /**
     * 接收订阅消息
     * @param topic    主题
     * @param mqttMessage 接收消息
     * @throws Exception 异常
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		String content = new String(mqttMessage.getPayload());
     	handleReceiveMessage(topic, content);
    }

    /**
     * 消息发送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发送成功");
    }


    /**
     * 处理接收的消息
     * @param topic   主题
     * @param message 消息内容
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}


5.7 MQTT订阅回调环境类

package com.hulei.mqttproject.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQTT订阅回调环境类
 */
@Component
@Slf4j
public class MqttCallBackContext {

    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * 默认构造函数
     *
     * @param callBackMap 回调集合
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.putAll(callBackMap);
    }

    /**
     * 获取MQTT回调类
     *
     * @param clientId 客户端ID
     * @return MQTT回调类
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}

5.8 默认回调类

package com.hulei.mqttproject.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 默认回调
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {

    /**
     * @param topic   主题
     * @param message 消息内容
     */
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        log.info("接收到主题---{}", topic);
        log.info("接收到消息---{}", message);
        // 自定义消息处理业务

    }
}

六、测试服务类

package com.hulei.mqttproject.controller;

import com.hulei.mqttproject.config.MqttClientManager;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class SendController {

    @Resource
    private MqttClientManager mqttClientManager;

    @RequestMapping("/sendMessage")
    public String sendMessage(String topic){
        try {
            MqttMessage mqttMessage = new MqttMessage("你好".getBytes());
            mqttClientManager.getMqttClientById("nays_service").publish(topic,mqttMessage);
            return "发送成功";
        } catch (Exception e) {
            log.error("发送失败",e);
            return "发送失败";
        }
    }
}

七、启动springboot

启动日志可以看到,mqtt消息服务器连接成功

在这里插入图片描述
EMQX工具显示发布客户端和接收客户端均已成功注册

在这里插入图片描述

使用Apifox测试下SendController中的接口,mqtt/receive是yaml中接收客户端订阅的主题,当然也可以往mqtt/publish主题发,mqtt中消息的发布者也可以订阅主题,监听某些消息。

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1933058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++ : 移除链表元素/合并两个有序链表题解

目录 1.移除链表元素 分析 代码 2.合并两个有序链表 分析 代码 1.移除链表元素 分析 像这种移除元素的&#xff0c;加个哨兵位头节点会比较方便&#xff0c;因为旧的头会有被移除的情况&#xff0c;不好控制。这里只需要用cur指向待遍历的节点&#xff0c;prev指向cur的…

AI大牛Karpathy创办Eureka Labs专注AI+教育

&#x1f989; AI新闻 &#x1f680; AI大牛Karpathy创办Eureka Labs专注AI教育 摘要&#xff1a;前OpenAI大牛Karpathy离职半年后宣布创办专注AI与教育的公司Eureka Labs&#xff0c;旨在通过生成式AI优化教育体验。公司首个项目LLM101n课程已在GitHub获得高赞&#xff0c;目…

C++ 继承详解:从基础到深入

继承是面向对象编程中最强大的功能之一&#xff0c;它不仅促进了代码的重用&#xff0c;还帮助我们构建复杂的系统。在C中&#xff0c;通过继承&#xff0c;我们可以创建一个新的类&#xff08;称为派生类&#xff09;来扩展现有类&#xff08;基类&#xff09;的功能。本文将全…

基于python的百度资讯爬虫的设计与实现

研究背景 随着互联网和信息技术的飞速发展&#xff0c;网络已经成为人们获取信息的主要来源之一。特别是搜索引擎&#xff0c;作为信息检索的核心工具&#xff0c;极大地改变了人们获取信息的方式。其中&#xff0c;百度作为中国最受欢迎的搜索引擎之一&#xff0c;其新闻搜索…

[GXYCTF2019]Ping Ping Ping1

打开靶机 结合题目名称&#xff0c;考虑是命令注入&#xff0c;试试ls 结果应该就在flag.php。尝试构造命令注入载荷。 cat flag.php 可以看到过滤了空格,用 $IFS$1替换空格 还过滤了flag&#xff0c;我们用字符拼接的方式看能否绕过,ag;cat$IFS$1fla$a.php。注意这里用分号间隔…

【总结】逻辑运算在Z3中运用+CTF习题

国际赛IrisCTF在前几天举办&#xff0c;遇到了一道有意思的题目&#xff0c;特来总结。 题目 附件如下&#xff1a;&#x1f4ce;babyrevjohnson.tar 解题过程 关键main函数分析如下&#xff1a; int __fastcall main(int argc, const char **argv, const char**envp){int v4…

Golang | Leetcode Golang题解之第236题二叉树的最近公共祖先

题目&#xff1a; 题解&#xff1a; func lowestCommonAncestor(root, p, q *TreeNode) *TreeNode {parent : map[int]*TreeNode{}visited : map[int]bool{}var dfs func(*TreeNode)dfs func(r *TreeNode) {if r nil {return}if r.Left ! nil {parent[r.Left.Val] rdfs(r.L…

用 WireShark 抓住 TCP

Wireshark 是帮助我们分析网络请求的利器&#xff0c;建议每个同学都装一个。我们先用 Wireshark 抓取一个完整的连接建立、发送数据、断开连接的过程。 简单的介绍一下操作流程。 1、首先打开 Wireshark&#xff0c;在欢迎界面会列出当前机器上的所有网口、虚机网口等可以抓取…

气膜体育馆进校园:政策支持与市场前景—轻空间

过去20多年&#xff0c;气膜建筑、场馆相关项目在国内落地众多&#xff0c;展现出强大的市场潜力。2022年8月&#xff0c;《北京晚报》粗略统计&#xff0c;北京全市已建有气膜馆百余座&#xff0c;且数量还在不断增加。这一发展趋势不仅仅体现在北京&#xff0c;全国范围内也都…

微软GraphRAG +本地模型+Gradio 简单测试笔记

安装 pip install graphragmkdir -p ./ragtest/input#将文档拷贝至 ./ragtest/input/ 下python -m graphrag.index --init --root ./ragtest修改settings.yaml encoding_model: cl100k_base skip_workflows: [] llm:api_key: ${GRAPHRAG_API_KEY}type: openai_chat # or azu…

如何通过企业微信会话存档保护企业利益?

赵总: 张经理&#xff0c;最近行业内频发数据泄露事件&#xff0c;我们的客户资料和内部沟通记录安全吗&#xff1f; 张经理: 赵总&#xff0c;我们已经采取了一系列措施来加强数据安全。特别是针对企业微信的沟通记录&#xff0c;我们最近引入了安企神软件&#xff0c;它能很…

【BUG】已解决:AttributeError: ‘NoneType‘ object has no attribute ‘split‘

已解决&#xff1a;AttributeError: ‘NoneType‘ object has no attribute ‘split‘ 英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身&#xff0c;就职于医疗科技公司&#xff0c;热衷分享知识&#xff0c;…

通用图形处理器设计GPGPU基础与架构(四)

一、前言 本文将介绍GPGPU中线程束的调度方案、记分牌方案和线程块的分配与调度方案。 二、线程束调度 在计算机中有很多资源&#xff0c;既可以是虚拟的计算资源&#xff0c;如线程、进程或数据流&#xff0c;也可以是硬件资源&#xff0c;如处理器、网络连接或 ALU 单元。调…

大数据平台之Kafka

Apache Kafka 是一个分布式流处理平台&#xff0c;最初由 LinkedIn 开发&#xff0c;并在 2011 年开源成为 Apache 项目。Kafka 主要用于构建实时数据管道和流应用&#xff0c;具有高吞吐量、低延迟、容错性强等特点。以下是对 Kafka 的详细介绍&#xff1a; 核心概念 1. Prod…

爬虫瑞数5案例:某大学总医院

声明: 该文章为学习使用,严禁用于商业用途和非法用途,违者后果自负,由此产生的一切后果均与作者无关 一、瑞数简介 瑞数动态安全 Botgate(机器人防火墙)以“动态安全”技术为核心,通过动态封装、动态验证、动态混淆、动态令牌等技术对服务器网页底层代码持续动态变换,…

【 LCD1602显示屏】使用STC89C51控制1602显示、读写操作时序

文章目录 LCD1602显示概述&#xff1a;引脚说明控制指令接线 控制思路步骤 代码示例总结对databuffer dataShow;的理解 LCD1602显示 概述&#xff1a; LCD1602&#xff08;Liquid Crystal Display&#xff09;是一种工业字符型液晶&#xff0c;能够同时显示 1602 即 32 字符…

深入理解Android中的缓存与文件存储目录

&#x1f31f; 引言 在Android应用开发中&#xff0c;合理管理应用的数据存储至关重要。应用可能需要保存各种类型的数据&#xff0c;从简单的配置信息到多媒体文件&#xff0c;甚至是缓存数据以提高性能和用户体验。Android提供了多个内置目录来满足这些需求&#xff0c;但它…

探索Facebook的最新更新:社交体验的新高度

Facebook作为全球领先的社交媒体平台&#xff0c;一直致力于不断创新和改进&#xff0c;以提供更优质的用户体验。近期&#xff0c;Facebook推出了一系列新的更新&#xff0c;旨在提升用户的社交互动体验和平台功能。本文将详细探讨这些最新更新&#xff0c;分析其对用户和社交…

模拟string(一)详解

目录 string()构造函数无参初始化错误写法代码 string(const char* str)构造函数有参初始化错误写法代码 string(const char* str "")合并无参和有参的构造函数代码错误写法代码 拷贝构造函数浅拷贝深拷贝方法一string(const string& s)方法二string& opera…

Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; HadoopHDFSMapReduceHiveFlumeSqoopZookeeperHBase 正在 章节内容 上一节我们完成了&#xff1a; HBase …