如何正确使用DTM的Saga模式

news2024/11/23 23:53:46

DTM 简介

前面章节提及的MassTransitdotnetcore/CAP都提供了分布式事务的处理能力,但也仅局限于Saga和本地消息表模式的实现。那有没有一个独立的分布式事务解决方案,涵盖多种分布式事务处理模式,如SagaTCCXA模式等。有,目前业界主要有两种开源方案,其一是阿里开源的Seata,另一个就是DTM。其中Seata仅支持Java、Go和Python语言,因此不在.NET 的选择范围。DTM则通过提供简单易用的HTTP和gRPC接口,屏蔽了语言的无关性,因此支持任何开发语言接入,目前提供了Go、Python、NodeJs、Ruby、Java和C#等语言的SDK。
DTM,全称Distributed Transaction Manager,是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。它提供了Saga、TCC、 XA和二阶段消息模式以满足不同应用场景的需求,同时其首创的子事务屏障技术可以有效解决幂等、悬挂和空补偿等异常问题。

DTM 事务处理过程及架构

那DTM是如何处理分布式事务的呢?以一个经典的跨行转账业务为例来看下事务处理过程。对于跨行转账业务而言,很显然是跨库跨服务的应用场景,不能简单通过本地事务解决,可以使用Saga模式,以下是基于DTM提供的Saga事务模式成功转账的的时序图:

从以上时序图可以看出,DTM整个全局事务分为如下几步:

  1. 用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回
  2. DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回
  3. DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回
  4. DTM已完成所有的事务分支,将全局事务的状态修改为已完成

基于以上这个时序图的基础上,再来看下DTM的架构:

整个DTM架构中,一共有三个角色,分别承担了不同的职责:

  • RM-资源管理器:RM是一个应用服务,通常连接到独立的数据库,负责处理全局事务中的本地事务,执行相关数据的修改、提交、回滚、补偿等操作。例如在前面的这个Saga事务时序图中,步骤2、3中被调用的TransIn和TransOut方法所在的服务都是RM。
  • AP-应用程序:AP是一个应用服务,负责全局事务的编排,他会注册全局事务,注册子事务,调用RM接口。例如在前面的这个SAGA事务中,发起步骤1的是AP,它编排了一个包含TransOut、TransIn的全局事务,然后提交给TM
  • TM-事务管理器:TM就是DTM服务,负责全局事务的管理,作为一个独立的服务而存在。每个全局事务都注册到TM,每个事务分支也注册到TM。TM会协调所有的RM来执行不同的事务分支,并根据执行结果决定是否提交或回滚事务。例如在前面的Saga事务时序图中,TM在步骤2、3中调用了各个RM,在步骤4中,完成这个全局事务。

总体而言,AP-应用程序充当全局事务编排器的角色通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。TM-事务管理器接收到注册的全局事务和子事务后,负责调用RM-资源管理器来执行对应的事务分支,TM-事务管理器根据事务分支的执行结果决定是否提及或回滚事务。

快速上手

百闻不如一见,接下来就来实际上手体验下如何基于DTM来实际应用Saga进行分布式跨行转账事务的处理。

创建示例项目

接下来就来创建一个示例项目:

  1. 使用dotnet new webapi -n DtmDemo.Webapi创建示例项目。
  2. 添加Nuget包:Dtmcli 和Pomelo.EntityFrameworkCore.MySql
  3. 添加DTM配置项:
{
"dtm": {
"DtmUrl": "http://localhost:36789",
"DtmTimeout": 10000,
"BranchTimeout": 10000,
"DBType": "mysql",
"BarrierTableName": "dtm_barrier.barrier",
}
}
  1. 定义银行账户BankAccount实体类:
namespace DtmDemo.WebApi.Models
{
public class BankAccount
{
public int Id { get; set; }
public decimal Balance { get; set; }
}
}
  1. 定义DtmDemoWebApiContext数据库上下文:
using Microsoft.EntityFrameworkCore;
namespace DtmDemo.WebApi.Data
{
public class DtmDemoWebApiContext : DbContext
{
public DtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options)
: base(options)
{
}
public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!;
}
}
  1. 注册DbContext 和DTM服务:
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using Dtmcli;
var builder = WebApplication.CreateBuilder(args);
var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext");
// 注册DbContext
builder.Services.AddDbContext<DtmDemoWebApiContext>(options =>
{
options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));
});
// 注册DTM
builder.Services.AddDtmcli(builder.Configuration, "dtm");
  1. 执行dotnet ef migrations add 'Initial' 创建迁移。
  2. 为便于初始化演示数据,定义BankAccountController如下,其中PostBankAccount接口添加了await _context.Database.MigrateAsync();用于自动应用迁移。
 
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
namespace DtmDemo.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class BankAccountsController : ControllerBase
{
private readonly DtmDemoWebApiContext _context;
public BankAccountsController(DtmDemoWebApiContext context)
{
_context = context;
}
[HttpGet]
public async Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount()
{
return await _context.BankAccount.ToListAsync();
}
[HttpPost]
public async Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount)
{
await _context.Database.MigrateAsync();
_context.BankAccount.Add(bankAccount);
await _context.SaveChangesAsync();
return Ok(bankAccount);
}
}

应用Saga模式

接下来定义SagaDemoController来使用DTM的Saga模式来模拟跨行转账分布式事务:

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
using DtmCommon;
namespace DtmDemo.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class SagaDemoController : ControllerBase
{
private readonly DtmDemoWebApiContext _context;
private readonly IConfiguration _configuration;
private readonly IDtmClient _dtmClient;
private readonly IDtmTransFactory _transFactory;
private readonly IBranchBarrierFactory _barrierFactory;
private readonly ILogger<BankAccountsController> _logger;
public SagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory)
{
this._context = context;
this._configuration = configuration;
this._dtmClient = dtmClient;
this._transFactory = transFactory;
this._logger = logger;
this._barrierFactory = barrierFactory;
}
}

对于跨行转账业务,使用DTM的Saga模式,首先要进行事务拆分,可以拆分为以下4个子事务,并分别实现:

转出子事务(TransferOut)

[HttpPost("TransferOut")]
public async Task<IActionResult> TransferOut([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}转出{request.Amount}元";
_logger.LogInformation($"转出子事务-启动:{msg}");
// 1. 创建子事务屏障
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
try
{
using (var conn = _context.Database.GetDbConnection())
{
// 2. 在子事务屏障内执行事务操作
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转出子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null || bankAccount.Balance < request.Amount)
throw new InvalidDataException("账户不存在或余额不足!");
bankAccount.Balance -= request.Amount;
await _context.SaveChangesAsync();
});
}
}
catch (InvalidDataException ex)
{
_logger.LogInformation($"转出子事务-失败:{ex.Message}");
// 3. 按照接口协议,返回409,以表示子事务失败
return new StatusCodeResult(StatusCodes.Status409Conflict);
}
_logger.LogInformation($"转出子事务-成功:{msg}");
return Ok();
}

以上代码中有几点需要额外注意:

  1. 使用Saga模式,必须开启子事务屏障:_barrierFactory.CreateBranchBarrier(Request.Query),其中Request.Query中的参数由DTM 生成,类似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga,主要包含四个参数:
    1. gid:全局事务Id
    2. trans_type:事务类型,是saga、msg、xa或者是tcc。
    3. branch_id:子事务的Id
    4. op:当前操作,对于Saga事务模式,要么为action(正向操作),要么为compensate(补偿操作)。
  2. 必须在子事务屏障内执行事务操作:branchBarrier.Call(conn, async (tx) =>{}
  3. 对于Saga正向操作而言,业务上的失败与异常是需要做严格区分的,例如前面的余额不足,是业务上的失败,必须回滚。而对于网络抖动等其他外界原因导致的事务失败,属于业务异常,则需要重试。因此若因业务失败(这里是账户不存在或余额不足)而导致子事务失败,则必须通过抛异常的方式并返回**409**状态码以告知DTM 子事务失败。
  4. 以上通过抛出异常的方式中断子事务执行并在外围捕获特定异常返回409状态码。在外围捕获异常时切忌放大异常捕获,比如直接catch(Exception),如此会捕获由于网络等其他原因导致的异常,而导致DTM 不再自动处理该异常,比如业务异常时的自动重试。

转出补偿子事务(TransferOut_Compensate)

转出补偿,就是回滚转出操作,进行账户余额归还,实现如下:

[HttpPost("TransferOut_Compensate")]
public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}回滚转出{request.Amount}元";
_logger.LogInformation($"转出补偿子事务-启动:{msg}");
// 1. 创建子事务屏障
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
using (var conn = _context.Database.GetDbConnection())
{
// 在子事务屏障内执行事务操作
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转出补偿子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null)
return; //对于补偿操作,可直接返回,中断后续操作
bankAccount.Balance += request.Amount;
await _context.SaveChangesAsync();
});
}
_logger.LogInformation($"转出补偿子事务-成功!");
// 2. 因补偿操作必须成功,所以必须返回200。
return Ok();
}

由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。
另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回**200**因此当出现bankAccount==null时可以直接 return。

转入子事务(TransferIn)

转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并通过抛异常的方式并最终返回409状态码来显式告知DTM 子事务执行失败。

[HttpPost("TransferIn")]
public async Task<IActionResult> TransferIn([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}转入{request.Amount}元";
_logger.LogInformation($"转入子事务-启动:{msg}");
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
try
{
using (var conn = _context.Database.GetDbConnection())
{
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转入子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null)
throw new InvalidDataException("账户不存在!");
bankAccount.Balance += request.Amount;
await _context.SaveChangesAsync();
});
}
}
catch (InvalidDataException ex)
{
_logger.LogInformation($"转入子事务-失败:{ex.Message}");
return new StatusCodeResult(StatusCodes.Status409Conflict);
}
_logger.LogInformation($"转入子事务-成功:{msg}");
return Ok();
}

转入补偿子事务(TransferIn_Compensate)

转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并最终返回200状态码来告知DTM 补偿子事务执行成功。

[HttpPost("TransferIn_Compensate")]
public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
{
var msg = "用户{request.UserId}回滚转入{request.Amount}元";
_logger.LogInformation($"转入补偿子事务-启动:{msg}");
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
using (var conn = _context.Database.GetDbConnection())
{
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转入补偿子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null) return;
bankAccount.Balance -= request.Amount;
await _context.SaveChangesAsync();
});
}
_logger.LogInformation($"转入补偿子事务-成功!");
return Ok();
}

编排Saga事务

拆分完子事务,最后就可以进行Saga事务编排了,其代码如下所示:

[HttpPost("Transfer")]
public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}");
//1. 生成全局事务ID
var gid = await _dtmClient.GenGid(cancellationToken);
var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
//2. 创建Saga
var saga = _transFactory.NewSaga(gid);
//3. 添加子事务
saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
new TransferRequest(fromUserId, amount))
.Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
new TransferRequest(toUserId, amount))
.EnableWaitResult(); // 4. 按需启用是否等待事务执行结果
//5. 提交Saga事务
await saga.Submit(cancellationToken);
}
catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。
{
_logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!");
return new BadRequestObjectResult($"转账失败:{ex.Message}");
}
_logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
}

主要步骤如下:

  1. 生成全局事务Id:var gid =await _dtmClient.GenGid(cancellationToken);
  2. 创建Saga全局事务:_transFactory.NewSaga(gid);
  3. 添加子事务:saga.Add(string action, string compensate, object postData);包含正向和反向子事务。
  4. 如果依赖事务执行结果,可通过EnableWaitResult()开启事务结果等待。
  5. 提交Saga全局事务:saga.Submit(cancellationToken);
  6. 若开启了事务结果等待,可以通过try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。

运行项目

既然DTM作为一个独立的服务存在,其负责通过HTTPgRPC协议发起子事务的调用,因此首先需要启动一个DTM实例,又由于本项目依赖MySQL,因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过右键项目->Add->Docker Support->Linux 即可添加Dockerfile如下所示:

FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
COPY . .
WORKDIR "/src/DtmDemo.WebApi"
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]

在Visual Studio中通过右键项目->Add Container Orchestrator Support->Docker Compose即可添加docker-compose.yml,由于整个项目依赖mysqlDTM,修改docker-compose.yml如下所示,其中定义了三个服务:db,dtm和dtmdemo.webapi。

version: '3.4'
services:
db:
image: 'mysql:5.7'
container_name: dtm-mysql
environment:
MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密码
volumes:
- ./docker/mysql/scripts:/docker-entrypoint-initdb.d # 挂载用于初始化数据库的脚本
ports:
- '3306:3306'
dtm:
depends_on: ["db"]
image: 'yedf/dtm:latest'
container_name: dtm-svc
environment:
IS_DOCKER: '1'
STORE_DRIVER: mysql # 指定使用MySQL持久化DTM事务数据
STORE_HOST: db # 指定MySQL服务名,这里是db
STORE_USER: root
STORE_PASSWORD: '123456'
STORE_PORT: 3306
STORE_DB: "dtm" # 指定DTM 数据库名
ports:
- '36789:36789' # DTM HTTP 端口
- '36790:36790' # DTM gRPC 端口
dtmdemo.webapi:
depends_on: ["dtm", "db"]
image: ${DOCKER_REGISTRY-}dtmdemowebapi
environment:
ASPNETCORE_ENVIRONMENT: docker # 设定启动环境为docker
container_name: dtm-webapi-demo
build:
context: .
dockerfile: DtmDemo.WebApi/Dockerfile
ports:
- '31293:80' # 映射Demo:80端口到本地31293端口
- '31294:443' # 映射Demo:443端口到本地31294端口

其中dtmdemo.webapi服务通过ASPNETCORE_ENVIRONMENT: docker指定启动环境为docker,因此需要在项目下添加appsettings.docker.json以配置应用参数:

 
{
"ConnectionStrings": {
"DtmDemoWebApiContext": "Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"
},
"TransferBaseURL": "http://dtmdemo.webapi/api/SagaDemo",
"dtm": {
"DtmUrl": "http://dtm:36789",
"DtmTimeout": 10000,
"BranchTimeout": 10000,
"DBType": "mysql",
"BarrierTableName": "dtm_barrier.barrier"
}
}

另外db服务中通过volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]来挂载初始化脚本,以创建DTM依赖的MySQL 存储数据库dtm和示例项目使用子事务屏障需要的barrier数据表。脚本如下:

CREATE DATABASE IF NOT EXISTS dtm
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table IF EXISTS dtm.trans_global;
CREATE TABLE if not EXISTS dtm.trans_global (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
`status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
`query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
`protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
`custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
`owner` varchar(128) not null default '' comment 'who is locking this trans',
`ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',
`result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
`rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
PRIMARY KEY (`id`),
UNIQUE KEY `gid` (`gid`),
key `owner`(`owner`),
key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
drop table IF EXISTS dtm.trans_branch_op;
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`url` varchar(1024) NOT NULL COMMENT 'the url of this op',
`data` TEXT COMMENT 'request body, depreceated',
`bin_data` BLOB COMMENT 'request body',
`branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
`op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
`status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
drop table IF EXISTS dtm.kv;
CREATE TABLE IF NOT EXISTS dtm.kv (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`cat` varchar(45) NOT NULL COMMENT 'the category of this data',
`k` varchar(128) NOT NULL,
`v` TEXT,
`version` bigint(22) default 1 COMMENT 'version of the value',
create_time datetime default NULL,
update_time datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE key `uniq_k`(`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
 
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
id bigint(22) PRIMARY KEY AUTO_INCREMENT,
trans_type varchar(45) default '',
gid varchar(128) default '',
branch_id varchar(128) default '',
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '' comment 'the branch type who insert this record',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
UNIQUE key(gid, branch_id, op, barrier_id)
);

准备完毕,即可通过点击Visual Studio工具栏的Docker Compose的启动按钮,启动后可以在Containers窗口看到启动了dtm-mysqldtm-svcdtm-webapi-demo三个容器,并在浏览器中打开了 http://localhost:31293/swagger/index.html Swagger 网页。该种方式启动项目是支持断点调试项目,如下图所示:

通过BankAccouts控制器的POST接口,初始化用户1和用户2各100元。再通过SagaDemo控制器的/api/Transfer接口,进行Saga事务测试。

  1. 用户1转账10元到用户2

由于用户1和用户2已存在,且用户1余额足够, 因此该笔转账合法因此会成功,其执行路径为:转出(成功)->转入(成功)-> 事务完成,执行日志如下图所示:

  1. 用户3转账10元到用户1

由于用户3不存在,因此执行路径为:转出(失败)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转出子事务失败,还是会调用对应的转出补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转出补偿逻辑,其中红线框住的部分就是证明。

  1. 用户1转账10元到用户3

由于用户3不存在,因此执行路径为:转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转入子事务失败,还是会调用对应的转入补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转入补偿逻辑,其中红线框住的部分就是证明。

子事务屏障

在以上的示例中,重复提及子事务屏障,那子事务屏障具体是什么,这里有必要重点说明下。以上面用户1转账10元到用户3为例,整个事务流转过程中,即转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。
在提交事务之后,首先是全局事务的落库,主要由DTM 服务负责,主要包括两张表:trans_globaltrans_branch_op,DTM 依此进行子事务分支的协调。其中trans_global会插入一条全局事务记录,用于记录全局事务的状态信息,如下图1所示。trans_branch_op表为trans_global的子表,记录四条子事务分支数据,如下图2所示:

具体的服务再接收到来自Dtm的子事务分支调用时,每次都会往子事务屏障表barrier中插入一条数据,如下图所示。业务服务就是依赖此表来完成子事务的控制。

而子事务屏障的核心就是子事务屏障表唯一键的设计,以gidbranch_idopbarrier_id为唯一索引,利用唯一索引,“以改代查”来避免竞态条件。在跨行转账的Saga示例中,子事务分支的执行步骤如下所示:

  1. 开启本地事务
  2. 对于当前操作op(action|compensate),使用inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)向子事务屏障表插入一条数据,有几种情况:
    1. 插入成功且影响条数大于0,则继续向下执行。
    2. 插入成功但影响条数等于0,说明触发唯一键约束,此时会进行空补偿、悬挂和重复请求判断,若是则直接返回,跳过后续子事务分支逻辑的执行。
  3. 第2步插入成功,则可以继续执行子事务分支逻辑,执行业务数据表操作,结果分两种请求
    1. 子事务成功,子事务屏障表操作和业务数据表操作由于共享同一个本地事务,提交本地事务,因此可实现强一致性,当前子事务分支完成。
    2. 子事务失败,回滚本地事务

每个子事务分支通过以上步骤,即可实现下图的效果:

小结

本文主要介绍了DTM的Saga模式的应用,基于DTM 首创的子事务屏障技术,使得开发者基于DTM 提供的SDK能够轻松开发出更可靠的分布式应用,彻底将开发人员从网络异常的处理中解放出来,再也不用担心空补偿、防悬挂、幂等等分布式问题。如果要进行分布式事务框架的选型,DTM 将是不二之选。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/663602.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

真正的网工大佬,到底是什么样的?

大家好&#xff0c;我是许公子。 关于到底真正的网工大佬是什么样的&#xff0c;众说纷纭。 刚刚入行的小朋友&#xff0c;可能会觉得&#xff0c;是不是有HCIE的就算是网工大佬啊&#xff1f; 来几个老网工&#xff0c;评论区给他上一课哈哈。 就用这个点&#xff0c;跟你…

5年测试路,在字节终于爬到了半山腰,我不想被淘汰......

软件测试是一个付出就有回报的工作&#xff0c;可能很多人会说软件测试就是吃青春饭&#xff0c;然而其他工作又何尝不是&#xff1f;没有哪一家公司养尸位素餐之人&#xff0c;大龄员工有被辞退的&#xff0c;也有没被辞退的。干任何职业&#xff0c;抱着一劳永逸的心态&#…

MySQL----日志查询、备份与恢复

文章目录 一、MySQL日志管理二、MySQL 完全备份与恢复2.1备份的重要性2.2数据库备份的分类从物理与逻辑的角度从数据库的备份策略角度完全备份 三、MySQL 完全备份与恢复实验3.1物理冷备份与恢复3.2使用MySQL dump工具进行恢复备份恢复数据 3.3增量备份恢复 一、MySQL日志管理 …

LeetCod刷题笔记

目录 2739.总行驶距离 思路&#xff1a;模拟 代码 6890.找出分区值 思路&#xff1a;急转弯 代码: 1254.统计封闭岛屿的数目​编辑 思路&#xff1a;DFS 代码&#xff1a; 6447.给墙壁刷油漆 思路&#xff1a;动态规划 代码&#xff1a; 思路&#xff1a;状态DP 代码&…

Mac配置Android addr2line环境变量以及使用

1.首先进入终端 2.下面这个指令进入vim编辑器&#xff0c;就可以修改环境变量 vim ./.bash_profile3.按i进入insert模式 4.输入路径&#xff1a; arm32: export PATH${PATH}:/Users/xianquan/Library/Android/sdk/ndk/21.1.6352462/toolchains/arm-linux-androideabi-4.9/p…

Vue中如何进行自定义动画与动画效果设计

Vue中如何进行自定义动画与动画效果设计 在Vue中&#xff0c;动画效果是非常有用的&#xff0c;它可以使用户界面变得更加生动、有趣&#xff0c;从而提高用户体验。Vue提供了一套非常方便的动画系统&#xff0c;使得我们可以非常容易地实现动画效果。 在本文中&#xff0c;我…

6.19实训笔记

6.19实训笔记 6.19一、座右铭二、知识回顾2.1 Java集合体系2.2 工具类Utils 三、JavaIO流3.1 File类3.2 File类的使用3.2.1 File文件/文件夹类的创建3.2.2 File类的获取操作3.2.3 File类判断操作--boolean3.2.4 File类对文件/文件夹的增删改3.2.5 、File类的获取子文件夹以及子…

python+appium自动化测试-滑动到固定的位置停止

当前很多APP都存在滑动操作&#xff0c;但这些元素一般无法单独定位到&#xff0c;多为一个数组或列表&#xff0c;这边介绍了几种方法&#xff0c;使元素滑动到你想要的位置后停止。 一、scroll()方法 Appium 中webdriver提供scroll()方法来滚动页面&#xff0c;该方法只适用…

matplotlib---中文显示问题、字体库、图像结构、画布设置

1. 中文显示问题 解决方案一&#xff1a; 下载中文字体&#xff08;黑体&#xff0c;看准系统版本&#xff09; 步骤一&#xff1a;下载 SimHei 字体&#xff08;或者其他的支持中文显示的字体也行&#xff09; 步骤二&#xff1a;安装字体 linux下&#xff1a;拷贝字体到 usr…

三层架构综合实验

目录 拓扑结构&#xff1a; 要求&#xff1a; 确定广播域的个数 分配网段 配置Eth-Trunk 创建VLAN 配置STP生成树协议 修改根 边缘端口 SVI VRRP DHCP 路由部分 OSPF 缺省 汇总 NAT 拓扑结构&#xff1a; 要求&#xff1a; 1、内部IP地址基于172.16.0.0/16进行…

CABAC编解码原理分析

CABAC编解码原理分析 文章目录 CABAC编解码原理分析一、二进制算数编码二、CABAC编码三、CABAC编解码与普通的二元算术编码的区别四、 CABAC编解码中各个变量的计算&#xff1a;五、 一些其他问题&#xff1a;六、 总结&#xff1a;七、参考资料 一、二进制算数编码 cabac是一…

Nginx网络服务的配置

目录 一、Nginx概述 二、Nginx相对于Apache的优点 三、配置Nginx网络服务 1.编译安装和启用Nginx服务 2.修改Nginx主配置文件 一、Nginx概述 Nginx是一款高性能、轻量级Web服务软件。稳定性高&#xff0c;系统资源消耗低&#xff0c;对HTTP并发连接的处理能力高&#xff…

DNS 监控工具

域名系统 &#xff08;DNS&#xff09; 解析&#xff08;也称为 DNS 查找&#xff09;是在现代 IT 基础架构中建立连接和通信所需的基本组件之一。这是将人类可读的域或主机名与机器可读的 IP 地址映射的过程&#xff0c;使用户更容易访问组织的公共和专用网络上的主机。在最基…

SpringBoot 三级缓存解决循环依赖源码分析

文章目录 1. 不使用三级缓存可能存在的问题2. 源码分析2.1 对象实例的创建过程2.2 三级缓存的处理 3. 遗留问题 1. 不使用三级缓存可能存在的问题 在 SpringBoot 框架中&#xff0c;如果只存在两级缓存&#xff0c;那么当发生循环依赖的时候可能存在异常的对象创建流程如下图所…

如何解析 Impala 的 C++ 报错堆栈

生产环境用的都是release build&#xff0c;C代码产生的报错堆栈里没有函数名&#xff0c;很难像Java报错堆栈那样方便定位问题。下面是一个常见的启动报错&#xff0c;一般在CLASSPATH设置有误时发生&#xff1a; I0619 19:13:00.951988 5279 status.cc:129] Failed to find…

【全新升级版】R语言实战(第3版),超过30万学习者入手的R语言教程

在我刚入学那会儿初次接触R语言&#xff0c;看的第一本工具书就是《R语言实战》&#xff0c;收获良多&#xff0c;当时还只是第二版。最近和人民邮电出版社的好朋友交流发现&#xff0c;他告诉我上个月刚刚出版了《R语言实战 第三版》 &#xff0c;豆瓣评分9.2&#xff0c;被称…

Linux之生产者消费者模型(上)——单生产者单消费者

文章目录 前言一、生产者消费者模型1.生产消费2.生产消费关系321原则生产消费模型的特点 二、基于阻塞队列&#xff08;blockqueue&#xff09;的生产消费模型1.概念2.单生产单消费模型代码运行分析两种情况导致的现象生产者生产的慢&#xff0c;消费者消费的快生产者生产的快&…

精彩回顾 | “XR云新未来:弹性算力赋能可交互、沉浸式商业实践” 赋能云端虚拟世界

6月15日&#xff0c;由平行云联合首都在线共同主办&#xff0c;中关村软件园协办&#xff0c;以“XR云新未来|弹性算力赋能可交互、沉浸式商业实践”为主题的XR行业交流盛会在北京成功举办。 活动邀请多位XR行业大咖&#xff0c;共同见证首都在线联合平行云发布Cloud XR平台。…

MySQL数据库——索引

MySQL数据库——索引 一、索引基本常识1.索引的概念2.索引的作用3.创建索引的依据 二、索引的分类1.普通索引2.唯一索引3.主键索引4.组合索引5.全文索引 三、索引的查看与删除1.查看索引2.删除索引 一、索引基本常识 数据库索引是数据库管理系统中一个排序的数据结构&#xff0…

OpenGL 深度测试

1.简介 深度缓冲就像颜色缓冲(Color Buffer)&#xff08;储存所有的片段颜色&#xff1a;视觉输出&#xff09;一样&#xff0c;在每个片段中储存了信息&#xff0c;并且&#xff08;通常&#xff09;和颜色缓冲有着一样的宽度和高度。深度缓冲是由窗口系统自动创建的&#xf…