在JAVA中使用Paho MQTT客户端

news2024/11/5 23:22:13

1.在maven里面配置好依赖

<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>

2.创建APP类

package com.leo;

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class App {
    public static void main(String[] args) {
        String subTopic = "topic";  //主题
        String pubTopic = "topic";
        String content = "Hello World66+6";  //内容
        int qos = 2;
        String broker = "tcp://ip:1883";    //ip是自己服务器的ip
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("name");   //用户名
            connOpts.setPassword("12345".toCharArray());   //密码
            // 保留会话
            connOpts.setCleanSession(true);

            // 设置回调
            client.setCallback(new OnMessageCallback());

            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 订阅
            client.subscribe(subTopic);

            // 消息发布所需参数
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            // 主循环,保持程序运行
            while (true) {
                try {
                    Thread.sleep(1000); // 每秒检查一次
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

    static class OnMessageCallback implements MqttCallback {
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("Connection lost: " + cause.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message arrived: " + new String(message.getPayload()));
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("Delivery complete for token: " + token);
        }
    }
}

3.创建回调消息处理类 OnMessageCallback

package io.emqx;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

4.结果展示

至此第一部分的功能已经完成了,以下是扩展功能

5.整合到springboot

创建一个 MqttService 类

package com.leo.service;

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;

@Service
public class MqttService {

    public void startMqttClient() {
        String subTopic = "topic";  //订阅的主题
        String pubTopic = "topic";
        String content = "Hello World66+6";
        int qos = 2;
        String broker = "tcp://ip:1883";  //你自己的ip
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("8266");
            connOpts.setPassword("12345".toCharArray());
            // 保留会话
            connOpts.setCleanSession(true);

            // 设置回调
            client.setCallback(new OnMessageCallback());

            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 订阅
            client.subscribe(subTopic);

            // 消息发布所需参数
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            // 主循环,保持程序运行
            while (true) {
                try {
                    Thread.sleep(1000); // 每秒检查一次
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

    static class OnMessageCallback implements MqttCallback {
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("Connection lost: " + cause.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message arrived: " +
                    "Topic: " + topic +
                    ", ID: " + message.getId() +
                    ", QoS: " + message.getQos() +
                    ", Class: " + message.getClass().getName() +
                    ", Payload: " + new String(message.getPayload())
            );
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("Delivery complete for token: " + token);
        }
    }
}

在 Spring Boot 应用中调用服务

SprintBootDemo1Application 中调用 MqttServicestartMqttClient 方法。为了确保主线程不会因为 Spring Boot 的启动过程而退出,可以在 startMqttClient 方法中使用一个新的线程来运行 MQTT 客户端逻辑。

package com.leo;

import com.leo.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SprintBootDemo1Application implements CommandLineRunner {

    @Autowired
    private MqttService mqttService;

    public static void main(String[] args) {
        SpringApplication.run(SprintBootDemo1Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // 在新线程中启动 MQTT 客户端
        new Thread(() -> mqttService.startMqttClient()).start();
    }
}

就ok了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2229872.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Django+websocket实现一个简单聊天

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它由IETF在2011年定为标准RFC 6455&#xff0c;并由RFC7936补充规范&#xff0c;同时WebSocket API也被W3C定为标准。 1、定义与原理 WebSocket是独立的、创建在TCP上的协议&#xff0c;它使用HTTP/1.1协议的101状态码进…

MATLAB实现人类学习优化算法HLO

1.算法简介 人类学习优化算法&#xff08;Human Learning-based Optimization&#xff0c;HLO&#xff09;是一种基于人类学习过程开发的启发式算法。HLO算法的设计灵感来源于人类的智慧和经验&#xff0c;特别是人类在学习和调整过程中展现出的适应性、学习能力和创新思维。该…

【果蔬识别】Python+卷积神经网络算法+深度学习+人工智能+机器学习+TensorFlow+计算机课设项目+算法模型

一、介绍 果蔬识别系统&#xff0c;本系统使用Python作为主要开发语言&#xff0c;通过收集了12种常见的水果和蔬菜&#xff08;‘土豆’, ‘圣女果’, ‘大白菜’, ‘大葱’, ‘梨’, ‘胡萝卜’, ‘芒果’, ‘苹果’, ‘西红柿’, ‘韭菜’, ‘香蕉’, ‘黄瓜’&#xff09;…

Android 策略设计模式的使用:使用设计模式,减少烂代码,让项目更好维护

目录 大家好呀~&#xff0c;我是前期后期&#xff0c;在网上冲浪的一名程序员&#xff0c;分享一些自己学到的知识&#xff0c;希望对大家有所帮助 前言&#xff1a;为什么要使用设计模式 在项目开发过程中&#xff0c;我们会对接很多种支付&#xff1a;国内&#xff08;微信…

uniapp和vite项目配置多环境编译,增加测试环境变量配置--mode test

如果你的项目是使用vite和uniapp配置开发的&#xff0c;就可以在代码里面获取到这些变量&#xff0c;但是开发&#xff0c;测试和发布是不同的请求地址&#xff0c;所以需要配置。Vite 使用 dotenv 从你的 环境目录 中的下列文件加载额外的环境变量&#xff1a; .env …

动态规划 - 编辑距离

115. 不同的子序列 困难 给你两个字符串 s 和 t &#xff0c;统计并返回在 s 的 子序列 中 t 出现的个数&#xff0c;结果需要对 10^9 7 取模。 算法思想&#xff1a;利用动态规划&#xff0c;分s[i - 1] 与 t[j - 1]相等&#xff0c;s[i - 1] 与 t[j - 1] 不相等两种情况具…

sudo apt install jupyter-notebook安装notebook失败E: Aborting install.

问题&#xff1a; sudo apt install jupyter-notebook安装notebook失败E: Aborting install. ~/jie/mywork/PointNetCFD$ sudo apt install jupyter-notebook --fix-missing Reading package lists... Done Building dependency tree Reading state information... Do…

第16课 核心函数(方法)

掌握常用的内置函数及其用法。 数学类函数&#xff1a;abs、divmod、max、min、pow、round、sum。 类型转换函数&#xff1a;bool、int、float、str、ord、chr、bin、hex、tuple、list、dict、set、enumerate、range、object。 序列操作函数&#xff1a;all、any、filter、m…

【1个月速成Java】基于Android平台开发个人记账app学习日记——第2天,启动项目

24.11.01 下面讲一下如何通过USB连接手机然后启动app实现真机测试&#xff0c;还是有一些坑的。 调整电脑的驱动程序&#xff0c;完成USB的连接 在启动项目的第一步我就遇见了问题&#xff0c;那就是插入usb线以后没有检测到设备。想要完成连接需要2个步骤&#xff0c;第一步…

使用Mac如何才能提高OCR与翻译的效率

OCR与截图大家都不陌生&#xff0c;或许有的朋友对于这两项功能用到的不多&#xff0c;但是如果经常会用到的话&#xff0c;那你就该看看了 iOCR&#xff0c;快捷键唤出翻译窗口&#xff0c;不论是截图翻译、划词翻译、输入翻译、剪切板翻译&#xff0c;统统快捷键完成&#x…

Etsy又被封号了!这次我终于搞懂了原因...

你是否真的了解在Etsy开店有哪些红线不能踩&#xff1f;你是否真的知道Etsy被封号后如何解决&#xff1f;本文我将探讨Etsy账号被封的常见原因&#xff0c;以及卖家可以采取的应对策略&#xff0c;以期减轻对跨境业务的伤害程度&#xff0c;感兴趣的商家速速码住&#xff0c;不…

MySQL — 事务 (o゚▽゚)o

文本目录&#xff1a; ❄️一、什么是事务&#xff1a; ❄️二、ACID特性&#xff1a; ❄️三、使用事务&#xff1a; ▶1、查看支持事务的存储引擎&#xff1a; ▶2、语法&#xff1a; ▶3、开启并且回滚&#xff1a; ▶4、开启并且提交&#xff1a; ▶ 5、保存点&#xff…

DOS时代软件遗憾落幕,国产编程新势力接过火炬

在计算机发展史上&#xff0c;DOS时代是一个不可磨灭的篇章。那个时期&#xff0c;虽然操作系统的图形界面尚未普及&#xff0c;但一款款经典软件却为我们打开了通往数字世界的大门&#xff0c;让我们在那个相对简单却充满魅力的时代中&#xff0c;感受到科技的魅力与创新的力量…

Qt的信号槽机制学习一

一、Qt理论知识简记 &#xff08;一&#xff09;信号与槽[1] 信号与槽是Qt编程的基础&#xff0c;其使得处理界面上各个组件的交互操作变得比较直观和简单&#xff0c;GUI&#xff08;Graphical User Interface&#xff09;程序设计的主要工作就是对界面上各组件的信号进行相应…

P11232 [CSP-S 2024] 超速检测

P11232 [CSP-S 2024] 超速检测 难度&#xff1a;普及/提高。 考点&#xff1a;二分、贪心。 题意&#xff1a; 题意较长&#xff0c;没有题目大意&#xff0c;否则你也大意。 主干道长度为 L L L&#xff0c;有 n n n 辆车&#xff0c;看做左端点为 0 0 0&#xff0c;第 …

使用GetX实现GetPage中间件

前言 GetX 中间件&#xff08;Middleware&#xff09;是 GetX 框架中的一种机制&#xff0c;用于在页面导航时对用户进行权限控制、数据预加载、页面访问条件设置等。通过使用中间件&#xff0c;可以有效地控制用户的访问流程&#xff0c;并在适当条件下引导用户到所需页面。 这…

JAVA:常见 JSON 库的技术详解

1、简述 在现代应用开发中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;已成为数据交换的标准格式。Java 提供了多种方式将对象转换为 JSON 或从 JSON 转换为对象&#xff0c;常见的库包括 Jackson、Gson 和 org.json。本文将介绍几种常用的 JSON 处理…

视频怎么去水印?7个视频去水印在线工具大比拼,宝藏工具推荐!

您是否正在寻找一款好用的视频去水印在线工具&#xff0c;却总是难以找到合适的去水印软件&#xff1f;别担心&#xff0c;今天在本文中小编将和大家分享一些去水印的小助手。很多人都觉得视频或图片上的水印十分烦人。如果您有着同样的烦恼&#xff0c;那么使用去水印工具将是…

如何一键更换ppt模板?掌握这2个ppt技巧快速搞定!

每当要制作ppt&#xff0c;很多人会第一时间去搜刮各种ppt模板&#xff0c;有时我们找到了一份貌似符合需求的模板&#xff0c;等到了ppt制作环节&#xff0c;才发现离我们的预期相距甚远&#xff0c;做到一半的ppt如何换模板呢&#xff1f; 想要在中途更换ppt模板&#xff0c;…

0基础入门linux文件系统

目录 文件系统简介 1. 文件系统类型 2. 文件系统结构 3. 文件系统的主要功能 4. 文件系统的使用 5. 文件系统的维护 6. 注意事项 简单举例 机械硬盘 物理结构介绍​编辑 CHS寻址 逻辑结构介绍 LBA寻址法 文件系统与磁盘管理 Boot Block Data block inode block…