ActiveMQ + MQTT 集群搭建(虚机版本) + Springboot使用配置

news2025/1/18 10:41:34

文章目录

  • 前言
  • 一、ActiveMQ、 MQTT是什么?
    • 1.ActiveMQ介绍
    • 2.MQTT介绍
  • 二、集群搭建步骤
    • 1.下载apache-activemq-5.15.12-bin.tar.gz
    • 2.上传apache-activemq-5.15.12-bin.tar.gz到服务器并解压文件到文件夹clusters、master、slave三个文件夹下面形成三个节点(平等节点)
    • 3.分别修改三个节点的配置文件:activemq.xml,修改连接协议端口号和配置网络代理,修改如下:
    • 4.分别修改三个节点的WEB配置文件:jetty.xml,修改端口号,修改如下:
    • 5.分别到bin目录下运行activemq文件启动activemq,命令如下:
    • 6.全部节点启动后,进入web端界面查询集群配置情况:
  • 三、项目连接配置步骤
    • 1.Activemq连接服务配置:
    • 2.MQTT生产者连接配置:
    • 3.MQTT消费者连接配置:
    • 4.项目使用:
  • 总结


前言

随着技术的不断迭代,在分布式系统中应用消息组件进行通信已经是非常常见的方式,而为了保障消息中间件的高可用性就需要对中间件进行集群化部署,这是应用程序发展的必经之路。


一、ActiveMQ、 MQTT是什么?

1.ActiveMQ介绍

ActiveMQ官网
ActiveMQ是一个开源的、基于Java的消息中间件(Message Oriented Middleware,MOM)实现。它提供了可靠的异步消息传递的功能,用于在分布式系统中进行应用程序之间的通信。

以下是ActiveMQ的一些主要特点和功能:

1.1、 异步消息传递:ActiveMQ支持发布-订阅和点对点模式的消息传递。应用程序可以通过发送和接收消息来进行异步通信。

1.2、持久化和持久订阅:ActiveMQ可以将消息持久化到磁盘,以确保即使在消息发送者和接收者之间的断开连接或重启后,消息也能被正确接收。

1.3、 多种消息传递模式:ActiveMQ支持多种消息传递模式,包括点对点队列、主题订阅和点对点回复等。

1.4、基于JMS标准:ActiveMQ完全支持Java消息服务(JMS)规范,是JMS的一种实现。JMS提供了一系列的API和协议,用于在Java应用程序之间进行消息传递。

1.5、高可用性和故障转移:ActiveMQ支持故障转移和高可用性,可以通过配置多个broker实现自动故障转移和消息备份。

1.6、多种协议支持:ActiveMQ支持多种协议,如AMQP、STOMP、OpenWire、MQTT等。这使得ActiveMQ可以与不同的客户端和应用程序进行集成和通信。

1.7、 插件体系结构:ActiveMQ具有可扩展的插件体系结构,允许开发人员根据需求添加自定义功能和扩展。

1.8、可视化管理工具:ActiveMQ提供了可视化的管理界面,用于监控和管理消息队列、主题、连接等。

作为一种成熟而强大的消息中间件解决方案,ActiveMQ被广泛用于构建可靠的分布式系统、实现异步通信、实现解耦和提高应用程序的可伸缩性等场景。

2.MQTT介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级、开放、简单的消息传输协议,专门针对物联网(IoT)领域设计。它具有低带宽和低功耗的特点,适用于在资源受限的设备上进行可靠的通信。

以下是MQTT协议的一些关键特点:

1.1、 轻量级:MQTT协议设计简单,通信报文开销小,传输数据量较小,适用于带宽有限的网络环境,能够满足物联网设备的资源限制。

1.2、发布/订阅模式:MQTT采用发布/订阅模式,包含发送消息的发布者和接收消息的订阅者。发布者将消息发布到特定的主题上,而订阅者通过订阅感兴趣的主题来接收消息。

1.3、QoS支持:MQTT支持三种不同的服务质量(QoS)级别:QoS 0(至多一次传输)、QoS 1(至少一次传输)和QoS 2(恰好一次传输)。这种级别的支持确保了消息的可靠性和传递保证。

1.4、消息保留:MQTT支持在特定主题上保留最新的消息。这意味着当订阅者订阅一个主题时,它将立即接收到最新的保留消息,而不仅仅是实时发送的消息。

1.5、心跳机制:MQTT协议定义了心跳机制,通过发送心跳报文,保持客户端和代理服务器之间的连接有效性。如果客户端长时间没有发送心跳,代理服务器将断开连接。

1.6、安全性支持:MQTT提供了基于TLS/SSL的加密和身份验证机制,以确保消息的机密性和安全性。

1.7、广泛的应用:MQTT广泛应用于物联网领域,例如传感器网络、远程监测、智能家居、工业自动化等。

MQTT协议的轻量级和简单性使得它成为连接大量设备和传输数据的理想选择,尤其是在资源受限的物联网环境中。它以其可靠性、灵活性和互通性在物联网行业得到了广泛应用。


二、集群搭建步骤

1.下载apache-activemq-5.15.12-bin.tar.gz

官网下载地址

2.上传apache-activemq-5.15.12-bin.tar.gz到服务器并解压文件到文件夹clusters、master、slave三个文件夹下面形成三个节点(平等节点)

注意:三个节点是公平节点。一开始我是想做集群节点clusters做数据分发,然后master和slave做主从的。后面发现存在问题,在资源较少情况下clusters、clusters、slave都为单节点的情况下,clusters一挂掉,集群关系就破裂了,没有节点给master和slave做数据分发了,这样的配置不友好。
于是我就把三个节点配置成了平等节点,任何节点宕机都能正常运行。

3.分别修改三个节点的配置文件:activemq.xml,修改连接协议端口号和配置网络代理,修改如下:

3.1、配置默认的传输协议OpenWire 和 支持硬件的传输协议MQTT;
3.2、配置网络代理networkConnectors,做节点间数据传输;
3.3、duplex设置为true,则一个连接上可以双向流动消息(双工连接),默认值为false,默认情况下,在两个提供者之间的连接上的消息流动方向是单向(单工连接);
3.4、修改三个节点的brokerName为localhost_clusters、localhost_master、localhost_slave;

<transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/>
           <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
		<!-- 配置网络代理,cluster 节点需要与 master 跟 slave 进行穿透 -->
		<networkConnectors>
                <networkConnector name="network-clusters" uri="static:(tcp://192.168.10.41:61617,tcp://192.168.10.41:61618)" duplex="true" />
        </networkConnectors>
		
		
		<transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:2884?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
		<!-- 配置网络代理,master 节点需要与 cluster 跟 slave 进行穿透 -->
		<networkConnectors>
                <networkConnector name="network-master" uri="static:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61618)" duplex="true" />
        </networkConnectors>
		
		
		<transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:2885?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
		<!-- 配置网络代理,slave 节点需要与 master 跟 cluster 进行穿透 -->
		<networkConnectors>
                <networkConnector name="network-slave" uri="static:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61617)" duplex="true" />
        </networkConnectors>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_clusters" dataDirectory="${activemq.data}">

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_master" dataDirectory="${activemq.data}">

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_slave" dataDirectory="${activemq.data}">

4.分别修改三个节点的WEB配置文件:jetty.xml,修改端口号,修改如下:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8161"/>
</bean>

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8171"/>
</bean>

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8181"/>
</bean>

5.分别到bin目录下运行activemq文件启动activemq,命令如下:

sh activemq start

6.全部节点启动后,进入web端界面查询集群配置情况:

6.1、进入web端界面http://192.168.10.41:8161、http://192.168.10.41:8171、http://192.168.10.41:8181,登录账号密码admin/admin,到Network查看是否有另外两个节点的连接情况,若有另外两个节点的连接信息并且Remote Address为true,则集群建立完毕;

6.2、图片如下:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述


三、项目连接配置步骤

1.Activemq连接服务配置:

ActiveMQ连接配置开箱即用

failover是一种连接URL配置选项,用于指定多个ActiveMQ broker的连接地址。当一个broker发生故障或不可用时,客户端会自动尝试连接配置中的其他broker。以此机制来实现多节点的集群连接模式。

spring:
  activemq:
    broker-url: failover:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61617,tcp://192.168.10.41:61618)
    user: admin
    password: admin
    pool:
      enabled: true
    packages:
      trust-all: true

2.MQTT生产者连接配置:

注意:MQTT的连接配置是我自定义的,在项目里有相关代码配合使用;

mqtt:
  brokers: tcp://192.168.10.41:2884,tcp://192.168.10.41:2885,tcp://192.168.10.41:1883
  clientIds: dig-producer1,dig-producer2
  qos: 1
  userName: admin
  password: admin

3.MQTT消费者连接配置:

注意:MQTT的连接配置是我自定义的,在项目里有相关代码配合使用;

mqtt:
  topics: V5008Upload/#,V6800Upload/#
  qoss: 1,2
  brokers: tcp://192.168.10.41:2884,tcp://192.168.10.41:2885,tcp://192.168.10.41:1883
  clientIds: dig-consumer1,dig-consumer2
  userNames: admin
  words: admin

4.项目使用:

4.1、ActiveMQ配置使用
activeMQ配置使用比较简单,也不是本文的重点,简单贴一点代码


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

@Configuration
public class ActiveMqConfig {
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    /**
     * 队列模式(消息将按顺序一个一个地被消费,每个消息只能被一个消费者接收)
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        // SimpleJmsListenerContainerFactory适用于JMS 1.1规范
        // 消息监听容器工厂
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        // 关闭事务
        factory.setSessionTransacted(false);
        // 手动确认消息
        factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        // 设置监听容器工厂的发布订阅域为队列模式,即采用点对点消息传递模式
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(activeMQConnectionFactory);
        return factory;
    }


    /**
     * 配置名字为givenConnectionFactory的连接工厂
     *
     * @return
     */
    @Bean("givenConnectionFactory")
    public ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        // 自定义消费重试机制
        RedeliveryPolicy policy = new RedeliveryPolicy();
        // 消息处理失败重新处理次数,默认为5次
        policy.setMaximumRedeliveries(5);
        // 启用指数退避策略,以延长每次重试的间隔时间
        policy.setUseExponentialBackOff(Boolean.TRUE);
        // 设置初始重试延迟时间为0毫秒,意味着消息处理失败时立即进行重试
        policy.setInitialRedeliveryDelay(0);
        // 设置每次重试之间的延迟时间为3秒
        policy.setRedeliveryDelay(3000L);
        // 设置指数退避的增加倍数,每次重试的延迟时间将按比例增加
        policy.setBackOffMultiplier(2);
        // 设置最大重试延迟时间为20秒
        policy.setMaximumRedeliveryDelay(20000L);
        factory.setRedeliveryPolicy(policy);
        Connection connection = factory.createConnection();
        connection.start();
        return factory;
    }

//    /**
//     *  发布-订阅模式(消息会被广播给所有订阅该主题的消费者)
//     */
//    @Bean("topicListener")
//    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory givenConnectionFactory) {
//        // 设置为发布订阅模式, 默认情况下使用生产消费者方式
//        // DefaultJmsListenerContainerFactory 2.0规范
//        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//        bean.setSessionTransacted(false);
//        bean.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
//        bean.setPubSubDomain(true);
//        bean.setConnectionFactory(givenConnectionFactory);
//        return bean;
//    }

}



import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;


@Service
public class ActivimqProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 发送队列模式
     *
     * @param queueName
     * @param message
     */
    public void sendMqQueue(String queueName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
    }


}




import com.test.common.enums.QueueType;
import com.test.local.mqtt.process.EquipmentAssetsProcess;
import com.test.local.mqtt.process.MqttDataProcessing;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.jms.Session;

@Slf4j
@Component
public class ActivimqConsumer {

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;


    @Autowired
    private EquipmentAssetsProcess equipmentAssetsProcess;

    @JmsListener(destination = QueueType.LABEL_STATE, containerFactory = "jmsListenerContainerQueue")
    public void consumerLabelState(ActiveMQMessage activeMQMessage, String message, Session session) {
        if (StringUtils.isNotEmpty(message)) {
            threadPoolTaskExecutor.execute(new MqttDataProcessing(
                    equipmentAssetsProcess,
                    message,
                    QueueType.LABEL_STATE,
                    activeMQMessage,
                    session
            ));
        }
    }
}



import com.test.fastjson.JSON;
import com.test.common.constants.Constants;
import com.test.common.enums.PatternStatusEnum;
import com.test.common.enums.QueueType;
import com.test.common.redis.RedisCache;
import com.test.entity.TagInfo;
import com.test.local.entity.*;
import com.test.service.RegionCheckRecordService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.beans.factory.annotation.Autowired;

import javax.jms.Session;
import java.util.UUID;

@Slf4j
public class MqttDataProcessing implements Runnable {

    @Autowired
    private RedisCache redisCache;
    @Autowired
    private RegionCheckRecordService regionCheckRecordService;

    private String topic;
    private String message;
    private EquipmentAssetsProcess equipmentAssetsProcess;
    private ActiveMQMessage activeMQMessage;
    private Session session;


    public MqttDataProcessing(
            EquipmentAssetsProcess equipmentAssetsProcess,
            String message,
            String topic,
            ActiveMQMessage activeMQMessage,
            Session session
    ) {
        this.topic = topic;
        this.message = message;
        this.equipmentAssetsProcess = equipmentAssetsProcess;
        this.activeMQMessage = activeMQMessage;
        this.session = session;
    }

    @SneakyThrows
    @Override
    public void run() {
        String logId = UUID.randomUUID().toString().replace("-", "");
        try {
            if (QueueType.LABEL_STATE.equals(topic)) {  
                LabelState labelState = JSON.parseObject(message, LabelState.class); 
                if (labelState.getData() != null && labelState.getData().size() > 0) {
                    equipmentAssetsProcess.processLabelState(labelState, logId);
                }
                activeMQMessage.acknowledge();
            }
        } catch (Exception e) {
            // 重发
            session.recover();
            log.error("异常,重新消费!logId={},topic={},message={}", logId, topic, message, e);
        }

    }

}

4.2、MQTT配置使用
对于MQTT的分布式我是这么理解的:

在消费端,同时连接多个节点进行消费,硬件发送的消息定义一个唯一id,此时会有ABC三个消费者等待硬件发送过来的消息,于是使用redisson的分布式锁lock.tryLock来限制消息只被消费一次。

在生产端,同时连接多个节点进行消息发送,因为我们的硬件只能连接到一个节点上面(硬件不能支持多节点代理消费)在一个节点宕机后才会去尝试连接备选节点,所有我们对所有节点都发送消息,保证该消息能被硬件接收并消费到,另外两个节点多发送的消息也不会造成问题(直接无视了),因为硬件同一时刻只能连接一个节点进行消费。

消费者配置



import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Repository;

@RefreshScope
@Repository
@Data
public class ConfigMqtt {

    private String[] topics = new String[]{"test1Upload/#","test2Upload/#","test3Upload/#"};

   // @Value("${mqtt.qoss}")
    private int[] qoss = new int[]{2,2,2};

    @Value("${mqtt.brokers}")
    private String[] brokers;

    @Value("${mqtt.clientIds}")
    private String[] clientIds;

    @Value("${mqtt.userNames}")
    private String userNames;

    @Value("${mqtt.words}")
    private String words;

}




import com.alibaba.fastjson.JSON;
import com.test.common.redis.RedisCache;
import com.test.config.ConfigMqtt;
import com.test.util.HexConvert;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 订阅者:订阅硬件mqtt主题信息,消费硬件发送消息,转换硬件消息发送到ActiveMQ队列,最终到其他微服务处理ActiveMQ队列的消息
 */
@Slf4j
@Service
public class MqttSubscription {


    @Autowired
    private ConfigMqtt configMqtt;

    @Autowired
    private SubscriptionJSON subscriptionJSON;

    @Autowired
    private SubscriptionHEX subscriptionHEX;
    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Resource
    private RedissonClient redissonClient;
    @Resource
    private RedisCache redisCache;

    @Bean
    public void client() throws Exception {
        String[] hosts = configMqtt.getBrokers();
        String[] clientIds = configMqtt.getClientIds();

        // 多个
        for (int i = 0; i < hosts.length; i++) {
            String host = hosts[i];
            String clientId = clientIds[i];
            try {
                InetAddress ip4 = Inet4Address.getLocalHost();
                clientId = clientId + "-" + ip4.getHostAddress();
            } catch (UnknownHostException e) {
                log.error("MqttSubscription-client-configMqtt" + configMqtt);
                log.error("MqttSubscription-client-e" + e);
            }
            String finalClientId = clientId;
            threadPoolTaskExecutor.execute(() -> this.myClient(host, finalClientId));
        }
    }

    private void myClient(String host, String clientId) {

        try {
            String[] topics = configMqtt.getTopics();
            int[] qos = configMqtt.getQoss();

            String userName = configMqtt.getUserNames();
            String passWord = configMqtt.getWords();

            // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            MqttClient client = new MqttClient(host, clientId, new MemoryPersistence());
            // MQTT的连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            // todo:ch:设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,断线重连会消费断线期间的消息
            options.setCleanSession(true);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(passWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 自动重连
            options.setAutomaticReconnect(true);
            // todo:ch:设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);
            // 设置回调函数
            client.setCallback(new MqttCallback() {

                public void connectionLost(Throwable cause) {
                    while (true) {
                        try {
                            client.connect(options);
                            client.subscribe(topics, qos);
                            break;
                        } catch (Exception e) {
                            e.printStackTrace();
                            log.error("mqtt客户端id-clientId:" + clientId);
                            log.error("mqtt连接异常-e", e);
                            log.error("mqtt连接异常-cause" + cause);
                            try {
                                Thread.sleep(5000);
                            } catch (InterruptedException ex) {
                                ex.printStackTrace();
                            }
                        }
                    }
                }

                public void messageArrived(String topic, MqttMessage message) throws Exception {

                    String key = "MQTT-";
                    String uuid = "";
                    // 消息存在
                    if (message.getPayload().length > 0) {
                        byte[] req = message.getPayload();
                        if (topic.contains("test1") && configMqtt.getDataFormat().equals("JSON")) {
                            String data = new String(req);
                            Map<String, Object> map = JSON.parseObject(data);
                            if (map.containsKey("uuid") && map.get("uuid") != null) {
                                uuid = map.get("uuid").toString();
                            
                            }else {
                                uuid = clientId;
                            }
                        } else if (topic.contains("test1") && configMqtt.getDataFormat().equals("HEX")) {
                            String data = HexConvert.convertStringToHex(req);
                            uuid = data.subSequence(data.length() - 8, data.length()).toString();
                            
                        } else if (topic.contains("test2") || topic.contains("test3")) {
                            String data = HexConvert.convertStringToHex(req);
                            uuid = data.subSequence(data.length() - 8, data.length()).toString();
                            
                        }
                        if (!Strings.isNullOrEmpty(uuid)) {
                            key += uuid;
                        }
                    }


                    // 分布式锁,防止多应用节点产生重复消息
                    RLock lock = redissonClient.getLock(key);
                    try {

                        // 加锁,等待30秒锁自动释放, 不在finally手动释放了,给予30秒的缓冲时间
                        boolean resultLock = lock.tryLock(0, 30, TimeUnit.SECONDS);
                        if (resultLock) {
                            String data = new String(message.getPayload());

                            log.info("mqtt-clientId:" + clientId);
                            log.info("mqtt-key:" + key);
//                            log.info("message-ID:" + message.getId());
                            log.info("messageArrived-topic" + topic);
                            log.info("messageArrived-message" + data);

                            if (topic.contains("test1") && configMqtt.getDataFormat().equals("JSON")) {

								// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费
                               ......

                            } else if (topic.contains("test1") && configMqtt.getDataFormat().equals("HEX")) {
                               // 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费
                               ......

                            } else if (topic.contains("test2") || topic.contains("test3")
                            ) {
                                // 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费
                               ......
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.error("mqtt客户端id-clientId:" + clientId);
                        log.error("mqtt发布信息异常-e", e);
                        log.error("mqtt发布信息异常-topic" + topic);
                        log.error("mqtt发布信息异常-message" + message.toString());
                    }
//                    finally {
//                        if (lock.isLocked()) {
//                            if (lock.isHeldByCurrentThread()) {
//                                lock.unlock();
//                            }
//                        }
//                    }
                }

                public void deliveryComplete(IMqttDeliveryToken token) {

                }

            });

            // todo:是否需要永久重新连接,能否设定固定重连次数    或者固定多少秒重连一次(类似心跳机制)
            int retryCount = 0;
            while (!client.isConnected()) {
                try {
                    Thread.sleep(getBackoffTime(retryCount));
                    client.connect(options);
                    client.subscribe(topics, qos);
                } catch (Exception e) {
                    log.error("Reconnect attempt failed", e);
                    retryCount++;
                }
            }
        } catch (MqttException e) {
            log.error("mqtt客户端id-clientId:" + clientId);
            log.error("mqtt连接错误:", e);
        }
    }


    private long getBackoffTime(int retryCount) {
        // 使用指数退避算法计算重连时间
        long waitTime = Math.min(1000 * (1 << retryCount), 60000); // 最大等待时间为60秒
        return waitTime;
    }
}

生产者配置



import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Repository;

@RefreshScope
@Repository
@Data
public class ConfigMqtt {


    @Value("${mqtt.brokers}")
    private String[] brokers;

    @Value("${mqtt.clientIds}")
    private String[] clientIds;


    @Value("${mqtt.qos}")
    private int qos;


    @Value("${mqtt.userName}")
    private String userName;

    @Value("${mqtt.password}")
    private String password;

}




import com.test.local.config.ConfigMqtt;
import com.test.local.utils.HexConvert;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;



/**
     *  MQTT生产者,生产消息发送到硬件

     */

@Slf4j
@Service
public class MqttConnect {

    private volatile static MqttClient mqttClientSingleton;

    private volatile static List<MqttClient> mqttClientSingletonList = new ArrayList<>();

    @Autowired
    private ConfigMqtt configMqtt;

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    private MqttConnect() {

    }

    /**
     * 创建 多个 mqtt 实例
     */
    public static List<MqttClient> getMoreInstance(ConfigMqtt configMqtt) {
        String[] hosts = configMqtt.getBrokers();
        String[] clientIds = configMqtt.getClientIds();

        // 创建多个节点连接实例
        if (mqttClientSingletonList == null || mqttClientSingletonList.size() < hosts.length) {


            if (mqttClientSingletonList != null && !mqttClientSingletonList.isEmpty() && mqttClientSingletonList.size() < hosts.length) {
                mqttClientSingletonList.forEach(re -> {
                    try {
                        re.disconnect();
                        re.close();
                    } catch (Exception e) {
                    }
                });
                // 清除原有实例
                mqttClientSingletonList.clear();
            }


            // 多个
            for (int i = 0; i < hosts.length; i++) {
                String clientIdRe = clientIds[i];
                String broker = hosts[i];


                String userName = configMqtt.getUserName();

                String password = configMqtt.getPassword();

                StringBuffer clientId = new StringBuffer();

                try {

                    InetAddress ip4 = Inet4Address.getLocalHost();
                    clientId.append(clientIdRe).append("-").append(ip4.getHostAddress()).append("-").append(HexConvert.getStringRandom(11));

                } catch (UnknownHostException e) {
                    log.error("MqttClient-getInstance-e" + e);
                }


                MemoryPersistence persistence = new MemoryPersistence();
                synchronized (MqttConnect.class) {
                    MqttClient mqttClient = null;
                    try {
                        // 创建客户端
                        mqttClient = new MqttClient(broker, clientId.toString(), persistence);

                        // 创建链接参数
                        MqttConnectOptions connOpts = new MqttConnectOptions();

                        // 在重新启动和重新连接时记住状态
                        connOpts.setCleanSession(true);

                        // 设置连接的用户名
                        connOpts.setUserName(userName);
                        connOpts.setPassword(password.toCharArray());

                        // 建立连接
                        mqttClient.connect(connOpts);

                        mqttClientSingletonList.add(mqttClient);

                    } catch (MqttException me) {
                        log.error("reason " + me.getReasonCode());
                        log.error("msg " + me.getMessage());
                        log.error("loc " + me.getLocalizedMessage());
                        log.error("cause " + me.getCause());
                        log.error("excep " + me);
                        log.error("发送连接mqtt异常" + me);
                        try {
                            mqttClient.disconnect();
                            mqttClient.close();

                        } catch (Exception e) {

                        }
                        //将 mqtt 置空
                        mqttClient = null;
                        me.printStackTrace();
                    }
                }
            }
        }
        return mqttClientSingletonList;
    }

    /**
     * 发布消息给硬件
     */
    public void publish(String version, String gateway, String content) {
        StringBuffer topic = new StringBuffer();
        topic.append(version).append("Download/").append(gateway);

        int qos = configMqtt.getQos();


        // mqtt多节点消息发送 -- 每个节点都发送一份消息让硬件消费
        List<MqttClient> clientList = MqttConnect.getMoreInstance(configMqtt);
        if (!clientList.isEmpty()) {
            clientList.forEach(client -> {
                threadPoolTaskExecutor.execute(() -> {
                    try {
                        // 创建消息
                        MqttMessage message = new MqttMessage(content.getBytes());

                        // 设置消息的服务质量
                        message.setQos(qos);

                        log.info("发送消息到MQTT供硬件消费");
                        log.info("================client:"+client.getClientId());
                        log.info("================topic:"+topic);
                        log.info("================message:"+message);

                        // 发布消息
                        client.publish(topic.toString(), message);

                    } catch (MqttException me) {
                        log.error("reason " + me.getReasonCode());
                        log.error("msg " + me.getMessage());
                        log.error("loc " + me.getLocalizedMessage());
                        log.error("cause " + me.getCause());
                        log.error("excep " + me);
                        log.error("发送连接mqtt异常" + me);
                    }
                });
            });
        }
    }
}


总结

近期有时间总结了一下前段时间搭建ActiveMQ + MQTT集群并且在微服务中使用的流程。经此,牛马小陈同学巩固了中间件和分布式概念知识。MQTT的分布式使用是出于自己对分布式的理解然后手写的,目前能正常进行分布式消费,对于MQTT的理解还不是很深,很多处理非常的粗糙,欢迎各位新手同学一起学习、各路大佬批评指正,谢谢!

ActiveMQ + MQTT使用docker方式部署如下:
ActiveMQ + MQTT 集群搭建(docker版本)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1588071.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

配置QtCreator能加载自定义插件的环境

配置对应环境 引言查看当前版本配置能够加载插件的环境 引言 生成的自定义插件能在QtCreator的设计器中加载&#xff0c;需要满足当前使用的QtCreator的编译时所需的Qt库和编译器。 查看当前版本 这里需要先查看自己使用的QtCreator的版本&#xff0c;即生成QtCreator时使用…

17(18)-1-HTML5 新增语义标签及属性

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 ✍HTML5 新增语义标签及属性&#x1f48e;1 HTML5 新增的块级语义化标签&…

C语言——指针的高级引用

目录 1.概述 2.虚拟内存空间 2.1存储期限 2.2栈区管理 2.3堆区域的使用 3.动态内存分配和释放&#xff08;重点&#xff09; 3.1通用指针类型void 3.2内存分配malloc函数 3.2.1 malloc函数&#xff08;memory allocation&#xff09;&#xff08;注意len*size&#xff…

SAP SD学习笔记04 - 出荷Plant(交货工厂),出荷Point(装运点),输送计划,品目的可用性检查,一括纳入/分割纳入,仓库管理

上一章讲了SD的主数据。 SAP SD学习笔记03 - SD模块中的主数据-CSDN博客 本章讲出荷Plant&#xff08;交货工厂&#xff09;&#xff0c;出荷Point&#xff08;装运点&#xff09;和出和路线。 还是偏理论多一些&#xff0c;后面的文章尽量多加些练习巩固一下。 1&#xff0…

Element-UI plus 自定义-下拉框选择年份【vue3】

1.实现效果 2.实现代码展示 <template><el-select v-model"selectedYear" placeholder"请选择"><el-optionv-for"year in yearOptions":key"year":label"year":value"year"></el-option>…

实验四:基于内容的推荐

代码 import pandas as pd from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from sklearn.metrics.pairwise import pairwise_distances import numpy as np news_dfpd.read_csv(C:/Users/Administrat…

【Linux】开始了解重定向

送给大家一句话&#xff1a; 人真正的名字是&#xff1a;欲望。所以你得知道&#xff0c;消灭恐惧最有效的办法&#xff0c;就是消灭欲望。 – 史铁生 《我与地坛》 开始了解重定向 1 前言2 重定向与缓冲区2.1 文件描述符分配规则2.2 重定向的现象2.3 重定向的理解2.4 缓冲区…

阿里云微调chatglm3-6b---只有一个python解释器但gradio要求版本不兼容怎么办

安装LLAMA参考博文http://t.csdnimg.cn/6yYwG 在用LLAMA微调大模型的时候总是出现connected error out并且出现这样的界面 这是由于LLMA所要求的gradio版本>4.0.0,<4.2.0&#xff0c;然而chatglm3-6b要求的gradio版本需要gradio3.39.0才能显示出web_demo_gradio.py渲染…

10.1K star !牛逼了!开源技术速查表,推荐人手一份!

1、前言 在当今信息爆炸的时代&#xff0c;知识的获取、整理和应用显得尤为重要。随着个人职业发展和学习需求的不断提升&#xff0c;搭建一个个人知识库已成为提升竞争力的关键一环。个人知识库不仅是一个信息的存储库&#xff0c;更是一个思维的工具箱&#xff0c;它能够帮助…

【前缀积】Leetcode 除自身以外数组的乘积

题目解析 238. 除自身以外数组的乘积 算法讲解 我们可以使用两个空间保存当前位置的左边积和右边积&#xff0c;需要注意的地方初始的dp表需要初始化为1&#xff0c;如果是0则无法得到结果&#xff0c;因为此处是乘法 class Solution { public:vector<int> productEx…

Python用于比较数据结构并生成差异报告的工具库之data-diff使用详解

概要 Python的data-diff库是一个用于比较数据结构并生成差异报告的工具。它可以处理各种数据类型,如字典、列表、集合等,使得开发者能够快速识别数据之间的差异。 安装 通过pip可以轻松安装data-diff: pip install data-diff特性 支持多种数据类型:能够比较字典、列表、…

鸿蒙+全国产化工业平板电脑在MES系统采集终端应用

在工业4.0的大浪潮推动下,原有制造行业面临原材料及人工成本上涨、生产现场管理混乱、定单杂、生产效率难以提升、生产异常难以实时监控等诸多因素,根本无法满足数字化工厂的基本需求,更难以与工业4.0接轨。 MES系统是一套面向制造企业车间执行层的生产信息化管理 系统。MES可以…

Mac下用adb命令安装apk到android设备笔记

查询了些资料记录备用。以下是在Mac上使用命令行安装APK文件的步骤&#xff1a; 1. 下载并安装ADB&#xff1a; 如果您的Mac上没有安装ADB&#xff0c;请从官方的Android开发者网站下载Android SDK Platform Tools&#xff1a;Android SDK Platform Tools。将下载的ZIP文件解…

Centos安装MySQL提示公钥尚未安装

一、问题 在Centos7.9使用yum安装MySQL时出现错误&#xff0c;提示&#xff1a;mysql-community-server-5.7.44-1.el7.x86_64.rpm 的公钥尚未安装&#xff0c;如下图所示&#xff1a; 执行命令&#xff1a;systemctl start mysqld也提示错误&#xff1a;Failed to start mysq…

spfa算法(java代码)

题目: 851. spfa求最短路 - AcWing题库 输入样例: 3 3 1 2 5 2 3 -3 1 3 4 输出样例: 2 分析&#xff1a; 先去定义一个class 类似于c里面的pair 里面有两个变量x, y 因为后面需要用优先队列来处理最短路问题需要指出比较x还是y 因此我们让这个pair类实现 Comparable 接口 实…

IP广播对讲系统停车场解决方案

IP广播对讲系统停车场解决方案 一、需求分析 随着国民经济和社会的发展&#xff0c; 选择坐车出行的民众越来越多。在保护交通安全的同时&#xff0c;也给停车场服务部门提出了更高的要求。人们对停车场系统提出了更高的要求与挑战&#xff0c; 需要停车场系统提高工作效率与服…

如何在CentOS7.x上生成自签名SSL证书

在配置HTTPS连接时&#xff0c;SSL证书是确保数据传输安全性的关键组件。自签名证书是一种不通过证书颁发机构&#xff08;CA&#xff09;签发的证书&#xff0c;适用于测试和内部使用。以下是在CentOS 7.x系统上生成自签名证书的详细步骤。 1. 安装OpenSSL OpenSSL是一个强大…

时间案例-倒计时

需求 休息日例子 自定义日期类MyDate 日期记录是否是休息日记录是否是周末 Data NoArgsConstructor AllArgsConstructor public class MyDate {// 日期LocalDate date;// 是否休息boolean isRest;// 是否是周末boolean isWeekend; }starter启动器 // 1. 定义起始的休息时间 202…

婚纱摄影从入门到精通,专业婚礼摄影实战指南

一、资料描述 本套婚纱摄影资料&#xff0c;大小543.64M&#xff0c;共有15个文件。 二、资料目录 《婚礼摄影实战指南》苏盛鑫.全彩版.pdf 《婚礼摄影幸福攻略》.pdf 《婚纱摄影8问-拍婚纱照注意事项》【新人必看】.pdf 《婚纱摄影摆姿》 Wedding Posing.pdf 《婚纱摄影…

ELK 企业级日志分析 ELFK

一 ELK 简介 ELK平台是一套完整的日志集中处理解决方案&#xff0c;将 ElasticSearch、Logstash 和 Kiabana 三个开源 工具配合使用&#xff0c; 完成更强大的用户对日志的查询、排序、统计需求。 1 ElasticSearch&#xff1a; 是基于Lucene&#xff08;一个全文检索引擎的…