SpringBoot集成RocketMQ实现六种消息

news2025/1/8 5:03:27

1. 简介

        RocketMQ 支持多种消息类型以满足不同的业务需求

  1. 普通消息(Standard Message)

    1. 这是最常用的消息类型,适用于大多数场景。

    2. 可以设置延迟级别(Delay Levels),但不支持消息轨迹。

  2. 顺序消息(Ordered Message)

    1. 用于需要保证消息顺序的场景,例如订单状态更新。

    2. 可以按照消息的发送顺序或者按照消息键(Message Key)的顺序来发送和消费消息。

    3. 顺序消息可以是单播或组播。

  3. 延时消息(Delayed Message)

    1. 支持不同级别的延时,例如 1s、5s、10s、1m、2m、3m 等。

    2. 消息实际发送后并不会立即被消费,而是在指定的延时时间后才可用。

    3. 可以用于需要延时处理的场景,如支付超时未支付自动取消订单。

  4. 事务消息(Transaction Message)

    1. 用于需要保证事务性的场景,确保本地事务和消息发送的原子性。

    2. 发送消息包含两个阶段:preparecommitrollback

    3. 需要用户实现本地事务逻辑,并在 prepare 阶段提交消息,在本地事务完成后再调用 commitrollback

  5. 批量消息(Batch Message)

    1. 允许一次发送多条消息,减少网络请求次数,提高吞吐量。

    2. 批量消息内部是多条独立的消息,消费时也是逐条消费。

  6. 过滤消息(Filter Message)

    1. 允许消费者订阅主题时指定标签(Tag),只消费带有指定标签的消息。

    2. 可以提高消费的效率,只关注感兴趣的消息内容。

2. 环境搭建

2.1 开发环境

JDK

1.8

Maven

3.6

rocketmq-client

4.9.6

springboot

2.7.12

docker

27.1.1

docker-compose

1.29.2

2.2 rocketmq组件(4.x架构只需要启动server和broker)

云服务器配置最好选用2核4G,如果是2核2G得话只能启动这两个容器(server和broker一共需要约1.3G内存)

在云服务器中创建一个rocketmq文件夹,进入文件夹(余下操作在文件夹中执行,隔离)

mkdir rocketmq cd rocketmq

配置config

# Configure the broker's IP address 
# 1,云服务器中,ip为服务器的地址 
# 2,在本地,ip为127.0.0.1 

echo "brokerIP1=127.0.0.1" > broker.conf

docker-compose.yml

version: '3.8'

services:
  namesrv:
    image: apache/rocketmq:4.9.6
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv

  broker:
    image: apache/rocketmq:4.9.6
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    volumes:
      - ./broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf

networks:
  rocketmq:
    driver: bridge

服务器需要开放端口--->server使用9876端口,broker使用10911端口

执行文件

docker-compose up -d

docker ps(查看启动容器)

2.3 引入依赖

没有使用springboot框架

<!--         https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
                <dependency>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                    <version>4.9.6</version>
                </dependency>

使用springboot集成rocketmq

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

3. 简单示例

3.1 生产者

  • DefaultMQProducer:RocketMQ提供的一个生产者类,用于发送消息。

  • SendResult:发送消息后返回的结果对象,包含发送状态和消息队列等信息。

  • Message:代表要发送的消息对象。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        // Create producer instance and set the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // Set the Name Server address (replace with actual Name Server address)
        producer.setNamesrvAddr("110.41.55.242:9876");
        producer.setSendMsgTimeout(30000);
        producer.start();

        try {
            // Create a message instance, specifying the topic, tag, and message body
            Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ unique").getBytes());
            // Send the message and get the send result
            SendResult sendResult = producer.send(msg);
            System.out.println("Message sent: " + new String(msg.getBody()));
            System.out.println("Send result: " + sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Message sending failed.");
        } finally {
            // Shutdown the producer
            producer.shutdown();
        }
    }
}
  • 创建一个DefaultMQProducer实例,并设置生产者组名称为producer_group。生产者组是逻辑上的分组,可以一个或多个生产者属于同一个组。

  • 设置NameServer的地址和端口,这是RocketMQ用于服务发现的组件。

  • 设置发送消息的超时时间为30秒(30000毫秒)。这是生产者在发送消息时等待响应的最长时间。

  • 启动生产者实例,使其能够发送消息。

  • 创建一个Message实例,指定主题为TestTopic,标签为TagA,消息体为字符串"Hello RocketMQ unique"

  • 调用send方法发送消息,并获取发送结果SendResult

3.2 消费者

  • DefaultMQPushConsumer:RocketMQ提供的一个消费者类,用于订阅并消费消息。

  • ConsumeConcurrentlyContext:并发消费消息时的上下文对象。

  • ConsumeConcurrentlyStatus:并发消费消息后的返回状态。

  • MessageListenerConcurrently:并发消息监听器接口。

  • MessageExt:代表单个消息的扩展对象。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        // Create consumer instance and set the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // Set the Name Server address (replace with actual Name Server address)
        consumer.setNamesrvAddr("110.41.55.242:9876");
        // Subscribe to the specified topic and tag (* means all tags)
        consumer.subscribe("TestTopic", "*");

        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Start the consumer
        consumer.start();
        System.out.println("Consumer started.");
    }
}
  • 创建一个DefaultMQPushConsumer实例,并设置消费者组名称为consumer_group。消费者组是逻辑上的分组,可以一个或多个消费者属于同一个组。

  • 设置NameServer的地址和端口,这是RocketMQ用于服务发现的组件。

  • 订阅TestTopic主题下所有标签(*表示所有标签)的消息(对于过滤消息,需要指定TAG而不是全匹配)。

  • 实现MessageListenerConcurrently接口:创建一个新的匿名内部类实例,实现MessageListenerConcurrently接口。

  • 消费消息:重写consumeMessage方法,该方法包含业务逻辑,用于处理接收到的消息。

    • 遍历消息:遍历批量消息msgs,打印每条消息的内容。

    • 返回消费状态:返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消息消费成功。

3.3 测试结果

启动消费者

Consumer started.

启动生产者

Message sent: Hello RocketMQ unique
Send result: SendResult [sendStatus=SEND_OK, msgId=7F0000018CB018B4AAC2229057A00000, offsetMsgId=6E2937F200002A9F0000000000000242, messageQueue=MessageQueue [topic=TestTopic, brokerName=95d68505f648, queueId=1], queueOffset=2]

此时消费者控制台输出

Consumer started. Received message: Hello RocketMQ unique

4. SpringBoot集成Rocket MQ发送消息

示例代码结合项目实际运用rocketmq去做封装。

4.1 工程结构

main
├── java
│   ├── com
│   │   └── xiaokai
│   │       ├── config--->一些配置类
│   │       │   └── LogMQConfig.java--->打印mq相关信息
│   │       ├── entity--->实体类
│   │       │   ├── orderEntity.java
│   │       │   └── UserEntity.java
│   │       ├── event--->事件
│   │       │   ├── BaseEvent.java--->基础事件
│   │       │   ├── commonMessage.java--->普通消息
│   │       │   └── EventPublisher.java--->事件发布
│   │       ├── listen--->事件监听
│   │       │   └── UserCommonConsumer.java--->消费消息
│   │       ├── service--->服务
│   │       │   └── UserService.java--->调用事件发布
│   │       └── RocketMQApplication.java--->启动程序
├── resources
│   └── application.yml--->配置文件
└── test
    ├── java
    │   ├── com
    │   │   └── xiaokai
    │           ├── RocketMQTest.java--->测试

4.2 基础代码

LogMQConfig.java

package com.xiaokai.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import java.util.Arrays;

/**
 * Author:yang
 * Date:2024-10-06 17:04
 */
@Component
@Slf4j
public class LogMQConfig {

    @Autowired
    private ApplicationContext applicationContext;

    public void printLog() {
        RocketMQProperties properties = applicationContext.getBean(RocketMQProperties.class);
        String nameServer = Arrays.toString(properties.getNameServer().split(","));
        String producerGroup = properties.getProducer().getGroup();
        // 打印配置信息
        log.info("RocketMQ NameServer: {}", nameServer);
        log.info("RocketMQ Producer Group: {}", producerGroup);
    }
}

UserEntity.java

package com.xiaokai.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;


/**
 * Author:yang
 * Date:2024-09-26 10:55
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class UserEntity {

    private int id;

    private String name;

    private String password;

    private String iphone;

    private String address;

    private int money;

}

Order.java

package com.xiaokai.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Author:yang
 * Date:2024-09-26 10:52
 * Description:订单实体类
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderEntity {

    private Integer userId;

    private String name;

    private Integer number;
}

BaseEvent<T>.java

package com.xiaokai.event;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

/**
 * @description 基础事件
 */
@Data
public abstract class BaseEvent<T> {

    public abstract EventMessage<T> buildEventMessage(T data);

    public abstract String topic();

    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class EventMessage<T> {
        private String id;
        private Date timestamp;
        private T data;
    }

}

CommonMessage.java

package com.xiaokai.event;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Date;

/**
 * Author:yang
 * Date:2024-10-06 16:08
 * Description:普通消息
 */
@Service
public class CommonMessage extends BaseEvent<CommonMessage.SendUserMessage> {

    @Value("${rocketmq.topic.common}")
    private String topic;

    // 构建消息
    @Override
    public EventMessage<SendUserMessage> buildEventMessage(SendUserMessage data) {
        return EventMessage.<SendUserMessage>builder()
                .data(data)
                .timestamp(new Date())
                .id(RandomStringUtils.randomNumeric(10))
                .build();
    }

    @Override
    public String topic() {
        return topic;
    }

    @Builder
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class SendUserMessage{
        private int id;

        private String name;

        private String password;
    }
}

EventPublisher.java

package com.xiaokai.event;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-06 15:58
 * Description:事件发布
 */
@Service
@Slf4j
public class EventPublisher {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void publish(String topic, BaseEvent.EventMessage<?> message) {
        try {
            // 转换消息
            String payload = JSON.toJSONString(message);
            // 发送消息
            rocketMQTemplate.convertAndSend(topic, payload);
            log.info("publish event success, topic:{}, message:{}", topic, payload);
        }catch (Exception e){
            log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
            throw e;
        }
    }

    public void publish(String topic, String message) {
        try {
            // 发送消息
            rocketMQTemplate.convertAndSend(topic, message);
            log.info("publish event success, topic:{}, message:{}", topic, message);
        }catch (Exception e){
            log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
            throw e;
        }
    }
}

UserService.java

package com.xiaokai.service;

import com.xiaokai.entity.UserEntity;
import com.xiaokai.event.BaseEvent;
import com.xiaokai.event.CommonMessage;
import com.xiaokai.event.EventPublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-06 16:20
 */
@Service
@Slf4j
public class UserService {

    @Autowired
    private CommonMessage commonMessage;
    @Autowired
    private EventPublisher eventPublisher;

    public void sendMsg(UserEntity user){
        log.info("构建普通发送消息");
        CommonMessage.SendUserMessage message = CommonMessage.SendUserMessage.builder()
                .id(user.getId())
                .name(user.getName())
                .password(user.getPassword()).build();
        BaseEvent.EventMessage<CommonMessage.SendUserMessage> sendUserMessageEventMessage = commonMessage.buildEventMessage(message);

        eventPublisher.publish(commonMessage.topic(),sendUserMessageEventMessage);
        log.info("发送普通消息完成");
    }
}

4.3 普通消息

添加监听器

UserCommonConsumer.java

package com.xiaokai.listen;

import apache.rocketmq.v2.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-06 16:31
 * Description:普通消息消费
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.common}", consumerGroup = "${rocketmq.consumer.group}")
public class UserCommonConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("Received message: {}", s);
    }
}

@RocketMQMessageListener不能放在方法上。

测试

package com.xiaokai;

import com.xiaokai.config.LogMQConfig;
import com.xiaokai.entity.UserEntity;
import com.xiaokai.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.PreDestroy;

/**
 * Author:yang
 * Date:2024-10-06 16:32
 */
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class RocketMQTest {

    @Autowired
    private UserService userService;
    @Autowired
    private LogMQConfig logMQConfig;


    @Test
    public void test_sendCommonMsg() {
        logMQConfig.printLog();
        UserEntity user = UserEntity.builder().id(100)
                .name("xiaokai")
                .password("111111")
                .build();
        userService.sendMsg(user);
    }
}
  • 启动RocketMQApplication.java(启动监听器)

  • 启动测试方法

测试结果

// test
 : RocketMQ NameServer: [110.41.55.242:9876]
 : RocketMQ Producer Group: producer-group
 : 构建普通发送消息
 : publish event success, topic:common-message, message:{"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"8401537837","timestamp":1728295047107}
 : 发送普通消息完成
 
 // application
  Received message: {"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"8401537837","timestamp":1728295047107}

4.4 过滤消息

过滤消息是在普通消息的topic后面跟上:tag

添加监听器:tag为xiaokai

package com.xiaokai.listen;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-06 16:31
 * Description:过滤消息消费
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.filter}", consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression = "xiaokai")
public class UserFilterConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("Received filter message: {}", s);
    }
}

在事件发布中添加方法

/**
 * 发布事件
 *
 * @param topic   主题
 * @param message 消息
 * @param tag     标签
 */
public void publish(String topic, BaseEvent.EventMessage<?> message, String tag) {
    try {
        // 转换消息
        String payload = JSON.toJSONString(message);
        // 发送过滤消息--->在主题后面加上":",然后加上标签:destination = topic:tag
        rocketMQTemplate.convertAndSend(topic + ":" + tag, payload);
        log.info("publish event success, topic:{}, message:{}", topic, payload);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;

    }
}

服务中添加方法

public void sendFilterMsg(UserEntity user,String tag){
    log.info("构建过滤发送消息");
    FilterMessage.SendUserMessage message = FilterMessage.SendUserMessage.builder()
            .id(user.getId())
            .name(user.getName())
            .password(user.getPassword()).build();
    BaseEvent.EventMessage<FilterMessage.SendUserMessage> sendUserMessageEventMessage = filterMessage.buildEventMessage(message);

    eventPublisher.publish(filterMessage.topic(),sendUserMessageEventMessage,tag);
    log.info("发送过滤消息完成");
}

启动application:启动了两个监听器

running container: DefaultRocketMQListenerContainer{consumerGroup='consumer-group', namespace='', nameServer='110.41.55.242:9876', topic='common-message', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', 
Register the listener to container, listenerBeanName:userCommonConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
running container: DefaultRocketMQListenerContainer{consumerGroup='consumer-group', namespace='', nameServer='110.41.55.242:9876', topic='filter-message', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='xiao
Register the listener to container, listenerBeanName:userFilterConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2

测试

/**
 * 发送带tag消息
 */
@Test
public void test_sendFilterMsg() {
    logMQConfig.printLog();
    UserEntity user = UserEntity.builder().id(100)
            .name("xiaokai")
            .password("111111")
            .build();
    userService.sendFilterMsg(user,"xiaokai");
}

: 构建过滤发送消息
: publish event success, topic:filter-message, message:{"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"4163167680","timestamp":1728303830159}
: 发送过滤消息完成
2024-10-07 20:24:03.610  INFO 43732 --- [onsumer-group_1] com.xiaokai.listen.UserFilterConsumer    : Received filter message: {"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"4163167680","timestamp":1728303830159}

4.5 延迟消息

Apache RocketMQ 一共支持18种级别的延迟投递,具体如下:

延迟级别延迟时间延迟级别延迟时间
11秒106分钟
25秒117分钟
310 秒128分钟
430 秒139分钟
51分钟1410分钟
62分钟1520分钟
73分钟1630分钟
84分钟171小时
95分钟182小时

添加延迟方法

/**
 * 发布事件
 *
 * @param topic   主题
 * @param message 消息
 * @param level   delay级别
 */
public void publishDelay(String topic, BaseEvent.EventMessage<?> message, Integer level) {
    try {
        // 转换消息
        String payload = JSON.toJSONString(message);

        Message<String> msg = MessageBuilder.withPayload(payload)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, level)
                .build();
        // 设置延时级别
        rocketMQTemplate.syncSend(topic, msg, 3000, level);
        log.info("publish event success, topic:{}, message:{}", topic, payload);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

4.6 批量消息

添加发送消息方法

/**
 * 发布事件
 * @param topic   主题
 * @param message 消息集合
 */
public void publishBatch(String topic, List<BaseEvent.EventMessage<?>> message) {
    try {
        // 消息集合
        List<Message<String>> messages = new ArrayList<>();
        // 转换消息
        for (BaseEvent.EventMessage<?> eventMessage : message) {
            String payload = JSON.toJSONString(eventMessage);
            Message<String> msg = MessageBuilder.withPayload(payload)
                    .build();
            messages.add(msg);
        }
        rocketMQTemplate.syncSend(topic, messages);
        log.info("publish event success, topic:{}, message:{}", topic, message);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

添加服务方法

public void sendBatchMsg(UserEntity user){
    log.info("构建批量发送消息");
    List<BaseEvent.EventMessage<?>> messages = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        CommonMessage.SendUserMessage message = CommonMessage.SendUserMessage.builder()
                .id(user.getId())
                .name(user.getName())
                .password(user.getPassword()).build();
        BaseEvent.EventMessage<CommonMessage.SendUserMessage> sendUserMessageEventMessage = commonMessage.buildEventMessage(message);
        messages.add(sendUserMessageEventMessage);
    }
    eventPublisher.publishBatch(commonMessage.topic(),messages);
    log.info("发送批量消息完成");
}

测试:往common-message的主题中发送批量消息

 /**
 * 发送批量消息
 */
@Test
public void test_sendBatchMsg() {
    logMQConfig.printLog();
    UserEntity user = UserEntity.builder().id(100)
            .name("xiaokai")
            .password("111111")
            .build();
    userService.sendBatchMsg(user);
}

测试结果:批量发送,单独消费

4.7 顺序消息

发送消息方法
/**
 * 发布顺序事件
 *
 * @param topic   主题
 * @param message 消息
 */
public void publishOrder(String topic, BaseEvent.EventMessage<?> message , String orderId) {
    try {
        // 转换消息
        String payload = JSON.toJSONString(message);
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            /**
             * 选择队列
             * @param list 默认的消息队列列表
             * @param message 传输的消息
             * @param o 传输消息时额外的参数 - send(topic, message, o)
             * @return
             */
            @Override
            public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
                // 全部的消息发送到第一个队列
                return list.get(0);
            }
        });
        // orderId 相同的消息会被顺序消费
        // orderId在选择器中是 o
        // payload在选择器中是 message.getPayload()
        rocketMQTemplate.syncSendOrderly(topic, payload, orderId);
        log.info("publish event success, topic:{}, message:{}", topic, message);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

接口中list为队列,message为消息,o 为传入的对象,返回消息发送到哪个队列。上面的例子中以orderId为分片标准,所有队列的余数,即把orderId相同的消息发送到同一个队列。

4.8 事务消息

事务性消息的发送分为两个阶段,首先会有一个半消息被投递,即一条消息成功发送到MQ服务器,但是服务器没有收到Producer对该消息的第二次确认,此时该消息会被标记为“暂时无法投递”状态。

消息发送成功后会执行本地事务,并根据本地事务的结果向Broker传递半消息状态(提交或者回滚)。

如果由于网络闪退、Producer重启等原因导致某条事务消息的二次确认丢失,Broker 会发现这条长时间处于“半消息”状态的消息,并主动向 Producer 检查该消息的事务状态(Commit 或 Rollback)。因此,如果本地事务执行成功,下游就会收到该消息,否则不会收到。这样最终保证了上下游数据的一致性。

事务消息的详细执行流程如下图所示:

交易消息发送流程

  1. 生产者将半条消息发送给RocketMQ Broker

  2. 消息持久化成功后RocketMQ Broker,返回Ack给Producer,确认消息发送成功,并且是一条半消息。

  3. 生产者开始执行本地事务。

  4. Producer根据本地事务的结果,向服务端提交第二次确认(Commit或者Rollback),服务端收到确认后,处理如下逻辑。

    1. 如果第二次确认结果为Commit:服务端将该半消息标记为可交付,并交付给消费者。

    2. 如果第二次确认结果为Rollback,则服务端将回滚该事务,不会再向Consumer投递该半条消息。

  5. 在网络断线或者Producer重启等特殊情况下,如果Server没有收到Producer的第二次确认结果,或者Server收到的第二次确认结果为Unknown,则Server会在固定的时间之后向某个Producer发起回滚消息。

demo:

实现事务监听器 TransactionListener

package com.xiaokai.listen;

import com.xiaokai.transactional.TransactionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Author:yang
 * Date:2024-10-09 17:33
 * Description:事务监听器 - 用于实现事务消息
 */

@Slf4j
@Component
public class TransactionListenerImpl implements TransactionListener {


    @Autowired
    private TransactionService transactionService;

    /**
     * 作用 - 用于执行本地事务,返回事务状态,如果返回提交状态,则消息会被提交,如果返回回滚状态,则消息会被丢弃。
     * 执行时间 - 在发送半消息之后调用
     * @param message 这个参数代表了需要进行事务状态检查的消息对象。它包含了消息的主题(topic)、消息体(body)、消息标签(tags)、属性(properties)等信息。
     *                这个对象是之前在调用sendMessageInTransaction方法时创建并发送的半消息。
     * @param o 这是一个用户自定义的对象,它在调用sendMessageInTransaction发送半消息时传递给TransactionListener的。
     *          可以利用这个对象来传递任何需要在执行本地事务检查时使用的上下文信息或状态数据。
     * @return 事务状态
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 调用事务服务,执行本地事务
        boolean success = transactionService.localTransaction(message);
        if (success) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }



    /**
     * 作用 - 定时(默认5s,可以设置)检查事务状态,如果返回提交状态,则消息会被提交,如果返回回滚状态,则消息会被丢弃。
     * 执行时间 - 在检查半消息状态之前调用
     * @param messageExt 消息
     * @return 事务状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 调用事务服务,检查本地事务状态
        boolean success = transactionService.checkLocalTransaction(messageExt);
        if (success) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

触发时机:

  • executeLocalTransaction(Message msg, Object arg)的触发时机

    • 当你调用sendMessageInTransaction方法发送事务性消息时,RocketMQ客户端库首先会发送一个半消息到消息服务器。

    • 然后,客户端库会调用executeLocalTransaction方法,传入消息msg和之前通过sendMessageInTransaction发送时附加的对象arg,以执行本地事务。

  • checkLocalTransaction(Message msg, Object arg)的触发时机

    • 这个方法会在半消息的回查周期内被调用。消息服务器会定时向消息生产者查询半消息对应的本地事务执行情况。

    • 客户端库会调用checkLocalTransaction方法,传入消息msg和之前附加的对象arg,以检查本地事务是否成功执行。

参数含义:

  1. Message msg

    1. 这个参数代表了需要进行事务状态检查的消息对象。它包含了消息的主题(topic)、消息体(body)、消息标签(tags)、属性(properties)等信息。这个对象是之前在调用sendMessageInTransaction方法时创建并发送的半消息。

  2. Object arg

    1. 这是一个用户自定义的对象,它在调用sendMessageInTransaction发送半消息时传递给TransactionListener的。你可以利用这个对象来传递任何需要在执行本地事务检查时使用的上下文信息或状态数据。

参数的使用场景:

  • Message msg

    • 你可以使用msg对象来获取与消息相关的信息,例如,可以通过msg.getMsgId()获取消息的唯一标识符,或者使用msg.getProperty方法获取消息的自定义属性,这些信息可能对检查本地事务状态有帮助。

  • Object arg

    • 在发送半消息之前,你可能会执行一些本地事务逻辑,比如数据库操作。在这些操作中,可能需要一些额外的上下文信息来完成事务的提交或回滚。这些信息可以通过arg参数在发送半消息时传递给executeLocalTransaction方法,然后executeLocalTransaction方法可以再将其传递给checkLocalTransaction方法。

本地事务方法

package com.xiaokai.transactional;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-09 18:30
 */
@Slf4j
@Service
public class TransactionService {

    /**
     * 本地事务执行
     * @param msg
     * @return
     */
    public boolean localTransaction(Message msg) {
        // 1. 执行1方法

        // 2. 执行2方法

        // 3. 本地事务提交
        return true;
    }

    /**
     * 事务回查
     * @param messageExt
     * @return
     */
    public boolean checkLocalTransaction(MessageExt messageExt) {
        // 用消息中的部分信息,入库查询事务执行结果 - 使用orderId查询订单表是否有这个这个记录
        // 1. 查询事务执行结果

        // 2. 根据查询结果决定是否提交事务

        return true;
    }
}

监听器配置

package com.xiaokai.config;

import com.xiaokai.listen.TransactionListenerImpl;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Author:yang
 * Date:2024-10-09 18:54
 * Description:发送事务消息,需要配置监听器
 */
@Component
public class MQTemplateConfig {

    @Autowired
    private TransactionListenerImpl transactionListener;

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.producer.group}")
    private String producerGroup;
    
    @Bean
    public TransactionMQProducer transactionMQProducer() {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
        producer.setTransactionListener(transactionListener);
        // 其他配置,如setNamesrvAddr等
        producer.setNamesrvAddr(nameServer);
        producer.setProducerGroup(producerGroup);
        return producer;
    }

    @Bean
    public RocketMQTemplate rocketTransactionMQTemplate(TransactionMQProducer transactionMQProducer) {
        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(transactionMQProducer);
        // 其他配置
        return template;
    }
}

发送事务消息

/**
 * 发布事务事件
 *
 * @param topic   主题
 * @param message 消息
 */
public void publishTransaction(String topic, BaseEvent.EventMessage<?> message , String orderId) {
    try {
        // 转换消息
        String payload = JSON.toJSONString(message);
        Message<String> msg = MessageBuilder.withPayload(payload)
                .build();

        rocketMQTemplate.sendMessageInTransaction(topic, msg , orderId);
        log.info("publish event success, topic:{}, message:{}", topic, message);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

注:

       1.  文章全篇使用了消息同步发送方式,建议采用消息异步发送方式(发送消息时,添加回调函数监听消息)。

/**
 * 异步发布事件
 * @param topic   主题
 * @param message 消息
 */
public void publish(String topic, String message) {
    try {
        // 发送消息
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("publish event success, topic:{}, message:{}", topic, message);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, throwable.getMessage());
            }
        });
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

        2. 刚开始启动出错误,优先检查rmqbroker容器是不是处于运行状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2200290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C/C++程序员为什么要了解汇编?了解汇编有哪些好处?如何学习汇编?

目录 1、概述 2、从汇编的角度去理解问题的若干实例说明 2.1、使用空指针去访问类的数据成员或调用类的虚函数为什么会引发崩溃&#xff1f; 2.2、从汇编代码的角度去理解多线程的执行细节&#xff0c;去理解多线程在访问共享资源时为什么要加锁 2.3、使用Windbg静态分析d…

【Matlab】Matlab 导入数据.csv或者.xlsx文件,然后使用这些数据来绘制图表

Matlab 导入数据.csv或者.xlsx文件&#xff0c;然后使用这些数据来绘制图表 初始数据 filename C:\Users\jia\Desktop\yadian\data\1Hz 2024_09_12 17_10_06.csv; 代码&#xff1a; clc;clear close all; % 读取Excel文件 filename C:\Users\jia\Desktop\yadian\data\1Hz …

一篇文章让你学懂python入门

1.编写程序输入三个整数&#xff0c;按升序输出 思路1&#xff1a;使用if-else结构进行大小比较&#xff0c;将三个数进行从小到大的排序 num1 int(input("请输入第一个整数:")) num2 int(input("请输入第二个整数&#xff1a;")) num3 int(input(&qu…

华为OD机试 - 采样过滤(Python/JS/C/C++ 2024 E卷 100分)

华为OD机试 2024E卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试真题&#xff08;Python/JS/C/C&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;私信哪吒&#xff0c;备注华为OD&#xff0c;加入华为OD刷题交流群&#xff0c;…

MySQL数据的增删改查

CRUD CRUD指的是创建&#xff08;Create&#xff09;,读取&#xff08;Retrieve&#xff09;&#xff0c;更新&#xff08;Update&#xff09;和删除&#xff08;Delete&#xff09;,它是数据库或者持久层在软件系统中执行的基本功能&#xff0c;构成了大多数应用程序和服务器…

图神经网络之异构图转同构图

heterogeneous graph转homogeneous 异构图创建教程dgl.to_homogeneous语法格式例子 异构图创建教程 对于异构图创建&#xff0c;可以看异构图创建教程 dgl.to_homogeneous语法格式 dgl.to_homogeneous(G, ndataNone, edataNone, store_typeTrue, return_countFalse)G&#x…

K8s简介及环境搭建

一、Kubernetes简介 kubernetes 的本质是一组服务器集群&#xff0c;它可以在集群的每个节点上运行特定的程序&#xff0c;来对节点中的容器进行管理。目的是实现资源管理的自动化&#xff0c;主要提供了如下的主要功能&#xff1a; 自我修复&#xff1a;一旦某一个容器崩溃&a…

使用HashiCorp Nomad Cluster App管理高可用集群

容器化和Kubernetes如今已成为创建可扩展云原生应用程序的基本要素。但并非每个应用程序工作负载都需要容器或Kubernetes资源。HashiCorp Nomad是一个轻量级的工作负载调度程序&#xff0c;提供了与Kubernetes相似的优势&#xff0c;但不仅可以管理容器&#xff0c;还能管理其他…

SSL证书是否可以给多个域名使用?

在当今数字化的网络环境中&#xff0c;SSL证书在保障网站安全、保护用户数据传输方面发挥着至关重要的作用。那么&#xff0c;SSL 证书是否可以给多个域名使用呢&#xff1f;这是一个在网站开发、运营和安全管理领域备受关注的问题。 SSL 证书能够给多个域名使用吗&#xff1f…

指针函数C++

指针函数概念 指针函数在C中是一种特殊类型的函数。从本质上讲&#xff0c;它是一个函数&#xff0c;不过其返回值是一个指针类型的数据。例如&#xff0c;像int* plusfunction(int a, int b);这样的函数声明&#xff0c;plusfunction就是一个指针函数&#xff0c;它接受两个i…

Jvisualvm介绍;使用Jvisualvm:运行jvisualvm.exe;安装gc插件;查看gc

一&#xff0c;Jvisualvm介绍 jvisualvm是用来查看硬件使用情况的工具&#xff0c;多数会用它来看内存的使用情况 VisualVM 是Netbeans的profile子项目&#xff0c;已在JDK6.0 update 7 中自带(java启动时不需要特定参数&#xff0c;监控工具在bin/jvisualvm.exe)&#xff0c…

leetcode 10.9 94.二叉树的中序遍历

94. 二叉树的中序遍历 已解答 简单 相关标签 相关企业 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2]示例 2&#xff1a; 输入&#xff1a;root [] 输出&#xff1a…

LabelImag标注工具环境配置

LabelImag标注工具环境配置 Anaconda的安装和使用 Anaconda是一个集成了Python解释器、conda包管理器和众多科学计算、数据分析、机器学习等常用库的发行版。它允许用户轻松地管理Python环境和包&#xff0c;无需手动解决依赖问题。Anaconda特别适用于数据科学、机器学习、人…

Mac 下编译 libaom 源码教程

AV1 AV1是一种开放、免版税的视频编码格式&#xff0c;由开放媒体联盟&#xff08;AOMedia&#xff09;开发&#xff0c;旨在提供高压缩效率和优秀的视频质量。AV1支持多种分辨率&#xff0c;包括SD、HD、4K和8K&#xff0c;并适用于视频点播&#xff08;VOD&#xff09;、直播…

Python酷库之旅-第三方库Pandas(139)

目录 一、用法精讲 626、pandas.plotting.scatter_matrix方法 626-1、语法 626-2、参数 626-3、功能 626-4、返回值 626-5、说明 626-6、用法 626-6-1、数据准备 626-6-2、代码示例 626-6-3、结果输出 627、pandas.plotting.table方法 627-1、语法 627-2、参数 …

从0开始linux(11)——进程(3)进程的切换与调度

欢迎来到博主的专栏&#xff1a;从0开始linux 博主ID&#xff1a;代码小豪 文章目录 进程优先级进程的切换linux的调度算法 进程优先级 进程的优先级决定了进程获得CPU资源分配的顺序&#xff0c;在进程&#xff08;0&#xff09;这篇文章中博主就讲过并发和并行两个概念。即对…

ai论文写作软件哪个好?分享5款ai论文题目生成器

在当前的学术研究和写作领域&#xff0c;AI论文写作软件已经成为提高效率和质量的重要工具。根据多个来源的评测和推荐&#xff0c;以下是五款值得推荐的AI论文写作软件&#xff0c;其中特别推荐千笔-AIPassPaper。 1. 千笔-AIPassPaper 千笔-AIPassPaper是一款基于深度学习和…

超级详细 安装 Python 最佳实践文档

第一步&#xff0c;下载Python 打开Python官网&#xff1a;Welcome to Python.org 可以根据自己的需要下载相应的版本 第二步&#xff0c;安装Python 右键.exe文件->选择打开 安装 勾选 Install for all users选项 Browse为选择安装路径&#xff0c;可以自定安装路径 D盘…

动态规划12:213. 打家劫舍 II

动态规划解题步骤&#xff1a; 1.确定状态表示&#xff1a;dp[i]是什么 2.确定状态转移方程&#xff1a;dp[i]等于什么 3.初始化&#xff1a;确保状态转移方程不越界 4.确定填表顺序&#xff1a;根据状态转移方程即可确定填表顺序 5.确定返回值 题目链接&#xff1a;213.…

DAMA数据管理知识体系(第15章 数据管理成熟度评估)

课本内容 15.1 引言 概要 能力成熟度评估&#xff08;Capability Maturity Assessment&#xff0c;CMA&#xff09;是一种基于能力成熟度模型&#xff08;Capability Maturity Model&#xff0c;CMM&#xff09;框架的能力提升方案&#xff0c;描述了数据管理能力初始状态发展到…