SpringBoot集成MQTT协议

news2024/9/22 7:38:00

简介

MQTT 可以被解释为一种低开销,低带宽占用的即时通讯协议,可以用较少的代码和带宽为远程设备连接提供实时可靠的消息服务,它适用于硬件性能低下的远程设备以及网络状况糟糕的环境下,因此 MQTT 协议在 IoT(Internet of things,物联网),小型设备应用,移动应用等方面有较广泛的应用。

优点:代码量少,开销低,带宽占用小,即时通讯协议。

MQTT原理

实现mqtt协议需要客户端和服务器端通讯完成,在通讯中,mqtt协议中有三种身份:发布者(publish),代理(broker)(服务器),订阅者(subscribe)。其中,消息的发布者和订阅者都是客户端。消息代理是服务器,消息发布者可以同时是订阅者,传输过程如下如所示。

image.png

有别于传统的客户端/服务器通讯协议,MQTT协议并不是端到端的,消息传递通过代理,包括会话(session)也不是建立在发布者和订阅者之间,而是建立在端和代理之间。代理解除了发布者和订阅者之间的耦合。

除了发布者和订阅者之间传递普通消息,代理还可以为发布者处理保留消息和遗愿消息,并可以更改服务质量(QoS)等级。

MQTT主题

1、主题层级分隔符“/”

用于分割主题的每个层级,为主题名提供一个分层结构。如主题:

china/anhui
china/anhui/hefei

2、多层通配符“#”

用于匹配主题中任意层级的通配符。如主题:china/#

china/anhui
china/anhui/hefei
china/anhui/hefei/shushan

3、单层通配符“+”

加号是只能用于单个主题层级匹配的通配符。如主题:

china/+      只能匹配 china/anhui
china/+/+/shushan     能匹配china/anhui/hefei/shushan

4、通配符“$”

通配符“$”表示匹配一个字符,只要不是放在主题的最开头,即:

$xx
/$xx
/xx$

实战应用

对接协议部分内容

image.png

SpringBoot集成Mqtt协议

本篇文章就不重点介绍MQTT相关的内容了,主要学会怎么运用到实际开发工作中。

  1. 添加依赖
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>
  1. 添加配置类
@Configuration
@ConfigurationProperties(prefix="spring.mqtt")
public class MqttProperties {
    private String url;
    private String username;
    private String password;
    private String clientId;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}
@Configuration
@ConditionalOnBean(MqttProperties.class)
public class MqttConfiguration {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandler mqttMessageHandler;

    /**
     * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
     *
     * @return factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();

        // 设置代理端的URL地址,可以是多个
        options.setServerURIs(new String[]{mqttProperties.getUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setKeepAliveInterval(120);
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * 入站通道
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * 入站
     */
    @Bean
    public MessageProducer inbound() {
        // Paho客户端消息驱动通道适配器,主要用来订阅主题
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getClientId() + "-consumer",
                mqttClientFactory(),
                MqttConstants.UP_DEVICE_STATE,
                MqttConstants.UP_DEVICE_STATUS,
                MqttConstants.UP_DEVICE_EVENT,
                MqttConstants.UP_DEVICE_SET_ACK,
                MqttConstants.UP_DEVICE_GET_ACK,
                MqttConstants.UP_DEVICE_CAPTURE_REPORT,
                MqttConstants.UP_DEVICE_CONTROL_QUERY_ACK,
                MqttConstants.UP_DEVICE_CONTROL_ACK);
        // Paho消息转换器
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        adapter.setConverter(defaultPahoMessageConverter);
        adapter.setCompletionTimeout(5000);
        // 设置QoS
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return mqttMessageHandler;
    }

    /**
     * 出站通道
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * 出站
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outbound() {
        // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
                mqttProperties.getClientId() + "-producer", mqttClientFactory());
        // 如果设置成true,即异步,发送消息时将不会阻塞。
        mqttPahoMessageHandler.setAsync(true);
        // 设置默认QoS
        mqttPahoMessageHandler.setDefaultQos(1);
        // Paho消息转换器
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        mqttPahoMessageHandler.setConverter(defaultPahoMessageConverter);
        return mqttPahoMessageHandler;
    }
}
  1. 消息处理类
@Service
public class MqttMessageHandler implements MessageHandler {

    private final Logger logger = LoggerFactory.getLogger(MqttMessageHandler.class);

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        try {
            String payload = message.getPayload().toString();
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            logger.info("接受来自mqtt的订阅信息,topic:{}", topic);
            //离线上报
            if (topic.matches(".+/offline")) {
                statusReport(payload);
            }
            //状态上报
            else if (topic.matches(".+/status")) {
                deviceInfoReport(payload);
            } 
            //事件上报
            else if (topic.matches(".+/eventReport")) {
                eventReport(payload);
            } else {
                logger.info("主题topic:{},负载payload:{}", topic, payload);
            }
        } catch (Exception e) {
            logger.error("handleMessage 接受mqtt订阅消息异常:", e);
        }
    }
    
      /**
     * 设备状态上报
     *
     * @param payload
     */
    private void statusReport(String payload) {
        OfflineReport offlineReport = JSONUtil.toBean(payload, OfflineReport.class);
        logger.info("收到设备状态信息上报:{}", offlineReport);
        //...省略
    }

 //...省略
   
}
/**
 * 消息发送
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * 定义重载方法,用于消息发送
     *
     * @param payload
     */
    void sendToMqtt(String payload);

    /**
     * 指定topic进行消息发送
     *
     * @param topic
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

源码地址:https://gitee.com/jiangwang001/springboot/tree/master/cy-cabinet-adapter

小结

本文简单介绍了一下MQTT协议的基本知识,在实际工作中,MQTT通常应用于物联网、智能家居等设备和应用程序之间的通信。在嵌入式领域,MQTT已经占据着无法替代的分量,因为大多数的嵌入式设备,都需要这样的协议进行数据交互。

2024一起加油~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1353992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

工业物联网上篇——什么是IIOT?

工业物联网背后的理念是使用工业设施中“哑巴设备”多年来产生的数据。装配线上的智能机器不仅可以更快地捕获和分析数据&#xff0c;且在交流重要信息方面也更快&#xff0c;这有助于更快、更准确地做出业务决策。 信息技术&#xff08;IT&#xff09;和运营技术&#xff08;O…

如何打开wps的备份中心查找备份文件

备份中心在我们使用WPS Office时扮演着重要的角色。经常保存文件的同时&#xff0c;我们也应该学会备份文件&#xff0c;以免意外损失。本文将向您介绍如何使用WPS备份中心来查找并恢复备份文件&#xff0c;方便您在需要时快速找到所需文件。 图片来源于网络&#xff0c;如有侵…

STM32 学习(三)OLED 调试工具

目录 一、简介 二、使用方法 2.1 接线图 2.2 配置引脚 2.3 编写代码 三、Keil 工具调试 一、简介 在进行单片机开发时&#xff0c;有很多调试方法&#xff0c;如下图&#xff1a; 其中 OLED 就是一种比较好用的调试工具&#xff1a; OLED 硬件电路如下&#xff0c…

科技智慧,产业链全覆盖:河南恩珅德农业的养殖业务优势

河南恩珅德农业以科技智慧和全产业链覆盖的优势&#xff0c;成功打造了一体化的养殖业务模式&#xff0c;为养殖者提供了全面的支持和优越的管理体验。以下是该企业养殖业务的核心优势&#xff1a; 1. 先进科技智慧 河南恩珅德农业充分利用先进的科技手段&#xff0c;引入智能…

了解并使用django-rest-framework-jwt

一 JWT认证 在用户注册或登录后&#xff0c;我们想记录用户的登录状态&#xff0c;或者为用户创建身份认证的凭证。我们不再使用Session认证机制&#xff0c;而使用Json Web Token&#xff08;本质就是token&#xff09;认证机制。 Json web token (JWT), 是为了在网络应用环…

Android开发中“真正”的仓库模式

原文地址&#xff1a;https://proandroiddev.com/the-real-repository-pattern-in-android-efba8662b754原文发表日期&#xff1a;2019.9.5作者&#xff1a;Denis Brandi翻译&#xff1a;tommwq翻译日期&#xff1a;2024.1.3 Figure 1: 仓库模式 多年来我见过很多仓库模式的实…

c# OpenCvSharp透视矫正参数调整器

透视矫正不够智能化&#xff0c;每次都要进行局部参数调整&#xff0c;不便于程序使用&#xff0c;程序流程还是那几个步骤&#xff1b; 1、读取图像、灰度化 2、高斯滤波 3、二值化 4、边缘检测 灰度化图 上个图看看经过调整透视矫正边缘检测结果我还是挺满意的 发现一个…

Nest 框架:解锁企业级 Web 应用开发的秘密武器(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

将ipad拓展为笔记本副屏

写在前面 对比过moonlight和spacedesk两种软件&#xff08;付费的更流畅&#xff0c;本人穷困暂不涉及&#xff09;&#xff0c;moonlight无线连接感觉更卡顿一些&#xff08;但是大多数人都觉得moonlight更丝滑&#xff0c;实践是检验真理的唯一标准&#xff0c;建议自己都试…

go执行静态二进制文件和执行动态库文件

目的和需求&#xff1a;部分go的核心文件不开源&#xff0c;例如验证&#xff0c;主程序核心逻辑等等 第一个想法&#xff0c;把子程序代码打包成静态文件&#xff0c;然后主程序执行 子程序 package mainimport ("fmt""github.com/gogf/gf/v2/os/gfile"…

UⅤ机用的滚珠丝杆套装

UⅤ机用的滚珠丝杆套装 上面是单丝杆UⅤ机内部。

ES(Elasticsearch)的基本使用

一、常见的NoSQL解决方案 1、redis Redis是一个基于内存的 key-value 结构数据库。Redis是一款采用key-value数据存储格式的内存级NoSQL数据库&#xff0c;重点关注数据存储格式&#xff0c;是key-value格式&#xff0c;也就是键值对的存储形式。与MySQL数据库不同&#xff0…

我的创作纪念日三年收获和感悟

机缘 我刚开始接触创作也是最近几年开始&#xff0c;当初就是希望自己的收获分享给大家&#xff0c;不仅使自己成长&#xff0c;也可以带着大家一起成长&#xff0c;独乐乐不如众乐乐&#xff0c;人都是自私的以前我都是看到好的知识文章都是自己藏起来&#xff0c;发现收获的…

Qt/C++编写视频监控系统82-自定义音柱显示

一、前言 通过音柱控件实时展示当前播放的声音产生的振幅的大小&#xff0c;得益于音频播放组件内置了音频振幅的计算&#xff0c;可以动态开启和关闭&#xff0c;开启后会对发送过来的要播放的声音数据&#xff0c;进行运算得到当前这个音频数据的振幅&#xff0c;类似于分贝…

初始数字孪生

文章目录 概念定义 架构框图 基本要求 功能要求 服务应用 参考文献 概念定义 数字孪生&#xff08;digital twin&#xff09;&#xff0c;是指具有保证物理状态和虚拟状态之间以适当速率和精度同步的数据连接的特定目标实体的数字化表达。 架构框图 数字孪生的整体架构&a…

玩转贝启科技BQ3588C开源鸿蒙系统开发板 —— DevEco Studio下载与安装

一、下载DevEco Studio IDE开发工具 1. 登录鸿蒙官网 网址为&#xff1a; ​​​​​​​华为HarmonyOS智能终端操作系统官网 | 应用设备分布式开发者生态 页面如下&#xff1a; 2. 搜索“DevEco Studio IDE” 点击右上角的“请输入关键词”&#xff0c;在其中搜索“DevEc…

Termius for Mac/Win:一款功能强大的终端模拟器、SSH 和 SFTP 客户端软件

随着远程工作和云技术的普及&#xff0c;对于高效安全的远程访问和管理服务器变得至关重要。Termius&#xff0c;一款强大且易用的终端模拟器、SSH 和 SFTP 客户端软件&#xff0c;正是满足这一需求的理想选择。 Termius 提供了一站式的解决方案&#xff0c;允许用户通过单一平…

钡铼分布式IO在玻璃制造中的实时数据采集与监控应用介绍

导读 玻璃行业多年来一直广泛使用 PLC 来帮助管理生产过程所需的精确材料比例&#xff0c;完全依赖其PLC进行数据采集与控制&#xff0c;并且大量依靠人工来操作&#xff0c;所以这些高成本推动了对成本较低的替代方案的需求。 场景描述 某玻璃厂生产的玻璃生产包括配料段、熔…

Excel 插件:ASAP Utilities Crack

ASAP Utilities是一款功能强大的 Excel 插件&#xff0c;填补了 Excel 的空白。在过去的 20 年里&#xff0c;我们的加载项已经发展成为世界上最受欢迎的 Microsoft Excel 加载项之一。 ASAP Utilities 中的功能数量&#xff08;300 多个&#xff09;可能看起来有点令人眼花缭乱…

2024.1.2 Redis 数据类型 Stream、Geospatial、HyperLogLog、Bitmaps、Bitfields 简介

目录 引言 Stream 类型 Geospatial 类型 HyperLogLog 类型 Bitmaps 类型 Bitfields 类型 引言 Redis 最关键&#xff08;应用广泛、频繁使用&#xff09;的五个数据类型 StringListHashSetZSet 下文介绍的数据类型一般适合在特定的场景中使用&#xff01; Stream 类型 St…