Seata AT模式的一些常见问题及其源码解析

news2025/4/14 6:59:33

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata AT

基于两阶段提交协议的演变:

一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。

AT 模式是一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。

通过日志观察客户端的全局事务处理流程

当微服务B的方法开启了本地事务,在微服务中先后执行了 insert a , update b , 业务逻辑c ,update d。微服务A的方法通过声明式注解@GlobalTransactional开启全局事务,然后通过feign调用微服务B的方法。

微服务A
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [172.17.0.7:8091:72680625000142901]
[  XNIO-1 task-4] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142905, lockKeys:tableName:pkid;tableName:pkid,pkid
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : transaction 172.17.0.8:8091:72680625000142901 will be commit
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : [172.17.0.8:8091:72680625000142901] commit status: Committed
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : transaction end, xid = 172.17.0.8:8091:72680625000142901

[_RMROLE_1_26_32] io.seata.rm.AbstractRMHandler            : Branch committing: 172.17.0.8:8091:72680625000142901 72680625000142905 jdbc:postgresql://ip:port/schema {"autoCommit":false}
[_RMROLE_1_26_32] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:BranchCommitRequest{xid='172.17.0.8:8091:72680625000142901', branchId=72680625000142905, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema?currentSchema=schema', applicationData='{"autoCommit":false}'}

微服务B
[  XNIO-1 task-7] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142907, lockKeys:tableName:pkid

注意观察分支事务提交时线程的不同,证明是tc异步通知分支事务异步提交的。

1.分支事务内遇到异常,但全局事务不回滚?

当微服务B的方法开启了本地事务,在微服务中先后执行了 insert a , update b , 业务逻辑c ,update d。当某段逻辑(如业务c)执行中出现异常时,本地事务会触发回滚。(视rollbackFor、隔离级别、事务代理、事务上下文等诸多原因决定是否真的会回滚。)

此时,假设微服务A的方法通过声明式注解@GlobalTransactional开启全局事务,然后通过feign调用微服务B的方法,同样的当业务c执行中出现异常时,全局事务却未必回滚。

异常时全局事务处理流程

通过日志观察,分支事务遇到异常后,并没有提交到tc。即tc不会感知到分支事务的异常。
而微服务A没有输出微服务B的异常,正常提交了全局事务。

为什么微服务A没有捕获到微服务B的异常呢?是由于微服务B中配置的RestControllerAdvice捕获了svc的异常。

如下

@ExceptionHandler({BizException.class})
@ResponseBody
@ResponseStatus(HttpStatus.OK)
public Response handleGlobalException(Exception e) {
    String serviceName = this.environment.getProperty("spring.application.name");
    String errorMsg = String.format("系统异常,[%s],ex=%s", serviceName, e.getMessage());
    log.error(errorMsg, e);
    return Response.failed(e.getMessage());
}

ResponseStatus为200,feign感知不到异常不会报错,微服务A也就无从感知到异常。

调整如下:

@ExceptionHandler({BizException.class})
@ResponseBody
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Response handleGlobalException(Exception e) {
    String serviceName = this.environment.getProperty("spring.application.name");
    String errorMsg = String.format("系统异常,[%s],ex=%s", serviceName, e.getMessage());
    log.error(errorMsg, e);
    return Response.failed(e.getMessage());
}

微服务A的报错如下

feign.FeignException$InternalServerError: [500 Internal Server Error] during [POST] to [http://serviceName/url] [Client#method(List)]: [{"code":xxxx,"msg":"插入重复数据","data":null,"success":false}]
 at feign.FeignException.serverErrorStatus(FeignException.java:259)
 at feign.FeignException.errorStatus(FeignException.java:206)
 at feign.FeignException.errorStatus(FeignException.java:194)
 at feign.codec.ErrorDecoder$Default.decode(ErrorDecoder.java:103)
 at feign.InvocationContext.decodeError(InvocationContext.java:126)
 at feign.InvocationContext.proceed(InvocationContext.java:72)
 at feign.ResponseHandler.handleResponse(ResponseHandler.java:63)
 at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:114)
 at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:70)
 at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:99)
 at org.springframework.cloud.openfeign.FeignCachingInvocationHandlerFactory$1.proceed(FeignCachingInvocationHandlerFactory.java:66)
 at org.springframework.cache.interceptor.CacheInterceptor.lambda$invoke$0(CacheInterceptor.java:64)
 at org.springframework.cache.interceptor.CacheAspectSupport.invokeOperation(CacheAspectSupport.java:416)
 at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:401)
 at org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:74)
 at org.springframework.cloud.openfeign.FeignCachingInvocationHandlerFactory.lambda$create$1(FeignCachingInvocationHandlerFactory.java:53)

feign.InvocationContext#proceed
在这里插入图片描述
除了调整httpStatus200,也可以通过在微服务A中判断返回对象的状态来决定是否报错,并由全局事务捕获。

异常时的处理逻辑:

全局事务回滚:
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : transaction 172.17.0.7:8091:72652747696657983 will be rollback
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : transaction end, xid = 172.17.0.7:8091:72652747696657983
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : [172.17.0.7:8091:72652747696657983] rollback status: Rollbacked

分支事务回滚:
[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.8:8091:72652747696657983', branchId=72652747696657987, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.17.0.8:8091:72652747696657983 72652747696657987 jdbc:postgresql://ip:port/schema

[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.17.0.8:8091:72652747696657983 branch 72652747696657987, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager  : branch rollback success, xid:172.17.0.8:8091:72652747696657983, branchId:72652747696657987
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

注意:回滚也是异步执行的。

源码分析
全局事务的处理逻辑
io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction
io.seata.tm.api.TransactionalTemplate#execute

  // 1. Get transactionInfo 获取注解上的配置
  TransactionInfo txInfo = business.getTransactionInfo();
  if (txInfo == null) {
      throw new ShouldNeverHappenException("transactionInfo does not exist");
  }
  // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
  GlobalTransaction tx = GlobalTransactionContext.getCurrent();

  // 1.2 Handle the transaction propagation.  隔离级别
  Propagation propagation = txInfo.getPropagation();
  SuspendedResourcesHolder suspendedResourcesHolder = null;
  try {
      switch (propagation) {
          case NOT_SUPPORTED:
              // If transaction is existing, suspend it.
              if (existingTransaction(tx)) {
                  suspendedResourcesHolder = tx.suspend(false);
              }
              // Execute without transaction and return.
              return business.execute();
          case REQUIRES_NEW:
          	// 当前事务存在,则先挂起;然后创建新事务;
              // If transaction is existing, suspend it, and then begin new transaction.
              if (existingTransaction(tx)) {
                  suspendedResourcesHolder = tx.suspend(false);
              }
              tx = GlobalTransactionContext.createNew();
              // Continue and execute with new transaction
              break;
          case SUPPORTS:
              // If transaction is not existing, execute without transaction.
              if (notExistingTransaction(tx)) {
                  return business.execute();
              }
              // Continue and execute with new transaction
              break;
          case REQUIRED:
              // If current transaction is existing, execute with current transaction,else create
              // 同spring @transactional 隔离级别,默认特性;当前事务存在直接返回,没有则创建;
              tx = GlobalTransactionContext.getCurrentOrCreate();
              break;
          case NEVER:
              // If transaction is existing, throw exception.
              if (existingTransaction(tx)) {
                  throw new TransactionException(
                          String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                  , tx.getXid()));
              } else {
                  // Execute without transaction and return.
                  return business.execute();
              }
          case MANDATORY:
              // If transaction is not existing, throw exception.
              if (notExistingTransaction(tx)) {
                  throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
              }
              // Continue and execute with current transaction.
              break;
          default:
              throw new TransactionException("Not Supported Propagation:" + propagation);
      }

      // 全局锁配置
      // set current tx config to holder
      GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

      if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {
          LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());
      }

      try {
          // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
          //    else do nothing. Of course, the hooks will still be triggered.
          // 开启新事务
          beginTransaction(txInfo, tx);
          
          Object rs;
          try {
              // Do Your Business
              // 处理业务代码,上文的场景即在这里处理;
              rs = business.execute();
          } catch (Throwable ex) {
              // 3. The needed business exception to rollback.
              // 捕获异常并完成事务;
              // 注意是完成,而不是回滚,例如不是所有的异常都要回滚
              completeTransactionAfterThrowing(txInfo, tx, ex);
              throw ex;
          }

          // 4. everything is fine, commit.
          commitTransaction(tx, txInfo);

          return rs;
      } finally {
          //5. clear
          resumeGlobalLockConfig(previousConfig);
          triggerAfterCompletion();
          cleanUp();
      }
  } finally {
      // If the transaction is suspended, resume it.
      if (suspendedResourcesHolder != null) {
          tx.resume(suspendedResourcesHolder);
      }
  }
        
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)
            throws TransactionalExecutor.ExecutionException, TransactionException {
    //roll back 判断是回滚还是提交
    if (txInfo != null && txInfo.rollbackOn(originalException)) {
        rollbackTransaction(tx, originalException);
    } else {
        // not roll back on this exception, so commit
        commitTransaction(tx, txInfo);
    }
}
分支事务的处理逻辑

提交阶段
io.seata.rm.datasource.ConnectionProxy#commit
io.seata.rm.datasource.ConnectionProxy#doCommit
io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
io.seata.rm.AbstractResourceManager#branchRegister

BranchRegisterRequest request = new BranchRegisterRequest();
request.setXid(xid);
request.setLockKey(lockKeys);
request.setResourceId(resourceId);
request.setBranchType(branchType);
request.setApplicationData(applicationData);

BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
if (response.getResultCode() == ResultCode.Failed) {
    throw new RmTransactionException(response.getTransactionExceptionCode(),
        String.format("branch register failed, xid: %s, errMsg: %s ", xid, response.getMsg()));
}
if (LOGGER.isInfoEnabled()) {
    LOGGER.info("branch register success, xid:{}, branchId:{}, lockKeys:{}", xid, response.getBranchId(), lockKeys);
}
return response.getBranchId();

回滚阶段,tm触发全局回滚后,tc会通过netty异步告知各rm进行回滚
io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler
io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#process // rm回滚
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback
io.seata.rm.AbstractRMHandler#doBranchRollback
io.seata.rm.datasource.DataSourceManager#branchRollback

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
    }
    try {
        // 基于undo_log回滚UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);
        }
    } catch (TransactionException te) {
        StackTraceLogger.error(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

2.传播特性Propagation

spring @transactional 传播特性失效?

Spring在TransactionDefinition接口中规定了7种类型的事务传播行为。REQUIRES_NEW 用于在当前事务存在时挂起当前事务,并创建一个新的事务来执行标注了REQUIRES_NEW的方法。这种机制确保了内部事务的独立性,不受外部事务的影响。

然而在全局事务回滚时,REQUIRES_NEW注解的事务也将回滚,因为新开启的新分支事务也被同一全局事务管理。
日志如下:

[  XNIO-1 task-2] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615138, lockKeys:tableName:pkName
-- REQUIRES_NEW 新开启的分支事务
[  XNIO-1 task-2] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615141, lockKeys:tableName:pkName


[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.7:8091:63655768332615130', branchId=63655768332615141, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='{"autoCommit":false}'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.17.0.7:8091:63655768332615130 63655768332615141 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.17.0.7:8091:63655768332615130 branch 63655768332615141, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager  : branch rollback success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615141
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

[h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.7:8091:63655768332615130', branchId=63655768332615138, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='{"autoCommit":false}'}
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.17.0.7:8091:63655768332615130 63655768332615138 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_2_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.17.0.7:8091:63655768332615130 branch 63655768332615138, undo_log deleted with GlobalFinished
[h_RMROLE_1_2_16] i.seata.rm.datasource.DataSourceManager  : branch rollback success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615138
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

解决方式,在spring REQUIRES_NEW 之上添加全局事务的传播特性,开启新的全局事务

@GlobalTransactional(rollbackFor = Exception.class, propagation = io.seata.tm.api.transaction.Propagation.REQUIRES_NEW)
@Transactional(rollbackFor = Exception.class, propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW)
// 全局事务A回滚,另一个全局事务并不会回滚
[  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : suspending current transaction, xid = 172.17.0.8:8091:72652747696657983
 [  XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [172.17.0.8:8091:72652747696657984]

[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.8:8091:72652747696657983', branchId=72652747696657987, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.17.0.8:8091:72652747696657983 72652747696657987 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.17.0.8:8091:72652747696657983 branch 72652747696657987, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager  : branch rollback success, xid:172.17.0.8:8091:72652747696657983, branchId:72652747696657987
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

[h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:BranchCommitRequest{xid='172.17.0.8:8091:72652747696657984', branchId=72652747696657985, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch committing: 172.17.0.8:8091:72652747696657984 72652747696657985 jdbc:postgresql://ip:port/schema null
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed

再次看io.seata.tm.api.TransactionalTemplate#execute方法里全局事务传播特性的行为

case REQUIRES_NEW:
	// 区分传播特性
	// If transaction is existing, suspend it, and then begin new transaction.
	if (existingTransaction(tx)) {
		suspendedResourcesHolder = tx.suspend(false);
	}
	tx = GlobalTransactionContext.createNew();
ShouldNeverHappenException

在最初使用全局事务的隔离级别时,方法上只有@GlobalTransactional(rollbackFor = Exception.class, propagation = io.seata.tm.api.transaction.Propagation.REQUIRES_NEW),而没有@Transactional(rollbackFor = Exception.class, propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW),抛出过如下异常,绑定的是新分布式事务id,但当前全局事务id是原id

Cause: java.sql.SQLException: io.seata.common.exception.ShouldNeverHappenException: bind xid: 172.17.0.7:8091:63655768332620147, while current xid: 172.17.0.7:8091:63655768332620141

Caused by: io.seata.common.exception.ShouldNeverHappenException: bind xid: 172.17.0.7:8091:63655768332620147, while current xid: 172.17.0.7:8091:63655768332620141
	at io.seata.rm.datasource.ConnectionContext.bind(ConnectionContext.java:198)
	at io.seata.rm.datasource.ConnectionProxy.bind(ConnectionProxy.java:85)
	at io.seata.rm.datasource.exec.BaseTransactionalExecutor.execute(BaseTransactionalExecutor.java:120)
	at io.seata.rm.datasource.exec.ExecuteTemplate.execute(ExecuteTemplate.java:145)

	at io.seata.rm.datasource.PreparedStatementProxy.execute(PreparedStatementProxy.java:55)
	at org.apache.ibatis.executor.statement.PreparedStatementHandler.update(PreparedStatementHandler.java:48)
	at org.apache.ibatis.executor.statement.RoutingStatementHandler.update(RoutingStatementHandler.java:75)
	at org.apache.ibatis.executor.SimpleExecutor.doUpdate(SimpleExecutor.java:50)
	at org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:117)
	at org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:76)
    at com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor.intercept(MybatisPlusInterceptor.java:106)
	at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:197)

seata 通过代理执行sql语句时,检查是否已有xid
org.apache.seata.rm.datasource.ConnectionContext#bind

void bind(String xid) {
    if (xid == null) {
        throw new IllegalArgumentException("xid should not be null");
    }
    if (!inGlobalTransaction()) {
        setXid(xid);
    } else {
        if (!this.xid.equals(xid)) {
            throw new ShouldNeverHappenException(String.format("bind xid: %s, while current xid: %s", xid, this.xid));
        }
    }
}

跟进org.apache.seata.rm.datasource.ConnectionContext#reset()方法,只有在本地事务提交或回滚后才会清空xid。即同一分支事务不能跨不同的分布式事务。

在执行不同的sql语句时,会根据不同类型的数据库及dml语句来实现抽象类AbstractDMLBaseExecutor,比如insert、update、delete的beforeImage,afterImage有不同的处理逻辑。

3. 全局事务读未提交?

回滚时的undolog 检测

在某次分支事务执行回滚时,曾遇到过如下异常:

 [_RMROLE_1_13_32] i.seata.rm.datasource.DataSourceManager  : branchRollback failed. branchType:[AT], xid:[172.17.0.8:8091:72641030088531502], branchId:[72641030088531508], resourceId:[jdbc:postgresql://ip:port/schema], applicationData:[{"autoCommit":false}]. reason:[Branch session rollback failed because of dirty undo log, please delete the relevant undolog after manually calibrating the data. xid = 172.17.0.8:8091:72641030088531502 branchId = 72641030088531508]
// 执行回滚
org.apache.seata.rm.AbstractRMHandler#doBranchRollback
io.seata.rm.datasource.DataSourceManager#branchRollback
// 基于undo log进行回滚
org.apache.seata.rm.datasource.undo.AbstractUndoLogManager#undo
org.apache.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn
// 回滚前校验, 如校验当前信息与修改前、修改后信息
org.apache.seata.rm.datasource.undo.AbstractUndoExecutor#dataValidationAndGoOn
	// 如果当前数据与undo log 中的修改后、修改前日志都不一样,则抛出异常
	// 这些数据可能被其它并发事务修改了,需要人工维护
	io.seata.rm.datasource.undo.SQLUndoDirtyException


protected boolean dataValidationAndGoOn(ConnectionProxy conn) throws SQLException {

   TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
   TableRecords afterRecords = sqlUndoLog.getAfterImage();

   // Compare current data with before data
   // No need undo if the before data snapshot is equivalent to the after data snapshot.
   Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
   if (beforeEqualsAfterResult.getResult()) {
       if (LOGGER.isInfoEnabled()) {
           LOGGER.info("Stop rollback because there is no data change " +
                   "between the before data snapshot and the after data snapshot.");
       }
       // no need continue undo.
       return false;
   }

   // Validate if data is dirty.
   TableRecords currentRecords = queryCurrentRecords(conn);
   // compare with current data and after image.
   Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
   if (!afterEqualsCurrentResult.getResult()) {

       // If current data is not equivalent to the after data, then compare the current data with the before
       // data, too. No need continue to undo if current data is equivalent to the before data snapshot
       Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
       if (beforeEqualsCurrentResult.getResult()) {
           if (LOGGER.isInfoEnabled()) {
               LOGGER.info("Stop rollback because there is no data change " +
                       "between the before data snapshot and the current data snapshot.");
           }
           // no need continue undo.
           return false;
       } else {
           if (LOGGER.isInfoEnabled()) {
               if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {
                   LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());
               }
           }
           if (LOGGER.isDebugEnabled()) {
               LOGGER.debug("check dirty data failed, old and new data are not equal, " +
                       "tableName:[" + sqlUndoLog.getTableName() + "]," +
                       "oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
                       "newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
           }
           throw new SQLUndoDirtyException("Has dirty records when undo.");
       }
   }
   return true;
}	

回顾下seata at 模式的执行流程,在一阶段分支事务注册时已经完成本地事务的提交。

// 本地事务提交
org.apache.seata.rm.datasource.ConnectionProxy#commit
org.apache.seata.rm.datasource.ConnectionProxy#doCommit 	
org.apache.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit  

private void processGlobalTransactionCommit() throws SQLException {
    try {
        // 向tc注册分支事务 
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        // 提交本地事务
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
    	// 向tc上报本地提交结果
        report(true);
    }
    // 重试context;避免检查xid抛出shouldNotHappenExp
    context.reset();
}			

而回滚时是收到tc通知异步进行的,在这个时间差内如果有其他事务读到了全局事务未提交的数据,即出现了全局事务的读未提交,出现了脏读的情况。

如何实现全局事务的读已提交?
脏写

回顾上文的日志中,曾出现过

[  XNIO-1 task-4] io.seata.rm.AbstractResourceManager      : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142905, lockKeys:tableName:pkid;tableName:pkid,pkid

即分支事务提交时,会加锁。
但是如果针对同一张表的改动不在此全局事务的方法中,如何处理呢?
可以在其他方法上也增加全局事务注解@GlobalTransactional 。
如果不需要全局事务,则需要使用@GlobalLock。

无论是@GlobalTransactional还是@GlobalLock,都是在分支事务提交时才能知道锁是否存在,然后决定下一步操作。

脏读

曾有同事遇到过一个场景,在MQ因异常重试时,读到了回滚前的数据认为业务已执行完成而停止了重试逻辑。

因此需要在执行过程中有必要检查全局锁是否存在。
如果执行 SQL 是 select for update,则会使用 SelectForUpdateExecutor 类,如果执行方法中带有 @GlobalTransactional or @GlobalLock注解,则会检查是否有全局锁,如果当前存在全局锁,则会回滚本地事务,通过 while 循环不断地重新竞争获取本地锁和全局锁。

io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute

public T doExecute(Object... args) throws Throwable {
    Connection conn = statementProxy.getConnection();
    // ... ...
    try {
        // ... ...
        while (true) {
            try {
                // ... ...
                if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                    // Do the same thing under either @GlobalTransactional or @GlobalLock, 
                    // that only check the global lock  here.
                    statementProxy.getConnectionProxy().checkLock(lockKeys);
                } else {
                    throw new RuntimeException("Unknown situation!");
                }
                break;
            } catch (LockConflictException lce) {
                if (sp != null) {
                    conn.rollback(sp);
                } else {
                    conn.rollback();
                }
                // trigger retry
                lockRetryController.sleep(lce);
            }
        }
    } finally {
        // ...
    }

其他的一些问题

时间类型的字段时区不要带时区
[_RMROLE_1_10_32] i.seata.rm.datasource.DataSourceManager  : branchRollback failed. branchType:[AT], xid:[172.17.0.8:8091:72641030088531502], branchId:[72641030088531508], resourceId:[jdbc:postgresql://ip:port/schema], applicationData:[{"autoCommit":false}]. reason:[Branch session rollback failed and try again later xid = 172.17.0.8:8091:72641030088531502 branchId = 72641030088531508 Cannot cast an instance of java.sql.Timestamp to type Types.TIMESTAMP_WITH_TIMEZONE]

参考

https://seata.apache.org/zh-cn/docs/dev/mode/at-mode/
https://seata.apache.org/zh-cn/blog/seata-datasource-proxy/
https://seata.apache.org/zh-cn/docs/overview/faq/#42
https://seata.apache.org/zh-cn/blog/seata-at-lock/

org.apache.seata.tm.api.TransactionalTemplate
io.seata.rm.AbstractResourceManager

org.apache.seata.rm.datasource.exec.BaseTransactionalExecutor
子类 AbstractDMLBaseExecutor、SelectForUpdateExecutor

org.apache.seata.server.coordinator.DefaultCore

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2325574.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2025年3月29日(matlab -ss -lti)

线性时不变系统&#xff08;LTI系统&#xff09;的定义与核心特性 线性时不变系统&#xff08;Linear Time-Invariant System&#xff09;是信号与系统分析中的基础模型&#xff0c;其核心特性包括线性和时不变性。以下从定义、验证方法和应用场景展开说明&#xff1a; 1. 线性…

网络原理-TCP/IP

网络原理学习笔记&#xff1a;TCP/IP 核心概念 本文是我在学习网络原理时整理的笔记&#xff0c;主要涵盖传输层、网络层和数据链路层的核心协议和概念&#xff0c;特别是 TCP, UDP, IP, 和以太网。 一、传输层 (Transport Layer) 传输层负责提供端到端&#xff08;进程到进…

第五十三章 Spring之假如让你来写Boot——环境篇

Spring源码阅读目录 第一部分——IOC篇 第一章 Spring之最熟悉的陌生人——IOC 第二章 Spring之假如让你来写IOC容器——加载资源篇 第三章 Spring之假如让你来写IOC容器——解析配置文件篇 第四章 Spring之假如让你来写IOC容器——XML配置文件篇 第五章 Spring之假如让你来写…

Router [Continuation Settings]

楼上网络CMCC-Wmew&#xff0c;楼下接收不到&#xff0c;可能因为喜好弱&#xff0c;再弄一台路由器中转一下 Router [Continuation Settings] 路由器中续设置 到这里这台K3的路由器设置完成了&#xff0c;作为转发&#xff0c;中续&#xff0c;她还需要设置上游路由器&#…

Zookeeper中的Zxid是如何设计的

想获取更多高质量的Java技术文章&#xff1f;欢迎访问Java技术小馆官网&#xff0c;持续更新优质内容&#xff0c;助力技术成长 Java技术小馆官网https://www.yuque.com/jtostring Zookeeper中的Zxid是如何设计的 如果你们之前学习过 ZooKeeper&#xff0c;你们可能已经了解…

蓝桥云客 岛屿个数

0岛屿个数 - 蓝桥云课 问题描述 小蓝得到了一副大小为 MN 的格子地图&#xff0c;可以将其视作一个只包含字符 0&#xff08;代表海水&#xff09;和 1&#xff08;代表陆地&#xff09;的二维数组&#xff0c;地图之外可以视作全部是海水&#xff0c;每个岛屿由在上/下/左/右…

31天Python入门——第14天:异常处理

你好&#xff0c;我是安然无虞。 文章目录 异常处理1. Python异常2. 异常捕获try-except语句捕获所有的异常信息获取异常对象finally块 3. raise语句4. 自定义异常5. 函数调用里面产生的异常补充练习 异常处理 1. Python异常 Python异常指的是在程序执行过程中发生的错误或异…

浅析Android Jetpack ACC之LiveData

一、Android Jetpack简介 Android官网对Jetpack的介绍如下&#xff1a; Jetpack is a suite of libraries to help developers follow best practices, reduce boilerplate code, and write code that works consistently across Android versions and devices so that develo…

【区块链安全 | 第十五篇】类型之值类型(二)

文章目录 值类型有理数和整数字面量&#xff08;Rational and Integer Literals&#xff09;字符串字面量和类型&#xff08;String Literals and Types&#xff09;Unicode 字面量&#xff08;Unicode Literals&#xff09;十六进制字面量&#xff08;Hexadecimal Literals&am…

Ubuntu修改用户名

修改用户名&#xff1a; 1.CTRL ALT T 快捷键打开终端&#xff0c;输入‘sudo su’ 转为root用户。 2.输入‘ gredit /etc/passwd ’&#xff0c;修改用户名&#xff0c;只修改用户名&#xff0c;后面的全名、目录等不修改。 3.输入 ‘ gedit /etc/shadow ’ 和 ‘ gedit /etc/…

Windows 系统下多功能免费 PDF 编辑工具详解

IceCream PDF Editor是一款极为实用且操作简便的PDF文件编辑工具&#xff0c;它完美适配Windows操作系统。其用户界面设计得十分直观&#xff0c;哪怕是初次接触的用户也能快速上手。更为重要的是&#xff0c;该软件具备丰富多样的强大功能&#xff0c;能全方位满足各类PDF编辑…

UE学习记录part11

第14节 breakable actors 147 destructible meshes a geometry collection is basically a set of static meshes that we get after we fracture a mesh. 几何体集合基本上是我们在断开网格后获得的一组静态网格。 选中要破碎的网格物品&#xff0c;创建集合 可以选择不同的…

Redis-07.Redis常用命令-集合操作命令

一.集合操作命令 SADD key member1 [member2]&#xff1a; sadd set1 a b c d sadd set1 a 0表示没有添加成功&#xff0c;因为集合中已经有了这个元素了&#xff0c;因此无法重复添加。 SMEMBERS key: smembers set1 SCARD key&#xff1a; scard set1 SADD key member1 …

vscode 源代码管理

https://code.visualstudio.com/updates/v1_92#_source-control 您可以通过切换 scm.showHistoryGraph 设置来禁用传入/传出更改的图形可视化。

iOS审核被拒:Missing privacy manifest 第三方库添加隐私声明文件

问题&#xff1a; iOS提交APP审核被拒&#xff0c;苹果开发者网页显示二进制错误&#xff0c;收到的邮件显示的详细信息如下图: 分析&#xff1a; 从上面信息能看出第三方SDK库必须要包含一个隐私文件&#xff0c;去第三方库更新版本。 几经查询资料得知&#xff0c;苹果在…

【LeetCode Solutions】LeetCode 101 ~ 105 题解

CONTENTS LeetCode 101. 对称二叉树&#xff08;简单&#xff09;LeetCode 102. 二叉树的层序遍历&#xff08;中等&#xff09;LeetCode 103. 二叉树的锯齿形层序遍历&#xff08;中等&#xff09;LeetCode 104. 二叉树的最大深度&#xff08;简单&#xff09;LeetCode 105. 从…

Orpheus-TTS 介绍,新一代开源文本转语音

Orpheus-TTS 是由 Canopy Labs 团队于2025年3月19日发布的开源文本转语音&#xff08;TTS&#xff09;模型&#xff0c;其技术突破集中在超低延迟、拟人化情感表达与实时流式生成三大领域。以下从技术架构、核心优势、应用场景、对比分析、开发背景及最新进展等多维度展开深入解…

Java数据结构-栈和队列

目录 1. 栈(Stack) 1.1 概念 1.2 栈的使用 1.3 栈的模拟实现 1.4 栈的应用场景 1. 改变元素的序列 2. 将递归转化为循环 3. 括号匹配 4. 逆波兰表达式求值 5. 出栈入栈次序匹配 6. 最小栈 1.5 概念区分 2. 队列(Queue) 2.1 概念 2.2 队列的使用 2.3 队列模拟实…

权重衰减-笔记

《动手学深度学习》-4.5-笔记 权重衰减就像给模型“勒紧裤腰带”&#xff0c;不让它太贪心、不让它学太多。 你在学英语单词&#xff0c;别背太多冷门单词&#xff0c;只背常见的就行&#xff0c;这样考试时更容易拿分。” —— 这其实就是在“限制你学的内容复杂度”。 在…

Hyperliquid 遇袭「拔网线」、Polymarket 遭治理攻击「不作为」,从双平台危机看去中心化治理的进化阵痛

作者&#xff1a;Techub 热点速递 撰文&#xff1a;Glendon&#xff0c;Techub News 继 3 月 12 日「Hyperliquid 50 倍杠杆巨鲸」引发的 Hyperliquid 清算事件之后&#xff0c;3 月 26 日 晚间&#xff0c;Hyperliquid 再次遭遇了一场针对其流动性和治理模式的「闪电狙击」。…