文章目录
- 事务的提交
- 客户端提交流程
- 服务端提交流程
- 客户端删除undo_log
- 事务回滚
- 客户端事务回滚
- 服务端回滚事务
事务的提交
前面两篇我们分析了seata的TC初始化和TM,RM初始化,并且事务准备阶段源码及业务Sql执行,下面我们分析事务的提交源码。
客户端提交流程
这个主要是从 TransactionalTemplate#execute方法中
try {
//准备阶段
this.beginTransaction(txInfo, tx);
Object rs;
Object ex;
try {
//业务sql的执行
rs = business.execute();
} catch (Throwable var17) {
ex = var17;
this.completeTransactionAfterThrowing(txInfo, tx, var17);
throw var17;
}
//事务的提交
this.commitTransaction(tx);
ex = rs;
return ex;
} finally {
this.resumeGlobalLockConfig(previousConfig);
this.triggerAfterCompletion();
this.cleanUp();
}
private void commitTransaction(GlobalTransaction tx) throws ExecutionException {
try {
//前置镜像
this.triggerBeforeCommit();
//提交
tx.commit();
//后置镜像
this.triggerAfterCommit();
} catch (TransactionException var3) {
throw new ExecutionException(tx, var3, Code.CommitFailure);
}
}
public void commit() throws TransactionException {
//判断当前的角色是否是Launcher
if (this.role == GlobalTransactionRole.Participant) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", this.xid);
}
} else {
//判断xid是否为null
this.assertXIDNotNull();
//重试次数,默认为5次可以自定定义重试次数
int retry = COMMIT_RETRY_COUNT <= 0 ? 5 : COMMIT_RETRY_COUNT;
try {
while(retry > 0) {
try {
--retry;
//循环请求后台,并且返回状态
this.status = this.transactionManager.commit(this.xid);
break;
} catch (Throwable var6) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", new Object[]{this.getXid(), retry, var6.getMessage()});
if (retry == 0) {
throw new TransactionException("Failed to report global commit", var6);
}
}
}
} finally {
if (this.xid.equals(RootContext.getXID())) {
this.suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", this.xid, this.status);
}
}
}
public GlobalStatus commit(String xid) throws TransactionException {
//组装请求的参数
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
//netty请求seata后端
GlobalCommitResponse response = (GlobalCommitResponse)this.syncCall(globalCommit);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse)TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException var3) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", var3);
}
}
}
服务端提交流程
seata后台服务时对事务进行的异步提交,首先来分析一下我们后端服务的入口DefaultCoordinator#doglobalCommit方法
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException {
//存储线程池的xid方便看日志
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
//核心方法
response.setGlobalStatus(core.commit(request.getXid()));
}
public GlobalStatus commit(String xid) throws TransactionException {
//key1:查询全局事务 如果事务被清理,则直接返回完成
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
//添加监听器
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
if (globalSession.getStatus() == GlobalStatus.Begin) {
// Highlight: Firstly, close the session, then no more branch can be registered.
//释放全局锁
globalSession.closeAndClean();
// key2: 异步提交
//判断是否可以进行异步提交,是AT模式
if (globalSession.canBeCommittedAsync()) {
//异步提交
globalSession.asyncCommit();
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
return false;
} else {
//更改全局事务状态为commiting
globalSession.changeGlobalStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
//如果可以提交,则执行shouldCommit
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
AT模式将事务状态改为异步提交AsyncCommitting,然后定时运行线程池1s轮训运行一次
public void asyncCommit() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
// 设置一个状态异步提交·
this.setStatus(GlobalStatus.AsyncCommitting);
// 异步增加全局session信息
SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
}
异步定时任务DefaultCoordinator#init方法中定时开启
public void init() {
retryRollbacking.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
retryCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//我们事务异步提交都在这里执行
asyncCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
timeoutCheck.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
undoLogDelete.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
获取所有的一步提交状态的事务,并加上排他锁
public static boolean distributedLockAndExecute(String key, NoArgsFunc func) {
boolean lock = false;
try {
//核心方法,请求加锁
if (lock = acquireDistributedLock(key)) {
func.call();
}
} catch (Exception e) {
LOGGER.info("Exception running function with key = {}", key, e);
} finally {
if (lock) {
try {
SessionHolder.releaseDistributedLock(key);
} catch (Exception ex) {
LOGGER.warn("release distribute lock failure, message = {}", ex.getMessage(), ex);
}
}
}
return lock;
}
public boolean acquireLock(DistributedLockDO distributedLockDO) {
if (demotion) {
return true;
}
Connection connection = null;
boolean originalAutoCommit = false;
try {
connection = distributedLockDataSource.getConnection();
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
//这个是核心,查询出所有的异步提交的事务,并且查询的sql上加上排他锁
DistributedLockDO distributedLockDOFromDB = getDistributedLockDO(connection, distributedLockDO.getLockKey());
//如果查询出来的数据为null
if (null == distributedLockDOFromDB) {
//执行插入的sql "INSERT INTO " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + "(" + ALL_COLUMNS + ") VALUES (?, ?, ?)";
boolean ret = insertDistribute(connection, distributedLockDO);
//提交事务
connection.commit();
return ret;
}
//判断过期时间如果大于等于当前的系统时间
if (distributedLockDOFromDB.getExpireTime() >= System.currentTimeMillis()) {
LOGGER.debug("the distribute lock for key :{} is holding by :{}, acquire lock failure.",
distributedLockDO.getLockKey(), distributedLockDOFromDB.getLockValue());
//事务提交
connection.commit();
return false;
}
//执行更新sql"UPDATE " + "distributed_lock_table" + " SET "+ ServerTableColumnsName.DISTRIBUTED_LOCK_VALUE + "=?, " + ServerTableColumnsName.DISTRIBUTED_LOCK_EXPIRE + "=?"+ " WHERE " + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + "=?";
boolean ret = updateDistributedLock(connection, distributedLockDO);
//事务提交
connection.commit();
return ret;
} catch (SQLException ex) {
LOGGER.error("execute acquire lock failure, key is: {}", distributedLockDO.getLockKey(), ex);
try {
if (connection != null) {
//回滚
connection.rollback();
}
} catch (SQLException e) {
LOGGER.warn("rollback fail because of {}", e.getMessage(), e);
}
return false;
} finally {
try {
if (originalAutoCommit) {
connection.setAutoCommit(true);
}
IOUtil.close(connection);
} catch (SQLException ignore) { }
}
}
加上分布式事务锁之后,我们需要执行handleAsyncCommitting
protected void handleAsyncCommitting() {
SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
// key1:获取所有GlobalSession,并且是事务为AsyncCommiting状态
Collection<GlobalSession> asyncCommittingSessions =
SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
return;
}
// key2:遍历
SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
try {
asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 进行处理
core.doGlobalCommit(asyncCommittingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
}
});
}
事务的核心执行流程
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
// key1: 获取分支事务
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
// if not retrying, skip the canBeCommittedAsync branches
if (!retrying && branchSession.canBeCommittedAsync()) {
return CONTINUE;
}
BranchStatus currentStatus = branchSession.getStatus();
// key2: 当前事务状态为PhaseOne_Failed,则删除分支事务,释放全局锁
if (currentStatus == BranchStatus.PhaseOne_Failed) {
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
}
try {
//key3:发送rpc删除undolog日志
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
if (isXaerNotaTimeout(globalSession,branchStatus)) {
LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
branchStatus = BranchStatus.PhaseTwo_Committed;
}
switch (branchStatus) {
case PhaseTwo_Committed:
//key4: 删除分支事务信息,释放全局锁
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
//远程删除失败则报错,如果不是异步提交,修改状态CommitFailed
SessionHelper.endCommitFailed(globalSession, retrying);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
branchSession.getBranchId(), branchStatus);
return CONTINUE;
} else {
LOGGER.error(
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
new String[] {branchSession.toString()});
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
return CONTINUE;
});
// Return if the result is not null
if (result != null) {
return result;
}
//If has branch and not all remaining branches can be committed asynchronously,
//do print log and return false
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
if (!retrying) {
//contains not AT branch
globalSession.setStatus(GlobalStatus.Committed);
}
}
// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
// executed to improve concurrency performance, and the global transaction ends..
if (success && globalSession.getBranchSessions().isEmpty()) {
// key4: 删除全局事务信息
SessionHelper.endCommitted(globalSession, retrying);
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
}
return success;
}
客户端删除undo_log
我们进入客户端DataSourceManager#branchcommit方法
//这个属于核心,在类的构造方法中,会初始化一个定时任务
private final AsyncWorker asyncWorker = new AsyncWorker(this);
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
return this.asyncWorker.branchCommit(xid, branchId, resourceId);
}
public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
AsyncWorker.Phase2Context context = new AsyncWorker.Phase2Context(xid, branchId, resourceId);
//将事务加入队列
this.addToCommitQueue(context);
return BranchStatus.PhaseTwo_Committed;
}
AsyncWorker的构造方法
public AsyncWorker(DataSourceManager dataSourceManager) {
this.dataSourceManager = dataSourceManager;
LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
//初始化一个队列
this.commitQueue = new LinkedBlockingQueue(ASYNC_COMMIT_BUFFER_LIMIT);
ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
this.scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);
//每一秒就轮训执行方法doBranchCommitSafely
this.scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10L, 1000L, TimeUnit.MILLISECONDS);
}
private void doBranchCommit() {
//判断队列是否为空
if (!this.commitQueue.isEmpty()) {
List<AsyncWorker.Phase2Context> allContexts = new LinkedList();
//将队列中的消息添加到新的list中
this.commitQueue.drainTo(allContexts);
//校验和处理数据,并将其封装为map
Map<String, List<AsyncWorker.Phase2Context>> groupedContexts = this.groupedByResourceId(allContexts);
groupedContexts.forEach(this::dealWithGroupedContexts);
}
}
private void dealWithGroupedContexts(String resourceId, List<AsyncWorker.Phase2Context> contexts) {
//判断resourceId是否为空
if (StringUtils.isBlank(resourceId)) {
LOGGER.warn("resourceId is empty and will skip.");
} else {
DataSourceProxy dataSourceProxy = this.dataSourceManager.get(resourceId);
if (dataSourceProxy == null) {
LOGGER.warn("failed to find resource for {} and requeue", resourceId);
//添加到提交队列
this.addAllToCommitQueue(contexts);
} else {
Connection conn = null;
try {
conn = dataSourceProxy.getPlainConnection();
//获取undolog事务管理器
UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
List<List<AsyncWorker.Phase2Context>> splitByLimit = Lists.partition(contexts, 1000);
Iterator var7 = splitByLimit.iterator();
//循环遍历
while(var7.hasNext()) {
List<AsyncWorker.Phase2Context> partition = (List)var7.next();
//删除undologr日志
this.deleteUndoLog(conn, undoLogManager, partition);
}
} catch (SQLException var12) {
this.addAllToCommitQueue(contexts);
LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, var12);
} finally {
IOUtil.close(conn);
}
}
}
}
private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<AsyncWorker.Phase2Context> contexts) {
Set<String> xids = new LinkedHashSet(contexts.size());
Set<Long> branchIds = new LinkedHashSet(contexts.size());
contexts.forEach((context) -> {
xids.add(context.xid);
branchIds.add(context.branchId);
});
try {
//批量删除
undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
if (!conn.getAutoCommit()) {
conn.commit();
}
} catch (SQLException var9) {
LOGGER.error("Failed to batch delete undo log", var9);
try {
conn.rollback();
this.addAllToCommitQueue(contexts);
} catch (SQLException var8) {
LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", var8);
}
}
}
事务回滚
客户端事务回滚
客户端事务主要从AbstractTransactionRequest的实现类中去找到branchTrancetionRollback类
public abstract class AbstractTransactionRequest extends AbstractMessage {
public AbstractTransactionRequest() {
}
public abstract AbstractTransactionResponse handle(RpcContext rpcContext);
}
这个类是rollBack的核心类
public class BranchRollbackRequest extends AbstractBranchEndRequest {
public BranchRollbackRequest() {
}
public short getTypeCode() {
return 5;
}
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return this.handler.handle(this);
}
}
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
//封装请求参数
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
}
BranchStatus status = this.getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
//封装响应数据
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacked result: " + status);
}
}
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = this.get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
} else {
try {
//核心的执行方法
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException var9) {
StackTraceLogger.info(LOGGER, var9, "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]", new Object[]{branchType, xid, branchId, resourceId, applicationData, var9.getMessage()});
if (var9.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
}
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
//返回二次提交回滚状态
return BranchStatus.PhaseTwo_Rollbacked;
}
}
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
while(true) {
try {
conn = dataSourceProxy.getPlainConnection();
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
//查询出来事务的undolog SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE " + "branch_id" + " = ? AND " + "xid" + " = ? FOR UPDATE";
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while(rs.next()) {
exists = true;
int state = rs.getInt("log_status");
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log", new Object[]{xid, branchId, state});
}
return;
}
String contextString = rs.getString("context");
Map<String, String> context = this.parseContext(contextString);
//封装undolog,然后执行sql会进行回滚
byte[] rollbackInfo = this.getRollbackInfo(rs);
String serializer = context == null ? null : (String)context.get("serializer");
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
Iterator var18 = sqlUndoLogs.iterator();
while(var18.hasNext()) {
SQLUndoLog sqlUndoLog = (SQLUndoLog)var18.next();
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
//执行对应的回滚操作
undoExecutor.executeOn(conn);
}
} finally {
removeCurrentSerializer();
}
}
if (exists) {
//删除undolog
this.deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}", new Object[]{xid, branchId, AbstractUndoLogManager.State.GlobalFinished.name()});
break;
}
} else {
this.insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}", new Object[]{xid, branchId, AbstractUndoLogManager.State.GlobalFinished.name()});
break;
}
}
return;
} catch (SQLIntegrityConstraintViolationException var43) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
}
} catch (Throwable var44) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException var41) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", var41);
}
}
throw new BranchTransactionException(TransactionExceptionCode.BranchRollbackFailed_Retriable, String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, var44.getMessage()), var44);
} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
}
} catch (SQLException var40) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", var40);
}
}
}
}
服务端回滚事务
服务端的事务回滚是从GlobalRollbackRequest方法进入的
public class GlobalRollbackRequest extends AbstractGlobalEndRequest {
@Override
public short getTypeCode() {
return MessageType.TYPE_GLOBAL_ROLLBACK;
}
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
//这里是核心
return handler.handle(this, rpcContext);
}
}
@Override
public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
//组装响应的数据
GlobalRollbackResponse response = new GlobalRollbackResponse();
response.setGlobalStatus(GlobalStatus.Rollbacking);
exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
@Override
public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
throws TransactionException {
try {
//核心的执行类
doGlobalRollback(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore, String
.format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
}
}
@Override
public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
TransactionException tex) {
super.onTransactionException(request, response, tex);
// may be appears StoreException outer layer method catch
checkTransactionStatus(request, response);
}
@Override
public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
super.onException(request, response, rex);
// may be appears StoreException outer layer method catch
checkTransactionStatus(request, response);
}
}, request, response);
return response;
}
public GlobalStatus rollback(String xid) throws TransactionException {
//key1:事务回滚,先通过全局事xid找到全局事务和分支事务
//1.先通过全局事务XID找到去global_table查询到全局事务对象
//2.通过查询出来的xid对象去查询branch_talbe是否有分支事务,最后将这两个事务对象封装到GlobalSession中
//这里传入的true代表全局事务和分支事务都查询出来,全局事务是一条数据,分支事务可能会多条是一个list
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
//设置本事务的active状态为false
globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
//如果目前的全局事务状态为Begin开始状态,那么修改全局事务的状态为回滚中,修改数据表global_table的status的值为Rollbacking
if (globalSession.getStatus() == GlobalStatus.Begin) {
globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
return true;
}
return false;
});
if (!shouldRollBack) {
return globalSession.getStatus();
}
//key2:真正的回滚逻辑
boolean rollbackSuccess = doGlobalRollback(globalSession, false);
//返回回滚的逻辑
return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}
@Override
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start rollback event
MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
} else {
//globalSession.getReverseSortedBranches()得到的是所有分支
Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
BranchStatus currentBranchStatus = branchSession.getStatus();
//如果分支事务是在一阶段失败的,调用removeBranch,释放锁
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
}
try {
//branchRollback回滚分支事务,这里是server端,所以这里的分支事务的回滚是调用的远程进行回滚的
//而远程回滚就是使用的undo log日志表来回滚的
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
if (isXaerNotaTimeout(globalSession, branchStatus)) {
LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
branchStatus = BranchStatus.PhaseTwo_Rollbacked;
}
switch (branchStatus) {
//PhaseTwo_Rollbacked表示远程回滚成功
case PhaseTwo_Rollbacked:
// 二阶段回滚,删除分支事务信息
//删除分支事务信息,就是删除branch_table和释放锁
//1.释放锁,删除lock_table中的行锁信息;
//2.删除分支事务,branch_table
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return CONTINUE;
case PhaseTwo_RollbackFailed_Unretryable:
//远程回滚失败,修改全局事务状态为RollbackFailed
SessionHelper.endRollbackFailed(globalSession, retrying);
LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
if (!retrying) {
globalSession.queueToRetryRollback();
}
return false;
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex,
"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
if (!retrying) {
globalSession.queueToRetryRollback();
}
throw new TransactionException(ex);
}
});
// Return if the result is not null
if (result != null) {
return result;
}
}
// In db mode, lock and branch data residual problems may occur.
// Therefore, execution needs to be delayed here and cannot be executed synchronously.
if (success) {
// 删除全局事务数据
SessionHelper.endRollbacked(globalSession, retrying);
LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
}
return success;
}
起一个定时任务删除全局事务。
public void init() {
retryRollbacking.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
retryCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
asyncCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
timeoutCheck.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
undoLogDelete.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
*/
protected void handleRetryRollbacking() {
SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
sessionCondition.setLazyLoadBranch(true);
Collection<GlobalSession> rollbackingSessions =
SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(rollbackingSessions)) {
return;
}
long now = System.currentTimeMillis();
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try {
// prevent repeated rollback
// !rollbackingSession.isDeadSession() 判断回滚的全局事务必须超时,对应时间可以进入看
if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
&& !rollbackingSession.isDeadSession()) {
// The function of this 'return' is 'continue'.
return;
}
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
rollbackingSession.clean();
}
// Prevent thread safety issues
SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
SessionHelper.endRollbackFailed(rollbackingSession, true);
// rollback retry timeout event
MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);
//The function of this 'return' is 'continue'.
return;
}
rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
core.doGlobalRollback(rollbackingSession, true);
} catch (TransactionException ex) {
LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
}
});
}