Apache RocketMQ5.x-消息队列体验

news2025/1/24 2:18:03

Apache RocketMQ5.x-消息队列体验

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件,由阿里开源,后由阿里捐赠给Apache基金会。

本次体验的目的是从技术角度验证一下在微服架构中,用Apache RocketMQ做为消息队列,做为模块之间解耦,以及主要验证是否要以做为异步消息队列进行投递消息。

【Apache RocketMQ官方网站】

在这里插入图片描述

目录

  • Apache RocketMQ5.x-消息队列体验
  • 一、各组件版本说明
  • 二、安装Apache RocketMQ 5.1.3
    • 1、下载Apache RocketMQ 5.1.3
    • 2、解压后修改配置
    • 3、启动NameServer
    • 3、启动Broker+Proxy
  • 三、安装RocketMQ Dashboard
    • 1、下载 RocketMQ Dashboard 源码
    • 2、修改rocketmq的namesrv地址
    • 3、编译RocketMQ Dashboard 源码
    • 4、运行RocketMQ Dashboard
    • 4、通过浏览器访问RocketMQ Dashboard
  • 四、创建Demo工程,编写代码
    • 1、新建rocketmqDemo工程
    • 2、编辑pom.xml和application.properties
    • 3、编写消息生产者服务类
    • 4、编写消息消费者服务类
    • 5、编写消息生产者Controller接口类
    • 6、启动类
  • 五、运行代码
    • 1、创建topic
    • 2、启动程序
    • 3、访问消息生产接口
    • 4、确认发送消息的数量
    • 5、确认tag=consumer1的消费者收到的消息数量
    • 5、确认tag=consumer2的消费者收到的消息数量
  • 六、验证向consumer1和consumer2发送了 1000条消息
    • 1、Producer发送消息数量截图
    • 2、consumer1收到消息数量截图
    • 3、consumer2收到消息数量截图
    • 4、RocketMQ监控截图
  • 七、一个订阅关系下有多个节点时的负载
    • 1、Producer发送消息数量截图
    • 2、consumer1收到消息数量截图
    • 3、consumer1收到消息数量截图
  • 八、停止RocketMQ服务
    • 1、显示所有java进程
    • 2、停止RocketMQ Dashboard
    • 3、停止RocketMQ NameSrv
    • 4、停止RocketMQ Proxy

一、各组件版本说明

序号技术框架说明
1Spring bootSpring boot 2.7.13
2Apache RocketMQ5.1.3
3rocketmq-clients-java5.0.5
4RocketMQ Dashboard需要下载源码自行编译

二、安装Apache RocketMQ 5.1.3

1、下载Apache RocketMQ 5.1.3

我们直接下载二进制版本,【当前版本下载地址】,最新版本需要去官方网站根据文档中说明的链接进行下载
在这里插入图片描述

2、解压后修改配置

将rocketmq-all-5.1.3-bin-release.zip下载并解压到了我本地/Users/duyanjun/Downloads/rocketmq-all-5.1.3,我本机是macos,windows或Linux系统上的操作也是一样的

1)、修改broker配置

cd /Users/duyanjun/Downloads/rocketmq-all-5.1.3
## 编辑broker配置配置文件
vim conf/broker.conf

在这里插入图片描述
2)、修改rmq-proxy端口

默认rmq-proxy的监听端口是8080,如果端口有冲突的话可以通过conf/rmq-proxy.json配置文件中的参数,这里我们改为了9080

vim conf/rmq-proxy.json

rmq-proxy.json配置文件

{
  "rocketMQClusterName": "DefaultCluster",
  "remotingListenPort": 9080
}

3、启动NameServer

### 启动namesrv
nohup sh bin/mqnamesrv &
 
### 验证namesrv是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log

在这里插入图片描述

3、启动Broker+Proxy

### 先启动broker
nohup sh bin/mqbroker -n 192.168.0.9:9876 --enable-proxy &

### 验证broker是否启动成功, 比如, broker的ip是192.168.0.9 然后名字是broker-a
tail -f ~/logs/rocketmqlogs/proxy.log 

在这里插入图片描述

三、安装RocketMQ Dashboard

RocketMQ Dashboard 是 RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。

1、下载 RocketMQ Dashboard 源码

源码地址:apache/rocketmq-dashboard
下载并解压到本地目录

2、修改rocketmq的namesrv地址

说明:

  • 修改application.properties配置文件中的rocketmq.config.namesrvAddr为RocketMQ nameSrv服务所在服务器的ip和端口。
  • 由于rocketmq-dashboard-rocketmq-dashboard我准备就在RocketMQ所在的服务器上运行,所以将ip改为了127.0.0.1
cd 本地解压后的目录/rocketmq-dashboard-rocketmq-dashboard-1.0.0
vim application.properties

在这里插入图片描述

3、编译RocketMQ Dashboard 源码

mvn clean package -Dmaven.test.skip=true

说明:
编译成功后,目标文件rocketmq-dashboard-1.0.0.jar输出到了target/目录下了。
在这里插入图片描述

4、运行RocketMQ Dashboard

将编译好的rocketmq-dashboard-1.0.0.jar复制到RocketMQ服务器所在的目录下
在这里插入图片描述
启动RocketMQ Dashboard

nohup java -jar rocketmq-dashboard-1.0.0.jar &

在这里插入图片描述

4、通过浏览器访问RocketMQ Dashboard

说明:
通过application.properties中server.port=8085得知,RocketMQ Dashboard的web服务端口是8085,所以通过浏览器输入http://ip:8085

在这里插入图片描述

四、创建Demo工程,编写代码

1、新建rocketmqDemo工程

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2、编辑pom.xml和application.properties

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.13</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rocketmqDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmqDemo</name>
    <description>rocketmqDemo</description>
    <properties>
        <java.version>1.8</java.version>
        <rocketmq-client-java-version>5.0.5</rocketmq-client-java-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>${rocketmq-client-java-version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

server.port=9090
# 应用名
spring.application.name=rocketmq-demo

3、编写消息生产者服务类

ProducerRocketMQService.java

package com.example.rocketmqdemo.rocketmq;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

@Service
public class ProducerRocketMQService {
    // 日志对象
    private final Logger logger = Logger.getLogger(ProducerRocketMQService.class.getName());

    private final ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();

    private static final Object lock = new Object();

    private String endpoint;

    private String topic;

    private boolean isRunning;

    private final ConcurrentLinkedQueue<SendMessage> sendMessageQueue = new ConcurrentLinkedQueue<>();

    private Thread producerThread = null;


    /**
     * 初始化RocketMQ消息生产者
     *
     * @param endpoint
     *          接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
     * @param topic
     *          订阅主题
     */
    public void startInitProducer(String endpoint, String topic) {
        this.endpoint = endpoint;
        this.topic = topic;
        if (producerThread == null) {
            producerThread = new Thread(new ProducerSendMsgTask());
        }
        if (!producerThread.isAlive()) {
            isRunning = true;
            producerThread.start();
        }
    }

    /**
     * 停止订阅
     */
    public void stopProducer() {
        isRunning = false;
        if (producerThread != null && producerThread.isInterrupted()) {
            producerThread.interrupt();
            producerThread = null;
        }
    }

    public void sendMessage(String tag, String message) {
        SendMessage sendMessage = new SendMessage(tag, message);
        sendMessageQueue.offer(sendMessage);
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /**
     * 生产者发送消息任务类
     */
    private class ProducerSendMsgTask implements Runnable {
        @Override
        public void run() {
            while (isRunning) {
                try {
                    if (sendMessageQueue.isEmpty()) {
                        synchronized (lock) {
                            lock.wait();
                        }
                    } else {
                        sendMsg();
                    }
                } catch (Exception ignored) {

                }
            }
        }

        private void sendMsg() {
            List<SendMessage> sendMessageList = new ArrayList<>();
            SendMessage sendMessage = null;
            while ((sendMessage = sendMessageQueue.poll()) != null) {
                sendMessageList.add(sendMessage);
            }

            ClientServiceProvider provider = ClientServiceProvider.loadService();
            ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
            ClientConfiguration configuration = builder.build();
            try {
                // 初始化Producer时需要设置通信配置以及预绑定的Topic。
                Producer producer = provider.newProducerBuilder()
                        .setTopics(topic)
                        .setClientConfiguration(configuration)
                        .build();

                for (SendMessage msg : sendMessageList) {
                    String msgKey = UUID.randomUUID().toString();
                    // 普通消息发送。
                    Message message = provider.newMessageBuilder()
                            .setTopic(topic)
                            // 设置消息索引键,可根据关键字精确查找某条消息。
                            .setKeys(msgKey)
                            // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                            .setTag(msg.getTag())
                            // 消息体。
                            .setBody(msg.getMessage().getBytes(StandardCharsets.UTF_8))
                            .build();
                    // 发送消息,需要关注发送结果,并捕获失败等异常。
                    SendReceipt sendReceipt = producer.send(message);
                    logger.info("Send message successfully, messageId=" + sendReceipt.getMessageId());
                }
                if (producer != null) {
                    try {
                        // Close the producer when you don't need it anymore.
                        producer.close();
                        producer = null;
                    } catch (IOException e) {
                        logger.log(Level.WARNING, "", e);
                    }
                }
            } catch (ClientException e) {
                logger.log(Level.WARNING, "Failed to send message", e);
            }
            sendMessageList.clear();
            sendMessageList = null;
        }
    }

    /**
     * 需要发送的消息实体类
     */
    @Data
    @AllArgsConstructor
    private class SendMessage {
        private String tag;

        private String message;
    }

}

4、编写消息消费者服务类

ConsumerRocketMQService.java

package com.example.rocketmqdemo.rocketmq;

import lombok.NoArgsConstructor;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

@Service
@NoArgsConstructor
public class ConsumerRocketMQService {
    private final Logger log = Logger.getLogger(ConsumerRocketMQService.class.getName());

    private static Thread subscribeRocketMQThread;

    private static boolean isRunning = false;

    private SimpleConsumer consumer;

    /**
     * 开始订阅消息
     *
     * @param endpoints
     *          接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
     * @param topic
     *          订阅消息主题
     * @param tag
     *          消息Tag,用于消费端根据指定Tag过滤消息
     * @param consumerGroup
     *          消费端分组
     */
    public void startSubscribeRocketMQ(String endpoints, String topic, String tag,String consumerGroup) {
        if (subscribeRocketMQThread == null) {
            final ClientServiceProvider provider = ClientServiceProvider.loadService();

            // Credential provider is optional for client configuration.
            /*
            String accessKey = "yourAccessKey";
            String secretKey = "yourSecretKey";
            SessionCredentialsProvider sessionCredentialsProvider =
                    new StaticSessionCredentialsProvider(accessKey, secretKey);
             */
            ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                    .setEndpoints(endpoints)
                    //.setCredentialProvider(sessionCredentialsProvider)
                    .build();
            Duration awaitDuration = Duration.ofSeconds(30);
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            try {
                consumer = provider.newSimpleConsumerBuilder()
                        .setClientConfiguration(clientConfiguration)
                        // Set the consumer group name.
                        .setConsumerGroup(consumerGroup)
                        // set await duration for long-polling.
                        .setAwaitDuration(awaitDuration)
                        // Set the subscription for the consumer.
                        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                        .build();
            } catch (ClientException e) {
                log.log(Level.WARNING, "", e);
            }
            subscribeRocketMQThread = new Thread(new SubscribeRocketMQTask());
        }
        if (!subscribeRocketMQThread.isAlive()) {
            subscribeRocketMQThread.start();
        }
    }

    /**
     * 停止订阅
     */
    public void stopSubscribeRocketMQ() {
        isRunning = false;
        if (subscribeRocketMQThread != null && subscribeRocketMQThread.isInterrupted()) {
            subscribeRocketMQThread.interrupt();
            subscribeRocketMQThread = null;
        }
    }

    private class SubscribeRocketMQTask implements Runnable {
        @Override
        public void run() {
            isRunning = true;
            try {
                // Max message num for each long polling.
                int maxMessageNum = 30;
                // Set message invisible duration after it is received.
                Duration invisibleDuration = Duration.ofSeconds(15);
                // Receive message, multi-threading is more recommended.
                while (isRunning) {
                    final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
                    log.info("RocketMQ Consumer收到 [" + messages.size() + "] 条消息");
                    for (MessageView message : messages) {
                        final MessageId messageId = message.getMessageId();
                        try {
                            consumer.ack(message);
                            System.out.println("==== Consumber收到来自RocketMQ的消息 =======");
                            System.out.println("messageID:" + message.getMessageId());
                            System.out.println("body:" + StandardCharsets.UTF_8.decode(message.getBody()).toString());
                            System.out.println("topic:" + message.getTopic());
                            System.out.println("tag:" + message.getTag());
                            log.info("Message is acknowledged successfully, messageId=" +  messageId);
                        } catch (Throwable t) {
                            log.log(Level.WARNING, "Message is failed to be acknowledged, messageId="+ messageId, t);
                        }
                    }
                }
                // Close the simple consumer when you don't need it anymore.
                consumer.close();
            } catch (ClientException | IOException e) {
                log.log(Level.WARNING, "", e);
            }
        }
    }
}

ConsumerRocketMQService2.java

package com.example.rocketmqdemo.rocketmq;

import lombok.NoArgsConstructor;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

@Service
@NoArgsConstructor
public class ConsumerRocketMQService2 {
    private final Logger log = Logger.getLogger(ConsumerRocketMQService2.class.getName());

    private static Thread subscribeRocketMQThread;

    private static boolean isRunning = false;

    private SimpleConsumer consumer;

    /**
     * 开始订阅消息
     *
     * @param endpoints
     *          接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
     * @param topic
     *          订阅消息主题
     * @param tag
     *          消息Tag,用于消费端根据指定Tag过滤消息
     * @param consumerGroup
     *          消费端分组
     */
    public void startSubscribeRocketMQ(String endpoints, String topic, String tag,String consumerGroup) {
        if (subscribeRocketMQThread == null) {
            final ClientServiceProvider provider = ClientServiceProvider.loadService();

            // Credential provider is optional for client configuration.
            /*
            String accessKey = "yourAccessKey";
            String secretKey = "yourSecretKey";
            SessionCredentialsProvider sessionCredentialsProvider =
                    new StaticSessionCredentialsProvider(accessKey, secretKey);
             */
            ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                    .setEndpoints(endpoints)
                    //.setCredentialProvider(sessionCredentialsProvider)
                    .build();
            Duration awaitDuration = Duration.ofSeconds(30);
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            try {
                consumer = provider.newSimpleConsumerBuilder()
                        .setClientConfiguration(clientConfiguration)
                        // Set the consumer group name.
                        .setConsumerGroup(consumerGroup)
                        // set await duration for long-polling.
                        .setAwaitDuration(awaitDuration)
                        // Set the subscription for the consumer.
                        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                        .build();
            } catch (ClientException e) {
                log.log(Level.WARNING, "", e);
            }
            subscribeRocketMQThread = new Thread(new SubscribeRocketMQTask());
        }
        if (!subscribeRocketMQThread.isAlive()) {
            subscribeRocketMQThread.start();
        }
    }

    /**
     * 停止订阅
     */
    public void stopSubscribeRocketMQ() {
        isRunning = false;
        if (subscribeRocketMQThread != null && subscribeRocketMQThread.isInterrupted()) {
            subscribeRocketMQThread.interrupt();
            subscribeRocketMQThread = null;
        }
    }

    private class SubscribeRocketMQTask implements Runnable {
        @Override
        public void run() {
            isRunning = true;
            try {
                // Max message num for each long polling.
                int maxMessageNum = 30;
                // Set message invisible duration after it is received.
                Duration invisibleDuration = Duration.ofSeconds(15);
                // Receive message, multi-threading is more recommended.
                while (isRunning) {
                    final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
                    log.info("RocketMQ Consumer2收到 [" + messages.size() + "] 条消息");
                    for (MessageView message : messages) {
                        final MessageId messageId = message.getMessageId();
                        try {
                            consumer.ack(message);
                            System.out.println("==== Consumber2收到来自RocketMQ的消息 =======");
                            System.out.println("messageID:" + message.getMessageId());
                            System.out.println("body:" + StandardCharsets.UTF_8.decode(message.getBody()).toString());
                            System.out.println("topic:" + message.getTopic());
                            System.out.println("tag:" + message.getTag());
                            log.info("Message is acknowledged successfully, messageId=" +  messageId);
                        } catch (Throwable t) {
                            log.log(Level.WARNING, "Message is failed to be acknowledged, messageId="+ messageId, t);
                        }
                    }
                }
                // Close the simple consumer when you don't need it anymore.
                consumer.close();
            } catch (ClientException | IOException e) {
                log.log(Level.WARNING, "", e);
            }
        }
    }
}

说明:
ConsumerRocketMQService类和ConsumerRocketMQService2类基本上是一样的,这样做只是为了模拟两个消费者才这样做的。

5、编写消息生产者Controller接口类

ProducerController.java,对应的接口是http://ip:9090/t1/1?num=100

package com.example.rocketmqdemo.controller;

import com.example.rocketmqdemo.rocketmq.ProducerRocketMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

@Controller
@RequestMapping("/t1")
public class ProducerController {

    @Autowired
    private ProducerRocketMQService producerRocketMQService;

    @RequestMapping("/1")
    @ResponseBody
    public String asyncHello(@RequestParam("num") int num){
        try {
            System.out.println("========== 消息生产者开始发送消息=============");
            for (int i = 0; i < num; i++) {
                producerRocketMQService.sendMessage("consumer1", "消息-to-consumer1:" + "hello" + ( i +1 ) );
                producerRocketMQService.sendMessage("consumer2", "消息-to-consumer2:" + "hello" + ( i  +1));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "ok";
    }
}

6、启动类

RocketmqDemoApplication.java

package com.example.rocketmqdemo;

import com.example.rocketmqdemo.rocketmq.ConsumerRocketMQService;
import com.example.rocketmqdemo.rocketmq.ConsumerRocketMQService2;
import com.example.rocketmqdemo.rocketmq.ProducerRocketMQService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackageClasses = ProducerRocketMQService.class)
@ComponentScan(basePackageClasses = ConsumerRocketMQService.class)
@ComponentScan(basePackageClasses = ConsumerRocketMQService2.class)
public class RocketmqDemoApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(RocketmqDemoApplication.class, args);

        ProducerRocketMQService rocketMQService = context.getBean(ProducerRocketMQService.class);
        String endpoints = "192.168.0.9:9080";
        String topic = "test-msg-topic";
        rocketMQService.startInitProducer(endpoints, topic);
        System.out.println("RocketMQ测试程序启动成功");


        ConsumerRocketMQService service = context.getBean(ConsumerRocketMQService.class);
        String tag = "consumer1";
        String consumerGroup = "consumer-group";
        service.startSubscribeRocketMQ(endpoints, topic, tag, consumerGroup);


        ConsumerRocketMQService2 service2 = context.getBean(ConsumerRocketMQService2.class);
        String tag2 = "consumer2";
        String consumerGroup2 = "consumer-group2";
        service2.startSubscribeRocketMQ(endpoints, topic, tag2, consumerGroup2);
    }
}

说明:
上述消息者有多个时,每个订阅关系的tag要写一个对应唯一的名称,刚开始由于我理解错误,我认为为tag只是用于过滤消息的一个标签,所以我将两个订阅关系的tag写成了一样的了(consumer分组名也写成一样了),结果每个订阅只能收到1/2的消息,后来通过查阅资料,RocketMQ将这两个consumer认为是一个分组中的两个节点了,会自动通过负载均衡,轮着为每个节点推送消息,例如消息M1推给了第一个节点,就不给其它节点推了; 下一条消息M2就推给了第二个节点…
【演示效果可以参见本文章第七节】

五、运行代码

1、创建topic

test-msg-topic在这里插入图片描述

2、启动程序

在这里插入图片描述

3、访问消息生产接口

说明:

  • 1、在浏览器中访问请求接口:http://ip:9090/t1/1?num=10
  • 2、确认发送消息的数量,各向consumer1和consumer2发送了 10条消息
  • 3、确认tag=consumer1消息订阅者收到的消息的数量
  • 4、确认tag=consumer2消息订阅者收到的消息的数量
  • 5、正常情况下tag=consumer1消息订阅者收到的消息数量 + tag=consumer2消息订阅者收到的消息的数量 = 发送消息的数量

在这里插入图片描述

4、确认发送消息的数量

消息发达了20条
在这里插入图片描述

5、确认tag=consumer1的消费者收到的消息数量

通过检索日志中==== Consumber收到来自RocketMQ的消息 =======的数量是10

在这里插入图片描述

5、确认tag=consumer2的消费者收到的消息数量

通过检索日志中==== Consumber2收到来自RocketMQ的消息 =======的数量也是10

在这里插入图片描述

说明:
通过上述过程已验证了RocketMQ可以根据tag进行路由消息。

六、验证向consumer1和consumer2发送了 1000条消息

1、Producer发送消息数量截图

在这里插入图片描述

2、consumer1收到消息数量截图

在这里插入图片描述

3、consumer2收到消息数量截图

在这里插入图片描述

4、RocketMQ监控截图

在这里插入图片描述

说明:

  • 通过上述过程Producer向向consumer1和consumer2发送了 1000条消息,一共2000条;
  • consumer1收到1000条消息;
  • consumer2收到1000条消息;
  • 1000条消息未出现消息丢失的情况,运行平稳;

七、一个订阅关系下有多个节点时的负载

说明:
下面是多个节点订阅相同的tag和consumer组名,虽然发送了20条消息,但是我们只订阅了consumer1这个关系,而Producer为这个订阅发送了10条,所以通过负载均衡后,两个节点,每个会拉到5条消息

在这里插入图片描述

1、Producer发送消息数量截图

在这里插入图片描述

2、consumer1收到消息数量截图

在这里插入图片描述

3、consumer1收到消息数量截图

在这里插入图片描述

八、停止RocketMQ服务

1、显示所有java进程

jps

在这里插入图片描述

2、停止RocketMQ Dashboard

kill -9 2796
jps

在这里插入图片描述

3、停止RocketMQ NameSrv

cd /Users/duyanjun/Downloads/rocketmq-all-5.1.3
sh bin/mqshutdown namesrv

在这里插入图片描述

4、停止RocketMQ Proxy

cd /Users/duyanjun/Downloads/rocketmq-all-5.1.3
sh bin/mqshutdown proxy

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/767601.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MQTT 订阅标识符详解

为什么需要订阅标识符 在大部分 MQTT 客户端的实现中&#xff0c;都会通过回调机制来实现对新到达消息的处理。 但是在回调函数中&#xff0c;我们只能知道消息的主题名是什么。如果是非通配符订阅&#xff0c;订阅时使用的主题过滤器将和消息中的主题名完全一致&#xff0c;…

chatglm微调

chatGML 看到 【【官方教程】ChatGLM-6B 微调&#xff1a;P-Tuning&#xff0c;LoRA&#xff0c;Full parameter】 【精准空降到 15:27】 https://www.bilibili.com/video/BV1fd4y1Z7Y5/?share_sourcecopy_web&vd_sourceaa8c13cff97f0454ee41e1f609a655f1&t927 记得看…

Java Mybatis02+oracle拓展

0目录 Mybatis 02Oracle 拓展 1.Mybatis 02 创建数据库和表 创建工程 实体类 util工具类 接口方法 Resource Mapper xml文件 配置文件 测试 加入模糊查询&#xff08;根据姓名&#xff09; 测试结果 2.ParameterType语法 实战 参数为对象 参数为…

Bean 作用域与生命周期

Bean 作用域与生命周期 ​ 对于 Spring 来说&#xff0c;核心操作对象就是存和取 Bean &#xff0c;接下来就 Bean 的作用域与生命周期进行探讨。 文章目录 Bean 作用域与生命周期一、作用域的定义1.1、Bean 的6种作用域1.2、Bean作用域设置方法 二、Bean 的生命周期2.1、Bean…

【Java】Java实现微信小程序发送服务通知

文章目录 前言一、文档来源二、JAR包引入三、后端工作四、编写配置文件配置一&#xff1a;WxConfig配置二&#xff1a;WxProperties 五、代码编写 前言 在上个月接到一个需求&#xff0c;大概是需要计算一条数据的最大办理时间从而发送任务超期的微信小程序服务通知&#xff0…

怎么进行流程图制作?分享几种绘制方法

怎么进行流程图制作&#xff1f;流程图是一种图形化表示流程的图表&#xff0c;通常用于描述业务、计划或工作流程。它可以帮助人们更好地理解复杂的流程&#xff0c;并且提供了一种清晰的方法来记录和共享流程信息。下面介绍一些绘制流程图的方法&#xff0c;可以帮助我们快速…

4 自动微分 Automatic Differentitaion

计算图 Computational Graph 图上的每个节点代表一个中间值边事输入输出的关系 forward 求导 forward mode AD 上图中从前向后&#xff0c;一步一步计算每个中间值对 x1的偏导&#xff0c;那么计算到 v7&#xff0c;就得到了整个函数对于 x1的偏导。 有limitation 对一个参数…

echarts开发遇到的问题

echarts开发遇到的问题 1.rich富文本标签作为横向柱状图的刻度标签&#xff0c;其中带有icon。rich里不能写参数&#xff0c;只能写死&#xff1f;圆角设置无效&#xff1f; 解决办法&#xff1a; 自己写横向柱状图 散点图性能优化配置的临界点&#xff0c;最低优化数值必须…

超全整理,软件测试高频面试题(功能/接口/自动化测试-附答案)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 功能测试 1、双十…

LCD—STM32液晶显示(4.液晶控制代码讲解)

目录 STM32液晶控制代码讲解 液晶接口封装介绍 使用LCD的配置步骤 内存操作要使用volatile进行修饰 图形绘制实现 绘制矩形 重点补充 STM32液晶控制代码讲解 液晶接口封装介绍 指南者液晶接口原理图 左边DB00—DB15表示液晶屏的数据线引脚&#xff0c;分别对应STM32的F…

使用ppocr突然退出问题

本次使用conda装了一个cuda10.2版本的paddleocr&#xff0c;然后所有的环境检查没问题&#xff0c;使用paddle自带的检查代码&#xff0c;输出提醒paddle可以正常使用&#xff1a; >>> import paddle >>> paddle.utils.run_check() 输出结果提示安装正常 …

零编程经验也能打造精美微信展示小程序的秘诀揭秘

随着微信的普及和发展&#xff0c;微信展示小程序成为了许多企业展示自己形象的重要渠道。那么如何快速地搭建一个精美的微信展示小程序呢&#xff1f;下面就为大家详细介绍一下具体操作步骤。 首先&#xff0c;进入【乔拓云】平台后台。乔拓云是一款非常优秀的小程序开发平台&…

php伪协议(文件包含)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 php伪协议 伪协议详情 php伪协议 文件包含直接读取的是文件&#xff0c;而不是文件源码&#xff0c;所以要想办法读取源码方法。 那么就要涉及到 PHP 伪协议 ph…

LabVIEW开发航空电子设备嵌入式诊断半物理仿真系统

LabVIEW开发航空电子设备嵌入式诊断半物理仿真系统 航电集成系统是现代战争飞机的重要组成部分&#xff0c;包括惯性导航系统、飞行控制系统、机电管理系统和任务计算机等子系统。战机的作战性能与航电系统息息相关&#xff0c;可以说&#xff0c;没有高性能的空电系统&#x…

行业首家!法大大荣获“数据安全管理能力认证(DSMC)”证书

7月11日&#xff0c;法大大获得由中国信息通信研究院&#xff08;以下简称“中国信通院”&#xff09;颁发的“数据安全管理能力认证&#xff08;DSMC&#xff09;证书”&#xff08;以下简称“DSMC证书”&#xff09;&#xff0c;成为行业内首家获颁该证书的企业&#xff0c;法…

基于Java+SpringBoot+Vue前后端分离校园管理系统详细设计和实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

Spring Cloud Alibaba【创建支付服务生产者、创建服务消费者 、Dubbo和OpenFeign区别 、微服务接入OpenFeign】(二)

目录 分布式服务治理_创建支付服务生产者 分布式服务治理_创建服务消费者 服务调用_Dubbo和OpenFeign区别 服务调用_微服务接入OpenFeign 分布式服务治理_创建支付服务生产者 创建服务提供者工程cloud-provider-payment8001 POM文件引入依赖 <dependencies><…

MP4怎么转换为gif的格式?快试试这两个方法!

想要将MP4视频文件转换为GIF格式&#xff1f;不用担心&#xff0c;本文将为您介绍两种简单易行的方法&#xff1a;记灵在线工具和使用FFmpeg命令行工具。这些方法适用于不同的用户&#xff0c;无论您是喜欢在线工具还是偏向命令行操作&#xff0c;都能找到适合自己的方式。让我…

开源代码分享(8)—大规模电动汽车时空耦合双层优化调度(附matlab代码)

参考文献&#xff1a; [1]He L , Yang J , Yan J , et al. A bi-layer optimization based temporal and spatial scheduling for large-scale electric vehicles[J]. Applied Energy, 2016, 168(apr.15):179-192. DOI:10.1016/j.apenergy.2016.01.089 1.基本原理 1.1摘要 电…

Python模块基础

一、模块 模块可以看成是一堆函数的集合体。 一个py文件内部就可以放一堆函数&#xff0c;因此一个py文件就可以看成一个模块。 如果这个py文件的文件名为module.py&#xff0c;模块名则是module。 1、模块的四种形式 在Python中&#xff0c;总共有以下四种形式的模块&…