[Seata Source Code Study] Part 6: Global Transaction Commit and Rollback
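Global Transaction Commit
After the TM has invoked the RMs through RPC, it sends a global commit request to the TC if no exception was thrown. The entry point is io.seata.tm.api.TransactionalTemplate#execute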
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
// Get the TransactionInfo built from the attributes of the @GlobalTransactional annotation
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
// GlobalTransactionContext is the global transaction context; it creates a new transaction or returns the current one
// GlobalTransactionContext.getCurrent -> RootContext.getXID -> ContextCore.get
// ContextCore is an interface with two implementations in Seata, FastThreadLocalContextCore and ThreadLocalContextCore; both store the XID in a ThreadLocal
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
// Get the propagation behavior of the current transaction
Propagation propagation = txInfo.getPropagation();
// Holds the XID of a suspended transaction
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
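// Handle the propagation behavior
switch (propagation) {
// NOT_SUPPORTED: run without a transaction by calling methodInvocation.proceed()
// If the current interceptor is not the last one in the chain, the next interceptor's invoke method runs; if it is the last one, the target method is invoked directly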
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
// If a global transaction exists, suspend it
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
// Chain of responsibility: continue down the interceptor chain
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
// If a transaction exists, suspend it and create a new one
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
// If no transaction exists, skip this transaction interceptor, run the rest of the chain and return
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
// A transaction exists: throw an exception
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
// A transaction is mandatory: throw an exception if none exists
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
// No transaction in the current context: this call starts the transaction as the TM, with role Launcher
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
// Save the current global lock configuration, which is kept in a ThreadLocal
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
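// Run the hooks before and after beginning the global transaction
// If the role is Participant (this call joins an existing transaction), RootContext must already hold an XID; otherwise an exception is thrown
// If the role is Launcher (the TM), RootContext must not hold an XID yet; otherwise an exception is thrown
// The Launcher then sends a synchronous GlobalBeginRequest to the TC through TmNettyRemotingClient and binds the XID returned by the TC to RootContext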
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
// Run the interceptor chain
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
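// If an exception is thrown, check whether it falls within the configured rollback range (by default Throwable and its subclasses)
// Run the hooks before and after the rollback
// If the role is Launcher (the TM), send a synchronous GlobalRollbackRequest to the TC through TmNettyRemotingClient
// and record the transaction status returned by the TC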
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
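// No exception was thrown while the method ran
// Run the hooks before and after the commit
// If the role is Launcher (the TM), send a synchronous GlobalCommitRequest to the TC through TmNettyRemotingClient to commit the global transaction
// and record the transaction status returned by the TC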
commitTransaction(tx);
return rs;
} finally {
//5. clear
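// Restore the previous global lock configuration
resumeGlobalLockConfig(previousConfig);
// Run the hooks that fire once the whole transaction has completed
triggerAfterCompletion();
// Remove the transaction hooks bound to the current thread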
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
// Once the current transaction has finished, resume the suspended one:
// take the XID held by suspendedResourcesHolder and bind it to RootContext again
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
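To make the propagation switch easier to follow, here is a minimal sketch that reduces it to a decision table. The `Action` enum and the `decide` method are made up for illustration and are not part of Seata; only the six propagation names come from the code above.

```java
/** Hypothetical, simplified model of the propagation switch; not Seata code. */
class PropagationSketch {
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    /** What the template ends up doing for the business method (names are assumptions). */
    enum Action { RUN_WITHOUT_TX, JOIN_EXISTING, BEGIN_NEW, SUSPEND_AND_BEGIN_NEW, FAIL }

    static Action decide(Propagation propagation, boolean txExists) {
        switch (propagation) {
            case NOT_SUPPORTED:
                // An existing transaction is suspended first; either way the business code runs without one.
                return Action.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return txExists ? Action.SUSPEND_AND_BEGIN_NEW : Action.BEGIN_NEW;
            case SUPPORTS:
                return txExists ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case REQUIRED:
                return txExists ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case NEVER:
                return txExists ? Action.FAIL : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                return txExists ? Action.JOIN_EXISTING : Action.FAIL;
            default:
                return Action.FAIL;
        }
    }
}
```

For example, `decide(Propagation.REQUIRED, false)` yields `BEGIN_NEW`, which corresponds to step 1.3 creating a new transaction with the Launcher role.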
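Hooks Before and After Commit
Seata provides two hook methods, one before and one after the global commit, which can be extended to fit the needs of a production workload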
io.seata.tm.api.TransactionalTemplate#commitTransaction
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// Hook before the global commit
triggerBeforeCommit();
// Commit the global transaction
tx.commit();
// Hook after the global commit
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
}
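The ordering of the two hooks around the commit can be sketched in a few lines. This is a standalone illustration with hypothetical names (`Hook`, `CommitHookSketch`), not Seata's hook API; it only shows that the after-commit hooks run when the commit itself did not throw.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the before/after commit hooks; not Seata's API. */
class CommitHookSketch {
    interface Hook {
        default void beforeCommit() {}
        default void afterCommit() {}
    }

    final List<Hook> hooks = new ArrayList<>();
    final List<String> trace = new ArrayList<>();

    void commit(Runnable doCommit) {
        for (Hook hook : hooks) {
            hook.beforeCommit();
        }
        doCommit.run(); // if this throws, the afterCommit hooks below are skipped
        for (Hook hook : hooks) {
            hook.afterCommit();
        }
    }

    /** Registers one hook, commits once and returns the recorded call order. */
    static List<String> demo() {
        CommitHookSketch sketch = new CommitHookSketch();
        sketch.hooks.add(new Hook() {
            @Override public void beforeCommit() { sketch.trace.add("before"); }
            @Override public void afterCommit() { sketch.trace.add("after"); }
        });
        sketch.commit(() -> sketch.trace.add("commit"));
        return sketch.trace;
    }
}
```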
TM Commits the Global Transaction
io.seata.tm.api.DefaultGlobalTransaction#commit
public void commit() throws TransactionException {
// Only the TM (role Launcher) may commit the global transaction
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
// The XID must not be null
assertXIDNotNull();
// A failed global commit is retried; the default count is 5
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
retry--;
// The TransactionManager performs the actual commit
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
// Suspend the transaction: unbind the current XID and wrap it in a SuspendedResourcesHolder
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
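The countdown loop in commit() is a plain bounded retry. The sketch below isolates that pattern under the assumption that a non-positive configured count falls back to a default of 5, as in the code above; `callWithRetry` and `attempts` are illustrative names, not Seata methods.

```java
import java.util.concurrent.Callable;

/** Minimal bounded-retry sketch modelled on the loop in commit(); names are hypothetical. */
class CommitRetrySketch {
    static final int DEFAULT_RETRY_COUNT = 5;

    static <T> T callWithRetry(Callable<T> call, int configuredCount) throws Exception {
        int retry = configuredCount <= 0 ? DEFAULT_RETRY_COUNT : configuredCount;
        while (true) {
            retry--;
            try {
                return call.call();
            } catch (Exception ex) {
                // Give up once the countdown reaches zero, otherwise try again.
                if (retry == 0) {
                    throw new Exception("Failed to report global commit", ex);
                }
            }
        }
    }

    /** Returns the number of attempts made, negated if every attempt failed. */
    static int attempts(int failuresBeforeSuccess, int configuredCount) {
        int[] attempts = {0};
        try {
            callWithRetry(() -> {
                attempts[0]++;
                if (attempts[0] <= failuresBeforeSuccess) {
                    throw new IllegalStateException("TC unreachable");
                }
                return "Committed";
            }, configuredCount);
            return attempts[0];
        } catch (Exception ex) {
            return -attempts[0];
        }
    }
}
```

Note that the count is the total number of attempts, so with the default the commit request is sent at most five times.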
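A failing global commit is attempted up to 5 times by default. Seata routes transaction operations through the TransactionManager interface, and the TC, TM and RM each have their own implementation class, as shown in the figure below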
io.seata.tm.DefaultTransactionManager#commit
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
// Attach the global transaction XID
globalCommit.setXid(xid);
// Send the GlobalCommitRequest to the TC
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
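TC Handles the Global Commit
When the TC receives the message, ServerHandler calls processMessage, which picks the matching RemotingProcessor from the registered processor pairs according to the message type. A GlobalCommitRequest is matched to ServerOnRequestProcessor, which in turn hands the message to a TransactionMessageHandler.
TransactionMessageHandler is also an interface; the TC uses the DefaultCoordinator implementation. The message is finally cast to AbstractTransactionRequestToTC, and the handle method of the concrete message subclass does the processing.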
io.seata.core.protocol.transaction.GlobalCommitRequest#handle
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return handler.handle(this, rpcContext);
}
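The request calling `handler.handle(this)` is a double dispatch: the concrete request type selects the handler overload. Here is a self-contained sketch of that shape; all class names in it are simplified stand-ins, not the Seata types.

```java
/** Double-dispatch sketch: the request picks the handler overload; names are hypothetical. */
class DoubleDispatchSketch {
    interface Handler {
        String handle(CommitRequest request);
        String handle(RollbackRequest request);
    }

    abstract static class Request {
        protected Handler handler;
        void setHandler(Handler handler) { this.handler = handler; }
        abstract String handle();
    }

    static class CommitRequest extends Request {
        // "this" is statically a CommitRequest, so the commit overload is chosen.
        @Override String handle() { return handler.handle(this); }
    }

    static class RollbackRequest extends Request {
        @Override String handle() { return handler.handle(this); }
    }

    /** Plays the role of the coordinator: inject the handler, then let the request dispatch. */
    static String onRequest(Request request) {
        request.setHandler(new Handler() {
            @Override public String handle(CommitRequest r) { return "doGlobalCommit"; }
            @Override public String handle(RollbackRequest r) { return "doGlobalRollback"; }
        });
        return request.handle();
    }
}
```

The benefit of this design is that the coordinator needs no `instanceof` chain per message type; adding a message means adding one subclass and one overload.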
io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalCommitRequest, io.seata.core.rpc.RpcContext)
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
GlobalCommitResponse response = new GlobalCommitResponse();
response.setGlobalStatus(GlobalStatus.Committing);
exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
@Override
public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
throws TransactionException {
try {
// This is where the global commit actually starts
doGlobalCommit(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore,
String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
e);
}
}
@Override
public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
TransactionException tex) {
super.onTransactionException(request, response, tex);
checkTransactionStatus(request, response);
}
@Override
public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
super.onException(request, response, rex);
checkTransactionStatus(request, response);
}
}, request, response);
return response;
}
io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.commit(request.getXid()));
}
io.seata.server.coordinator.DefaultCore#commit
@Override
public GlobalStatus commit(String xid) throws TransactionException {
// Look up the GlobalSession in the session store by XID
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
// Not found: the global transaction has already finished
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
if (globalSession.getStatus() == GlobalStatus.Begin) {
// Highlight: Firstly, close the session, then no more branch can be registered.
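// Close the session and, in AT mode, release the global locks
globalSession.closeAndClean();
// In AT mode the commit can be done asynchronously
if (globalSession.canBeCommittedAsync()) {
// Change the global status to AsyncCommitting and add the session to the ASYNC_COMMITTING_SESSION_MANAGER in SessionHolder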
globalSession.asyncCommit();
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
return false;
} else {
globalSession.changeGlobalStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
DefaultCore first looks up the GlobalSession in the session store by XID. In AT mode it releases the global locks and marks the global transaction as GlobalStatus.AsyncCommitting
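One detail worth spelling out is that the TM is told Committed while the session on the TC is still AsyncCommitting. The sketch below models only that status handling; `Session` is a tiny stand-in for GlobalSession, and the synchronous branch commit of the non-async path is deliberately left out, so that path simply reports Committing here.

```java
/** Simplified model of the status returned by commit(xid); not the Seata implementation. */
class CommitStatusSketch {
    enum GlobalStatus { Begin, Committing, AsyncCommitting, Committed, Finished }

    /** Stand-in for GlobalSession: only the status and the async flag are modelled. */
    static final class Session {
        GlobalStatus status = GlobalStatus.Begin;
        final boolean canBeCommittedAsync;
        Session(boolean canBeCommittedAsync) { this.canBeCommittedAsync = canBeCommittedAsync; }
    }

    /** Returns the status that would be reported back to the TM. */
    static GlobalStatus commit(Session session) {
        if (session == null) {
            return GlobalStatus.Finished; // no session left: the transaction already ended
        }
        if (session.status == GlobalStatus.Begin) {
            if (session.canBeCommittedAsync) {
                // Phase two is left to the background task.
                session.status = GlobalStatus.AsyncCommitting;
            } else {
                // Phase two would run synchronously here (omitted in this sketch).
                session.status = GlobalStatus.Committing;
                return session.status;
            }
        }
        return session.status == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : session.status;
    }
}
```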
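When the Seata server starts it initializes DefaultCoordinator, which starts a scheduled task that runs handleAsyncCommitting once per second (the period is ASYNC_COMMITTING_RETRY_PERIOD) to process asynchronous global commits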
asyncCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
protected void handleAsyncCommitting() {
// Condition used to find matching sessions in the session store
SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
// Find every global transaction whose status is GlobalStatus.AsyncCommitting
Collection<GlobalSession> asyncCommittingSessions =
SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
return;
}
SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
try {
asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// Delegate to DefaultCore.doGlobalCommit
core.doGlobalCommit(asyncCommittingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
}
});
}
io.seata.server.coordinator.DefaultCore#doGlobalCommit
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
// Saga transactions are committed by the Saga Core; the branch loop below handles the other modes
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
// Iterate over all branch transactions
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
// if not retrying, skip the canBeCommittedAsync branches
if (!retrying && branchSession.canBeCommittedAsync()) {
return CONTINUE;
}
BranchStatus currentStatus = branchSession.getStatus();
if (currentStatus == BranchStatus.PhaseOne_Failed) {
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
}
try {
// Strategy pattern: each branch type is handled by its own Core
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
if (isXaerNotaTimeout(globalSession,branchStatus)) {
LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
branchStatus = BranchStatus.PhaseTwo_Committed;
}
switch (branchStatus) {
// Branch committed: remove it from the global transaction
case PhaseTwo_Committed:
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
//not at branch
SessionHelper.endCommitFailed(globalSession, retrying);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
branchSession.getBranchId(), branchStatus);
return CONTINUE;
} else {
LOGGER.error(
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
new String[] {branchSession.toString()});
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
return CONTINUE;
});
// Return if the result is not null
if (result != null) {
return result;
}
//If has branch and not all remaining branches can be committed asynchronously,
//do print log and return false
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
if (!retrying) {
//contains not AT branch
globalSession.setStatus(GlobalStatus.Committed);
}
}
// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
// executed to improve concurrency performance, and the global transaction ends..
if (success && globalSession.getBranchSessions().isEmpty()) {
SessionHelper.endCommitted(globalSession, retrying);
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
}
return success;
}
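The branch loop relies on a three-valued result: the callback returns CONTINUE to move on to the next branch, or true/false to stop with a verdict. The following sketch shows that idea in isolation, assuming CONTINUE is represented by null; `forEach` here is a simplified stand-in for the helper used above, and the branch statuses are plain strings for brevity.

```java
import java.util.List;
import java.util.function.Function;

/** Early-exit iteration sketch; a null verdict means "keep going". Names are hypothetical. */
class BranchLoopSketch {
    static final Boolean CONTINUE = null; // assumption: CONTINUE is modelled as null

    static <T> Boolean forEach(List<T> items, Function<T, Boolean> handler) {
        for (T item : items) {
            Boolean verdict = handler.apply(item);
            if (verdict != null) {
                return verdict; // a definite result stops the loop
            }
        }
        return null; // every item asked to continue
    }

    /** Null means all branches were committed; false means one branch stopped the commit. */
    static Boolean commitAll(List<String> branchStatuses) {
        return forEach(branchStatuses, status -> {
            switch (status) {
                case "PhaseTwo_Committed":
                    return CONTINUE;
                default:
                    return Boolean.FALSE;
            }
        });
    }
}
```

This is why doGlobalCommit checks `result != null` after the loop: a non-null result is a final answer, while null means the remaining bookkeeping still has to run.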
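On global commit, the TC iterates over all branch transactions attached to the global transaction and sends a BranchCommitRequest to the RM of each one. Once an RM has committed its branch, the branch is removed from the global transaction. When every branch has succeeded, the GlobalSession record is deleted from the session store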
public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
try {
BranchCommitRequest request = new BranchCommitRequest();
request.setXid(branchSession.getXid());
request.setBranchId(branchSession.getBranchId());
request.setResourceId(branchSession.getResourceId());
request.setApplicationData(branchSession.getApplicationData());
request.setBranchType(branchSession.getBranchType());
return branchCommitSend(request, globalSession, branchSession);
} catch (IOException | TimeoutException e) {
throw new BranchTransactionException(FailedToSendBranchCommitRequest,
String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(),
branchSession.getBranchId()), e);
}
}
RM Commits the Branch Transaction
After the TC sends the BranchCommitRequest, the RM receives it and hands it to RmBranchCommitProcessor
io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor
// Branch commit
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
io.seata.core.rpc.processor.client.RmBranchCommitProcessor#process
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
// Get the address of the TC
String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
Object msg = rpcMessage.getBody();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("rm client handle branch commit process:" + msg);
}
// Handle the branch commit
handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
}
io.seata.core.rpc.processor.client.RmBranchCommitProcessor#handleBranchCommit
private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
BranchCommitResponse resultMessage;
resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("branch commit result:" + resultMessage);
}
try {
this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
} catch (Throwable throwable) {
LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
}
}
io.seata.rm.AbstractRMHandler#onRequest
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToRM)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
transactionRequest.setRMInboundMessageHandler(this);
return transactionRequest.handle(context);
}
io.seata.core.protocol.transaction.BranchCommitRequest#handle
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return handler.handle(this);
}
io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchCommitRequest)
public BranchCommitResponse handle(BranchCommitRequest request) {
BranchCommitResponse response = new BranchCommitResponse();
// This is where the branch commit is actually processed
exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
@Override
public void execute(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
doBranchCommit(request, response);
}
}, request, response);
return response;
}
io.seata.rm.AbstractRMHandler#doBranchCommit
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
}
// In AT mode this removes the branch's undo_log records from storage
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch commit result: " + status);
}
}
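On global commit, the TC releases the global locks and, in AT mode, marks the global transaction as AsyncCommitting. It then iterates over the branch transactions bound to the global transaction and sends a BranchCommitRequest to each RM; the RM deletes its undo_log records and returns its status to the TC. Once every branch has deleted its undo_log, the TC marks the global transaction as Committed and removes it from the session store
Global Transaction Rollback
TM Sends the Global Rollback Request
If an exception occurs after the TM has invoked the RMs through RPC, or the TM's own business method throws, completeTransactionAfterThrowing is executed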
io.seata.tm.api.TransactionalTemplate#completeTransactionAfterThrowing
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
//roll back
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
// Roll back the global transaction
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// not roll back on this exception, so commit
commitTransaction(tx);
}
}
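The branch on `txInfo.rollbackOn(originalException)` means an exception outside the configured range leads to a commit, not a rollback. A rough sketch of that decision follows; it matches purely by `isInstance`, which is a simplifying assumption and not how Seata's rollback rules are necessarily evaluated, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified rollback-rule check: roll back only for the configured exception types. */
class RollbackRuleSketch {
    static boolean rollbackOn(Throwable ex, List<Class<? extends Throwable>> rollbackFor) {
        for (Class<? extends Throwable> type : rollbackFor) {
            if (type.isInstance(ex)) {
                return true;
            }
        }
        return false;
    }

    static String complete(Throwable ex, List<Class<? extends Throwable>> rollbackFor) {
        // Mirrors the if/else above: matching exceptions roll back, all others still commit.
        return rollbackOn(ex, rollbackFor) ? "rollback" : "commit";
    }

    /** Demo with a rule set that contains only RuntimeException. */
    static String demo(Throwable ex) {
        List<Class<? extends Throwable>> rules = new ArrayList<>();
        rules.add(RuntimeException.class);
        return complete(ex, rules);
    }
}
```

With that rule set, an IllegalStateException triggers a rollback while an IOException lets the global transaction commit.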
The TM then sends a GlobalRollbackRequest to the TC
io.seata.tm.DefaultTransactionManager#rollback
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
return response.getGlobalStatus();
}
TC Handles the Global Rollback Request
io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalRollbackRequest, io.seata.core.rpc.RpcContext)
public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
GlobalRollbackResponse response = new GlobalRollbackResponse();
response.setGlobalStatus(GlobalStatus.Rollbacking);
exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
@Override
public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
throws TransactionException {
try {
doGlobalRollback(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore, String
.format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
}
}
@Override
public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
TransactionException tex) {
super.onTransactionException(request, response, tex);
// may be appears StoreException outer layer method catch
checkTransactionStatus(request, response);
}
@Override
public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
super.onException(request, response, rex);
// may be appears StoreException outer layer method catch
checkTransactionStatus(request, response);
}
}, request, response);
return response;
}
io.seata.server.coordinator.DefaultCore#rollback
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
if (globalSession.getStatus() == GlobalStatus.Begin) {
// Mark the global transaction status as Rollbacking
globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
return true;
}
return false;
});
if (!shouldRollBack) {
return globalSession.getStatus();
}
boolean rollbackSuccess = doGlobalRollback(globalSession, false);
return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}
io.seata.server.coordinator.DefaultCore#doGlobalRollback
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start rollback event
MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
} else {
// Iterate over the branch transactions in reverse order
Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
}
try {
// Roll back the branch transaction
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
if (isXaerNotaTimeout(globalSession, branchStatus)) {
LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
branchStatus = BranchStatus.PhaseTwo_Rollbacked;
}
switch (branchStatus) {
// Rolled back successfully: remove the branch from the session store
case PhaseTwo_Rollbacked:
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return CONTINUE;
// Unretryable failure: mark the global transaction as rollback-failed and release the global locks
case PhaseTwo_RollbackFailed_Unretryable:
SessionHelper.endRollbackFailed(globalSession, retrying);
LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
if (!retrying) {
globalSession.queueToRetryRollback();
}
return false;
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex,
"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
if (!retrying) {
globalSession.queueToRetryRollback();
}
throw new TransactionException(ex);
}
});
// Return if the result is not null
if (result != null) {
return result;
}
}
// In db mode, lock and branch data residual problems may occur.
// Therefore, execution needs to be delayed here and cannot be executed synchronously.
if (success) {
// Global rollback succeeded: release the global locks, mark the global transaction as Rollbacked, then delete its record
SessionHelper.endRollbacked(globalSession, retrying);
LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
}
return success;
}
io.seata.server.coordinator.AbstractCore#branchRollback
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
try {
BranchRollbackRequest request = new BranchRollbackRequest();
request.setXid(branchSession.getXid());
request.setBranchId(branchSession.getBranchId());
request.setResourceId(branchSession.getResourceId());
request.setApplicationData(branchSession.getApplicationData());
request.setBranchType(branchSession.getBranchType());
return branchRollbackSend(request, globalSession, branchSession);
} catch (IOException | TimeoutException e) {
throw new BranchTransactionException(FailedToSendBranchRollbackRequest,
String.format("Send branch rollback failed, xid = %s branchId = %s",
branchSession.getXid(), branchSession.getBranchId()), e);
}
}
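As with global commit, global rollback iterates over all branch transactions (in reverse order) and rolls each one back; a branch that rolls back successfully is removed from the session store. Once every branch has been rolled back, the global locks are released and the global transaction record is deleted.
Next, let's look at how a branch transaction is rolled back
RM Handles the Branch Rollback
After the TC sends a BranchRollbackRequest to the RM, the RM hands the message to RmBranchRollbackProcessor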
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback
private void handleBranchRollback(RpcMessage request, String serverAddress, BranchRollbackRequest branchRollbackRequest) {
BranchRollbackResponse resultMessage;
resultMessage = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("branch rollback result:" + resultMessage);
}
try {
this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
} catch (Throwable throwable) {
LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
}
}
io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchRollbackRequest)
public BranchRollbackResponse handle(BranchRollbackRequest request) {
BranchRollbackResponse response = new BranchRollbackResponse();
exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
@Override
public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
doBranchRollback(request, response);
}
}, request, response);
return response;
}
io.seata.rm.AbstractRMHandler#doBranchRollback
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
}
// AT mode uses DataSourceManager
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacked result: " + status);
}
}
io.seata.rm.datasource.DataSourceManager#branchRollback
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
}
try {
// Roll back using the undo_log
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
StackTraceLogger.info(LOGGER, te,
"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
io.seata.rm.datasource.undo.AbstractUndoLogManager#undo
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Find UNDO LOG
// Look up the undo log by global transaction XID and branch ID
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
// Whether an undo log exists
boolean exists = false;
while (rs.next()) {
exists = true;
// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
// Parse the undo log context
Map<String, String> context = parseContext(contextString);
byte[] rollbackInfo = getRollbackInfo(rs);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
// Deserialize into a BranchUndoLog object
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
// Execute the undo log
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489
if (exists) {
// Delete the undo log once it has been applied successfully
deleteUndoLog(xid, branchId, conn);
// Commit the local transaction
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
State.GlobalFinished.name());
}
} else {
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
State.GlobalFinished.name());
}
}
return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
}
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
branchId, e.getMessage()), e);
} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
}
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
}
}
}
}
Using the XID and the branch ID, the undo log is read from the undo_log table in the business database, then parsed, deserialized and executed
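The `Collections.reverse(sqlUndoLogs)` call matters when one branch modified the same row more than once: the undo records have to be applied newest first. A minimal sketch, using a hypothetical `Undo` record and an in-memory map in place of the database:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Shows why undo records are applied in reverse execution order; not Seata code. */
class ReverseUndoSketch {
    /** Hypothetical undo record: restores one key to its before-image value. */
    static final class Undo {
        final String key;
        final Integer beforeValue;
        Undo(String key, Integer beforeValue) {
            this.key = key;
            this.beforeValue = beforeValue;
        }
    }

    static Map<String, Integer> rollback(Map<String, Integer> current, List<Undo> logsInExecutionOrder) {
        Map<String, Integer> data = new HashMap<>(current);
        List<Undo> reversed = new ArrayList<>(logsInExecutionOrder);
        Collections.reverse(reversed); // newest change is undone first
        for (Undo undo : reversed) {
            if (undo.beforeValue == null) {
                data.remove(undo.key);
            } else {
                data.put(undo.key, undo.beforeValue);
            }
        }
        return data;
    }

    /** One branch changed stock 10 -> 8 -> 5; rolling back must end at 10. */
    static int demo() {
        List<Undo> logs = Arrays.asList(new Undo("stock", 10), new Undo("stock", 8));
        Map<String, Integer> current = new HashMap<>();
        current.put("stock", 5);
        return rollback(current, logs).get("stock");
    }
}
```

Applied in execution order, the same two records would leave the stock at 8, which is not the original value.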
io.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn
public void executeOn(Connection conn) throws SQLException {
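// Compare three data sets: the before image, the after image and the current data
// before image == after image: the data did not change, so there is nothing to roll back
// after image != current data: the data is dirty, so the rollback cannot proceed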
if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
return;
}
PreparedStatement undoPST = null;
try {
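// Build the undo SQL template
String undoSQL = buildUndoSQL();
undoPST = conn.prepareStatement(undoSQL);
// Get the before-image rows, i.e. the data as it was before the change
TableRecords undoRows = getUndoRows();
// Iterate over every row of the before image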
for (Row undoRow : undoRows.getRows()) {
ArrayList<Field> undoValues = new ArrayList<>();
List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
for (Field field : undoRow.getFields()) {
if (field.getKeyType() != KeyType.PRIMARY_KEY) {
undoValues.add(field);
}
}
// Bind the values to produce the actual undo SQL
undoPrepare(undoPST, undoValues, pkValueList);
// Execute the undo SQL
undoPST.executeUpdate();
}
} catch (Exception ex) {
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
finally {
//important for oracle
IOUtil.close(undoPST);
}
}
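Before the undo SQL is executed, the before image and the after image are compared with the current data:
If the before image equals the after image, the data did not change and no rollback is needed.
If the after image differs from the current data, the data is dirty and the rollback cannot proceed.
If the after image equals the current data and differs from the before image, the undo SQL is built and executed, which completes the branch rollback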
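The three-way comparison can be written down as a small decision function. This is a sketch under stated assumptions: the images are compared with plain `equals`, the `Decision` enum is made up, and the case where the current data already equals the before image (treated here as "nothing left to undo") is my reading of the validation step and is not spelled out in the text above.

```java
import java.util.Objects;

/** Decision sketch for the before/after/current comparison; not the Seata implementation. */
class UndoValidationSketch {
    enum Decision { SKIP, UNDO, DIRTY }

    static <T> Decision validate(T before, T after, T current) {
        if (Objects.equals(before, after)) {
            return Decision.SKIP;  // the statement changed nothing
        }
        if (Objects.equals(after, current)) {
            return Decision.UNDO;  // nobody touched the row since phase one
        }
        if (Objects.equals(before, current)) {
            return Decision.SKIP;  // assumption: already back at the before image
        }
        return Decision.DIRTY;     // modified by someone else, cannot roll back safely
    }
}
```

For a stock column that went from 10 to 8, `validate(10, 8, 8)` gives `UNDO`, while `validate(10, 8, 7)` gives `DIRTY`.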