Java实现MQTT通信(发布订阅消息)

news2024/9/20 5:38:15

文章目录

  • 前言
  • 一、相关pom依赖
  • 二、相关代码
    • 1.MQTT工具类
    • 2.MQTT回调函数
    • 3.订阅消息
    • 4.发布消息
  • 三、安装mosquitto
    • 1.mosquitto简介
    • 2.下载
  • 四、安装MQTT.fx
    • 1.MQTT.fx简介
    • 2.下载
    • 3.使用
  • 五、java订阅消息
  • 六、java发布消息


前言

MQTT是一种轻量级的物联网通信协议,基于客户端-服务器的消息发布/订阅传输协议,支持QoS级别,适用于低带宽、高延迟的网络环境。它具有精简的协议设计,开放的消息协议,以及广泛应用于物联网(IOT)、M2M通信、消息推送和智能设备等领域。MQTT协议涉及发布者、订阅者和消息代理(Broker)的角色,以及连接、订阅、发布消息的过程,并包含会话保持和心跳机制,确保消息的可靠传输。

MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。


一、相关pom依赖

        <!-- MQTT -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

二、相关代码

1.MQTT工具类

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
 
/**
 * MQTT工具类
 */
@Slf4j
@Component
public class MQTTConnect {

    private String HOST = "tcp://127.0.0.1:1883"; //mqtt服务器的地址和端口号
    private final String clientId = "Test";
    private final String username = "test";
    private final String password = "123456";
    private MqttClient mqttClient;

    /**
     * 测试订阅消息
     */
    public static void main(String[] args) throws Exception {
        MQTTConnect mqttConnect = new MQTTConnect();
        mqttConnect.start();
        //订阅消息
        mqttConnect.sub("/topic/#",0);
    }

    public void start() throws MqttException {
        // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
        mqttClient = new MqttClient(HOST, clientId + System.currentTimeMillis(), new MemoryPersistence());
        // MQTT的连接设置
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);///默认:30
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(false);//默认:true
        // 设置断开后重新连接(设置为true时将启用自动重新连接)
        options.setAutomaticReconnect(true);//默认:false
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);//默认:60
        // 设置回调
        mqttClient.setCallback(new Callback());
        mqttClient.connect(options);
    }

    /**
     * 自定义mqtt连接
     * @param host
     * @param clientId
     * @param userName
     * @param passWord
     * @param connectionTimeout
     * @param cleanSession
     * @param automaticReconnect
     * @param keepAliveInterval
     * @param mqttCallback
     * @throws MqttException
     */
    public void start(String host,String clientId, String userName, String passWord,
                      int connectionTimeout, boolean cleanSession,boolean automaticReconnect,
                      int keepAliveInterval,MqttCallback mqttCallback) throws MqttException {
        // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
        mqttClient = new MqttClient(host, clientId + System.currentTimeMillis(), new MemoryPersistence());
        // MQTT的连接设置
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(connectionTimeout);///默认:30
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(cleanSession);//默认:true
        // 设置断开后重新连接(设置为true时将启用自动重新连接)
        options.setAutomaticReconnect(automaticReconnect);//默认:false
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(keepAliveInterval);//默认:60
        // 设置回调
        mqttClient.setCallback(mqttCallback);
        mqttClient.connect(options);
    }
 
    /**
     * 关闭MQTT连接
     */
    public void close() throws MqttException {
        mqttClient.disconnect();
        mqttClient.close();
    }
 
    /**
     * 向某个主题发布消息 默认qos:1
     *
     * @param topic:发布的主题
     * @param msg:发布的消息
     */
    public void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        //mqttMessage.setQos(2);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }
 
    /**
     * 向某个主题发布消息
     *
     * @param topic: 发布的主题
     * @param msg:   发布的消息
     * @param qos:   消息质量    Qos:0、1、2
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }
 
    /**
     * 订阅某一个主题 ,此方法默认的的Qos等级为:1
     *
     * @param topic 主题
     */
    public void sub(String topic) throws MqttException {
        mqttClient.subscribe(topic);
    }
 
    /**
     * 订阅某一个主题,可携带Qos
     *
     * @param topic 所要订阅的主题
     * @param qos   消息质量:0、1、2
     */
    public void sub(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }
}

2.MQTT回调函数

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
 
/**
 * 常规MQTT回调函数
 * MqttCallback 接口定义了用于处理 MQTT 客户端异步事件的方法。
 * 当使用 Eclipse Paho MQTT 客户端库时,你可以实现这个接口来接收连接状态的变化和消息传递的通知。
 */
@Slf4j
public class Callback implements MqttCallback {

    /**
     * MQTT 断开连接会执行此方法
     *
     * 方法说明:当客户端与 MQTT 服务器之间的连接丢失时,此方法被调用。
     * @param throwable 表示导致连接丢失的原因,通常为一个 Throwable 对象
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("断开了MQTT连接 :{}", throwable.getMessage());
        log.error(throwable.getMessage(), throwable);
    }

    /**
     * publish发布成功后会执行到这里
     *
     * 方法说明:当一个消息的交付完成并且所有必要的确认都已收到时,此方法被调用。
     * 注意事项:
     * 对于 QoS 0 消息,在消息被网络层接收后会调用此方法。
     * 对于 QoS 1 消息,在接收到 PUBACK 包后调用此方法。
     * 对于 QoS 2 消息,在接收到 PUBCOMP 包后调用此方法。
     * @param iMqttDeliveryToken the delivery token associated with the message.
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("发布消息成功");
    }
 

    /**
     * subscribe订阅后得到的消息会执行到这里
     *
     * 方法说明:当从服务器接收到一条新消息时,此方法被调用
     * 注意事项:
     * 在此方法中抛出任何异常将会导致客户端关闭,并且未确认的消息可能会被重新发送。
     * 如果在此方法执行期间有其他消息到达,它们将被缓存直到此方法返回。
     * @param topic 消息发布的主题名称
     * @param message 实际的消息内容,类型为 MqttMessage
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        //  TODO    此处可以将订阅得到的消息进行业务处理、数据存储
        log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
    }
}

3.订阅消息

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
 
/**
 * 项目启动 监听主题
 */
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
 
    private final MQTTConnect server;
 
    @Autowired
    public MQTTListener(MQTTConnect server) {
        this.server = server;
    }
 
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            server.start();
            server.sub("/topic/#");
            log.info("-----------消息订阅成功-----------");
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            log.error("-----------消息订阅失败-----------");
        }
    }
}

4.发布消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MqttTestController {

    @Autowired
    private MQTTConnect mqttConnect;

    @GetMapping("mqttTest01")
    public void test(@RequestParam("msg") String msg,
                     @RequestParam("topic") String topic,@RequestParam("qos") int qos) throws Exception {
        mqttConnect.pub(topic, msg,qos);
    }
}

三、安装mosquitto

1.mosquitto简介

Mosquitto是用C语言实现MQTT协议的Broker。是一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化。

2.下载

官网:https://mosquitto.org/download/

百度网盘:https://pan.baidu.com/s/1nOECYW3hrigcrhbQlcZU_Q?pwd=1234

本文中使用mosquitto当服务器来处理客户端发布或订阅的消息。

下载完安装一直点击next即可,需要注意的是安装完成后打开服务查看mosquitto是否启动(可能会出现端口冲突导致启动失败)
在这里插入图片描述

四、安装MQTT.fx

1.MQTT.fx简介

MQTT.fx是一个基于Eclipse Paho用Java编写的MQTT客户端软件。支持通过Topic订阅和发布消息,用来前期模拟设备和物理云平台等调试。

2.下载

官网暂时无法打开
百度云盘:https://pan.baidu.com/s/1txsSLKHi5IqUb1HooOHT-g?pwd=1234

3.使用

下载完安装一直点击next即可,安装成功后先编辑连接信息
在这里插入图片描述
编辑连接
在这里插入图片描述
在这里插入图片描述
主界面
在这里插入图片描述

五、java订阅消息

启动java程序

	/**
     * 测试订阅消息
     */
    public static void main(String[] args) throws Exception {
        MQTTConnect mqttConnect = new MQTTConnect();
        mqttConnect.start();
        //订阅消息
        mqttConnect.sub("/topic/#",0);
    }

点击发送消息
在这里插入图片描述
java输出台

19:55:13.276 [MQTT Call: Test] INFO com.xxx.mqtt.Callback - 收到来自 /topic/qf 的消息:hello my name is qf

也可以点击脚步进行编辑要持续发送的消息。
在这里插入图片描述

var Thread = Java.type("java.lang.Thread");

function execute(action) {
    out("Test Script: " + action.getName());
    for (var i = 0; i < 100; i++) {
        switchON();
        Thread.sleep(500);
        switchOFF();
        Thread.sleep(500);
    }
    action.setExitCode(0);
    action.setResultText("done.");
    out("Test Script: Done");
    return action;
}

function switchON() {
    out("fountain ON");
    mqttManager.publish("/topic/qf", "message01");
}

function switchOFF() {
    out("fountain OFF");
    mqttManager.publish("/topic/qf", "message02");
}

function out(message){
     output.print(message);
}

在这里插入图片描述

本文只简单介绍使用,如果需要了解更多MQTT.fx相关内容,可以查看以下的文章:
https://blog.csdn.net/qq_33406883/article/details/107492604

六、java发布消息

在这里插入图片描述

启动springboot项目,通过接口发布消息,
http://127.0.0.1:8080/mqttTest01?topic=/topic/qf&msg=hello qf !&qos=0

MQTT.fx即可收到消息
在这里插入图片描述


参考文章:java连接MQTT服务器(Springboot整合MQTT)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2056857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Meachines] [Easy] Blue MS17-010永恒之蓝

信息收集 IP AddressOpening Ports10.10.10.40TCP:135/tcp msrpc, 139/tcp netbios-ssn, 445/tcp microsoft-ds, 49152/tcp msrpc, 49153/tcp msrpc, 49154/tcp msrpc, 49155/tcp msrpc, 49156/tcp msrpc, 49157/tcp msrpc $ nmap -p- 10.10.10.40 --min-rate 1000 -sC -sV …

YOLOV8 POSE姿态检测对图片绘制矩形和和关节点序号

代码如下 import cv2 import torchfrom ultralytics import YOLO# Load a model # model YOLO("yolov8n-pose.yaml") # build a new model from YAML model YOLO("yolov8n-pose.pt") # load a pretrained model (recommended for training) # model …

SQL— DQL语句学习【后端 11】

DQL语句 引言 DQL&#xff08;Data Query Language&#xff0c;即数据查询语言&#xff09;是SQL&#xff08;Structured Query Language&#xff09;中用于从数据库中检索数据的重要部分。在数据库管理中&#xff0c;DQL语句是日常工作中最常用的工具之一。通过DQL&#xff0…

leetcode-538. 把二叉搜索树转换为累加树

题目描述 给出二叉 搜索 树的根节点&#xff0c;该树的节点值各不相同&#xff0c;请你将其转换为累加树&#xff08;Greater Sum Tree&#xff09;&#xff0c;使每个节点 node 的新值等于原树中大于或等于 node.val 的值之和。 提醒一下&#xff0c;二叉搜索树满足下列约束…

C++11:右值引用、移动语义和完美转发

目录 前言 1. 左值引用和右值引用 2. 引用范围 3. 左值引用的缺陷 4. 右值引用的作用 5. 右值引用的深入场景 6. 完美转发 总结 前言 C11作为一次重大的更新&#xff0c;引入了许多革命性的特性&#xff0c;其中之一便是右值引用和移动语义。本文将深入探讨其中引入的…

Unity抖音直播玩法开发流程

前言 近两年直播玩法逐渐新兴起来了&#xff0c;也出现不少质量还不错的作品&#xff0c;比如下列《红蓝对决》《三国全战》等。近期我们也做了一款直播玩法&#xff0c;就此记录下开发流程。 1&#xff0c;申请应用 进入抖音开发者平台&#xff0c;在首页入驻平台。 如果是…

Unity的粒子系统

目录 基础参数与模块 创建与编辑 功能与应用 实例与教程 结论 Unity粒子系统的最新功能和更新有哪些&#xff1f; 如何在Unity中使用Visual Effect Graph创建复杂粒子效果&#xff1f; Unity粒子系统的高级应用技巧有哪些&#xff1f; 在Unity中实现粒子系统时的性能优…

回溯算法(基于Python)

递归 递归(recursion)是一种算法策略&#xff0c;通过函数调用自身来解决问题。"递"指程序不断深入地调用自身&#xff0c;通常传入更小或更简化的参数&#xff0c;直到达到“终止条件”。"归"指触发终止条件后&#xff0c;程序从最深层的递归函数开始逐层…

代码块分类

局部代码块 public class Test {public static void main(String[] args) {{int a 10;}// 执行到此处时候,变量a已经从内存中消失了。 // System.out.println(a);} } 构造代码块 public class Test {private String name;private int age;{// 构造代码块System.out.…

【STM32 Blue Pill编程】-定时器与中断

定时器与中断 文章目录 定时器与中断1、硬件准备及接线2、GPIO配置3、代码实现STM32F103C8 配有四个定时器,分别为 TIM1、TIM2、TIM3 和 TIM4。 它们充当时钟并用于跟踪基于时间的事件。 我们将展示如何使用 HAL 库在 STM32Cube IDE 中对这些定时器进行编程。 本文将涉及如下内…

【网络】抓包工具的使用

抓包工具 文章目录 1.tcpdump抓包1.1安装 tcpdump1.2常见使用 2.wireshark抓包 1.tcpdump抓包 TCPDump 是一款强大的网络分析工具&#xff0c; 主要用于捕获和分析网络上传输的数据包。 1.1安装 tcpdump tcpdump 通常已经预装在大多数 Linux 发行版中。 如果没有安装&#…

常见java OOM异常分析排查思路分析

Java 虚拟机&#xff08;JVM&#xff09;发生 OutOfMemoryError&#xff08;OOM&#xff09;异常时&#xff0c;表示 JVM 在尝试分配内存时无法找到足够的内存资源。以下是几种常见的导致 OOM 异常的情况&#xff1a; 1. Java 堆空间不足 (Java Heap Space) 这种情况发生在 J…

【小球下落反弹】小球自由落下,每次落地后反跳回原高度的一半

一小球从100米高度自由落下&#xff0c;每次落地后反跳回原高度的一半&#xff1b;再落下&#xff0c;求它在第10次落地时&#xff0c;共经过多少米&#xff1f;第10次反弹多高&#xff1f; 使用C语言实现&#xff0c;具体代码&#xff1a; #include<stdio.h>int main(…

wo是如何克服编程学习中的挫折感的?

你是如何克服编程学习中的挫折感的&#xff1f; 编程学习之路上&#xff0c;挫折感就像一道道难以逾越的高墙&#xff0c;让许多人望而却步。然而&#xff0c;真正的编程高手都曾在这条路上跌倒过、迷茫过&#xff0c;却最终找到了突破的方法。你是如何在Bug的迷宫中找到出口的…

HarmonyOs透明弹窗(选择照片弹窗样式)

1.鸿蒙中需要实现一个如下图的弹窗 2.由上图中可以得出&#xff0c;只需要三个Text组件依次向下排列&#xff0c;弹窗背景设置透明即可&#xff0c;弹窗代码如下(仅展示弹窗样式)&#xff1a; /**** 自定义选择图片弹窗** 外部定义需要导出*/ CustomDialog //自定义弹窗 export…

Linux驱动学习之点灯(一)

学习不同的板子我们都是从点灯开始&#xff0c;linux驱动也不例外 驱动开发基础知识 何为驱动&#xff1f; 驱使硬件正常工作的代码就叫做驱动。 在一些mcu里&#xff1a; 无非就是直接操作寄存器&#xff0c;或者用库函数初始化外设&#xff0c;使外设正常工作如初始化iic&…

leetcode13. 罗马数字转整数,流程图带你遍历所有情况

leetcode13. 罗马数字转整数 示例 1: 输入: s “III” 输出: 3 示例 2: 输入: s “IV” 输出: 4 示例 3: 输入: s “IX” 输出: 9 示例 4: 输入: s “LVIII” 输出: 58 解释: L 50, V 5, III 3. 示例 5: 输入: s “MCMXCIV” 输出: 1994 解释: M 1000, CM 900, XC…

一个贼好用的开源导航网站项目——pintree!【送源码】

这两天发现了一个项目&#xff0c;它可以快速的将收藏夹里的网址导出&#xff0c;然后快速生成一个在线的网站。这个项目就是 pintree。 项目简介 Pintree 是一个开源项目&#xff0c;旨在将浏览器书签转换为导航网站。只需几个简单的步骤&#xff0c;就可以将书签转变为美观…

【CAN-IDPS】汽车网关信息安全要求以及实验方法

《汽车网关信息安全技术要求及试验方法》是中国的一项国家标准,编号为GB/T 40857-2021,于2021年10月11日发布,并从2022年5月1日起开始实施 。这项标准由全国汽车标准化技术委员会(TC114)归口,智能网联汽车分会(TC114SC34)执行,主管部门为工业和信息化部。 该标准主要…

集团数字化转型方案(二)

集团数字化转型方案通过整合物联网&#xff08;IoT&#xff09;、大数据分析、人工智能&#xff08;AI&#xff09;和云计算技术&#xff0c;构建了一个全面智能化的业务平台&#xff0c;从而实现了全集团范围内的业务流程自动化、数据驱动决策优化、以及客户体验的个性化提升。…