MQTT的理解和使用

news2024/10/5 22:22:15

MQTT是一种基于发布/订阅模式的轻量协议,该协议基于TCP/IP协议上,由IBM在1999年发布。
在这里插入图片描述

流程理解:订阅者在订阅时会选择主题(Topic)和服务质量(QoS),然后发布者发布消息,代理就会把不同的消息根据Topic推送给相关订阅者。
Topic:每个人的喜好,以订报纸为例,就是军事、财经等主题。
QoS:传输质量(消息的发布者和订阅者约定的),QoS0(发布完就不管了,最多一次)、QoS1(发送之后根据规范,是否启动重传,所以至少一次)、QoS2(确保只有一次)

应用场景:这个协议主要还是在物联网应用比较多,因为开销小对网络要求不高,我这次的使用场景就是和安卓系统的之间的通讯,回传定位信息。

1.MQTT依赖

这里是springboot整合mqtt,所以要在pom.xml里增加

<!--Mqtt依赖-->
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-mqtt</artifactId>
</dependency>

2.MQTT客户端

setCleanSession表示会话生命周期,默认为True。为 true 时表示创建一个新的会话,在客户端断开连接时,会话将自动销毁。为 false 时表示创建一个持久会话,在客户端断开连接后会话仍然保持(前提是ClientID没变),直到会话超时注销。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

/**
 * 类描述:Mqtt连接工具类
 *
 * @ClassName MQTTConnect
 * @Author yeapt
 * @Date 2023-07-20 12:22
 */

@Component
public class MQTTConnect {
    //mqtt服务器的地址和端口号
    private String HOST = "tcp://192.168.1.113:123";
    private final String clientId = "DC" + (int) (Math.random() * 100000000);
    private MqttClient mqttClient;

    /**
     * 客户端connect连接mqtt服务器
     *
     * @param userName     用户名
     * @param passWord     密码
     * @param mqttCallback 回调函数
     **/
    public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
        MqttConnectOptions options = mqttConnectOptions(userName, passWord);
        if (mqttCallback == null) {
            mqttClient.setCallback(new Callback());
        } else {
            mqttClient.setCallback(mqttCallback);
        }
        mqttClient.connect(options);
    }

    /**
     * MQTT连接参数设置
     */
    private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
        mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        //连接超时时间,默认:30
        options.setConnectionTimeout(10);
        //重连配置,默认:false
        options.setAutomaticReconnect(true);
        //会话生命周期,默认:true
        options.setCleanSession(true);
        //默认:60
        options.setKeepAliveInterval(60);
        return options;
    }

    /**
     * 关闭MQTT连接
     */
    public void close() throws MqttException {
        mqttClient.disconnect();
        mqttClient.close();
    }

    /**
     * 向某个主题发布消息 默认qos:1
     *
     * @param topic:发布的主题
     * @param msg:发布的消息
     */
    public void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        //Qos2:保证消息至少传输成功1次
        mqttMessage.setQos(2);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 向某个主题发布消息
     *
     * @param topic: 发布的主题
     * @param msg:   发布的消息
     * @param qos:   消息质量    Qos:0、1、2
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 订阅某一个主题 ,此方法默认的的Qos等级为:1
     *
     * @param topic 主题
     */
    public void sub(String topic) throws MqttException {
        mqttClient.subscribe(topic);
    }

    /**
     * 订阅某一个主题,可携带Qos
     *
     * @param topic 所要订阅的主题
     * @param qos   消息质量:0、1、2
     */
    public void sub(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }
}

3.MQTT回调函数

用来处理业务,获取到信息后如何处理

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 类描述:MQTT回调
 *
 * @ClassName Callback
 * @Author yeapt
 * @Date 2023-07-20 12:24
 */
@Slf4j
public class Callback implements MqttCallback {
    /**
     * MQTT 断开连接会执行此方法
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("断开了MQTT连接 :{}", throwable.getMessage());
        log.error(throwable.getMessage(), throwable);
    }

    /**
     * publish发布成功后会执行到这里
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("发布消息成功");
    }

    /**
     * subscribe订阅后得到的消息会执行到这里
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        //  TODO    此处可以将订阅得到的消息进行业务处理、数据存储
        log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
}}

4.调试和使用

开发阶段使用main方法进行测试,但是在实际使用中一般在程序启动就会开启监听。

/**
* main函数自己测试用
*/
public static void main(String[] args) throws MqttException {
  MQTTConnect mqttConnect = new MQTTConnect();
  mqttConnect.setMqttClient("admin", "password", new Callback());
  mqttConnect.sub("/abcd-trace/#",2);
  mqttConnect.pub("/abcd-trace/#", "Mr.Y" + (int) (Math.random() * 100000000));
    }
import com.stylefeng.guns.modular.xujianapi.util.mqtt.Callback;
import com.stylefeng.guns.modular.xujianapi.util.mqtt.MQTTConnect;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * 类描述:MQTT监听类
 *
 * @ClassName MQTTListener
 * @Author yeapt
 * @Date 2023-07-20 12:27
 */
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
    private final MQTTConnect server;

    @Autowired
    public MQTTListener(MQTTConnect server) {
        this.server = server;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            server.setMqttClient("admin", "password", new Callback());
            server.sub("/abcd-trace/#");
        } catch (MqttException e) {
            log.error(e.getMessage(), e);
        }
    }
}

这里只是讲述了客户端,后续如果有机会再研究发布者和代理的写法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/777552.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

matlab超前-滞后校正

1控制系统的校正 系统性能 稳定性、准确性、快速性 动态性能-超前校正 阶跃曲线、频域&#xff08;bode图&#xff09;、根轨迹&#xff08;增加零点-根轨迹左移稳定性提高&#xff09;、PID控制&#xff08;PD&#xff09; 静态性能-滞后校正 阶跃曲线、频域&#xff08…

Flink CDC MongoDB 联合实时数仓的探索实践

摘要&#xff1a;本文整理自 XTransfer 技术专家, Flink CDC Maintainer 孙家宝&#xff0c;在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分&#xff1a; MongoDB 在实时数仓的探索 MongoDB CDC Connector 的实现原理和使用实践 FLIP-262 MongoDB…

Spring MVC拦截器和跨域请求

一、拦截器简介 SpringMVC的拦截器&#xff08;Interceptor&#xff09;也是AOP思想的一种实现方式。它与Servlet的过滤器&#xff08;Filter&#xff09;功能类似&#xff0c;主要用于拦截用户的请求并做相应的处理&#xff0c;通常应用在权限验证、记录请求信息的日志、判断用…

多肽试剂1801415-23-5,Satoreotide,UNII-S58172SSTS,应用在多肽标记及修饰上

资料编辑|陕西新研博美生物科技有限公司小编MISSwu​ Satoreotide&#xff0c;UNII-S58172SSTS Product structure Product specifications 1.CAS No&#xff1a;1801415-23-5 2.Molecular formula&#xff1a;C58H72ClN15O14S2 3.Molecular weight&#xff1a;1302.9 4.Packa…

【C++详解】——C++11

目录 C简介 统一的列表初始化 {}的初始化 initializer_list容器 声明 auto decltype nullptr 范围for C简介 在2003年C标准委员会曾经提交了一份技术勘误表(简称TC1)&#xff0c;使得C03这个名字已经取代了 C98称为C11之前的最新C标准名称。 不过由于C03(TC1)主…

STM32 串口 DMA 接收任意长度数据

DMA 局限性 DMA 传输完成会产生中断告知 CPU&#xff0c;这对于固定长度的数据是没什么问题的。但是对于不定长的数据就不行了&#xff0c;DMA 一定要接收到足够多&#xff08;设定的长度&#xff09;的数据时才产生完成中断&#xff0c;如果接收到的数据量小于设定的长度&…

Linux的基础配置

配置篇 前置步骤&#xff1a;先租一个服务器或装个Linux再或者虚拟机都可以 1.安装gcc,g,gdb 在Linux下我们能用什么工具来编译所编写好的代码呢&#xff0c;其实Linux下这样的工具有很多&#xff0c;但我们只介绍两款常用的工具&#xff0c;它们分别是gcc和g. gcc和g的主要…

解决端口占用

解决办法&#xff1a; 1、换一个其它未被占用的端口 2、端口被占用了&#xff0c;先看下是哪个程序再用&#xff0c;停掉就OK了 下面演示结束端口被占用的程序的过程&#xff1a; 1、查看被占用的端口的进程 netstat -aon|findstr 端口号2、根据PID找到占用此端口的进程 ta…

Redis单机伪集群配置与搭建

目录 1. 复制6个redis配置文件到当前目录下 2. 启动每个节点 3. 判断集群是否可用 4. 初始化集群 5. 查看集群信息 6. 获取与插槽对应的节点 &#xff08;1&#xff09;手动重定向 &#xff08;2&#xff09;自动重定向 7. 故障恢复 需求&#xff1a;配置一个三主三从的集…

为什么ConcurrentHashMap不允许插入null值而HashMap可以?

为什么ConcurrentHashMap不允许插入null值而HashMap可以&#xff1f; 文章目录 为什么ConcurrentHashMap不允许插入null值而HashMap可以&#xff1f;HashMap源码ConcurrentHashMap源码为什么ConcurrentHashMap需要加空值校验呢&#xff1f;二义性问题测试代码代码分析测试结果结…

纯css3实现水波纹从中心向四周扩散动画

纯css3实现水波纹从中心向四周扩散动画 效果可用于pc端或移动端,引导用户点击,间接带来一定的转化率 示例效果 示例代码 <template><div class"zanbtn-wrap"><div click"handleClick(https://pay.aikelaidev.cn/paypage/?merchant35bdYxSx7dCUr…

子网划分路由网卡

1."IPv4 CIDR" "IPv4 CIDR" 是与互联网协议地址&#xff08;IP address&#xff09;和网络的子网划分有关的概念。 - "IPv4" 代表 "Internet Protocol version 4"&#xff0c;也就是第四版互联网协议&#xff0c;这是互联网上最广泛使…

IPv4 与 IPv6:网络协议的差异和转换方法

在网络的世界里&#xff0c;IPv4 和 IPv6 这两个协议就像是两个不同的王国&#xff0c;各自拥有着独特的领土和规则。虽然它们都是为了互联网的发展而存在&#xff0c;但它们之间究竟有哪些差异&#xff0c;以及如何在这两个王国之间穿梭&#xff0c;仍然是很多网络小白们的困惑…

Spring使用注解存储Bean对象

文章目录 一. 配置扫描路径二. 使用注解储存Bean对象1. 使用五大类注解储存Bean2. 为什么要有五大类注解&#xff1f;3.4有关获取Bean参数的命名规则 三. 使用方法注解储存Bean对象1. 方法注解储存对象的用法2. Bean的重命名 在前一篇博客中&#xff08; Spring项目创建与Bean…

Excel中Vlookup

VLOOKUP($A:$A,Sheet3!$A:$D,COLUMN(Sheet3!B1),FALSE) ps: 1.按F4&#xff0c;锁定第一个和第二个参数 2.第二个参数&#xff0c;要选择全部范围(包括被查找列&#xff0c;以及查找内容) 2.第三个参数用column&#xff08;&#xff09;函数&#xff0c;第三列不要锁定 3.…

【Linux】自动化构建工具-make/Makefile详解

前言 大家好吖&#xff0c;欢迎来到 YY 滴 Linux系列 &#xff0c;热烈欢迎&#xff01;本章主要内容面向接触过Linux的老铁&#xff0c;主要内容含 欢迎订阅 YY 滴Linux专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; 订阅专栏阅读&#xff1a;YY的《…

Alpha-GO打败⼈类的秘籍- 强化学习(Reinforcement Learning)

为了深⼊理解强化学习&#xff08;Reinforcement Learning&#xff0c;简称RL&#xff09;这⼀核⼼概念&#xff0c;我们从⼀个⽇常游戏的例⼦出发。在“贪吃蛇”这个经典游戏中&#xff0c;玩家需要掌控⼀条蛇&#xff0c;引导它吞吃屏幕上出现的各种果实。每次成功捕获果实&a…

CSS层叠

声明冲突 同一个样式&#xff0c;多次应用到同一个元素。 此时可以看到我们的a元素的样式和浏览器的默认样式发生了冲突 层叠 解决声明冲突的过程&#xff0c;浏览器会自动处理&#xff08;权重计算&#xff09;&#xff0c;权重大的获胜 1.比较重要性 重要性从高到低 1&…

Docker 教程

Docker 是一个开源的应用容器引擎&#xff0c;基于 Go 语言并遵从 Apache2.0 协议开源。 Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xff0c;然后发布到任何流行的 Linux 机器上&#xff0c;也可以实现虚拟化。 容器是完全使用沙箱机制&a…

省电液晶驱动IC,VK2C22G,COG片高抗干扰抗噪系列LCD段码驱动芯片,I2C通信接口

型号:VK2C22G DICE(邦定COB)/COG&#xff08;绑定玻璃用&#xff09; VK2C22G概述&#xff1a; VK2C22G是一个点阵式存储映射的LCD驱动器&#xff0c;可支持最大176点&#xff08;44SEGx4COM&#xff09;的LCD屏。单片机可通过I2C接口配置显示参数和读写显示数据&#…