1. 简介
RocketMQ 支持多种消息类型以满足不同的业务需求
-
普通消息(Standard Message):
-
这是最常用的消息类型,适用于大多数场景。
-
可以设置延迟级别(Delay Levels),但不支持消息轨迹。
-
-
顺序消息(Ordered Message):
-
用于需要保证消息顺序的场景,例如订单状态更新。
-
可以按照消息的发送顺序或者按照消息键(Message Key)的顺序来发送和消费消息。
-
顺序消息可以是单播或组播。
-
-
延时消息(Delayed Message):
-
支持不同级别的延时,例如 1s、5s、10s、1m、2m、3m 等。
-
消息实际发送后并不会立即被消费,而是在指定的延时时间后才可用。
-
可以用于需要延时处理的场景,如支付超时未支付自动取消订单。
-
-
事务消息(Transaction Message):
-
用于需要保证事务性的场景,确保本地事务和消息发送的原子性。
-
发送消息包含两个阶段:
prepare
和commit
或rollback
。 -
需要用户实现本地事务逻辑,并在
prepare
阶段提交消息,在本地事务完成后再调用commit
或rollback
。
-
-
批量消息(Batch Message):
-
允许一次发送多条消息,减少网络请求次数,提高吞吐量。
-
批量消息内部是多条独立的消息,消费时也是逐条消费。
-
-
过滤消息(Filter Message):
-
允许消费者订阅主题时指定标签(Tag),只消费带有指定标签的消息。
-
可以提高消费的效率,只关注感兴趣的消息内容。
-
2. 环境搭建
2.1 开发环境
JDK | 1.8 |
Maven | 3.6 |
rocketmq-client | 4.9.6 |
springboot | 2.7.12 |
docker | 27.1.1 |
docker-compose | 1.29.2 |
2.2 rocketmq组件(4.x架构只需要启动server和broker)
云服务器配置最好选用2核4G,如果是2核2G得话只能启动这两个容器(server和broker一共需要约1.3G内存)
在云服务器中创建一个rocketmq文件夹,进入文件夹(余下操作在文件夹中执行,隔离)
mkdir rocketmq cd rocketmq
配置config
# Configure the broker's IP address
# 1,云服务器中,ip为服务器的地址
# 2,在本地,ip为127.0.0.1
echo "brokerIP1=127.0.0.1" > broker.conf
docker-compose.yml
version: '3.8'
services:
namesrv:
image: apache/rocketmq:4.9.6
container_name: rmqnamesrv
ports:
- 9876:9876
networks:
- rocketmq
command: sh mqnamesrv
broker:
image: apache/rocketmq:4.9.6
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
volumes:
- ./broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf
depends_on:
- namesrv
networks:
- rocketmq
command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf
networks:
rocketmq:
driver: bridge
服务器需要开放端口--->server使用9876端口,broker使用10911端口
执行文件
docker-compose up -d
docker ps(查看启动容器)
2.3 引入依赖
没有使用springboot框架
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.6</version>
</dependency>
使用springboot集成rocketmq
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
3. 简单示例
3.1 生产者
-
DefaultMQProducer
:RocketMQ提供的一个生产者类,用于发送消息。 -
SendResult
:发送消息后返回的结果对象,包含发送状态和消息队列等信息。 -
Message
:代表要发送的消息对象。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerExample {
public static void main(String[] args) throws Exception {
// Create producer instance and set the producer group name
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// Set the Name Server address (replace with actual Name Server address)
producer.setNamesrvAddr("110.41.55.242:9876");
producer.setSendMsgTimeout(30000);
producer.start();
try {
// Create a message instance, specifying the topic, tag, and message body
Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ unique").getBytes());
// Send the message and get the send result
SendResult sendResult = producer.send(msg);
System.out.println("Message sent: " + new String(msg.getBody()));
System.out.println("Send result: " + sendResult);
} catch (Exception e) {
e.printStackTrace();
System.out.println("Message sending failed.");
} finally {
// Shutdown the producer
producer.shutdown();
}
}
}
-
创建一个
DefaultMQProducer
实例,并设置生产者组名称为producer_group
。生产者组是逻辑上的分组,可以一个或多个生产者属于同一个组。 -
设置NameServer的地址和端口,这是RocketMQ用于服务发现的组件。
-
设置发送消息的超时时间为30秒(30000毫秒)。这是生产者在发送消息时等待响应的最长时间。
-
启动生产者实例,使其能够发送消息。
-
创建一个
Message
实例,指定主题为TestTopic
,标签为TagA
,消息体为字符串"Hello RocketMQ unique"
。 -
调用
send
方法发送消息,并获取发送结果SendResult
。
3.2 消费者
-
DefaultMQPushConsumer
:RocketMQ提供的一个消费者类,用于订阅并消费消息。 -
ConsumeConcurrentlyContext
:并发消费消息时的上下文对象。 -
ConsumeConcurrentlyStatus
:并发消费消息后的返回状态。 -
MessageListenerConcurrently
:并发消息监听器接口。 -
MessageExt
:代表单个消息的扩展对象。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerExample {
public static void main(String[] args) throws Exception {
// Create consumer instance and set the consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// Set the Name Server address (replace with actual Name Server address)
consumer.setNamesrvAddr("110.41.55.242:9876");
// Subscribe to the specified topic and tag (* means all tags)
consumer.subscribe("TestTopic", "*");
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Start the consumer
consumer.start();
System.out.println("Consumer started.");
}
}
-
创建一个
DefaultMQPushConsumer
实例,并设置消费者组名称为consumer_group
。消费者组是逻辑上的分组,可以一个或多个消费者属于同一个组。 -
设置NameServer的地址和端口,这是RocketMQ用于服务发现的组件。
-
订阅
TestTopic
主题下所有标签(*
表示所有标签)的消息(对于过滤消息,需要指定TAG而不是全匹配)。 -
实现
MessageListenerConcurrently
接口:创建一个新的匿名内部类实例,实现MessageListenerConcurrently
接口。 -
消费消息:重写
consumeMessage
方法,该方法包含业务逻辑,用于处理接收到的消息。-
遍历消息:遍历批量消息
msgs
,打印每条消息的内容。 -
返回消费状态:返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
表示消息消费成功。
-
3.3 测试结果
启动消费者
Consumer started.
启动生产者
Message sent: Hello RocketMQ unique
Send result: SendResult [sendStatus=SEND_OK, msgId=7F0000018CB018B4AAC2229057A00000, offsetMsgId=6E2937F200002A9F0000000000000242, messageQueue=MessageQueue [topic=TestTopic, brokerName=95d68505f648, queueId=1], queueOffset=2]
此时消费者控制台输出
Consumer started. Received message: Hello RocketMQ unique
4. SpringBoot集成Rocket MQ发送消息
示例代码结合项目实际运用rocketmq去做封装。
4.1 工程结构
main
├── java
│ ├── com
│ │ └── xiaokai
│ │ ├── config--->一些配置类
│ │ │ └── LogMQConfig.java--->打印mq相关信息
│ │ ├── entity--->实体类
│ │ │ ├── orderEntity.java
│ │ │ └── UserEntity.java
│ │ ├── event--->事件
│ │ │ ├── BaseEvent.java--->基础事件
│ │ │ ├── commonMessage.java--->普通消息
│ │ │ └── EventPublisher.java--->事件发布
│ │ ├── listen--->事件监听
│ │ │ └── UserCommonConsumer.java--->消费消息
│ │ ├── service--->服务
│ │ │ └── UserService.java--->调用事件发布
│ │ └── RocketMQApplication.java--->启动程序
├── resources
│ └── application.yml--->配置文件
└── test
├── java
│ ├── com
│ │ └── xiaokai
│ ├── RocketMQTest.java--->测试
4.2 基础代码
LogMQConfig.java
package com.xiaokai.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.Arrays;
/**
* Author:yang
* Date:2024-10-06 17:04
*/
@Component
@Slf4j
public class LogMQConfig {
@Autowired
private ApplicationContext applicationContext;
public void printLog() {
RocketMQProperties properties = applicationContext.getBean(RocketMQProperties.class);
String nameServer = Arrays.toString(properties.getNameServer().split(","));
String producerGroup = properties.getProducer().getGroup();
// 打印配置信息
log.info("RocketMQ NameServer: {}", nameServer);
log.info("RocketMQ Producer Group: {}", producerGroup);
}
}
UserEntity.java
package com.xiaokai.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Author:yang
* Date:2024-09-26 10:55
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class UserEntity {
private int id;
private String name;
private String password;
private String iphone;
private String address;
private int money;
}
Order.java
package com.xiaokai.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Author:yang
* Date:2024-09-26 10:52
* Description:订单实体类
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderEntity {
private Integer userId;
private String name;
private Integer number;
}
BaseEvent<T>.java
package com.xiaokai.event;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* @description 基础事件
*/
@Data
public abstract class BaseEvent<T> {
public abstract EventMessage<T> buildEventMessage(T data);
public abstract String topic();
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class EventMessage<T> {
private String id;
private Date timestamp;
private T data;
}
}
CommonMessage.java
package com.xiaokai.event;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* Author:yang
* Date:2024-10-06 16:08
* Description:普通消息
*/
@Service
public class CommonMessage extends BaseEvent<CommonMessage.SendUserMessage> {
@Value("${rocketmq.topic.common}")
private String topic;
// 构建消息
@Override
public EventMessage<SendUserMessage> buildEventMessage(SendUserMessage data) {
return EventMessage.<SendUserMessage>builder()
.data(data)
.timestamp(new Date())
.id(RandomStringUtils.randomNumeric(10))
.build();
}
@Override
public String topic() {
return topic;
}
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SendUserMessage{
private int id;
private String name;
private String password;
}
}
EventPublisher.java
package com.xiaokai.event;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Author:yang
* Date:2024-10-06 15:58
* Description:事件发布
*/
@Service
@Slf4j
public class EventPublisher {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void publish(String topic, BaseEvent.EventMessage<?> message) {
try {
// 转换消息
String payload = JSON.toJSONString(message);
// 发送消息
rocketMQTemplate.convertAndSend(topic, payload);
log.info("publish event success, topic:{}, message:{}", topic, payload);
}catch (Exception e){
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
throw e;
}
}
public void publish(String topic, String message) {
try {
// 发送消息
rocketMQTemplate.convertAndSend(topic, message);
log.info("publish event success, topic:{}, message:{}", topic, message);
}catch (Exception e){
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
throw e;
}
}
}
UserService.java
package com.xiaokai.service;
import com.xiaokai.entity.UserEntity;
import com.xiaokai.event.BaseEvent;
import com.xiaokai.event.CommonMessage;
import com.xiaokai.event.EventPublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.stereotype.Service;
/**
* Author:yang
* Date:2024-10-06 16:20
*/
@Service
@Slf4j
public class UserService {
@Autowired
private CommonMessage commonMessage;
@Autowired
private EventPublisher eventPublisher;
public void sendMsg(UserEntity user){
log.info("构建普通发送消息");
CommonMessage.SendUserMessage message = CommonMessage.SendUserMessage.builder()
.id(user.getId())
.name(user.getName())
.password(user.getPassword()).build();
BaseEvent.EventMessage<CommonMessage.SendUserMessage> sendUserMessageEventMessage = commonMessage.buildEventMessage(message);
eventPublisher.publish(commonMessage.topic(),sendUserMessageEventMessage);
log.info("发送普通消息完成");
}
}
4.3 普通消息
添加监听器
UserCommonConsumer.java
package com.xiaokai.listen;
import apache.rocketmq.v2.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* Author:yang
* Date:2024-10-06 16:31
* Description:普通消息消费
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.common}", consumerGroup = "${rocketmq.consumer.group}")
public class UserCommonConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("Received message: {}", s);
}
}
@RocketMQMessageListener不能放在方法上。
测试
package com.xiaokai;
import com.xiaokai.config.LogMQConfig;
import com.xiaokai.entity.UserEntity;
import com.xiaokai.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.PreDestroy;
/**
* Author:yang
* Date:2024-10-06 16:32
*/
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class RocketMQTest {
@Autowired
private UserService userService;
@Autowired
private LogMQConfig logMQConfig;
@Test
public void test_sendCommonMsg() {
logMQConfig.printLog();
UserEntity user = UserEntity.builder().id(100)
.name("xiaokai")
.password("111111")
.build();
userService.sendMsg(user);
}
}
-
启动RocketMQApplication.java(启动监听器)
-
启动测试方法
测试结果
// test
: RocketMQ NameServer: [110.41.55.242:9876]
: RocketMQ Producer Group: producer-group
: 构建普通发送消息
: publish event success, topic:common-message, message:{"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"8401537837","timestamp":1728295047107}
: 发送普通消息完成
// application
Received message: {"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"8401537837","timestamp":1728295047107}
4.4 过滤消息
过滤消息是在普通消息的topic后面跟上:tag
添加监听器:tag为xiaokai
package com.xiaokai.listen;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* Author:yang
* Date:2024-10-06 16:31
* Description:过滤消息消费
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.filter}", consumerGroup = "${rocketmq.consumer.group}",
selectorExpression = "xiaokai")
public class UserFilterConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("Received filter message: {}", s);
}
}
在事件发布中添加方法
/**
* 发布事件
*
* @param topic 主题
* @param message 消息
* @param tag 标签
*/
public void publish(String topic, BaseEvent.EventMessage<?> message, String tag) {
try {
// 转换消息
String payload = JSON.toJSONString(message);
// 发送过滤消息--->在主题后面加上":",然后加上标签:destination = topic:tag
rocketMQTemplate.convertAndSend(topic + ":" + tag, payload);
log.info("publish event success, topic:{}, message:{}", topic, payload);
} catch (Exception e) {
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
throw e;
}
}
服务中添加方法
public void sendFilterMsg(UserEntity user,String tag){
log.info("构建过滤发送消息");
FilterMessage.SendUserMessage message = FilterMessage.SendUserMessage.builder()
.id(user.getId())
.name(user.getName())
.password(user.getPassword()).build();
BaseEvent.EventMessage<FilterMessage.SendUserMessage> sendUserMessageEventMessage = filterMessage.buildEventMessage(message);
eventPublisher.publish(filterMessage.topic(),sendUserMessageEventMessage,tag);
log.info("发送过滤消息完成");
}
启动application:启动了两个监听器
running container: DefaultRocketMQListenerContainer{consumerGroup='consumer-group', namespace='', nameServer='110.41.55.242:9876', topic='common-message', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*',
Register the listener to container, listenerBeanName:userCommonConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
running container: DefaultRocketMQListenerContainer{consumerGroup='consumer-group', namespace='', nameServer='110.41.55.242:9876', topic='filter-message', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='xiao
Register the listener to container, listenerBeanName:userFilterConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2
测试
/**
* 发送带tag消息
*/
@Test
public void test_sendFilterMsg() {
logMQConfig.printLog();
UserEntity user = UserEntity.builder().id(100)
.name("xiaokai")
.password("111111")
.build();
userService.sendFilterMsg(user,"xiaokai");
}
: 构建过滤发送消息
: publish event success, topic:filter-message, message:{"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"4163167680","timestamp":1728303830159}
: 发送过滤消息完成
2024-10-07 20:24:03.610 INFO 43732 --- [onsumer-group_1] com.xiaokai.listen.UserFilterConsumer : Received filter message: {"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"4163167680","timestamp":1728303830159}
4.5 延迟消息
Apache RocketMQ 一共支持18种级别的延迟投递,具体如下:
延迟级别 | 延迟时间 | 延迟级别 | 延迟时间 |
1 | 1秒 | 10 | 6分钟 |
2 | 5秒 | 11 | 7分钟 |
3 | 10 秒 | 12 | 8分钟 |
4 | 30 秒 | 13 | 9分钟 |
5 | 1分钟 | 14 | 10分钟 |
6 | 2分钟 | 15 | 20分钟 |
7 | 3分钟 | 16 | 30分钟 |
8 | 4分钟 | 17 | 1小时 |
9 | 5分钟 | 18 | 2小时 |
添加延迟方法
/**
* 发布事件
*
* @param topic 主题
* @param message 消息
* @param level delay级别
*/
public void publishDelay(String topic, BaseEvent.EventMessage<?> message, Integer level) {
try {
// 转换消息
String payload = JSON.toJSONString(message);
Message<String> msg = MessageBuilder.withPayload(payload)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, level)
.build();
// 设置延时级别
rocketMQTemplate.syncSend(topic, msg, 3000, level);
log.info("publish event success, topic:{}, message:{}", topic, payload);
} catch (Exception e) {
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
throw e;
}
}
4.6 批量消息
添加发送消息方法
/**
* 发布事件
* @param topic 主题
* @param message 消息集合
*/
public void publishBatch(String topic, List<BaseEvent.EventMessage<?>> message) {
try {
// 消息集合
List<Message<String>> messages = new ArrayList<>();
// 转换消息
for (BaseEvent.EventMessage<?> eventMessage : message) {
String payload = JSON.toJSONString(eventMessage);
Message<String> msg = MessageBuilder.withPayload(payload)
.build();
messages.add(msg);
}
rocketMQTemplate.syncSend(topic, messages);
log.info("publish event success, topic:{}, message:{}", topic, message);
} catch (Exception e) {
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
throw e;
}
}
添加服务方法
public void sendBatchMsg(UserEntity user){
log.info("构建批量发送消息");
List<BaseEvent.EventMessage<?>> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CommonMessage.SendUserMessage message = CommonMessage.SendUserMessage.builder()
.id(user.getId())
.name(user.getName())
.password(user.getPassword()).build();
BaseEvent.EventMessage<CommonMessage.SendUserMessage> sendUserMessageEventMessage = commonMessage.buildEventMessage(message);
messages.add(sendUserMessageEventMessage);
}
eventPublisher.publishBatch(commonMessage.topic(),messages);
log.info("发送批量消息完成");
}
测试:往common-message的主题中发送批量消息
/**
* 发送批量消息
*/
@Test
public void test_sendBatchMsg() {
logMQConfig.printLog();
UserEntity user = UserEntity.builder().id(100)
.name("xiaokai")
.password("111111")
.build();
userService.sendBatchMsg(user);
}
测试结果:批量发送,单独消费
4.7 顺序消息
发送消息方法
/**
* 发布顺序事件
*
* @param topic 主题
* @param message 消息
*/
public void publishOrder(String topic, BaseEvent.EventMessage<?> message , String orderId) {
try {
// 转换消息
String payload = JSON.toJSONString(message);
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
/**
* 选择队列
* @param list 默认的消息队列列表
* @param message 传输的消息
* @param o 传输消息时额外的参数 - send(topic, message, o)
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
// 全部的消息发送到第一个队列
return list.get(0);
}
});
// orderId 相同的消息会被顺序消费
// orderId在选择器中是 o
// payload在选择器中是 message.getPayload()
rocketMQTemplate.syncSendOrderly(topic, payload, orderId);
log.info("publish event success, topic:{}, message:{}", topic, message);
} catch (Exception e) {
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
throw e;
}
}
接口中list为队列,message为消息,o 为传入的对象,返回消息发送到哪个队列。上面的例子中以orderId为分片标准,所有队列的余数,即把orderId相同的消息发送到同一个队列。
4.8 事务消息
事务性消息的发送分为两个阶段,首先会有一个半消息被投递,即一条消息成功发送到MQ服务器,但是服务器没有收到Producer对该消息的第二次确认,此时该消息会被标记为“暂时无法投递”状态。
消息发送成功后会执行本地事务,并根据本地事务的结果向Broker传递半消息状态(提交或者回滚)。
如果由于网络闪退、Producer重启等原因导致某条事务消息的二次确认丢失,Broker 会发现这条长时间处于“半消息”状态的消息,并主动向 Producer 检查该消息的事务状态(Commit 或 Rollback)。因此,如果本地事务执行成功,下游就会收到该消息,否则不会收到。这样最终保证了上下游数据的一致性。
事务消息的详细执行流程如下图所示:
交易消息发送流程
-
生产者将半条消息发送给
RocketMQ Broker
。 -
消息持久化成功后
RocketMQ Broker
,返回Ack给Producer,确认消息发送成功,并且是一条半消息。 -
生产者开始执行本地事务。
-
Producer根据本地事务的结果,向服务端提交第二次确认(Commit或者Rollback),服务端收到确认后,处理如下逻辑。
-
如果第二次确认结果为Commit:服务端将该半消息标记为可交付,并交付给消费者。
-
如果第二次确认结果为Rollback,则服务端将回滚该事务,不会再向Consumer投递该半条消息。
-
-
在网络断线或者Producer重启等特殊情况下,如果Server没有收到Producer的第二次确认结果,或者Server收到的第二次确认结果为Unknown,则Server会在固定的时间之后向某个Producer发起回滚消息。
demo:
实现事务监听器 TransactionListener
package com.xiaokai.listen;
import com.xiaokai.transactional.TransactionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Author:yang
* Date:2024-10-09 17:33
* Description:事务监听器 - 用于实现事务消息
*/
@Slf4j
@Component
public class TransactionListenerImpl implements TransactionListener {
@Autowired
private TransactionService transactionService;
/**
* 作用 - 用于执行本地事务,返回事务状态,如果返回提交状态,则消息会被提交,如果返回回滚状态,则消息会被丢弃。
* 执行时间 - 在发送半消息之后调用
* @param message 这个参数代表了需要进行事务状态检查的消息对象。它包含了消息的主题(topic)、消息体(body)、消息标签(tags)、属性(properties)等信息。
* 这个对象是之前在调用sendMessageInTransaction方法时创建并发送的半消息。
* @param o 这是一个用户自定义的对象,它在调用sendMessageInTransaction发送半消息时传递给TransactionListener的。
* 可以利用这个对象来传递任何需要在执行本地事务检查时使用的上下文信息或状态数据。
* @return 事务状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 调用事务服务,执行本地事务
boolean success = transactionService.localTransaction(message);
if (success) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
/**
* 作用 - 定时(默认5s,可以设置)检查事务状态,如果返回提交状态,则消息会被提交,如果返回回滚状态,则消息会被丢弃。
* 执行时间 - 在检查半消息状态之前调用
* @param messageExt 消息
* @return 事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 调用事务服务,检查本地事务状态
boolean success = transactionService.checkLocalTransaction(messageExt);
if (success) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
触发时机:
-
executeLocalTransaction(Message msg, Object arg)
的触发时机:-
当你调用
sendMessageInTransaction
方法发送事务性消息时,RocketMQ客户端库首先会发送一个半消息到消息服务器。 -
然后,客户端库会调用
executeLocalTransaction
方法,传入消息msg
和之前通过sendMessageInTransaction
发送时附加的对象arg
,以执行本地事务。
-
-
checkLocalTransaction(Message msg, Object arg)
的触发时机:-
这个方法会在半消息的回查周期内被调用。消息服务器会定时向消息生产者查询半消息对应的本地事务执行情况。
-
客户端库会调用
checkLocalTransaction
方法,传入消息msg
和之前附加的对象arg
,以检查本地事务是否成功执行。
-
参数含义:
-
Message msg:
-
这个参数代表了需要进行事务状态检查的消息对象。它包含了消息的主题(topic)、消息体(body)、消息标签(tags)、属性(properties)等信息。这个对象是之前在调用
sendMessageInTransaction
方法时创建并发送的半消息。
-
-
Object arg:
-
这是一个用户自定义的对象,它在调用
sendMessageInTransaction
发送半消息时传递给TransactionListener
的。你可以利用这个对象来传递任何需要在执行本地事务检查时使用的上下文信息或状态数据。
-
参数的使用场景:
-
Message msg:
-
你可以使用
msg
对象来获取与消息相关的信息,例如,可以通过msg.getMsgId()
获取消息的唯一标识符,或者使用msg.getProperty
方法获取消息的自定义属性,这些信息可能对检查本地事务状态有帮助。
-
-
Object arg:
-
在发送半消息之前,你可能会执行一些本地事务逻辑,比如数据库操作。在这些操作中,可能需要一些额外的上下文信息来完成事务的提交或回滚。这些信息可以通过
arg
参数在发送半消息时传递给executeLocalTransaction
方法,然后executeLocalTransaction
方法可以再将其传递给checkLocalTransaction
方法。
-
本地事务方法
package com.xiaokai.transactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;
/**
* Author:yang
* Date:2024-10-09 18:30
*/
@Slf4j
@Service
public class TransactionService {
/**
* 本地事务执行
* @param msg
* @return
*/
public boolean localTransaction(Message msg) {
// 1. 执行1方法
// 2. 执行2方法
// 3. 本地事务提交
return true;
}
/**
* 事务回查
* @param messageExt
* @return
*/
public boolean checkLocalTransaction(MessageExt messageExt) {
// 用消息中的部分信息,入库查询事务执行结果 - 使用orderId查询订单表是否有这个这个记录
// 1. 查询事务执行结果
// 2. 根据查询结果决定是否提交事务
return true;
}
}
监听器配置
package com.xiaokai.config;
import com.xiaokai.listen.TransactionListenerImpl;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* Author:yang
* Date:2024-10-09 18:54
* Description:发送事务消息,需要配置监听器
*/
@Component
public class MQTemplateConfig {
@Autowired
private TransactionListenerImpl transactionListener;
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Bean
public TransactionMQProducer transactionMQProducer() {
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(transactionListener);
// 其他配置,如setNamesrvAddr等
producer.setNamesrvAddr(nameServer);
producer.setProducerGroup(producerGroup);
return producer;
}
@Bean
public RocketMQTemplate rocketTransactionMQTemplate(TransactionMQProducer transactionMQProducer) {
RocketMQTemplate template = new RocketMQTemplate();
template.setProducer(transactionMQProducer);
// 其他配置
return template;
}
}
发送事务消息
/**
* 发布事务事件
*
* @param topic 主题
* @param message 消息
*/
public void publishTransaction(String topic, BaseEvent.EventMessage<?> message , String orderId) {
try {
// 转换消息
String payload = JSON.toJSONString(message);
Message<String> msg = MessageBuilder.withPayload(payload)
.build();
rocketMQTemplate.sendMessageInTransaction(topic, msg , orderId);
log.info("publish event success, topic:{}, message:{}", topic, message);
} catch (Exception e) {
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
throw e;
}
}
注:
1. 文章全篇使用了消息同步发送方式,建议采用消息异步发送方式(发送消息时,添加回调函数监听消息)。
/**
* 异步发布事件
* @param topic 主题
* @param message 消息
*/
public void publish(String topic, String message) {
try {
// 发送消息
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("publish event success, topic:{}, message:{}", topic, message);
}
@Override
public void onException(Throwable throwable) {
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, throwable.getMessage());
}
});
} catch (Exception e) {
log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
throw e;
}
}
2. 刚开始启动出错误,优先检查rmqbroker容器是不是处于运行状态。