mqtt:测试eclipse paho qos=1的数据重发的功能

news2024/11/19 23:29:12

# 测试程序

【pom.xml】

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.3.12.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
        <version>1.2.5</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.49</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.11</version>
    </dependency>
</dependencies>

【MyDemo3MqttV5Server1.java】模拟一个正常的消息接收服务

package com.chz.myMqttV5.demo3;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

import java.util.concurrent.ThreadLocalRandom;

@Slf4j
public class MyDemo3MqttV5Server1
{
    public static void main(String[] args) throws InterruptedException
    {
        String broker = "tcp://192.168.44.228:1883";
        String clientId = "MyDemo3MqttV5Server1";
        int subQos = 1;
        try {
            MqttClient client = new MqttClient(broker, clientId);
            MqttConnectionOptions options = new MqttConnectionOptions();
            options.setAutomaticReconnect(true);
            client.setCallback(new MyDemo3Server1Callback(clientId));
            client.connect(options);
            client.subscribe("$share/demo3/device/#", subQos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Slf4j
    public static class MyDemo3Server1Callback implements MqttCallback
    {
        private String clientId;

        public MyDemo3Server1Callback(String clientId)
        {
            this.clientId = clientId;
        }

        public void connectComplete(boolean reconnect, String serverURI) {
            log.info("{}::connectComplete, reconnect={}, serverURI={}", clientId, reconnect, serverURI);
        }

        public void disconnected(MqttDisconnectResponse disconnectResponse) {
            log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());
        }

        public void deliveryComplete(IMqttToken token) {
            log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());
        }

        public void messageArrived(String topic, MqttMessage message) throws Exception {
            log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
        }

        public void mqttErrorOccurred(MqttException exception) {
            log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());
        }

        public void authPacketArrived(int reasonCode, MqttProperties properties) {
            log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);
        }
    }
}

【MyDemo3MqttV5Server2.java】模拟一个工作不正常的消息接收不服务

package com.chz.myMqttV5.demo3;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

import java.util.concurrent.ThreadLocalRandom;

@Slf4j
public class MyDemo3MqttV5Server2
{
    private static int subQos = 1;

    public static void main(String[] args) throws InterruptedException
    {
        String broker = "tcp://192.168.44.228:1883";
        String clientId = "MyDemo3MqttV5Server2";

        try {
            MqttClient client = new MqttClient(broker, clientId);
            MqttConnectionOptions options = new MqttConnectionOptions();
            options.setAutomaticReconnect(true);
            options.setKeepAliveInterval(3);        // keepAliveInterval设置成3秒
            client.setCallback(new MyDemo3Server2Callback(clientId, client));
            client.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Slf4j
    public static class MyDemo3Server2Callback implements MqttCallback
    {
        private String clientId;
        private MqttClient client;

        public MyDemo3Server2Callback(String clientId, MqttClient client)
        {
            this.clientId = clientId;
            this.client = client;
        }

        public void connectComplete(boolean reconnect, String serverURI) {
            log.info("{}::connectComplete, reconnect={}, serverURI={}", clientId, reconnect, serverURI);
            try {
                if( client.isConnected() ){
                    client.subscribe("$share/demo3/device/#", subQos);
                }
            } catch (MqttException e) {
                log.error("err", e);
            }
        }

        public void disconnected(MqttDisconnectResponse disconnectResponse) {
            log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());
        }

        public void deliveryComplete(IMqttToken token) {
            log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());
        }

        public void messageArrived(String topic, MqttMessage message) throws Exception {
            log.info("{}::messageArrived start, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
            if( ThreadLocalRandom.current().nextInt() % 20 ==0 ){
                try {
                    log.info("{}::messageArrived ------------error1, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
                    // 休眠10秒,因为前面设置了keepAliveInterval为3秒,所以一定会导致连接断开
                    Thread.sleep(10000);
                    log.info("{}::messageArrived ------------error2, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
                } catch (Exception e) {
                    log.error("err", e);
                }
            }
        }

        public void mqttErrorOccurred(MqttException exception) {
            log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());
        }

        public void authPacketArrived(int reasonCode, MqttProperties properties) {
            log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);
        }
    }
}

【MyDemo3MqttV5Sender.java】模拟一个发消息出来的设备

package com.chz.myMqttV5.demo3;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

@Slf4j
public class MyDemo3MqttV5Sender {

    private static String clientId = MyDemo3MqttV5Sender.class.getSimpleName();

    public static void main(String[] args) throws InterruptedException {
        String broker = "tcp://192.168.44.228:1883";
        int subQos = 1;
        int pubQos = 1;
        String msg;

        try {
            MqttClient client = new MqttClient(broker, clientId);
            MqttConnectionOptions options = new MqttConnectionOptions();
            client.setCallback(new MyDemo3SenderCallback(clientId));
            client.connect(options);
            client.subscribe("device/#", subQos);

            for(int i=0; i<200; i++){
                int id = i;
                msg = id+"";
                MqttMessage message = new MqttMessage(msg.getBytes());
                message.setId(id);        // 这个id很重要,否则qos=1不会生效
                message.setQos(pubQos);   // 设置qos=1
                client.publish("device/1", message);
                Thread.sleep(1L);
            }

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Slf4j
    public static class MyDemo3SenderCallback implements MqttCallback
    {
        private String clientId;

        public MyDemo3SenderCallback(String clientId)
        {
            this.clientId = clientId;
        }

        public void connectComplete(boolean reconnect, String serverURI) {
            log.info("{}::MyMqttCallback, reconnect={}, serverURI={}", clientId, reconnect, serverURI);
        }

        public void disconnected(MqttDisconnectResponse disconnectResponse) {
            log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());
        }

        public void deliveryComplete(IMqttToken token) {
            log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());
        }

        public void messageArrived(String topic, MqttMessage message) throws Exception {
            log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
        }

        public void mqttErrorOccurred(MqttException exception) {
            log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());
        }

        public void authPacketArrived(int reasonCode, MqttProperties properties) {
            log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);
        }
    }
}

# 开始测试

启动【MyDemo3MqttV5Server1、MyDemo3MqttV5Server2】
然后启动【MyDemo3MqttV5Sender】,输出日志如下:
在这里插入图片描述
可以看到【MyDemo3MqttV5Server2】接收到消息【74】的时候卡了10秒,然后连接就断开了。
在这里插入图片描述
可以看到【MyDemo3MqttV5Server2】断开连接之后,消息【74】之后的后续消息都被【MyDemo3MqttV5Server1】接收到了。换句话说不会因为某个消息接收服务的问题导致消息丢失。

# 参考资料

mqtt服务emqx的安装可以参考https://blog.csdn.net/chenhz2284/article/details/139411874

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1955008.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DNS服务器搭建练习

练习要求&#xff1a; 3、搭建一个dns服务器&#xff0c;客户端可以使用该服务器解析域名www.haha.com为web服务器的 4、将客户端的ip地址中的域名解析服务器地址修改为第3题的dnt服务器的p&#xff0c;使用ping命令ping www.haha.com看能否ping通&#xff0c;用curl命令访问c…

【PyQt5】一文向您详细介绍 setPlaceholderText() 的作用

【PyQt5】一文向您详细介绍 setPlaceholderText() 的作用 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&#xff1a;985高校的普通…

两种方法在MATLAB中实现共享参数拟合的源代码【MATLAB pk 1stopt】

有伙伴在巴山学长交流群中询问有关如何在matlab中实现共享参数拟合的问题&#xff0c;感觉这个问题挺有意思的&#xff0c;故拿出来与大家分享。咱也根据伙伴的提问在网上进行了相关搜索&#xff0c;发现这个共享参数拟合的问题基本上都跟国产拟合优化神器1stopt这款软件有关。…

Vue基础2

1.监视属性 先推荐大家安装第一个vscode常用插件 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><title>天气案例_监视简写</title><!-- 引入Vue --><script type"text/javascript"…

Linux环境下(DeepinV20+)安装并配置jdk和maven

一、jdk下载 Oracle的JDK开始收费了&#xff0c;如非必要&#xff0c;请勿使用&#xff01;&#xff01;&#xff01; jdk下载地址1&#xff08;推荐&#xff09;https://github.com/graalvm/graalvm-ce-builds/releases jdk下载地址2&#xff08;可选&#xff09;&#xff1a;…

PsExec横向:IPCPTHPTT

一.IPC下的PsExec 二.PTH下的psexec&#xff08;CS操作&#xff09; 三.PTT下的psexec PsExec工具&#xff1a; psexec 是 windows 下非常好的一款远程命令行工具。psexec的使用不需要对方主机开方3389端口&#xff0c;只需要对方开启admin$共享和ipc$ (该共享默认开启&#…

8. 运行时数据区-堆

一般Java程序中堆内存是空间最大的一块内存区域。创建出来的对象都存在于堆上。栈上的局部变量表中&#xff0c;可以存放堆上对象的引用。静态变量也可以存放堆对象的引用&#xff0c;通过静态变量就可以实现对象在线程之间共享。 堆内存的调优 堆空间有三个需要关注的值&…

自编码器(autoencoder)

1.自编码器的由来 最初的自编码器是用来降维的&#xff0c;后来也逐渐用于去噪、生成任务。 2.自编码器的基本结构 自编码器&#xff08;autoencoder&#xff09;内部有一个隐藏层 h&#xff0c;可以产生编码&#xff08;code&#xff09;表示输入。该网络可以看作由两部分组…

yolo模型训练出的.pt文件过大

当我们使用yolov8训练时候&#xff0c;保存的模型变大&#xff0c;如下图&#xff1a; 原模型 训练出来的模型 经过仔细调查&#xff0c;发现是保存的模型中多了很多数据。 原模型 训练出来的模型 只需要把文件中.pt文件读取&#xff0c;重写一遍保存。 from ultralytics im…

【RabbitMQ】MQ相关概念

一、MQ的基本概念 定义&#xff1a;MQ全称为Message Queue&#xff0c;是一种提供消息队列服务的中间件&#xff0c;也称为消息中间件。它允许应用程序通过读写队列中的消息来进行通信&#xff0c;而无需建立直接的连接。作用&#xff1a;主要用于分布式系统之间的通信&#x…

[工具]GitHub + PicGo 搭建免费博客图床

文章目录 起因GitHub新建GitHub仓库新建token授予picgo权限 PicGOPicGO上传失败原因 起因 还是觉得个人博客记录最好还是不要money&#x1f625;&#xff0c;所以还是想白嫖&#xff0c;找到了GitHub PicGO的方式&#xff0c;记录一下。 GitHub 过程和搭建博客链接类似&…

【C++】红黑树的应用(封装map和set)

✨ 青山一道同云雨&#xff0c;明月何曾是两乡 &#x1f30f; &#x1f4c3;个人主页&#xff1a;island1314 &#x1f525;个人专栏&#xff1a;C学习 &#x1f680; 欢迎关注&#xff1a;&#x1f44d;点赞 &…

SpringBoot3 JDK21 Vue3开源后台RBAC管理系统 | 2024年好用的开源RBAC管理系统 | 数据权限的探索

序言 项目现已全面开源&#xff0c;商业用途完全免费&#xff01; 当前版本&#xff1a;v0.7.2。 如果喜欢这个项目或支持作者&#xff0c;欢迎Star、Fork、Watch 一键三连 &#x1f680;&#xff01;&#xff01; 在构建此代码框架的过程中&#xff0c;我已投入了大量精力&…

51单片机嵌入式开发:20、STC89C52R基于C51嵌入式点阵广告屏的设计

STC89C52R基于C51嵌入式点阵广告屏的设计 1 概述2 LED点阵介绍2.1 特点和优势2.2 工作原理&#xff1a;2.3 使用方法&#xff1a; 3 LED点阵原理3.1 Led点阵内部电路3.2 原理图电路3.3 74HC595 4 软件实现点阵图案的滑动4.1 软件工程代码4.2 Protues仿真 5 总结 配套示例程序 1…

寻找事业伴侣:男人如何找到匹配自己事业的女人

寻找事业伴侣&#xff1a;男人如何找到匹配自己事业的女人 前言 在攀登事业的征途上&#xff0c;每位男士都渴望有一位能够并肩作战的伴侣。她不仅要理解你的抱负&#xff0c;还要支持你的每一个决定。但现实中&#xff0c;找到这样的女人并非易事。 以下是一些深入的建议&a…

Linux信号上

信号 概念 信号是由于进程产生&#xff0c;但是由内核调度传递给另一个进程&#xff1a; 产生信号 按键产生信号: Ctrc --> 2)SIGINT(终止/中断) Ctrz --> 20)SIGTSTOP(终端暂停) Ctr\ --> 3)SIGQUIT(退出) 系统调用产生: kill(2), raise, abort软件条件产生: 如定…

Adobe Acrobat Pro DC for Mac:PDF处理软件

Adobe Acrobat Pro DC for Mac是一款专为Mac用户设计的PDF处理软件&#xff0c;它凭借出色的功能和卓越的性能&#xff0c;成为了处理PDF文件的理想选择。 首先&#xff0c;Acrobat Pro DC for Mac支持全方位的PDF编辑。用户可以对PDF文档进行文本编辑、图像处理、表格制作等操…

【区块链】JavaScript连接web3钱包,实现测试网络中的 Sepolia ETH余额查询、转账功能

审核看清楚了 &#xff01; 这是以太坊测试网络&#xff01;用于学习的测试网络&#xff01;&#xff01;&#xff01; 有关web3 和区块链的内容为什么要给我审核不通过&#xff1f; 别人凭什么可以发&#xff01; 目标成果&#xff1a; 实现功能分析&#xff1a; 显示账户信…

CORS-跨域资源共享

CORS-跨域资源共享 什么是CORS &#xff1f; 在前后端分离的项目中&#xff0c;我们往往会遇到跨域问题。 跨域问题只会出现在浏览器发起AJAX&#xff08;XMLHttpRequest、Fetch&#xff09;请求的时候&#xff0c;因为浏览器限制了AJAX只能从同一个源请求资源&#xff0c;除…

DeadSec CTF 2024 Misc Writeup

文章目录 MiscWelcomeMic checkflag_injectionGoLPartyMAN in the middleForgotten Password CryptoFlag killer 好久没做这么爽了噜 DK盾云服务器&#xff1a; https://www.dkdun.cn/ 最近活动是香港的1-1-3 9.9/月 Misc Welcome 进discord群签到即可 Mic check 就是他说…