MQTT协议-EMQX技术文档-spring-boot整合使用--发送接收-消费

news2025/1/12 1:58:38

概念先行:

mqtt与mq的关系

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的通信协议,它与MQ(Message Queue,消息队列)有一定的关联,但二者并不完全相同。

MQTT是一种轻量级的通信协议,专门为在物联网(IoT)设备之间的消息传递而设计。它运行在TCP协议之上,以“发布-订阅”模式进行消息传递。在这种模式中,发布者将消息发布到特定的主题(topic)中,而订阅者则订阅这些主题以获取消息。

另一方面,MQ是一种更为通用的消息队列技术,它可以支持多种不同的消息传递协议,包括MQTT、AMQP(Advanced Message Queuing Protocol)、STOMP(Streaming Text Oriented Message Protocol)等。MQ可以提供更为复杂和灵活的消息传递模式,包括点对点、发布/订阅等。

尽管MQTT和MQ是不同的概念,但它们可以在实际应用中进行结合。例如,你可以使用MQTT协议将消息发送到MQ中,然后使用MQ的其他功能对这些消息进行处理和传递。

总之,MQTT是一种通信协议,而MQ是一种消息队列技术,二者虽然有关联,但概念不同。

EMQX在本次技术中担任什么角色?解决什么问题?

1、在技术中担任什么角色?

        在物联网中,EMQX使用MQTT协议来担任消息传递的角色。具体来说,EMQX是一种MQTT broker的实现,它充当了MQTT网络中的服务器,负责接收和转发客户端发布的MQTT消息。

MQTT是一种轻量级的发布/订阅模式的通信协议,非常适合在物联网设备之间进行消息传递。通过使用MQTT协议,设备可以发布和订阅不同的主题,从而实现设备间的信息交流和数据传输。

在技术中,EMQX作为MQTT broker,它的主要职责是接收来自客户端的MQTT连接请求,处理并转发客户端发布的MQTT消息。同时,EMQX也负责处理客户端的订阅请求,将订阅的主题与相应的客户端进行关联,确保只有订阅了特定主题的客户端才能接收到相关的消息。

除了基本的消息传递功能,EMQX还提供了许多其他的高级功能,例如安全性和认证、QoS(Quality of Service)控制、持久化存储等。这些功能进一步增强了MQTT在物联网中的应用价值和实用性。

        EMQX在物联网中担任的角色是MQTT broker,负责实现MQTT协议的消息传递功能,并提供额外的功能和特性以满足物联网应用的需求。

2、解决了什么问题?

在物联网中,使用EMQX和MQTT协议可以解决以下问题:

  1. 设备连接和通信:EMQX可以连接大量的物联网设备,并支持它们之间的通信。这使得设备可以相互传递信息,进行数据交换和协同工作。
  2. 数据采集和监控:通过EMQX,可以对大量设备进行数据采集和监控,实时获取设备的工作状态和运行数据。这有助于及时发现问题并进行处理,同时也可以进行远程监控和控制。
  3. 实时性通知:使用MQTT的发布/订阅模式,设备可以及时接收和响应其他设备发布的信息。这可以实现实时性的通知和提醒,例如在设备出现故障或异常时向管理员发送警报。
  4. 能耗优化:由于MQTT协议的轻量级特性,使用EMQX进行消息传递可以降低设备的能耗。这尤其适用于电池供电的物联网设备,可以延长其使用寿命。
  5. 数据持久化:EMQX支持将数据存储在本地或远程数据库中,实现数据的持久化存储。这有助于对历史数据进行查询和分析,支持决策和预测。

        使用EMQX和MQTT协议可以解决物联网中的设备连接、数据采集、实时通知、能耗优化和数据持久化等问题,为物联网应用的开发和实施提供有力的支持。

        因为使用了这个类似于mq的中间键,方便了服务器去获取信息和处理信息。

准备工作:

        准备一台安装部署好的emqx服务器,搭建文章如下。

MQTT协议--技术文档--搭建mqtt服务器--《EMQX单体服务器部署》_一单成的博客-CSDN博客

开始代码demo演示

注意:文章中描述了两种角色

        1、发布者

        2、订阅者

        发布者和订阅者都是根据主题来发送和存储消息的。他们两个并不知道彼此。并且根据实际的业务逻辑有的终端可以既是一些信息的发布者,也可以是一些主题的订阅者。用来拿取和接收信息。

订阅者代码demo演示:

订阅者代码结构展示:

配置文件展示:

pom文件依赖

  <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!--配置文件报错问题-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

 application.yml文件配置展示

# 服务器相关

server:
  port: 10002

spring:
  main:
    allow-circular-references: true
  application:
    name: dispatcher

mqtt:
  hostUrl: tcp://ip:1883
  username: dev
  password: dev
  client-id: MQTT-CLIENT-DEV
  cleanSession: true
  reconnect: true
  timeout: 100
  keepAlive: 100
  defaultTopic: server/dev/report
  serverTopic: server/dev/report
  isOpen: true
  qos: 0

配置文件详解:

  1. server: - 开始配置服务器相关的关键字
  2. port: 10002 - 服务器监听的端口号。这里配置为10002。
  3. spring: - Spring框架相关的配置开始。
  4. main: - Spring的主配置,用于配置Spring应用程序的主要属性。
  5. allow-circular-references: true - 允许循环引用。当Bean之间存在循环依赖时,设置为true可以解决循环依赖问题。
  6. application: - 应用程序相关的配置开始。
  7. name: dispatcher - 应用程序的名称,这里配置为dispatcher。
  8. mqtt: - MQTT相关的配置开始。
  9. hostUrl: tcp://ip:1883 - MQTT服务器的主机URL,表示MQTT消息将通过TCP协议发送到指定的IP地址和端口。这里配置为tcp://ip:1883,表示连接到运行在IP地址上的MQTT服务器,端口号为1883。
  10. username: dev - MQTT客户端的用户名,用于身份验证。这里配置为dev。
  11. password: dev - MQTT客户端的密码,用于身份验证。这里配置为dev。
  12. client-id: MQTT-CLIENT-DEV - MQTT客户端的唯一标识符。这里配置为MQTT-CLIENT-DEV。
  13. cleanSession: true - MQTT会话的清理标志。设置为true表示在连接关闭时清除会话中的所有消息。
  14. reconnect: true - MQTT客户端的重连标志。设置为true表示在连接断开时自动尝试重新连接MQTT服务器。
  15. timeout: 100 - MQTT客户端的超时时间,单位为毫秒。这里配置为100毫秒。
  16. keepAlive: 100 - MQTT客户端的心跳保持时间,单位为毫秒。这里配置为100毫秒。
  17. defaultTopic: server/dev/report - MQTT客户端的默认主题,用于接收消息。这里配置为server/dev/report。
  18. serverTopic: server/dev/report - 服务器端发布消息的主题。这里配置为server/dev/report。
  19. isOpen: true - 是否开启MQTT功能,这里配置为true表示开启MQTT功能。
  20. qos: 0 - MQTT消息的QoS(服务质量)等级,这里配置为0表示最多一次的传输保障。

 回调实现

 

package com.adn.callback.Impl;


import com.adn.client.MqttAcceptClient;
import com.adn.common.config.MqttProperties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;

import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * @Description : MQTT接受服务的回调类
 * @Author : adn
 */
@Component
public class MqttAcceptCallback implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);

    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    @Autowired
    private MqttProperties mqttProperties;

    /**
     * 客户端断开后触发
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("连接断开,可以重连");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            logger.info("【emqx重新连接】....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * 客户端收到消息触发
     *
     * @param topic       主题
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("【接收消息主题】:" + topic);
        logger.info("【接收消息Qos】:" + mqttMessage.getQos());
        logger.info("【接收消息内容】:" + new String(mqttMessage.getPayload()));
        //        int i = 1/0;
    }

    /**
     * 发布消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            logger.info("向主题【" + topic + "】发送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            logger.info("【消息内容】:" + s);
        } catch (Exception e) {
            logger.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 连接emq服务器后触
     */
    @Override
    public void connectComplete(boolean b, String s) {
        logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");
        // 以/#结尾表示订阅所有以test开头的主题
        // 订阅所有机构主题
        mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0);
    }
}

客户端

package com.adn.client;

import com.adn.callback.Impl.MqttAcceptCallback;
import com.adn.common.config.MqttProperties;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Description : MQTT接受服务的客户端
 * @Author : adn
 */
@Component
public class MqttAcceptClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);

    @Autowired
    private MqttAcceptCallback mqttAcceptCallback;

    @Autowired
    private MqttProperties mqttProperties;

    public static MqttClient    client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * 客户端连接
     */
    public void connect() {
        MqttClient client;
        try {
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(),
                    new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            // 设置回调
            client.setCallback(mqttAcceptCallback);
            client.connect(options);
        } catch (Exception e) {
            logger.error("MqttAcceptClient connect error,message:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 重新连接
     */
    public void reconnection() {
        try {
            client.connect();
        } catch (MqttException e) {
            logger.error("MqttAcceptClient reconnection error,message:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("========================【开始订阅主题:" + topic + "】========================");
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("MqttAcceptClient subscribe error,message:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 取消订阅某个主题
     *
     * @param topic
     */
    public void unsubscribe(String topic) {
        logger.info("========================【取消订阅主题:" + topic + "】========================");
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            logger.error("MqttAcceptClient unsubscribe error,message:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

使用配置

MqttCondition

package com.adn.common.config.Impl;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

/**
 * @Description : 自定义配置,通过这个配置,来控制启动项目的时候是否启动mqtt
 * @Author : adn
 */
public class MqttCondition implements Condition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
        //1、能获取到ioc使用的beanfactory
        ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
        //2、获取类加载器
        ClassLoader classLoader = context.getClassLoader();
        //3、获取当前环境信息
        Environment environment = context.getEnvironment();
        String isOpen = environment.getProperty("mqtt.isOpen");
        return Boolean.valueOf(isOpen);
    }
}

 MqttConfig

package com.adn.common.config;


import com.adn.client.MqttAcceptClient;
import com.adn.common.config.Impl.MqttCondition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

/**
 * @Description : 启动服务的时候开启监听客户端
 * @Author : adn
 */
@Configuration
public class MqttConfig {

    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 订阅mqtt
     *
     * @return
     */
    @Conditional(MqttCondition.class)
    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }
}

MqttProperties

package com.adn.common.config;


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 *  MQTT配置信息
 * @Author : adn
 */
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /**
     * 用户名
     */
    private String  username;

    /**
     * 密码
     */
    private String  password;

    /**
     * 连接地址
     */
    private String  hostUrl;

    /**
     * 客户端Id,同一台服务器下,不允许出现重复的客户端id
     */
    private String  clientId;

    /**
     * 默认连接主题,以/#结尾表示订阅所有以test开头的主题
     */
    private String  defaultTopic;

    /**
     * 默认服务器发送主题前缀,格式:server:${env}:report:${topic}
     */
    private String  serverTopic;

    /**
     * 超时时间
     */
    private int     timeout;

    /**
     * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
     * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
     */
    private int     keepAlive;

    /**
     * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
     * 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
     */
    private Boolean cleanSession;

    /**
     * 是否断线重连
     */
    private Boolean reconnect;

    /**
     * 启动的时候是否关闭mqtt
     */
    private Boolean isOpen;

    /**
     * 连接方式
     */
    private Integer qos;

    /**
     * 获取默认主题,以/#结尾表示订阅所有以test开头的主题
     * @return
     */
    public String getDefaultTopic() {
        return defaultTopic + "/#";
    }

    /**
     * 获取服务器发送主题,格式:server/${env}/report/${topic}
     * @param topic
     * @return
     */
    public String getServerTopic(String topic) {
        return serverTopic + "/" + topic;
    }
}

启动测试

 查看可视化面板,成功创建连接。

如果需要更换订阅的主题可以在配置文件中更换以及在建立连接的时候更换。

发布者代码demo展示

发布者的配置pom和application.yml文件中内容与上面的订阅者一样。参考上面即可

代码结构展示:

 回调类

package com.adn.callback.Impl;


import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * @Description : MQTT发送客户端的回调类
 * @Author : adn
 */
@Component
public class MqttSendCallBack implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class);

    /**
     * 客户端断开后触发
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("连接断开,可以重连");
    }

    /**
     * 客户端收到消息触发
     *
     * @param topic       主题
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("【接收消息主题】: " + topic);
        logger.info("【接收消息Qos】: " + mqttMessage.getQos());
        logger.info("【接收消息内容】: " + new String(mqttMessage.getPayload()));
    }

    /**
     * 发布消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            logger.info("向主题【" + topic + "】发送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            logger.info("【消息内容】:" + s);
        } catch (Exception e) {
            logger.error("MqttSendCallBack deliveryComplete error,message:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     *
     * 连接emq服务器后触发
     *
     * @param b
     * @param s
     */

    @Override
    public void connectComplete(boolean b, String s) {
//        logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");
    }



}

发送客户端

package com.adn.client;


import com.adn.callback.Impl.MqttSendCallBack;
import com.adn.common.config.MqttProperties;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * @Description : MQTT发送客户端
 * @Author : adn
 */
@Component
public class MqttSendClient {

    private static final Logger logger       = LoggerFactory.getLogger(MqttSendClient.class);

    @Autowired
    private MqttSendCallBack mqttSendCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    public MqttClient connect() {
        MqttClient client = null;
        try {
            String uuid = UUID.randomUUID().toString().replaceAll("-", "");
            client = new MqttClient(mqttProperties.getHostUrl(), uuid, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(true);
            options.setAutomaticReconnect(false);
            // 设置回调
            client.setCallback(mqttSendCallBack);
            client.connect(options);
        } catch (Exception e) {
            logger.error("MqttSendClient connect error,message:{}", e.getMessage());
            e.printStackTrace();
        }
        return client;
    }

    /**
     * 发布消息
     *
     * @param retained 是否保留
     * @param topic 主题,格式: server:${env}:report:${topic}
     * @param content 消息内容
     */
    public void publish(boolean retained, String topic, String content) {
        MqttMessage message = new MqttMessage();
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(content.getBytes());
        MqttDeliveryToken token;
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish(mqttProperties.getServerTopic(topic), message);
        } catch (MqttException e) {
            logger.error("MqttSendClient publish error,message:{}", e.getMessage());
            e.printStackTrace();
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

    /**
     * 关闭连接
     *
     * @param mqttClient
     */
    public static void disconnect(MqttClient mqttClient) {
        try {
            if (mqttClient != null){
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            logger.error("MqttSendClient disconnect error,message:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 释放资源
     *
     * @param mqttClient
     */
    public static void close(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.close();
            }
        } catch (MqttException e) {
            logger.error("MqttSendClient close error,message:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

配置文件以及配置类

MqttProperties

package com.adn.common.config;


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 *  MQTT配置信息
 * @Author : adn
 */
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /**
     * 用户名
     */
    private String  username;

    /**
     * 密码
     */
    private String  password;

    /**
     * 连接地址
     */
    private String  hostUrl;

    /**
     * 客户端Id,同一台服务器下,不允许出现重复的客户端id
     */
    private String  clientId;

    /**
     * 默认连接主题,以/#结尾表示订阅所有以test开头的主题
     */
    private String  defaultTopic;

    /**
     * 默认服务器发送主题前缀,格式:server:${env}:report:${topic}
     */
    private String  serverTopic;

    /**
     * 超时时间
     */
    private int     timeout;

    /**
     * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
     * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
     */
    private int     keepAlive;

    /**
     * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
     * 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
     */
    private Boolean cleanSession;

    /**
     * 是否断线重连
     */
    private Boolean reconnect;

    /**
     * 启动的时候是否关闭mqtt
     */
    private Boolean isOpen;

    /**
     * 连接方式
     */
    private Integer qos;

    /**
     * 获取默认主题,以/#结尾表示订阅所有以test开头的主题
     * @return
     */
    public String getDefaultTopic() {
        return defaultTopic + "/#";
    }

    /**
     * 获取服务器发送主题,格式:server/${env}/report/${topic}
     * @param topic
     * @return
     */
    public String getServerTopic(String topic) {
        return serverTopic + "/" + topic;
    }
}

MqttCondition

package com.adn.common.config.Impl;


import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

/**
 * @Description : 自定义配置,通过这个配置,来控制启动项目的时候是否启动mqtt
 * @Author : adn
 * 0 16:32
 */
public class MqttCondition implements Condition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
        //1、能获取到ioc使用的beanfactory
        ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
        //2、获取类加载器
        ClassLoader classLoader = context.getClassLoader();
        //3、获取当前环境信息
        Environment environment = context.getEnvironment();
        String isOpen = environment.getProperty("mqtt.isOpen");
        return Boolean.valueOf(isOpen);
    }
}

测试的controller

package com.adn.controller;




import com.adn.client.MqttSendClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description : 测试类
 * @Author : adn
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttSendClient mqttSendClient;

    @GetMapping(value = "/publishTopic")
    public String publishTopic(String topic, String sendMessage) {
        topic = "client/dev/report/test";
        System.out.println("topic:" + topic);
        System.out.println("message:" + sendMessage);
        for (int i = 0; i < 10000; i++) {
            this.mqttSendClient.publish(false, topic, sendMessage);
        }
        return "topic:" + topic + "message:" + sendMessage;
    }

}

结尾:

         总结:只要保证你的订阅者的订阅主题和发布者的发布上去的主题一致,就可以监听到

        注意:本发布者使用的是短连接,如果需要长连接直接使用订阅者的配置文件、以及配置类就可以。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/889703.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

空气IT

现代社会中&#xff0c;空气质量成为了人们关注的焦点之一。随着工业化的发展&#xff0c;汽车尾气、工厂排放和燃煤等行为导致城市空气污染日益严重&#xff0c;给人们的健康和生活质量带来了极大的威胁。 首先&#xff0c;空气污染对人体健康造成了严重的危害。空气中的颗粒…

费米问题:如何估算?

解答费米问题有两个重要的思想&#xff1a; 第一个是“逻辑树法”&#xff1a; 用逻辑拆解把一个大的问题拆分为几个小问题&#xff0c;大问题是未知的&#xff0c;而拆解后的小问题是可以通过经验和逻辑推算得出的。最后再把得到解答的小问题反推到大问题上&#xff0c;用已…

HTML基础 知识点总结

从这篇笔记开始总结看过的《从0到1 HTMLCSSJavaScript》书籍笔记&#xff0c;记录HTML以及CSS的相关知识点&#xff0c;为之后从事相关工作打好基础 简单介绍 基本标签文本列表表格 一.简单介绍 HTML&#xff1a;超文本标记语言&#xff0c;HTML是一门描述性的标记语言CSS&a…

Visual Components 专业版功能介绍 衡祖仿真

Visual Components专业版Professional 版本包括Visual Components精华版Essentials 中所有的功能&#xff0c;并提供您用于建模和创建自己的组件的工具。 Visual Components专业版功能 1、GEOMETRY SIMPLIFICATION 几何体简化 通过简化和删除&#xff08;CAD&#xff09;模型…

如何快速的合并多个PPT使之成为一个PPT?

如何快速的合并多个PPT使之成为一个PPT&#xff1f; 项目过程中&#xff0c;经常给客户汇报&#xff0c;经常做PPT&#xff0c;有时候&#xff0c;需要把之前的ppt内容整合到新的内容中&#xff0c;如何快速合并以及使用呢&#xff1f; 幻灯片&#xff08;PPT中&#xff09;点…

微信开发之一键撤回消息的技术实现

撤回消息 请求URL&#xff1a; http://域名地址/revokeMsg 请求方式&#xff1a; POST 请求头Headers&#xff1a; Content-Type&#xff1a;application/jsonAuthorization&#xff1a;login接口返回 参数&#xff1a; 参数名必选类型说明wId是string登录实例标识wcId…

excel统计函数篇2之count系列

1、COUNT(value1,[value2],…):计算参数列表中数字的个数 2、COUNTA(value1,[value2],…)&#xff1a;计算参数列表中值的个数 联想在excel之数学函数、excel中的通配符一文中提到求和函数&#xff1a; SUMIF(range,ceriteria,[sum_range])&#xff1a;对范围内符合指定条件的…

H5: div与textarea输入框的交互(聚焦、失去焦点、键盘收起)

简介 本文是基于 VUE3TS 的代码说明。 记录自己遇到的 div 与 textarea 输入框交互的聚焦、失去焦点、键盘收起、表情插入不失去焦点的需求实现。 需求分析 1.固定在页面底部&#xff1b; 2.默认显示纯文字与发送图标按钮&#xff0c;文字超出的省略显示&#xff1b; 3.点击…

QT多屏显示程序

多屏显示的原理其实很好理解&#xff0c;就拿横向扩展来说&#xff1a; 计算机把桌面的 宽度扩展成了 w1&#xff08;屏幕1的宽度&#xff09; w2(屏幕2的宽度) 。 当一个窗口的起始横坐标 > w1&#xff0c;则 他就被显示在第二个屏幕上了。 多屏虚拟成一个桌面 qt的说明…

React+Typescript使用接口泛型处理props

好 刚讲完组件 那么 这次 我们来看一下 数据传递的 props 还是上文的案例 例如 我们想将 title 传给Hello组件 之前我们可以直接这样 以一个标签属性的形式传过去 而我们在子组件中 这样去使用 但现在 我们从编辑器中都可以看出 这种写法已经不行了 然后 我们将 hello 组件…

【Swagger】只需要3步搭建Swagger环境,就可以让你的项目实现Swagger在线文档,实时浏览,修改展示

目录 1. pom.xml文件中添加Swagger的jar包 2. 配置Swagger 3. 项目启动中加入Swagger注解的开关&#xff0c;启动Swagger功能 4. 启动项目&#xff0c;查看效果 Swagger 的功能这里就不多说明了&#xff0c;相信大家都懂的&#xff0c;好奇多问一句&#xff0c;大家有知道其…

Python文件操作教程,Python文件操作笔记

文件的打开与关闭 想一想&#xff1a; 如果想用word编写一份简历&#xff0c;应该有哪些流程呢&#xff1f; 打开word软件&#xff0c;新建一个word文件写入个人简历信息保存文件关闭word软件 同样&#xff0c;在操作文件的整体过程与使用word编写一份简历的过程是很相似的…

数据科学家需要掌握的Docker要点

大家好&#xff0c;Python以及pandas和scikit-learn等Python数据分析和机器学习库套件可以帮助你轻松开发数据科学应用程序。然而Python中的依赖性管理是一项挑战&#xff0c;在进行数据科学项目时&#xff0c;需要花费大量时间安装各种库&#xff0c;并跟踪正在使用的库的版本…

linux系统服务学习(八)DNS域名系统配置

文章目录 DNS域名管理系统一、DNS概述1、DNS系统概述☆ DNS的正向解析☆ DNS的反向解析☆ 根域&#xff08;.&#xff09;☆ 一级域名<顶级域|国家域>☆ 二级域名(自己购买管理)☆ 域名机构 2、DNS工作原理3、dig工具使用 二、DNS服务器的搭建1、DNS服务器端软件2、DNS服…

运行软件mfc140u.dll丢失怎么办?mfc140u.dll的三个修复方法

最近我在使用一款软件时遇到了一个问题&#xff0c;提示缺少mfc140u.dll文件。。这个文件是我在使用某个应用程序时所需要的&#xff0c;但是由于某种原因&#xff0c;它变得无法正常使用了。经过一番搜索和了解&#xff0c;我了解到mfc140u.dll是Microsoft Visual Studio 2015…

关于openfeign调用时content-type的问题

问题1描述&#xff1a; 今天在A服务使用openfeign调用B服务的时候&#xff0c;发现经常会偶发性报错。错误如下&#xff1a; 情况为偶发&#xff0c;很让人头疼。 两个接口如下&#xff1a; A服务接口&#xff1a; delayReasonApi.test(student);就是使用openfeign调用B服务的…

计组 | DMA

前言 记录一些计组相关联的题集与知识点&#xff0c;方便记忆与理解。 DMA 采用DMA方式传送数据时&#xff0c;每传送一个数据就要用一个&#xff08; C&#xff09;时间。 A 指令周期 B 机器周期 C 存储周期 D 总线周期发…

macOS(m1/m2)破解Sublime Text和Navicat16

破解Sublime Text 说明&#xff1a;全程使用的是终端操作 1. 下载Sublime Text&#xff0c;建议使用brew下载 2. 进入到下载的app的文件夹 cd "/Applications/Sublime Text.app/Contents/MacOS/"3. 执行以下操作以确认版本是否匹配 md5 -q sublime_text | grep -i…

分析区域产业发展现状,谋划产业发展路径,提升产业竞争力

随着经济全球化的深入发展&#xff0c;产业与区域经济发展有着不可分割的关系&#xff0c;产业是区域经济发展的基础&#xff0c;产业链的形成可以促进区域经济的协调发展&#xff0c;产业竞争力的提升能够带动区域经济的增长。那么该如何打造区域产业链闭环&#xff0c;提升产…

如何将labelImg打包成exe

最近整理一下数据标注这块的内容&#xff0c;在目标检测和目标分割里面用的最多的标注工具labelimg&#xff0c;labelme labelimg主要用于目标检测领域制作自己的数据集&#xff0c;如&#xff1a;YOLO系列目标检测模型 labelme主要用于图像分割领域制作自己的数据集&#xf…