Seata 源码篇之AT模式启动流程 - 中 - 03

news2025/1/10 12:10:55

Seata 源码篇之AT模式启动流程 - 中 - 03

  • 数据源代理
    • 会话代理
      • 锁定查询执行器
        • 本地事务提交
        • 本地事务回滚
      • 更新执行器
      • 删除执行器
      • 插入执行器
  • 小节


本系列文章:

  • Seata 源码篇之核心思想 - 01
  • Seata 源码篇之AT模式启动流程 - 上 - 02

数据源代理

当我们的数据源被代理后,代理数据源方法调用会走AOP拦截逻辑,也就是被SeataAutoDataSourceProxyAdvice的invoke方法拦截。invoke方法内部会将原本调用DataSource的方法转发给SeataDataSourceProxy执行:

   @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // 1. 检查当前是否存在全局事务,或者是否需要获取全局锁
        if (!inExpectedContext()) {
            return invocation.proceed();
        }

        // 2. 获取当前调用的是数据源的哪个方法
        Method method = invocation.getMethod();
        String name = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        Method declared;
        try {
            declared = DataSource.class.getDeclaredMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            return invocation.proceed();
        }

        // 3. 取出当前数据源对应的SeataDataSourceProxy,然后调用代理数据源对应的方法
        DataSource origin = (DataSource) invocation.getThis();
        SeataDataSourceProxy proxy = DataSourceProxyHolder.get(origin);
        Object[] args = invocation.getArguments();
        return declared.invoke(proxy, args);
    }

在这里插入图片描述

SeataDataSourceProxy这里采用装饰器模式实现对DataSource的增强,同时借助动态代理实现对用户的无感装饰。这里有趣的一点在于,为什么不直接在拦截器invoke方法内部实现拦截逻辑,而是借助装饰器倒了一手,大家可以思考一下原因。

我们通常会通过DataSource的getConnection方法从连接池中获取一个空闲连接,然后借助Connection创建一个会话对象Statement,最后利用Statament对象完成SQL语句的执行。Seata需要拦截SQL执行,那么就不仅需要在DataSource层面做装饰增强,还需要在Connection和Statement层面同样进行装饰增强。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        // 返回的Connection对象同样采用装饰器进行增强
        return new ConnectionProxy(this, targetConnection);
    }
...
}    

ConnectionProxy 内部会在创建Statement会话对象的时候进行装饰增强:

public abstract class AbstractConnectionProxy implements Connection {
    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        // 返回的Statement采用装饰器进行增强
        return new StatementProxy(this, targetStatement);
    }
    ... 
}    

但是ConnectionProxy不仅仅负责对Statement对象进行装饰,Seata还需要能够在commit和rollback等时间点进行拦截,因此ConnectionProxy的commit和rollback方法就不能只是简单的方法转发了,而是需要增加相关拦截逻辑,这一点后文会讲到。

同时ConnectionProxy内部还需要维护本次连接期间的上下文信息,上下文信息由ConnectionContext保存:

public class ConnectionContext {
    // 全局事务ID
    private String xid;
    // 分支事务ID
    private Long branchId;
    // 是否需要获取全局锁
    private boolean isGlobalLockRequire;
    // 自动提交的状态是否变更过
    private boolean autoCommitChanged;
    private final Map<String, Object> applicationData = new HashMap<>(2, 1.0001f);
    private final Map<Savepoint, Set<String>> lockKeysBuffer = new LinkedHashMap<>();
    private final Map<Savepoint, List<SQLUndoLog>> sqlUndoItemsBuffer = new LinkedHashMap<>();
    private final List<Savepoint> savepoints = new ArrayList<>(8);
    ...
}

StatementProxy 负责拦截本次会话中执行的每条SQL语句,并通过解析,查询前置和后置镜像,组装undo_log日志,最终完成本地事务的提交。

下面我们将来仔细分析一下StatementProxy的模版流程实现。


会话代理

StatementProxy 类中的拦截逻辑也是以模版方法固定下来的,但是由于模版逻辑存在于query,insert,delete 和 update 逻辑中,所以这里将模版逻辑抽取到了ExecuteTemplate类中:

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
    }

    @Override
    public int executeUpdate(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
    }
    
    ...
}

到此,应该可以猜到,SQL解析和前后镜像组织的核心逻辑都汇聚于ExecteTemplate类的execute方法中,下面我们来详细看看具体实现:

public class ExecuteTemplate {
    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }

    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        // 1. 当前事务执行无需获取全局锁,直接调用原本的Statement方法
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        // 2. 根据DB类型,获取对应的SQL解析器
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        Executor<T> executor;
        // 3. 当前SQL无需执行任何拦截处理,直接调用原本的Statement方法
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
        // 4. 如果SQL解析器只存在唯一的一个
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                // 5. 根据当前SQL类型,获取不同类型的执行拦截器
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    // 下面就是case update , case delete 等同质的选择逻辑了
                    ...
                }
            } else {
            // 6. 当存在多个SQL解析器时
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        // 7. 执行器的execute方法中包含SQL解析等逻辑,但是不同操作对于的处理逻辑不太一样,所以需要使用不同的执行器类型
        rs = executor.execute(args);
        ...
        return rs;
    }

}

ExecuteTemplate 会根据所执行的SQL语句类型不同,通过SPI加载不同类型的执行器来执行,执行器继承体系如下所示:
在这里插入图片描述
SQLRecognizer 作为SQL解析器,由于内部使用Druid作为最终解析工具,所有看做是Seata与Druid的一层隔离,防止两者直接耦合在一起。不同类型的SQL语句同样对应不同类型的SQLRecognizer实现,具体如下图所示:
在这里插入图片描述
执行器主要负责解析SQL语句来组织回滚日志,执行本地事务,获取全局锁以及提交本地事务。

下面我们来看看不同场景下,执行器执行逻辑的区别。


锁定查询执行器

我们首先来看最简单的SelectForUpdateExecutor实现,针对select … for update 语句进行拦截增强:

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    @Override
    public T execute(Object... args) throws Throwable {
        // 1. 获取全局事务ID,并绑定到当前连接上下文中
        String xid = RootContext.getXID();
        if (xid != null) {
            statementProxy.getConnectionProxy().bind(xid);
        }
        // 2. 将是否需要全局锁这一标识设置到连接上下文中 
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        // 3. 真正执行查询的方法
        return doExecute(args);
    }
    
    ...
}
public class SelectForUpdateExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    @Override
    public T doExecute(Object... args) throws Throwable {
        // 1. 获取代理数据源连接
        Connection conn = statementProxy.getConnection();
        // 2. 从代理数据源连接中获取当前数据库元数据信息
        DatabaseMetaData dbmd = conn.getMetaData();
        T rs;
        Savepoint sp = null;
        boolean originalAutoCommit = conn.getAutoCommit();
        try {
            // 3. 如果当前数据源开启了事务自动提交,则将自动提交暂时关闭
            if (originalAutoCommit) {
                conn.setAutoCommit(false);
            } else if (dbmd.supportsSavepoints()) {
            // 4. 如果当前数据源关闭了事务自动提交,则在当前数据源支持回滚点的前提下,创建一个回滚点
                // 如果因为全局锁获取失败,需要执行全局回滚,则可以直接回滚到当前事务执行到此处的状态,而非把之前的操作全部执行回滚
                sp = conn.setSavepoint();
            } else {
                throw new SQLException("not support savepoint. please check your db version");
            }

            LockRetryController lockRetryController = new LockRetryController();
            ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
            // 5. 构建SQL语句负责查询出本次查询涉及到的所有记录: select 主键 from 表 需要执行的sql的where子句
            String selectPKSQL = buildSelectSQL(paramAppenderList);
            // 这里的while循环为的是全局锁获取失败后,进行重试
            while (true) {
                try {
                    // 6. 执行目标SQL查询语句
                    rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
                    // 7. 执行before image对应的SQL语句,同时利用返回的记录列表,构建全局锁的key,该全局锁覆盖本次查询得到的记录列表
                    TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
                    String lockKeys = buildLockKey(selectPKRows);
                    if (StringUtils.isNullOrEmpty(lockKeys)) {
                        break;
                    }
                    // 8. 本地事务执行完毕,提交前,尝试获取全局锁  
                    if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                        // Do the same thing under either @GlobalTransactional or @GlobalLock, 
                        // that only check the global lock  here.
                        statementProxy.getConnectionProxy().checkLock(lockKeys);
                    } else {
                        throw new RuntimeException("Unknown situation!");
                    }
                    break;
                } catch (LockConflictException lce) {
                    // 9. 获取全局锁失败,会先回滚当前本地事务,然后休眠指定时间后,再次重试
                    if (sp != null) {
                        conn.rollback(sp);
                    } else {
                        conn.rollback();
                    }
                    // trigger retry
                    lockRetryController.sleep(lce);
                }
            }
        } finally {
            // 10. 如果有需要,则释放先前临时的创建的回滚点,同时将自动提交设置更改回来
            if (sp != null) {
                try {
                    if (!JdbcConstants.ORACLE.equalsIgnoreCase(getDbType())) {
                        conn.releaseSavepoint(sp);
                    }
                } catch (SQLException e) {
                    LOGGER.error("{} release save point error.", getDbType(), e);
                }
            }
            // 11. 如果先前开启了自动提交,此处需要提交本地事务,同时将原本的自动提交设置更改回来
            if (originalAutoCommit) {
                conn.setAutoCommit(true);
            }
        }
        // 12. 返回执行目标SQL语句得到的结果
        return rs;
    }
    ...
}

由于select … for update 锁定读不涉及数据修改,所以也就无需前置和后置镜像了,但是这里有四点需要注意一下:

  1. selectPKSQL 如何构建出来的 ?
// 目标SQL语句
select * from test where id = 1 and name = 'dhy' for update;
// 构建得到的selectPKSQL,组装规则为: select + 主键 + from 表 + 目标SQL语句的where子句
SELECT id FROM test WHERE id = 1 AND name = 'dhy' FOR UPDATE;
  1. 全局锁的key是如何构成的 ?
// 对于只有单个主键的情况,例如上面的SQL语句,则key的模样如下
// 表名:记录1主键值,记录2主键值,记录3主键值
test:1,2
// 对于联合主键的情况,则key的模样如下
// 表名:记录1主键1值_记录1主键2值
test:1_a,2_b
  1. 如何判断全局锁是否获取成功 ?
public class ConnectionProxy extends AbstractConnectionProxy {
    public void checkLock(String lockKeys) throws SQLException {
        if (StringUtils.isBlank(lockKeys)) {
            return;
        }
        try {
            // 全局锁等资源由默认的资源管理器管理
            boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
                getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
            // 获取全局锁失败,抛出锁冲突异常
            if (!lockable) {
                throw new LockConflictException(String.format("get lock failed, lockKey: %s",lockKeys));
            }
        } catch (TransactionException e) {
            // 识别是否为锁冲突异常,如果是的话,抛出锁冲突异常
            recognizeLockKeyConflictException(e, lockKeys);
        }
    }
    ...
}    

获取全局锁是否成功完全由资源管理器说的算,所以下面我们来看看资源管理器是如何判断全局锁是否获取成功的:

public class DataSourceManager extends AbstractResourceManager {
    @Override
    public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys) throws TransactionException {
        // 1. 构建全局锁获取亲戚
        GlobalLockQueryRequest request = new GlobalLockQueryRequest();
        request.setXid(xid);
        request.setLockKey(lockKeys);
        request.setResourceId(resourceId);
        try {
            GlobalLockQueryResponse response;
            // 2. 借助RmNettyRemotingClient发送同步请求
            if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                response = (GlobalLockQueryResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
            } else {
                throw new RuntimeException("unknow situation!");
            }
            // 3. 判断请求是否成功
            if (response.getResultCode() == ResultCode.Failed) {
                throw new TransactionException(response.getTransactionExceptionCode(),
                    "Response[" + response.getMsg() + "]");
            }
            // 4. 判断全局锁是否获取成功
            return response.isLockable();
        } catch (TimeoutException toe) {
            throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
        } catch (RuntimeException rex) {
            throw new RmTransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);
        }
    }
    
    ...
}

全局锁资源具体是如何管理的,由server端负责承接逻辑实现,这一点在本系列文章后面会进行详细分析,这里暂时不展开。

最后还有一点,LockRetryController主要负责两件事:

  1. 维护重试次数和重试间隔
    public LockRetryController() {
        this.lockRetryInterval = getLockRetryInterval();
        this.lockRetryTimes = getLockRetryTimes();
    }
  1. 负责休眠等待逻辑实现
    public void sleep(Exception e) throws LockWaitTimeoutException {
        // prioritize the rollback of other transactions
        if (--lockRetryTimes < 0 || (e instanceof LockConflictException
            && ((LockConflictException)e).getCode() == TransactionExceptionCode.LockKeyConflictFailFast)) {
            throw new LockWaitTimeoutException("Global lock wait timeout", e);
        }

        try {
            Thread.sleep(lockRetryInterval);
        } catch (InterruptedException ignore) {
        }
    }

本地事务提交

关于本地事务提交这一点,由于内容较多,我想单独开一节进行讲解。

本地事务提交有两种方式,一种是在设置autoCommit属性为false的前提下,由开发者手动提交;另一种就是在设置autoCommit属性为true的前提下,由框架内部的模版代码先将自动提交关闭,执行完本地SQL语句和附加逻辑后,再最后由框架内部调用commit方法完成提交,同时恢复原先自动提交的属性设置。

要想享受上面框架提供的事务模版服务,我们需要首先把要执行的事务交付于框架托管,比如在事务方法上标注@Transactional注解,表示当前事务执行交由Spring事务模块托管;下面我们复习一下Spring为我们提供的事务执行的整套模版流程:

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

            ...
            // 1. 准备平台事务管理器  
		   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
           ...
           // 2. 根据事务传播行为,决定是否开启新的本地事务
	       TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
           // 3. 执行本地事务
		   Object retVal;
		   try {
				retVal = invocation.proceedWithInvocation();
		   }
		   catch (Throwable ex) {
		       // 4. 出现异常则回滚
			   completeTransactionAfterThrowing(txInfo, ex);
			   throw ex;
		   }
           ...
           // 5. 执行正常则提交
		   commitTransactionAfterReturning(txInfo);
		   return retVal;
	  }
        ...
   }
}    				
  1. 开启新的本地事务过程中,会调用PlatformTransactionManager的doBegin方法真正开启一次新的事务连接
	@Override
	protected void doBegin(Object transaction, TransactionDefinition definition) {
		    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
            ...
			con = txObject.getConnectionHolder().getConnection();
			// 核心: 如果连接开启了自动提交,则关闭自动提交,并设置标记
			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				con.setAutoCommit(false);
			}
			...
	}
  1. 无论我们是设置了手动提交,还是默认的自动提交,只要事务交由了Spring托管,那么Spring便会在目标方法正常执行完毕后,进行commit
	@Override
	protected void doCommit(DefaultTransactionStatus status) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
		...
		con.commit();
	    ...
	}
  1. 在事务提交完毕后的资源清理环节,会将先前更改的自动提交恢复过来
	@Override
	protected void doCleanupAfterCompletion(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		...
		// 重新将连接设置回自动提交
		Connection con = txObject.getConnectionHolder().getConnection();
		if (txObject.isMustRestoreAutoCommit()) {
				con.setAutoCommit(true);
		}
		...
	}

Spring声明式事务实现的特点在于,如果当前连接设置的是自动提交,则更改为手动提交,同时目标方法执行完毕后,统一由Spring框架内部帮我们调用commit方法完成本地事务提交。我们下面会看到Seata的实现有所不同。

Seata在遇到设置为自动提交模式的连接处理上,和Spring处理思路一致,但是当连接处于手动提交模式时,Seata不会在目标方法执行完毕后,帮助我们统一调用commit进行提交,而是需要开发者自行调用commit方法进行提交。

对于SelectForUpdateExecutor来说,其execute方法可以简化为如下过程:

    @Override
    public T doExecute(Object... args) throws Throwable {
        ... 
        // 1. 如果当前连接开启了自动提交,则先关闭自动提交
        boolean originalAutoCommit = conn.getAutoCommit();
        if (originalAutoCommit) {
            conn.setAutoCommit(false);
        }
        // 2. 执行业务SQL
        ...
        // 3. 如果先前为自动提交模式,则在此处提交本地事务,同时恢复自动提交
        if (originalAutoCommit) {
            conn.setAutoCommit(true);
        }
        return rs;
    }

Seata只会在自动提交模式下,才会由框架内部在业务SQL执行完毕后,帮助我们调用commit方法完成本地事务提交,这一点需要注意。

下面看看本地事务提交的具体过程:

public class ConnectionProxy extends AbstractConnectionProxy {
    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        // 1. 在开启了全局事务场景下,并且原先是自动提交模式,则在此处由框架内部手动帮助我们提交事务
        if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {
            doCommit();
        }
        // 2. 恢复自动提交设置
        targetConnection.setAutoCommit(autoCommit);
    }
    ...
}    

真正的本地事务提交逻辑在doCommit方法中,我们下面来看看其具体实现:

    private void doCommit() throws SQLException {
        // 1. 如果处于全局事务模式下,则走seata拦截逻辑
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
        // 2. 否则直接走正常事务提交逻辑  
            targetConnection.commit();
        }
    }

下面看看对标注了@GlobalTransactional注解的目标方法的事务提交逻辑实现:

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 1. 当前分支事务执行本地提交前,先执行分支事务注册(分支注册过程同时包含获取全局锁逻辑)
            register();
        } catch (TransactionException e) {
            // 识别是否为全局锁获取失败抛出的异常
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // 2. 刷新UNDO日志到undo_log表中
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 3. 执行本地事务提前(这里包括原有的业务逻辑和undo_log日志的落盘逻辑)
            targetConnection.commit();
        } catch (Throwable ex) {
            // 4. 上报分支事务执行失败
            report(false);
            throw new SQLException(ex);
        }
        // 5. 上报分支事务执行成功
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        // 6. 清空当前连接上下文
        context.reset();
    }

    private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        // 1. 分支事务提交前,到TC中进行注册
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), context.getApplicationData(),
            // 传入需要获取的全局锁key,不难猜出,分支事务注册的同时,还包括获取全局锁的逻辑
            context.buildLockKeys());
        // 2. 保存分支事务ID
        context.setBranchId(branchId);
    }

如果将连接设置为手动提交模式,则需要开发者手动调用ConnectionProxy的commit方法完成分支事务提交:

    @Override
    public void commit() throws SQLException {
        try {
            // 分支事务一阶段提交包括注册和获取全局锁两个过程,如果全局锁获取失败
            // 此处由lockRetryPolicy提供的模版方法,完成重试抢锁
            lockRetryPolicy.execute(() -> {
                // 同样是调用doCommit方法完成分支事务一阶段提交
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

SelectForUpdateExecutor 自动提交模式下无需在setAutoCommit方法中处理抢锁失败逻辑,是因为这段逻辑已经存在于了execute方法中,有遗忘的可以回看。而doCommit方法不只SelectForUpdateExecutor 会调用,所以内部需要处理获取全局锁失败逻辑。


本地事务回滚

本地事务回滚会调用ConnectionProxy的rollback方法,回滚逻辑实现比较简单,关键在于会向TC报告自己本地执行失败的状态:

    @Override
    public void rollback() throws SQLException {
        // 1. 执行正常的回滚操作
        targetConnection.rollback();
        // 2. 向TC报告当前分支事务的状态
        if (context.inGlobalTransaction() && context.isBranchRegistered()) {
            report(false);
        }
        // 3. 清空连接上下文
        context.reset();
    }

大家可以思考一下,当TC收到某个分支事务执行失败的状态后,它又是如何通知其他分支事务完成回滚的呢?这部分内容将在本系列后面揭晓。


更新执行器

UpdateExecutor 的逻辑是其次简单的,我们来看看其实现逻辑:

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    @Override
    public T execute(Object... args) throws Throwable {
        // 1. 获取全局事务ID,并绑定到当前连接上下文中
        String xid = RootContext.getXID();
        if (xid != null) {
            statementProxy.getConnectionProxy().bind(xid);
        }
        // 2. 将是否需要全局锁这一标识设置到连接上下文中 
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        // 3. 真正执行查询的方法
        return doExecute(args);
    }
    
    ...
}
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // 如果当前连接本身已经开启了自动提交,则在事务执行前,关闭自动提交,执行结束后,再开启自动提交
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
    ...
}    

下面我们来看看处理更全面的一种情况,也就是连接本身开启了自动提交的前提下,是如何进行处理的:

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            // 1. 关闭自动提交设置
            connectionProxy.changeAutoCommit();
            // 2. 分支事务一阶段提交逻辑包含获取全局锁的逻辑,所以需要处理抢锁失败的重试逻辑
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                // 执行SQL解析等拦截逻辑,然后执行最终的目标SQL语句
                T result = executeAutoCommitFalse(args);
                // 提交事务
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            // 恢复自动提交设置
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }
    
    ... 
}
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        try {
            // 1. 获取前置镜像
            TableRecords beforeImage = beforeImage();
            // 2. 执行正常的SQL语句
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            // 3. 获取后置镜像
            TableRecords afterImage = afterImage(beforeImage);
            // 4. 准备undo日志
            prepareUndoLog(beforeImage, afterImage);
            return result;
        } catch (TableMetaException e) {
            ...
        }
    }

更新过程中的拦截逻辑核心就三步,下面我们来详细看看每一步的具体实现过程:

  1. 准备前置镜像
    @Override
    protected TableRecords beforeImage() throws SQLException {
        ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
        TableMeta tmeta = getTableMeta();
        // 1. 构建组装前置镜像对应的SQL语句
        String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
        // 2. 执行该SQL语句,然后获取查询出来的记录
        return buildTableRecords(tmeta, selectSQL, paramAppenderList);
    }

这里简单讲讲前置镜像SQL语句组装的规则:

// 目标SQL语句
update test set name = 'WILL' where age = 18
// 构建得到的SQL语句
// 规则: select  + 主键,update涉及列 + from 表 + 目标SQL语句的where子句 
SELECT id, name FROM test WHERE age = 18 FOR UPDATE
  1. 准备后置镜像
    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        TableMeta tmeta = getTableMeta();
        if (beforeImage == null || beforeImage.size() == 0) {
            return TableRecords.empty(getTableMeta());
        }
        // 1. 构建后置镜像查询SQL
        String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
        ResultSet rs = null;
        // 2. 执行后置镜像查询SQL,返回查询结果
        try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
            SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
            rs = pst.executeQuery();
            return TableRecords.buildRecords(tmeta, rs);
        } finally {
            IOUtil.close(rs);
        }
    }

后置镜像SQL语句组装是依赖于前置镜像SQL的,而非目标SQL语句,:

// 目标SQL语句
update test set name = 'WILL' where age = 18
// 前置镜像SQL语句
SELECT id, name FROM test WHERE age = 18 FOR UPDATE
// 构建得到的SQL语句
// 规则: select  + 主键,update涉及列 + from 表 + where 主键 in (前置镜像查询出来的记录列表的主键列聚合得到的主键列表)
SELECT id, name FROM test WHERE (id) in ( (?),(?) )
  1. 准备undo_log日志
    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        // 0. 健壮性检查
        if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
            return;
        }
        if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
            if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
            }
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // 1. 提交本地事务前,需要获取对应的全局锁,如果此处执行的时删除语句,则以前置镜像作为锁记录,否则以后置镜像作为锁记录(删除操作,无需记录后置镜像)
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        // (此处key的组装规则和上文说的一致)
        String lockKeys = buildLockKey(lockKeyRecords);
        if (null != lockKeys) {
            // 2. 向当前连接上下文的lockKeysBuffer中追加需要获取的全局锁key
            connectionProxy.appendLockKey(lockKeys);
            // 3. 构建undo日志
            SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
            // 4. 像当前连接上下文的sqlUndoItemsBuffer中追加构建好的undo日志
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }

构建undo日志的具体过程如下:

    protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
        SQLType sqlType = sqlRecognizer.getSQLType();
        String tableName = sqlRecognizer.getTableName();

        SQLUndoLog sqlUndoLog = new SQLUndoLog();
        sqlUndoLog.setSqlType(sqlType);
        sqlUndoLog.setTableName(tableName);
        sqlUndoLog.setBeforeImage(beforeImage);
        sqlUndoLog.setAfterImage(afterImage);
        return sqlUndoLog;
    }

lockKeysBuffer 和 sqlUndoItemsBuffer 会在当前本地事务提交的时候用到。

分支事务提交会调用ConnectionProxy的doCommit方法,这一点上面已经说过了,而doCommit方法会在注册分支事务的同时,传入需要获取的全局锁的key:
在这里插入图片描述
ConnectionContext 的buildLockKeys 方法中会遍历lockKeysBuffer 集合,对所有需要获取的全局锁key进行拼接:

    public String buildLockKeys() {
        if (lockKeysBuffer.isEmpty()) {
            return null;
        }
        // 1. 获取所有全局锁key
        Set<String> lockKeysBufferSet = new HashSet<>();
        for (Set<String> lockKeys : lockKeysBuffer.values()) {
            lockKeysBufferSet.addAll(lockKeys);
        }

        if (lockKeysBufferSet.isEmpty()) {
            return null;
        }
        // 2. 用;拼接在一起,然后返回
        StringBuilder appender = new StringBuilder();
        Iterator<String> iterable = lockKeysBufferSet.iterator();
        while (iterable.hasNext()) {
            appender.append(iterable.next());
            if (iterable.hasNext()) {
                appender.append(";");
            }
        }
        return appender.toString();
    }

执行完分支事务注册和全局锁获取后,下一步就是向undo_log表中写入undo日志,然后提交本地事务了; 这里会调用UndoLogManager的flushUndoLogs写入undo日志,下面我们一起来看看:

    @Override
    public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
        ConnectionContext connectionContext = cp.getContext();
        if (!connectionContext.hasUndoLog()) {
            return;
        }
        // 1. 准备分支事务的undo日志
        String xid = connectionContext.getXid();
        long branchId = connectionContext.getBranchId();
        BranchUndoLog branchUndoLog = new BranchUndoLog();
        branchUndoLog.setXid(xid);
        branchUndoLog.setBranchId(branchId);
        branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
        // 2. 调用undo日志解析器对undo日志进行编码
        UndoLogParser parser = UndoLogParserFactory.getInstance();
        byte[] undoLogContent = parser.encode(branchUndoLog);
        // 3. 尝试对undo日志进行压缩
        CompressorType compressorType = CompressorType.NONE;
        if (needCompress(undoLogContent)) {
            compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
            undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
        }
        // 4. 将undo日志插入undo_log表中,如果使用的是Mysql,这里会调用MySQLUndoLogManager的insertUndoLogWithNormal实现
        insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
    }

删除执行器

本节来看看DeleteExecutor的执行流程,由于DeleteExecutor和UpdateExecutor都继承了AbstractDMLBaseExecutor,所以二者主要区别主要集中在前置和后置镜像构建的逻辑上,下面我们一起来看一下:

  1. 构建前置镜像
public class DeleteExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
    @Override
    protected TableRecords beforeImage() throws SQLException {
        SQLDeleteRecognizer visitor = (SQLDeleteRecognizer) sqlRecognizer;
        TableMeta tmeta = getTableMeta(visitor.getTableName());
        ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
        // 构建前置镜像对应的SQL,然后执行该SQL,返回查询得到的结果
        String selectSQL = buildBeforeImageSQL(visitor, tmeta, paramAppenderList);
        return buildTableRecords(tmeta, selectSQL, paramAppenderList);
    }
     
    ... 
}

这里还是老规矩,来看看前置镜像SQL组装的逻辑:

// 目标SQL
delete from t where id = 1
// 前置镜像SQL
// 组装规则: select * from 表 + 目标SQL的where子句
SELECT name, id FROM t WHERE id = 1 FOR UPDATE
  1. delete操作无需后置镜像,所以返回结果集合为空
    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        return TableRecords.empty(getTableMeta());
    }

插入执行器

本节我们再来看看MySQLInsertExecutor的执行流程,由于MySQLInsertExecutor同样继承了AbstractDMLBaseExecutor,所以这里我们也只对前置和后置镜像构建过程进行分析:

  1. 构建前置镜像
    @Override
    protected TableRecords beforeImage() throws SQLException {
         // insert操作无需前置镜像,所以返回的空集合
        return TableRecords.empty(getTableMeta());
    }
  1. 构建后置镜像
    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        // 1. 获取插入记录的主键值
        Map<String, List<Object>> pkValues = getPkValues();
        // 2. 正常情况下,此处返回的后置镜像里面的内容也是空的,因为记录插入之前并不存在
        TableRecords afterImage = buildTableRecords(pkValues);
        if (afterImage == null) {
            throw new SQLException("Failed to build after-image for insert");
        }
        return afterImage;
    }

除了简单的增删改查语句外,还有涉及Join的更新操作,InsertOnUpdate等操作,这部分操作对应的执行器实现大家可自行翻阅源码进行学习。


小节

本文和大家一起探索了Seata AT模式的一阶段实现,下篇文章将和大家一起来看看AT模式二阶段的实现和一阶段中漏掉的全局事务提交和回滚。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1062514.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[C]嵌入式中变量存储方案

#include<stdio.h>#define uint8_t unsigned char #define uint16_t unsigned short #define uint24_t unsigned int #define uint32_t unsigned int #define uint64_t unsigned long long//用户自定义变量名字&#xff0c;用于存储 typedef enum {first_run 0,//…

Linux: tcpdump抓包示例

文章目录 1. 前言2. TCP 状态机3. tcpdump 抓包示例3.1 抓连接握手包&#xff1a;三次握手3.2 抓数据包示例3.3 抓终结连接&#xff1a;四次挥手 4. 参考资料 1. 前言 限于作者能力水平&#xff0c;本文可能存在谬误&#xff0c;因此而给读者带来的损失&#xff0c;作者不做任…

【面试HOT100】哈希双指针滑动窗口

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于LeetCodeHot100进行的&#xff0c;每个知识点的修正和深入主要参考…

创建vue3工程

一、新建工程目录E:\vue\projectCode\npm-demo用Visual Studio Code 打开目录 二、点击新建文件夹按钮&#xff0c;新建vue3-01-core文件夹 三、右键vue3-01-core文件夹点击在集成终端中打开 四、初始化项目&#xff0c;输入npm init 一直敲回车直到创建成功如下图 npm init 五…

The directory ‘*‘ or its parent directory is not owned by the current user

python安装编译时出现如下错误 The directory /home/admin/.cache/pip/http or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If executing pip with sudo, you may …

【办公软件】案例:电路中计算出的电阻值为5欧,怎么通过Excel匹配到仓库里最接近的电阻值?

在实际工作中&#xff0c;比如我们计算出一个电阻值为46欧&#xff0c;那么我们的库里到底是有哪个电阻值最接近呢&#xff1f;可能有一些有经验的工程师会说当然是47欧呀。 但是如果我们计算出来的是80.2欧呢&#xff1f;是不是得去查一下表格看看到底哪个最接近&#xff0c;…

PyTorch入门之【tensor】

目录 tensor的创建tensor的相关信息tensor的运算 tensor的创建 1.手动创建 import torch test1torch.tensor([1,2,3])#一维时为向量 test2torch.tensor([[1,2,3]])#二维时为矩阵 test3torch.tensor([[[1,2,3]]])#三维及以上统称为tensor print(test1) print(test2) print(tes…

C++ 类和对象篇(四) 构造函数

目录 一、概念 1. 构造函数是什么&#xff1f; 2. 为什么C要引入构造函数&#xff1f; 3. 怎么用构造函数&#xff1f; 3.1 创建构造函数 3.2 调用构造函数 二、构造函数的特性 三、构造函数对成员变量初始化 0. 对构造函数和成员变量分类 1. 带参构造函数对成员变量初始化 2. …

云存储解决方案-阿里云OSS

1. 阿里云OSS简介 阿里云对象存储服务&#xff08;Object Storage Service&#xff0c;简称OSS&#xff09;为用户提供基于网络的数据存取服务。使用OSS&#xff0c;用户可以通过网络随时存储和调用包括文本、图片、音频和视频等在内的各种非结构化数据文件。 阿里云OSS将数据…

练[BJDCTF2020]Easy MD5

[BJDCTF2020]Easy MD5 文章目录 [BJDCTF2020]Easy MD5掌握知识解题思路关键paylaod 掌握知识 ​ 强等于和弱等于的MD5绕过&#xff0c;数据库查询的MD5加密绕过&#xff0c;代码审计 解题思路 打开题目链接&#xff0c;发现是一个post提交框&#xff0c;提交完了也就是url发…

自然语言处理 | WordNet

WordNet是词汇数据库,即英语词典,专为自然语言处理而设计。 Synset是一种特殊的简单接口,存在于 NLTK 中, 用于在 WordNet 中查找单词。同义词集实例是表达相同概念的同义词的分组。有些单词只有一个同义词集,有些则有多个。

【kubernetes】使用helm部署redis

1 什么是helm 在学习使用k8s进行应用的部署时&#xff0c;或者从github上下载一些组件进行部署时&#xff0c;通常是直接用yaml的方式部署&#xff0c;用这种方式部署时&#xff0c;有个比较大的问题是&#xff0c;当参数需要调整时&#xff0c;就需要阅读整个yaml文件&#x…

UG\NX CAM二次开发 加工模块获取 UF _ask_application_module

文章作者:代工 来源网站:NX CAM二次开发专栏 简介: UG\NX CAM二次开发 加工模块获取 UF _ask_application_module 代码: void MyClass::do_it() { // TODO: add your code here // 获取NX当前所在的模块 int module_id = 0; // UF_ask_application_module(&…

Android改造CardView为圆形View,Kotlin

Android改造CardView为圆形View&#xff0c;Kotlin 可以利用androidx.cardview.widget.CardView的cardCornerRadius特性&#xff0c;将CardView改造成一个圆形的View&#xff0c;技术实现的关键首先设定CardView为一个宽高相等的View&#xff08;正方形&#xff09;&#xff0c…

在PHP8中使用instanceof操作符检测对象类型-PHP8知识详解

在PHP8中使用instanceof操作符可以检测当前对象属于哪个类。语法格式如下&#xff1a; objectName instanceof classname下面我们用一个实例来讲解使用instanceof操作符检测对象类型。 本实例将将创建3个类&#xff0c;其中有两个类是父类和子类的关系&#xff0c;然后实例化…

时序预测 | MATLAB实现EMD-iCHOA+GRU基于经验模态分解-改进黑猩猩算法优化门控循环单元的时间序列预测

时序预测 | MATLAB实现EMD-iCHOAGRU基于经验模态分解-改进黑猩猩算法优化门控循环单元的时间序列预测 目录 时序预测 | MATLAB实现EMD-iCHOAGRU基于经验模态分解-改进黑猩猩算法优化门控循环单元的时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 EMD-iCHOAGR…

第一百六十四回 如何实现NumberPicker

文章目录 1.概念介绍2.使用方法2.1 NumberPicker2.2 CupertinoPicker 3.示例代码4.内容总结 我们在上一章回中介绍了"如何在任意位置显示PopupMenu"相关的内容&#xff0c;本章回中将介绍如何实现NumberPicker.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1.概…

【知识点随笔分析 | 第七篇】什么是Cookie、Session、Token

前言&#xff1a; 当今互联网世界的发展让网站和应用程序扮演着重要的角色。为了实现用户身份验证、数据传输和用户状态管理等功能&#xff0c;开发人员常常使用一些关键技术来确保安全性和持久性。而在这些技术中&#xff0c;Cookie、Session和Token是最常见和广泛使用的三种机…

C++基础知识(三) -- 引用

1 引用概念 引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名(俗称)&#xff0c;编译器不会为引用变量开辟内存空间&#xff0c;它和它引用的变量共用同一块内存空间。 比如&#xff1a;李逵&#xff0c;在家称为"铁牛"&#xff0c;江湖上人称"…

动态内存管理<C语言>

✨Blog&#xff1a;&#x1f970;不会敲代码的小张:)&#x1f970; &#x1f251;推荐专栏&#xff1a;C语言&#x1f92a;、Cpp&#x1f636;‍&#x1f32b;️、数据结构初阶&#x1f480; &#x1f4bd;座右铭&#xff1a;“記住&#xff0c;每一天都是一個新的開始&#x1…