分布式事务几种实现方案

news2025/1/21 4:59:36

前言

分布式事务?

分布式事务是指涉及多个参与者的事务,这些参与者可能分布在不同的计算机、进程或者网络中。分布式事务需要保证ACID属性,即原子性、一致性、隔离性和持久性

解释

现在我们接触的系统基本上都是分布式系统,并且每个系统都会有自己的业务领域,当涉及到系统交互的时候,必然会涉及相关数据库数据的变动,一般的来说,每个业务系统都会有一个自己单独的数据源(不管是物理的,还是逻辑的),在目前来说,在spring体系下数据库的事务不支持跨应用,这样子就会导致数据一致性问题,即分布式事务问题。

实现方案

事务补偿

建表

CREATE TABLE `tx_compensation_task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `out_key` varchar(32) DEFAULT NULL COMMENT '外部主键key',
  `biz_type` int(10) NOT NULL COMMENT '业务类型',
  `times` int(10) NOT NULL DEFAULT '0' COMMENT '已经执行次数\n',
  `created` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `last_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态0-待执行,1-执行中,3-成功,4-失败',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

执行流程

image-20230417173447094

解释

  1. 进行本地业务,添加一条补偿任务,这个任务是实际需要调用的二方接口,然后再继续执行自己的业务
  2. 通过外部的任务驱动拉起相关的任务,获取到相应的out_key,然后根据相应out_key执行对应的逻辑

事务型消息

在rocketmq中提供了分布式事务的解决方案–事务消息

整体执行流程

image-20230417175021111

解释

  1. 生产者将半事务消息发送至 RocketMQ Broker

  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  6. :::note 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置 :::

事务消息回查步骤如下: 7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

源码分析

发送半消息
  public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter localTransactionExecuter, Object arg) throws MQClientException {
        TransactionListener transactionListener = this.getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", (Throwable)null);
        } else {
            Validators.checkMessage(msg, this.defaultMQProducer);
            SendResult sendResult = null;
            MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
            MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());

            try {
                //发送一次半消息
                sendResult = this.send(msg);
            } catch (Exception var11) {
                throw new MQClientException("send message Exception", var11);
            }
发送消息流程

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

 public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
          //这里发送消息
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
Broker接受消息
 public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                //执行发送消息,其实这里就是往store里面加数据
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

执行本地事务1-1
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            switch (sendResult.getSendStatus()) {
                case SEND_OK:
                    try {
                        if (sendResult.getTransactionId() != null) {
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }

                        String transactionId = msg.getProperty("UNIQ_KEY");
                        if (null != transactionId && !"".equals(transactionId)) {
                            msg.setTransactionId(transactionId);
                        }

                        if (null != localTransactionExecuter) {
                            localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg,            												 arg);
                        } else if (transactionListener != null) {
                            this.log.debug("Used new transaction API");
                            //执行本地事务事务,并返回一个状态
                            localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                        }
                      	//如果是这种情况的话,其实是不知道本地事务执行的结果,成功或者失败
                        //这里本地事务状态如果没有的话,就给一个UNKNOW,让事务回查线程去做check
                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }

                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            this.log.info(msg.toString());
                        }
                    } catch (Throwable var10) {
                        this.log.info("executeLocalTransactionBranch exception", var10);
                        this.log.info(msg.toString());
                        localException = var10;
                    }
                    break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            }

执行本地事务1-2
public class MyTransactionListener implements RocketMQLocalTransactionListener {

    //本地事务方法
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        //执行本地事务
        try {
            System.out.println("执行本地事务");
            System.out.println(message.getPayload());
        } catch (Exception e) {
            //如果发生了异常,返回ROLLBACK状态
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        //没有发生错误的话,返回COMMIT状态
        return RocketMQLocalTransactionState.COMMIT;
    }
发送真实消息
 public void endTransactionOneway(
        final String addr,
        final EndTransactionRequestHeader requestHeader,
        final String remark,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

        request.setRemark(remark);
        //发送最终的消息,前提是 -- 半事务消息的状态是明确的
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }
事务状态回查线程

在broker启动的时候启动

  1. org.apache.rocketmq.broker.BrokerStartup#main
  2. org.apache.rocketmq.broker.BrokerStartup#start
  3. org.apache.rocketmq.broker.BrokerStartup#createBrokerController
  4. org.apache.rocketmq.broker.BrokerController#initialize
  5. org.apache.rocketmq.broker.BrokerController#initialTransaction
  6. org.apache.rocketmq.common.ServiceThread#waitForRunning
  7. org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
  8. org.apache.rocketmq.broker.transaction.TransactionalMessageService#check

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/768317.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算几何(二维),定理及证明(持续更新中..)

注:定理来自这篇博客,本文注重证明 向量基本运算 加法 向量 a ⃗ ( x 1 , y 1 ) , b ⃗ ( x 2 , y 2 ) \vec{a}\left(x_1,y_1\right),\vec{b}\left(x_2,y_2\right) a (x1​,y1​),b (x2​,y2​) 则 a ⃗ b ⃗ ( x 1 x 2 , y 1 y 2 ) \vec{a}\…

P7 第二章 电阻电路的等效变换

1、等效变换应用举例 化简套路: 电压源与其串联的电阻,可以等效为电流源并联电阻,然后电流源就可以拿去合并电路中的,与之并联的电流,电阻则可以拿去合并与之并联的电阻。 公式法: 就是根据端的电压与端的…

第一节 C++ 变量

文章目录 1. Visual Studio Community 安装1.1. Visual Studio 介绍1.2. Visual Studio的安装1.3 Visual Studio创建与使用1.3.1 创建一个工程项目1.3.2 新建一个C文件1.3.3 编写执行文件 2. Dev-C 安装(初学者建议使用)2.1 Dev-C 介绍2.2 Dev-C 安装2.3 Dev-C 快捷键使用 3. 认…

数学建模常用模型(九) :偏最小二乘回归分析

数学建模常用模型(九) :偏最小二乘回归分析 偏最小二乘回归(Partial Least Squares Regression,PLS Regression)是一种常用的统计建模方法,用于解决多元线性回归中自变量间高度相关的问题。在偏…

【java】三大容器类(List、Set、Map)的常用实现类的特点

三大容器类(List、Set、Map)的常用实现类的特点 简介 本文总结三大容器类(List、Set、Map)的常用实现类(ArrayList、Vector、LinkedList、HashSet、HashMap、HashTable)的特点 一、List部分 1、ArrayLi…

C# DlibDotNet 人脸识别、人脸68特征点识别、人脸5特征点识别、人脸对齐,三角剖分,人脸特征比对

人脸识别 人脸68特征点识别 人脸5特征点识别 人脸对齐 三角剖分 人脸特征比对 项目 VS2022.net4.8OpenCvSharp4DlibDotNet Demo下载 代码 using DlibDotNet.Extensions; using DlibDotNet; using System; using System.Collections.Generic; using System.ComponentModel; …

学堂在线数据结构(上)(2023春)邓俊辉 课后作业错题整理

The reverse number of a sequence is defined as the total number of reversed pairs in the sequence, and the total number of element comparisons performed by the insertion sort in the list of size n is: 一个序列的逆序数定义为该序列中的逆序对总数,…

transformer Position Embedding

这是最近一段很棒的 Youtube 视频,它深入介绍了位置嵌入,并带有精美的动画: Transformer 神经网络视觉指南 -(第 1 部分)位置嵌入 让我们尝试理解计算位置嵌入的公式的“sin”部分: 这里“pos”指的是“单词…

自定义view(一)----自定义TextView

自定义view也算是Android的一大难点,里面涉及到很多值得学习的地方,我会在接下来写一系列文章去介绍它,本篇文章以自定义一个TextView为例。 View的构造方法 自定义view之前我们先了解view的四个构造方法,自定义view无非就是新建一…

R语言逻辑回归(Logistic Regression)、回归决策树、随机森林信用卡违约分析信贷数据集...

原文链接:http://tecdat.cn/?p23344 本文中我们介绍了决策树和随机森林的概念,并在R语言中用逻辑回归、回归决策树、随机森林进行信用卡违约数据分析(查看文末了解数据获取方式)(点击文末“阅读原文”获取完整代码数据…

MVSNet、PatchMatchNet中的 eval.sh文件超参数解释

下面以PatchMatchNet为例, 打开PatchMatchNet程序中的 eavl.sh文件, 可以看到文件设置了数据集路径,及超参数设置(超参数,也可以不写,会使用默认参数) 上图中各参数意思如下: 执行文件python eval.py; 数据集加载方方式使用dtu_yao_eval ; batch_size=1 ,视图数N设…

【C++】命名空间 ( namespace )

目录搁这 什么是命名空间命名空间的作用如何定义命名空间命名空间的种类如何使用命名空间内的成员作用域限定符命名空间展开命名空间全部展开命名空间部分展开 总结 什么是命名空间 命名空间是一种用来避免命名冲突的机制,它可以将一段代码的名称隔离开&#xff0c…

中国地图使用心得

中国地图使用心得 注册地图是注册在echarts对象上而非 自己构建的echarts dom上、。 请求本地json文件 ​ vue项目的public打包时不会动,所以线上和本地地址直接指向了public同级目录,请求时直接相对路径 绘制中国地图时,如何在各个省会地方…

「深度学习之优化算法」(十三)蝙蝠算法

1. 蝙蝠算法简介 (以下描述,均不是学术用语,仅供大家快乐的阅读)   蝙蝠算法(Bat Algorithm)是受蝙蝠回声定位的特性启发而提出的新兴算法,提出时间是2010年,虽然距今(2020)有近10年,但与其它的经典算法相比仍算一个新算法。算法也已有一定规模的研究和应用,但仍…

【LLM系列之LLaMA2】LLaMA 2技术细节详细介绍!

Llama 2 发布! Meta 刚刚发布了 LLaMa 2,它是 LLaMA 的下一代版本,具有商业友好的许可证。🤯😍 LLaMA 2 有 3 种不同的尺寸:7B、13B 和 70B。 7B & 13B 使用与 LLaMA 1 相同的架构,并且是商…

年CTF—初五

0x00 前言 CTF 加解密合集:CTF 加解密合集 0x01 题目 神秘人送来了半个世纪前的无线电信号,但是只能分别出以下的密文: YDHML_QKA_PDK_HVD_NAHI_OQ_K_GR 据说上面的无线电信号代表的是中文,由红岸基地发往半人马星系 半个世纪过…

数据容器入门(set)

集合的定义: 语法:变量名称 {元素,元素,元素.........元素} 定义空集合: 变量名称 set() set {“abc”,123,“def”} 集合的特点: 可以容纳多个数据可以容…

数据结构01-线性结构-链表栈队列-栈篇

文章目录 参考:总结大纲要求线性结构-栈回文匹配小猫钓鱼的故事 参考: 线性结构-栈 总结 本系列为C数据结构系列,会介绍 线性结构,简单树,特殊树,简单图等。本文为线性结构部分。 大纲要求 线性结构 【…

回归预测 | MATLAB实现GRU(门控循环单元)多输入单输出(不调用工具箱函数)

回归预测 | MATLAB实现GRU(门控循环单元)多输入单输出(不调用工具箱函数) 文章目录 回归预测 | MATLAB实现GRU(门控循环单元)多输入单输出(不调用工具箱函数)预测效果基本介绍程序设计参考资料 预测效果 基本介绍 GRU神经网络是LSTM神经网络的一种变体,LSTM 神经网 …