Seata中AT模式的实现原理03-二阶段提交

news2024/12/25 9:21:28

全局事务提交

TM提交全局事务

当业务正常处理完毕后 本地事务全部提交完成,TM会将xid提交给TC,TC会返回当前事务状态,status由TC决定,TM最后会将xid从RootContext中解绑,全局事务结束。
TransactionalTemplate

    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }
    @Override
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
          // 默认重试5次 client.tm.commitRetryCount
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                // GlobalCommitRequest 发送全局事务id给TC,TC会返回全局事务状态给TM
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
         // 将xid从ThreadLocal中移除,代表这个全局事务已经结束
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }

TC处理全局事务提交请求

AbstractTCInboundHandler

    @Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                     //全局事务提交
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                        e);
                }
            }
           @Override
        public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
                                           TransactionException tex) {
            super.onTransactionException(request, response, tex);
            // 设置当前全局事务状态
            checkTransactionStatus(request, response);
        }

        @Override
        public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
            // 设置当前全局事务状态
            super.onException(request, response, rex);
            checkTransactionStatus(request, response);
        }
        }, request, response);
        return response;
    }

DefaultCoordinator

    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
            throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        response.setGlobalStatus(core.commit(request.getXid()));
    }
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // 如果分支事务都是AT模式,释放全局锁,delete from lock_table where xid = ?
            globalSession.closeAndClean();
            // 如果分支事务都是AT模式,或分支事务有一阶段提交失败的,则可以执行异步提交
            if (globalSession.canBeCommittedAsync()) {
                // 执行异步提交,更新全局事务状态为AsyncCommitting,update global_table set status = AsyncCommitting where xid = ?
                globalSession.asyncCommit();
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                return false;
            } else {
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    if (shouldCommit) { // 同步提交
        boolean success = doGlobalCommit(globalSession, false);
        // ...
    } else { // 异步提交
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}

DefaultCoordinator在初始化时启动了一个1秒执行一次的定时任务,通过distributed_lock表获取分布式锁,同一时间只有一个TC能执行异步提交任务。
DefaultCoordinator

public void init() {
    // 全局事务二阶段异步提交
    asyncCommitting.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
        1000, TimeUnit.MILLISECONDS);
}

    protected void handleAsyncCommitting() {
        SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
        Collection<GlobalSession> asyncCommittingSessions =
                SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
        if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
            return;
        }
        SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
            try {
                asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                //提交
                core.doGlobalCommit(asyncCommittingSession, true);
            } catch (TransactionException ex) {
                LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
            }
        });
    }

DefaultCore.doGlobalCommit
1.循环处理每个分支事务,发送BranchCommitRequest给RM,RM删除undo_log;
2.如果RM处理二阶段提交成功,返回分支事务状态PhaseTwo_Committed,TC删除branch_table中对应的分支事务;
3.如果所有分支事务都处理成功,TC删除global_table中的全局事务;

    @Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start committing event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
        //如果是saga模式  
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            //循环分支事务
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                // if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }

                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    //发送BranchCommitRequest给RM,RM会删除undo_log
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession,branchStatus)) {
                        LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Committed;
                    }
                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                        //删除branch_table中的分支事务记录
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            //not at branch
                            SessionHelper.endCommitFailed(globalSession, retrying);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;

                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                    branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error(
                                    "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                //某个分支事务处理失败,继续处理后续分支事务
                return CONTINUE;
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
            //If has branch and not all remaining branches can be committed asynchronously,
            //do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
            if (!retrying) {
                //contains not AT branch
                globalSession.setStatus(GlobalStatus.Committed);
            }
        }
        // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
        // executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty()) {
        //如果所有分支事务被删除,则删除全局事务 delete from global_table where xid = ?
            SessionHelper.endCommitted(globalSession, retrying);
            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }

RM处理分支事务的提交

AbstractRMHandler

    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

AbstractRMHandler实际委托ResourceManager实现分支事务提交。
AsyncWorker异步处理分支事务提交,将xid、branchId、resourceId封装为Phase2Context,放入阻塞队列,等待一个1秒执行一次的定时任务消费队列中的Phase2Context。

// DataSourceManager
private final AsyncWorker asyncWorker = new AsyncWorker(this);
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId);
}
    private void addToCommitQueue(Phase2Context context) {
        if (commitQueue.offer(context)) {
            return;
        }
        CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor)
                .thenRun(() -> addToCommitQueue(context));
    }

最后会由AsyncWorker执行 从阻塞队列里面poll数据 然后执行 实际上就是删除 undo_log数据

private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
    Set<String> xids = new LinkedHashSet<>(contexts.size());
    Set<Long> branchIds = new LinkedHashSet<>(contexts.size());
    contexts.forEach(context -> {
        xids.add(context.xid);
        branchIds.add(context.branchId);
    });

    try {
        undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    } catch (SQLException e) {
        LOGGER.error("Failed to batch delete undo log", e);
        try {
            conn.rollback();
            addAllToCommitQueue(contexts);
        } catch (SQLException rollbackEx) {
            LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
        }
    }
}

总结

1.当一阶段提交没有任何异常情况下进行二阶段提交 TM会发送提交全局事务请求给TC
2.TC接收到全局事务提交请求之后会循环所有的分支事务 往RM发送分支事务提交的请求
3.RM接收到分支事务提交请求之后会委派给AsyncWorker去处理 从阻塞队列里面拿数据
4.AsyncWorker处理的时候其实就是将undo_log的数据删除
5.当RM处理完之后返回TC 然后会将全局锁删除 就是 lock_table表数据
6.将分支事务状态变为二阶段已提交
7.然后会将分支事务表brach_table数据给清空
8.然后会将global_table数据删除
9.当TC端所有处理完成之后 TM收到响应信息把XID给解绑
请添加图片描述

TM全局事务回滚

TM处理全局事务回滚向TC发起请求

当一阶段提交出问题 会触发TM的回滚 然后交给TC执行

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        //roll back
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
               // 默认所有异常都会回滚
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
         // 如果GlobalTransactional排除部分异常,可以选择提交
            // not roll back on this exception, so commit
            commitTransaction(tx);
        }
    }
    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        triggerBeforeRollback();
        tx.rollback();
        triggerAfterRollback();
        // 3.1 Successfully rolled back
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }

TM向TC发送回滚请求,请求包含xid,由TC返回全局事务状态。这和TM发送提交请求类似。

    @Override
    public void rollback() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
// 默认重试5次 client.tm.rollbackRetryCount
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                  // GlobalRollbackRequest,包含全局事务id给TC,TC会返回全局事务状态给TM
                    status = transactionManager.rollback(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            //解绑全局事务ID
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }

TC处理全局事务回滚

DefaultCore

public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        globalSession.close();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // 将全局锁lock_table状态更新为Rollbacking 
            // 将全局事务global_table状态更新为Rollbacking
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }
    // 执行全局回滚
    boolean rollbackSuccess = doGlobalRollback(globalSession, false);
    return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}

在变更完中间状态后,执行doGlobalRollback方法实际执行二阶段回滚
二阶段回滚逻辑:
1:TC循环向所有RM发送BranchRollbackRequest;
2:如果RM返回PhaseTwo_Rollbacked,则删除对应分支事务;发生异常或返回状态非回滚成功,将全局事务标记为RollbackRetrying,等待后续补偿执行全局回滚;
3:如果所有RM二阶段回滚成功,对于file存储模式,直接删除全局事务(因为file模式在分支注册和二阶段回滚操作上都加了锁);对于db/redis存储模式,需要异步再次执行doGlobalRollback,确保不会有分支事务注册与二阶段回滚同时发生,造成分支事务注册成功,二阶段回滚没有清理干净全局锁和分支事务;
DefaultCore

public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    //循环向RM发送回滚分支事务请求
    Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
            return CONTINUE;
        }
        try {
            //  发送BranchRollbackRequest
            BranchStatus branchStatus = branchRollback(globalSession, branchSession);
            switch (branchStatus) {
                case PhaseTwo_Rollbacked:
                    // 释放全局锁,删除分支事务
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                case PhaseTwo_RollbackFailed_Unretryable: // 回滚失败且无法重试成功
                    SessionHelper.endRollbackFailed(globalSession, retrying);
                    return false;
                default:
                    // 如果RM回滚失败 全局事务状态变为RollbackRetrying 等待重试
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    return false;
            }
        } catch (Exception ex) {
            if (!retrying) {
                // 如果步骤异常 全局事务状态变为RollbackRetrying 等待重试
                globalSession.queueToRetryRollback();
            }
            throw new TransactionException(ex);
        }
    });
    // 如果存在一个分支事务回滚失败,则返回false
    if (result != null) {
        return result;
    }

    // 对于file模式,直接删除全局事务
    // 对于db/redis模式,异步再次执行doGlobalRollback,这里不做任何处理
    //  防止由于各种网络波动造成分支事务注册成功lock_table和branch_table中始终有残留数据
    //  导致全局锁一直被占用,无法释放
    if (success) {
        SessionHelper.endRollbacked(globalSession, retrying);
    }
    return success;
}

// SessionHelper
public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
     // 如果是重试 或 file模式
     if (retryGlobal || !DELAY_HANDLE_SESSION) {
        long beginTime = System.currentTimeMillis();
        GlobalStatus currentStatus = globalSession.getStatus();
        boolean retryBranch =
            currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;
        if (isTimeoutGlobalStatus(currentStatus)) {
            globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
        } else {
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
        }
        // 删除全局事务global_table
        globalSession.end();
    }
}

RM处理分支事务回滚

AbstractRMHandler

    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                //分支事务回滚
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }
  protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

DataSourceManager

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;

}

RM二阶段回滚就是执行undoLog里的前置镜像,回滚一阶段的本地事务。具体逻辑都在

// AbstractUndoLogManager
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;
    for (; ; ) {
        try {
            conn = dataSourceProxy.getPlainConnection();
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            // Step1 根据xid+branchId查询undo_log
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {
                    return;
                }

                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    // Step2 回滚本地事务
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    removeCurrentSerializer();
                }
            }
            // See https://github.com/seata/seata/issues/489
            // Step3 undo_log处理
            if (exists) {
                // RM一阶段正常提交,删除undoLog
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
            } else {
                // RM一阶段提交失败,TC由于全局事务超时发起回滚
                // 此时RM的本地事务可能提交,这里插入一个undo_log来防止RM提交事务成功导致数据不一致
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
            }

            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // insertUndoLogWithGlobalFinished 如果插入undo_log由于唯一约束失败,那么执行重试回滚
            // 日志...
        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                    branchId, e.getMessage()), e);

        } finally {
            // 关闭资源
        }
    }
}

二阶段回滚RM本地事务分为三步:
1.查询分支事务对应undoLog,select from undo_log where xid = ? and branch_id = ? for update;
2.AbstractUndoExecutor利用undoLog中的前置镜像回滚本地事务;
3.处理undoLog;

回滚一阶段本地事务

根据undoLog回滚一阶段本地事务。

// AbstractUndoExecutor
public void executeOn(Connection conn) throws SQLException {
    // Step1 比较当前数据和前后镜像,如果情况允许才会回滚
    if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
        return;
    }
    PreparedStatement undoPST = null;
    try {
        // Step2 构建回滚sql 如 UPDATE a SET x = ? WHERE pk1 in (?)
        String undoSQL = buildUndoSQL();
        undoPST = conn.prepareStatement(undoSQL);
        TableRecords undoRows = getUndoRows();
        // 根据主键回滚前置镜像中每一行数据
        for (Row undoRow : undoRows.getRows()) {
            ArrayList<Field> undoValues = new ArrayList<>();
            List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
            for (Field field : undoRow.getFields()) {
                if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                    undoValues.add(field);
                }
            }
            // Step3 替换占位符
            undoPrepare(undoPST, undoValues, pkValueList);
            // Step4 根据id回滚一行
            undoPST.executeUpdate();
        }

    } catch (Exception ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        } else {
            throw new SQLException(ex);
        }
    }
    finally {
        //important for oracle
        IOUtil.close(undoPST);
    }

}

在执行回滚之前,dataValidationAndGoOn方法会比较当前数据和前后镜像,如果情况允许才会回滚

protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {
    TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
    TableRecords afterRecords = sqlUndoLog.getAfterImage();
    // case1 前后镜像一致,不需要回滚,返回false
    Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
    if (beforeEqualsAfterResult.getResult()) {
        return false;
    }

    // select for update by 主键 获取当前快照数据
    TableRecords currentRecords = queryCurrentRecords(conn);
    // case2 当前快照数据和后置镜像不一致
    Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
    if (!afterEqualsCurrentResult.getResult()) {
        // case2-1 如果前置镜像与当前快照一致,也不需要回滚,返回false
        Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
        if (beforeEqualsCurrentResult.getResult()) {
            return false;
        } else {
            // case2-2 当前快照数据和后置镜像不一致,抛出异常,有脏数据
            throw new SQLException("Has dirty records when undo.");
        }
    }
    // case3 当前快照数据与后置镜像一致,可以执行回滚
    return true;
}

处理undolog

如果RM一阶段commit成功,undo_log存在,即exists=true,只要删除undo_log即可,与回滚事务一起提交;
如果RM一阶段未commit成功,undo_log不存在,可能由于RM一阶段业务处理超时,导致TC判断全局事务超时发起二阶段回滚,此时RM一阶段事务提交和RM二阶段回滚可能同时发生。
为了确保数据一致性,利用undo_log的xid和branch_id的唯一约束,插入一条status=GlobalFinished的数据来解决这个问题。
如果RM一阶段事务提交成功,即一阶段提交插入status=Normal的undo_log成功,那么在二阶段回滚时insertUndoLogWithGlobalFinished会抛出SQLIntegrityConstraintViolationException唯一约束冲突,AbstractUndoLogManager.undo重试二阶段回滚本地事务,最终会回滚成功;
如果二阶段回滚插入status=GlobalFinished的undo_log成功,那么RM一阶段就不会提交成功(发生唯一约束冲突),从而解决数据一致性问题。
AbstractUndoLogManager

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    for (; ; ) {
        try {
            // ...
            // See https://github.com/seata/seata/issues/489
            // Step3 undo_log处理
            if (exists) {
                // RM一阶段正常提交,删除undoLog
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
            } else {
                // RM一阶段超时,TC由于全局事务超时发起回滚
                // 此时RM的本地事务可能同时提交,这里插入一个undo_log来防止RM提交事务成功导致数据不一致
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
            }

            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // insertUndoLogWithGlobalFinished 如果插入undo_log由于唯一约束失败,那么执行重试回滚
            // 日志...
        } 
        // ...
    }
}

// MySQLUndoLogManager
protected void insertUndoLogWithGlobalFinished(String xid, long branchId, UndoLogParser parser, Connection conn) throws SQLException {
    insertUndoLog(xid, branchId, buildContext(parser.getName(), CompressorType.NONE), parser.getDefaultContent(), State.GlobalFinished, conn);
}

总结

1.当一阶段业务或者其他发生异常的时候会触发二阶段的回滚
2.TM发送二阶段回滚请求给TC
3.TC接收回滚请求进行处理 会先遍历所有的分支事务 然后往RM去发送回滚的请求
4.RM接收请求之后会基于undolog去进行回滚
4.1会先查询当前undo_log 然后拿到前置和后置镜像 去比较前置和后置镜像和当前需要执行的语句是不是一致的
4.2如果后置镜像和需要回滚的语句是一样的那正常回滚 如果不一样则代表有脏数据的情况抛异常
5.如果一阶段commit undo_log存在 只要删除undo_log即可,与回滚事务一起提交
6.如果RM一阶段未commit成功,undo_log不存在,可能由于RM一阶段业务处理超时,导致TC判断全局事务超时发起二阶段回滚,此时RM一阶段事务提交和RM二阶段回滚可能同时发生。
7.当RM处理完之后响应TC TC会删除分支事务 然后把状态改为二阶段已回滚
8.当所有分支事务全部回滚结束之后 将全局事务删除

请添加图片描述

二阶段失败重试机制

DefaultCoordinator每秒执行handleRetryRollbacking方法,一方面是处理二阶段回滚的重试工作,另一方面处理正常二阶段回滚的剩余数据处理,比如残留的全局锁、分支事务,以及db/redis存储模式下需要异步清理的全局事务。
重试工作会一直进行下去,除非RM返回PhaseTwo_Rollbacked回滚成功,或PhaseTwo_RollbackFailed_Unretryable(代表回滚失败但是重试也不可能成功,属于不可恢复异常,目前这一状态只有XA模式下会返回,所以在AT模式下会重试到RM返回二阶段回滚成功为止)。

// DefaultCoordinator
protected void handleRetryRollbacking() {
    // 查询TimeoutRollbacking,TimeoutRollbackRetrying, RollbackRetrying, Rollbacking 的全局事务
    SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
    sessionCondition.setLazyLoadBranch(true);
    Collection<GlobalSession> rollbackingSessions =
        SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
    if (CollectionUtils.isEmpty(rollbackingSessions)) {
        return;
    }
    long now = System.currentTimeMillis();
    SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
        try {
            // 如果是正在回滚的事务,需要等待开启全局事务的130秒后变为DeadSession再处理
            if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
                && !rollbackingSession.isDeadSession()) {
                return;
            }
            // 默认MAX_ROLLBACK_RETRY_TIMEOUT=-1,不走这里
            if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                // ...
                return;
            }
            // 再次执行全局事务回滚逻辑
            rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            core.doGlobalRollback(rollbackingSession, true);
        } catch (TransactionException ex) {
            LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
        }
    });
}

全局事务超时处理

DefaultCoordinator每秒执行timeoutCheck方法,将处于begin状态的超时全局事务更新为TimeoutRollbacking状态,交由二阶段回滚重试定时任务(handleRetryRollbacking),异步执行回滚操作。

// DefaultCoordinator
protected void timeoutCheck() {
    // 1. 查询状态处于begin的全局事务
    SessionCondition sessionCondition = new SessionCondition(GlobalStatus.Begin);
    sessionCondition.setLazyLoadBranch(true);
    Collection<GlobalSession> beginGlobalsessions =
        SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
    if (CollectionUtils.isEmpty(beginGlobalsessions)) {
        return;
    }
    SessionHelper.forEach(beginGlobalsessions, globalSession -> {
        SessionHolder.lockAndExecute(globalSession, () -> {
            // 2. 校验是否超时
            if (globalSession.getStatus() != GlobalStatus.Begin || !globalSession.isTimeout()) {
                return false;
            }
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            globalSession.close();

            // 3. 更新全局事务状态为TimeoutRollbacking,通过handleRetryRollbacking异步处理二阶段回滚
            globalSession.setStatus(GlobalStatus.TimeoutRollbacking);

            globalSession.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager());
            SessionHolder.getRetryRollbackingSessionManager().addGlobalSession(globalSession);
            return true;
        });
    });
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1326090.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

序列化类的高级用法

1.3.3 模型类序列化器 如果我们想要使用序列化器对应的是Django的模型类&#xff0c;DRF为我们提供了ModelSerializer模型类序列化器来帮助我们快速创建一个Serializer类。 ModelSerializer与常规的Serializer相同&#xff0c;但提供了&#xff1a; 基于模型类自动生成一系列…

ansible的playbook

1、playbook的组成部分 &#xff08;1&#xff09;task任务&#xff1a;在目标主机上执行的操作&#xff0c;使用模块定义这些操作&#xff0c;每个任务都是一个模块的调用 &#xff08;2&#xff09;variables变量&#xff1a;存储和传递数据&#xff08;变量可以自定义&…

使用Python将OSS文件免费下载到本地:项目分析和准备工作

大家好&#xff0c;我是水滴~~ 本文将介绍如何使用Python编程语言将OSS&#xff08;对象存储服务&#xff09;中的文件免费下载到本地计算机。我们先进行项目分析和准备工作&#xff0c;为后续的编码及实施提供基础。 《Python入门核心技术》专栏总目录・点这里 系列文章 使用…

RocketMQ系统性学习-RocketMQ原理分析之Broker接收消息的处理流程

Broker接收消息的处理流程&#xff1f; 既然要分析 Broker 接收消息&#xff0c;那么如何找到 Broker 接收消息并进行处理的程序入口呢&#xff1f; 那么消息既然是从生产者开始发送&#xff0c;消息是有单条消息和批量消息之分的&#xff0c;那么消息肯定是有一个标识&#…

java中常用的加密算法总结

目前在工作中常用到加密的一些场景&#xff0c;比如密码加密&#xff0c;数据加密&#xff0c;接口参数加密等&#xff0c;故通过本文总结以下常见的加密算法。 1. 对称加密算法 对称加密算法使用相同的密钥进行加密和解密。在Java中&#xff0c;常见的对称加密算法包括&…

活动回顾丨迁飞之路主题艺术墙绘落地大坪大融城

重庆作为鹰飞之城&#xff0c;不仅是数十万猛禽迁飞的必经之路&#xff0c;也是其他珍稀鸟类的家园。守护飞羽精灵&#xff0c;领略迁飞之美&#xff0c;2023年12月19日&#xff0c;传益千里携手重庆工商大学艺术学院党员服务站的志愿者们一起走进大坪大融城开展迁飞之路生态艺…

软件测试工程师,“我“从月10k到月30k进阶自动化测试之路...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 作为手工测试&…

Likeshop单商户高级版商城的二次开发之路

一、产品介绍 likeshop单商户高级版是一款适用于B2C、单商户、自营商城场景的商城系统。它完美契合私域流量变现闭环交易使用&#xff0c;拥有丰富的营销玩法、强大的分销能力&#xff0c;支持DIY多模板&#xff0c;前后端分离。无论您是想要进行商城运营还是二次开发&#xf…

聚观早报 |xPad2 Pro系列学习机发布;华为Mate X5典藏版实力过硬

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 12月21日消息 xPad2 Pro系列学习机发布 华为Mate X5典藏版实力过硬 iQOO Neo9系列标配芯片Q1 亚马逊云科技自研芯…

CentOS 宣布停更3年后,服务器操作系统何去何从?

“CentOS 要停止更新了&#xff1f;” 盯着电脑&#xff0c;某大型企业数字化部门的负责人彭素素看到这个消息&#xff0c;不仅在心里发出了一声惊呼。 2020年&#xff0c;CentOS 停止更新的消息&#xff0c;不仅彭素素所在的企业&#xff0c;对于不少正在使用 CentOS 的厂商…

搞懂这6 个持续集成工具,领先80%测试人!

开发人员喜欢把写的代码当成自己的孩子&#xff0c;他们会被当成艺术品一样呵护。作为家长&#xff0c;总是会认为自己的孩子是最好的&#xff0c;也会尽全力给自己的孩子最好的&#xff0c;就算有时候会超出自己的能力范围。 最终&#xff0c;孩子会走出去&#xff0c;和其他…

【笔试强化】Day 6

文章目录 一、单选1.2.3.4.5.6.7. 二、不定项选择1.2.3. 三、编程1. 把字符串转换成整数解法&#xff1a;代码&#xff1a; 2. 不要二解法&#xff1a;代码&#xff1a; 一、单选 1. 正确答案&#xff1a;D2. 正确答案&#xff1a;B3. 正确答案&#xff1a;D4. 正确答案&#…

Python编程技巧 – 使用正则表达式

Python编程技巧 – 使用正则表达式 Python Programming Skills – Using Regular Expression By JacksonML Python以其强大的功能高居全球编程软件的榜首。它易于学习和使用&#xff0c;使其成为初学者绝佳语言。此外&#xff0c;Python还用于各种应用程序&#xff0c;包括We…

Java Swing学生成绩管理系统期末大作业

1.且看界面 &#xff08;1&#xff09;登录页&#xff08;可记住账号密码&#xff09; &#xff08;2&#xff09;注册弹窗页 &#xff08;3&#xff09;登录弹窗 &#xff08;4&#xff09;还有账号密码错误3次需等待30秒 &#xff08;5&#xff09;成绩展示页面&#xff08;…

【Spring】15 ApplicationContextAware 接口

文章目录 1. 简介2. 作用3. 使用3.1 创建并实现接口3.2 配置 Bean 信息3.3 创建启动类3.4 启动 4. 应用场景总结 Spring 框架提供了许多回调接口&#xff0c;用于在 Bean 的生命周期中执行特定的操作。ApplicationContextAware 接口是其中之一&#xff0c;它允许 Bean 获取对 A…

无代码API集成助力电商平台,提升味分享营销系统效率

无代码开发的革命 在数字化转型的浪潮中&#xff0c;无代码开发正在成为企业提升效率和灵活性的重要工具。特别是在电商领域&#xff0c;高效的客户关系管理&#xff08;CRM&#xff09;系统和客户服务系统对于保持竞争力至关重要。无代码API集成方案如何实现电商系统的优化和…

存在重复元素

题目链接 存在重复元素 题目描述 注意点 无 解答思路 根据Set无法存储相同元素的特点判断nums中是否存在重复元素 代码 class Solution {public boolean containsDuplicate(int[] nums) {Set<Integer> set new HashSet<Integer>();for (int x : nums) {if …

广州华锐互动:VR元宇宙技术为汽车行业带来革命性变化

随着科技的飞速发展&#xff0c;VR元宇宙技术已经深入影响到我们生活的方方面面&#xff0c;汽车行业更是深受其益。这一新兴技术的出现&#xff0c;为汽车行业带来了前所未有的变化。广州华锐互动将VR技术应用于汽车行业&#xff0c;研发了VR汽修培训、3D汽车展厅、特种车辆3D…

JVM内存结构Java内存模型Java对象模型

导图&#xff1a; https://naotu.baidu.com/file/60a0bdcaca7c6b92fcc5f796fe6f6bc9 1.JVM内存结构&&Java内存模型&&Java对象模型 1.1.JVM内存结构 1.2.Java对象模型 Java对象模型表示的是这个对象本身的存储模型,JVM会给这个类创建一个instanceKlass保存在方…

【powershell】Windows环境powershell 运维之历史文件压缩清理

&#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&am…