以下是一个基于 Spring Boot + RocketMQ 的完整分布式事务实战 Demo,包含事务消息、本地事务、自动重试、死信队列(DLQ) 等核心机制。代码已充分注释,可直接运行。
一、项目结构
src/main/java
├── com.example.rocketmq
│ ├── controller
│ │ └── OrderController.java
│ ├── model
│ │ ├── Order.java
│ │ ├── OrderRequest.java
│ ├── repository
│ │ ├── OrderRepository.java
│ ├── service
│ │ ├── InventoryService.java
│ │ ├── OrderService.java
│ │ ├── PaymentService.java
│ ├── listener
│ │ └── OrderConsumer.java
│ └── RocketMQConfig.java
├── application.yml
└── pom.xml
二、依赖配置(pom.xml)
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ Spring Boot Starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.11.0</version>
</dependency>
<!-- MySQL Driver -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Spring Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
三、配置文件(application.yml)
# RocketMQ配置
rocketmq:
producer:
name-server: localhost:9876
default-topic: order_topic
consumer:
name-server: localhost:9876
default-topic: order_topic
consumer-group: order_consumer_group
acknowledge-mode: AUTO
max-reconsume-times: 5 # 最大重试次数
broker:
role: SYNC_MASTER # 同步复制模式
store-path-commit-log: /data/rocketmq/commitlog
store-path-consume-queue: /data/rocketmq/consumequeue
# 数据库配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/rocketmq_db?useSSL=false&serverTimezone=UTC
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
maximum-pool-size: 20
jpa:
hibernate:
ddl-auto: update
show-sql: true
四、核心代码实现
1. 实体类(Order.java)
@Entity
@Table(name = "orders")
@Data
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String userId;
private BigDecimal amount;
private String sku;
private Integer status; // 0-待支付,1-已支付,2-已发货
}
2. 生产者代码(OrderService.java)
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Transactional // 本地事务
public void createOrder(OrderRequest request) {
// 1. 扣减库存(本地事务)
inventoryService.deduct(request.getSku());
// 2. 发送事务消息(与本地事务绑定)
rocketMQTemplate.sendMessageInTransaction(
"order_topic",
request,
() -> { // 事务回滚回调
System.out.println("本地事务回滚,消息未发送!");
return null;
}
);
}
}
3. 消费者代码(OrderConsumer.java)
@Service
public class OrderConsumer {
@Autowired
private OrderRepository orderRepository;
@Autowired
private PaymentService paymentService;
@Autowired
private InventoryService inventoryService;
@RocketMQListener(
topics = "order_topic",
consumerGroup = "order_consumer_group",
acknowledge-mode = AcknowledgeMode.AUTO
)
public void listen(OrderRequest request) {
try {
// 1. 生成订单记录
Order order = new Order();
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setSku(request.getSku());
orderRepository.save(order);
// 2. 扣款(外部服务调用)
paymentService.charge(request.getUserId(), request.getAmount());
// 3. 发送物流通知(模拟成功)
System.out.println("物流已通知,订单号: " + order.getId());
} catch (Exception e) {
// 4. 异常处理:触发重试或补偿
System.out.println("处理失败,触发重试! 订单号: " + request.getOrderNo());
throw new RuntimeException("订单处理失败", e);
}
}
}
4. 支付服务(PaymentService.java)
@Service
public class PaymentService {
@Autowired
private PaymentRepository paymentRepository;
public void charge(String userId, BigDecimal amount) {
// 模拟支付失败(30%概率)
if (Math.random() < 0.3) {
throw new RuntimeException("支付失败,用户: " + userId);
}
Payment payment = new Payment();
payment.setUserId(userId);
payment.setAmount(amount);
payment.setStatus("SUCCESS");
paymentRepository.save(payment);
}
}
五、数据库表设计
1. 订单表(orders)
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(50) NOT NULL,
amount DECIMAL(10,2) NOT NULL,
sku VARCHAR(50) NOT NULL,
status TINYINT DEFAULT 0 COMMENT '0-待支付,1-已支付,2-已发货'
);
2. 支付表(payments)
CREATE TABLE payments (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(50) NOT NULL,
amount DECIMAL(10,2) NOT NULL,
status ENUM('SUCCESS', 'FAILED') DEFAULT 'SUCCESS'
);
六、测试与验证
1. 正常流程
• 步骤:
- 发送创建订单请求(扣减库存 + 发送事务消息)。
- 消费者处理消息(生成订单 + 扣款 + 物流通知)。
• 预期结果:
• 库存减少,订单和支付记录生成,物流通知成功。
2. 异常流程(支付失败)
• 步骤:
- 发送创建订单请求。
- 消费者处理时支付失败(抛出异常)。
- RocketMQ自动重试(默认3次)。
- 重试失败后消息转入DLQ。
• 预期结果:
• 库存已恢复(通过本地事务回滚)。
• 订单未生成,支付记录未插入。
3. DLQ处理
• 操作:手动消费DLQ中的消息,排查支付失败原因(如用户余额不足)。
• 代码示例:
@RocketMQListener(
topics = "order_topic_DLQ",
consumerGroup = "order_consumer_group_dlq"
)
public void listenDLQ(OrderRequest request) {
System.out.println("处理死信消息: " + request.getOrderNo());
// 人工干预逻辑(如短信通知用户)
}
七、关键机制说明
1. 事务消息与本地事务绑定
• 代码示例:sendMessageInTransaction
方法将消息发送与本地事务提交原子化。
• 流程:
• 本地事务成功 → RocketMQ持久化消息。
• 本地事务失败 → RocketMQ丢弃消息。
2. 自动重试与死信队列
• 配置:max-reconsume-times=5
表示最大重试5次。
• DLQ Topic:默认死信队列名称为 order_topic_Retry
,可通过 spring.rabbitmq.listener.defaultDLQ
配置。
3. ACK确认机制
• 自动ACK:消费者处理完消息后自动发送确认,RocketMQ删除消息。
• 手动ACK(可选):通过 AcknowledgeMode.MANUAL
控制。
八、生产环境优化建议
- 持久化配置:
• 确保storePathCommitLog
和storePathConsumeQueue
指向持久化磁盘路径。 - 多Broker集群:
• 部署多个Broker节点,配置brokerRole=SYNC_MASTER
实现高可用。 - 监控与报警:
• 监控ConsumerLag
和MessagesPending
指标,阈值报警。 - 日志记录:
• 启用RocketMQ日志(log4j2.xml
),记录消息生产、消费详情。
九、总结
通过本Demo,你已掌握以下核心技能:
- 事务消息:结合本地事务实现强一致性。
- 自动重试:处理临时性故障(如网络抖动)。
- 死信队列:隔离无法处理的异常消息。
- 监控与运维:通过指标和日志保障系统稳定性。
下一步行动:
• 将Demo部署到Docker容器,模拟高并发场景。
• 结合Seata框架实现更复杂的分布式事务(如订单-库存-支付三阶段)。