【分布式】如何使用RocketMQ实现下单-库存-支付这个场景的分布式事务问题

news2025/3/14 14:50:27

下单-库存-支付 场景中,通过消息队列实现最终一致性,需保证三个微服务的操作最终一致,且在支付失败或库存不足时触发回滚补偿。以下是具体实现方案:


1. 整体流程设计

正常流程(成功场景)
  1. 订单服务 创建订单(状态为待支付),发送 订单创建成功事件
  2. 库存服务 消费事件,扣减库存,发送 库存扣减成功事件
  3. 支付服务 消费事件,执行支付,发送 支付成功事件
  4. 订单服务 消费支付成功事件,更新订单状态为已完成
异常流程(失败场景)

库存不足:库存服务直接发送 库存不足事件,订单服务取消订单。
支付失败:支付服务发送 支付失败事件,触发库存回滚和订单取消。


2. 核心组件与消息设计

消息队列(以RocketMQ为例)
事件类型Topic生产者消费者说明
订单创建成功事件order_created订单服务库存服务触发库存扣减
库存扣减成功事件stock_reduced库存服务支付服务触发支付
支付成功事件payment_done支付服务订单服务完成订单
库存不足事件stock_failed库存服务订单服务取消订单
支付失败事件payment_failed支付服务订单/库存服务触发库存回滚和订单取消
数据表设计(关键字段)

订单表(Order)

CREATE TABLE orders (
  id VARCHAR(64) PRIMARY KEY,  -- 订单ID(全局唯一)
  user_id BIGINT,
  amount DECIMAL(10,2),
  status ENUM('pending', 'completed', 'canceled')  -- 订单状态
);

库存表(Inventory)

CREATE TABLE inventory (
  product_id BIGINT PRIMARY KEY,
  stock INT CHECK(stock >= 0)  -- 库存不可为负
);

3. 实现步骤

(1) 订单服务:创建订单并发送事件
// 订单服务(OrderService.java)
@Transactional
public void createOrder(Order order) {
    // 1. 检查参数合法性(如用户存在性)
    // 2. 插入订单记录(状态为pending)
    orderDao.insert(order);
    
    // 3. 发送订单创建成功事件(事务消息)
    Message msg = new Message("order_created", JSON.toJSONBytes(order));
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
        "order_group", 
        msg, 
        order.getId()  // 事务ID(用订单ID标识)
    );
    
    // 4. 若消息发送失败,抛出异常触发事务回滚
    if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
        throw new RuntimeException("订单创建事件发送失败");
    }
}
(2) 库存服务:扣减库存
// 库存服务(InventoryService.java)
@RocketMQMessageListener(topic = "order_created", consumerGroup = "inventory_group")
public class OrderCreatedListener implements RocketMQListener<MessageExt> {
    @Override
    @Transactional
    public void onMessage(MessageExt message) {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        
        // 1. 检查库存是否充足
        Inventory inventory = inventoryDao.selectByProductId(order.getProductId());
        if (inventory.getStock() < order.getQuantity()) {
            // 发送库存不足事件
            rocketMQTemplate.sendOneWay("stock_failed", JSON.toJSONBytes(order));
            return;
        }
        
        // 2. 扣减库存(乐观锁防止超卖)
        int updated = inventoryDao.reduceStock(
            order.getProductId(), 
            order.getQuantity(), 
            inventory.getVersion()
        );
        if (updated == 0) {
            throw new RetryException("库存扣减冲突,稍后重试"); // 触发消息重试
        }
        
        // 3. 发送库存扣减成功事件
        rocketMQTemplate.sendOneWay("stock_reduced", JSON.toJSONBytes(order));
    }
}
(3) 支付服务:执行支付
// 支付服务(PaymentService.java)
@RocketMQMessageListener(topic = "stock_reduced", consumerGroup = "payment_group")
public class StockReducedListener implements RocketMQListener<MessageExt> {
    @Override
    @Transactional
    public void onMessage(MessageExt message) {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        
        // 1. 调用第三方支付接口
        boolean success = paymentClient.pay(order.getUserId(), order.getAmount());
        if (!success) {
            // 发送支付失败事件
            rocketMQTemplate.sendOneWay("payment_failed", JSON.toJSONBytes(order));
            return;
        }
        
        // 2. 发送支付成功事件
        rocketMQTemplate.sendOneWay("payment_done", JSON.toJSONBytes(order));
    }
}
(4) 订单服务:处理成功/失败事件
// 订单服务(OrderService.java)
@RocketMQMessageListener(topic = "payment_done", consumerGroup = "order_complete_group")
public class PaymentDoneListener implements RocketMQListener<MessageExt> {
    @Override
    @Transactional
    public void onMessage(MessageExt message) {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        orderDao.updateStatus(order.getId(), "completed");
    }
}

@RocketMQMessageListener(topic = {"stock_failed", "payment_failed"}, consumerGroup = "order_cancel_group")
public class OrderCancelListener implements RocketMQListener<MessageExt> {
    @Override
    @Transactional
    public void onMessage(MessageExt message) {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        
        // 1. 取消订单(状态置为canceled)
        orderDao.updateStatus(order.getId(), "canceled");
        
        // 2. 若事件来自支付失败,需触发库存回滚(发送库存回滚事件)
        if (message.getTopic().equals("payment_failed")) {
            rocketMQTemplate.sendOneWay("stock_rollback", JSON.toJSONBytes(order));
        }
    }
}
(5) 库存服务:处理回滚事件
// 库存服务(InventoryService.java)
@RocketMQMessageListener(topic = "stock_rollback", consumerGroup = "inventory_rollback_group")
public class StockRollbackListener implements RocketMQListener<MessageExt> {
    @Override
    @Transactional
    public void onMessage(MessageExt message) {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        inventoryDao.addStock(order.getProductId(), order.getQuantity());
    }
}

4. 容错与补偿机制

(1) 消息可靠性

生产者端
使用 RocketMQ 事务消息,确保本地事务与消息发送的原子性。若消息发送失败,订单创建事务回滚。
消费者端
开启消费者重试(默认重试16次),若最终消费失败,消息进入死信队列(需人工干预)。

(2) 幂等性设计

订单服务
通过订单ID唯一标识,updateStatus 操作天然幂等。
库存服务
使用乐观锁(version字段)避免重复扣减。

(3) 最终一致性保障

支付失败/库存不足
通过监听失败事件触发补偿动作(订单取消 + 库存回滚)。
消息顺序性
同一订单的消息发送到同一队列(MessageQueue),保证顺序消费。


5. 方案优缺点

优点

低侵入性:业务代码仅需处理消息发送/监听,无分布式事务框架依赖。
高可用性:依赖消息队列的可靠性和重试机制,天然支持服务宕机容错。
性能友好:异步解耦,避免同步阻塞。

缺点

最终一致性延迟:依赖消息消费速度,不适用于实时性要求高的场景。
补偿逻辑需完备:需覆盖所有异常分支(如消息丢失、服务宕机)。


总结

通过消息队列实现下单-库存-支付的最终一致性,核心在于:

  1. 事件驱动:每个服务通过发布/订阅事件推进流程。
  2. 可靠消息:使用事务消息保证本地操作与消息发送的原子性。
  3. 补偿机制:监听失败事件,触发反向操作回滚状态。

此方案适合容忍短暂不一致性的场景(如电商交易),若需强一致性,可结合 Seata AT模式TCC模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2314928.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用DeepSeek完成一个简单嵌入式开发

开启DeepSeek对话 请帮我使用Altium Designer设计原理图、PCB&#xff0c;使用keil完成代码编写&#xff1b;要求&#xff1a;使用stm32F103RCT6为主控芯片&#xff0c;控制3个流水灯的原理图 这里需要注意&#xff0c;每次DeepSeek的回答都不太一样。 DeepSeek回答 以下是使…

关于我和快速幂的事()

我之前只会这样的(dfs&#xff09;&#xff1a; 不懂下面这种写法的具体逻辑&#xff1a; 看完下面的推理&#xff0c;再转转我聪明的小老戴&#xff1a; 法一中&#xff1a;把2^11看成(2^5)^2 法二中&#xff1a;把2^11看成(2^2)^5

【鸿蒙开发】Hi3861学习笔记- GPIO之直流电机

00. 目录 文章目录 00. 目录01. GPIO概述02. 直流电机概述03. ULN2003模块概述04. 硬件设计05. 软件设计06. 实验现象07. 附录 01. GPIO概述 GPIO&#xff08;General-purpose input/output&#xff09;即通用型输入输出。通常&#xff0c;GPIO控制器通过分组的方式管理所有GP…

mapbox高阶,结合threejs(threebox)添加extrusion挤出几何体,并添加侧面窗户贴图和楼顶贴图,同时添加真实光照投影

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言1.1 ☘️mapboxgl.Map 地图对象1.2 ☘️mapboxgl.Map style属性1.3 ☘️threebox extrusion挤出几何体1.3 ☘️…

python-leetcode-叶子相似的树

872. 叶子相似的树 - 力扣&#xff08;LeetCode&#xff09; 下面是一个完整的 Python 函数&#xff0c;接收两个二叉树的根节点 root1 和 root2&#xff0c;返回它们是否叶相似。 代码实现 class TreeNode:def __init__(self, val0, leftNone, rightNone):self.val valself…

<03.13>八股文补充知识

import java.lang.reflect.*; public class Main {public static void main(String[] args) throws Exception {// 获取 Class 对象//1. 通过类字面量Class<?> clazz Person.class;//2 通过对象实例化String str "Hello";Class<?> clazz_str str.ge…

2025探索短剧行业新可能报告40+份汇总解读|附PDF下载

原文链接&#xff1a;https://tecdat.cn/?p41043 近年来&#xff0c;短剧以其紧凑的剧情、碎片化的观看体验&#xff0c;迅速吸引了大量用户。百度作为互联网巨头&#xff0c;在短剧领域积极布局。从早期建立行业专属模型冷启动&#xff0c;到如今构建完整的商业生态&#xf…

STM32 内置的通讯协议

数据是以帧为单位发的 USART和UART的区别就是有没有同步功能 同步是两端设备有时钟连接&#xff0c;异步是没时钟连接&#xff0c;靠约定号的频率&#xff08;波特率&#xff09;接收发送数据 RTS和CTS是用来给外界发送已“可接收”或“可发送”信号的&#xff0c;一般用不到…

信息安全访问控制、抗攻击技术、安全体系和评估(高软42)

系列文章目录 信息安全访问控制、抗攻击技术、安全体系和评估 文章目录 系列文章目录前言一、信息安全技术1.访问控制2.抗攻击技术 二、欺骗技术1.ARP欺骗2.DNS欺骗3.IP欺骗 三、抗攻击技术1.端口扫描2.强化TCP/IP堆栈 四、保证体系和评估1.保证体系2.安全风险管理 五、真题在…

晋升系列4:学习方法

每一个成功的人&#xff0c;都是从底层开始打怪&#xff0c;不断的总结经验&#xff0c;一步一步打上来的。在这个过程中需要坚持、总结方法论。 对一件事情长久坚持的人其实比较少&#xff0c;在坚持的人中&#xff0c;不断的总结优化的更少&#xff0c;所以最终达到高级别的…

脑电波控制设备:基于典型相关分析(CCA)的脑机接口频率精准解码方法

文章目录 前言一、CCA的用途二、频率求解思路三、输入数据结构四、判断方法五、matlab实践1.数据集获取及处理2.matlab代码3.运行及结果 六、参考文献 前言 在脑机接口(BCI)领域&#xff0c;有SSVEP方向&#xff0c;中文叫做稳态视觉诱发电位&#xff0c;当人观看闪烁的视觉刺激…

Android Spinner总结

文章目录 Android Spinner总结概述简单使用自定义布局自定义Adapter添加分割线源码下载 Android Spinner总结 概述 在 Android 中&#xff0c;Spinner 是一个下拉选择框。 简单使用 xml布局&#xff1a; <Spinnerandroid:id"id/spinner1"android:layout_width&…

element-ui layout 组件源码分享

layout 布局组件源码分享&#xff0c;主要从以下两个方面&#xff1a; 1、row 组件属性。 2、col 组件属性。 一、row 组件属性。 1.1 gutter 栅栏间隔&#xff0c;类型为 number&#xff0c;默认 0。 1.2 type 布局模式&#xff0c;可选 flex&#xff0c;现代浏览器下有效…

OBJ文件生成PCD文件(python 实现)

代码实现 将 .obj 文件转换为 .pcd&#xff08;点云数据&#xff09; 代码文件。 import open3d as o3d# 加载 .obj 文件 mesh o3d.io.read_triangle_mesh("bunny.obj")# 检查是否成功加载 if not mesh.has_vertices():print("无法加载 .obj 文件&#xff0c…

c++介绍智能指针 十二(1)

普通指针&#xff1a;指向内存区域的地址变量。使用普通指针容易出现一些程序错误。 如果一个指针所指向的内存区域是动态分配的&#xff0c;那么这个指针变量离开了所在的作用域&#xff0c;这块内存也不会自动销毁。动态内存不进行释放就会导致内存泄露。如果一个指针指向已…

Appium等待机制--强制等待、隐式等待、显式等待

书接上回&#xff0c;Appium高级操作--其他操作-CSDN博客文章浏览阅读182次&#xff0c;点赞6次&#xff0c;收藏7次。书接上回Appium高级操作--从源码角度解析--模拟复杂手势操作-CSDN博客。https://blog.csdn.net/fantasy_4/article/details/146162851主要讲解了Appium的一些…

计算机视觉cv2入门之图像的读取,显示,与保存

在计算机视觉领域&#xff0c;Python的cv2库是一个不可或缺的工具&#xff0c;它提供了丰富的图像处理功能。作为OpenCV的Python接口&#xff0c;cv2使得图像处理的实现变得简单而高效。 示例图片 目录 opencv获取方式 图像基本知识 颜色空间 RGB HSV 图像格式 BMP格式 …

【QT】事件系统入门——QEvent 基础与示例

一、事件介绍 事件是 应用程序内部或者外部产生的事情或者动作的统称 在 Qt 中使用一个对象来表示一个事件。所有的 Qt 事件均继承于抽象类 QEvent。事件是由系统或者 Qt 平台本身在不同的时刻发出的。当用户按下鼠标、敲下键盘&#xff0c;或者是窗口需要重新绘制的时候&…

5-27 临摹大师-IP-Adapter

前言&#xff1a; 前一节我们主要介绍ControlNet中如何对黑白照片进行上色 主要介绍ControlNet中的IP-Adapter。这个也是一种类似的风格借鉴&#xff0c;类似Reference的能力。 当然IP-Adapter有两点或许可以吸引我们&#xff0c;一个是国人腾讯公司制作的。另一个在速度和效…

Spring MVC面试题(一)

1.什么是Spring MVC&#xff1f; 全称为Model View Controller&#xff0c;Spring MVC是Spring的一个模块&#xff0c;基于MVC架构模式的一个框架 2.Spring MVC优点&#xff1f; 1.可用各种视图技术&#xff0c;不仅限于JSP 2.支持各种请求资源映射策略 3. Spring MVC工作原…