命运交织的节点:分布式事务最终一致性的心跳共鸣纪实

news2024/11/18 19:49:00

关注微信公众号 “程序员小胖” 每日技术干货,第一时间送达!

引言

在当今云计算和微服务架构大行其道的时代,分布式系统成为了构建高可用、高性能应用的基石。然而,随着系统规模的扩张,数据的一致性问题如同幽灵般萦绕在每位架构师心头,尤其是分布式事务处理中的挑战更是首当其冲。今天,让我们一起深入探索分布式事务模型中的“最终一致性”,揭开它那既神秘又强大的面纱。

分布式事务的挑战与背景

想象一下双十一购物节,数百万用户同时下单,订单系统、库存系统、支付系统等多个服务间需要协同完成交易。采用最终一致性模型,即使瞬间请求激增导致部分操作延迟,系统也能确保在合理的时间框架内调整库存、确认订单状态,从而维持整体业务流程的顺畅。

最终一致性?

在分布式系统中,最终一致性是一种事务模型,它保证系统中的所有数据副本最终会达到一致的状态,但不保证立即的一致性。这种模型允许在数据复制过程中存在短暂的不一致状态,但随着时间的推移,系统会通过各种机制确保数据最终达到一致。

实现策略

补偿事务(TCC)

TCC,即Try-Confirm-Cancel,是一种通过预先定义的确认和取消操作来保证事务最终一致性的模式。

**Try 阶段:**调用 Try 接口,尝试执行业务,完成所有业务检查,预留业务资源。
Confirm 或 Cancel 阶段:两者是互斥的,只能进入其中一个,并且都满足幂等性,允许失败重试。

**Confirm 操作:**对业务系统做确认提交,确认执行业务操作,不做其他业务检查,只使用 Try 阶段预留的业务资
源。

**Cancel 操作:**在业务执行错误,需要回滚的状态下执行业务取消,释放预留资源。

转账场景示例:


//Account类代表一个账户,拥有冻结、解冻、存款和取款的方法。Money类代表金额。
public class AccountService {

    // Try阶段:检查账户余额并冻结资金
    public boolean prepareTransfer(Account source, Account target, Money amount) {
        if (source.getBalance() < amount) {
            return false; // 余额不足
        }
        source.freeze(amount); // 冻结资金
        return true;
    }

    // Confirm阶段:实际转账操作
    public void confirmTransfer(Account source, Account target, Money amount) {
        source.withdraw(amount); // 从源账户扣除金额
        target.deposit(amount); // 向目标账户增加金额
    }

    // Cancel阶段:回滚操作,解冻资金
    public void cancelTransfer(Account source, Account target, Money amount) {
        source.unfreeze(amount); // 解冻资金
    }
}

注意事项

**幂等性:**确保Try、Confirm和Cancel操作都是幂等的,以支持重复执行而不会引起副作用。

**空回滚:**系统应能够处理“空回滚”的情况,即Cancel操作被调用,但Try操作并未实际执行。

**防悬挂:**确保系统能够处理悬挂操作,即Try操作在网络延迟后到达,而Cancel操作已经执行。

本地消息表

本地消息表的方案最初是由 ebay 的工程师提出,核心思想是将分布式事务拆分成本地事务进行处理,通过消息日志的方式来异步执行。本地消息表是一种业务耦合的设计,消息生产方需要额外建一个事务消息表,并记录消息发送状态,消息消费方需要处理这个消息,并完成自己的业务逻辑,另外会有一个异步机制来定期扫描未完成的消息,确保最终一致性。

实战代码示例:

  1. 系统收到下单请求,将订单业务数据存入到订单库中,并且同时存储该订单对应的消息数据,比如购买商品的 ID 和数量,消息数据与订单库为同一库,更新订单和存储消息为一个本地事务,要么都成功,要么都失败。
   @Service
    public class OrderService {

        @Resource
        private OrderMapper orderMapper;
        
        @Resource
        private OrderMessageMapper orderMessageMapper;

        @Autowired
        private MessageProducer messageProducer; // 消息队列的发送器

        @Transactional
        public void placeOrder(Order order, OrderMessage orderMessage) {
            // 将订单业务数据存入到订单库中
            int orderRows = orderMapper.insert(order);

            // 同时存储该订单对应的消息数据
            int messageRows = orderMessageMapper.insert(orderMessage);

            // 确保订单和消息数据都成功插入
            if (orderRows == 1 && messageRows == 1) {
                // 发送库存更新消息到消息队列
                messageProducer.sendMessage(orderMessage);
            } else {
                // 如果任何插入失败,抛出异常以回滚事务
                throw new RuntimeException("Order or message data insertion failed");
            }
        }
    }
  1. 库存服务更新
    @Autowired
    private InventoryDomainService inventoryDomainService;

    @Override
    public boolean consume(MqMessageEntity<OrderMessage> mqMessageEntity) {
        log.info("接收订单支付完成请求,扣件库存:{}", JSON.toJSONString(mqMessageEntity));
        return inventoryDomainService.deductionInventory(mqMessageEntity);
    }
  1. 订单服务更新本地消息表
        @Autowired
        private OrderMessageMapper orderMessageMapper;

        public void sendMessage(OrderMessage orderMessage) {
            // 向消息队列发送库存更新消息
            // 消息发送成功的回调中更新本地消息表状态
            orderMessageMapper.upodateMessageStatus(orderMessage);
        }
  1. 异步任务重试机制
    使用Spring的@Scheduled注解来定时触发异步任务。这个地方用任何调度计划都可以实现 我用的是spring自带的@Scheduled注解实现的。异步技术也可以根据自己的情况选择。

@Component
public class MessageRetryScheduler {

    @Autowired
    private MessageProducer messageProducer;

    @Scheduled(fixedRate = 60000) // 每60秒执行一次
    public void scheduleMessageRetry() {
        messageProducer.scanAndRetryUnsentMessages();
    }
}

    @Autowired
    private OrderMessageMapper orderMessageMapper;
    
    @Async
    public void scanAndRetryUnsentMessages() {

        List<OrderMessageDO> unsentMessages = orderMessageMapper.queryByStatus("PENDING");
        
        for (OrderMessage message : unsentMessages) {
            try {
                sendMessage(message); // 重试发送消息
                orderMessageMapper.updateStatus(message);
            } catch (Exception e) {
                // 可以选择更新状态为错误或其他逻辑
                orderMessageMapper.updateStatus(message);
            }
        }
    }

RocketMQ 事务消息

RocketMQ 事务消息是一种支持分布式事务的消息。它通过引入 prepare、commit 和 rollback 三个阶段,来确保事务消息的一致性。

**prepare 阶段:**消息发送方发送半消息,此时消息的状态为“待提交”。

**commit 阶段:**消息发送方向 RocketMQ 发送 commit 消息请求,RocketMQ 判断此时半消息是否被确认,如果半消息已被确认,则将消息标记为“可消费”并提交事务。如果半消息未被确认,则将消息标记为“不可消费”并终止事务。

**rollback 阶段:**消息发送方向 RocketMQ 发送 rollback 消息请求,RocketMQ 将半消息标记为“不可消费”并回滚
事务。


代码示例:

创建并初始化一个事务消息生产者:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,并指定NameServer地址
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");

        // 指定事务监听器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地事务逻辑,例如数据库操作
                // 假设执行成功,返回Commit状态
                return LocalTransactionState.CommitMessage;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(Message msg) {
                // 检查本地事务状态,确认是否需要提交或回滚
                // 这里可以根据业务逻辑来实现检查
                // 假设检查通过,返回Unknown状态,让消息服务决定是提交还是回滚
                return LocalTransactionState.Unknown;
            }
        });

        // 启动生产者
        producer.start();

        // 创建消息
        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());

        // 发送事务消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);

        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }
}

在代码示例中我们实现了TransactionListener接口的两个方法:

**executeLocalTransaction:**执行本地事务逻辑,返回事务状态。如果本地事务执行成功,返回CommitMessage;如果执行失败,返回RollbackMessage。

**checkLocalTransaction:**检查本地事务状态。如果事务状态未知,返回Unknown,让消息队列服务决定是提交还是回滚消息。

最大努力通知

最大努力通知型( Best-effort delivery)是最简单的一种柔性事务,适用于一些最终一致性时间敏感度低
的业务,且被动方处理结果 不影响主动方的处理结果。典型的使用场景:如银行通知、商户通知等。
最大努力通知型的实现方案,一般符合以下特点:

  1. 不可靠消息:业务活动主动方,在完成业务处理之后,向业务活动的被动方发送消息,直到通知N次后不再通知,允许消息丢失(不可靠消息)。
  2. 定期校对:业务活动的被动方,根据定时策略,向业务活动主动方查询(主动方提供查询接口),恢复丢失的业务消息

代码示例:

发送通知

@Service
public class NotificationService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendNotification(String message) {
        // 发送通知消息到MQ
        rabbitTemplate.convertAndSend("notificationExchange", "notificationRoutingKey", message);
    }
}

监听消息并处理

@Component
public class NotificationListener {

    @RabbitListener(queues = "notificationQueue")
    public void handleNotification(String message) {
        // 处理消息,例如更新库存
        processNotification(message);

        // 确认消息已处理
        acknowledgeMessage();
    }

    private void processNotification(String message) {
        // 业务逻辑处理
    }

    private void acknowledgeMessage() {
        // 向发送方确认消息已处理的逻辑
    }
}

重试机制通常由消息中间件提供,如RabbitMQ的死信队列和重试策略。校对机制可能需要额外的接口和逻辑来实现。

结语

最终一致性作为分布式系统中一种重要的事务处理哲学,它在实践中展现出了强大的生命力。然而,没有银弹存在,每种模型都有其适用场景与局限。作为技术探索者,我们应当持续思考如何更精细地控制一致性级别,结合业务特性量体裁衣,设计出既能满足业务需求又能保持系统弹性的解决方案。那么,您在实际项目中遇到过哪些分布式事务的挑战?对于最终一致性模型又有何独到见解或疑问呢?欢迎留言讨论,共同推进技术的边界。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1657709.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux字符设备驱动(一) - 框架

字符设备是Linux三大设备之一(另外两种是块设备&#xff0c;网络设备)&#xff0c;字符设备就是字节流形式通讯的I/O设备,绝大部分设备都是字符设备&#xff0c;常见的字符设备包括鼠标、键盘、显示器、串口等等&#xff0c;当我们执行ls -l /dev的时候&#xff0c;就能看到大量…

C++容器之vector类

目录 1.vector的介绍及使用1.1vector的介绍1.2vector的使用1.2.1 vector的定义1.2.2 vector iterator 的使用1.2.3 vector 空间增长问题1.2.4 vector 增删查改1.2.5vector 迭代器失效问题1.2.6 vector 在OJ中的使用。 2.vector深度剖析及模拟实现2.1 std::vector的核心框架接口…

Kotlin基础知识总结(三万字超详细)

1、条件语句 &#xff08;1&#xff09;if条件 if条件表达式&#xff0c;每一个分支最后一条语句就是该分支的返回值。适用于每个分支返回值类型一致这种情况。 fun getDegree(score: Int): String{val result: String if(score 100){"非常优秀"}else if(score …

【2024全国青少年信息素养大赛初赛时间以及模拟题】

2024全国青少年信息素养大赛时间已经出来了 目录 全国青少年信息素养大赛智能算法挑战赛初中模拟卷 全国青少年信息素养大赛智能算法挑战赛初中模拟卷 1、比赛时间和考试内容&#xff1a; 算法创意实践挑战赛初中组于5月19日举行&#xff0c;检录时间为10:30-11:00&#xf…

OS复习笔记ch5-3

引言 上一节我们学习了关于信号量机制的一些内容&#xff0c;包括信号量的含义&#xff0c;对应的PV操作等。 如图所示&#xff0c;上一节主要是针对信号量的互斥&#xff0c;其实信号量机制还可以做很多事情&#xff0c;比如实现进程同步和前驱关系&#xff0c;这一节我们先复…

leetcode每日一题第七十二天

class Solution { public:TreeNode* searchBST(TreeNode* root, int val) {if(!root) return root;if(root->val val) return root;else if(root->val > val) return searchBST(root->left,val);else return searchBST(root->right,val);} };

新能源汽车动力电池热管理方案直冷方案原理简介

前言 随着新能源汽车的快速发展&#xff0c;动力电池作为其核心部件之一&#xff0c;对于其性能和寿命具有重要影响。动力电池在工作过程中会产生大量的热量&#xff0c;如果不能有效地进行热管理&#xff0c;将会导致电池温度升高、性能下降甚至损坏。因此&#xff0c;热管理…

C语言【文件操作 2】

文章目录 前言顺序读写函数的介绍fputc && fgetcfputcfgetc fputs && fgetsfputsfgets fprintf && fscanffprintffscanf fwrite && freadfwritefread 文件的随机读写fseek函数偏移量ftell函数rewind函数 文件的结束判断被错误使用的feof 结语 …

哈希题目总结

以下列举了可以用哈希方法&#xff08;包括但不限于用HashMap和HashSet&#xff09;的题目&#xff0c;实质上是把东西丢给这些数据结构去维护。请注意有些题目中用哈希是最优解&#xff0c;有些题目中不是最优解&#xff0c;可以自行探索其时间复杂度和空间复杂度的区别&#…

【Java】还不会数组?一文万字全搞定

前言&#xff1a;前面两章我们详细讲解了Java基本程序设计结构中的基本知识&#xff0c;&#xff0c;包括&#xff1a;一个简单的Java应用&#xff0c;注释&#xff0c;数据类型&#xff0c;变量与常量&#xff0c;运算符&#xff0c;字符串&#xff0c;输入输出&#xff0c;控…

探索精酿啤酒:从经典到创新

Fendi club啤酒一直以来都以其卓着的品质和与众不同的口感深受消费者喜爱。而随着时代的变迁和消费者口味的不断变化&#xff0c;Fendi club啤酒也在不断地探索和创新&#xff0c;以满足市场的多样化需求。 在经典的口感和风味基础上&#xff0c;Fendi club啤酒不断地尝试新的原…

多线程学习D10 收尾了应该

线程安全集合类概述 重点介绍java.util.concurrent.* 下的线程安全集合类&#xff0c;可以发现它们有规律&#xff0c;里面包含三类关键词&#xff1a;Blocking、CopyOnWrite、Concurrent Blocking 大部分实现基于锁&#xff0c;并提供用来阻塞的方法 CopyOnWrite 之类容器修改…

探讨关于AutoPSA里CII算法的结构荷载

UKP3D,AutoPDMS导出应力计算文件至管道应力分析软件分析&#xff0c;如下图AutoPSA.用户咨询如图 1.如果计算时考虑水重&#xff0c;把工况中的w改为ww&#xff1b; 2.CAD表格中结构荷载不是单纯的1.5倍&#xff0c;是参照仿GLIF的算法&#xff0c;计算了水重的&#xff08;根…

如何进行资产梳理

前言 为什么要进行资产梳理&#xff1f; 资产梳理方式一: 一、安全防护设备资产 二、对外开放服务项目资产 三、项目外包业务流程资产 资产梳理方式二: 一、业务资源梳理 二、设备资产梳理 三、第三方的服务信息梳理 风险梳理 风险有哪些&#xff1f; 一,账号权限风…

在windows下使用VS Code、CMake、Make进行代码编译

软件环境 Windows11VS CodeNoneCMake3.26.4-windows-x86_64MinGWNone 电脑系统配置 安装MinGW将MinGW安装文件夹中bin文件夹下的mingw32-make.exe复制并重命名为make.exe在文件夹中添加系统路径&#xff0c;具体位置为 系统->系统信息->高级系统设置->高级->环境…

Core_Air724UG学习

产品描述 Core_Air724UG核心板是基于Air724UG cat1模板制作的开发实验板。 该模块支持Lua二次开发或AT指令&#xff0c;方便开发者根据自己的需求灵活选择。 Core_Air724UG核心板专注于小型化&#xff0c;PCB尺寸4246mm&#xff0c;有12x22哥标准2.54mm排针管脚&#xff0c;其…

Android MediaCodec 简明教程(七):使用 MediaCodec 解码到 OES 纹理上

系列文章目录 Android MediaCodec 简明教程&#xff08;一&#xff09;&#xff1a;使用 MediaCodecList 查询 Codec 信息&#xff0c;并创建 MediaCodec 编解码器Android MediaCodec 简明教程&#xff08;二&#xff09;&#xff1a;使用 MediaCodecInfo.CodecCapabilities 查…

安装oh-my-zsh(命令行工具)

文章目录 一、安装zsh、git、wget二、安装运行脚本1、curl/wget下载2、手动下载 三、切换主题1、编辑配置文件2、切换主题 四、安装插件1、zsh-syntax-highlighting&#xff08;高亮语法错误&#xff09;2、zsh-autosuggestions&#xff08;自动补全&#xff09; 五、更多优化配…

FFmpeg常用命令详解与实战指南

下载地址&#xff1a;Releases BtbN/FFmpeg-Builds (github.com) 1. 获取视频信息 使用FFmpeg获取视频信息是最基本的操作之一。你可以使用-i选项指定输入文件&#xff0c;然后使用FFmpeg内置的分析器来获取视频的各种信息&#xff0c;包括视频编解码器、音频编解码器、分辨…

【bug记录】清除僵尸进程,释放GPU显存

目录 1. 为什么会出现这种情况&#xff1f;2. 解决方案方法一&#xff1a;使用 fuser 命令方法二&#xff1a; 3. 小贴士 在进行深度学习或其他需要GPU支持的任务时&#xff0c;我们有时会发现虽然没有可见的进程在执行&#xff0c;但GPU资源却意外地被占用。这种情况往往会阻碍…