mqtt-emqx:keepAlive机制测试

news2025/1/20 18:32:33

mqtt keepAlive原理详见【https://www.emqx.com/zh/blog/mqtt-keep-alive】

# 下面开始写测试代码

【pom.xml】

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.12.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.49</version>
</dependency>

【MyDemo7MqttCallback.java】

package com.chz.myMqttV3.demo7;

@Slf4j
public class MyDemo7MqttCallback implements MqttCallbackExtended {

    private MqttClient client;
    private MqttConnectOptions options;
    private String[] topics;

    public MyDemo7MqttCallback(MqttClient client, MqttConnectOptions options, String[] topics)
    {
        this.client = client;
        this.options = options;
        this.topics = topics;
    }

    @SneakyThrows
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("connectionLost", throwable);
        while (!client.isConnected()) {
            log.info("emqx重新连接....................................................");
            client.connect(options);
            Thread.sleep(1000);
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));
    }

    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if( token!=null ){
            MqttMessage message = token.getMessage();
            String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
            String str = message==null ? null : new String(message.getPayload());
            log.info("deliveryComplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliveryComplete: null");
        }
    }

    @SneakyThrows
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("connectComplete: reconnect={}, serverURI={}", reconnect, serverURI);

        if( topics.length > 0 ){
            int[] qosArr = new int[topics.length];
            Arrays.fill(qosArr, 2);

            MyDemo7MqttMessageListener[] listeners = new MyDemo7MqttMessageListener[topics.length];
            Arrays.fill(listeners, new MyDemo7MqttMessageListener());

            client.subscribe(topics, qosArr, listeners);
        }
    }
}

【MyDemo7MqttMessageListener.java】

package com.chz.myMqttV3.demo7;

@Slf4j
public class MyDemo7MqttMessageListener implements IMqttMessageListener
{
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));
    }
}

【MyDemo7MqttClient1Test.java】

package com.chz.myMqttV3.demo7;

public class MyDemo7MqttClient1Test
{
    public static void main(String[] args) throws  MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo7MqttClient1Test", new MemoryPersistence());
        client.setCallback(new MyDemo7MqttCallback(client, options, new String[]{"device/#"}));
        client.connect(options);
    }
}

【MyDemo7MqttSenderTest.java】

package com.chz.myMqttV3.demo7;

public class MyDemo7MqttSenderTest
{
    public static void main(String[] args) throws UnknownHostException, MqttException, InterruptedException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);
        // 这里设置遗嘱消息,当broker认为本client断联时会将遗嘱消息发出,可以用来确认borker是否认为本client已经断联
        options.setWill("device/1", "I am MyDemo7MqttSenderTest, I am dead!!!".getBytes(), 1, false);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo7MqttSenderTest", new MemoryPersistence());
        client.setCallback(new MyDemo7MqttCallback(client, options, new String[]{}));
        client.connect(options);

        for( int i=0; true; i++ ){
            String topic = "device/1";
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setRetained(true);
            String msg = "I am MyDemo7MqttSenderTest, at node [192.168.44.228:1883]:" + i;
            mqttMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8));
            client.publish(topic, mqttMessage);
            System.out.println("send: " + msg);
            Thread.sleep(1000L);    // 测试运行的时候在这里加入断点,卡住MqttClient内部自动发的PINGREQ消息
        }
    }
}

# 下面开始进行测试

1、启动【MyDemo7MqttClient1Test】。
2、在【MyDemo7MqttSenderTest】的【sleep】那一句加入断点,然后运行【MyDemo7MqttSenderTest】
在这里插入图片描述
等一段时间,可以看到遗嘱消息【I am MyDemo7MqttSenderTest, I am dead!!!】发出来了
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1801591.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构(C语言)之对归并排序的介绍与理解

目录 一归并排序介绍&#xff1a; 二归并排序递归版本&#xff1a; 2.1递归思路&#xff1a; 2.2递归代码实现&#xff1a; 三归并排序非递归版本&#xff1a; 3.1非递归思路&#xff1a; 3.2非递归代码实现&#xff1a; 四归并排序性能分析&#xff1a; 欢迎大佬&#…

day40--Redis(二)实战篇

实战篇Redis 开篇导读 亲爱的小伙伴们大家好&#xff0c;马上咱们就开始实战篇的内容了&#xff0c;相信通过本章的学习&#xff0c;小伙伴们就能理解各种redis的使用啦&#xff0c;接下来咱们来一起看看实战篇我们要学习一些什么样的内容 短信登录 这一块我们会使用redis共…

碳素钢化学成分分析 螺纹钢材质鉴定 钢材维氏硬度检测

碳素钢的品种主要有圆钢、扁钢、方钢等。经冷、热加工后钢材的表面不得有裂缝、结疤、夹杂、折叠和发纹等缺陷。尺寸和允许公差必须符合相应品种国家标准的要求。 具体分类、按化学成分分类 &#xff1a; 碳素钢按化学成分&#xff08;即以含碳量&#xff09;可分为低碳钢、中…

问题:军保卡不允许开立附属卡,不能开展境外交易,不开通云闪付工功能() #其他#经验分享

问题&#xff1a;军保卡不允许开立附属卡&#xff0c;不能开展境外交易&#xff0c;不开通云闪付工功能&#xff08;&#xff09; A&#xff0e;A&#xff1a;正确 B&#xff0e;B&#xff1a;错误 参考答案如图所示

在线渲染3d怎么用?3d快速渲染步骤设置

在线渲染3D模型是一种高效的技术&#xff0c;它允许艺术家和设计师通过互联网访问远程服务器的强大计算能力&#xff0c;从而加速渲染过程。无论是复杂的场景还是高质量的视觉效果&#xff0c;在线渲染服务都能帮助您节省宝贵的时间。 在线渲染3D一般选择的是&#xff1a;云渲染…

React的useState的基础使用

import {useState} from react // 1.调用useState添加状态变量 // count 是新增的状态变量 // setCount 修改状态变量的方法 // 2.添加点击事件回调 // userState实现计数实例import {useState} from react// 使用组件 function App() {// 1.调用useState添加状态变量// coun…

Python下载库

注&#xff1a;本文一律使用windows讲解。 一、使用cmd下载 先用快捷键win R打开"运行"窗口&#xff0c;如下图。 在输入框中输入cmd并按回车Enter或点确定键&#xff0c;随后会出现这个画面&#xff1a; 输入pip install 你想下载的库名&#xff0c;并按回车&…

使用MATLAB的BP神经网络进行数据分类任务(简单版)

BP神经网络&#xff0c;即反向传播&#xff08;Backpropagation&#xff09;神经网络&#xff0c;是一种多层前馈神经网络&#xff0c;它通过反向传播算法来更新网络权重。这种网络结构特别适合于分类和回归任务。 MATLAB环境设置 在开始之前&#xff0c;请确保MATLAB环境已经…

【设计模式深度剖析】【5】【行为型】【迭代器模式】

&#x1f448;️上一篇:策略模式 设计模式-专栏&#x1f448;️ 文章目录 迭代器模式定义英文原话直译如何理解呢&#xff1f; 迭代器模式的角色1. Iterator&#xff08;迭代器&#xff09;2. ConcreteIterator&#xff08;具体迭代器&#xff09;3. Aggregate&#xff08;聚…

【Git】如何不管本地文件,强制git pull

要在 Git 中强制执行 git pull 操作&#xff0c;忽略本地文件的更改&#xff0c;可以按照以下步骤操作&#xff1a; 保存当前工作状态&#xff1a;如果你有未提交的更改&#xff0c;可以使用 git stash 将这些更改存储起来。 git stash强制拉取最新代码&#xff1a;使用 git re…

物联网学习小记

https://www.cnblogs.com/senior-engineer/p/10045658.html GOSP: 提供类似Qt的API接口&#xff0c;仅需要几百KB的硬件资源&#xff08;比Qt小的多&#xff09;&#xff0c;能运行在Qt不支持的低配置硬件上&#xff08;对Qt生态形成补充&#xff09;&#xff0c;适用于嵌入式…

基于SpringBoot+Vue单位考勤系统设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝1W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f; 感兴趣的可以先收藏起来&#xff0c;还…

YOLOv5车流量监测系统研究

一. YOLOv5算法详解 YOLOv5网络架构 上图展示了YOLOv5目标检测算法的整体框图。对于一个目标检测算法而言&#xff0c;我们通常可以将其划分为4个通用的模块&#xff0c;具体包括&#xff1a;输入端、基准网络、Neck网络与Head输出端&#xff0c;对应于上图中的4个红色模块。Y…

【Python错误】:AttributeError: ‘generator‘ object has no attribute ‘next‘解决办法

【Python错误】&#xff1a;AttributeError: ‘generator’ object has no attribute next’解决办法 在Python中&#xff0c;生成器是一种使用yield语句的特殊迭代器&#xff0c;它允许你在函数中产生一个值序列&#xff0c;而无需一次性创建并返回整个列表。然而&#xff0c;…

网线制作(双绞线+水晶头)——T568B标准

参考视频&#xff1a;https://www.bilibili.com/video/BV1KQ4y1i7zP/ 1、使用剥线器 2、将线捋顺、排序、剪掉牵引线 记忆技巧 1.线序颜色整体是一浅一深 2.颜色顺序是黄、蓝、绿、棕 一个黄种人、从上向下看&#xff0c;分别看到的是蓝天、青草(绿)、泥土(棕色) 3.中间两根浅…

常见机器学习概念

信息熵 信息熵&#xff08;information entropy&#xff09;是信息论的基本概念。描述信息源各可能事件发生的不确定性。20世纪40年代&#xff0c;香农&#xff08;C.E.Shannon&#xff09;借鉴了热力学的概念&#xff0c;把信息中排除了冗余后的平均信息量称为“信息熵”&…

大数据环境搭建@Hive编译

Hive3.1.3编译 1.编译原因1.1Guava依赖冲突1.2开启MetaStore后运行有StatsTask报错1.3Spark版本过低 2.环境部署2.1jdk安装2.2maven部署2.3安装图形化桌面2.4安装Git2.5安装IDEA 3.拉取Hive源码4.Hive源码编译4.1环境测试1.测试方法——编译2.问题及解决方案&#x1f4a5;问题1…

了解JVM中的Server和Client参数

了解JVM中的Server和Client参数 Java虚拟机&#xff08;Java Virtual Machine&#xff0c;JVM&#xff09;作为Java程序运行的核心&#xff0c;提供了多种参数来优化和调整程序的性能和行为。其中&#xff0c;-server和-client是两个重要的参数&#xff0c;分别用于配置JVM在服…

[ 网络通信基础 ]——网络的传输介质(双绞线,光纤,标准,线序)

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;网络通信基础TCP/IP专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年6月8日14点23分 &#x1f004;️文章质量&#xff1a;94分 前言—— 在现代通信网络中&#xff0c;传输介质是数据传…

莱芜代理记账公司-全方位为您服务的专业会计服务机构

莱芜代理记账&#xff0c;一个专注于为各类企业提供专业、高效和全面的财务咨询服务的机构&#xff0c;我们的团队由一群经验丰富、富有责任心的会计专业人士组成&#xff0c;他们具备深厚的理论知识和丰富的实践经验&#xff0c;能够根据企业的实际需求&#xff0c;提供最适合…