SpringBoot+RabbitMQ实现MQTT协议通讯

news2024/12/27 12:55:18

一、简介

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。此处使用RabbitMQ
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

二、环境准备

2.1 Erlang安装

使用rabbitMQ首先需要安装Erlang环境,因为rabbitMQ是用Erlang语言编写的。

2.1.1 下载安装

官网下载:https://www.erlang.org/patches/otp-26.0 (比较慢,不推荐)
在这里插入图片描述

百度网盘下载:https://pan.baidu.com/s/1xU4syn14Bh7QR-skjm_hOg (推荐)
提取码:az1t

2.1.2 环境变量

进入高级系统设置
在这里插入图片描述
在这里插入图片描述
环境变量: 变量名-ERLANG_HOME 变量值-文件安装路径
在这里插入图片描述
配置path: 配置完上面的之后,找到系统变量中的path点击编辑,然后新建:%ERLANG_HOME%\bin
在这里插入图片描述
验证: 进入cmd,输入 erl -version 显示版本号就说明安装成功
在这里插入图片描述

2.2 RabbtiMQ安装

2.2.1 下载安装

官网下载:http://www.rabbitmq.com/download.html
下载后一通傻瓜式安装即可。
在这里插入图片描述

2.2.2 环境变量

变量名-RABBITMQ_SERVER 变量值-文件安装路径
在这里插入图片描述
编辑path,点击新建按钮,输入%RABBITMQ_SERVER%\sbin,点击确定
在这里插入图片描述

2.2.3 安装mqtt插件

rabbitmq-plugins enable rabbitmq_mqtt

在这里插入图片描述

2.2.4 管理控制台安装

rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

2.2.5 访问测试

登录测试: 浏览器输入 http://localhost:15672 ,输入用户名:guest,密码:guest在这里插入图片描述

三、代码实现

3.1 引入依赖

<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!--mqtt依赖包-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

3.2 yml配置

# mqtt配置
mqtt:
  url: ***********
  username: ***********
  password: ***********
  # 间隔时间
  keep-alive-interval: 60
  # 超时时间
  completion-timeout: 30000
  # 会话保持,默认为false
  clean-session: false
  # 自动连接,默认为true
  automatic-reconnect: true
  # 生产者配置
  producer:
    # 很重要
    client-id: producer1
    topic: demo-topic
    # 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次
    qos: 1
  # 消费者配置
  subscriber:
    # 很重要
    client-id: subscriber1
    topic: demo-topic
    # 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次
    qos: 0

3.3 配置属性类

package com.qiangesoft.mqtt.config;

import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.stereotype.Component;

/**
 * mqtt配置
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@Data
@Component
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperty {

    /**
     * 服务地址
     */
    private String url;

    /**
     * 账号
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 间隔时间
     */
    private int keepAliveInterval;

    /**
     * 超时时间
     */
    private int completionTimeout;

    /**
     * 会话保持,默认为false
     */
    private boolean cleanSession;

    /**
     * 自动连接,默认为true
     */
    private boolean automaticReconnect = true;

    /**
     * 生产者
     */
    private Client producer = new Client();

    /**
     * 消费者
     */
    private Client subscriber = new Client();

    @Data
    public class Client {
        /**
         * 客户端id
         */
        private String clientId;

        /**
         * 默认主题
         */
        private String topic;

        /**
         * 传输质量
         * QoS 0:最多分发一次
         * QoS 1:至少分发一次(默认)
         * QoS 2:只分发一次
         */
        private int qos = 1;
    }

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        // 连接参数
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{url});
        if (StringUtils.isNotBlank(this.username)) {
            options.setUserName(this.username);
        }
        if (StringUtils.isNotBlank(this.password)) {
            options.setPassword(this.password.toCharArray());
        }
        // 心跳时间
        options.setKeepAliveInterval(this.keepAliveInterval);
        // 断开是否自动重联
        options.setAutomaticReconnect(this.automaticReconnect);
        // 保持session,客户端上线后会接受到它离线的这段时间的消息
        options.setCleanSession(this.cleanSession);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//        options.setWill("willTopic", WILL_DATA, 2, false);
        factory.setConnectionOptions(options);
        return factory;
    }
}
package com.qiangesoft.mqtt.constant;

/**
 * mqtt通用常量信息
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
public class MqttConstant {

    /**
     * 生产者管道
     */
    public static final String OUTBOUND_CHANNEL = "outboundChannel";

    /**
     * 消费者管道
     */
    public static final String INBOUND_CHANNEL = "inboundChannel";
}

3.4 生产者

package com.qiangesoft.mqtt.producer;

import com.qiangesoft.mqtt.config.MqttProperty;
import com.qiangesoft.mqtt.constant.MqttConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * mqtt生产者
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@Configuration
public class MqttProducerConfig {

    @Autowired
    private MqttProperty mqttProperty;
    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    /**
     * 消息生产通道
     *
     * @return
     */
    @Bean(name = MqttConstant.OUTBOUND_CHANNEL)
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }

    /**
     * 消息发布
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = MqttConstant.OUTBOUND_CHANNEL)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperty.getProducer().getClientId(), mqttPahoClientFactory);
        messageHandler.setAsync(false);
        messageHandler.setDefaultQos(mqttProperty.getProducer().getQos());
        messageHandler.setDefaultTopic(mqttProperty.getProducer().getTopic());
        return messageHandler;
    }
}

3.5 消费者

package com.qiangesoft.mqtt.subscriber;

import com.qiangesoft.mqtt.config.MqttProperty;
import com.qiangesoft.mqtt.constant.MqttConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.Objects;

/**
 * mqtt消费者
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@Slf4j
@Configuration
public class MqttSubscriberConfig {

    @Autowired
    private MqttProperty mqttProperty;
    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    /**
     * 消息订阅通道
     *
     * @return
     */
    @Bean(name = MqttConstant.INBOUND_CHANNEL)
    public MessageChannel inboundChannel() {
        return new DirectChannel();
    }

    /**
     * 消息订阅通道绑定
     *
     * @return
     */
    @Bean
    public MessageProducer mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperty.getSubscriber().getClientId(),
                mqttPahoClientFactory, mqttProperty.getSubscriber().getTopic());
        adapter.setCompletionTimeout(mqttProperty.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(mqttProperty.getSubscriber().getQos());
        adapter.setOutputChannel(inboundChannel());
        return adapter;
    }

    /**
     * 消息订阅
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = MqttConstant.INBOUND_CHANNEL)
    public MessageHandler messageHandler() {
        return message -> {
            try {
                String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
                log.info("订阅主题为: {}", topic);
                String payload = message.getPayload().toString();
                log.info("订阅接收到消息:{}", payload);
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

}

3.6 消息发送网关

package com.qiangesoft.mqtt.service;

import com.qiangesoft.mqtt.constant.MqttConstant;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * 消息发送网关
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConstant.OUTBOUND_CHANNEL)
public interface MqttGateway {

    /**
     * 发送到mqtt
     *
     * @param payload 消息内容
     */
    void sendToMqtt(String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param qos     qos
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

3.7 发送测试

package com.qiangesoft.mqtt.controller;

import com.qiangesoft.mqtt.service.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 控制器
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttGateway mqttGateway;

    @GetMapping("/send")
    public String send(String message) {
        mqttGateway.sendToMqtt(message);
        return "success";
    }
}

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1624740.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

阿斯达年代记游戏下载教程 阿斯达年代记下载教程

《阿斯达年代记&#xff1a;三强争霸》作为一款气势恢宏的MMORPG大作&#xff0c;是Netmarble与STUDIO DRAGON强强联合的巅峰创作&#xff0c;定于4月24日迎来全球玩家热切期待的公测。游戏剧情围绕阿斯达大陆的王权争夺战展开&#xff0c;三大派系——阿斯达联邦、亚高联盟及边…

浅谈菊风实时音视频 (RTC)与实时操作系统 (RTOS) 在智能硬件领域应用

近年来&#xff0c;菊风通过实时音视频赋能智能手表、智能门禁、智能门锁/门铃、智能眼镜等数十种智能硬件&#xff0c;与一众合作伙伴共同探索在IoT智能硬件领域的不同场景应用&#xff0c;积累了丰富的实践经验。在智能硬件中&#xff0c;RTOS因其轻量化的系统内核&#xff0…

使用Mybatis映射时间 DateTime ==> LocalDateTime

首先查看&#xff0c;数据库字段&#xff1a; 书写映射实体类对象VO&#xff1a; Data public class OrderListVO implements Serializable {private Integer orderId;private String memberName;private String orderNumber;private BigDecimal orderPrice;private String l…

element-ui upload 组件 手动多次出发 submit

element 上传组件 upload 上传成功以后&#xff0c;想重新 调用 submit()函数&#xff0c;发现是不可以进行多次触发的,。 直接上解决方法&#xff0c;在上传成功后的钩子函数里添加:fileList[0l.status ready fileList是文件列表&#xff0c;status是单文件的状态改成ready就…

全栈从0到1 3D旅游地图标记和轨迹生成

功能演示 演示视频 体验地址 Vercel App 开发技术栈&#xff1a; NextJs&#xff08;前端框架&#xff09;React&#xff08;前端框架&#xff09;TailwindCSS &#xff08;CSS样式&#xff09;echart echart gl &#xff08;地图生成&#xff09;shadui&#xff08;UI组件…

机器视觉系统-工业光源什么是无影光

光路描述&#xff1a;通过结构或漫射板改变光路&#xff0c;最终发光角度包含了高角度 和低角度。 效果分析&#xff1a;兼具了高角度光和低角度光的效果&#xff0c;使被测物得到了多角度的照射&#xff0c;表面纹理、皱褶被弱化&#xff0c; 图像上整体均匀。 主要应用&#…

linux 上 jps 列出一堆 jar,如何快速定位 jar 文件启动位置?

例如&#xff0c;在 /data下有一个 xxx.jar &#xff0c;如果是通过 "java -jar /data/xxx.jar" 方式启动&#xff0c;则 jps会列出的名字中带 xxx.jar&#xff0c;这时再 "ps -ef | grep xxx.jar" 就会列出 更详细的信息&#xff0c;例如 "java -ja…

Spring Kafka——基于 Spring Kafka 实现动态管理 Kafka 连接和 topic 的监听

文章目录 使用 Spring Kafka 动态管理 Kafka 连接和主题监听1. 前言2. 简单的消费程序配置3. Spring Kafka 主要的相关类的说明4. KafkaListener 注解的加载执行流程解析5. 动态监听消费订阅的设计与实现 使用 Spring Kafka 动态管理 Kafka 连接和主题监听 文章内容较长&#x…

Windows电脑中护眼(夜间)模式的开启异常

我的电脑是联想小新16pro&#xff0c;Windows11版本。之前一直可以正常使用夜间模式&#xff0c;但是经过一次电脑的版本更新之后&#xff0c;我重启电脑发现我的夜间模式不能使用了。明明显示开启状态&#xff0c;但是却不能使用&#xff0c;电脑还是无法显示夜间模式。 询问…

59、回溯-括号生成

思路&#xff1a; 括号是成对出现&#xff0c;首先左括号可以放n个&#xff0c;右括号也可以放n个。如果当前左括号放了3了&#xff0c;右括号放了4个&#xff0c;错了&#xff0c;如果左括号放了5个&#xff0c;右括号放了4个。可以&#xff0c;继续放右括号即可。所以可以设…

每日一题:跳跃游戏II

给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1] 的最…

【Linux系统化学习】死锁 | 线程同步

目录 死锁 死锁的必要条件 避免死锁 线程同步 条件变量 同步概念和竞态条件 条件变量接口 创建和初始化条件变量 等待条件满足 唤醒等待 毁条件变量 为什么 pthread_cond_wait 需要互斥量? 条件变量使用规范 等待条件代码 给条件发送信号代码 死锁 死锁是指在一…

深度探讨容器化技术在网络安全中的应用与挑战

随着容器化技术的快速发展&#xff0c;尤其是Docker与Kubernetes&#xff08;K8s&#xff09;的广泛应用&#xff0c;企业IT架构正经历着从传统虚拟机向轻量级容器的深刻变革。容器化技术为提升资源利用率、加速应用部署及维护提供了强大支持&#xff0c;但同时也给网络安全带来…

用 VMare Workstation 搭建 esxi --- (一)创建 exsi 虚拟机

用 VMare Workstation 搭建 esxi 文章目录 用 VMare Workstation 搭建 esxi创建虚拟机 创建虚拟机

企业微信代开发应用登录操作

首先声明&#xff1a;企微的文档写得真烂&#xff01;&#xff01;&#xff01;有一些问题&#xff0c;官方情愿在问答区给用户一个个解答&#xff0c;也不愿意在文档写清楚&#xff0c;生怕自己工作量不饱和被优化。 概念说明 代开发应用&#xff0c;是相对于自建应用来说的。…

计算机网络和因特网

Internet: 主机/端系统&#xff08;end System / host&#xff09;&#xff1a; 硬件 操作系统 网络应用程序 通信链路&#xff1a; 光纤、网络电缆、无线电、卫星 传输效率&#xff1a;带宽&#xff08;bps&#xff09; 分组交换设备&#xff1a;转达分组 包括&#…

Centos的一些基础命令

CentOS是一个基于开源代码构建的免费Linux发行版&#xff0c;它由Red Hat Enterprise Linux (RHEL) 的源代码重新编译而成。由于 CentOS是基于RHEL构建的&#xff0c;因此它与RHEL具有非常类似的特性和功能&#xff0c;包括稳定性、安全性和可靠性。并且大部分的 Linux 命令在C…

SpringBoot学习之Redis下载安装启动【Mac版本】(三十七)

一、下载Redis 1、下载地址:Downloads - Redis 往下滑,找到Downloads区域,这里有若干版本,这里我们选择了7.0的稳定版本 2、我们下载的是redis-7.0.15.tar.gz,这是一个压缩包,我们双击解压这个压缩包,可以得到如下文件 二、安装Redis 1、我们进入redis根目录安装mak…

Orange3数据可视化(树查看器-决策树)

树视图 分类和回归树的可视化。 输入 树&#xff1a;决策树 输出 选中的数据&#xff1a;从树节点中选中的实例 数据&#xff1a;带有额外一列&#xff0c;显示每个点是否被选中 这是一个多功能的小部件&#xff0c;用于展示分类和回归树的2D可视化。用户可以选择一个节点…

jvm知识点总结(二)

Java8默认使用的垃圾收集器是什么? Java8版本的Hotspot JVM,默认情况下使用的是并行垃圾收集器&#xff08;Parallel GC&#xff09; 如果CPU使用率飙升&#xff0c;如何排查? 1.先通过top定位到消耗最高的进程id 2.执行top -h pid单独监控该进程 3.在2中输入H&#xff…