Java模拟Mqtt客户端连接Mqtt Broker

news2024/12/23 8:00:23

Java模拟Mqtt客户端基本流程

引入Paho MQTT客户端库

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>

设置mqtt配置数据

在application.yml中添加如下配置

mqtt:
    broker-url: tcp://42.194.132.44:1883
    client-id: mqtt_receive_server
    username: mqtt_server
    password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0

MqttClient配置

将MqttClient加入到IoC容器,并连接客户端

package com.angel.ocean.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(brokerUrl, clientId);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        client.connect(options);
        return client;
    }
}

MqttService

mqtt客户端,一些基本操作:连接、订阅、发消息,断开连接

package com.angel.ocean.mqtt;

import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Slf4j
@Service
public class MqttService {

    @Resource
    private MqttClient client;

    @Resource
    private KafkaService kafkaService;

    @PostConstruct
    public void init() throws MqttException {
        client.setCallback(new MqttCallbackHandler(kafkaService));
        subscribe(MqttTopicConstant.ACTIVATE);
        subscribe(MqttTopicConstant.RESET);
        subscribe(MqttTopicConstant.ONLINE);
        subscribe(MqttTopicConstant.OFFLINE);
        subscribe(MqttTopicConstant.REPORT);
    }

    /**
     * 连接
     */
    public void connect(String username, String password) throws MqttException {

        if(!client.isConnected()) {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setCleanSession(true);
            client.connect(options);
        }
    }

    /**
     * 发送消息
     */
    public void publish(String topic, String data) {

        if(client.isConnected()) {
            MqttMessage message = new MqttMessage(data.getBytes());
            message.setQos(0);
            try {
                client.publish(topic, message);
                log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, data);
            } catch (MqttException e) {
                log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);
            }
            return;
        }

        log.info("Message publish failed, client:{} not online.", client.getClientId());
    }

    /**
     * 订阅
     */
    public void subscribe(String topic) {

        if(client.isConnected()) {
            try {
                client.subscribe(topic);
                log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);
            } catch (MqttException e) {
                log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);
            }
            return;
        }

        log.info("Message subscribe failed, client:{} not online.", client.getClientId());
    }

    /**
     * 断开连接
     */
    public void disconnect() {
        try {
            client.disconnect();
            client.close();
            log.info("Disconnected:{}", client.getClientId());
        } catch (MqttException e) {
            log.error("Message disconnect failed:{}", client.getClientId(), e);
        }
    }
}

自定义MqttCallback

对客户端连接丢失,收到消息做一些模拟处理

package com.angel.ocean.mqtt;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.domain.UpData;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;

@Slf4j
public class MqttCallbackHandler implements MqttCallback {

    private KafkaService kafkaService;

    public MqttCallbackHandler(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
    }

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        log.info("连接断开...", cause);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String data = new String(message.getPayload());
        log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);
        UpData upData = JSONObject.parseObject(data, UpData.class);
        UpKafKaData upKafKaData = new UpKafKaData(topic, data);
        log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));
        kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete---------:{}", token.isComplete());
    }
}

MqttController

用于模拟客户端行为

package com.angel.ocean.controller;

import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.mqtt.MqttService;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

/**
 *  前端控制器
 *
 * @author Jaime.yu
 * @time 2024-12-01
 */
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {

    @Resource
    private MqttService mqttService;

    @GetMapping("/subscribe")
    public ApiResult<?> subscribe(String topic) {
        mqttService.subscribe(topic);
        return ApiResult.success();
    }

    @GetMapping("/publish")
    public ApiResult<?> publish(String topic, String message) {
        mqttService.publish(topic, message);
        return ApiResult.success();
    }

    @GetMapping("/disconnect")
    public ApiResult<?> disconnect() {
        mqttService.disconnect();
        return ApiResult.success();
    }
}

代码验证

启动mqtt客户端

如下图客户端已上线:
在这里插入图片描述

发送消息

在这里插入图片描述如下图mqtt broker该客户端的日志,接收到了我们发送的数据:hello world
在这里插入图片描述

接收数据

首先我们先订阅个主题:mqtt/0/0

在这里插入图片描述

使用MQTTX客户端向该主题发消息

在这里插入图片描述

Java mqtt客户端接收数据

查询本地Java mqtt客户收到的消息,如下图收到该消息
在这里插入图片描述mqtt broker 也可以看到该日志:
在这里插入图片描述

断开连接

在这里插入图片描述如下图本地客户端862024121819020已断开连接:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2264131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

boost asio 异步服务器

boost网络框架使用方法 boost绑定 首先介绍io_context&#xff0c;可以理解为这是操作系统和应用层数据交互的桥梁。有了它不必关注内核态的缓冲区&#xff0c;只需要关注自己定义在用户态的缓冲区&#xff0c;因为它会通过桥梁运输到用户态的缓冲区。 boost::asio::io_contex…

图解HTTP-HTTP协议

HTTP HTTP是一种不保存状态&#xff0c;即无状态的协议。HTTP协议自身不对请求和响应之间的通信进行保存。为了保存状态因此后面也有一些技术产生比如Cookies技术。 HTTP是通过URI定位网上的资源&#xff0c;理论上将URI可以访问互联网上的任意资源。 如果不是访问特定的资源…

【Go】-限流器的四种实现方法

目录 关于限流和限流器 固定窗口限流器 滑动窗口限流器 漏桶限流器 令牌桶限流器 总结 关于限流和限流器 限流&#xff08;Rate Limiting&#xff09;是一种控制资源使用率的机制&#xff0c;通常用于防止系统过载和滥用。 限流器&#xff08;Rate Limiter&#xff09;是…

CTF_1

CTF_Show 萌新赛 1.签到题 <?php if(isset($_GET[url])){system("curl https://".$_GET[url].".ctf.show"); }else{show_source(__FILE__); }?> 和 AI 一起分析 1.if(isset($_GET[url]))检查GET请求中是否存在名为url的参数。 curl 2.curl…

[文献阅读] Unsupervised Deep Embedding for Clustering Analysis (无监督的深度嵌入式聚类)

文章目录 Abstract:摘要聚类深度聚类 KL散度深度嵌入式聚类(DEC)KL散度聚类软分配&#xff08;soft assignment&#xff09;KL散度损失训练编码器的初始化聚类中心的初始化 实验评估总结 Abstract: This week I read Unsupervised Deep Embedding for Clustering Analysis .It…

记录:virt-manager配置Ubuntu arm虚拟机

virt-manager&#xff08;Virtual Machine Manager&#xff09;是一个图形用户界面应用程序&#xff0c;通过libvirt管理虚拟机&#xff08;即作为libvirt的图形前端&#xff09; 因为要在Linux arm环境做测试&#xff0c;记录下virt-manager配置arm虚拟机的过程 先在VMWare中…

使用C语言编写UDP循环接收并打印消息的程序

使用C语言编写UDP循环接收并打印消息的程序 前提条件程序概述伪代码C语言实现编译和运行C改进之自由设定端口注意事项在本文中,我们将展示如何使用C语言编写一个简单的UDP服务器程序,该程序将循环接收来自指定端口的UDP消息,并将接收到的消息打印到控制台。我们将使用POSIX套…

Spring Boot 教程之三十六:实现身份验证

如何在 Spring Boot 中实现简单的身份验证&#xff1f; 在本文中&#xff0c;我们将学习如何使用 Spring设置和配置基本身份验证。身份验证是任何类型的安全性中的主要步骤之一。Spring 提供依赖项&#xff0c;即Spring Security&#xff0c;可帮助在 API 上建立身份验证。有很…

什么样的LabVIEW控制算自动控制?

自动控制是指系统通过预先设计的算法和逻辑&#xff0c;在无人工干预的情况下对被控对象的状态进行实时监测、决策和调整&#xff0c;达到预期目标的过程。LabVIEW作为一种图形化编程工具&#xff0c;非常适合开发自动控制系统。那么&#xff0c;什么样的LabVIEW控制算作“自动…

GFPS扩展技术原理(七)-音频切换消息流

音频切换消息流 Seeker和Provider通过消息流来同步音频切换能力&#xff0c;触发连接做切换&#xff0c;获取或设置音频切换偏好&#xff0c;通知连接状态等等。为此专门定义了音频切换消息流Message Group 为0x07&#xff0c;Message codes如下&#xff1a; MAC of Audio s…

视频直播点播平台EasyDSS与无人机技术的森林防火融合应用

随着科技的飞速发展&#xff0c;无人机技术以其独特的优势在各个领域得到了广泛应用&#xff0c;特别是在森林防火这一关键领域&#xff0c;EasyDSS视频平台与无人机技术的融合应用更是为传统森林防火手段带来很大的变化。 一、无人机技术在森林防火中的优势 ‌1、快速响应与高…

机器人路径规划和避障算法matlab仿真,分别对比贪婪搜索,最安全距离,RPM以及RRT四种算法

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1贪婪搜索算法原理 4.2最安全距离算法原理 4.3RPM 算法原理 4.4 RRT 算法原理 5.完整程序 1.程序功能描述 机器人路径规划和避障算法matlab仿真,分别对比贪婪搜索,最安全距离,RPM以及R…

【论文笔记】Visual Alignment Pre-training for Sign Language Translation

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Visual Alignment Pre-tra…

【附源码】Electron Windows桌面壁纸开发中的 CommonJS 和 ES Module 引入问题以及 Webpack 如何处理这种兼容

背景 在尝试让 ChatGPT 自动开发一个桌面壁纸更改的功能时&#xff0c;发现引入了一个 wallpaper 库&#xff0c;这个库的入口文件是 index.js&#xff0c;但是 package.json 文件下的 type:"module"&#xff0c;这样造成了无论你使用 import from 还是 require&…

Apache解析漏洞(apache_parsingCVE-2017-15715)

apache_parsing 到浏览器中访问网站 http://8.155.8.239:81/ 我们写一个木马 1.php.jpg 我们将写好的木马上传 会得到我们上传文件的路径 我们访问一下 发现上传成功 发现木马运行成功&#xff0c;接下来使用蚁剑连接我们的图片马 获取 shell 成功 CVE-2013-454 我们还是到…

C++-----函数与库

数学中的函数与编程中的函数对比 数学中的函数 - 数学函数是一种映射关系&#xff0c;例如&#xff0c;函数\(y f(x)x^{2}\)&#xff0c;对于每一个输入值\(x\)&#xff0c;都有唯一确定的输出值\(y\)。它侧重于描述变量之间的数量关系&#xff0c;通常通过公式来表示这种关系…

带着国标充电器出国怎么办? 适配器模式(Adapter Pattern)

适配器模式&#xff08;Adapter Pattern&#xff09; 适配器模式适配器模式&#xff08;Adapter Pattern&#xff09;概述talk is cheap&#xff0c; show you my code总结 适配器模式 适配器模式&#xff08;Adapter Pattern&#xff09;是面向对象软件设计中的一种结构型设计…

SKETCHPAD——允许语言模型生成中间草图,在几何、函数、图算法和游戏策略等所有数学任务中持续提高基础模型的性能

概述 论文地址&#xff1a;https://arxiv.org/pdf/2406.09403 素描是一种应用广泛的有效工具&#xff0c;包括产生创意和解决问题。由于素描能直接传达无法用语言表达的视觉和空间信息&#xff0c;因此从古代岩画到现代建筑图纸&#xff0c;素描在世界各地被用于各种用途。儿童…

初等函数整理

1.幂函数 2.指数函数 3.对数函数

【C/C++】手搓项目中常用小工具:日志、sqlit数据库、Split切割、UUID唯一标识

每日激励&#xff1a;“不设限和自我肯定的心态&#xff1a;I can do all things。 — Stephen Curry” 绪论​&#xff1a; 本章将写到一些手搓常用工具&#xff0c;方便在项目中的使用&#xff0c;并且在手搓的过程中一些函数如&#xff1a;日志 宏中的__VA_ARGS__接收可变参…