Springboot+RocketMQ通过事务消息优雅的实现订单支付功能

news2025/1/8 4:35:28

目录

1. 事务消息

1.1 RocketMQ事务消息的原理

1.2 RocketMQ订单支付功能设计


1. 事务消息

RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。

1.1 RocketMQ事务消息的原理

  1. 半事务消息发送:生产者将半事务消息发送至RocketMQ服务端。

  2. 消息持久化及返回Ack确认:RocketMQ服务端接收到半事务消息并持久化成功后,向生产者返回Ack确认消息已经发送成功。此时消息状态为半事务消息。

  3. 执行本地事务逻辑:根据发送结果执行本地事务,如果写入失败,此时half消息对业务不可见,本地事务逻辑不执行。

  4. 提交二次确认结果:根据本地事务状态执行Commit或者Rollback。RocketMQ 的事务消息分为3种状态,分别是提交状态、回滚状态、未知状态。
    TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
    TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
    TransactionStatus.Unknown: 未知状态,它代表需要检查消息队列来确定状态。

  5. 消息回查:(1) 对没有Commit/Rollback的事务消息,从服务端发起一次回查 (2) Producer收到回查消息,检查回查消息对应的本地事务的状态 (3) 根据本地事务状态,重新Commit或者Rollback。第一次回查后仍未获取到事务状态,则之后每隔30s会再次回查,最多重试15次,超过了就会默认丢弃此消息。

1.2 RocketMQ订单支付功能设计

数据库设计

/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*/

/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;

USE `shop`;

/*Table structure for table `shop_order` */

DROP TABLE IF EXISTS `shop_order`;

CREATE TABLE `shop_order` (
  `id` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL COMMENT '订单id',
  `total_num` INT DEFAULT NULL COMMENT '数量合计',
  `moneys` INT DEFAULT NULL COMMENT '金额合计',
  `pay_type` VARCHAR(1) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '支付类型,1、在线支付、0 货到付款',
  `create_time` DATETIME DEFAULT NULL COMMENT '订单创建时间',
  `update_time` DATETIME DEFAULT NULL COMMENT '订单更新时间',
  `pay_time` DATETIME DEFAULT NULL COMMENT '付款时间',
  `consign_time` DATETIME DEFAULT NULL COMMENT '发货时间',
  `end_time` DATETIME DEFAULT NULL COMMENT '交易完成时间',
  `username` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '用户名称',
  `recipients` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人',
  `recipients_mobile` VARCHAR(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人手机',
  `recipients_address` VARCHAR(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人地址',
  `weixin_transaction_id` VARCHAR(30) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '交易流水号',
  `order_status` INT DEFAULT NULL COMMENT '订单状态,0:未完成,1:已完成,2:已退货',
  `pay_status` INT DEFAULT NULL COMMENT '支付状态,0:未支付,1:已支付,2:支付失败',
  `is_delete` INT DEFAULT NULL COMMENT '是否删除',
  PRIMARY KEY (`id`),
  KEY `create_time` (`create_time`),
  KEY `status` (`order_status`),
  KEY `payment_type` (`pay_type`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_bin;

/*Data for the table `shop_order` */

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

添加RocketMQ依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

bootstrap.yaml配置

server:
  port: 8085
spring:
  application:
    name: mall-order
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: 123456
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: localhost:8848
      discovery:
        #Nacos的注册地址
        server-addr: localhost:8848

rocketmq:
  name-server: localhost:9876
  producer:
    group: test-group-producer

Service层

public interface OrderService extends IService<Order> {
       //添加订单
       void add(Order order);
       //修改订单支付状态
       void pay(String id);
}

@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order>
    implements OrderService{

    @Autowired
    OrderMapper orderMapper;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void add(Order order) {
        order.setCreateTime(new Date());
        orderMapper.insert(order); //这里仅仅生成订单,还有扣减库存等等一系列操作省略
    }
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void pay(String id) {
        //模拟支付完成,修改订单的支付状态
        Order order = orderMapper.selectById(id);
        order.setPayStatus(1);
        order.setPayTime(new Date());
        orderMapper.updateById(order);
    }
}

创建生产者

@RestController
@Slf4j
public class TestController {

    @Autowired
    OrderMapper orderMapper;

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @RequestMapping("/send")
    public String send(){
        String id = UUID.randomUUID().toString();
        String msg = "订单"+id+"支付成功";
        Order order=new Order();
        order.setId(id);
        order.setCreateTime(new Date());
        order.setMoneys(100);
        order.setUsername("张三");
        Message<String> message = MessageBuilder.withPayload(msg).setHeader("key",id).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order", message, order);
        String transactionId = result.getTransactionId();
        String status = result.getSendStatus().name();
        log.info("发送消息成功 transactionId={} status={} ",transactionId,status);
        return "success";
    }
}

创建消费者

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "test-consumer",topic = "order",messageModel = MessageModel.CLUSTERING)
public class RocketMQListen implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        System.out.println(body);
    }
}

生产者消息监听器

@Component
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {

    @Autowired
    OrderService orderService;



    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        Order order = (Order) o;
        try {
            //生成订单
            orderService.add(order);
            return RocketMQLocalTransactionState.UNKNOWN;
        }catch (Throwable throwable){
            throwable.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String key = message.getHeaders().get("key").toString();
        System.out.println("回查订单id "+key+" 回查时间"+new Date());
        Order order = orderService.getById(key);
        if(order!=null) {
            long l = new Date().getTime() - order.getCreateTime().getTime();
            long time = l / (1000 * 60);
            //超时1分钟后,就会把未支付的订单进行删除
            if (time > 1) {
                orderService.removeById(key);
                System.out.println("订单" + key + "删除");
                //订单,库存等一系列操作
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            Integer payStatus = order.getPayStatus();
            if (payStatus == 1) {
                return RocketMQLocalTransactionState.COMMIT;
            }
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        else
            return RocketMQLocalTransactionState.ROLLBACK;
    }
}

测试

这里通过生产者发送五个事务消息,生成五个订单,然后两个订单在一分钟内修改支付状态为已支付,超时一分钟未支付就会删除订单回退。运行截图如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1368352.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何让GPT支持中文

上一篇已经讲解了如何构建自己的私人GPT&#xff0c;这一篇主要讲如何让GPT支持中文。 privateGPT 本地部署目前只支持基于llama.cpp 的 gguf格式模型&#xff0c;GGUF 是 llama.cpp 团队于 2023 年 8 月 21 日推出的一种新格式。它是 GGML 的替代品&#xff0c;llama.cpp 不再…

记一次JSF异步调用引起的接口可用率降低

前言 本文记录了由于JSF异步调用超时引起的接口可用率降低问题的排查过程&#xff0c;主要介绍了排查思路和JSF异步调用的流程&#xff0c;希望可以帮助大家了解JSF的异步调用原理以及提供一些问题排查思路。本文分析的JSF源码是基于JSF 1,7.5-HOTFIX-T6版本。 起因 问题背景…

强化学习的数学原理学习笔记 - Actor-Critic

文章目录 概览&#xff1a;RL方法分类Actor-CriticBasic actor-critic / QAC&#x1f7e6;A2C (Advantage actor-critic)Off-policy AC&#x1f7e1;重要性采样&#xff08;Importance Sampling&#xff09;Off-policy PGOff-policy AC &#x1f7e6;DPG (Deterministic AC) 本…

使用fs.renameSync(oldPath,newPath)方法,报错Error: ENOENT: no such file or directory

报错翻译&#xff1a;由于文件或目录不存在导致的。 解决方法&#xff1a;查看给定的路径&#xff0c;确保路径和文件名正确&#xff0c;并且文件或目录确实存在。

C语言--结构体详解

C语言--结构体详解 1.结构体产生原因2.结构体声明2.1 结构体的声明2.2 结构体的初始化2.3结构体自引用 3.结构体内存对齐3.1 对齐规则3.2 为什么存在内存对齐3.3 修改默认对⻬数 4. 结构体传参 1.结构体产生原因 C语言将数据类型分为了两种&#xff0c;一种是内置类型&#xf…

Spring学习 Spring事务控制

7.1.事务介绍 7.1.1.什么是事务&#xff1f; 当你需要一次执行多条SQL语句时&#xff0c;可以使用事务。通俗一点说&#xff0c;如果这几条SQL语句全部执行成功&#xff0c;则才对数据库进行一次更新&#xff0c;如果有一条SQL语句执行失败&#xff0c;则这几条SQL语句全部不…

2.SPSS数据文件的建立和管理

文章目录 数据文件的特点建立SPSS数据文件步骤 数据文件的结构变量的规则 数据的录入和保存录入数据保存文件 数据的编辑数据定位 数据文件的特点 SPSS数据库文件包括文件结构和数据两部分 SPSS数据文件中的一列数据称为一个变量。每个变量都应有一个名称&#xff0c;即&…

面试算法100:三角形中最小路径之和

题目 在一个由数字组成的三角形中&#xff0c;第1行有1个数字&#xff0c;第2行有2个数字&#xff0c;以此类推&#xff0c;第n行有n个数字。例如&#xff0c;下图是一个包含4行数字的三角形。如果每步只能前往下一行中相邻的数字&#xff0c;请计算从三角形顶部到底部的路径经…

centos7新建普通用户并设置分组和密码

sudo -i获取root权限 添加分组group1 groupadd group1 添加用户并设置分组为group1密码为password1 useradd user1 -g group1 -p password1 su user1 切换到 user1

第7章-第6节-Java中的Map集合

1、HashMap&#xff1a; 1&#xff09;、 引入 如果业务需要我们去用姓名的拼音手写字母匹配完整姓名&#xff0c;那么如果用单列数据&#xff0c;我们可能需要两个集合才能存储&#xff0c;而且两个集合之间没有关联不好操作&#xff0c;这种时候双列数据就会起很大作用 2&…

Mysql : command not found

1.Mysql : command not found 安装成功的mysql&#xff0c;并且服务已经启动&#xff0c;查看进行是可以看到的&#xff0c;但是使用命令登录操作&#xff0c;却抛出错误&#xff1a;command not found。 2.解决方案 2.1 查看/usr/bin目录下是否有mysql服务连接 ls /usr/bin…

服务网格 Service Mesh

什么是服务网格&#xff1f; 服务网格是一个软件层&#xff0c;用于处理应用程序中服务之间的所有通信。该层由容器化微服务组成。随着应用程序的扩展和微服务数量的增加&#xff0c;监控服务的性能变得越来越困难。为了管理服务之间的连接&#xff0c;服务网格提供了监控、记…

Linux 期末复习

Linux 期末复习 计算机历史 硬件基础 1&#xff0c;计算机硬件的五大部件&#xff1a;控制器、运算器、存储器、输入输出设备 2&#xff0c;cpu分为精简指令集(RISC)和复杂指令集(CISC) 3&#xff0c;硬件只认识0和1&#xff0c;最小单位是bit&#xff0c;最小存储单位是字…

应用统计学期末复习简答题

应用统计学期末复习简答题 1&#xff0e;解释众数、中位数和调和平均数。2、什么是普查&#xff1f;其有何特点和作用&#xff1f;3、什么是抽样调查&#xff1f;其有何特点和作用&#xff1f;4、什么是偏度系数&#xff0c;并解释左偏分布、对称分布和右偏分布。5、解释置信区…

竞赛保研 基于深度学习的人脸表情识别

文章目录 0 前言1 技术介绍1.1 技术概括1.2 目前表情识别实现技术 2 实现效果3 深度学习表情识别实现过程3.1 网络架构3.2 数据3.3 实现流程3.4 部分实现代码 4 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于深度学习的人脸表情识别 该项目较…

结构型设计模式——适配器模式

适配器模式 这个更加好理解&#xff0c;就是做适配功能的类&#xff0c;例如&#xff0c;现在手机没有了圆形耳机接口&#xff0c;只有Type-C接口&#xff0c;因此你如果还想要使用圆形耳机的话需要买个圆形接口转Type-C的转换器&#xff08;适配器&#xff09;&#xff0c;这…

一分钟带你了解深度学习算法

深度学习是一种受到生物学启发的机器学习方法&#xff0c;其目标是通过构建多层神经网络来模拟人脑的工作原理。它在过去几十年来取得了巨大的进展&#xff0c;并在图像识别、语音识别、自然语言处理等领域取得了突破性的成果。 深度学习的核心思想是模仿人脑的神经网络。人脑中…

HarmonyOS应用开发学习笔记 Want概述Ability跳转

一、Want的定义与用途 Want是对象间信息传递的载体&#xff0c;可以用于应用组件间的信息传递。其使用场景之一是作为startAbility()的参数&#xff0c;包含了指定的启动目标以及启动时需携带的相关数据&#xff0c;如bundleName和abilityName字段分别指明目标Ability所在应用…

vue element ui table表格--实现列的显示与隐藏

前言 实现效果 提示&#xff1a;代码段太简单就不解释了&#xff0c;自己看代码自己更改&#xff0c;下面代码直接无脑复制更改就行 一、实现代码&#xff1f; <template><div id"app"><el-table :data"tableData" border style"w…

Spring 见解 7 基于注解的AOP控制事务

8.基于注解的AOP控制事务 8.1.拷贝上一章代码 8.2.applicationContext.xml <!-- 开启spring对注解事务的支持 --> <tx:annotation-driven transaction-manager"transactionManager"/> 8.3.service Service Transactional(readOnlytrue,propagation Pr…