分布式事务 | 使用DTM 的Saga 模式

news2025/3/13 7:13:06

DTM 简介


前面章节提及的MassTransit、dotnetcore/CAP都提供了分布式事务的处理能力,但也仅局限于Saga和本地消息表模式的实现。那有没有一个独立的分布式事务解决方案,涵盖多种分布式事务处理模式,如Saga、TCC、XA模式等。有,目前业界主要有两种开源方案,其一是阿里开源的Seata,另一个就是DTM。其中Seata仅支持Java、Go和Python语言,因此不在.NET 的选择范围。DTM则通过提供简单易用的HTTP和gRPC接口,屏蔽了语言的无关性,因此支持任何开发语言接入,目前提供了Go、Python、NodeJs、Ruby、Java和C#等语言的SDK。

DTM,全称Distributed Transaction Manager,是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。它提供了Saga、TCC、 XA和二阶段消息模式以满足不同应用场景的需求,同时其首创的子事务屏障技术可以有效解决幂等、悬挂和空补偿等异常问题。

DTM 事务处理过程及架构


那DTM是如何处理分布式事务的呢?以一个经典的跨行转账业务为例来看下事务处理过程。对于跨行转账业务而言,很显然是跨库跨服务的应用场景,不能简单通过本地事务解决,可以使用Saga模式,以下是基于DTM提供的Saga事务模式成功转账的的时序图:

从以上时序图可以看出,DTM整个全局事务分为如下几步:

  1. 用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回

  1. DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回

  1. DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回

  1. DTM已完成所有的事务分支,将全局事务的状态修改为已完成

基于以上这个时序图的基础上,再来看下DTM的架构:

整个DTM架构中,一共有三个角色,分别承担了不同的职责:

  • RM-资源管理器:RM是一个应用服务,通常连接到独立的数据库,负责处理全局事务中的本地事务,执行相关数据的修改、提交、回滚、补偿等操作。例如在前面的这个Saga事务时序图中,步骤2、3中被调用的TransIn和TransOut方法所在的服务都是RM。

  • AP-应用程序:AP是一个应用服务,负责全局事务的编排,他会注册全局事务,注册子事务,调用RM接口。例如在前面的这个SAGA事务中,发起步骤1的是AP,它编排了一个包含TransOut、TransIn的全局事务,然后提交给TM

  • TM-事务管理器:TM就是DTM服务,负责全局事务的管理,作为一个独立的服务而存在。每个全局事务都注册到TM,每个事务分支也注册到TM。TM会协调所有的RM来执行不同的事务分支,并根据执行结果决定是否提交或回滚事务。例如在前面的Saga事务时序图中,TM在步骤2、3中调用了各个RM,在步骤4中,完成这个全局事务。

总体而言,AP-应用程序充当全局事务编排器的角色通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。TM-事务管理器接收到注册的全局事务和子事务后,负责调用RM-资源管理器来执行对应的事务分支,TM-事务管理器根据事务分支的执行结果决定是否提及或回滚事务。

快速上手


百闻不如一见,接下来就来实际上手体验下如何基于DTM来实际应用Saga进行分布式跨行转账事务的处理。

创建示例项目


接下来就来创建一个示例项目:

  1. 使用dotnet new webapi -n DtmDemo.Webapi创建示例项目。

  1. 添加Nuget包:DtmcliPomelo.EntityFrameworkCore.MySql

  1. 添加DTM配置项:

{
  "dtm": {
    "DtmUrl": "http://localhost:36789",
    "DtmTimeout": 10000,
    "BranchTimeout": 10000,
    "DBType": "mysql",
    "BarrierTableName": "dtm_barrier.barrier",
  }
}
  1. 定义银行账户BankAccount实体类:

namespaceDtmDemo.WebApi.Models
{
    publicclassBankAccount
    {
        publicint Id { get; set; }
        publicdecimal Balance { get; set; }
    }
}
  1. 定义DtmDemoWebApiContext数据库上下文:

using Microsoft.EntityFrameworkCore;

namespaceDtmDemo.WebApi.Data
{
    publicclassDtmDemoWebApiContext : DbContext
    {
        publicDtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options)
            : base(options)
        {
        }

        public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!;
    }
}
  1. 注册DbContext 和DTM服务:

using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using Dtmcli;

var builder = WebApplication.CreateBuilder(args);
var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext");
// 注册DbContext
builder.Services.AddDbContext<DtmDemoWebApiContext>(options =>
{
    options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));
});

// 注册DTM
builder.Services.AddDtmcli(builder.Configuration, "dtm");
  1. 执行dotnet ef migrations add 'Initial' 创建迁移。

  1. 为便于初始化演示数据,定义BankAccountController如下,其中PostBankAccount接口添加了await _context.Database.MigrateAsync();用于自动应用迁移。

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;

namespaceDtmDemo.WebApi.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    publicclassBankAccountsController : ControllerBase
    {
        privatereadonly DtmDemoWebApiContext _context;

        publicBankAccountsController(DtmDemoWebApiContext context)
        {
            _context = context;
        }
		[HttpGet]
        publicasync Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount()
        {
            returnawait _context.BankAccount.ToListAsync();
        }

        [HttpPost]
        publicasync Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount)
        {
            await _context.Database.MigrateAsync();
            _context.BankAccount.Add(bankAccount);
            await _context.SaveChangesAsync();

            return Ok(bankAccount);
        }
}

应用Saga模式


接下来定义SagaDemoController来使用DTM的Saga模式来模拟跨行转账分布式事务:

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
using DtmCommon;

namespaceDtmDemo.WebApi.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    publicclassSagaDemoController : ControllerBase
    {
        privatereadonly DtmDemoWebApiContext _context;
        privatereadonly IConfiguration _configuration;
        privatereadonly IDtmClient _dtmClient;
        privatereadonly IDtmTransFactory _transFactory;

        privatereadonly IBranchBarrierFactory _barrierFactory;
        privatereadonly ILogger<BankAccountsController> _logger;

        publicSagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory)
        {
            this._context = context;
            this._configuration = configuration;
            this._dtmClient = dtmClient;
            this._transFactory = transFactory;
            this._logger = logger;
            this._barrierFactory = barrierFactory;
        }
}

对于跨行转账业务,使用DTM的Saga模式,首先要进行事务拆分,可以拆分为以下4个子事务,并分别实现:

转出子事务(TransferOut)


    [HttpPost("TransferOut")]
    publicasync Task<IActionResult> TransferOut([FromBody] TransferRequest request)
    {
        var msg = $"用户{request.UserId}转出{request.Amount}元";
        _logger.LogInformation($"转出子事务-启动:{msg}");
        // 1. 创建子事务屏障var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        try
        {
            using (var conn = _context.Database.GetDbConnection())
            {
                // 2. 在子事务屏障内执行事务操作await branchBarrier.Call(conn, async (tx) =>
                {
                    _logger.LogInformation($"转出子事务-执行:{msg}");
                    await _context.Database.UseTransactionAsync(tx);
                    var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                    if (bankAccount == null || bankAccount.Balance < request.Amount)
                        thrownew InvalidDataException("账户不存在或余额不足!");
                    bankAccount.Balance -= request.Amount;
                    await _context.SaveChangesAsync();
                });
            }
        }
        catch (InvalidDataException ex)
        {
            _logger.LogInformation($"转出子事务-失败:{ex.Message}");
            // 3. 按照接口协议,返回409,以表示子事务失败returnnew StatusCodeResult(StatusCodes.Status409Conflict);
        }
        _logger.LogInformation($"转出子事务-成功:{msg}");
        return Ok();
    }

以上代码中有几点需要额外注意:

  1. 使用Saga模式,必须开启子事务屏障:_barrierFactory.CreateBranchBarrier(Request.Query),其中Request.Query中的参数由DTM 生成,类似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga,主要包含四个参数:

  1. gid:全局事务Id

  1. trans_type:事务类型,是saga、msg、xa或者是tcc。

  1. branch_id:子事务的Id

  1. op:当前操作,对于Saga事务模式,要么为action(正向操作),要么为compensate(补偿操作)。

  1. 必须在子事务屏障内执行事务操作:branchBarrier.Call(conn, async (tx) =>{}

  1. 对于Saga正向操作而言,业务上的失败与异常是需要做严格区分的,例如前面的余额不足,是业务上的失败,必须回滚。而对于网络抖动等其他外界原因导致的事务失败,属于业务异常,则需要重试。因此若因业务失败(这里是账户不存在或余额不足)而导致子事务失败,则必须通过抛异常的方式并返回**409**状态码以告知DTM 子事务失败。

  1. 以上通过抛出异常的方式中断子事务执行并在外围捕获特定异常返回409状态码。在外围捕获异常时切忌放大异常捕获,比如直接catch(Exception),如此会捕获由于网络等其他原因导致的异常,而导致DTM 不再自动处理该异常,比如业务异常时的自动重试。

转出补偿子事务(TransferOut_Compensate)


转出补偿,就是回滚转出操作,进行账户余额归还,实现如下:

    [HttpPost("TransferOut_Compensate")]
    publicasync Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
    {
        var msg = $"用户{request.UserId}回滚转出{request.Amount}元";
        _logger.LogInformation($"转出补偿子事务-启动:{msg}");
        // 1. 创建子事务屏障var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        using (var conn = _context.Database.GetDbConnection())
        {
            // 在子事务屏障内执行事务操作await branchBarrier.Call(conn, async (tx) =>
            {
                _logger.LogInformation($"转出补偿子事务-执行:{msg}");
                await _context.Database.UseTransactionAsync(tx);
                var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                if (bankAccount == null)
                    return; //对于补偿操作,可直接返回,中断后续操作
                bankAccount.Balance += request.Amount;
                await _context.SaveChangesAsync();
            });
        }
        _logger.LogInformation($"转出补偿子事务-成功!");
        // 2. 因补偿操作必须成功,所以必须返回200。return Ok();
    }

由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。

另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回**200**因此当出现bankAccount==null时可以直接 return。

转入子事务(TransferIn)


转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并通过抛异常的方式并最终返回409状态码来显式告知DTM 子事务执行失败。

    [HttpPost("TransferIn")]
    publicasync Task<IActionResult> TransferIn([FromBody] TransferRequest request)
    {
        var msg = $"用户{request.UserId}转入{request.Amount}元";
        _logger.LogInformation($"转入子事务-启动:{msg}");
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        try
        {
            using (var conn = _context.Database.GetDbConnection())
            {
                await branchBarrier.Call(conn, async (tx) =>
                {
                    _logger.LogInformation($"转入子事务-执行:{msg}");
                    await _context.Database.UseTransactionAsync(tx);
                    var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                    if (bankAccount == null)
                        thrownew InvalidDataException("账户不存在!");
                    bankAccount.Balance += request.Amount;
                    await _context.SaveChangesAsync();
                });
            }
        }
        catch (InvalidDataException ex)
        {
            _logger.LogInformation($"转入子事务-失败:{ex.Message}");
            returnnew StatusCodeResult(StatusCodes.Status409Conflict);
        }
        _logger.LogInformation($"转入子事务-成功:{msg}");
        return Ok();
    }

转入补偿子事务(TransferIn_Compensate)


转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并最终返回200状态码来告知DTM 补偿子事务执行成功。

    [HttpPost("TransferIn_Compensate")]
    publicasync Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
    {
        var msg = "用户{request.UserId}回滚转入{request.Amount}元";
        _logger.LogInformation($"转入补偿子事务-启动:{msg}");
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        using (var conn = _context.Database.GetDbConnection())
        {
            await branchBarrier.Call(conn, async (tx) =>
            {
                _logger.LogInformation($"转入补偿子事务-执行:{msg}");
                await _context.Database.UseTransactionAsync(tx);
                var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                if (bankAccount == null) return;
                bankAccount.Balance -= request.Amount;
                await _context.SaveChangesAsync();
            });
        }
        _logger.LogInformation($"转入补偿子事务-成功!");
        return Ok();
    }

编排Saga事务


拆分完子事务,最后就可以进行Saga事务编排了,其代码如下所示:

    [HttpPost("Transfer")]
    publicasync Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
        CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}");
            //1. 生成全局事务IDvar gid = await _dtmClient.GenGid(cancellationToken);
            var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
            //2. 创建Sagavar saga = _transFactory.NewSaga(gid);
            //3. 添加子事务
        	saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
                    new TransferRequest(fromUserId, amount))
                .Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
                    new TransferRequest(toUserId, amount))
                .EnableWaitResult(); // 4. 按需启用是否等待事务执行结果//5. 提交Saga事务await saga.Submit(cancellationToken);
        }
        catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。
        {
            _logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!");
            returnnew BadRequestObjectResult($"转账失败:{ex.Message}");
        }

        _logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
        return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
    }

主要步骤如下:

  1. 生成全局事务Id:var gid =await _dtmClient.GenGid(cancellationToken);

  1. 创建Saga全局事务:_transFactory.NewSaga(gid);

  1. 添加子事务:saga.Add(string action, string compensate, object postData);包含正向和反向子事务。

  1. 如果依赖事务执行结果,可通过EnableWaitResult()开启事务结果等待。

  1. 提交Saga全局事务:saga.Submit(cancellationToken);

  1. 若开启了事务结果等待,可以通过try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。

运行项目


既然DTM作为一个独立的服务存在,其负责通过HTTP或gRPC协议发起子事务的调用,因此首先需要启动一个DTM实例,又由于本项目依赖MySQL,因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过右键项目->Add->Docker Support->Linux 即可添加Dockerfile如下所示:

FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443

FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
COPY . .
WORKDIR "/src/DtmDemo.WebApi"
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]

在Visual Studio中通过右键项目->Add Container Orchestrator Support->Docker Compose即可添加docker-compose.yml,由于整个项目依赖mysql和DTM,修改docker-compose.yml如下所示,其中定义了三个服务:db,dtm和dtmdemo.webapi。

version:'3.4'services:db:image:'mysql:5.7'container_name:dtm-mysqlenvironment:MYSQL_ROOT_PASSWORD:123456# 指定MySQL初始密码volumes:-./docker/mysql/scripts:/docker-entrypoint-initdb.d# 挂载用于初始化数据库的脚本ports:-'3306:3306'dtm:depends_on: ["db"]
    image:'yedf/dtm:latest'container_name:dtm-svcenvironment:IS_DOCKER:'1'STORE_DRIVER:mysql# 指定使用MySQL持久化DTM事务数据STORE_HOST:db# 指定MySQL服务名,这里是dbSTORE_USER:rootSTORE_PASSWORD:'123456'STORE_PORT:3306STORE_DB:"dtm"# 指定DTM 数据库名ports:-'36789:36789'# DTM HTTP 端口-'36790:36790'# DTM gRPC 端口dtmdemo.webapi:depends_on: ["dtm", "db"]
    image:${DOCKER_REGISTRY-}dtmdemowebapienvironment:ASPNETCORE_ENVIRONMENT:docker# 设定启动环境为dockercontainer_name:dtm-webapi-demobuild:context:.dockerfile:DtmDemo.WebApi/Dockerfileports:-'31293:80'# 映射Demo:80端口到本地31293端口-'31294:443'	 # 映射Demo:443端口到本地31294端口

其中dtmdemo.webapi服务通过ASPNETCORE_ENVIRONMENT: docker指定启动环境为docker,因此需要在项目下添加appsettings.docker.json以配置应用参数:

{
  "ConnectionStrings": {
    "DtmDemoWebApiContext":"Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"
  },
  "TransferBaseURL":"http://dtmdemo.webapi/api/SagaDemo",
  "dtm": {
    "DtmUrl":"http://dtm:36789",
    "DtmTimeout":10000,
    "BranchTimeout":10000,
    "DBType":"mysql",
    "BarrierTableName":"dtm_barrier.barrier"
  }
}

另外db服务中通过volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]来挂载初始化脚本,以创建DTM依赖的MySQL 存储数据库dtm和示例项目使用子事务屏障需要的barrier数据表。脚本如下:

CREATE DATABASE IF NOTEXISTS dtm
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
droptable IF EXISTS dtm.trans_global;
CREATETABLE if notEXISTS dtm.trans_global (
  `id` bigint(22) NOTNULL AUTO_INCREMENT,
  `gid` varchar(128) NOTNULL COMMENT 'global transaction id',
  `trans_type` varchar(45) notnull COMMENT 'transaction type: saga | xa | tcc | msg',
  `status` varchar(12) NOTNULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
  `query_prepared` varchar(1024) NOTNULL COMMENT 'url to check for msg|workflow',
  `protocol` varchar(45) notnull comment 'protocol: http | grpc | json-rpc',
  `create_time` datetime DEFAULTNULL,
  `update_time` datetime DEFAULTNULL,
  `finish_time` datetime DEFAULTNULL,
  `rollback_time` datetime DEFAULTNULL,
  `options` varchar(1024) DEFAULT'options for transaction like: TimeoutToFail, RequestTimeout',
  `custom_data` varchar(1024) DEFAULT'' COMMENT 'custom data for transaction',
  `next_cron_interval` int(11) defaultnull comment 'next cron interval. for use of cron job',
  `next_cron_time` datetime defaultnull comment 'next time to process this trans. for use of cron job',
  `owner` varchar(128) notnulldefault'' comment 'who is locking this trans',
  `ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',
  `result` varchar(1024) DEFAULT'' COMMENT 'rollback reason for transaction',
  `rollback_reason` varchar(1024) DEFAULT'' COMMENT 'rollback reason for transaction',
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid` (`gid`),
  key `owner`(`owner`),
  key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
droptable IF EXISTS dtm.trans_branch_op;
CREATETABLE IF NOTEXISTS dtm.trans_branch_op (
  `id` bigint(22) NOTNULL AUTO_INCREMENT,
  `gid` varchar(128) NOTNULL COMMENT 'global transaction id',
  `url` varchar(1024) NOTNULL COMMENT 'the url of this op',
  `data` TEXT COMMENT 'request body, depreceated',
  `bin_data` BLOB COMMENT 'request body',
  `branch_id` VARCHAR(128) NOTNULL COMMENT 'transaction branch ID',
  `op` varchar(45) NOTNULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
  `status` varchar(45) NOTNULL COMMENT 'transaction op status: prepared | succeed | failed',
  `finish_time` datetime DEFAULTNULL,
  `rollback_time` datetime DEFAULTNULL,
  `create_time` datetime DEFAULTNULL,
  `update_time` datetime DEFAULTNULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
droptable IF EXISTS dtm.kv;
CREATETABLE IF NOTEXISTS dtm.kv (
  `id` bigint(22) NOTNULL AUTO_INCREMENT,
  `cat` varchar(45) NOTNULL COMMENT 'the category of this data',
  `k` varchar(128) NOTNULL,
  `v` TEXT,
  `version` bigint(22) default1 COMMENT 'version of the value',
  create_time datetime defaultNULL,
  update_time datetime DEFAULTNULL,
  PRIMARY KEY (`id`),
  UNIQUE key `uniq_k`(`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
create database if notexists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
droptable if exists dtm_barrier.barrier;
createtable if notexists dtm_barrier.barrier(
  id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default'',
  gid varchar(128) default'',
  branch_id varchar(128) default'',
  op varchar(45) default'',
  barrier_id varchar(45) default'',
  reason varchar(45) default'' comment 'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  UNIQUE key(gid, branch_id, op, barrier_id)
);

准备完毕,即可通过点击Visual Studio工具栏的Docker Compose的启动按钮,启动后可以在Containers窗口看到启动了dtm-mysql、dtm-svc和dtm-webapi-demo三个容器,并在浏览器中打开了 http://localhost:31293/swagger/index.html Swagger 网页。该种方式启动项目是支持断点调试项目,如下图所示:

通过BankAccouts控制器的POST接口,初始化用户1和用户2各100元。再通过SagaDemo控制器的/api/Transfer接口,进行Saga事务测试。

  1. 用户1转账10元到用户2

由于用户1和用户2已存在,且用户1余额足够, 因此该笔转账合法因此会成功,其执行路径为:转出(成功)->转入(成功)-> 事务完成,执行日志如下图所示:

  1. 用户3转账10元到用户1

由于用户3不存在,因此执行路径为:转出(失败)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转出子事务失败,还是会调用对应的转出补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转出补偿逻辑,其中红线框住的部分就是证明。

  1. 用户1转账10元到用户3

由于用户3不存在,因此执行路径为:转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转入子事务失败,还是会调用对应的转入补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转入补偿逻辑,其中红线框住的部分就是证明。

子事务屏障


在以上的示例中,重复提及子事务屏障,那子事务屏障具体是什么,这里有必要重点说明下。以上面用户1转账10元到用户3为例,整个事务流转过程中,即转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。

在提交事务之后,首先是全局事务的落库,主要由DTM 服务负责,主要包括两张表:trans_global和trans_branch_op,DTM 依此进行子事务分支的协调。其中trans_global会插入一条全局事务记录,用于记录全局事务的状态信息,如下图1所示。trans_branch_op表为trans_global的子表,记录四条子事务分支数据,如下图2所示:

具体的服务再接收到来自Dtm的子事务分支调用时,每次都会往子事务屏障表barrier中插入一条数据,如下图所示。业务服务就是依赖此表来完成子事务的控制。

而子事务屏障的核心就是子事务屏障表唯一键的设计,以gid、branch_id、op和barrier_id为唯一索引,利用唯一索引,“以改代查”来避免竞态条件。在跨行转账的Saga示例中,子事务分支的执行步骤如下所示:

  1. 开启本地事务

  1. 对于当前操作op(action|compensate),使用inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)向子事务屏障表插入一条数据,有几种情况:

  1. 插入成功且影响条数大于0,则继续向下执行。

  1. 插入成功但影响条数等于0,说明触发唯一键约束,此时会进行空补偿、悬挂和重复请求判断,若是则直接返回,跳过后续子事务分支逻辑的执行。

  1. 第2步插入成功,则可以继续执行子事务分支逻辑,执行业务数据表操作,结果分两种请求

  1. 子事务成功,子事务屏障表操作和业务数据表操作由于共享同一个本地事务,提交本地事务,因此可实现强一致性,当前子事务分支完成。

  1. 子事务失败,回滚本地事务

每个子事务分支通过以上步骤,即可实现下图的效果:

小结


本文主要介绍了DTM的Saga模式的应用,基于DTM 首创的子事务屏障技术,使得开发者基于DTM 提供的SDK能够轻松开发出更可靠的分布式应用,彻底将开发人员从网络异常的处理中解放出来,再也不用担心空补偿、防悬挂、幂等等分布式问题。如果要进行分布式事务框架的选型,DTM 将是不二之选。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/338059.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【AI数学】相机成像之内参数

计算机视觉偏底层的工作会跟摄像机打交道&#xff0c;最近正好有接触&#xff0c;所以整理总结一下。 相机参数通常分为内参数、外参数&#xff0c;偶尔会有畸变参数等滤镜参数。 申明&#xff1a;本文图例均为原创&#xff0c;借用需附此文链接。 内参数&#xff1a;相机内部的…

[SSD固态硬盘技术 15] FTL映射表的神秘面纱

为什么需要映射表?固态硬盘的存储器件采用的是闪存[5],具有以下几个特点: (1)读写基本单位是以页(Page)为单位,擦除是以块(Block)为单位。

NFC概述摘要

同学,别退出呀,我可是全网最牛逼的 WIFI/BT/GPS/NFC分析博主,我写了上百篇文章,请点击下面了解本专栏,进入本博主主页看看再走呗,一定不会让你后悔的,记得一定要去看主页置顶文章哦。 原理来说,NFC和Wi-Fi类似,利用无线射频技术来实现设备间通信。NFC的工作频率为13.5…

基于c语言实现的对代码的同源性检测

完整代码&#xff1a;https://download.csdn.net/download/qq_38735017/87382389本次课程设计为了巩固上学期在软件安全课程上所学的安全知识&#xff0c;包括堆栈溢出、整数溢出等等&#xff0c;同时考察了一些课外的新事物&#xff0c;例如字符串匹配与CFG控制流程图的同源性…

Attention机制 学习笔记

学习自https://easyai.tech/ai-definition/attention/ Attention本质 Attention&#xff08;注意力&#xff09;机制如果浅层的理解&#xff0c;跟他的名字非常匹配。他的核心逻辑就是“从关注全部到关注重点”。 比如我们人在看图片时&#xff0c;对图片的不同地方的注意力…

为什么要在电子产品中使用光耦合器?

介绍 光耦合器不仅可以保护敏感电路&#xff0c;还可以使工程师设计各种硬件应用。光耦合器通过保护元件&#xff0c;可以避免更换元件的大量成本。然而&#xff0c;光耦合器比保险丝更复杂。光耦合器还可以通过光耦合器连接和断开两个电路&#xff0c;从而方便地控制两个电路…

【Markdown】markdown语法规定

这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注…

Vue3 如何实现一个函数式右键菜单(ContextMenus)

前言: 最近在公司 PC 端的项目中使用到了右键出现菜单选项这样的一个工作需求&#xff0c;并且自己现在也在实现一个偶然迸发的 idea&#xff08; 想用前端实现一个 windows 系统从开机到桌面的 UI&#xff09;&#xff0c;其中也要用到右键弹出菜单这样的一个功能&#xff0c;…

通讯录文件操作化

宝子&#xff0c;你不点个赞吗&#xff1f;不评个论吗&#xff1f;不收个藏吗&#xff1f; 最后的最后&#xff0c;关注我&#xff0c;关注我&#xff0c;关注我&#xff0c;你会看到更多有趣的博客哦&#xff01;&#xff01;&#xff01; 喵喵喵&#xff0c;你对我真的很重…

几个chatGPT的难题,关于语言转换

不同语言代码的移植一直以来是程序员面临的难题&#xff0c;最近问了问chatGPT能否解决这个问题。编写一个程序&#xff0c;实现c语言函数转换为php函数答&#xff1a;这是一个非常困难的问题&#xff0c;因为两种语言的语法、结构和标准库都不相同。如果您希望完成这个任务&am…

MySql服务多版本之间的切换

从网上总结的经验&#xff0c;然后根据自己所遇到的问题合并记录一下&#xff0c;方便日后再次需要用到 MySql服务多版本同时运行 步骤 1、如果你电脑上已经有一个mysql版本&#xff0c;例如mysql-5.7.39-winx64&#xff0c;它占据了3306端口。此时如果你想下仔另一版本&…

活动星投票紫砂新青年制作一个投票活动

“紫砂新青年”网络评选投票_免费链接投票_作品投票通道_扫码投票怎样进行现在来说&#xff0c;公司、企业、学校更多的想借助短视频推广自己。通过微信投票小程序&#xff0c;网友们就可以通过手机拍视频上传视频参加活动&#xff0c;而短视频微信投票评选活动既可以给用户发挥…

6年自动化测试,终于进华为了,年薪25w其实也并非触不可及

我的职业生涯开始和大多数测试人一样&#xff0c;开始接触都是纯功能界面测试&#xff0c;第一份测试工作就是在电商公司做功能测试&#xff0c;工作忙忙碌碌&#xff0c;每天在各种业务需求学习和点点中度过&#xff0c;过了好几年发现自己还只是一个功能测试工程师&#xff0…

锐捷(十四)mpls vxn optionc的关键问题所在和具体问题分析

用锐捷的设备搭建mpls vxn optionc的基础版和带RR的版本&#xff0c;在控制平面和转发平免上分析mpls vxn optionc的关键问题所在和具体问题分析。一 基础mpls vxn optionc&#xff1a;核心&#xff1a;两pe之间之间建立MP EBGP邻居&#xff0c;从而直接传递路由解放了ASBR。关…

LeetCode-1223-掷骰子模拟

1、动态规划法 我们可以利用数组dp[i][j][k]dp[i][j][k]dp[i][j][k]来表示当我们已经投过iii次骰子&#xff0c;其中第iii次投出的骰子是jjj&#xff0c;此时连续投出骰子jjj的次数为kkk。因此我们可以根据上一轮中得到的状态dp[i−1][j][k]dp[i-1][j][k]dp[i−1][j][k]&#…

最小二乘支持向量机”在学习偏微分方程 (PDE) 解方面的应用(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 本代码说明了“最小二乘支持向量机”在学习偏微分方程 &#xff08;PDE&#xff09; 解方面的应用。提供了一个示例&#xff0c…

加盟管理系统挑选法则,看完不怕被坑!

经营服装连锁店铺究竟有多难&#xff1f;小编已经不止一次听到身边的老板&#xff0c;抱怨加盟连锁店铺难以管理了&#xff0c;但同时呢&#xff0c;也听到了很多作为加盟商的老板&#xff0c;抱怨总部给的支持和管理不到位。服装加盟店铺管理&#xff0c;到底有哪些难点呢&…

BFS广度优先遍历——Acwing 844. 走迷宫

1.BFS简介我们可以将bfs当做一个成熟稳重的人&#xff0c;一个眼观六路耳听八方的人&#xff0c;他每次搜索都是一层层的搜索&#xff0c;从第一层扩散到最后一层&#xff0c;BFS可以用来解决最短路问题。2.基本思想从初始状态S开始&#xff0c;利用规则&#xff0c;生成所有可…

window11 安装node及配置环境变量

一、安装环境 本教程演示的环境&#xff1a; 系统&#xff1a;win 11 64位 node.js下载地址: http://nodejs.cn/ node.js版本&#xff1a;长期支持版本&#xff08;本教程基于16.15.0&#xff09; 点击选中图标下载到电脑本地即可。 二、安装步骤 1、双击安装包&#xff0c;一…

华为10年经验测试工程师,整理出来的python自动化测试实战

前言 全书共分11章&#xff0c;第一章是基础&#xff0c;了selenium家谱&#xff0c;各种组件之间的关系以及一些必备知识。第二章告诉如何开始用python IDLE写程序以及自动化测试环境的搭建。第三章是webdriver API&#xff0c;我花了相当多时间对原先的文档&#xff0c;冗余…