协作式 Saga 模式

news2025/1/12 6:32:13

在 协作式 Saga 模式(Choreographed Saga)中,不同服务之间通过事件进行通信,而没有中央协调者。每个服务都知道自己需要执行哪些操作,并在执行完后发布事件来通知其他服务进行下一步操作。当某个服务执行失败时,其他服务会根据业务需求执行补偿操作,确保最终一致性。

在协作式 Saga 模式中,服务之间是通过事件驱动进行交互的,消息队列(如 RabbitMQ 或 Kafka)用于服务之间传递事件,确保跨服务事务的执行。
协作式 Saga 模式的核心概念

  1. 事件驱动:每个服务根据事件来启动自己的事务,并在事务成功或失败后发布事件通知其他服务。
  2. 无中央协调者:不同服务之间相互了解自己的职责,并且是独立工作的。
  3. 补偿机制:如果某个服务失败,它会发布补偿事件,其他服务根据这些补偿事件执行回滚或补偿操作。

协作式 Saga 模式实现步骤

1. 系统设计

假设有两个服务:

  • Order Service:负责创建订单。
  • Inventory Service:负责扣减库存。

这两个服务之间将通过事件进行通信,确保订单的创建和库存的扣减是可靠的。

2. 技术栈

  • Spring Boot:用于开发微服务。
  • RabbitMQ 或 Kafka:用于事件传递。
  • Spring AMQP 或 Spring Kafka:用于集成消息队列。
  • Spring Data JPA:用于数据库操作。

3. 工作流程

Order Service 接收到订单请求时,创建订单并发布 OrderCreatedEvent,通知其他服务进行下一步操作。
Inventory Service 听到 OrderCreatedEvent 后,执行扣减库存操作。如果成功,它发布 InventoryReservedEvent,通知订单服务库存已预留;如果失败,则发布 InventoryRollbackEvent,通知订单服务进行补偿操作。
补偿服务 听到 InventoryRollbackEvent 后,执行回滚操作,将订单状态设置为“取消”。

4. Spring Boot 示例实现

4.1 Order Service

Order Service 在收到订单创建请求时,首先会创建订单并发布一个事件 OrderCreatedEvent,通知其他服务(如库存服务)进行库存扣减操作。

// OrderService.java
@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private OrderRepository orderRepository;

    // 创建订单
    @Transactional
    public void createOrder(Order order) {
        // Step 1: 创建订单
        orderRepository.save(order);

        // Step 2: 发布事件,通知库存服务进行扣减库存
        OrderCreatedEvent orderCreatedEvent = new OrderCreatedEvent(order.getId(), order.getItemId(), order.getQuantity());
        rabbitTemplate.convertAndSend("orderExchange", "order.created", orderCreatedEvent);
    }
}

4.2 Inventory Service

Inventory Service 监听 OrderCreatedEvent 事件,执行库存扣减操作。如果库存扣减成功,发布 InventoryReservedEvent,否则发布 InventoryRollbackEvent 进行补偿。

// InventoryService.java
@Service
public class InventoryService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private InventoryRepository inventoryRepository;

    // 监听创建订单事件,执行库存扣减
    @RabbitListener(queues = "order.created.queue")
    public void handleOrderCreated(OrderCreatedEvent event) {
        try {
            // Step 1: 扣减库存
            boolean success = decreaseInventory(event.getItemId(), event.getQuantity());
            if (!success) {
                throw new Exception("Insufficient inventory");
            }

            // Step 2: 发布库存预留成功事件
            InventoryReservedEvent reservedEvent = new InventoryReservedEvent(event.getOrderId(), event.getItemId(), event.getQuantity());
            rabbitTemplate.convertAndSend("inventoryExchange", "inventory.reserved", reservedEvent);
        } catch (Exception e) {
            // Step 3: 发布库存扣减失败,回滚事件
            InventoryRollbackEvent rollbackEvent = new InventoryRollbackEvent(event.getOrderId(), event.getItemId(), event.getQuantity());
            rabbitTemplate.convertAndSend("inventoryExchange", "inventory.rollback", rollbackEvent);
        }
    }

    // 扣减库存
    private boolean decreaseInventory(Long itemId, int quantity) {
        Inventory inventory = inventoryRepository.findByItemId(itemId);
        if (inventory.getStock() < quantity) {
            return false; // 库存不足
        }
        inventory.setStock(inventory.getStock() - quantity);
        inventoryRepository.save(inventory);
        return true; // 库存扣减成功
    }
}

4.3 补偿操作

如果某个服务执行失败,应该发布补偿事件。InventoryService 如果在扣减库存时失败,将发布 InventoryRollbackEvent 来通知其他服务进行回滚操作。

// CompensationService.java
@Service
public class CompensationService {

    @Autowired
    private OrderRepository orderRepository;

    @RabbitListener(queues = "inventory.rollback.queue")
    public void handleInventoryRollback(InventoryRollbackEvent event) {
        // Step 1: 回滚订单
        Order order = orderRepository.findById(event.getOrderId()).orElseThrow(() -> new RuntimeException("Order not found"));
        order.setStatus("Cancelled");
        orderRepository.save(order);
    }
}

4.4 消息队列配置

配置 RabbitMQ(或 Kafka)交换机和队列:

@Configuration
public class RabbitMQConfig {

    // 创建交换机
    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("orderExchange");
    }

    @Bean
    public TopicExchange inventoryExchange() {
        return new TopicExchange("inventoryExchange");
    }

    // 创建队列
    @Bean
    public Queue orderCreatedQueue() {
        return new Queue("order.created.queue");
    }

    @Bean
    public Queue inventoryReservedQueue() {
        return new Queue("inventory.reserved.queue");
    }

    @Bean
    public Queue inventoryRollbackQueue() {
        return new Queue("inventory.rollback.queue");
    }

    // 创建绑定
    @Bean
    public Binding orderCreatedBinding() {
        return BindingBuilder.bind(orderCreatedQueue()).to(orderExchange()).with("order.created");
    }

    @Bean
    public Binding inventoryReservedBinding() {
        return BindingBuilder.bind(inventoryReservedQueue()).to(inventoryExchange()).with("inventory.reserved");
    }

    @Bean
    public Binding inventoryRollbackBinding() {
        return BindingBuilder.bind(inventoryRollbackQueue()).to(inventoryExchange()).with("inventory.rollback");
    }
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
        return rabbitTemplate;
    }
}

4.5 事件定义

定义事件类,表示不同的操作。

// OrderCreatedEvent.java
public class OrderCreatedEvent {
    private Long orderId;
    private Long itemId;
    private int quantity;

    // 构造函数,getter 和 setter
}

// InventoryReservedEvent.java
public class InventoryReservedEvent {
    private Long orderId;
    private Long itemId;
    private int quantity;

    // 构造函数,getter 和 setter
}

// InventoryRollbackEvent.java
public class InventoryRollbackEvent {
    private Long orderId;
    private Long itemId;
    private int quantity;

    // 构造函数,getter 和 setter
}

5.最终一致性

  • 补偿机制:如果某个服务执行失败,会通过发布补偿事件来保证系统的最终一致性。
  • 幂等性:为了保证补偿事件的幂等性,服务需要确保同一个补偿事件被多次处理时不会产生不一致的状态。例如,库存回滚操作应该是幂等的。
  • 事件的可靠传递:使用可靠的消息队列(如 RabbitMQ 或 Kafka),确保消息不丢失且能够正确投递。

6. 总结

协作式 Saga 模式的核心在于事件驱动和服务间的去中心化协调。每个微服务在收到事件后,执行自己的事务操作,并根据执行结果发布事件或补偿事件,最终通过一系列事件的流转来确保系统的一致性。与编排式 Saga 模式不同,协作式 Saga 模式更加灵活和解耦,但也需要保证事件的可靠性、幂等性以及服务间的正确协调。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2275361.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【批量拆分PDF】批量按页码范围拆分PDF并按页码重命名:技术难题与总结

按照页码范围拆分PDF项目实战参考&#xff1a; 【批量个性化拆分PDF】批量拆分PDF只取PDF的首页&#xff0c;批量按照文件大小来拆分PDF&#xff0c;PDF按照目录页码范围批量计算拆分分割文件PDF个性化拆分&#xff08;单个拆分&#xff0c;取首页拆分&#xff0c;按页码计算拆…

MySQL表的增删改查(基础)-上篇

目录 CRUD 新增 查询 (1)全列查询 (2)指定列查询 (3)查询时指定表达式 (4)别名 (5)去重查询 (6)排序查询 (7)条件查询 (8)分页查询 CRUD 即增加(Create)、查询(Retrieve)、更新(Update)、删除(Delete)四个单词的首字母缩写 新增 也可插入中文字符串 查询 (1)全列查…

【论文速读】| 利用大语言模型在灰盒模糊测试中生成初始种子

基本信息 论文标题: Harnessing Large Language Models for Seed Generation in Greyb0x Fuzzing 作者: Wenxuan Shi, Yunhang Zhang, Xinyu Xing, Jun Xu 作者单位: Northwestern University, University of Utah 关键词: Greyb0x fuzzing, Large Language Models, Seed g…

Linux:操作系统简介

前言&#xff1a; 在本片文章&#xff0c;小编将带大家理解冯诺依曼体系以及简单理解操作喜欢&#xff0c;并且本篇文章将围绕什么以及为什么两个话题进行展开说明。 冯诺依曼体系&#xff1a; 是什么&#xff1a; 冯诺依曼体系&#xff08;Von Neumann architecture&#xff…

为什么选择平滑样条?

为什么选择平滑样条&#xff1f; 抗噪声能力&#xff1a; 平滑样条通过引入平滑参数 λ \lambda λ&#xff0c;允许你在以下两者之间找到平衡&#xff1a; 拟合误差&#xff08;与数据的偏离&#xff09;&#xff1a;希望曲线接近数据点。光滑性&#xff08;曲线的平滑程度&a…

边缘计算网关解决车间数据采集的关键问题

随着工业4.0和智能制造的快速发展&#xff0c;车间数据采集与分析已成为提升生产效率、保证产品质量、优化加工过程的关键环节。传统的数据采集方式&#xff0c;如中心化的数据处理模式&#xff0c;在面对海量数据、实时性要求高的工业场景时&#xff0c;往往显得力不从心。边缘…

C语言之assert断言

1.assert的使用形式 #include <assert.h>assert (表达式); (1)在c语言中&#xff0c;宏&#xff0c;是一种预处理指令。assert(表示式) 就是一个宏 (2)表达式必须是一个能计算出真或假的布尔条件&#xff0c;它通常意味着 该表达式是一个能够返回整数值的表达式&#…

【Linux】正则表达式

正则表达式是一种可供Linux工具过滤文本的自定义模板&#xff0c;Linux工具&#xff08;如sed、gawk&#xff09;会在读取数据时使用正则表达式对数据进行模式匹配。 正则表达式使用元字符来描述数据流中的一个或多个字符。它是由正则表达式引擎实现的。正则表达式引擎是一种底…

hutool糊涂工具通过注解设置excel宽度

import java.lang.annotation.*;Documented Retention(RetentionPolicy.RUNTIME) Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER}) public interface ExcelStyle {int width() default 0; }/*** 聊天记录*/ Data public class DialogContentInfo {/**…

全面教程:Nacos 2.4.2 启用鉴权与 MySQL 数据存储配置

全面教程&#xff1a;Nacos 2.4.2 启用鉴权与 MySQL 数据存储配置 1. 配置 Nacos 开启鉴权功能 1.1 修改 application.properties 配置文件 在 Nacos 2.4.2 中&#xff0c;开启鉴权功能需要修改 conf/application.properties 文件。按照以下方式配置&#xff1a; # 开启鉴权…

【学习】CMMM智能制造能力成熟度评估的重要性

CMMM认证通过对企业当前生产状态的全面评估&#xff0c;能够精准地确定其智能化生产的程度&#xff0c;并将企业的智能化生产水平划分为五个等级&#xff0c;包括初始级、已定义级、以管理级、卓越级和顶级。这种等级划分使得不同类型的企业能够根据自身实际情况&#xff0c;选…

特制一个自己的UI库,只用CSS、图标、emoji图 第二版

图&#xff1a; 代码&#xff1a; index.html <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>M…

Y3编辑器地图教程:ORPG教程、防守图教程

文章目录 Part1&#xff1a;ORPG教程一、章节人物选择1.1 Logo与界面动画1.2 章节选择与投票1.2.1 设计章节选择完毕后的操作1.2.2 玩家投票统计 1.3 多样化的人物选择系统1.3.1 异步模型显示1.3.2 双击和键盘选人1.3.3 UI选人 1.4 简易存档 二、对话与任务系统2.1对话UI与触发…

Ubuntu问题 -- 硬盘存储不够了, 如何挂载一个新的硬盘上去, 图文简单明了, 已操作成功

需求 我现在有一个ubuntu22.04操作系统的服务器, 但是当前硬盘不够用了, 我买了一个1T的SSD固态硬盘, 且已经安装在服务器上了, 我需要将这个硬盘挂载到当前ubuntu的某个目录上 开始 1. 确认新硬盘是否被系统识别 打开终端&#xff0c;输入以下命令查看系统识别到的硬盘&…

吴恩达 提示词工程 课程笔记

一、Introduction 二、Guidelines Principle1: 清晰&#xff08;不一定是简短的&#xff09;而具体的指令 Tactic1: 使用分隔符 Triple quotes: “”" Triple backticks: Triple dashes: — Angle brackets:< > XML tags: < tag></ tag> Tactic2:…

网络安全设备主要有什么

网络安全设备指的肯定是硬件设备了&#xff0c;国内卖安全硬件的没几家&#xff0c;天融信&#xff0c;启明星辰&#xff0c;绿盟&#xff0c;深信服&#xff0c;就这四家卖的比较齐全吧&#xff0c;上它们官网看一下&#xff0c;就知道市面上主要的网络安全设备有哪些了。分类…

【C++补充】第一弹---位图技术揭秘:内存优化与快速访问

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】 目录 1 位图 1.1 位图相关面试题 1.2 位图的设计及实现 1.3 C库中的位图 bitset 1.4 位图的模拟实现 1.5 位图的优缺点 1.6 位图相关考察题目 1 …

解决nginx多层代理后应用部署后访问发现css、js、图片等样式加载失败

一般是采用前后端分离部署方式&#xff0c;被上一层ng代理后&#xff0c;通过域名访问报错&#xff0c;例如&#xff1a;sqx.com.cn/应用代理路径。 修改nginx配置&#xff0c;配置前端页面的路径&#xff1a; location / {proxy_pass http://前端页面所在服务器的IP:PORT;pro…

第34天:安全开发-JavaEE应用反射机制攻击链类对象成员变量方法构造方法

时间轴&#xff1a; Java反射相关类图解&#xff1a; 反射&#xff1a; 1、什么是 Java 反射 参考&#xff1a; https://xz.aliyun.com/t/9117 Java 提供了一套反射 API &#xff0c;该 API 由 Class 类与 java.lang.reflect 类库组成。 该类库包含了 Field 、 Me…

Qt天气预报系统获取天气数据

Qt天气预报系统获取天气数据 1、获取天气数据1.1添加天气类头文件1.2定义今天和未来几天天气数据类1.3定义一个解析JSON数据的函数1.4在mainwindow中添加weatherData.h1.5创建今天天气数据和未来几天天气数据对象1.6添加parseJson定义1.7把解析JSON数据添加进去1.8添加错误1.9解…