springboot+rabbitmq搭建mqtt协议实现订阅发布(亲测9w消息并发)

news2025/1/6 19:37:39

一、mqtt协议简单介绍

mqtt是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet
of Thing)中的一个标准传输协议。

二、rabbitmq的安装部署

1. 安装Erlang环境

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
yum -y install ncurses-devel

2. 下载Erlang rpm 安装包和rabbitmq rpm安装包

rpm包自取:https://pan.baidu.com/s/1UGuxeEIYMK9hBHKYBClfTQ
提取码:tmfm

RPM 下载包版本地址:https://packagecloud.io/rabbitmq/erlang

下载RabbitMQ rpm 安装包: https://github.com/rabbitmq/rabbitmq-server/releases/

在这里插入图片描述
*注意版本统一

安装erlang

rpm -Uvh erlang-23.2.7-1.el7.x86_64.rpm

yum install -y erlang

erl -v

在这里插入图片描述
安装rabbitmq

yum install -y socat
rpm -Uvh rabbitmq-server-3.9.15-1.el7.noarch.rpm
yum install -y rabbitmq-server

启动rabbitmq

systemctl start rabbitmq-server

查看rabbitmq状态

systemctl status rabbitmq-server

在这里插入图片描述

3、添加用户

添加root用户取代guest用户

rabbitmqctl add_user root   root
rabbitmqctl set_user_tags root administrator
rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"

删除guest

rabbitmqctl delete_user guest  

创建普通用户并设置权限仅用于发送订阅消息
创建v-host

rabbitmqctl add_vhost /third_mqtt

创建用户

rabbitmqctl add_user third_client OP74X53Z

设置用户角色,无法登陆管理控制台,通常就是普通的生产者和消费者。

rabbitmqctl set_user_tags third_client none

设置用户在v-host下的权限

rabbitmqctl set_permissions -p /third_mqtt third_client ".*" ".*" ".*" 

设置用户在默认"/” v-host下的权限

rabbitmqctl set_permissions -p  /  third_client ".*" ".*" ".*" 

设置主题权限,可订阅和发布消息

rabbitmqctl set_topic_permissions -p /third_mqtt third_client amq.topic ".*" ".*"

三、启用 rabbitmq的mqtt协议和RabbitMQWeb管理界面

rabbitmq插件启用
启动RabbitMQWeb管理界面

rabbitmq-plugins enable rabbitmq_management

启用 rabbitmq的mqtt协议

rabbitmq-plugins enable rabbitmq_mqtt

启用 rabbitmq的web_mqtt协议(不使用js订阅发布可以不启用)

rabbitmq-plugins enable rabbitmq_web_mqtt

查看插件状态 E显式启用 e隐式启用

rabbitmq-plugins list

在这里插入图片描述
开放外网访问并重启防火墙

firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=1883/tcp --permanent
firewall-cmd --zone=public --add-port=15675/tcp --permanent

如果搭建rabbitmq集群模式需要把下面这个两个端口放开

firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent

重启防火墙

systemctl restart firewalld
firewall-cmd --zone=public --list-ports

以上部署操作全部设定完毕,重启rabbitmq服务,使用创建root用户登录进入rabbitmq控制台

在这里插入图片描述

*至此rabbitmq搭建mqtt安装部署结束,下面进入代码实现环节

四、代码实现

先在pom中添加依赖包

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
 </dependency>
 <dependency>
     <groupId>org.eclipse.paho</groupId>
     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
     <version>1.2.5</version>
 </dependency>

application.yml的所需的配置

mqtt:
  #MQTT-用户名 root
  username: third_client
  #MQTT-密码,需要解密 root
  password: OP74X53Z
  #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://xxx.xxx.xx.xxx:1883,tcp://xxx.xxx.xxx.xxx:1883
  hostUrl: tcp://192.168.2.128:1883,tcp://192.168.2.129:1883
  #两个客户端的clientId不能相同,生产者和消费者的clientId不能相同 
  pubClientId: pub-client-id-al68pq1w-dev
  subClientId: sub-client-id-9v83pp7c-dev
  #发布的主题--MQTT-默认的消息推送主题,实际可在调用接口时指定
  pubTopic: defaultTopic
  #订阅的主题
  subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
  completionTimeout: 3000

mqtt服务类

package com.zdft.bhdcm.config.mtqq;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.*;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;


/**
 * mqtt服务类
 * 一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,
 * 是物联网(Internet of Thing)中的一个标准传输协议
 * ClientId是MQTT客户端的标识。MQTT服务端用该标识来识别客户端。因此ClientId必须是独立的。
 * clientID需为全局唯一。如果不同的设备使用相同的clientID同时连接物联网平台,那么先连接的那个设备会被强制断开。
 */
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttServerConfig {

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.hostUrl}")
    private String hostUrl;

    @Value("${mqtt.pubClientId}")
    private String pubClientId;

    @Value("${mqtt.subClientId}")
    private String subClientId;

    @Value("${mqtt.pubTopic}")
    private String pubTopic;

    @Value("${mqtt.subTopic}")
    private String subTopic;

    @Value("${mqtt.completionTimeout}")
    private int completionTimeout;

    /*========================================factory=================================*/
    /**
     * mqtt客户工厂
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,
        // 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setServerURIs(hostUrl.split(","));
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(20);
        mqttConnectOptions.setMaxInflight(1000);
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    /*========================================sent=================================*/
    /**
     * mqtt出站通道
     * @return
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * mqtt出站handler
     *
     * @return {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutboundHandler() {
        //MqttPahoMessageHandler初始化
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(pubClientId+"_send_handler_", mqttClientFactory());
        //设置默认的qos级别
        handler.setDefaultQos(1);
        //保留标志的默认值。如果没有mqtt_retained找到标题,则使用它。如果提供了自定义,则不使用它converter。这里不启用
        handler.setDefaultRetained(false);
        //设置发布的主题
        handler.setDefaultTopic(pubTopic);
        //当 时true,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认值为false(在确认交付之前发送阻止)。
        handler.setAsync(false);
        //当 async 和 async-events 都为 true 时,会发出 MqttMessageSentEvent(请参阅事件)。它包含消息、主题、客户端库生成的messageId、clientId和clientInstance(每次连接客户端时递增)。当客户端库确认交付时,会发出 MqttMessageDeliveredEvent。它包含 messageId、clientId 和 clientInstance,使传递与发送相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,有可能在 MqttMessageSentEvent 之前接收到 MqttMessageDeliveredEvent。默认值为false。
        handler.setAsyncEvents(false);
        return handler;
    }


    /*========================================receive=================================*/

    /**
     * mqtt输入通道
     * @return
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * 入站
     * @return
     */
    @Bean
    public MessageProducer inbound() {
        //配置订阅端MqttPahoMessageDrivenChannelAdapter
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(subClientId+"_receive_inbound_", mqttClientFactory(), subTopic.split(","));
        //设置超时时间
        adapter.setCompletionTimeout(completionTimeout);
        //设置默认的消息转换类
        adapter.setConverter(new DefaultPahoMessageConverter());
        //设置qos级别
        adapter.setQos(1);
        //设置入站管道
        adapter.setOutputChannel(mqttInputChannel());
        adapter.setTaskScheduler(new ConcurrentTaskScheduler());
        return adapter;
    }

    /**
     * 消息处理程序
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                MessageHeaders headers = message.getHeaders();
                log.info("headers: {}", headers);
                String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
                log.info("订阅主题为: {}", topic);
                String[] topics = subTopic.split(",");
                for (String t : topics) {
                    if (t.equals(topic)) {
                        log.info("订阅主题为:{};接收到该主题消息为:{}",topic,message.getPayload().toString());
                    }
                }
            }
        };
    }
}

mqtt网关(发布端需要用到)

package com.zdft.bhdcm.config.mtqq;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 *mqtt网关(发布端需要用到)
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {


    /**
     * 发送到mqtt
     *
     * @param payload 有效载荷
     */
    void sendToMqtt(String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param qos     qos
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

测试发送和订阅
在这里插入图片描述

五、mqtt.fx连接mqtt工具使用

mqtt测试工具安装包:https://pan.baidu.com/s/1oun7rMVJITOK9VSyO785HQ
提取码:l3cm

1、配置连接及订阅

配置mqtt连接
在这里插入图片描述

配置用户名密码
在这里插入图片描述
订阅gps-topic

在这里插入图片描述

六、jmeter压测结果展示

这里拟2w的消息并发量,根据业务计算最高模拟测试9w消息并发量没出现问题

如何使用移步到 :https://blog.csdn.net/weixin_39393393/article/details/116640867?spm=1001.2014.3001.5502

1、使用jmeter模拟2w并发量

在这里插入图片描述

在这里插入图片描述

2、结果展示

rabbitmq控制台展示
在这里插入图片描述

后台打印
在这里插入图片描述

mqtt订阅的消息

在这里插入图片描述
jmeter压测报告

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/163854.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c# 随机数,c# 生成随机数,c# 生成区间随机数,c# 生成随机数组

c# 随机数&#xff0c;c# 生成随机数&#xff0c;c# 生成区间随机数&#xff0c;c# 生成随机数组 小试牛刀 先看结果 生成200-700之间的5个随机数 第1的随机数是&#xff1a;647 第2的随机数是&#xff1a;219 第3的随机数是&#xff1a;311 第4的随机数是&#xff1a;210 第5…

Redisson的看门狗机制

背景 据Redisson官网的介绍&#xff0c;Redisson是一个Java Redis客户端&#xff0c;与Spring 提供给我们的 RedisTemplate 工具没有本质的区别&#xff0c;可以把它看做是一个功能更强大的客户端&#xff08;虽然官网上声称Redisson不只是一个Java Redis客户端&#xff09; …

记录Maven的相关操作(笔记整理)

一、安装 我使用的是免安装版的&#xff0c;直接解压缩就可以使用。 二、配置环境变量 打开环境变量配置。右键计算机→属性→高级系统设置→高级→环境变量&#xff0c;在系统变量中配置。 配置MAVEN_HOME。在系统变量中新建&#xff0c;变量名MAVEN_HOME&#xff0c;变量值…

parquet

一、parquet结构 Row Group ​ --Column Chunk&#xff1a;一列对应一个Column Chunk ​ – Page&#xff1a;压缩和编码的单元&#xff0c;parquet的 min/max 索引是针对于page的&#xff0c;存在了文件的页脚。以前的版本是存储Column Chunk和Page的索引&#xff0c;导致在…

Linux命令scp用法

本文主要讲的是scp用法如果哪里不对欢迎指出&#xff0c;主页https://blog.csdn.net/qq_57785602?typeblog首先讲述一下scp用法并不是让你连接公司服务器后用的&#xff08;不是连接公司服务器使用&#xff09;&#xff0c;如果要使用的情况下那么请看下面&#xff1a;winr打开…

【C语言篇】请把这篇文章推给现在还对指针一知半解的童鞋~超生动图解,详细讲解,易懂,易学,让天下没有难懂的指针~

&#x1f331;博主简介&#xff1a;是瑶瑶子啦&#xff0c;一名大一计科生&#xff0c;目前在努力学习C进阶,JavaSE。热爱写博客~正在努力成为一个厉害的开发程序媛&#xff01; &#x1f4dc;所属专栏&#xff1a;C/C ✈往期博文回顾:进入内存,透彻理解数据类型存在的意义,整形…

[java拓展]Mysql数据库的基础指令,和JDBC的使用

1.关于mysql数据库 &#xff08;1&#xff09;概述 DBMS数据库管理系统&#xff0c;用来管理数据库&#xff0c;执行sql语句的东西&#xff0c;Mysql&#xff0c;oracle&#xff0c;sqlite这些严格来说不是数据库而是数据库管理系统&#xff0c;其中Mysql最常用&#xff0c;而…

马来酰亚胺聚乙二醇硅烷,MAL-PEG-Silane 结构,科研试剂溶于大部分有机溶剂

马来酰亚胺聚乙二醇硅烷&#xff0c;MAL-PEG-Silane 中文名称&#xff1a;马来酰亚胺聚乙二醇硅烷 分子量&#xff1a;1k&#xff0c;2k&#xff0c;3.4k&#xff0c;5k&#xff0c;10k&#xff0c;20k。。。 存储条件&#xff1a;-20C&#xff0c;避光&#xff0c;避湿 用…

剑指offer----C语言版----第十九天----面试题25:合并两个有序的链表

目录 1. 合并两个排序的链表 1.1 题目描述 1.2 解题思路 1.3 往期回顾 1. 合并两个排序的链表 原题链接&#xff1a;21. 合并两个有序链表 - 力扣&#xff08;LeetCode&#xff09;1.1 题目描述 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个…

new-Crack:StarUML 5.1.0 -2023

StarUML 用于敏捷和简洁建模的复杂软件建模器 当前版本&#xff1a;v5.1.0 主要特征 通用语言2 兼容 UML 2.x 标准元模型和图表&#xff1a;类、对象、用例、组件、部署、复合结构、序列、通信、状态图、活动、时间、交互溢出、信息流和剖面图。 SYSML 支持 支持使用 SysML 图…

SSM03_SpringMVC REST风格 SSM整合

01-SpringMVC简介SpringMVC是隶属于Spring框架的一部分&#xff0c;主要是用来进行Web开发&#xff0c;是对Servlet进行了封装。SpringMVC是处于Web层的框架&#xff0c;所以其主要的作用就是用来接收前端发过来的请求和数据然后经过处理并将处理的结果响应给前端&#xff0c;所…

【iHooya】1月14日寒假集训课作业解析

内部元素之和 输入一个整数矩阵&#xff0c;计算位于矩阵内部的元素之和。所谓矩阵内部的元素&#xff0c;不在第一行和最后一行的元素以及第一列和最后一列的元素。 输入 第一行分别为矩阵的行数m和列数n&#xff08;m < 100&#xff0c;n < 100&#xff09;&#xff0…

Arthas 入门到实战(一)快速入门

Arthas官方文档指出&#xff1a; 介绍&#xff1a;Arthas 是一款线上监控诊断产品&#xff0c;通过全局视角实时查看应用 load、内存、gc、线程的状态信息&#xff0c;并能在不修改应用代码的情况下&#xff0c;对业务问题进行诊断&#xff0c;包括查看方法调用的出入参、异常&…

人工智能与游戏

游戏是智能应用最好的平台&#xff0c;可惜的是&#xff1a;只用了少部分计算AI&#xff0c;还没有用到智能的计算计1 引言从1950年香农教授提出为计算机象棋博弈编写程序开始&#xff0c;游戏人工智能就是人工智能技术研究的前沿&#xff0c;被誉为人工智能界的“果蝇”&#…

微信小程序070校园食堂订餐多商家带配送

基于微信小程序食堂订餐管理系 系统分为用户和管理员&#xff0c;商家三个角色 用户小程序端的主要功能有&#xff1a; 1.用户注册和登陆小程序 2.查看系统新闻资讯公告 3.用户查看小程序端的菜品信息&#xff0c;在线搜索菜品&#xff0c; 4.用户查看菜品详情&#xff0c;收…

数据结构-二叉搜索树解析和实现

1.含义规则特性二叉搜索树也叫排序二叉树、有序二叉树&#xff0c;为什么这么叫呢&#xff1f;名字由来是什么&#xff1f;主要是它的规则图一规则一&#xff0c;左子树的所有节点的值均小于它的根节点的值规则二&#xff0c;右子树的所有节点的值均大于它的根节点的值&#xf…

摇头测距小车01_舵机和超声波代码封装

目录 一、摇头测距小车图片演示 二、接线方式 三、代码实现 一、摇头测距小车图片演示 就是在小车原有的基础上&#xff0c;在小车前面加一个舵机和一个超声波&#xff0c;把超声波粘在舵机上 二、接线方式 1、超声波接线 VCC-----上官一号5V GND----上官一号GND Trig----…

.net开发安卓入门-Dialog

.net开发安卓入门-DialogAndroid.App.AlertDialog运行效果代码UI源码引入 下面这个类库才可以使用Java.Interop.Export特性绑定事件Android.App.AlertDialog https://learn.microsoft.com/zh-cn/dotnet/api/android.app.alertdialog?viewxamarin-android-sdk-13 SetTitle &…

nodejs+vue摄影跟拍预定管理系统

&#xff0c;本系统分为用户&#xff0c;摄影师&#xff0c;管理员三个角色&#xff0c;用户可以注册登陆系统&#xff0c;查看摄影套餐&#xff0c;预约套餐&#xff0c;购买摄影周边商品&#xff0c;查看跟拍照片等。摄影师可以对用户的摄影预约审核&#xff0c;跟拍流程管理…

【Java集合】ArrayList自动扩容机制分析

目录 先从 ArrayList 的构造函数说起 一步一步分析 ArrayList 扩容机制 先来看 add 方法 再来看看 ensureCapacityInternal() 方法 ensureExplicitCapacity()和calculateCapacity方法 下面我们接着来看grow() 方法 再来看一下grow()中调用的hugeCapacity() 方法 System.arrayco…