SpringBoot集成Mqtt发送消息

news2025/1/14 18:42:07

1. MQTT简介

MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。

相关概念

三种身份:

在这里插入图片描述

  • 客户端(Client):MQTT 客户端是发送接收消息的应用程序。
  • 服务器(Broker):也叫“代理”,服务器是处理消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
  • 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。

MQTT 消息

MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分

  • 主题,可以理解为消息的类型
  • 负载,可以理解为消息的内容

消息服务质量QoS(Quality of Service)

Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级

  • 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
  • 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
  • 2 消息仅传送一次,确保消息到达一次

2. SpringBoot集成Mqtt

Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho,另外一种是spring integration mqtt
第一种:使用Mqtt客户端库
依赖引入:org.eclipse.paho.client.mqttv3

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.0</version>
</dependency>

服务端配置

public class MqttSendMsgService {
    private static String clientId = "test";
    private static String username = "admin";
    private static String password = "xxxxxx";
    private static String broker = "tcp://xxxxx:1883";
    public ReturnT<String> mqttSend(String param) {
        MqttClient client;
        try {
            client = new MqttClient(broker, clientId, new MemoryPersistence());
            client.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }
                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("Message arrived: " + mqttMessage.getPayload());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("Delivery complete");
                }
            });

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());

            client.connect(connOpts);
            log.info("Connected to MQTT Broker!");


            //主题
            String topic="test/simple";
            //消息
            String content="发送测试";

            MqttMessage message = new MqttMessage();
            message.setQos(1);
            message.setRetained(false);
            message.setPayload(content.getBytes());
            //消息发送
            client.publish(topic,message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        return ReturnT.SUCCESS;
    }
}

上面这种使用起来比较简单,生产环境使用最多的还是下面这种

第二种:使用 Spring integration进行集成,这里以发送消息为例
依赖引入

<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-mqtt</artifactId>
	<version>5.5.14</version>
</dependency>

添加yaml配置

mqtt.url = tcp://xxxxx:1883
mqtt.username = admin
mqtt.password = 123456
mqtt.clientId = test
mqtt.defaultTopic = /test/send
mqtt.keepAliveInterval = 60
mqtt.automaticReconnect = true
mqtt.cleanSession = false
mqtt.connectionTimeout = 30
mqtt.maxInflight = 1024

添加对应的属性配置类

@Component
public class MqttConfigProperties {
    @Value("${mqtt.url}")
    private String url;
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password}")
    private String password;
    @Value("${mqtt.clientId}")
    private String clientId;
    @Value("${mqtt.defaultTopic}")
    private String defaultTopic;
    @Value("${mqtt.keepAliveInterval}")
    private Integer keepAliveInterval;
    @Value("${mqtt.automaticReconnect}")
    private Boolean automaticReconnect;
    @Value("${mqtt.cleanSession}")
    private Boolean cleanSession;
    @Value("${mqtt.connectionTimeout}")
    private Integer connectionTimeout;
    @Value("${mqtt.maxInflight}")
    private Integer maxInflight;
}

创建客户端配置类

@Configuration
@IntegrationComponentScan
public class MqttConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfigProperties.getUsername());
        options.setPassword(mqttConfigProperties.getPassword().toCharArray());
        options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
        options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());
        options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());
        options.setCleanSession(mqttConfigProperties.getCleanSession());
        options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());
        options.setMaxInflight(mqttConfigProperties.getMaxInflight());
        return options;
    }
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    // 推送通道
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());
        log.info("初始化mqttOutputChannel...");
        return messageHandler;
    }


}

发送网关接口

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
    /**
     * 发送消息
     *
     * @param topic
     * @param data
     */
    void send(@Header(MqttHeaders.TOPIC) String topic, String data);
}

这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了

mqttGateway.send(topic, JSONObject.toJSONString(msg));

参考:
https://mqtt.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1464036.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

程序员必备技能----删库跑路大总结

删库跑路大总结&#xff0c;各个都是大杀器&#xff0c;破坏性太大&#xff0c;轻易不要尝试。 删除linux根目录&#xff0c;用户目录&#xff0c;其实还可以增加一个删除/etc。删除&#xff08;清除&#xff09;数据库。删除redis缓存和持久化文件。删除mongodb库。git push …

大蟒蛇(Python)笔记(总结,摘要,概括)——第9章 类

目录 9.1 创建和使用类 9.1.1 创建Dog类 9.1.2 根据类创建实例 9.2 使用类和实例 9.2.1 Car类 9.2.2 给属性指定默认值 9.2.3 修改属性的值 9.3 继承 9.3.1 子类的_init_()方法 9.3.2 给子类定义属性和方法 9.3.3 重写父类中的方法 9.3.4 将实例用作属性 9.3.5 模拟实物 9.…

JavaWeb——003Axios Vue组件库(Element)

目录 一、Ajax 1、同步与异步​编辑 2、原生Ajax&#xff08;繁琐&#xff09;​编辑 2.1、写一个简易的Ajax 3、Axios&#xff08;推荐使用&#xff09;​编辑 3.1、Axios入门 3.2、Axios请求方式别名 3.3、案例&#xff1a;基于Vue及Axios完成数据的动态加载展示​编…

HarmonyOS—使用预览器查看应用/服务效果

DevEco Studio为开发者提供了UI界面预览功能&#xff0c;可以查看应用/服务的UI界面效果&#xff0c;方便开发者随时调整界面UI布局。预览器支持布局代码的实时预览&#xff0c;只需要将开发的源代码进行保存&#xff0c;就可以通过预览器实时查看应用/服务运行效果&#xff0c…

六、回归与聚类算法 - 逻辑回归与二分类

线性回归欠拟合与过拟合线性回归的改进 - 岭回归分类算法&#xff1a;逻辑回归模型保存与加载无监督学习&#xff1a;K-means算法 1、应用场景 2、原理 2.1 输入 2.2 激活函数 3、损失以及优化 3.1 损失 3.2 优化 4、逻辑回归API 5、分类的评估方法 5.1 精确率和召回率 5.2…

滚雪球学Java(68):全面了解Java中常用的集合类:LinkedHashMap的应用与实践

咦咦咦&#xff0c;各位小可爱&#xff0c;我是你们的好伙伴——bug菌&#xff0c;今天又来给大家普及Java SE相关知识点了&#xff0c;别躲起来啊&#xff0c;听我讲干货还不快点赞&#xff0c;赞多了我就有动力讲得更嗨啦&#xff01;所以呀&#xff0c;养成先点赞后阅读的好…

操作系统虚拟内存(下)

操作系统虚拟内存&#xff08;上&#xff09;-CSDN博客 TLB 多级页表虽然解决了空间上的问题&#xff0c;但是虚拟地址到物理地址的转换就多了几道转换的工序&#xff0c;这显然就降低了这俩地址转换的速度&#xff0c;也就是带来了时间上的开销。 程序是有局部性的&#xff…

使用备份工具xtrabackup完成数据库的备份与恢复

安装备份工具xtrabackup 简介 它是开源免费的支持MySQL 数据库热备份的软件&#xff0c;它能对InnoDB和XtraDB存储引擎的数据库非阻塞地备份。它不暂停服务创建Innodb热备份&#xff1b; 为mysql做增量备份&#xff1b;在mysql服务器之间做在线表迁移&#xff1b;使创建replica…

跟着pink老师前端入门教程(JavaScript)-day05

六、语句 &#xff08;一&#xff09;表达式和语句 1、表达式 表达式是可以被求值的代码&#xff0c;JavaScript 引擎会将其计算出一个结果。 2、语句 语句是一段可以执行的代码。 比如&#xff1a; prompt() 可以弹出一个输入框&#xff0c;还有 if语句 for 循环语句等…

华为OD机试真题-用连续自然数之和来表达整数-2023年OD统一考试(C卷)---python代码免费

题目&#xff1a; 代码 """ 题目分析&#xff1a; 一个整数 连续的自然数之和表示(非负整数&#xff09;输入&#xff1a; 一个整数T[1,1000] 输出&#xff1a; 输出多个表达式&#xff0c;自然数个数最少优先输出 最后一行&#xff0c; 输出“Result : 个数…

【监督学习之线性回归】

曾梦想执剑走天涯&#xff0c;我是程序猿【AK】 目录 简述概要知识图谱 简述概要 了解什么是线性回归 知识图谱 监督学习中的线性回归是一种预测模型&#xff0c;它试图通过拟合一个线性方程来建立输入变量&#xff08;特征&#xff09;和输出变量&#xff08;目标值&#x…

JAVA实现数据导出到excel文件

开始 1. 需要引入依赖 <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.2</version></dependency> 2. 核心代码 // 创建一个工作簿&#xff0c;也就是Excel文件 HSSFWorkb…

bat判断vmware 是否已安装

要通过批处理脚本&#xff08;.bat&#xff09;判断VMware是否已安装&#xff0c;您可以尝试以下方法&#xff1a; 检查VMware的进程 您可以检查VMware相关的进程是否在运行。例如&#xff0c;VMware Workstation的进程名通常是vmware-workstation.exe。 echo off tasklist /…

微信小程序开发学习笔记——2.8媒体组件image的src三种引入方式

>>跟着b站up主“咸虾米_”学习微信小程序开发中&#xff0c;把学习记录存到这方便后续查找。 课程连接&#xff1a; https://www.bilibili.com/video/BV19G4y1K74d?p11 image&#xff1a;https://developers.weixin.qq.com/miniprogram/dev/component/image.html 一…

基于springboot+vue的服装生产管理系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

【Chrono Engine学习总结】4-vehicle-4.3-两个vehicle碰撞测试

由于Chrono的官方教程在一些细节方面解释的并不清楚&#xff0c;自己做了一些尝试&#xff0c;做学习总结。 今天突发奇想&#xff0c;想试一下&#xff0c;是否可以实现两个vehicle的碰撞&#xff1f; 1、两辆vehicle的仿真 官方提供了demo_VEH_TwoCars这个demo&#xff0c…

「C#」WPF学习笔记-基础类及继承关系

1、DependencyObject DependencyObject是WPF中依赖属性系统的核心&#xff0c;它为WPF的数据绑定、动画和属性共享等功能提供了支持&#xff0c;是一个非常重要的基类。 其主要特点和职责包括&#xff1a; 依赖属性系统&#xff1a;DependencyObject 是所有支持依赖属性的类…

js设计模式:观察者模式

作用: 和发布订阅模式基本类似。 当某一对象状态发生变化时,所有的观察者都会收到通知。 vue响应式原理就是很经典的案例,数据发生变化,通知各个依赖。 示例: class TaobaoShop{constructor(){this.list []}addSub(name,data){this.list.push({name,data})}pubUser(name,d…

Web3 基金会推出去中心化之声计划:投入高额 DOT 和 KSM ,助力去中心化治理

作者&#xff1a;Web3 Foundation Team 编译&#xff1a;OneBlock 原文&#xff1a;https://medium.com/web3foundation/decentralized-voices-program-93623c27ae43 Web3 基金会为 Polkadot 和 Kusama 创建了去中心化之声计划&#xff08;Decentralized Voices Program&…

【Java多线程】分析线程加锁导致的死锁问题以及解决方案

目录 1、线程加锁 2、死锁问题的三种经典场景 2.1、一个线程一把锁 2.2、两个线程两把锁 2.3、N个线程M把锁&#xff08;哲学家就餐问题&#xff09; 3、解决死锁问题 1、线程加锁 其中 locker 可以是任意对象&#xff0c;进入 synchronized 修饰的代码块, 相当于加锁&…