这篇博客来记录在发起全局事务回滚时,服务端接收到netty请求是如何处理的
1. 发起全局事务回滚请求
在前面的博客中,有说到过,事务发起者在发现分支事务执行异常之后,会提交全局事务回滚的请求到netty服务端,这里是发送全局事务回滚请求的代码,同样,这里的GlobalRollbackRequest这个对象很重要,可以看到,在netty客户端这里,只入参了XID,这个xid贯穿了全局上下文,在服务端就是根据这里xid,来确定是要处理哪个全局事务
2. 服务端处理逻辑
在前面netty全局事务开启服务端源码中,有介绍过,netty服务端接收到客户端的请求之后,最终会进入到这个方法中,根据请求的对象类型,来决定是有哪个对象来处理
io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalRollbackRequest, io.seata.core.rpc.RpcContext)
在这个方法中,会继续调度
可以发现,在doGlobalRollback方法中,最终,调用了core.rollback()
2.1 io.seata.server.coordinator.DefaultCore#rollback
在这里执行回滚的时候,有三个步骤
- 根据xid,找到netty服务端的globalSession
- 调用globalSession的close()方法,这个方法只是把globalSession的active属性设置为false了
- 最后会调用doGlobalRollback()方法,最核心的逻辑在这个方法中
2.2 doGlobalRollback
io.seata.server.coordinator.DefaultCore#doGlobalRollback
下面这是执行全局事务回滚的核心代码
- 会先根据全局事务,获取到所有的分支事务
- 然后依次遍历所有的分支事务
- 如果分支事务是失败状态,就直接删除分支事务(从branchTable中删除)
- branchRollback 这里是通过netty请求,触发分支事务进行回滚的逻辑,这个下面单独说
- 接着就会对分支事务回滚成功/失败,做出相应的处理;如果处理成功,会把branchTable和lockTable中的数据删除
- 最后在所有分支事务都处理之后,对全局事务进行处理,endRollbacked 在这个方法中,会把globalTable中的全局事务删除
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start rollback event 这里没看懂,先跳过
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
// 判断当前事务使用的模式
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
} else {
for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
/**
* 1.如果分支事务是failed状态,直接remove,就是从branchTable表中删除记录
*/
globalSession.removeBranch(branchSession);
continue;
}
try {
/**
* 2.调用客户端,进行分支事务回滚
* 如果回滚成功,会在removeBranch中先删除lockTable的记录
* 再删除branchTable的记录
*/
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
switch (branchStatus) {
/**
* 这是rm正常进行事务回滚的返回结果,如果rm进行了sql回滚,并且undolog日志正常删除
* 那这里就会将lockTable和branchTable的记录删除
*/
case PhaseTwo_Rollbacked:
globalSession.removeBranch(branchSession);
LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
continue;
case PhaseTwo_RollbackFailed_Unretryable:
/**
* rm抛出unretryable异常的话,会执行这里,将全局事务删除?
*/
SessionHelper.endRollbackFailed(globalSession);
LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
if (!retrying) {
globalSession.queueToRetryRollback();
}
return false;
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex,
"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
if (!retrying) {
globalSession.queueToRetryRollback();
}
throw new TransactionException(ex);
}
}
// In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch
// transaction registration when rolling back.
// 1. New branch transaction and rollback branch transaction have no data association
// 2. New branch transaction has data association with rollback branch transaction
// The second query can solve the first problem, and if it is the second problem, it may cause a rollback
// failure due to data changes.
GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());
if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {
LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
}
if (success) {
// 在这里的end方法中,会删除globalTable中的记录
SessionHelper.endRollbacked(globalSession);
// rollbacked event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(),
globalSession.getStatus()));
LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
}
return success;
}
2.3 分支事务回滚 branchRollback(globalSession, branchSession);
io.seata.rm.DefaultResourceManager#branchRollback
服务端会通过发送netty请求(在2.2 中的branchRollback),触发rm这边开始进行分支事务的回滚,最终会在rm这边的这个类中进行分支事务回滚的操作
在执行回滚操作的时候,会根据branchType获取对应的manager
在这里,会根据当前的dbType,找到对应的undoLogManager,然后调用其undo方法进行分支事务回滚
io.seata.rm.datasource.undo.AbstractUndoLogManager#undo
在这里的undo方法中,会生成回滚sql,执行,然后删除undolog日志,代码太长了,但是逻辑不多,就不贴代码了
2.4 删除分支事务
我们接着上面2.2的逻辑来看,在调用netty请求,rm这一端进行了事务回滚之后,对于seata服务端这边,只需要把分支事务和全局事务删除即可
在2.2 的代码中,会对netty请求返回的状态进行判断,branchStatus
如果是PhaseTwo_Rollbacked,表示分支事务处理成功,此时会进入到globalSession.removeBranch(branchSession);来进行处理
在这个方法中,有两个逻辑,解锁和removeBranch,那就不用想了,肯定是删除分支事务加的锁和删除分支事务
io.seata.server.session.BranchSession#unlock
io.seata.server.storage.db.lock.DataBaseLockManager#releaseLock
io.seata.server.storage.db.lock.DataBaseLocker#releaseLock(java.lang.String, java.lang.Long)
io.seata.server.storage.db.lock.LockStoreDataBaseDAO#unLock(java.lang.String, java.lang.Long)
对于释放锁的逻辑,比较简答,中间会调用的时候,会指定xid和branchId,然后去构建删除的sql
我们接着来看,删除分支事务的代码
这就是分支事务删除的逻辑;这个逻辑也比较简单,大部分都是共用的逻辑,就不做过多的介绍了
2.5 全局事务删除
SessionHelper.endRollbacked(globalSession);在这个方法中,会处理全局事务
这里看起来也有两个逻辑,在clean()方法中,我最开始没有看懂这个clean中要做什么,因为在clean中,就直接调用了下面截图中的这个releaselock方法,看到这个方法中的这行代码之后,我好像明白了,这里是再尝试将所有分支事务的锁进行释放的逻辑
我们接着来看,onEnd()的逻辑:
这就是全局事务删除的逻辑
总结
对于分布式事务,进行全局回滚的逻辑,还是比较简单的
- 在事务管理者这边发起全局事务回滚的请求
- seata服务端,也就是事务协调者接收到请求之后,会向所有的rm,发起netty请求,进行分支事务的处理
- 各个分支事务,会根据undoLog日志,生成回滚的sql,然后执行
- seata服务端在等分支事务返回状态之后,会先删除分支事务,在删除分支事务之前,会将分支事务加的锁给删除(这里的锁,如果是mysql的话,其实就是写入到mysql中的一条记录)
- 最后会去处理globalSession,将globalSession从mysql表中删除
所以其实分支事务回滚的时候,就是把全局事务 + 分支事务 + 锁 从mysql表中删除