在 协作式 Saga 模式(Choreographed Saga)中,不同服务之间通过事件进行通信,而没有中央协调者。每个服务都知道自己需要执行哪些操作,并在执行完后发布事件来通知其他服务进行下一步操作。当某个服务执行失败时,其他服务会根据业务需求执行补偿操作,确保最终一致性。
在协作式 Saga 模式中,服务之间是通过事件驱动进行交互的,消息队列(如 RabbitMQ 或 Kafka)用于服务之间传递事件,确保跨服务事务的执行。
协作式 Saga 模式的核心概念
- 事件驱动:每个服务根据事件来启动自己的事务,并在事务成功或失败后发布事件通知其他服务。
- 无中央协调者:不同服务之间相互了解自己的职责,并且是独立工作的。
- 补偿机制:如果某个服务失败,它会发布补偿事件,其他服务根据这些补偿事件执行回滚或补偿操作。
协作式 Saga 模式实现步骤
1. 系统设计
假设有两个服务:
- Order Service:负责创建订单。
- Inventory Service:负责扣减库存。
这两个服务之间将通过事件进行通信,确保订单的创建和库存的扣减是可靠的。
2. 技术栈
- Spring Boot:用于开发微服务。
- RabbitMQ 或 Kafka:用于事件传递。
- Spring AMQP 或 Spring Kafka:用于集成消息队列。
- Spring Data JPA:用于数据库操作。
3. 工作流程
Order Service 接收到订单请求时,创建订单并发布 OrderCreatedEvent,通知其他服务进行下一步操作。
Inventory Service 听到 OrderCreatedEvent 后,执行扣减库存操作。如果成功,它发布 InventoryReservedEvent,通知订单服务库存已预留;如果失败,则发布 InventoryRollbackEvent,通知订单服务进行补偿操作。
补偿服务 听到 InventoryRollbackEvent 后,执行回滚操作,将订单状态设置为“取消”。
4. Spring Boot 示例实现
4.1 Order Service
Order Service 在收到订单创建请求时,首先会创建订单并发布一个事件 OrderCreatedEvent,通知其他服务(如库存服务)进行库存扣减操作。
// OrderService.java
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderRepository orderRepository;
// 创建订单
@Transactional
public void createOrder(Order order) {
// Step 1: 创建订单
orderRepository.save(order);
// Step 2: 发布事件,通知库存服务进行扣减库存
OrderCreatedEvent orderCreatedEvent = new OrderCreatedEvent(order.getId(), order.getItemId(), order.getQuantity());
rabbitTemplate.convertAndSend("orderExchange", "order.created", orderCreatedEvent);
}
}
4.2 Inventory Service
Inventory Service 监听 OrderCreatedEvent 事件,执行库存扣减操作。如果库存扣减成功,发布 InventoryReservedEvent,否则发布 InventoryRollbackEvent 进行补偿。
// InventoryService.java
@Service
public class InventoryService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private InventoryRepository inventoryRepository;
// 监听创建订单事件,执行库存扣减
@RabbitListener(queues = "order.created.queue")
public void handleOrderCreated(OrderCreatedEvent event) {
try {
// Step 1: 扣减库存
boolean success = decreaseInventory(event.getItemId(), event.getQuantity());
if (!success) {
throw new Exception("Insufficient inventory");
}
// Step 2: 发布库存预留成功事件
InventoryReservedEvent reservedEvent = new InventoryReservedEvent(event.getOrderId(), event.getItemId(), event.getQuantity());
rabbitTemplate.convertAndSend("inventoryExchange", "inventory.reserved", reservedEvent);
} catch (Exception e) {
// Step 3: 发布库存扣减失败,回滚事件
InventoryRollbackEvent rollbackEvent = new InventoryRollbackEvent(event.getOrderId(), event.getItemId(), event.getQuantity());
rabbitTemplate.convertAndSend("inventoryExchange", "inventory.rollback", rollbackEvent);
}
}
// 扣减库存
private boolean decreaseInventory(Long itemId, int quantity) {
Inventory inventory = inventoryRepository.findByItemId(itemId);
if (inventory.getStock() < quantity) {
return false; // 库存不足
}
inventory.setStock(inventory.getStock() - quantity);
inventoryRepository.save(inventory);
return true; // 库存扣减成功
}
}
4.3 补偿操作
如果某个服务执行失败,应该发布补偿事件。InventoryService 如果在扣减库存时失败,将发布 InventoryRollbackEvent 来通知其他服务进行回滚操作。
// CompensationService.java
@Service
public class CompensationService {
@Autowired
private OrderRepository orderRepository;
@RabbitListener(queues = "inventory.rollback.queue")
public void handleInventoryRollback(InventoryRollbackEvent event) {
// Step 1: 回滚订单
Order order = orderRepository.findById(event.getOrderId()).orElseThrow(() -> new RuntimeException("Order not found"));
order.setStatus("Cancelled");
orderRepository.save(order);
}
}
4.4 消息队列配置
配置 RabbitMQ(或 Kafka)交换机和队列:
@Configuration
public class RabbitMQConfig {
// 创建交换机
@Bean
public TopicExchange orderExchange() {
return new TopicExchange("orderExchange");
}
@Bean
public TopicExchange inventoryExchange() {
return new TopicExchange("inventoryExchange");
}
// 创建队列
@Bean
public Queue orderCreatedQueue() {
return new Queue("order.created.queue");
}
@Bean
public Queue inventoryReservedQueue() {
return new Queue("inventory.reserved.queue");
}
@Bean
public Queue inventoryRollbackQueue() {
return new Queue("inventory.rollback.queue");
}
// 创建绑定
@Bean
public Binding orderCreatedBinding() {
return BindingBuilder.bind(orderCreatedQueue()).to(orderExchange()).with("order.created");
}
@Bean
public Binding inventoryReservedBinding() {
return BindingBuilder.bind(inventoryReservedQueue()).to(inventoryExchange()).with("inventory.reserved");
}
@Bean
public Binding inventoryRollbackBinding() {
return BindingBuilder.bind(inventoryRollbackQueue()).to(inventoryExchange()).with("inventory.rollback");
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
return rabbitTemplate;
}
}
4.5 事件定义
定义事件类,表示不同的操作。
// OrderCreatedEvent.java
public class OrderCreatedEvent {
private Long orderId;
private Long itemId;
private int quantity;
// 构造函数,getter 和 setter
}
// InventoryReservedEvent.java
public class InventoryReservedEvent {
private Long orderId;
private Long itemId;
private int quantity;
// 构造函数,getter 和 setter
}
// InventoryRollbackEvent.java
public class InventoryRollbackEvent {
private Long orderId;
private Long itemId;
private int quantity;
// 构造函数,getter 和 setter
}
5.最终一致性
- 补偿机制:如果某个服务执行失败,会通过发布补偿事件来保证系统的最终一致性。
- 幂等性:为了保证补偿事件的幂等性,服务需要确保同一个补偿事件被多次处理时不会产生不一致的状态。例如,库存回滚操作应该是幂等的。
- 事件的可靠传递:使用可靠的消息队列(如 RabbitMQ 或 Kafka),确保消息不丢失且能够正确投递。
6. 总结
协作式 Saga 模式的核心在于事件驱动和服务间的去中心化协调。每个微服务在收到事件后,执行自己的事务操作,并根据执行结果发布事件或补偿事件,最终通过一系列事件的流转来确保系统的一致性。与编排式 Saga 模式不同,协作式 Saga 模式更加灵活和解耦,但也需要保证事件的可靠性、幂等性以及服务间的正确协调。