【RocketMQ】SpringBoot整合RocketMQ

news2024/11/19 13:20:02

🎯 导读:本文档详细介绍了如何在Spring Boot应用中集成Apache RocketMQ,并实现消息生产和消费功能。首先通过创建消息生产者项目,配置POM文件引入RocketMQ依赖,实现同步消息发送,并展示了如何发送普通字符串消息、对象消息以及集合消息。接着,文档讲解了如何搭建消息消费者项目,包括配置RocketMQ监听器以消费不同类型的RocketMQ消息。此外,还探讨了RocketMQ的不同消息类型如延迟消息、顺序消息的发送方式,以及消息消费模式的选择与实现(负载均衡模式与广播模式)。

文章目录

  • RocketMQ集成SpringBoot入门案例
    • 搭建rocketmq-producer(消息生产者)
      • 创建项目,完整的pom.xml
      • 修改配置文件application.yml
      • 在测试类里面测试发送消息
    • 搭建rocketmq-consumer(消息消费者)
      • 创建项目,完整的pom.xml
      • 修改配置文件application.yml
      • 监听器SimpleMsgListener
      • 启动rocketmq-consumer
  • 发送对象消息和集合消息
    • 发送对象消息(消息内容为对象)
      • 生产者
      • 消费者
    • 发送集合消息(消息内容为集合)
  • 发送不同模式的消息
    • 发送同步消息
    • 发送异步消息
    • 发送单向消息
    • 发送延迟消息
    • 发送顺序消息
      • 生产者
      • ObjMsgListener
    • 发送事务消息
      • 事务消息的处理逻辑
  • 消息过滤
    • tag过滤(常在消费者端过滤)
      • 生产者
      • TagMsgListener
        • sql92表达式
    • Key过滤(可以在事务监听的类里面区分)
      • 生产者
      • 断点发送这个消息,查看事务里面消息头
  • 两种消费模式
    • 消费者1
    • 消费者2
    • 启动两个消费者
    • 在生产者里面添加一个单元测试并且运行
    • 查看两个消费者的控制台
    • BROADCASTING 广播模式

RocketMQ集成SpringBoot入门案例

搭建rocketmq-producer(消息生产者)

在这里插入图片描述

在这里插入图片描述

创建项目,完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.test</groupId>
    <artifactId>01-rocketmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- rocketmq的依赖 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
 
</project>

在这里插入图片描述

如果想用RocketMQ 5.x版本,可以用2.3.0

修改配置文件application.yml

注意rocketmq不是在spring层级下面的,很容易搞错,以为和SpringBoot整合就是在spring层级下面了

spring:
  application:
    name: rocketmq-producer
rocketmq:
  # rocketMq的nameServer地址
  name-server: 127.0.0.1:9876     
  producer:
    # 生产者组别
    group: test-group
    # 消息发送的超时时间
    send-message-timeout: 3000
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 2
    # 发送消息的最大大小,单位字节,这里等于4M
    max-message-size: 4194304      

在测试类里面测试发送消息

往test主题里面发送一个简单的字符串消息

/**
 * 注入rocketMQTemplate,我们使用它来操作mq
 */
@Autowired
private RocketMQTemplate rocketMQTemplate;
 
/**
 * 测试发送简单的消息
 *
 * @throws Exception
 */
@Test
public void testSimpleMsg() throws Exception {
    // 往test的主题里面发送一个简单的字符串消息
    // syncSend同步消息
    // asyncSend异步
    // 参数1:topic名 参数2:Object,数据
    SendResult sendResult = rocketMQTemplate.syncSend("test", "我是一个简单的消息");
    // 拿到消息的发送状态
    System.out.println(sendResult.getSendStatus());
    // 拿到消息的id
    System.out.println(sendResult.getMsgId());
}

运行后查看控制台

在这里插入图片描述

搭建rocketmq-consumer(消息消费者)

在这里插入图片描述

在这里插入图片描述

创建项目,完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.test</groupId>
    <artifactId>02-rocketmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- rocketmq的依赖 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
 
</project>

修改配置文件application.yml

spring:
  application:
    name: rocketmq-consumer
rocketmq:
  name-server: 127.0.0.1:9876
#    consumer:
#        group: aaa-group 不需要写,一个项目一般有很多消费者组

监听器SimpleMsgListener

消费者要消费消息,就添加一个监听器,SpringBoot一启动,监听器就开始持续工作
1、类上添加注解 @Component 和 @RocketMQMessageListener 。topic指定消费的主题,consumerGroup 指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
2、实现 RocketMQListener 接口,泛型可以为具体的数据类型,如果想拿到消息的其他参数(如消息头、消息体,例如key之类的),泛型用MessageExt

package com.test.listener;
 
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
 
@Component
@RocketMQMessageListener(topic = "test", consumerGroup = "test-group", messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<String> {
 
    /**
     * 消费消息的方法
     * 
     * @param message 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数
     */
    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

泛型使用MessageExt

@Component
@RocketMQMessageListener(topic = "test", consumerGroup = "test-group", messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<MessageExt> {
 
    /**
     * 消费消息的方法
     * 
     * @param message 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数
     */
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}

onMessage方法没有返回值,要表示消息签收?

  • 方法报错就拒收
  • 方法不报错就签收

启动rocketmq-consumer

查看控制台,发现我们已经监听到消息了

在这里插入图片描述

发送对象消息和集合消息

发送对象消息(消息内容为对象)

消费者监听的时候泛型中写对象的类型即可

生产者

rocketmq-producer 添加一个Order类

package com.test.domain;
 
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
 
import java.util.Date;
 
/**
 * 订单对象
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /**
     * 订单号
     */
    private String orderId;
 
    /**
     * 订单名称
     */
    private String orderName;
 
    /**
     * 订单价格
     */
    private Double price;
 
    /**
     * 订单号创建时间
     */
    private Date createTime;
 
    /**
     * 订单描述
     */
    private String desc;
 
}

rocketmq-producer 添加一个单元测试

/**
 * 测试发送对象消息
 *
 * @throws Exception
 */
@Test
public void testObjectMsg() throws Exception {
    Order order = new Order();
    order.setOrderId(UUID.randomUUID().toString());
    order.setOrderName("我的订单");
    order.setPrice(998D);
    order.setCreateTime(new Date());
    order.setDesc("加急配送");
    // 往test-obj主题发送一个订单对象
    rocketMQTemplate.syncSend("test-obj", order);
}

发送此消息

消费者

rocketmq-consumer 将Order类拷贝过来

rocketmq-consumer 添加一个监听器

package com.test.listener;
 
import com.test.domain.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
 
/**
 * 创建一个对象消息的监听
 * 1.类上添加注解@Component和@RocketMQMessageListener
 * 2.实现RocketMQListener接口,注意泛型的使用
 */
@Component
@RocketMQMessageListener(topic = "test-obj", consumerGroup = "test-obj-group")
public class ObjMsgListener implements RocketMQListener<Order> {
 
    /**
     * 消费消息的方法
     *
     * @param message
     */
    @Override
    public void onMessage(Order message) {
        System.out.println(message);
    }
}

重启rocketmq-consumer后查看控制台,监听的对象消息如下

在这里插入图片描述

发送集合消息(消息内容为集合)

  • 生产者创建一个Order的集合,发送消息;
  • 监听方修改泛型中的类型为 Object ,接收到消息之后,再做类型强转

在这里插入图片描述

发送不同模式的消息

发送同步消息

  • 同步消息:消息由生产者发送到 broker 后,会得到一个确认,具有强可靠性。用于重要的消息通知、短信通知等

入门案例演示发送的就是同步消息。下面三种发送消息的方法,底层都是调用syncSend,发送的都是同步消息

  • rocketMQTemplate.syncSend()
  • rocketMQTemplate.send()
  • rocketMQTemplate.convertAndSend()

发送异步消息

  • rocketMQTemplate.asyncSend()
/**
 * 测试异步发送消息
 *
 * @throws Exception
 */
@Test
public void testAsyncSend() throws Exception {
    // 发送异步消息,发送完以后会有一个异步通知
    rocketMQTemplate.asyncSend("test", "发送一个异步消息", new SendCallback() {
        /**
         * 成功的回调
         * @param sendResult
         */
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }
 
        /**
         * 失败的回调
         * @param throwable
         */
        @Override
        public void onException(Throwable throwable) {
            System.out.println("发送失败");
        }
    });
    // 测试一下异步的效果
    System.out.println("谁先执行");
    // 挂起jvm 不让方法结束
    System.in.read();
}

测试发现。“谁先执行”打印在前面,“发送成功” 打印在后面

发送单向消息

  • 单向消息用在不关心发送结果的场景
  • 吞吐量很大,存在消息丢失的风险,可用于日志信息的发送
/**
 * 测试单向消息
 *
 * @throws Exception
 */
@Test
public void testOnWay() throws Exception {
    // 发送单向消息,没有返回值和结果
    rocketMQTemplate.sendOneWay("test", "这是一个单向消息");
}

发送延迟消息

延迟等级,从1级开始分别对应:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

/**
 * 测试延迟消息
 *
 * @throws Exception
 */
@Test
public void testDelay() throws Exception {
    // 构建消息对象
    Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();
    // 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
    // 参数3:连接MQ的超时时间 参数4:延迟等级
    SendResult sendResult = rocketMQTemplate.syncSend("test", message, 2000, 4);
    System.out.println(sendResult.getSendStatus());
}

运行后,查看消费者端,过了30s才被消费

发送顺序消息

上面消息的消费者都一样的实现方法。但顺序消息的消费不太一样,消费者需要单线程消费

生产者

/**
 * 测试顺序消费
 * mq会根据hash的值来存放到一个队列里面去
 *
 * @throws Exception
 */
@Test
public void testOrderly() throws Exception {
    List<Order> orders = Arrays.asList(
            new Order(UUID.randomUUID().toString().substring(0, 5), "张三的下订单", null, null, null, 1),
            new Order(UUID.randomUUID().toString().substring(0, 5), "张三的发短信", null, null, null, 1),
            new Order(UUID.randomUUID().toString().substring(0, 5), "张三的物流", null, null, null, 1),
            new Order(UUID.randomUUID().toString().substring(0, 5), "张三的签收", null, null, null, 1),
 
            new Order(UUID.randomUUID().toString().substring(0, 5), "李四的下订单", null, null, null, 2),
            new Order(UUID.randomUUID().toString().substring(0, 5), "李四的发短信", null, null, null, 2),
            new Order(UUID.randomUUID().toString().substring(0, 5), "李四的物流", null, null, null, 2),
            new Order(UUID.randomUUID().toString().substring(0, 5), "李四的签收", null, null, null, 2)
    );
    // 我们控制流程为 下订单->发短信->物流->签收  hash的值为seq,也就是说 seq相同的会放在同一个队列里面,顺序消费
    orders.forEach(order -> {
        rocketMQTemplate.syncSendOrderly("test-obj", order, String.valueOf(order.getSeq()));
    });
}

运行发送消息

ObjMsgListener

consumeMode指定消费类型

  • CONCURRENTLY 并发消费,不按照顺序
  • ORDERLY 顺序消费(消息放到一个队列,用一个线程来消费)
package com.test.listener;
 
import com.test.domain.Order;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "test-obj",
        consumerGroup = "test-obj-group",
        // 修改为顺序消费模式(单线程)
        consumeMode = ConsumeMode.ORDERLY
)
public class ObjMsgListener implements RocketMQListener<Order> {
 
    /**
     * 消费消息的方法
     * @param message
     */
    @Override
    public void onMessage(Order message) {
        System.out.println(message);
    }
}

重启rocketmq-consumer,查看控制台,消息按照我们的放入顺序进行消费了

在这里插入图片描述

发送事务消息

rocketmq-producer 添加一个单元测试

/**
 * 测试事务消息
 * 默认是sync(同步的)
 * 事务消息会有确认和回查机制
 * 事务消息都会走到同一个监听回调里面,所以我们需要使用tag或者key来区分过滤
 *
 * @throws Exception
 */
@Test
public void testTrans() throws Exception {
    // 构建消息体
    Message<String> message = MessageBuilder.withPayload("这是一个事务消息").build();
    // 发送事务消息(同步的) 最后一个参数才是消息主题
    TransactionSendResult transaction = rocketMQTemplate.sendMessageInTransaction("test", message, "消息的参数");
    // 拿到本地事务状态
    System.out.println(transaction.getLocalTransactionState());
    // 挂起jvm,因为事务的回查需要一些时间
    System.in.read();
}

rocketmq-producer 添加一个本地事务消息的监听(半消息)

/**
 * 事务消息的监听与回查
 * 类上添加注解@RocketMQTransactionListener 表示这个类是本地事务消息的监听类
 * 实现RocketMQLocalTransactionListener接口
 * 两个方法为执行本地事务,与回查本地事务
 */
@Component
@RocketMQTransactionListener(corePoolSize = 4,maximumPoolSize = 8)
public class TmMsgListener implements RocketMQLocalTransactionListener {
 
    /**
     * 执行本地事务,这里可以执行想做的业务,比如操作数据库
     *     操作成功就 return RocketMQLocalTransactionState.COMMIT;
     *     操作失败就 return RocketMQLocalTransactionState.UNKNOWN;
     * 可以使用try catch来控制成功或者失败
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 拿到消息参数
        System.out.println(arg);
        // 拿到消息头
        System.out.println(msg.getHeaders());
        // 执行业务
        // 返回状态 COMMIT 或者 UNKNOWN
        if (成功){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }        
    }
 
    /**
     * 此方法为回查方法,执行需要等待一会 
     * 回查本地事务,只有上面的执行方法返回UNKNOWN时,才执行下面的方法 默认是1min回查
     * 这里可以执行一些检查的方法
     * 如果返回COMMIT,那么本地事务就算是提交成功了,消息就会被消费者看到
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        System.out.println(msg);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

事务消息的处理逻辑

  1. 消息会先到事务监听类的执行方法
  2. 如果返回状态为COMMIT,则消费者可以直接监听到
  3. 如果返回状态为ROLLBACK,则消息发送失败,直接回滚
  4. 如果返回状态为UNKNOW,则过一段时间走回查方法
  5. 如果回查方法返回状态为UNKNOW或者ROLLBACK,则消息发送失败,直接回滚
  6. 如果回查方法返回状态为COMMIT,则消费者可以直接监听到

消息过滤

tag过滤(常在消费者端过滤)

  • 从源码注释得知,tag带在主题后面用:来携带tag

在这里插入图片描述

  • org.apache.rocketmq.spring.support.RocketMQUtil 的getAndWrapMessage方法里面看到了具体细节,keys要在消息头里面携带

在这里插入图片描述

生产者

/**
 * 发送一个带tag的消息
 *
 * @throws Exception
 */
@Test
public void testTagMsg() throws Exception {
    // 发送一个tag为java的数据
    rocketMQTemplate.syncSend("test-tag:java", "我是一个带tag的消息");
}

TagMsgListener

1、类上添加注解@Component和@RocketMQMessageListener。selectorType = SelectorType.TAG, 指定使用tag过滤。 (也可以使用sql92 需要在配置文件broker.conf中开启enbalePropertyFilter=true)

2、selectorExpression = “java” 表达式,默认是*,支持"tag1 || tag2 || tag3" 监听多个标签 3、实现RocketMQListener接口,注意泛型的使用

注意:一个标签,用一个消费者组!!!
package com.test.listener;
 
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "test-tag",
        consumerGroup = "test-tag-group",
        // 注明使用tag模式
        selectorType = SelectorType.TAG,
        selectorExpression = "java||C"
)
public class TagMsgListener implements RocketMQListener<String> {
 
    /**
     * 消费消息的方法
     *
     * @param message
     */
    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}
sql92表达式

一般不用这种模式

发送消息的时候携带数字标签

在这里插入图片描述

监听的消息的时候,通过数字来过滤消息

在这里插入图片描述

表达式可用语法:

  • AND, OR
  • >, >=, <, <=, =
  • BETWEEN A AND B, equals to >=A AND <=B
  • NOT BETWEEN A AND B, equals to >B OR <A
  • IN (‘a’, ‘b’), equals to =‘a’ OR =‘b’, this operation only support String type.
  • IS NULL, IS NOT NULL, check parameter whether is null, or not.
  • =TRUE, =FALSE, check parameter whether is true, or false.

Example: (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)

Key过滤(可以在事务监听的类里面区分)

生产者

key需要放在消息头上面

/**
 * 发送一个带 key 的消息,我们使用事务消息,打断点查看消息头
 *
 * @throws Exception
 */
@Test
public void testKeyMsg() throws Exception {
    // 发送一个key为spring的事务消息
    Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
            .setHeader(RocketMQHeaders.KEYS, "spring")
            .build();
    rocketMQTemplate.sendMessageInTransaction("test", message, "我是一个带key的消息");
}

断点发送这个消息,查看事务里面消息头

在这里插入图片描述

我们在mq的控制台也可以看到

在这里插入图片描述

两种消费模式

RocketMQ消息消费的模式分为两种:负载均衡模式和广播模式

  • CLUSTERING:负载均衡模式,多个消费者交替消费同一个主题里面的消息
  • BROADCASTING:广播模式,每个消费者都消费一遍订阅的主题的消息

消费者1

再搭建一个消费者rocketmq-consumer-b,依赖和配置文件和rocketmq-consumer一致,记住端口修改一下,避免占用。rocketmq-consumer-b添加一个监听

package com.test.listener;
 
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
 
/**
 * messageModel  指定消息消费的模式
 *      CLUSTERING 为负载均衡模式
 *      BROADCASTING 为广播模式
 */
@Component
@RocketMQMessageListener(topic = "test",
        consumerGroup = "test-group",
        // 集群模式
        messageModel = MessageModel.CLUSTERING
)
public class ConsumerBListener implements RocketMQListener<String> {
 
    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

消费者2

修改rocketmq-consumer的SimpleMsgListener

/**
 * 创建一个简单消息的监听
 * 1.类上添加注解@Component和@RocketMQMessageListener
 *
 * @RocketMQMessageListener(topic = "test", consumerGroup = "test-group")
 * topic指定消费的主题,consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
 * 2.实现RocketMQListener接口,注意泛型的使用
 */
@Component
@RocketMQMessageListener(topic = "test", 
        consumerGroup = "test-group",
        messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<String> {
 
    @Override
    public void onMessage(String message) {
        System.out.println(new Date());
        System.out.println(message);
    }
}

启动两个消费者

两个消费者是一个集群,且是同一个消费者组。当然,两个消费者可以写在一个项目中,写两个监听器就行

在生产者里面添加一个单元测试并且运行

/**
 * 测试消息消费的模式
 *
 * @throws Exception
 */
@Test
public void testMsgModel() throws Exception {
    for (int i = 0; i < 10; i++) {
        rocketMQTemplate.syncSend("test", "我是消息" + i);
    }
}

查看两个消费者的控制台

发现是负载均衡的模式,消息被负载均衡到两个消费者上,负载均衡不是说数量一定一样

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

BROADCASTING 广播模式

@RocketMQMessageListener(topic = "test",
        consumerGroup = "test-group",
        // 集群模式
        messageModel = MessageModel.BROADCASTING
)

重启测试,广播模式下,每个消费者都消费了这些消息

项目中一般部署多台机器,消费者部署 2 到 3 个,根据业务可以选择具体的模式来配置。广播模式是不会更新消费者位点的,它在乎消费失败,也不会重试,就广播一次

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2178156.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32+ADC+扫描模式

1 ADC简介 1 ADC(模拟到数字量的桥梁) 2 DAC(数字量到模拟的桥梁)&#xff0c;例如&#xff1a;PWM&#xff08;只有完全导通和断开的状态&#xff0c;无功率损耗的状态&#xff09; DAC主要用于波形生成&#xff08;信号发生器和音频解码器&#xff09; 3 模拟看门狗自动监…

Ract vs Vue 你更喜欢谁?

React 和 Vue 是当今最受欢迎的两个前端框架&#xff0c;各自有其独特的特点和优势。以下是对这两个框架的详细比较和分析&#xff0c;以帮助你了解它们的异同和适用场景&#xff1a; React 简介 React 是由 Facebook 开发和维护的一个开源 JavaScript 库&#xff0c;主要用于…

OpenAI员工流失的背后:地盘争夺、倦怠、薪酬要求

近日&#xff0c;OpenAI的CTO Mira Murati宣布离职&#xff0c;同一天&#xff0c;首席研究官Bob McGrew、研究副总裁Barret Zoph也宣布离职。 据统计&#xff0c;这已经是2024年第11起OpenAI高管离职事件了。 至今&#xff0c;开启“ChatGPT时刻”的四位OpenAI领袖&#xff…

河南移动:核心营业系统稳定运行超300天,数据库分布式升级实践|OceanBase案例

河南移动&#xff0c;作为电信全业务运营企业&#xff0c;不仅拥有庞大的客户群体和业务规模&#xff0c;还引领着业务产品与服务体系的创新发展。河南移动的原有核心营业系统承载着超过6000万的庞大用户量&#xff0c;管理着超过80TB的海量数据&#xff0c;因此也面临着数据规…

扩散模型(2)--1

1.简介 生成模型通过学习并建模输入数据的分布&#xff0c;从而采集生成新的样木&#xff0c;该模型广泛运用于图片视频生成、文本生成和药物分子生成。扩散模型是一类概率生成模型&#xff0c;扩散模型通过向数据中逐步加入噪声来破坏数据的结构&#xff0c;然后学习一个相对应…

在Windows系统上安装的 Boost C++ 库

步骤一 https://www.boost.org/users/history/version_1_86_0.html 下载Boost库文件: 步骤二 安装: https://www.boost.org/doc/libs/1_52_0/doc/html/bbv2/installation.html 点击运行.\bootstrap.bat脚本在当前目录的powershell中执行:./b2 install --prefixPREFIX 然后…

优选拼团平台架构解析与关键代码逻辑概述

一、系统架构设计 唐古拉优选拼团平台采用多层架构设计&#xff0c;主要包括前端展示层、业务逻辑层、数据访问层及数据存储层。 前端展示层&#xff1a;负责用户界面的展示和交互&#xff0c;包括商品列表、拼团详情、订单管理等页面。前端采用现代前端框架&#xff08;如Vue…

第十四周学习周报

目录 摘要Abstract1. LSTM的代码实现2. 序列到序列模型3. 梯度与方向导数总结 摘要 在上周的学习基础之上&#xff0c;本周学习的内容有LSTM的代码实现&#xff0c;通过对代码的学习进一步加深了对LSTM的理解。为了切入到transformer的学习&#xff0c;本文通过对一些应用例子…

JUC高并发编程4:集合的线程安全

1 内容概要 2 ArrayList集合线程不安全 2.1 ArrayList集合操作Demo 代码演示 /*** list集合线程不安全*/ public class ThreadDemo4 {public static void main(String[] args) {// 创建ArrayList集合List<String> list new ArrayList<>();for (int i 0; i <…

铺铜修改后自动重铺

很多初学者对于敷铜操作感到比较麻烦&#xff1a;为什么每次打过孔&#xff0c;修改走线后都需要手动右击-重新修改敷铜。如何提升layout的效率&#xff1f; 版本&#xff1a;Altium Designer 21.9.2 首先&#xff0c;点击面板右边的小齿轮&#xff0c;进入设置 接下来&#…

9.29学习

1.线上问题rebalance 因集群架构变动导致的消费组内重平衡&#xff0c;如果kafka集内节点较多&#xff0c;比如数百个&#xff0c;那重平衡可能会耗时导致数分钟到数小时&#xff0c;此时kafka基本处于不可用状态&#xff0c;对kafka的TPS影响极大 产生的原因 ①组成员数量发…

【C++并发入门】摄像头帧率计算和多线程相机读取(上):并发基础概念和代码实现

前言 高帧率摄像头往往应用在很多opencv项目中&#xff0c;今天就来通过简单计算摄像头帧率&#xff0c;抛出一个单线程读取摄像头会遇到的问题&#xff0c;同时提出一种解决方案&#xff0c;使用多线程对摄像头进行读取。同时本文介绍了线程入门的基础知识&#xff0c;讲解了…

2-107 基于matlab的hsv空间双边滤波去雾图像增强算法

基于matlab的hsv空间双边滤波去雾图像增强算法&#xff0c;原始图像经过光照增强后&#xff0c;将RGB转成hsv&#xff0c;进行图像增强处理&#xff0c;使图像更加清晰。程序已调通&#xff0c;可直接运行。 下载源程序请点链接&#xff1a; 2-107 基于matlab的hsv空间双边滤…

“找不到emp.dll,无法继续执行代码”需要怎么解决呢?分享6个解决方法

在日常使用电脑玩游戏的过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;其中最常见的就是“emp.dll丢失”。那么&#xff0c;emp.dll到底是什么&#xff1f;它为什么会丢失&#xff1f;丢失后会对我们的电脑产生什么影响&#xff1f;本文将为您详细解析emp.dll的概念…

超详细的华为ICT大赛报名流程

1、访问华为人才在线官网&#xff0c;点击右上角“登录/注册“&#xff0c;登录华为账号。 报名链接&#xff1a; https://e.huawei.com/cn/talent/cert/#/careerCert?navTypeauthNavKey ▲如已有华为Uniportal账号&#xff0c;完成实名认证后方可报名大赛。 ▲如没有华为…

【有啥问啥】具身智能(Embodied AI):人工智能的新前沿

具身智能&#xff08;Embodied AI&#xff09;&#xff1a;人工智能的新前沿 引言 在人工智能&#xff08;AI&#xff09;的进程中&#xff0c;具身智能&#xff08;Embodied AI&#xff09;正逐渐成为研究与应用的焦点。具身智能不仅关注于机器的计算能力&#xff0c;更强调…

需求5:增加一个按钮

在之前的几个需求中&#xff0c;我们逐步从修改字段到新增字段&#xff0c;按部就班地完成了相关工作。通过最近的文章&#xff0c;不难看出我目前正在处理前端的“未完成”和“已完成”按钮。借此机会&#xff0c;我决定趁热打铁&#xff0c;重新梳理一下之前关于按钮实现的需…

4、MapReduce编程实践

目录 1、创建文件2、启动HDFS3、启动eclipse 创建项目并导入jar包file->new->java project导入jar包finish 4、编写Java应用程序5、编译打包应用程序&#xff08;1&#xff09;查看直接运行结果&#xff08;2&#xff09;打包程序&#xff08;3&#xff09;查看 JAR 包是…

软硬协同方案破解IT瓶颈,龙蜥衍生版KOS助力内蒙古大学成功迁移10+业务软件 | 龙蜥案例

2024 云栖大会上&#xff0c;龙蜥社区发布了《龙蜥操作系统生态用户实践精选 V2》&#xff0c;为面临 CentOS 迁移的广大用户提供成熟实践样板。截至目前&#xff0c;阿里云、浪潮信息、中兴通讯 | 新支点、移动、联通、龙芯、统信软件等超 12 家厂商基于龙蜥操作系统发布商业衍…

【在Linux世界中追寻伟大的One Piece】命名管道

目录 1 -> 命名管道 1.1 -> 创建一个命名管道 1.2 -> 匿名管道与命名管道的区别 1.3 -> 命名管道的打开规则 1.4 -> 例子 1 -> 命名管道 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。如果我们想在不相关的进程之间交换数据&…