Java实现MQTT订阅发布

news2024/11/14 13:41:36

一. MQTT 与 EMQX
MQTT 是轻量级基于代理的发布/订阅的消息传输协议。使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。底层使用 TCP/IP 提供网络连接。

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器,支持百万级连接和分布式集群架构。

二. 搭建MQTT服务器
在 Windows 上搭建MQTT服务器,下载 EMQX Broker 压缩包,将其解压即可。
进入压缩包的bin目录下,执行:

emqx start

则 emqx 服务器在本地启动。使用下面语句可以查看启动状态:

emqx_ctl status


打开浏览器 输入http://localhost:18083/ 进入管理界面(EMQ提供了web的控制后台,默认是18083端口)。默认用户名 admin,默认密码 public。可以看到EMQ的Dashboard界面:


三. Java 实现订阅发布
在发布订阅中,无所谓服务端和客户端,两者都是连接至 mqtt 服务器的客户端(可以查看 dashboard 的连接数)。两者都既可以发布主题,也可以订阅主题。

订阅端:

public class MqttClientTest {
    //订阅的主题
    public static final String TOPIC = "mqtt";

    public static void main(String[] args) {
        MyClient myClient = new MyClient();
        myClient.subscribe(TOPIC, 1);
    }
}

/**
 * 订阅方
 */
class MyClient {
    //mqtt服务器默认的地址和端口号
    public static final String HOST = "tcp://localhost:1883";

    //连接MQTT的客户端ID,一般以唯一标识符表示
    private static final String CLIENTID = "client";
    //连接的用户名密码(非必需)
    private String userName = "admin";
    private String password = "password";

    private MqttClient mqttClient;

    public  MyClient() {
        try {
            mqttClient = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(10);
            options.setUserName(userName);
            options.setPassword(password.toCharArray());
            //定义回调函数
            mqttClient.setCallback(new PushCallBack());
            mqttClient.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    //订阅主题
    public void subscribe(String topic, int qos) {
        try {
            mqttClient.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}


发布端:

public class MqttServerTest {
    //发布的主题
    public static final String TOPIC = "mqtt";

    public static void main(String[] args) throws InterruptedException {
        MqttServer mqttServer = new MqttServer();
        MqttMessage message = new MqttMessage();
        message.setQos(1);
        message.setPayload("第一次广播".getBytes());
        mqttServer.publish(TOPIC, message);
        Thread.sleep(1000);
        message.setPayload("第二次广播".getBytes());
        mqttServer.publish(TOPIC, message);
        Thread.sleep(1000);
        message.setPayload("第三次广播".getBytes());
        mqttServer.publish(TOPIC, message);
    }
}

/**
 * 发布方
 */
class MqttServer {
    //mqtt服务器默认的地址和端口号
    private static final String HOST = "tcp://localhost:1883";
    //连接MQTT的客户端ID,一般以唯一标识符表示
    private static final String CLIENTID = "server";
    //连接的用户名密码(非必需)
    private String userName = "admin";
    private String password = "password";

    private MqttClient mqttClient;

    //构造方法,启动mqttClient
    public MqttServer() {
        try {
            // MemoryPersistence设置clientid的保存形式,默认为以内存保存
            mqttClient = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(10);
            options.setUserName(userName);
            options.setPassword(password.toCharArray());
            //定义回调函数
            mqttClient.setCallback(new PushCallBack());
            mqttClient.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    //发布主题
    public void publish(String topic, MqttMessage message) {
        try {
            mqttClient.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

回调函数,发布方订阅方在发布订阅完消息后的回调:

public class PushCallBack implements MqttCallback {

    /**
     * mqtt连接丢失时触发(不包括主动disconnect)
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("连接失败,可做重连");
    }

    /**
     * 收到订阅消息后调用
     * @param s
     * @param mqttMessage
     * @throws Exception
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("---------------------------");
        System.out.println("接收到的主题为:" + s);
        System.out.println("接收到的消息为:" + new String(mqttMessage.getPayload()));
    }

    /**
     * 发布消息完成后调用
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("---------------------------");
        System.out.println("广播完成");
    }
}


分别启动两个程序,结果展示如下:

四. 实现连接丢失后重新连接
1. 自定义重连方式
当连接突然断开时,可在 connectionLost() 回调方法中设置重连恢复连接。修改 PushCallBack 类中的 connectionLost() 方法:

public class PushCallBack implements MqttCallback {

    private MqttClient mqttClient;
    private MqttConnectOptions options;

    public PushCallBack(MqttClient mqttClient, MqttConnectOptions options) {
        this.mqttClient = mqttClient;
        this.options = options;
    }

    /**
     * mqtt连接丢失时触发(不包括主动disconnect)
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        while(true) {
            try {
                mqttClient.connect(options);
                if (mqttClient.isConnected()) {
                    break;
                }
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        //验证重连
        MqttMessage message = new MqttMessage();
        message.setQos(1);
        message.setPayload("重连后的广播".getBytes());
        try {
            mqttClient.publish("mqtt", message);
        } catch (MqttException e) {
            e.printStackTrace();
        }

    }
}

发布端的代码修改为:

//定义回调函数
mqttClient.setCallback(new PushCallBack(mqttClient, options));

注意,主动 disconnect 是不会触发 connectionLost 方法的,我们可以进入到 dashboard 中剔除发布端的连接进行测试:

可以看到,踢出后server又立即连上了:

订阅方也收到了重连后的广播:

此外要注意的是,重连后不会自动订阅之前已订阅的主题,如果有订阅主题需要手动再次订阅。

2. 自动重连
MqttConnectOptions 有个方法 setAutomaticReconnect,若将其设置为 true,则可实现在连接丢失后自动重连:

options.setAutomaticReconnect(true);

此时则无需在 connectionLost 方法里自己实现重连。

此外,实验中发现,如果 connectionLost 中写了类似于 while(true) 的死循环,automaticReconnect 自动重连不会生效。猜想是因为连接丢失后,mqttClient 首先触发 connectionLost 方法,并且进入循环出不来了,而自动重连也需要当前的 mqttClient 对象调用自动重连方法,所以将无法触发自动重连机制。

五. MqttCallbackExtended
MqttCallback 的升级版,新增了一个 connectComplete 方法,在 mqttClient 连接成功时触发。
因此我们自定义的 PushCallBack方法还可以继承自 MqttCallbackExtended,此时则可以重写 connectComplete 方法,自定义重连成功后的响应事件(一般可在此实现重连后的重新订阅)。

@Override
public void connectComplete(boolean b, String s) {
    System.out.println("重连成功");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/959898.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue+Element-ui实现表格本地导入

表格文件存储在前端 如图,表格文件template.xlsx存储在public下的static文件夹下 注意这里的路径容易报错 a链接下载文件失败的问题(未发现文件) a.href ‘./static/template.xlsx’ 写的时候不能带public,直接这么写就可以 DownloadTemp…

星域的庞大规模已经让我们眩晕

有一句道格拉斯亚当斯的名言银河系漫游指南我最近想了很多。“空间很大,”他写道。“你不会相信它有多么巨大,令人难以置信。我的意思是,你可能认为去药店的路很长,但那只是去太空的小钱。” 星域不妨把这句引言放在其设计文档的封…

从天镜大模型,透视马上消费的“三重价值”

AI正在打开新世界。 红杉资本曾发表名为《生成式AI:一个创造性的新世界》的文章,提到生成式AI将涉及数十亿的人工劳动力,并促使这些人工劳动力的效率和创造力至少提高10%,有潜力产生数万亿美元的经济价值。 大模型,被…

2 | Window 搭建单机 Hadoop 和Spark

搭建单机 Hadoop 和 Spark 环境可以学习和测试大数据处理的基础知识。在 Windows 操作系统上搭建这两个工具需要一些配置和设置,下面是一个详细的教程: 注意: 在开始之前,请确保你已经安装了 Java 开发工具包(JDK),并且已经下载了 Hadoop 和 Spark 的最新版本。你可以从…

程序员:你如何写可重复执行的SQL语句?

上图的意思: 百战百胜,屡试不爽。 故事 程序员小张: 刚毕业,参加工作1年左右,日常工作是CRUD 架构师老李: 多个大型项目经验,精通各种开发架构屠龙宝术; 小张注意到,在…

【Datawhale】AI夏令营第三期——基于论文摘要的文本分类笔记(下)

笔记上部分请看【Datawhale】AI夏令营第三期——基于论文摘要的文本分类笔记(上) 文章目录 一、深度学习Topline1.1 数据预处理1.2 模型训练1.3 评估模型1.4 测试集推理1.5 后续改进 二、大模型Topline2.1 大模型介绍2.2 大模型是什么?2.3 大模型的原理2.4 大模型可…

嵌入式部署机器学习模型---TinyML

我们目前生活在一个被机器学习模型包围的世界。在一天中,您使用这些模型的次数比您意识到的要多。诸如浏览社交媒体、拍照、查看天气等日常任务都依赖于机器学习模型。您甚至可能会看到此博客,因为机器学习模型向您推荐了此博客。 我们都知道训练这些模型…

【位运算】leetcode371:两整数之和

一.题目描述 两整数之和 二.思路分析 题目要求我们实现两整数相加,但是不能使用加号,应该立马想到是用位运算来解决问题。之前说过,异或就是“无进位相加”,故本题可以先将两数异或,然后想办法让得到的结果进位即可。…

yolov5自定义模型训练三

经过11个小时cpu训练完如下 在runs/train/expx里存放训练的结果, 测试是否可以检测ok 网上找的这张识别效果不是很好,通过加大训练次数和数据集的话精度可以提升。 训练后的权重也可以用视频源来识别, python detect.py --source 0 # webca…

WOFOST模型与PCSE模型技术应用

实现作物产量的准确估算对于农田生态系统响应全球变化、可持续发展、科学粮食政策制定、粮食安全维护都至关重要。传统的经验模型、光能利用率模型等估产模型原理简单,数据容易获取,但是作物生长发育非常复杂,中间涉及众多生理生化过程&#…

Ansible-playbook条件语句when的使用

目录 when关键字1.基本使用2.比较运算符3.逻辑运算符4.判断变量 when关键字 1.基本使用 当ansible_os_family是redhat的时候,执行安装vim,不是的话跳过 --- - hosts: webtasks:- name: Install VIM via yumyum:name: vim-enhancedstate: installedwhe…

mac安装brew

mac安装brew 安装brew 安装brew 第一步:执行. /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"第二步:输入开机密码 第三步:回车继续。等待安装完成 第四步:根…

最新盘点!上海值得加入的互联网公司有哪些?(文末附招聘岗位)

暑假结束了,除了迎来了青春热烈的开学季以外,也带来了打工人备受期待的金九银十秋招季。 我们在找工作时,每个人都期待能遇到一个“神仙公司”,譬如丰厚的薪水、优越的晋升通道、融洽的同事关系、良好的work-life balance以及自由…

KubeSphere 社区双周报 | KubeKey 新增网络插件 Hybridnet | 2023.08.18-08.31

KubeSphere 社区双周报主要整理展示新增的贡献者名单和证书、新增的讲师证书以及两周内提交过 commit 的贡献者,并对近期重要的 PR 进行解析,同时还包含了线上/线下活动和布道推广等一系列社区动态。 本次双周报涵盖时间为:2023.08.18-2023.…

Stable Diffusion中的ControlNet插件

文章目录 ControlNet的介绍及安装ControlNet的介绍ControlNet的安装 ControlNet的功能介绍ControlNet的应用与演示 ControlNet的介绍及安装 ControlNet的介绍 ControlNet 的中文就是控制网,本质上是Stable Diffusion的一个扩展插件,在2023年2月份由斯坦…

红米手机使用google play

开启: 1.在 Google Play 支持的设备列表内的小米/红米手机已预装谷歌服务,我们只需要安装Play 商店。 1.开启谷歌服务: 设置 -> 帐号与同步 > 谷歌基础服务 2.安装 Play 商店: 在应用商店搜索 [google play] ,安装[Google Play 商店] …

NPM 常用命令(一)

目录 1、npm 1.1 简介 1.2 依赖性 1.3 安装方式 2、npm access 2.1 命令描述 2.2 详情 3、npm adduser 3.1 描述 4、npm audit 4.1 简介 4.2 审计签名 4.3 操作示例 4.4 配置 audit-level dry-run force json package-lock-only omit foreground-scripts …

更改SVG矢量图片的颜色

问题:我从网上找的svg图片,颜色一直是黑色的,和下边的用户管理模块、卷题管理等模块的图标对不起来,看起来很怪。 办法: 1.直接在你的编程软件中 ctrl + alt +F,全局搜索“组织管理” 找到组织管理对应的文件,然后双击点进去 2.找到icon 这里对应的icon的属性值就是矢…

代理IP的需求量为什么越来越大?如何选择适合您的全球代理IP?

在当今信息爆炸的时代,代理IP已成为大数据领域一项必不可少的工具。越来越多的企业和个人使用代理IP来进行互联网业务,这导致代理IP的需求量不断增加。这是因为代理IP不仅可以帮助用户进行网络爬虫和数据采集,还能够保护个人隐私和网络安全&a…

如何产生潜在客户:增加公司的销售额

图片来源于:SaleSmartly官网 数字营销拥有大量资源可以帮助您增加业务收入。您可以实施多种有关如何产生潜在客户的策略。这是买家旅程中的重要一步,您可以识别潜在客户并定义需要做什么来帮助他们决定购买您的产品或服务。 毫无疑问,征服潜在…