Java实现Mqtt收发消息

news2025/1/13 2:51:19

Java实现Mqtt收发消息

文章目录

  • Java实现Mqtt收发消息
    • windows mqtt 平台服务搭建
    • mqtt 客户端工具:mqttbox
    • 整体代码结构
    • mqtt基础参数配置类
    • mqtt客户端连接
    • mqtt接收的消息处理类
      • 对应的MqttService注解和MqttTopic注解
    • MqttGateway 发送消息
    • 指定topic接收处理方法

java实现mqtt对消息的交互,mqtt 的topic主题概念是相互的,这个要先理解好,
发布者和订阅者是对等的,它们之间可以相互发送消息,而不需要建立任何连接或状态
使用到windows mqtt 平台服务搭建(不是必须安装,仅 windows 测试需要此步骤)
mqtt 客户端工具:mqttbox
废话不多说,直接上代码,上工具,准备工作先做好,以及我的实现过程

windows mqtt 平台服务搭建

下载apache-apollo-1.7.1-windows版本,这里提供一个链接地址
http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/

提供一个现有教程:
https://blog.csdn.net/qq_42315062/article/details/125890181
搭建完成后:登录 http://127.0.0.1:61680 即可,默认账号 admin,密码 password,
注意 这里网页的端口是 61680,但是 mqtt 服务的端口是 61613

mqtt 客户端工具:mqttbox

这里提供一个下载地方,也可以自行下载
https://download.csdn.net/download/qq_39671088/85740566?utm_medium=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&depth_1-utm_source=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&spm=1003.2020.3001.6616.1

在这里插入图片描述

整体代码结构

在这里插入图片描述

mqtt基础参数配置类

在这里插入图片描述

@Data
@Component
@ConfigurationProperties("mqtt")
public class MqttProperties {

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 连接地址
     */
    private String hostUrl;

    /**
     * 进-客户Id
     */
    private String inClientId;

    /**
     * 出-客户Id
     */
    private String outClientId;

    /**
     * 客户Id
     */
    private String clientId;

    /**
     * 默认连接话题
     */
    private String defaultTopic;

    /**
     * 超时时间
     */
    private int timeout;

    /**
     * 保持连接数
     */
    private int keepalive;

    /**
     * 是否清除session
     */
    private boolean clearSession;
}

mqtt客户端连接

import com.bsj.boyun.core.tool.utils.ExceptionUtil;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class MqttConfig {

    @Autowired
    private MqttProperties mqttProperties;
    @Autowired
    private MqttMessageHandle mqttMessageHandle;

    /**
     * 出站通道,MqttGateway类
     */
    private static String outboundChannel = "mqttOutboundChannel";

    /**
     * Mqtt 客户端工厂 所有客户端从这里产生
     *
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() throws MqttException {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        try {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(mqttProperties.getHostUrl().split(","));
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            factory.setConnectionOptions(options);
        } catch (Exception e) {
            System.out.println("mqtt初始化连接异常:" + ExceptionUtil.getStackStr(e));
        }
        return factory;
    }

	/**
     * Mqtt 管道适配器
     *
     * @param factory
     * @return
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory) {
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(), factory, mqttProperties.getDefaultTopic().split(","));
    }

    /**
     * 消息消费者 (接收,处理来自mqtt的消息)
     *
     * @param adapter
     * @return
     */
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        //默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的
        //这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channel
        return IntegrationFlows.from(adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle).get();
    }

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    /**
     * 出站处理器 (向 mqtt 发送消息 生产者)
     *
     * @param factory
     * @return
     */
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(), factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from(outboundChannel).handle(handler).get();
    }

}

mqtt接收的消息处理类

import com.bsj.studentcard.gateway.attendance.mqtt.annotation.MqttService;
import com.bsj.studentcard.gateway.attendance.mqtt.annotation.MqttTopic;
import com.bsj.studentcard.gateway.attendance.util.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
 * 所有mqtt到达的消息都会在这里处理
 * 参考MVC @RequestMapping的方式
 * 使用注解映射到专门的Topic去处理(MqttTopicHandle类),不写 if else
 **/
@Component
@Slf4j
public class MqttMessageHandle implements MessageHandler {

    /**
     * 包含 @MqttService注解 的类(Component)
     */
    public static Map<String, Object> mqttServices;

    /**
     * 所有mqtt到达的消息都会在这里处理
     * 要注意这个方法是在线程池里面运行的
     *
     * @param message message
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        getMqttTopicService(message);
    }
    
    /**
     * 获取有@MqttService 的类,专门处理topic消息的类
     *
     * @return
     */
    public Map<String, Object> getMqttServices() {
        if (mqttServices == null) {
            mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
        }
        return mqttServices;
    }

    /**
     * topic 匹配
     *
     * @param message
     */
    public void getMqttTopicService(Message<?> message) {
        // 在这里 我们根据不同的 主题 分发不同的消息
        String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        if (receivedTopic == null || "".equals(receivedTopic)) {
            return;
        }
        for (Map.Entry<String, Object> entry : getMqttServices().entrySet()) {
            // 把所有带有 @MqttService 的类遍历
            Class<?> clazz = entry.getValue().getClass();
            // 获取他所有方法
            Method[] methods = clazz.getDeclaredMethods();
            for (Method method : methods) {
                if (method.isAnnotationPresent(MqttTopic.class)) {
                    // 如果这个方法有 这个注解
                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                    if (isMatch(receivedTopic, handleTopic.value())) {
                        // 并且 这个 topic 匹配成功
                        try {
                            method.invoke(SpringUtils.getBean(clazz), message);
                            return;
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        } catch (InvocationTargetException e) {
                            log.error("执行 {} 方法出现错误", handleTopic.value(), e);
                        }
                    }
                }
            }
        }
    }
  
      /**
     * mqtt 订阅的主题与我实际的主题是否匹配
     *
     * @param topic   是实际的主题
     * @param pattern 是我订阅的主题 可以是通配符模式
     * @return 是否匹配
     */
    public static boolean isMatch(String topic, String pattern) {
        if ((topic == null) || (pattern == null)) {
            return false;
        }
        if (topic.equals(pattern)) {
            // 完全相等是肯定匹配的
            return true;
        }
        if ("#".equals(pattern)) {
            // # 号代表所有主题  肯定匹配的
            return true;
        }
        String[] splitTopic = topic.split("/");
        String[] splitPattern = pattern.split("/");
        boolean match = true;
        // 如果包含 # 则只需要判断 # 前面的
        for (int i = 0; i < splitPattern.length; i++) {
            if (!"#".equals(splitPattern[i])) {
                // 不是# 号 正常判断
                if (i >= splitTopic.length) {
                    // 此时长度不相等 不匹配
                    match = false;
                    break;
                }
                if (!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])) {
                    // 不相等 且不等于 +
                    match = false;
                    break;
                }
            } else {
                // 是# 号  肯定匹配的
                break;
            }
        }
        return match;
    }
}

对应的MqttService注解和MqttTopic注解

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;

import java.lang.annotation.*;

/**
 * 自定义注解:消息处理类
 */
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {

    @AliasFor(
            annotation = Component.class
    )
    String value() default "";
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 自定义注解:topic处理方法
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {

    /**
     * 主题名字
     */
    String value() default "";

}

MqttGateway 发送消息

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MqttGateway
 */

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data);
}

指定topic接收处理方法

/**
 * MqttTopicHandle  指定topic消息处理
 */
@MqttService
@Slf4j
@RequiredArgsConstructor
public class MqttTopicHandle {

	private final MqttGateway mqttGateway;

    /**
     * 上线通知
     *
     * @param message
     */
    @MqttTopic("mqtt/face/basic")
    public void basic(Message<?> message) throws MqttException {
        String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        String payload = (String) message.getPayload();
        log.info("接收到的topic为:{},内容:{}", receivedTopic, payload );
       // 要回复当前主题,不回复不需要处理
        mqttGateway.sendToMqtt(topic, 0, "收到消息!");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/615750.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Hive的数据应用实践总结

百分位数(percentile)计算 百分位数含义&#xff1a;统计学术语&#xff0c;如果将一组数据从小到大排序&#xff0c;并计算相应的累计百分位&#xff0c;则某一百分位所对应数据的值就称为这一百分位的百分位数。可表示为&#xff1a;一组n个观测值按数值大小排列。如&#x…

如何实现Java类隔离加载

一 什么是类隔离技术 只要你 Java 代码写的足够多&#xff0c;就一定会出现这种情况&#xff1a;系统新引入了一个中间件的 jar 包&#xff0c;编译的时候一切正常&#xff0c;一运行就报错&#xff1a;java.lang.NoSuchMethodError&#xff0c;然后就哼哧哼哧的开始找解决方法…

弹性盒子中的flex

flex属性是flex-grow&#xff0c;flex-shrink和flex-basis的缩写 flex是用在盒子中的子组件的&#xff0c;充分体现了弹性盒子的弹性二字。 例如现在的情况是&#xff1a; <div class"container"><div class"item1">Item1</div><d…

微信小程序码生成,扫码携带参数进入指定页面

一、准备工作 &#xff08;1&#xff09;微信小程序后台获取小程序的appId和secret 小程序后台管理&#xff08;开发管理➡开发设置&#xff09; &#xff08;2&#xff09;扫码跳转的页面在app.json中已经注册 注册的路径与传过去的路径一致 &#xff08;3&#xff09;小程序…

同步模式之犹豫模式Balking

tip: 作为程序员一定学习编程之道&#xff0c;一定要对代码的编写有追求&#xff0c;不能实现就完事了。我们应该让自己写的代码更加优雅&#xff0c;即使这会费时费力。 文章目录 一、同步模式之犹豫模式Balking二、代码样例三、优缺点 一、同步模式之犹豫模式Balking 同步模…

挤出泡沫、脱虚向实,AI大模型正在回归价值投资?

商品推荐、交通管理、生成文章、代码编程、电影特效制作……自ChatGPT横空出世以来&#xff0c;AIGC浪潮席卷全球&#xff0c;上下游产业链也因此大放异彩。 市场行情的高景气直观反映在股价上&#xff0c;无论AI公司是否盈利&#xff0c;其股价多呈上升趋势。一些与AI概念有所…

测试:用例篇

上一章讲述的是测试的基本概念。在我们开始做了一段时间基础测试&#xff0c;熟悉了业务之后&#xff0c;往往会 分配来写测试用例&#xff0c;并且在日常测试中&#xff0c;有时也需要补充测试用例到现有的案例库中 在开始之前先讲讲测试中经典的测试方法&#xff1a;黑盒测试…

【dc-dc】DC-DC恒流电源 车灯方案的应用

1,信息来源&#xff1a;深圳市世微半导体有限公司 Augus 2,产品描述 AP5103 是一款效率高&#xff0c;稳定可靠的 LED 灯恒流驱动控制芯片&#xff0c;内置高精度比较器&#xff0c;固定关断时间控制电路&#xff0c;恒流驱动电路等&#xff0c;特别适合大功率 LED 恒流驱动。…

[Kotlin] 玩Android代码学习之-模块化+Retrofit+协程+viewModel的数据层封装

文章目录 1:前言玩Android APP 源码本贴的目的参考贴 2: kotlin下的模块化(捎带一嘴)3:Retrofit协程viewModel3.1基础网络层搭建lib_home:Bannerlib_common: BaseResp lib_common:RetrofitManagerlib_home: HomeApi 3.2基础网络层接口测试3.3 基础网络层优化-koin依赖注入框架…

观澜最快的旧改项目之一,鸿荣源观城项目一期。

项目&#xff1a;观湖街道观城第一期城市更新单元位置&#xff1a;4号地铁观澜地铁站0距离 规模&#xff1a;拆除范围用地面积706094㎡ 面积&#xff1a;私信咨询价格&#xff1a;3.x万/平 开发商&#xff1a;鸿荣源 合同方案&#xff1a;直接开发商签合同 目前进度&#…

Scrapy 入门教程

Scrapy Engine(引擎): 负责Spider、ItemPipeline、Downloader、Scheduler中间的通讯&#xff0c;信号、数据传递等。 Scheduler(调度器): 它负责接受引擎发送过来的Request请求&#xff0c;并按照一定的方式进行整理排列&#xff0c;入队&#xff0c;当引擎需要时&#xff0c;…

【CANN训练营机器狗系列】安装ROS环境及初体验

环境 操作系统&#xff1a;Ubuntu 20.04 CPU&#xff1a;Intel Xeon Gold 6278C CPU 2.60GHz 内存&#xff1a;16GB 准备环境 Ubuntu与ROS版本对应关系 UbuntuROS 1.0ROS2.016.04 LTSKinetic LTSArdent18.04 LTSMelodic LTSDashing LTS120.04 LTSNoetic LTSFoxy LTS 安装…

linorobot机器人-自动生成-不可用

好像还是比较混乱。 具体信息参考其官网。 https://linorobot.org/ Linorobot是一套开源的ROS兼容机器人&#xff0c;旨在为学生、开发者和研究人员提供一个低成本的平台&#xff0c;以便在ROS&#xff08;机器人操作系统&#xff09;的基础上创建新的激动人心的应用。Linor…

VMware(Ubuntu)共享文件夹设置

VMware共享文件夹设置 安装完成ubuntu虚拟机后&#xff0c;需要建立共享文件夹来方便在Host主机和虚拟机ubuntu之间分享文件。 在虚拟机设置中&#xff0c;在 选项 卡中找到 共享文件夹 项&#xff0c;在右侧添加共享文件夹。 在虚拟机中&#xff0c;在ubuntu终端中使用指令…

Three.js camera初探——转场动画实现

背景 首先简单介绍一下three.js&#xff0c;three.js是用javascript写的基于webGL的第三方3D库&#xff0c;通过它可以在网页中进行3D建模&#xff0c;结合上TweenMax.js动画库&#xff0c;在网页中实现3D动画效果就变得很简单了。 这是three.js建模的简单流程图例&#xff1…

基于B/S架构springboot框架开发的中小学智慧校园平台源码

一、智慧校园技术框架&#xff1a; 1、使用springboot框架Javavue2 B/S架构 2、JAVA语言数据库MySQL5.7 3、移动端小程序使用小程序原生语言开发 4、电子班牌固件安卓7.1&#xff1b;使用Java Android原生 5、elmentui &#xff0c;Quartz&#xff0c;jpa&#xff0c;jwt …

实现 Linux 视频会议(源码,支持信创环境,银河麒麟,统信UOS)

信创是现阶段国家发展的重要战略之一&#xff0c;面对这一趋势&#xff0c;所有的软件应用只有支持信创国产化的基础软硬件设施&#xff0c;在未来才不会被淘汰。那么&#xff0c;可以使用C#来实现支持信创环境的视频会议系统吗&#xff1f;答案是肯定的。 本文讲述如何使用C#来…

django 快速入门

快速开始 安装Django 首先安装Django包&#xff0c;现在Django已经到了2.0版本&#xff0c;如果还在使用1.11请尽快升级。旧版本以后只修复bug&#xff0c;不会添加新功能。 pip install django 复制 创建项目 Django安装好之后&#xff0c;会附带一个命令行工具django-a…

uCOSii_任务栈检测和任务栈清除

1、任务栈检测和任务栈清除 在创建任务时&#xff0c;也需要设置OSTaskCreateExt()传入opt参数。 当opt (INT16U)(OS_TASK_OPT_STK_CLR | OS_TASK_OPT_STK_CHK)&#xff0c;可以使用OSTaskStkChk()检查的任务栈的剩余空间&#xff0c;也可以使用OS_TaskStkClr()清除任务栈。 …

使用Graalvm+Swing搓了个原生桌面应用的轮子:文件差异对比工具,附轮子源码

文章目录 1、DFDiff介绍2、软件架构3、安装教程3.1、编译为jar包运行3.2、编译为原生应用运行 4、运行效果图5、项目源码地址 1、DFDiff介绍 当前已实现的功能比较两个文件夹内的文件差异&#xff0c;已支持文件差异对比。 2、软件架构 软件架构说明 开发环境是在OpenJDK17&…