【SpringCloud-Seata源码分析3】

news2024/11/17 14:26:11

文章目录

  • 事务的提交
    • 客户端提交流程
    • 服务端提交流程
    • 客户端删除undo_log
  • 事务回滚
    • 客户端事务回滚
    • 服务端回滚事务

事务的提交

前面两篇我们分析了seata的TC初始化和TM,RM初始化,并且事务准备阶段源码及业务Sql执行,下面我们分析事务的提交源码。

客户端提交流程

在这里插入图片描述
这个主要是从 TransactionalTemplate#execute方法中


                try {
                	//准备阶段
                    this.beginTransaction(txInfo, tx);

                    Object rs;
                    Object ex;
                    try {
                    	//业务sql的执行
                        rs = business.execute();
                    } catch (Throwable var17) {
                        ex = var17;
                        this.completeTransactionAfterThrowing(txInfo, tx, var17);
                        throw var17;
                    }
					//事务的提交
                    this.commitTransaction(tx);
                    ex = rs;
                    return ex;
                } finally {
                    this.resumeGlobalLockConfig(previousConfig);
                    this.triggerAfterCompletion();
                    this.cleanUp();
                }
    private void commitTransaction(GlobalTransaction tx) throws ExecutionException {
        try {
        	//前置镜像
            this.triggerBeforeCommit();
            //提交
            tx.commit();
            //后置镜像
            this.triggerAfterCommit();
        } catch (TransactionException var3) {
            throw new ExecutionException(tx, var3, Code.CommitFailure);
        }
    }
public void commit() throws TransactionException {
		//判断当前的角色是否是Launcher
        if (this.role == GlobalTransactionRole.Participant) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", this.xid);
            }

        } else {
        	//判断xid是否为null
            this.assertXIDNotNull();
            //重试次数,默认为5次可以自定定义重试次数
            int retry = COMMIT_RETRY_COUNT <= 0 ? 5 : COMMIT_RETRY_COUNT;

            try {
                while(retry > 0) {
                    try {
                        --retry;
                        //循环请求后台,并且返回状态
                        this.status = this.transactionManager.commit(this.xid);
                        break;
                    } catch (Throwable var6) {
                        LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", new Object[]{this.getXid(), retry, var6.getMessage()});
                        if (retry == 0) {
                            throw new TransactionException("Failed to report global commit", var6);
                        }
                    }
                }
            } finally {
                if (this.xid.equals(RootContext.getXID())) {
                    this.suspend();
                }

            }

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("[{}] commit status: {}", this.xid, this.status);
            }

        }
    }
    public GlobalStatus commit(String xid) throws TransactionException {
    	//组装请求的参数
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        //netty请求seata后端
        GlobalCommitResponse response = (GlobalCommitResponse)this.syncCall(globalCommit);
        return response.getGlobalStatus();
    }
   private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse)TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException var3) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", var3);
        }
    }
}

服务端提交流程

在这里插入图片描述seata后台服务时对事务进行的异步提交,首先来分析一下我们后端服务的入口DefaultCoordinator#doglobalCommit方法

   protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
            throws TransactionException {
            //存储线程池的xid方便看日志
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        //核心方法
        response.setGlobalStatus(core.commit(request.getXid()));
    }
 public GlobalStatus commit(String xid) throws TransactionException {
        //key1:查询全局事务 如果事务被清理,则直接返回完成
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        //添加监听器
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus

        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                // Highlight: Firstly, close the session, then no more branch can be registered.
                //释放全局锁
                globalSession.closeAndClean();
                // key2: 异步提交
                //判断是否可以进行异步提交,是AT模式
                if (globalSession.canBeCommittedAsync()) {
                   //异步提交
                    globalSession.asyncCommit();
                    MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                    return false;
                } else {
                	//更改全局事务状态为commiting
                    globalSession.changeGlobalStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });
		//如果可以提交,则执行shouldCommit
        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            //If successful and all remaining branches can be committed asynchronously, do async commit.
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }

AT模式将事务状态改为异步提交AsyncCommitting,然后定时运行线程池1s轮训运行一次

    public void asyncCommit() throws TransactionException {
        this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
        // 设置一个状态异步提交·
        this.setStatus(GlobalStatus.AsyncCommitting);
        //  异步增加全局session信息
        SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
    }

异步定时任务DefaultCoordinator#init方法中定时开启

    public void init() {
        retryRollbacking.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
            ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        retryCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
            COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
		//我们事务异步提交都在这里执行
        asyncCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
            ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        timeoutCheck.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
            TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        undoLogDelete.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
            UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
    }

获取所有的一步提交状态的事务,并加上排他锁

    public static boolean distributedLockAndExecute(String key, NoArgsFunc func) {
        boolean lock = false;
        try {
        	//核心方法,请求加锁
            if (lock = acquireDistributedLock(key)) {
                func.call();
            }
        } catch (Exception e) {
            LOGGER.info("Exception running function with key = {}", key, e);
        } finally {
            if (lock) {
                try {
                    SessionHolder.releaseDistributedLock(key);
                } catch (Exception ex) {
                    LOGGER.warn("release distribute lock failure, message = {}", ex.getMessage(), ex);
                }
            }
        }
        return lock;
    }
public boolean acquireLock(DistributedLockDO distributedLockDO) {
        if (demotion) {
            return true;
        }

        Connection connection = null;
        boolean originalAutoCommit = false;
        try {
            connection = distributedLockDataSource.getConnection();
            originalAutoCommit = connection.getAutoCommit();
            connection.setAutoCommit(false);
			//这个是核心,查询出所有的异步提交的事务,并且查询的sql上加上排他锁
            DistributedLockDO distributedLockDOFromDB = getDistributedLockDO(connection, distributedLockDO.getLockKey());
            //如果查询出来的数据为null
            if (null == distributedLockDOFromDB) {
            //执行插入的sql "INSERT INTO " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + "(" + ALL_COLUMNS + ") VALUES (?, ?, ?)";
                boolean ret = insertDistribute(connection, distributedLockDO);
                //提交事务
                connection.commit();
                return ret;
            }
			//判断过期时间如果大于等于当前的系统时间
            if (distributedLockDOFromDB.getExpireTime() >= System.currentTimeMillis()) {
                LOGGER.debug("the distribute lock for key :{} is holding by :{}, acquire lock failure.",
                        distributedLockDO.getLockKey(), distributedLockDOFromDB.getLockValue());
                        //事务提交
                connection.commit();
                return false;
            }
			//执行更新sql"UPDATE " + "distributed_lock_table" + " SET "+ ServerTableColumnsName.DISTRIBUTED_LOCK_VALUE + "=?, " + ServerTableColumnsName.DISTRIBUTED_LOCK_EXPIRE + "=?"+ " WHERE " + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + "=?";
            boolean ret = updateDistributedLock(connection, distributedLockDO);
            //事务提交
            connection.commit();

            return ret;
        } catch (SQLException ex) {
            LOGGER.error("execute acquire lock failure, key is: {}", distributedLockDO.getLockKey(), ex);
            try {
                if (connection != null) {
                	//回滚
                    connection.rollback();
                }
            } catch (SQLException e) {
                LOGGER.warn("rollback fail because of {}", e.getMessage(), e);
            }
            return false;
        } finally {
            try {
                if (originalAutoCommit) {
                    connection.setAutoCommit(true);
                }
                IOUtil.close(connection);
            } catch (SQLException ignore) { }
        }
    }

加上分布式事务锁之后,我们需要执行handleAsyncCommitting

protected void handleAsyncCommitting() {
        SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
        // key1:获取所有GlobalSession,并且是事务为AsyncCommiting状态
        Collection<GlobalSession> asyncCommittingSessions =
                SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
        if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
            return;
        }
        // key2:遍历
        SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
            try {
                asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                // 进行处理
                core.doGlobalCommit(asyncCommittingSession, true);
            } catch (TransactionException ex) {
                LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
            }
        });
    }

事务的核心执行流程

@Override
    public boolean  doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start committing event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            // key1: 获取分支事务
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                // if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }

                BranchStatus currentStatus = branchSession.getStatus();
                // key2: 当前事务状态为PhaseOne_Failed,则删除分支事务,释放全局锁
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    //key3:发送rpc删除undolog日志
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession,branchStatus)) {
                        LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Committed;
                    }
                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                            //key4: 删除分支事务信息,释放全局锁
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            //远程删除失败则报错,如果不是异步提交,修改状态CommitFailed
                            SessionHelper.endCommitFailed(globalSession, retrying);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;

                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                    branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error(
                                    "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
            //If has branch and not all remaining branches can be committed asynchronously,
            //do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
            if (!retrying) {
                //contains not AT branch
                globalSession.setStatus(GlobalStatus.Committed);
            }
        }
        // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
        // executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty()) {
            // key4: 删除全局事务信息
            SessionHelper.endCommitted(globalSession, retrying);
            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }

客户端删除undo_log

在这里插入图片描述
我们进入客户端DataSourceManager#branchcommit方法

//这个属于核心,在类的构造方法中,会初始化一个定时任务
    private final AsyncWorker asyncWorker = new AsyncWorker(this);
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        return this.asyncWorker.branchCommit(xid, branchId, resourceId);
    }
    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        AsyncWorker.Phase2Context context = new AsyncWorker.Phase2Context(xid, branchId, resourceId);
        //将事务加入队列
        this.addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }

AsyncWorker的构造方法

    public AsyncWorker(DataSourceManager dataSourceManager) {
        this.dataSourceManager = dataSourceManager;
        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        //初始化一个队列
        this.commitQueue = new LinkedBlockingQueue(ASYNC_COMMIT_BUFFER_LIMIT);
        ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
        this.scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);
        //每一秒就轮训执行方法doBranchCommitSafely
        this.scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10L, 1000L, TimeUnit.MILLISECONDS);
    }
    private void doBranchCommit() {
    //判断队列是否为空
        if (!this.commitQueue.isEmpty()) {
            List<AsyncWorker.Phase2Context> allContexts = new LinkedList();
            //将队列中的消息添加到新的list中
            this.commitQueue.drainTo(allContexts);
            //校验和处理数据,并将其封装为map
            Map<String, List<AsyncWorker.Phase2Context>> groupedContexts = this.groupedByResourceId(allContexts);
            groupedContexts.forEach(this::dealWithGroupedContexts);
        }
    }
 private void dealWithGroupedContexts(String resourceId, List<AsyncWorker.Phase2Context> contexts) {
 		//判断resourceId是否为空
        if (StringUtils.isBlank(resourceId)) {
            LOGGER.warn("resourceId is empty and will skip.");
        } else {
            DataSourceProxy dataSourceProxy = this.dataSourceManager.get(resourceId);
            if (dataSourceProxy == null) {
                LOGGER.warn("failed to find resource for {} and requeue", resourceId);
                //添加到提交队列
                this.addAllToCommitQueue(contexts);
            } else {
                Connection conn = null;

                try {
                    conn = dataSourceProxy.getPlainConnection();
                    //获取undolog事务管理器
                    UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
                    List<List<AsyncWorker.Phase2Context>> splitByLimit = Lists.partition(contexts, 1000);
                    Iterator var7 = splitByLimit.iterator();
					//循环遍历
                    while(var7.hasNext()) {
                        List<AsyncWorker.Phase2Context> partition = (List)var7.next();
                        //删除undologr日志
                        this.deleteUndoLog(conn, undoLogManager, partition);
                    }
                } catch (SQLException var12) {
                    this.addAllToCommitQueue(contexts);
                    LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, var12);
                } finally {
                    IOUtil.close(conn);
                }

            }
        }
    }
    private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<AsyncWorker.Phase2Context> contexts) {
        Set<String> xids = new LinkedHashSet(contexts.size());
        Set<Long> branchIds = new LinkedHashSet(contexts.size());
        contexts.forEach((context) -> {
            xids.add(context.xid);
            branchIds.add(context.branchId);
        });

        try {
        //批量删除
            undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
            if (!conn.getAutoCommit()) {
                conn.commit();
            }
        } catch (SQLException var9) {
            LOGGER.error("Failed to batch delete undo log", var9);

            try {
                conn.rollback();
                this.addAllToCommitQueue(contexts);
            } catch (SQLException var8) {
                LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", var8);
            }
        }

    }

事务回滚

客户端事务回滚

在这里插入图片描述
客户端事务主要从AbstractTransactionRequest的实现类中去找到branchTrancetionRollback类

public abstract class AbstractTransactionRequest extends AbstractMessage {
    public AbstractTransactionRequest() {
    }

    public abstract AbstractTransactionResponse handle(RpcContext rpcContext);
}

这个类是rollBack的核心类

public class BranchRollbackRequest extends AbstractBranchEndRequest {
    public BranchRollbackRequest() {
    }

    public short getTypeCode() {
        return 5;
    }

    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return this.handler.handle(this);
    }
}
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
    	//封装请求参数
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
		
        BranchStatus status = this.getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
        //封装响应数据
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }

    }
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = this.get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
        } else {
            try {
            //核心的执行方法
                UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
            } catch (TransactionException var9) {
                StackTraceLogger.info(LOGGER, var9, "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]", new Object[]{branchType, xid, branchId, resourceId, applicationData, var9.getMessage()});
                if (var9.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                    return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
                }

                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
			//返回二次提交回滚状态
            return BranchStatus.PhaseTwo_Rollbacked;
        }
    }
 public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;

        while(true) {
            try {
                conn = dataSourceProxy.getPlainConnection();
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }
				//查询出来事务的undolog      SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE " + "branch_id" + " = ? AND " + "xid" + " = ? FOR UPDATE";

                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();
                boolean exists = false;

                while(rs.next()) {
                    exists = true;
                    int state = rs.getInt("log_status");
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log", new Object[]{xid, branchId, state});
                        }

                        return;
                    }

                    String contextString = rs.getString("context");
                    Map<String, String> context = this.parseContext(contextString);
                    //封装undolog,然后执行sql会进行回滚
                    byte[] rollbackInfo = this.getRollbackInfo(rs);
                    String serializer = context == null ? null : (String)context.get("serializer");
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }

                        Iterator var18 = sqlUndoLogs.iterator();

                        while(var18.hasNext()) {
                            SQLUndoLog sqlUndoLog = (SQLUndoLog)var18.next();
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                            //执行对应的回滚操作
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        removeCurrentSerializer();
                    }
                }

                if (exists) {
                //删除undolog
                    this.deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log deleted with {}", new Object[]{xid, branchId, AbstractUndoLogManager.State.GlobalFinished.name()});
                        break;
                    }
                } else {
                    this.insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log added with {}", new Object[]{xid, branchId, AbstractUndoLogManager.State.GlobalFinished.name()});
                        break;
                    }
                }

                return;
            } catch (SQLIntegrityConstraintViolationException var43) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                }
            } catch (Throwable var44) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException var41) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", var41);
                    }
                }

                throw new BranchTransactionException(TransactionExceptionCode.BranchRollbackFailed_Retriable, String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, var44.getMessage()), var44);
            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }

                    if (selectPST != null) {
                        selectPST.close();
                    }

                    if (conn != null) {
                        if (originalAutoCommit) {
                            conn.setAutoCommit(true);
                        }

                        conn.close();
                    }
                } catch (SQLException var40) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", var40);
                }

            }
        }

    }

服务端回滚事务

在这里插入图片描述
服务端的事务回滚是从GlobalRollbackRequest方法进入的

public class GlobalRollbackRequest extends AbstractGlobalEndRequest {
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_GLOBAL_ROLLBACK;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
    	//这里是核心
        return handler.handle(this, rpcContext);
    }
}
 @Override
    public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
    	//组装响应的数据
        GlobalRollbackResponse response = new GlobalRollbackResponse();
        response.setGlobalStatus(GlobalStatus.Rollbacking);
        exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
            @Override
            public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
                throws TransactionException {
                try {
                	//核心的执行类
                    doGlobalRollback(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String
                        .format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }

            @Override
            public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                               TransactionException tex) {
                super.onTransactionException(request, response, tex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }

            @Override
            public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
                super.onException(request, response, rex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }
  public GlobalStatus rollback(String xid) throws TransactionException {
        //key1:事务回滚,先通过全局事xid找到全局事务和分支事务
        //1.先通过全局事务XID找到去global_table查询到全局事务对象
        //2.通过查询出来的xid对象去查询branch_talbe是否有分支事务,最后将这两个事务对象封装到GlobalSession中
        //这里传入的true代表全局事务和分支事务都查询出来,全局事务是一条数据,分支事务可能会多条是一个list
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus
        boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
            //设置本事务的active状态为false
            globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
            //如果目前的全局事务状态为Begin开始状态,那么修改全局事务的状态为回滚中,修改数据表global_table的status的值为Rollbacking
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
                return true;
            }
            return false;
        });
        if (!shouldRollBack) {
            return globalSession.getStatus();
        }
        //key2:真正的回滚逻辑
        boolean rollbackSuccess = doGlobalRollback(globalSession, false);
        //返回回滚的逻辑
        return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
    }

@Override
    public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start rollback event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
        } else {
            //globalSession.getReverseSortedBranches()得到的是所有分支
            Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
                BranchStatus currentBranchStatus = branchSession.getStatus();
                //如果分支事务是在一阶段失败的,调用removeBranch,释放锁
                if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    //branchRollback回滚分支事务,这里是server端,所以这里的分支事务的回滚是调用的远程进行回滚的
                    //而远程回滚就是使用的undo log日志表来回滚的
                    BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession, branchStatus)) {
                        LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Rollbacked;
                    }
                    switch (branchStatus) {
                        //PhaseTwo_Rollbacked表示远程回滚成功
                        case PhaseTwo_Rollbacked:
                            // 二阶段回滚,删除分支事务信息
                            //删除分支事务信息,就是删除branch_table和释放锁
                            //1.释放锁,删除lock_table中的行锁信息;
                            //2.删除分支事务,branch_table
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return CONTINUE;
                        case PhaseTwo_RollbackFailed_Unretryable:
                            //远程回滚失败,修改全局事务状态为RollbackFailed
                            SessionHelper.endRollbackFailed(globalSession, retrying);
                            LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            if (!retrying) {
                                globalSession.queueToRetryRollback();
                            }
                            return false;
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex,
                        "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                        new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    throw new TransactionException(ex);
                }
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
        }

        // In db mode, lock and branch data residual problems may occur.
        // Therefore, execution needs to be delayed here and cannot be executed synchronously.
        if (success) {
            // 删除全局事务数据
            SessionHelper.endRollbacked(globalSession, retrying);
            LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
        }
        return success;
    }

起一个定时任务删除全局事务。

 public void init() {
        retryRollbacking.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
            ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        retryCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
            COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        asyncCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
            ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        timeoutCheck.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
            TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        undoLogDelete.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
            UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
    }
  */
    protected void handleRetryRollbacking() {
        SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
        sessionCondition.setLazyLoadBranch(true);
        Collection<GlobalSession> rollbackingSessions =
            SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
        if (CollectionUtils.isEmpty(rollbackingSessions)) {
            return;
        }
        long now = System.currentTimeMillis();
        SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
            try {
                // prevent repeated rollback
                // !rollbackingSession.isDeadSession() 判断回滚的全局事务必须超时,对应时间可以进入看
                if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
                    && !rollbackingSession.isDeadSession()) {
                    // The function of this 'return' is 'continue'.
                    return;
                }
                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                    if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                        rollbackingSession.clean();
                    }
                    // Prevent thread safety issues
                    SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                    LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());

                    SessionHelper.endRollbackFailed(rollbackingSession, true);

                    // rollback retry timeout event
                    MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);

                    //The function of this 'return' is 'continue'.
                    return;
                }
                rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                core.doGlobalRollback(rollbackingSession, true);
            } catch (TransactionException ex) {
                LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
            }
        });
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1858988.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【面试题】等保(等级保护)的工作流程

等保&#xff08;等级保护&#xff09;的工作流程主要包括以下几个步骤&#xff0c;以下将详细分点介绍&#xff1a; 系统定级&#xff1a; 确定定级对象&#xff1a;根据《信息系统等级保护管理办法》和《信息系统等级保护定级指南》的要求&#xff0c;确定需要进行等级保护的…

Charles抓包工具系列文章(二)-- Repeat 回放http请求

一、什么是http请求回放 当我们对客户端进行抓包&#xff0c;经常会想要重试http请求&#xff0c;或者改写原有部分进行重新请求&#xff0c;都需要用到回放http请求。 还有一种场景是压力测试&#xff0c;对一个请求进行重复请求多少次&#xff0c;并加上适当的并发度。 这里…

2024年6月22日,雨中骑行谷仓坝游后记。 (校长骑行撰稿)

在这个快节奏的时代&#xff0c;生活中总是充满了无尽的压力和喧嚣。然而&#xff0c;骑行就像一股清流&#xff0c;给人们带来片刻的宁静和思考&#xff0c;是一种独特的释放方式。它不仅是一种锻炼&#xff0c;更是一种探索世界、理解生活的方式。这次&#xff0c;我们校长骑…

[职场] 线上面试的准备工作 #知识分享#经验分享#媒体

线上面试的准备工作 面对求职中的面试&#xff0c;应届毕业生该做些什么准备呢&#xff1f;在这里&#xff0c;向各位分享面试前做好预案不慌张几点准备。现在许多面试是通过线上形式进行的。对于求职者来说&#xff0c;要做好两手准备。在这里&#xff0c;重点与大家分享线上面…

pyhon模块以及常用的第三方模块

import my_info as info print(info.name) info.show()from my_info import * print(name) show() pyhon中包的导入 import admin.my_admin as ad # 包名.模块名 admin是包名&#xff0c;my_admin是模块名print(ad.name) print(ad.info())from admin import my_admin as ad # …

【可控图像生成系列论文(三)】北大 Context-Aware Unsupervised Text Stylization论文解读1

【可控图像生成系列论文&#xff08;一&#xff09;】 简要介绍了论文的整体流程和方法&#xff1b;【可控图像生成系列论文&#xff08;二&#xff09;】则将就整体方法、模型结构、训练数据和纹理迁移进行了更详细的介绍。 本篇将介绍来自 ACM MM 2018 的一篇字体风格化的可控…

【43 Pandas+Pyecharts | 京东某商品销量数据分析可视化】

文章目录 &#x1f3f3;️‍&#x1f308; 1. 导入模块&#x1f3f3;️‍&#x1f308; 2. Pandas数据处理2.1 读取数据2.2 查看数据信息2.3 查看数据描述信息 &#x1f3f3;️‍&#x1f308; 3. Pyecharts数据可视化3.1 销量(瓶)地图分布3.2 每月销量(瓶)3.3 男性女性购买数量…

《看不影子的少年》一部探讨偏见与接纳的电视剧❗

《看不见影子的少年》这部电视剧以其独特的视角和深刻的主题 给我留下了深刻的印象。该剧讲述了一位与众不同的少年 他无法在阳光下留下影子&#xff0c;象征着他在社会中的孤独与不被理解 观看过程中&#xff0c;可以感受到少年内心的挣扎与渴望 他渴望被接纳&#xff0c;渴…

【linux kernel】一文总结linux输入子系统

文章目录 一、导读二、重要数据数据结构&#xff08;2-1&#xff09;struct input_dev&#xff08;2-2&#xff09;input_dev_list和input_handler_list&#xff08;2-3&#xff09;struct input_handler 三、input核心的初始化四、常用API五、输入设备驱动开发总结(1)查看输入…

20240507-招商证券 基于鳄鱼线的指数择时及轮动策略

动量震荡指标构造 动量震荡指标为交易者提供了获利的钥匙。动量震荡指标测算了5根价格柱相对于34根价格柱的动量变化。首先计算最近5根价格柱的最高价和最低价间的中点的简单移动平均值,即(最高价最低价)12的简单移动平均,将得出的值减去最近34根价格柱的最高价和最低价中点的…

Spring Cloud:构建分布式系统的利器

引言 在当今的云计算和微服务架构时代&#xff0c;构建高效、可靠的分布式系统成为软件开发的重要任务。Spring Cloud 提供了一套完整的解决方案&#xff0c;帮助开发者快速构建分布式系统中的一些常见模式&#xff08;例如配置管理、服务发现、断路器等&#xff09;。本文将探…

Listary——最好用的电脑搜索文件软件

简易版&#xff1a; https://www.listary.com/download-completion?versionstable 完整功能版&#xff1a; Microsoft PowerToys | Microsoft Learn

【PA交易】BackTrader(一): 如何使用实时tick数据和蜡烛图

背景和需求 整合Tick数据是PA交易的回测与实盘基本需求。多数交易回测框架往往缺乏对大规模Tick数据直接而全面的支持。Tick数据因其体量庞大&#xff08;例如&#xff0c;某棕榈油主力合约四年间的数据达8GB&#xff09;为结合价格趋势与PA分析带来挑战&#xff0c;凸显了实时…

探索ChatTTS项目:高效的文字转语音解决方案

文章目录 &#x1f4d6; 介绍 &#x1f4d6;&#x1f4d2; ChatTTS &#x1f4d2;&#x1f4dd; 项目介绍&#x1f4dd; 项目亮点&#x1f4dd; UI &#x1f388; 项目地址 &#x1f388; &#x1f4d6; 介绍 &#x1f4d6; 在AI技术迅速发展的今天&#xff0c;文本到语音&…

[职场] 怎么写个人简历模板 #其他#知识分享

怎么写个人简历模板 怎么写个人简历模板1 姓名&#xff1a;xxx 性别&#xff1a;x 年龄&#xff1a;x岁 婚姻状况&#xff1a;x 最高学历&#xff1a;xx 政治面貌&#xff1a;xx 现居城市&#xff1a;xx 籍贯&#xff1a;xx 联系电话&#xff1a;xxxxxx 电子邮箱&#xff1a;xx…

CRMEB开源商城系统Java版:新零售时代的技术创新与实战案例

一、引言 随着新零售概念的兴起和电子商务的飞速发展&#xff0c;企业对商城系统的需求也日益多元化和个性化。CRMEB开源商城系统Java版&#xff0c;凭借其先进的技术架构、丰富的功能模块和灵活的扩展性&#xff0c;成为了众多企业构建和扩展自身电商业务的首选。本文将对CRM…

基于SpringBoot前后端分离在线骑行网站设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f;感兴趣的可以先收藏起来&#xff0c;还…

我是如何在markdown编辑器中完成视频的插入和播放的

如果你有更好用的编辑器组件&#xff0c;请一定推荐给我!!!&#xff08;最好附带使用说明&#x1f913;️&#xff09; 介绍 在开发一个社区页面的时候&#xff0c;需要完成发帖、浏览帖子的能力。这里考虑接入markdown编辑器进行开发&#xff0c;也符合大多数用户的习惯。 …

MM-LLM:Internvl_chat.v1.5论文解读

这个模型在我自己测的结果上也是表现优异&#xff0c;和glm4v打得有来有回。是目前开源的效果最佳的模型之一。 官方的评测榜单&#xff1a;https://huggingface.co/spaces/opencompass/open_vlm_leaderboard 摘要&#xff1a; 直接说提出了一个拉近开源和商业多模态模型的开…

强强联合 极光推送(JPush)成为华为生态市场首家推送类SDK服务商

近日&#xff0c;中国领先的客户互动和营销科技服务商&#xff0c;极光&#xff08;Aurora Mobile&#xff0c;纳斯达克股票代码&#xff1a;JG&#xff09;的核心产品极光推送&#xff08;JPush&#xff09;顺利通过华为开发者联盟的多项测试及审核&#xff0c;成为首家在Harm…