如何解决微服务下引起的 分布式事务问题

news2024/11/25 10:49:00
一、什么是分布式事务?

虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。

  • 跨数据源的分布式事务

  • 跨服务的分布式事务

二、解决方案
1、使用阿里开源的Seata框架解决分布式事务

​ 1)seata的架构

​ Seata事务管理中有三个重要的角色:

  • TC (Transaction Coordinator) - **事务协调者:**维护全局和分支事务的状态,协调全局事务提交或回滚。

  • TM (Transaction Manager) - **事务管理器:**定义全局事务的范围、开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - **资源管理器:**管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
    在这里插入图片描述
    3)Seata的常用模式使用

    • XA模式

      在一阶段各个本地事务执行完成后,不提交,把执行状态给事务协调者TC,此时本地事务继续持有数据库锁

      二阶段TC基于一阶段的报告来进行判断,如果一阶段均成功则通知所有的事务参与者,提交事务,如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务。

      优点:能够实现强一致性,满足ACID原则;实现简单

      缺点:性能较差;依赖数据库的事务

      I. 在application.yml文件中开启XA模式(所有参与事务的服务都需要设置):

      seata:
        data-source-proxy-mode: XA  
      

      II. 在全局事务的入口方法添加@GlobalTransactional注解

      
          @Override
          @GlobalTransactional
          public Long create(Order order) {
              // 创建订单
              orderMapper.insert(order);
              try {
                  // 扣用户余额
                  accountClient.deduct(order.getUserId(), order.getMoney());
                  // 扣库存
                  storageClient.deduct(order.getCommodityCode(), order.getCount());
              } catch (FeignException e) {
                  log.error("下单失败,原因:{}", e.contentUTF8(), e);
                  throw new RuntimeException(e.contentUTF8(), e);
              }
              return order.getId();
          }
      
    • AT模式

      和xa模式一样也是二阶段提交,不同的是AT模式本地事务结束后,直接提交。但是,它会在本地事务进行数据库数据更新的时候记录一下更新前后的快照。

​ 在二阶段需要回滚的时候,根据快照进行数据的恢复,如果二阶段全局事务提交,则把记录的快照删除。

​ 优点:性能好;实现也较为简单

​ 缺点: 存在中间状态,只能达到最终的一致性;快照功能会影响一些性能,但是相对于XA模式还是要好很多

I. 在application.yml文件中开启AT模式(所有参与事务的服务都需要设置):

seata:
  data-source-proxy-mode: AT # 默认就是AT

II. 创建相关数据库表

#在分支事务所在的库里创建记录快照的表undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (
  `branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;

#在TC服务所使用的库里创建全局锁记录表lock_table
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table`  (
  `row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `branch_id` bigint(20) NOT NULL,
  `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime NULL DEFAULT NULL,
  `gmt_modified` datetime NULL DEFAULT NULL,
  PRIMARY KEY (`row_key`) USING BTREE,
  INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

III. 在全局事务的入口方法添加@GlobalTransactional注解

    @Override
    @GlobalTransactional
    public Long create(Order order) {
        // 创建订单
        orderMapper.insert(order);
        try {
            // 扣用户余额
            accountClient.deduct(order.getUserId(), order.getMoney());
            // 扣库存
            storageClient.deduct(order.getCommodityCode(), order.getCount());
        } catch (FeignException e) {
            log.error("下单失败,原因:{}", e.contentUTF8(), e);
            throw new RuntimeException(e.contentUTF8(), e);
        }
        return order.getId();
    }
2、使用RocketMQ实现可靠消息最终一致性方案 (适用于不同项目的情况)在这里插入图片描述

模拟转账 a银行向b银行转账

a银行业务代码:

减少金额,像mq发送事务消息

  1. 引入rocketmq依赖
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq‐spring‐boot‐starter</artifactId>
   <version>2.0.2</version>
</dependency>

2)配置rocketmq

rocketmq.producer.group = zhuoye #设置生产者组的名称
rocketmq.name‐server = 127.0.0.1:9876  #指定rocketmq的地址

3) 业务层代码

@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {

   @Autowired
   private UserAccountMapper userAccountMapper;

   @Autowired
   private  RocketMQTemplate rocketMQTemplate;
    
   @Autowired
   private TansactionalRecordMapper tansactionalRecordMapper;


    //向mq发送转账消息
    @Override
    public void sendTransferAccountsMessagesToMq(AccountChangeEvent accountChangeEvent) {

        //将accountChangeEvent转成json
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("transferAccountInfo",accountChangeEvent);
        String jsonString = jsonObject.toJSONString();
        //生成message类型
        Message<String> message = MessageBuilder.withPayload(jsonString).build();
        //发送一条事务消息
        /**
         * String txProducerGroup 生产组
         * String destination topic,
         * Message<?> message, 消息内容
         * Object arg 参数
         */ 
        rocketMQTemplate.sendMessageInTransaction("transferAccount_ABank","topic_transferAccount",message,null);

    }

    //更新账户,扣减金额
    @Override
    @Transactional
    public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
        //幂等判断
        if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //扣减金额
        userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);
        //添加事务日志
        tansactionalRecordMapper.add(accountChangeEvent.getTxNo());
    }

4)编写RocketMQLocalTransactionListener接口实现类

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "transferAccount_ABank")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {

    @Autowired
    private UserAccountService userAccountService;

   @Autowired
   private UserAccountMapper userAccountMapper;
    
   @Autowired
   private TansactionalRecordMapper tansactionalRecordMapper;

    //事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

        try {
            //解析message,转成AccountChangeEvent
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            String accountChangeString = jsonObject.getString("transferAccountInfo");
            //将accountChange(json)转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //执行本地事务,扣减金额
            userAccountService.doUpdateAccountBalance(accountChangeEvent);
            //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //事务状态回查,查询是否扣减金额
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析message,转成AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("transferAccountInfo");
        //将accountChange(json)转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //事务id
        String txNo = accountChangeEvent.getTxNo();
        int isExist = tansactionalRecordMapper.isExist(txNo);
        if(isExist>0){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

b银行业务代码(前两步一样):

接收消息,增加金额

1)业务层代码

@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {

   @Autowired
   private UserAccountMapper userAccountMapper;
    
   @Autowired
   private TansactionalRecordMapper tansactionalRecordMapper;

    //更新账户,增加金额
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        //已更新
        if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //增加金额
        userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //添加事务记录,用于幂等
        tansactionalRecordMapper.add(accountChangeEvent.getTxNo());
    }
}

2)监听事务消息

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "transferAccount_ABank",topic = "topic_transferAccount",maxReconsumeTimes = 3)
public class TxmsgConsumer implements RocketMQListener<String> {

    @Autowired
    UserAccountService userAccountService;

    //接收消息
    @Override
    public void onMessage(String message) {
        //解析消息
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("transferAccountInfo");
        //转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //更新本地账户,增加金额
        userAccountService.addAccountInfoBalance(accountChangeEvent);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1938977.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为机试HJ60查找组成一个偶数最接近的两个素数

华为机试HJ60查找组成一个偶数最接近的两个素数 题目&#xff1a; 想法&#xff1a; 构建一个判断是否为素数的函数&#xff0c;再构建一个函数输出构成输入数值相差最小的两个素数。为了保证两个素数相差最小&#xff0c;从输入数值的二分之一处开始判断&#xff0c;遍历得到…

用Python写一个视频采集脚本,对某网站进行批量采集

最近某牙上又出现一批高质量视频&#xff0c;听说删的很快&#xff0c;还好我会Python&#xff0c;赶紧采集下来保存&#xff01; 准备工作 环境使用 Python 3.10 解释器 Pycharm 编辑器 模块使用 requests >>> 数据请求模块 re <正则表达式模块> os <文…

HW行动在即,邮件系统该怎么防守?

1. 什么是HW行动&#xff1f; HW行动是一项由公安部牵头&#xff0c;旨在评估企事业单位网络安全防护能力的活动&#xff0c;是国家应对网络安全问题所做的重要布局之一。 具体实践中&#xff0c;公安部组织攻防红、蓝两队&#xff08;红队为攻击队&#xff0c;主要由“国家队…

【漏洞复现】Netgear WN604 downloadFile.php 信息泄露漏洞(CVE-2024-6646)

0x01 产品简介 NETGEAR WN604是一款由NETGEAR&#xff08;网件&#xff09;公司生产的无线接入器&#xff08;或无线路由器&#xff09;提供Wi-Fi保护协议&#xff08;WPA2-PSK, WPA-PSK&#xff09;&#xff0c;以及有线等效加密&#xff08;WEP&#xff09;64位、128位和152…

面向初学者和专家的 40 大机器学习问答(2024 年更新)

面向初学者和专家的 40 大机器学习问答(2024 年更新) 一、介绍 机器学习是人工智能的重要组成部分,目前是数据科学中最受欢迎的技能之一。如果你是一名数据科学家,你需要擅长 python、SQL 和机器学习——没有两种方法。作为 DataFest 2017 的一部分,我们组织了各种技能测…

正则表达式(Ⅰ)——基本匹配

学习练习建议 正则表达式用途非常广泛&#xff0c;各种语言中都能见到它的身影&#xff08;js&#xff0c;java&#xff0c;mysql等&#xff09; 正则表达式可以快读校验/生成/替换符合要求的模式的字符串&#xff0c;而且语法通俗易懂&#xff0c;所以应用广泛 学习链接&am…

php随机海量高清壁纸系统源码,数据采集于网络,使用很方便

2022 多个分类随机海量高清壁纸系统源码&#xff0c;核心文件就两个&#xff0c;php文件负责采集&#xff0c;html负责显示&#xff0c;很简单。做流量工具还是不错的。 非第三方接口&#xff0c;图片数据采集壁纸多多官方所有数据&#xff01; 大家拿去自行研究哈&#xff0…

WEB前端09-前端服务器搭建(Node.js/nvm/npm)

前端服务器的搭建 在本文中&#xff0c;我们将介绍如何安装和配置 nvm&#xff08;Node Version Manager&#xff09;以方便切换不同版本的 Node.js&#xff0c;以及如何设置 npm&#xff08;Node Package Manager&#xff09;使用国内镜像&#xff0c;并搭建一个简单的前端服…

Merge-On-Read

基本介绍 Iceberg的Merge-On-Read Merge-On-Read&#xff0c;顾名思义&#xff0c;就是在读取的时候进行合并&#xff0c;是与Copy-On-Write相反的一种模式 在Iceberg中&#xff0c;Merge-On-Read同样用于行级更新&#xff0c;整体过程如下 当更新数据时&#xff0c;Iceber…

巴黎奥运会倒计时 一个非常不错的倒计时提醒

巴黎奥运会还有几天就要开幕了&#xff0c;大家应该到处都可以看到巴黎奥运会的倒计时&#xff0c;不管是电视上&#xff0c;还是网络里&#xff0c;一搜索奥运会&#xff0c;就会看到。倒计时其实是一个我们在生活中很常用的一个方法&#xff0c;用来做事情的提醒&#xff0c;…

【学习笔记】无人机系统(UAS)的连接、识别和跟踪(九)-无人机区域地面探测与避让(DAA)

引言 3GPP TS 23.256 技术规范&#xff0c;主要定义了3GPP系统对无人机&#xff08;UAV&#xff09;的连接性、身份识别、跟踪及A2X&#xff08;Aircraft-to-Everything&#xff09;服务的支持。 3GPP TS 23.256 技术规范&#xff1a; 【免费】3GPPTS23.256技术报告-无人机系…

【CSS in Depth 2 精译_020】3.3 元素的高度

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第一章 层叠、优先级与继承&#xff08;已完结&#xff09; 1.1 层叠1.2 继承1.3 特殊值1.4 简写属性1.5 CSS 渐进式增强技术1.6 本章小结 第二章 相对单位&#xff08;已完结&#xff09; 2.1 相对…

Linux:Linux进程概念

目录 前言 1. 冯诺依曼体系结构 2. 操作系统 2.1 什么是操作系统 3. 进程 3.1 基本概念 3.2 描述进程——PCB 3.3 进程和程序的区别 3.4 task_struct-PCB的一种 3.5 task_struct的内容分类 4. 查看进程 4.1 通过系统文件查看进程 4.2 通过ps指令查看进程 4.3 …

Redis7(二)Redis持久化双雄

持久化之RDB RDB的持久化方式是在指定时间间隔&#xff0c;执行数据集的时间点快照。也就是在指定的时间间隔将内存中的数据集快照写入磁盘&#xff0c;也就是Snapshot内存快照&#xff0c;它恢复时再将硬盘快照文件直接读回到内存里面。 RDB保存的是dump.rdb文件。 自动触发…

记录些Spring+题集(3)

百万QPS下热点数据的收集方案 在高并发场景下&#xff0c;如京东、淘宝的秒杀活动开始时候&#xff0c;会有很多的用户同时抢购秒杀商品&#xff0c;由于同一个场次成百上千种商品参与秒杀活动&#xff0c;但是热点的商品往往就只有那么几十个左右&#xff0c;此时系统的90%的…

linux桌面运维---第八天

1、rm命令&#xff1a; 用法&#xff1a;删除一个文件或者目录。 语法&#xff1a;rm [选项] name... 选项&#xff1a; -f 即使原档案属性设为唯读&#xff0c;亦直接删除&#xff0c;无需逐一确认。-r 将目录及以下之档案亦逐一删除。需要进行一一确认 2、ln命令&#…

Lamp 小白菜鸟从入门到精通

前言 “LAMP包”的脚本组件中包括了CGIweb接口&#xff0c;它在90年代初期变得流行。这个技术允许网页浏览器的用户在服务器上执行一个程序&#xff0c;并且和接受静态的内容一样接受动态的内容。程序员使用脚本语言来创建这些程序因为它们能很容易有效的操作文本流&#xff0…

SpringBoot限制请求访问次数

本篇文章的主要内容是SpringBoot怎么限制请求访问次数。 当我们的服务端程序部署到服务器上后&#xff0c;就要考虑很多关于安全的问题。总会有坏人来攻击你的服务&#xff0c;比如说会窃取你的数据或者给你的服务器上强度。关于给服务器上强度&#xff0c;往往就有高强度给服务…

【ffmpeg入门】安装CUDA并使用gpu加速

文章目录 前言CUDACUDA是什么CUDA 的主要组成部分CUDA 的优点CUDA 的基本编程模型安装CUDA ffmpeg使用gpu加速为什么需要使用gpu加速1. 提高处理速度2. 减少 CPU 负载3. 提高实时处理能力4. 支持高分辨率和复杂编码格式5. 提供更好的可扩展性6. 提高能效 ffmpeg使用gpu加速常用…

在项目中加入 husky + lint-staged + eslint,代码检测格式化

背景 由于日常工作的多人协作中&#xff0c;会因为个人代码编写风格导致代码在不同人电脑上&#xff0c;会有异常代码格式的提示&#xff0c;为了解决这个小问题&#xff0c;我们可以使用 husky lint-staged 来对代码进行一定程度上的格式化&#xff0c;使格式风格统一&#x…