Spring Boot 集成MQTT代码示例

news2024/10/6 18:07:44

文章目录

  • 1. 简介
    • 使用场景
  • 2. 搭建MQTT测试环境服务
    • 1. 先创建映射目录
    • 2. 创建两个文件
      • 2.1. mosquitto.conf
    • 3. 启动 MQTT服务 Docker 容器
      • 3.1. 配置用户名和密码
        • 3.1.1. 创建密码文件
        • 3.1.2. 修改配置文件,追加密码文件
      • 3.2. 重启mosquitto 容器服务
  • 3. 编写测试程序
    • 3.1.pom依赖
    • 3.2.配置文件
    • 3.3.配置连接信息初始化client
    • MqttService
    • 使用Controller来直观的测试验证
  • 4. 常见报错

1. 简介

MQTT,全称Message Queuing Telemetry Transport,即消息队列遥测传输。它是一种基于发布/订阅模式的"轻量级"消息协议,有助于低带宽、不可靠或高延迟的网络环境中的远程传感器和控制设备消息通信。
在这里插入图片描述

让我们以一个生动的例子来揭开MQTT的神秘面纱。假设你是一名热衷于植物种植的园丁,你有一个自动灌溉系统,它可以根据植物的需要,如土壤湿度、天气预报等,自动浇水。在这个系统中,MQTT就像是你和这个自动灌溉系统之间的通信者。

  • 发布/订阅模式: 这就像你告诉自动灌溉系统:“嘿,如果土壤湿度低于某个阈值,就告诉我一声。”(即订阅),然后,当土壤湿度确实低于阈值时,系统就会通知你:“嘿,土壤湿度低了,需要浇水了。”(即发布)。

  • 轻量级: MQTT设计得非常简洁轻巧,就像是你和自动灌溉系统的对话中只包含必要的信息,例如“土壤湿度”、“浇水”,而不包含任何复杂的语法或者冗余的信息。

  • QoS(Quality of Service): MQTT定义了三种消息质量等级:QoS 0(最多一次)、QoS 1(至少一次)和QoS 2(只有一次)。这就像是你可以告诉自动灌溉系统:“对于浇水这件事,我希望你只通知我一次。”或者“对于土壤湿度的问题,我希望你一直通知我,直到我处理完为止。”

  • 保留消息和遗嘱消息: MQTT可以设置保留消息,让新订阅者立即得到最新的更新。此外,还可以设置遗嘱消息,当客户端异常断开连接时,服务器会发布这个遗嘱消息,通知其他客户端。这就像是你可以对自动灌溉系统说:“如果我突然失去联系,你就自动启动浇水程序。”

使用场景

  1. 物联网(IoT)通信:MQTT是物联网通信的重要协议之一。它提供了一种可靠、高效的方式来连接和交换传感器、设备和应用程序之间的数据。MQTT的轻量级特性使其在资源受限的设备上运行效率高,并支持发布/订阅模型,使设备能够通过订阅感兴趣的主题(Topic)来接收数据,或者通过发布数据到特定主题来向其他设备传递信息。
    在这里插入图片描述

  2. 实时数据传输:MQTT可以用于实时数据传输,例如传感器数据、监测数据、实时位置数据等。它支持低延迟、高可靠性的消息传输,并具有良好的扩展性,可以应对大规模的数据传输需求。

  3. 远程监控与控制:MQTT可以用于远程监控和控制各种设备和系统。通过将设备连接到MQTT Broker,可以实现对设备状态的实时监控,并通过MQTT消息发送指令来控制设备操作。

  4. 传感器网络:MQTT适用于传感器网络,其中大量的传感器设备需要将数据发送到中心或云平台进行处理和分析。MQTT的低网络开销、可靠的消息传输和易于实现的特性使其成为传感器网络中常用的通信协议。

  5. 聊天和即时通信:MQTT的轻量级和高效性使其成为构建聊天和即时通信应用程序的理想选择。通过MQTT,可以实现实时的消息传递和订阅机制,支持点对点通信、多用户聊天和群组通信等功能。

  6. 跨平台集成:MQTT是一种跨平台的协议,可以在各种操作系统和设备上运行,包括嵌入式设备、移动设备和服务器。这使得MQTT成为不同设备和应用程序之间进行通信和集成的理想选择。
    在这里插入图片描述

2. 搭建MQTT测试环境服务

我是用docker 搭建了一个简单 的mosquitto 服务端

1. 先创建映射目录

  $ mkdir -p /data/mosquitto/config 
  $ mkdir -p  /data/mosquitto/data 
  $ mkdir -p  /data/mosquitto/log

2. 创建两个文件

mosquitto.confpasswd

root@ip /data/mosquitto/config# ll
total 4
-rw-r--r-- 1 1883 1883 168 Sep  8 18:05 mosquitto.conf
-rw-r--r-- 1 root root   0 Sep  8 18:02 passwd

2.1. mosquitto.conf

新建 /data/mosquitto/config/mosquitto.conf
追加以下内容


persistence true
persistence_location /mosquitto/data/
allow_anonymous false
log_dest file /mosquitto/log/mosquitto.log
listener 1883

3. 启动 MQTT服务 Docker 容器

Docker中创建并运行一个名为"mosquitto"的容器,该容器包含了Eclipse Mosquitto MQTT Broker。该容器将监听主机的1883端口和9003端口,同时使用映射的配置文件、数据目录和日志目录来进行配置和持久化存储。

docker run -it --name=mosquitto --privileged -p 1883:1883 -p 9003:9001 -v /data/mosquitto/config/mosquitto.conf:/mosquitto/config/mosquitto.conf -v /data/mosquitto/data:/mosquitto/data -v /data/mosquitto/log:/mosquitto/log -d eclipse-mosquitto

--privileged: 这个选项授予容器特权访问,以便执行某些特定的操作和功能。

-p 1883:1883: 这个选项将主机的端口1883映射到容器内部的端口1883,用于MQTT协议的通信。

-p 9003:9001: 这个选项将主机的端口9003映射到容器内部的端口9001,用于WebSocket通信。

-v /data/mosquitto/config/mosquitto.conf:/mosquitto/config/mosquitto.conf: 这个选项将主机上的Mosquitto配置文件mosquitto.conf映射到容器内部的相应路径,以便在容器中加载自定义配置。

-v /data/mosquitto/data:/mosquitto/data: 这个选项将主机上的Mosquitto数据目录映射到容器内部的相应路径,用于持久化存储Mosquitto的数据。

-v /data/mosquitto/log:/mosquitto/log: 这个选项将主机上的Mosquitto日志目录映射到容器内部的相应路径,用于存储Mosquitto的日志文件。

在这里插入图片描述

3.1. 配置用户名和密码

3.1.1. 创建密码文件

# 进入容器执行
$ docker exec -it b495e3d42429 sh
# 容器内执行
$ mosquitto_passwd -c /data/mosquitto/passwd root
输入两次密码后

3.1.2. 修改配置文件,追加密码文件

mosquitto.conf


persistence true
persistence_location /mosquitto/data/
allow_anonymous false
log_dest file /mosquitto/log/mosquitto.log
listener 1883
password_file /mosquitto/passwd

3.2. 重启mosquitto 容器服务

docker restart mosquitto 

程序即可正常连接,这个版本如果没有配置密码是不能远程访问的额

3. 编写测试程序

3.1.pom依赖

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.eclipse.paho</groupId>
			<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
			<version>1.2.5</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>

3.2.配置文件

       # 服务器的端口号
        server.port=8080

        # MQTT 代理服务器的连接URI
        mqtt.broker=tcp://127.0.0.1:1883

        # MQTT 客户端ID,应保证在同一MQTT服务器上唯一
        mqtt.clientId=1

        # MQTT 默认的订阅话题
        mqtt.topic=testTopic

        # MQTT 代理服务器的用户名
        mqtt.username=root

        # MQTT 代理服务器的密码
        mqtt.password=123456

        # MQTT 链接是否启用清理会话,true 表示客户端与服务器断开连接后,会话信息将被清除,false 表示信息将会保留,以便客户端重新连接
        mqtt.cleanSession=true

        # MQTT 设置的心跳时间,即每隔多久时间(秒)客户端需要向服务端发送一个PINGREQ报文
        mqtt.keepAlive=60

        # MQTT 连接超时设置,指的是客户端连接到服务器时,等待CONNACK报文回应的最大时间间隔。
        mqtt.timeout=30

3.3.配置连接信息初始化client


package com.icepip.project.mqtt.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
* MQTT配置
*
* @author 冰点
* @version 1.0.0
* @date 2023/9/8 16:14
  */

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker}")
    private String mqttBroker;

    @Value("${mqtt.clientId}")
    private String mqttClientId;
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password}")
    private String password;
    @Value("${mqtt.cleanSession}")
    private Boolean cleanSession;

    @Value("${mqtt.keepAlive}")
    private Integer keepAlive;

    @Value("${mqtt.timeout}")
    private Integer timeout;
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{mqttBroker});
        options.setCleanSession(true);
        // 其他设置,如用户名和密码等
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(cleanSession);
        options.setKeepAliveInterval(keepAlive);
        options.setConnectionTimeout(timeout);
        return options;
    }

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient mqttClient = new MqttClient(mqttBroker, MqttClient.generateClientId());
        mqttClient.connect(mqttConnectOptions());
        return mqttClient;
    }
}

MqttService

package com.icepip.project.mqtt.service;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * @author 冰点(icepip.blog.csdn.com)
 * @version 1.0.0
 * @date 2023/9/8 16:15
 */
@Service
public class MqttService {

    @Autowired
    private MqttClient mqttClient;


    public void publishMessage(String topic, String message) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttClient.publish(topic, mqttMessage);
    }

    public void subscribeToTopic(String topic, IMqttMessageListener listener) throws MqttException {
        mqttClient.subscribe(topic, listener);
    }
}

使用Controller来直观的测试验证

package com.icepip.project.mqtt.controller;

import com.icepip.project.mqtt.service.MqttService;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

/**
 * 使用Controller来直观的测试验证
 * @author 冰点(icepip.blog.csdn.com)
 * @version 1.0.0
 * @date 2023/9/8 16:15
 */

@RestController
@RequestMapping("/mqtt")
public class MqttController {

    private final MqttService mqttService;
    @Value("${mqtt.topic:testTopic}")
    public String DEFAULT_TOPIC;

    @Autowired
    public MqttController(MqttService mqttService) {
        this.mqttService = mqttService;
    }

    /**
     * 模拟发布消息
     *
     * @param topic
     * @param message
     * @throws MqttException
     */
    @PostMapping("/publish")
    public void publishMessage(@RequestParam String topic, @RequestParam String message) throws MqttException {
        if (StringUtils.isEmpty(topic)) {
            topic=this.DEFAULT_TOPIC;
        }
        mqttService.publishMessage(topic, message);
    }

    /**
     * 模拟订阅消息
     *
     * @param topic
     * @throws MqttException
     */
    @PostMapping("/subscribe")
    public void subscribeToTopic(@RequestParam String topic) throws MqttException {
        mqttService.subscribeToTopic(topic, (topic1, message) ->
                System.out.println("Received topic:"+topic1+",message: " + new String(message.getPayload())));
    }

    /**
     * 监听默认消息
     * @throws MqttException
     */
    @PostConstruct
    public void subscribeToTopic() throws MqttException {
        mqttService.subscribeToTopic(DEFAULT_TOPIC, (topic1, message) ->
                System.out.println("Received topic:"+topic1+",message: " + new String(message.getPayload())));
    }
}

测试
在这里插入图片描述

输出结果
在这里插入图片描述

4. 常见报错

1.org.eclipse.paho.client.mqttv3.MqttSecurityException: 无权连接 此错误需要配置账号和密码

2.connect timeout 是端口没有开启端口有两个,一个是数据端口 一个websocket 端口。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/989180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023--9-8 高斯消元解线性方程组

题目链接&#xff1a;高斯消元解线性方程组 #include <iostream> #include <algorithm> #include <cmath>using namespace std;const int N 110; const double eps 1e-8;int n; double a[N][N];int gauss() {int c, r;for(c 0, r 0; c < n; c){// 找到…

(其他) 剑指 Offer 64. 求1+2+…+n ——【Leetcode每日一题】

❓ 剑指 Offer 64. 求12…n 难度&#xff1a;中等 求 12...n &#xff0c;要求不能使用乘除法、for、while、if、else、switch、case 等关键字及 条件判断语句&#xff08;A?B:C&#xff09;。 示例 1&#xff1a; 输入: n 3 输出: 6 示例 2&#xff1a; 输入: n 9 输出:…

LeetCode(力扣)491. 递增子序列Python

LeetCode491. 递增子序列 题目链接代码 题目链接 https://leetcode.cn/problems/non-decreasing-subsequences/ 代码 class Solution:def backtracking(self, nums, index, result, path):if len(path) > 1:result.append(path[:])uset set()for i in range(index, len…

小程序实现摄像头拍照 + 水印绘制

文章标题 01 功能说明02 使用方式 & 效果图2.1 基础用法2.2 拍照 底部定点水印 预览2.3 拍照 整体背景水印 预览 03 全部代码3.1 页面布局 html3.2 业务核心 js3.3 基础样式 css 01 功能说明 需求&#xff1a;小程序端需要调用前置摄像头进行拍照&#xff0c;并且将拍…

当面试官问你离职原因的时候怎么回答比较好?

所有的前提都是建立在有一定的物质基础&#xff0c;当你的一日三餐都成了问题&#xff0c;都需要家庭支持的时候我希望你可以找一份工作&#xff0c;靠自己的本事养活自己从来不丢人&#xff0c;我觉得死要面子活受罪才是真的让你看不起。 所有的建议都是建立在我们是普通打工人…

如何用Jmeter编写脚本压测?

随着商业业务不断扩张&#xff0c;调用adsearch服务频率越来越高&#xff0c;所以这次想做个压测&#xff0c;了解目前多少并发量可以到达adsearch服务的界值。 这次选用的jmeter压测工具&#xff0c;压测思路如图&#xff1a; 一、日志入参 日志选取的adsearch 的 getads部分…

电工-什么是电流

什么是电流&#xff1f;电流计算公式和单位换算及电流方向讲解 前面了解到电路的基本组成是包括&#xff1a;电能、负载、导线构成的&#xff0c;而这电路就是电流流通的路径&#xff0c;那么什么是电流呢&#xff1f;下面就讲讲电流形成的基本概念以及电流计算公式、单位和方…

2023-9-8 求组合数(二)

题目链接&#xff1a;求组合数 II #include <iostream> #include <algorithm>using namespace std;typedef long long LL; const int mod 1e9 7; const int N 100010;// 阶乘&#xff0c;阶乘的逆 int fact[N], infact[N];LL qmi(int a, int k, int p) {int res…

HTTPS 之fiddler抓包--jmeter请求

一、浅谈HTTPS 我们都知道HTTP并非是安全传输&#xff0c;在HTTPS基础上使用SSL协议进行加密构成的HTTPS协议是相对安全的。目前越来越多的企业选择使用HTTPS协议与用户进行通信&#xff0c;如百度、谷歌等。HTTPS在传输数据之前需要客户端&#xff08;浏览器&#xff09;与服…

selenium的Chrome116版驱动下载

这里写自定义目录标题 下载地址https://googlechromelabs.github.io/chrome-for-testing/#stable 选择chromedriver 对应的平台和版本 国内下载地址 https://download.csdn.net/download/dongtest/88314387

分享一个Python Django影片数据爬取与数据分析系统源码

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

数据结构——带头双向循环链表

数据结构——带头双向循环链表 一、带头双向循环链表的定义二、带头双向循环链表的实现2.1初始化创建带头双向循环链表的节点2.2申请新节点2.3节点的初始化2.4带头双向循环链表的尾插2.5带头双向循环链表的头插2.6判空函数2.7带头双向循环链表的打印函数2.8带头双向循环链表的尾…

计算机竞赛 基于深度学习的目标检测算法

文章目录 1 简介2 目标检测概念3 目标分类、定位、检测示例4 传统目标检测5 两类目标检测算法5.1 相关研究5.1.1 选择性搜索5.1.2 OverFeat 5.2 基于区域提名的方法5.2.1 R-CNN5.2.2 SPP-net5.2.3 Fast R-CNN 5.3 端到端的方法YOLOSSD 6 人体检测结果7 最后 1 简介 &#x1f5…

OpenCV 04(通道分离与合并 | 绘制图形)

一、通道的分离与合并 - split(mat)分割图像的通道 - merge((ch1,ch2, ch3)) 融合多个通道 import cv2 import numpy as npimg np.zeros((480, 640, 3), np.uint8)b,g,r cv2.split(img)b[10:100, 10:100] 255 g[10:100, 10:100] 255img2 cv2.merge((b, g, r))cv2.imshow…

《TCP/IP网络编程》阅读笔记--并发多进程服务端的使用

1--并发服务器端 并发服务器端主要有以下三类&#xff1a; ① 多进程服务器&#xff1a;通过创建多个进程提供服务&#xff1b; ② 多路复用服务器&#xff1a;通过捆绑并统一管理I/O对象提供服务&#xff1b; ③ 多线程服务器&#xff1a;通过生成与客户端等量的线程提供服务&…

C/C++ ——内存管理

前言 为什么要研究内存管理&#xff1f; (1)程序员写的程序可以分为动态和静态两种状态。静态&#xff1a;就是程序被存放在ROM中&#xff0c;也就是磁盘、固态硬盘、eMMC等存储介质&#xff1b;动态&#xff1a;程序被执行&#xff0c;此时程序在RAM内存中运行&#xff1b; (…

图床项目数据库表设计

一、表设计 share_picture_list 和 share_file_list 类似&#xff0c;只是 share_picture_list 只存储共享图片相关的信息&#xff0c;及分享给未注册用户看的。share_file_list 是存储共享文件&#xff08;包括图片文件&#xff09;相关的信息&#xff0c;分享给已注册用户的。…

【数据结构】 七大排序详解(贰)——冒泡排序、快速排序、归并排序

文章目录 ⚽冒泡排序⚾算法步骤&#x1f3a8;算法优化&#x1f94e;代码实现&#xff1a;&#x1f3c0;冒泡排序的特性总结 &#x1f9ed;快速排序⚽算法思路&#x1f4cc;思路一&#xff08;Hoare版&#xff09;&#x1f4cc;思路二&#xff08;挖坑法&#xff09;&#x1f4c…

PCL入门(四):kdtree简单介绍和使用

目录 1. kd树的意义2. kd树的使用 参考博客《欧式聚类&#xff08;KD-Tree&#xff09;详解&#xff0c;保姆级教程》和《(三分钟)学会kd-tree 激光SLAM点云搜索常见》 1. kd树的意义 kd树是什么&#xff1f; kd树是一种空间划分的数据结构&#xff0c;对于多个维度的数据&a…

小米汽车,能否在新能源汽车江湖站稳脚跟?

最近&#xff0c;圈内都在传小米汽车亦庄工厂已试生产近一个月&#xff0c;每周可产50辆样车&#xff0c;正在为首款新能源汽车量产做最后的准备。 此前的业绩交流会上&#xff0c;小米集团总裁卢伟冰透露&#xff0c;小米汽车结束了夏测且进展非常顺利&#xff0c;2024年上半…