AT模式
整体架构
使用示例
这里以 Dubbo + Seata 为例,微服务架构体系是分布式事务的常见运用场景,多个微服务分散到不同的机器上,通过远程调用串联起来,此时如何使用Seata建立起分布式事务呢?
Seata的AT模式做到了业务上的0侵入性 ,这点就要比TCC好很多。微服务一般有一个调用发起方,如下图所示就是Business,其通过RPC调用Storage、Order、Account等多个微服务。
针对这种场景,如果我们要开启一个AT模式的分布式事务,将这几个微服务的操作合并为一个全局性事务,只需要在发起RPC调用的方法,即Business的方法加上@GlobalTransactional即可
至于StockService和OrderService,什么都不用加。
那StockService和OrderService是如何感知到自己属于分布式事务中的一环的呢?答案是,在BusinessService进行RPC调用时,RPC Filter中会将全局事务id xid 写入到上下文中,而StockService和StockService收到RPC请求后,如果发现上下文中有 xid,就能够得知当前属于分布式事务,并且全局事务id也可以得到。
仅仅得到全局事务id,就可以执行分布式事务的一系列逻辑了吗?是的,由于Seata在启动时对所有的DataSource接口进行了增强(具体参考 SeataAutoDataSourceProxyCreator),所以所有的DataSource都被增强为了一个叫做DataSourceProxy的类(当然,也可以不这么粗暴,而是直接将需要用到的DataSourceProxy注入到DataSource中,就像下面这样
)。这样一来,我们的JDBC操作会被代理类拦截处理,代理类一旦发现有全局事务id xid,就会执行分布式事务逻辑,如下图,红框表示不达成分布式事务的条件,此时会执行普通的JDBC。
源码分析
初始化流程
这里以seata-dubbo为例
配置分析
主业务配置 dubbo-business.xml
除了一些依赖到的服务接口bean外,还需额外注册GlobalTransactionScanner
服务配置 dubbo-account-service.xml、dubbo-order-service.xml、dubbo-stock-service.xml
这几个的不同点在于数据源的不同,其余思路都一样,主要是对数据源进行代理 ,除此之外还需注册GlobalTransactionScanner
GlobalTransactionScanner bean注入
TM、RMClient初始化
从配置分析过程中可以看出,每一个应用都初始化了一个GlobalTransactionScanner bean
GlobalTransactionScanner在初始化过程中,建立了Netty Client,分为TMClient和RMClient。从架构图中我们可以看出,每个应用属于的角色可能是RM,也可能是TM;不管是RM还是TM,这些应用要与TC进行RPC通信,所以初始化主要是为了建立起当前应用与TC进行RPC通信的客户端
那么TC的ip和port如何获取呢?在配置文件中似乎没有看到注册TC相关的服务bean呀?答案蕴含在GlobalTransactionScanner bean 的构造函数中的 my_test_tx_group,这个是Seata 事务服务逻辑分组,此分组通过配置中心配置项 service.vgroup_mapping.my_test_tx_group 映射到相应的 Seata-Server集群名称,然后再根据集群名称.grouplist 获取到可用服务列表。在工程的文件配置file.conf中,就发现了TC对应的ip和port
@GlobalTransactional注解扫描
GlobalTransactionScanner 还有一个wrapIfNecessary的方法,该方法会扫描所有的bean,并对符合条件的bean进行增强。GlobalTransactionScanner 设置的扫描条件是,类上含有@GlobalTransactional或者类中的方法含有@GlobalTransactional注解。如果有@GlobalTransactional注解的话,就会用GlobalTransactionalInterceptor对方法的调用进行拦截,拦截的逻辑是,对于带有@GlobalTransactional注解的方法,执行handleGlobalTransaction
最终来到了TransactionalTemplate # execute
整体对方法的AOP增强如下所示,其中business.execute()为执行方法逻辑
在begin方法中,会执行XID的生成逻辑(XID # generateXID),主要逻辑为ip + port + uuid
DataSourceProxy初始化
102行初始化资源id,资源id根据 jdbc url 生成
103行将资源注册到TC上
那么某一个资源怎么知道当前的全局事务id呢?答案是通过ConnectionContext连接上下文,而连接上下文的资源id是通过RootContext得到的,而RootContext通过CONTEXT_HOLDER这个Threadlocal加载得到。CONTEXT_HOLDER什么时候放入的xid呢?当然是在收到RPC请求的RPC Filter中。
前置镜像与后置镜像
我们知道,Seata是根据事务执行过程中生成的的前置镜像和后置镜像来完成事务回滚操作的。
该方法完成了以下重要工作:
- 生成前置镜像
- 执行原始SQL语句
- 生成后置镜像
- 准备事务日志
镜像内容
这里的前置镜像、后置镜像对应的Java对象是TableRecords,TableRecords里面记录了表元数据TableMeta,和镜像中的列集合List<Row>。TableMeta里面有所有列和所有索引的元数据ColumnMeta、IndexMeta;Row里面有该行字段的集合List<Field>,Field中有该字段的类型、值等信息。TableRecords中的表元数据TableMeta可以通过JVM缓存得到
如果缓存中拿不到,就会通过查询数据库获取表的元数据信息
。至于List<Row>,则是通过查询本次SQL执行目标的几行数据,然后再进行逐行、逐字段解析的。那么为什么要获取到这么详细的表数据、行数据、字段数据的信息的结合体TableRecords呢?这是为了更好的进行数据回滚前的比对和数据恢复。
镜像生成
insert语句:前置镜像为空,后置镜像为插入的若干行数据
update语句:前置镜像为更新前的若干行数据,后置镜像为更新后的若干行数据
delete语句:前置镜像为删除前的若干行数据,后置镜像为空
前置镜像与后置镜像的构建逻辑类似。这里分析前置镜像
构建前置镜像的关键是,找到业务SQL执行前的目标的几行数据,后置镜像则是找到业务SQL执行后的目标几行数据。
buildBeforeImageSQL构建了查询 业务SQL执行的目标行 的SQL语句,比如业务SQL目标对 a,b,c三行进行更新,那buildBeforeImageSQL的职责就是构建一个查询SQL,该SQL的执行结果是返回a,b,c三行
至于怎么构建,可以看到,是通过构建一个SELECT xxx FROM xxx WHERE xxx ORDER BY xxx LIMIT xxx FOR UPDATE,其中xxx的内容来自sqlrecognizer对业务SQL的分析,查询SQL的xxx应当与业务SQL的xxx保持一致。
那么前置镜像和后置镜像生成好之后存储在什么地方呢?它们会被暂时存储在ConnectionContext的sqlUndoItemsBuffer中,Seata会关闭本地事务的自动提交,这样提交的控制权就移交给了Seata,Seata会先将生成的镜像通过UndoLogManager的flushUndoLogs方法刷写到数据库的一个专门的表中,然后再进行本地事务的提交。UndoLogManager的undo方法会读取表中的镜像并执行回滚操作,执行的时机是在TC通知RM进行全局事务的回滚时。
构建好查询SQL之后,buildTableRecords做的其实就是执行查询SQL,并通过TableRecords.buildRecords将返回的结果集和表元数据结合,生成TableRecords镜像
准备事务日志
prepareUndoLog方法主要做了两件事情:
- 构建全局锁并放入connectionProxy中
- 构建undoLog并放入connectionProxy中
全局锁的格式为表名:主键值 1,主键值2
一个本地事务可能会有多条SQL语句,每条SQL语句都可能生成行锁数据,Seata需要将每个事务的行锁数据合并成一个大字符串,即调用多次appendLockKey。
undoLog也同理,一个事务中的所有undoLog都会合并成一个,体现在事务日志表中的只有一行数据。
数据库操作代理
Seata对java.sql中的DataSource、Connection、Statement、PrepareStatement接口进行了静态代理,便于在sql执行的前后进行额外的操作(如分支事务的注册、分支状态的回报、全局锁的查询、事务日志的生成和插入等等)
总结来看,DataSourceProxy代理了数据源的创建流程,在数据源初始化时,完成资源ID的生成和向资源管理器注册当前资源,便于后续对该资源上的事务进行提交、回滚等操作。ConnectionProxy代理了连接的建立以及当前连接事务的提交回滚流程,维护当前连接的分支事务的状态,并拦截事务的提交或回滚,进行全局锁的获取、释放等操作。StatementProxy代理了SQL的执行流程,在真正的执行SQL的前后执行分布式事务的前后置镜像生成等代理逻辑。
DataSourceProxy
DataSourceProxy实现了DataSource接口,这使得用户可以直接将其注入DataSource中,并完成DataSourceProxy中定义的SQL的拦截逻辑。同时,DataSourceProxy还实现了Resource接口,这使得其可以作为一个资源注册到TC中。
DataSourceProxy还实现了Wrapper接口,这表明可以通过unwrap方法拿到原生的代理对象,执行绕过代理类执行原生操作。
在DataSourceProxy的init方法中,会调用DefaultResourceManager.get().registerResource(this)将自身注册到ResourceManager,并且开启定时任务刷新表元数据的缓存
ResourceManager
资源管理器,不同模式下具有不同实现。AT模式下的实现是DataSourceManager,TCC模式下的实现为TCCResourceManager,Saga模式下的实现为SagaResourceManager,XA模式下的实现为ResourceManagerXA。ResourceManager接口定义了4个方法
BranchType的含义是分支类型,比如AT、TCC、XA等。除此之外,ResourceManager还继承了ResourceManagerInbound、ResourceManagerOutbound接口。ResourceManagerInbound接口定义了ResourceManager的对内操作,它负责接收TC发来的请求,进行分支事务的提交或回滚操作。而ResourceManagerOutbound定义了ResourceManager的对外操作,负责向TC注册分支事务、主动上报分支事务的状态、查询全局锁等操作。DataSourceProxy的init方法中的注册分支操作就是对ResourceManagerOutbound的实现
资源注册
资源的注册操作主要是将资源ID和DataSourceProxy的映射关系放入内存中,然后向TC发起远程调用,TC会建立起资源ID与连接的对应关系,便于后续TC向对应资源发起提交或回滚请求。
ConnectionProxy
Connection是数据库连接,每次通过DataSource获取到Connection,就说明要进行一些事务性的操作。因此在ConnectionProxy这一层,出现了事务的概念。ConnectionProxy中定义了连接上下文对象ConnectionContext,它里面存储了当前分支事务的一些状态,比如xid,branchId,undolog镜像等。ConnectionProxy代理的主要目的就是维护分支事务的状态,并拦截事务的提交或回滚,进行全局锁的获取、释放等操作。
下列代码是ConnectionProxy对commit操作的实现,事务提交时会执行commit方法。可以看到,在正式提交doCommit之前,包上了一层全局锁的冲突处理逻辑lockRetryPolicy。全局锁是对当前事务涉及到的行(Row)所加的锁,是为了避免多个事务同时修改某行数据造成的数据不一致。如果发生锁冲突,会按照lockRetryPolicy中定义的策略进行重试
而doCommit中则根据当前本地事务是否处于某个全局事务,分出了不同的逻辑
如果处于全局事务,则执行processGlobalTransactionCommit,如果不处于全局事务,但是需要获取全局锁,则执行processLocalCommitWithGlobalLocks。否则,执行普通的事务提交。
processGlobalTransactionCommit
主要做了4件事:注册分支事务、刷写undoLog到数据库表中、提交本地事务、上报分支事务的状态
其中注册分支事务是向TC发起请求
可以看到,里面有buildLockKeys的操作,说明分支事务的注册需要获取到全局锁。如果获取失败,自然就不能进行下面的本地事务提交等操作了。全局锁lock_table
全局锁:当前连接上下文中,拿到的所有的行记录的主键,从lock_table中查,查到的xid都是本次全局事务的xid,则说明拿到了全局锁。
获取全局锁:开启一个事务,然后查数据库,所有的row_key,要么属于当前全局事务,要么不存在记录。如果不存在,就新增一条记录,xid为当前全局事务。所有操作成功后,提交事务并成功获取全局锁。否则回滚事务,获取全局锁失败。
processLocalCommitWithGlobalLocks
该方法在本地事务提交之前获取了全局锁。这个 方法主要是为了支持分布式事务的读已提交隔离级别。我们在需要读已提交隔离级别的事务方法上加上需要全局锁的注解标识,这样一来,该事务读到的数据就一定是全局事务中已提交的(否则就拿不到全局锁 )。默认情况下,一个其他事务读取所属全局事务未提交,但本地事务已提交的数据,是可以读到的。
StatementProxy
StatementProxy的目的主要是代理SQL的执行逻辑,在真正的执行SQL的前后执行分布式事务的前后置镜像生成等代理逻辑
StatementProxy代理了execute等方法逻辑,在execute中将sql的执行转交给ExecuteTemplate
这个方法的主要逻辑是,如果这个SQL语句不在分布式事务中,并且也没有查询全局锁的要求,则不需要将其纳入Seata框架进行处理,用原始的Statement方法进行直接处理即可;如果这个SQL语句在分布式事务中,则将其纳入Seata框架进行处理,并根据不同SQL语句类型选用不同的执行器来执行。
Seata框架处理的SQL语句包括insert、update、delete、select for update。普通的select语句用原始的Statement方法直接处理,而select for update语句需要查询Seata全局锁,它默认工作在读已提交隔离级别。
SQLRecognizer包含了该条SQL的详细结构信息,它是通过SQL解析工具解析SQL得到的。这里可以选择依赖不同的解析工具,比如Druid或者Antrl
那么Executor的execute又做了哪些事情呢?这里追踪到BaseTransactionalExecutor#execute
execute绑定了全局事务id xid,然后又设置了是否需要全局锁的参数,最后执行doExecute,doExecute由AbstractDMLBaseExecutor实现
doExecute针对getAutoCommit执行不同的逻辑。getAutoCommit返回的是当前是否支持自动提交。如果executeAutoCommitTrue中,会将自动提交关掉,然后执行executeAutoCommitFalse。所以最终还是要看executeAutoCommitFalse方法,这里就回到了上面的前置镜像与后置镜像生成的内容了。这里为什么要关掉自动提交呢?因为自动提交情况下,每一条SQL都被当成一个事务进行提交一次,而Seata框架需要在提交前生成前置镜像与后置镜像并插入数据库表,并且保证这一操作和业务sql处于同一事务中,所以需要关掉自动提交。
二阶段
由于在数据源初始化步骤中,已经将数据源注册到TC,所以TC可以对这些数据源发起RPC,进行全局事务的提交或者回滚
二阶段提交
当所有分支事务都本地提交成功后,TC会释放全局锁,随后对这些分支事务的数据源发起二阶段提交命令。RM在收到二阶段提交指令后,只需删除保存的事务日志数据即可。为了提升性能,RM会立即返回提交成功,并在异步线程中进行日志删除
DataSourceManager#branchCommit
AsyncWorker#branchCommit
addToCommitQueue会通过CompletableFuture开启一个异步任务:doBranchCommitSafely
最终会来到dealWithGroupedContexts
在dealWithGroupedContexts中会对undoLog进行分批删除
二阶段回滚
当有本地事务执行失败时,TC会对所有提交过的RM发起回滚请求
DataSourceManager#branchRollback
回滚是通过UndoLogManager#undo来进行的,undo会通过前置镜像生成回滚sql语句,在执行回滚sql语句前,会通过dataValidationAndGoOn方法判断是否需要进行回滚并校验脏写。脏写即后置镜像与当前数据库中的数据不一致,一旦发生了脏写,说明有人绕过Seata框架对数据库做了操作,此时需要人工进行修正处理。在确认无脏写后,会执行通过前置镜像生成的回滚SQL语句,完成分支事务的回滚。
TCC模式
与AT对比
二者都是一个两提交模型。分为一阶段prepare和二阶段commit/rollback,主要区别在于是自动实现的这三个行为(prepare、commit、rollback)还是自定义实现的这三个行为 。既然二者都是同一个模型,也就是说,在Seata的架构中 ,两种模式的分支事务可以共存在一个分布式事务中:这个RM既可以是AT模式的,也可以是TCC模式的。在AT模式下,Seata把每一个数据库当作一个资源;在TCC模式下,Seata把每一个服务接口当作一个资源
使用示例
TwoPhaseBusinessAction注解定义在接口方法而不是实现上,这和Hmily不同
TccActionOne、TccActionTwo是两个RPC服务接口,服务的消费方需要持有这两个接口的带有注解的api,而服务提供方的接口实现则不需要与注解进行耦合
业务方法持有RPC接口Bean,并像正常的RPC调用一样对他们进行方法调用即可
源码分析
拦截注解 - @TwoPhaseBusinessAction
当服务消费方启动并扫描到这两个RPC接口Bean时,会将它们作为资源进行注册。下图为GlobalTransactionScanner#wrapIfNecessary,可以看到第250行,isTccAutoProxy会判断bean是否是TCC远程调用Bean,如果是的话,会在parserRemotingServiceInfo中创建tccResource并调用DefaultResourceManager.get().registerResource(tccResource)进行注册。
当扫描到的bean符合TCC代理条件并成功注册资源后 ,会对该bean的调用使用 TccActionInterceptor 进行增强
下图为TccActionInterceptor#invoke,第96行,调用了ActionInterceptorHandler#proceed
分支事务注册
如下图,在ActionInterceptorHandler#proceed中,第88行对目标方法,即RPC,进行了调用。在调用前,第70行,会创建分支事务并对分支事务进行注册
下图为doTccActionLogStore,对上下文进行初始化之后,第121行,将分支事务注册到TC
二阶段提交/回滚
当执行完业务方法后,一般是成功结束或者抛出异常,此时TM就会通知TC进行二阶段confirm/cancel,在TCC模式下对应TCCResourceManager#branchCommit以及branchRollback。以branchCommit为例,在第97行,对目标bean和目标confirm方法进行调用
TC事务协调者
服务端启动的入口在io.seata.server.Server#mainv
main函数中对Netty Server以及事务协调器 DefaultCoordinator进行初始化。
DefaultCoordinator使用GlobalSession来承载事务,一个GlobalSession代表一个事务。所有事务被SessionManager管理。在main方法中会对SessionHolder进行初始化,里面就会创建SessionManager ROOT_SESSION_MANAGER
请求处理
DefaultCoordinator的一个重要角色是请求消息处理器,负责处理TM、RM发送的请求。TCInboundHandler接口中定义了TC可以处理哪些消息
/**
* The interface Tc inbound handler.
*
* @author sharajava
*/
public interface TCInboundHandler {
/**
* Handle global begin response.
*
* @param globalBegin the global begin
* @param rpcContext the rpc context
* @return the global begin response
*/
GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);
/**
* Handle global commit response.
*
* @param globalCommit the global commit
* @param rpcContext the rpc context
* @return the global commit response
*/
GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);
/**
* Handle global rollback response.
*
* @param globalRollback the global rollback
* @param rpcContext the rpc context
* @return the global rollback response
*/
GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);
/**
* Handle branch register response.
*
* @param branchRegister the branch register
* @param rpcContext the rpc context
* @return the branch register response
*/
BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
/**
* Handle branch report response.
*
* @param branchReport the branch report
* @param rpcContext the rpc context
* @return the branch report response
*/
BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);
/**
* Handle global lock query response.
*
* @param checkLock the check lock
* @param rpcContext the rpc context
* @return the global lock query response
*/
GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);
/**
* Handle global status response.
*
* @param globalStatus the global status
* @param rpcContext the rpc context
* @return the global status response
*/
GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);
/**
* Handle global report request.
*
* @param globalReport the global report request
* @param rpcContext the rpc context
* @return the global report response
*/
GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);
}
这里以全局事务提交为例,分析请求处理过程
全局事务提交
全局事务提交由TM触发,当@GlobalTranscational的方法顺利结束后,会由TM向TC发起GlobalCommitRequest,TC收到请求后,会调用DefaultCore.commit方法,最终来到DefaultCore.doGlobalCommit方法。该方法会对分支事务进行遍历并调用branchCommit
branchCommit会对相应分支的资源RM发起RPC,进行二阶段提交
定时任务
在DefaultCoordinator中还定义了5个线程池用来执行定时任务:重试回滚事务、重试提交事务、异步提交事务、事务超时检查、批量删除资源侧事务日志
在init方法中,对定时任务进行了启动