分布式事务之最终一致性

news2025/1/12 6:09:45

分布式事务之最终一致性

    • 参考链接
    • 分布式事务基础理论
    • 概述案例
    • 解决方案:RocketMQ可靠消息
    • 注意事项:
    • 代码实现

参考链接

原文链接:https://blog.csdn.net/jikeyeka/article/details/126296938

分布式事务基础理论

基于上述的CAP和BASE理论,一般情况下会保证P和A,舍弃C,保证最终一致性。最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的"支付中"状态,最终会变为“支付 成功”或者"支付失败",使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。

概述案例

此方案的核心是将分布式事务拆分成多个本地事务,然后通过网络由消息队列协调完成所有事务,并实现最终一致性。以商城下单为例:
在这里插入图片描述

  1. 消息发送方,用户下单: 创建订单,然后通过网络发送消息到MQ

  2. 消息接收方,扣减库存: 通过网络从MQ中接收消息,然后扣减库存

该解决方案容易理解,实现成本低,但是面临以下几个问题:

1.消息发送方执行本地事务与发送消息的原子性问题,也就是说如何保证本地事务执行成功,消息一定发送成功

begin transaction
1.数据库操作
2.发送消息
commit transation

这种情况下,貌似没有问题,如果发送消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但此时消息已经正常发送了,同样会导致不一致。

2.消息接收方接收消息与本地事务的原子性问题,也就是说如何保证接收消息成功后,本地事务一定执行成功

3.由于消息可能会重复发送,这就要求消息接收方必须实现幂等性

由于在生产环境中,消费方很有可能是个集群,若某一个消费节点超时但是消费成功,会导致集群同组 其他节点重复消费该消息。另外意外宕机后恢复,由于消费进度没有及时写入磁盘,会导致消费进度部 分丢失,从而导致消息重复消费。

解决方案:RocketMQ可靠消息

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便 利性支持。因此,我们通过RocketMQ就可以解决前面的问题。

1.消息发送方执行本地事务与发送消息的原子性问题,也就是说如何保证本地事务执行成功,消息一定发送成功

RocketMQ中的Broker 与 发送方 具备双向通信能力,使得 broker 天生可以作为一个事务协调者存在;并且RocketMQ 本身提供了存储机制,使得事务消息可以持久化保存;这些优秀的设计可以保证即使发生了异常,RocketMQ依然能够保证达成事务的最终一致性。
在这里插入图片描述

  1. 发送方发送一个事务消息给Broker,RocketMQ会将消息状态标记为“Prepared”,此时这条消息暂时不能被接收方消费。这样的消息称之为Half Message,即半消息。

  2. Broker返回发送成功给发送方

  3. 发送方执行本地事务,例如操作数据库

  4. 若本地事务执行成功,发送commit消息给Broker,RocketMQ会将消息状态标记为“可消费”,此 时这条消息就可以被接收方消费;若本地事务执行失败,发送rollback消息给Broker,RocketMQ 将删除该消息

  5. 如果发送方在本地事务过程中,出现服务挂掉,网络闪断或者超时,那Broker将无法收到确认结 果

  6. 此时RocketMQ将会不停的询问发送方来获取本地事务的执行状态(即事务回查)

  7. 根据事务回查的结果来决定Commit或Rollback,这样就保证了消息发送与本地事务同时成功或同时失败。

以上主干流程已由RocketMQ实现,对于我们来说只需要分别实现本地事务执行的方法以及本地事务回查的方法即可,具体来说就是实现下面这个接口:

public interface TransactionListener {
/**
- 发送prepare消息成功后回调该方法用于执行本地事务
- @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
- @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,
这里能获取到
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:未知,需要回查
*/
LocalTransactionState executeLocalTransaction(final Message msg, final
Object arg);
/**
- @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:未知,需要回查
*/
LocalTransactionState checkLocalTransaction(Message msg);
}

2.消息接收方接收消息与本地事务的原子性问题,也就是说如何保证接收消息成功后,本地事务一定执行成功

  1. 如果是出现了异常,RocketMQ会通过重试机制,每隔一段时间消费消息,然后再执行本地事务;如果 是超时,RocketMQ就会无限制的消费消息,不断的去执行本地事务,直到成功为止。
  2. 实现逆向流程,比如下单流程:在订单服务执行本地事务成功,消息commit, 然后 商品服务 消费消息进行扣减库存,在商品服务扣减库存过程中发生异常,那么先将本地数据回滚,然后发送回滚消息,在订单服务监听回滚消息进行处理。

在这里插入图片描述

注意事项:

  1. 因为使用最终一致性解决分布式事务,可能出现的问题:
    发送方本地事务执行完成,消息commit, 这时候返回客户是成功,但是消费者可能失败,会导致数据回滚,所以为了避免用户看到下单成功后,因为数据回滚导致订单数据消失,所以需要增加一个中间状态,比如:下单中,只有所有的消费者消费成功后,在改变订单状态为下单成功。

  2. 发送方监听事务消息 RocketMQLocalTransactionListener, 一个工程只可以有一个实现类,否则启动报错。

代码实现

  1. 发送方代码
 public Long placeOrder(Long userId, SeckillOrderCommand seckillOrderCommand) {
        SeckillGoodsDTO seckillGoods = seckillGoodsDubboService.getSeckillGoods(seckillOrderCommand.getGoodsId(), seckillOrderCommand.getVersion());
        //检测商品
        this.checkSeckillGoods(seckillOrderCommand, seckillGoods);
        boolean exception = false;
        long txNo = SnowFlakeFactory.getSnowFlakeFromCache().nextId();
        String key = SeckillConstants.getKey(SeckillConstants.GOODS_ITEM_STOCK_KEY_PREFIX, String.valueOf(seckillOrderCommand.getGoodsId()));
        try{
        //获取商品限购信息
        Object limitObj = distributedCacheService.getObject(SeckillConstants.getKey(SeckillConstants.GOODS_ITEM_LIMIT_KEY_PREFIX, String.valueOf(seckillOrderCommand.getGoodsId())));
        //如果从Redis获取到的限购信息为null,则说明商品已经下线
        if (limitObj == null){
            throw new SeckillException(ErrorCode.GOODS_OFFLINE);
        }

        if (Integer.parseInt(String.valueOf(limitObj)) < seckillOrderCommand.getQuantity()){
            throw new SeckillException(ErrorCode.BEYOND_LIMIT_NUM);
        }
        Long result = distributedCacheService.decrementByLua(key, seckillOrderCommand.getQuantity());
        this.checkResult(result);
        }catch (Exception e){
            logger.error("SeckillPlaceOrderLuaService|下单异常|参数:{}|异常信息:{}", JSONObject.toJSONString(seckillOrderCommand), e.getMessage());
            exception = true;
            //将内存中的库存增加回去
            distributedCacheService.incrementByLua(key, seckillOrderCommand.getQuantity());
        }
        //事务消息
        Message<String> message = this.getTxMessage(txNo, userId, SeckillConstants.PLACE_ORDER_TYPE_LUA, exception, seckillOrderCommand, seckillGoods);
        //发送事务消息
        rocketMQTemplate.sendMessageInTransaction(SeckillConstants.TOPIC_TX_MSG, message, null);
        return txNo;
    }
  1. 发送方本地事务
/**
 * @author binghe(微信 : hacker_binghe)
 * @version 1.0.0
 * @description 监听事务消息
 * @github https://github.com/binghe001
 * @copyright 公众号: 冰河技术
 */
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener{
    private final Logger logger = LoggerFactory.getLogger(OrderTxMessageListener.class);
    @Autowired
    private SeckillPlaceOrderService seckillPlaceOrderService;
    @Autowired
    private DistributedCacheService distributedCacheService;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        TxMessage txMessage = this.getTxMessage(message);
        try{
            //已经抛出了异常,则直接回滚
            if (BooleanUtil.isTrue(txMessage.getException())){
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            seckillPlaceOrderService.saveOrderInTransaction(txMessage);
//            int i = 1/0;
            logger.info("executeLocalTransaction|秒杀订单微服务成功提交本地事务|{}", txMessage.getTxNo());
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            logger.error("executeLocalTransaction|秒杀订单微服务异常回滚事务|{}",txMessage.getTxNo());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        TxMessage txMessage = this.getTxMessage(message);
        logger.info("checkLocalTransaction|秒杀订单微服务查询本地事务|{}", txMessage.getTxNo());
        Boolean submitTransaction = distributedCacheService.hasKey(SeckillConstants.getKey(SeckillConstants.ORDER_TX_KEY, String.valueOf(txMessage.getTxNo())));
        return BooleanUtil.isTrue(submitTransaction) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.UNKNOWN ;
    }

    private TxMessage getTxMessage(Message msg){
        String messageString = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString(SeckillConstants.TX_MSG_KEY);
        return JSONObject.parseObject(txStr, TxMessage.class);
    }
}
  1. 消费者监听事务消息
@Component
@RocketMQMessageListener(consumerGroup = SeckillConstants.TX_GOODS_CONSUMER_GROUP, topic = SeckillConstants.TOPIC_TX_MSG)
public class GoodsTxMessageListener implements RocketMQListener<String> {
    private final Logger logger = LoggerFactory.getLogger(GoodsTxMessageListener.class);
    @Autowired
    private SeckillGoodsService seckillGoodsService;
    @Override
    public void onMessage(String message) {
        if (StrUtil.isEmpty(message)){
            return;
        }
        logger.info("秒杀商品微服务开始消费事务消息:{}", message);
        TxMessage txMessage = this.getTxMessage(message);
        //如果协调的异常信息字段为false,订单微服务没有抛出异常,则处理库存信息
        if (BooleanUtil.isFalse(txMessage.getException())){
            seckillGoodsService.updateAvailableStock(txMessage);
        }
    }

    private TxMessage getTxMessage(String msg){
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String txStr = jsonObject.getString(SeckillConstants.TX_MSG_KEY);
        return JSONObject.parseObject(txStr, TxMessage.class);
    }
}
  1. 消费者处理本地事务
@Override
    @Transactional(rollbackFor = Exception.class)
    public boolean updateAvailableStock(TxMessage txMessage) {
        Boolean decrementStock = distributedCacheService.hasKey(SeckillConstants.getKey(SeckillConstants.GOODS_TX_KEY, String.valueOf(txMessage.getTxNo())));
        if (BooleanUtil.isTrue(decrementStock)){
            logger.info("updateAvailableStock|秒杀商品微服务已经扣减过库存|{}", txMessage.getTxNo());
            return true;
        }
        boolean isUpdate = false;
        try{
            isUpdate = seckillGoodsDomainService.updateAvailableStock(txMessage.getQuantity(), txMessage.getGoodsId());
            //成功扣减库存成功
            if (isUpdate){
                distributedCacheService.put(SeckillConstants.getKey(SeckillConstants.GOODS_TX_KEY, String.valueOf(txMessage.getTxNo())), txMessage.getTxNo(), SeckillConstants.TX_LOG_EXPIRE_DAY, TimeUnit.DAYS);
            }else{
                //发送失败消息给订单微服务
                rocketMQTemplate.send(SeckillConstants.TOPIC_ERROR_MSG, getErrorMessage(txMessage));
            }
//            int i = 1/0;
        }catch (Exception e){
            isUpdate = false;
            logger.error("updateAvailableStock|抛出异常|{}|{}",txMessage.getTxNo(), e.getMessage());
            //发送失败消息给订单微服务
            rocketMQTemplate.send(SeckillConstants.TOPIC_ERROR_MSG, getErrorMessage(txMessage));
        }
        return isUpdate;
    }

注意:在消费者执行本地事务时,如果发生异常,则执行逆向流程

  1. 发送者监听逆向流程消息
@Component
@RocketMQMessageListener(consumerGroup = SeckillConstants.TX_ORDER_CPNSUMER_GROUP, topic = SeckillConstants.TOPIC_ERROR_MSG)
public class OrderErrorMessageListener implements RocketMQListener<String> {
    private final Logger logger = LoggerFactory.getLogger(OrderErrorMessageListener.class);
    @Autowired
    private SeckillOrderService seckillOrderService;
    @Override
    public void onMessage(String message) {
        logger.info("onMessage|秒杀订单微服务开始消费消息:{}", message);
        if (StrUtil.isEmpty(message)){
            return;
        }
        //删除数据库中对应的订单
        seckillOrderService.deleteOrder(this.getErrorMessage(message));
    }

    private ErrorMessage getErrorMessage(String msg){
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String txStr = jsonObject.getString(SeckillConstants.ERROR_MSG_KEY);
        return JSONObject.parseObject(txStr, ErrorMessage.class);
    }
}

注意:该代码仅提供执行流程参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1341392.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Grafana Loki 组件介绍

Loki 日志系统由以下3个部分组成&#xff1a; Loki是主服务器&#xff0c;负责存储日志和处理查询。Promtail是专为loki定制的代理&#xff0c;负责收集日志并将其发送给 loki 。Grafana用于 UI展示。 Distributor Distributor 是客户端连接的组件&#xff0c;用于收集日志…

小米SU7汽车发布会; 齐碳科技C+轮融资;网易 1 月 3 日发布子曰教育大模型;百度文心一言用户数已突破 1 亿

投融资 • 3200 家 VC 投资的创业公司破产&#xff0c;那个投 PLG 的 VC 宣布暂停投资了• 云天励飞参与 AI 技术与解决方案提供商智慧互通 Pre-IPO 轮融资• 百度投资 AIGC 公司必优科技• MicroLED量测公司点莘技术获数千万级融资• 智慧互通获AI上市公司云天励飞Pre-IPO轮战…

10. Opencv检测并截取图中二维码

1. 说明 在二维码扫描功能开发中,使用相机扫描图片时,往往图片中的信息比较多样,可能会造成二维码检测失败的问题。一种提高检测精度的方式就是把二维码在图片中单独抠出来,去除其它冗余信息,然后再去识别这张提取出来的二维码。本篇博客记录采用的一种实现二维码位置检测…

OSPF被动接口配置-新版(14)

目录 整体拓扑 操作步骤 1.基本配置 1.1 配置R1的IP 1.2 配置R2的IP 1.4 配置R4的IP 1.5 配置R5的IP 1.6 配置PC-1的IP地址 1.7 配置PC-2的IP地址 1.8 配置PC-3的IP地址 1.9 配置PC-4的IP地址 1.10 检测R1与PC3连通性 1.11 检测R2与PC4连通性 1.12 检测R4与PC1连…

docker小白第九天

docker小白第九天 安装redis集群 cluster(集群)模式-docker版本&#xff0c;哈希槽分区进行亿级数据存储。如果1~2亿条数据需要缓存&#xff0c;请问如何设计这个存储案例。单机存储是不可能的&#xff0c;需要分布式存储&#xff0c;如果使用redis又该如何部署。 哈希取余分…

第十二章 Sleuth分布式请求链路跟踪

Sleuth分布式请求链路跟踪 gitee:springcloud_study: springcloud&#xff1a;服务集群、注册中心、配置中心&#xff08;热更新&#xff09;、服务网关&#xff08;校验、路由、负载均衡&#xff09;、分布式缓存、分布式搜索、消息队列&#xff08;异步通信&#xff09;、数…

nginx报错upstream sent invalid header

nginx报错upstream sent invalid header 1.报错背景 最近由于nginx 1.20的某个漏洞需要升级到nginx1.25的版本。在测试环境升级完nginx后&#xff0c;发现应用直接报错502 bad gateway了。 然后查看nginx的errlog&#xff0c;发现&#xff1a; upstream sent invalid head…

语言模型:从n-gram到神经网络的演进

目录 1 前言2 语言模型的两个任务2.1 自然语言理解2.2 自然语言生成 3 n-gram模型4 神经网络语言模型5 结语 1 前言 语言模型是自然语言处理领域中的关键技术之一&#xff0c;它致力于理解和生成人类语言。从最初的n-gram模型到如今基于神经网络的深度学习模型&#xff0c;语言…

LMX2571 芯片配置Verliog SPI驱动

前言 本实验使用ZYNQ的PL(FPGA)对LMX2571芯片进行配置&#xff0c;以下连接为相关的原理和软件使用资料。 TICS Pro 配置时钟芯片 文献阅读–Σ-Δ 小数频率合成器原理 LMX2571芯片数据手册 一、LMX2571配置时序分析 1.1 写时序 LMX2571使用24位寄存器进行编程。一个24位移位…

Codeforces Round 918 (Div. 4)(AK)

A、模拟 B、模拟 C、模拟 D、模拟 E、思维&#xff0c;前缀和 F、思维、逆序对 G、最短路 A - Odd One Out 题意&#xff1a;给定三个数字&#xff0c;有两个相同&#xff0c;输出那个不同的数字。 直接傻瓜写法 void solve() {int a , b , c;cin >> a >>…

机器学习 -- 数据预处理

系列文章目录 未完待续…… 目录 系列文章目录 前言 一、数值分析简介 二、内容 前言 tips&#xff1a;这里只是总结&#xff0c;不是教程哈。 以下内容仅为暂定&#xff0c;因为我还没找到一个好的&#xff0c;让小白&#xff08;我自己&#xff09;也能容易理解&#x…

Java线上问题排查思路

1、Java 服务常见问题 Java 服务的线上问题从系统表象来看大致可分成两大类: 系统环境异常、业务服务异常。 系统环境异常&#xff1a;主要从CPU、内存、磁盘、网络四个方面考虑。比如&#xff1a;CPU 占用率过高、CPU 上下文切换频率次数较高、系统可用内存长期处于较低值、…

工业产线看板的智能化应用

在数字化浪潮兴起之前&#xff0c;许多制造企业主要依赖手工生产和传统的生产管理方法&#xff0c;生产数据的收集和分析主要依赖于人工&#xff0c;导致信息传递滞后、生产过程不透明&#xff0c;难以及时调整生产计划。在传统的生产环境中&#xff0c;生产过程的各个环节缺乏…

留言板(Mybatis连接数据库版)

目录 1.添加Mybatis和SQL的依赖 2.建立数据库和需要的表 3.对应表中的字段&#xff0c;补充Java对象 4.对代码进行逻辑分层 5.后端逻辑代码 之前的项目实例【基于Spring MVC的前后端交互案例及应用分层的实现】https://blog.csdn.net/weixin_67793092/article/details/134…

K8S结合Prometheus构建监控系统

一、Prometheus简介 Prometheus 是一个开源的系统监控和警报工具&#xff0c;用于收集、存储和查询时间序列数据。它专注于监控应用程序和基础设施的性能和状态&#xff0c;并提供丰富的查询语言和灵活的告警机制1、Prometheus基本介绍 数据模型&#xff1a;Prometheus 使用时…

Spring Boot笔记1

1. SpringBoot简介 1.1. 原有Spring优缺点分析 1.1.1. Spring的优点分析 Spring是Java企业版&#xff08;Java Enterprise Edition&#xff0c;javeEE&#xff09;的轻量级代替品。无需开发重量级的Enterprise JavaBean&#xff08;EJB&#xff09;&#xff0c;Spring为企业…

20231227在Firefly的AIO-3399J开发板的Android11的挖掘机的DTS配置单后摄像头ov13850

20231227在Firefly的AIO-3399J开发板的Android11的挖掘机的DTS配置单后摄像头ov13850 2023/12/27 18:40 1、简略步骤&#xff1a; rootrootrootroot-X99-Turbo:~/3TB$ cat Android11.0.tar.bz2.a* > Android11.0.tar.bz2 rootrootrootroot-X99-Turbo:~/3TB$ tar jxvf Androi…

阿里云30个公共云地域、89个可用区、5个金融云和政务云地域

阿里云基础设施目前已面向全球四大洲&#xff0c;公共云地域开服运营30个公共云地域、89个可用区&#xff0c;此外还拥有5个金融云、政务云地域&#xff0c;并且致力于持续的新地域规划和建设&#xff0c;从而更好的满足用户多样化的业务和场景需求。伴随着基础设施的加速投入和…

ARM CCA机密计算软件架构之内存加密上下文(MEC)

内存加密上下文(MEC) 内存加密上下文是与内存区域相关联的加密配置,由MMU分配。 MEC是Arm Realm Management Extension(RME)的扩展。RME系统架构要求对Realm、Secure和Root PAS进行加密。用于每个PAS的加密密钥、调整或加密上下文在该PAS内是全局的。例如,对于Realm PA…

Kubernetes 学习总结(41)—— 云原生容器网络详解

背景 随着网络技术的发展&#xff0c;网络的虚拟化程度越来越高&#xff0c;特别是云原生网络&#xff0c;叠加了物理网络、虚机网络和容器网络&#xff0c;数据包在网络 OSI 七层网络模型、TCP/IP 五层网络模型的不同网络层进行封包、转发和解包。网络数据包跨主机网络、容器…