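Seata is an open-source distributed transaction solution that aims to provide high-performance, easy-to-use distributed transaction services. It offers the AT, TCC, SAGA and XA transaction modes, giving users a one-stop distributed transaction solution.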
Seata AT
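An evolution of the two-phase commit protocol:
Phase one: the business data and the rollback log (undo log) are committed in the same local transaction, and local locks and connection resources are released.
Phase two: commit is asynchronous and completes very quickly. Rollback performs reverse compensation using the undo log recorded in phase one.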
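To make the two phases concrete, here is a minimal in-memory sketch. It is not Seata code. `AtPhaseModel`, its fields and its methods are hypothetical, and plain `HashMap`s stand in for the business table and the undo_log table. Phase one writes the new value and remembers the before image. Phase two either just deletes the undo record (commit) or restores the before image from it (rollback).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of AT mode's two phases (not Seata's API).
class AtPhaseModel {
    // Stand-in for a business table: primary key -> value.
    final Map<String, String> table = new HashMap<>();
    // Stand-in for undo_log: branchId -> (primary key -> before image, null if the row did not exist).
    final Map<Long, Map<String, String>> undoLog = new HashMap<>();

    // Phase one: update the row and record its before image. In real AT mode both writes
    // are committed in one local transaction; this model does not simulate transactions.
    void phaseOneUpdate(long branchId, String pk, String newValue) {
        Map<String, String> before = undoLog.computeIfAbsent(branchId, id -> new HashMap<>());
        // Keep only the first before image, so rollback restores the original value.
        if (!before.containsKey(pk)) {
            before.put(pk, table.get(pk));
        }
        table.put(pk, newValue);
    }

    // Phase two commit: the data is already committed, so only the undo record is cleaned up.
    void phaseTwoCommit(long branchId) {
        undoLog.remove(branchId);
    }

    // Phase two rollback: compensate by restoring the before images, then drop the undo record.
    void phaseTwoRollback(long branchId) {
        Map<String, String> before = undoLog.remove(branchId);
        if (before == null) {
            return; // nothing to undo
        }
        for (Map.Entry<String, String> e : before.entrySet()) {
            if (e.getValue() == null) {
                table.remove(e.getKey()); // the row was inserted in phase one
            } else {
                table.put(e.getKey(), e.getValue());
            }
        }
    }
}
```

Commit is cheap in this model because the data was already written in phase one. Rollback needs the undo record, which is why it can run later and asynchronously.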
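AT mode is a non-intrusive distributed transaction solution. Seata adds a proxy layer over database operations. When we use Seata AT mode, we are actually using Seata's own data source proxy, DataSourceProxy, and Seata puts a lot of logic into this proxy layer, such as inserting undo_log records for rollback and checking global locks.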
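The idea of a proxy layer can be illustrated with the JDK's `java.lang.reflect.Proxy`. This is only an analogy. `SqlRunner` is a hypothetical interface and the "undo log" is a list of strings. Seata's real `DataSourceProxy` wraps JDBC `DataSource`, `Connection` and `Statement` objects and does far more: building images, collecting lock keys and registering branches.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for a JDBC statement.
interface SqlRunner {
    int execute(String sql);
}

// Analogy for a data source proxy: intercept every call, do extra work, then delegate.
class RecordingProxyDemo {
    static SqlRunner wrap(SqlRunner target, List<String> undoLog) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("execute")) {
                // Extra logic the application never sees, e.g. recording an undo entry.
                undoLog.add("undo for: " + args[0]);
            }
            // Delegate to the real object.
            return method.invoke(target, args);
        };
        return (SqlRunner) Proxy.newProxyInstance(
                SqlRunner.class.getClassLoader(),
                new Class<?>[] {SqlRunner.class},
                handler);
    }

    static List<String> newLog() {
        return new ArrayList<>();
    }
}
```

The business code keeps calling `execute` as usual. Only the object it was handed differs, which is what "non-intrusive" means here.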
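Observing the client-side global transaction flow through logs
Suppose a method in microservice B opens a local transaction and executes, in order, insert a, update b, business logic c and update d. A method in microservice A starts a global transaction with the declarative @GlobalTransactional annotation and then calls B's method through Feign.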
Microservice A
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [172.17.0.7:8091:72680625000142901]
[ XNIO-1 task-4] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142905, lockKeys:tableName:pkid;tableName:pkid,pkid
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : transaction 172.17.0.8:8091:72680625000142901 will be commit
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : [172.17.0.8:8091:72680625000142901] commit status: Committed
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : transaction end, xid = 172.17.0.8:8091:72680625000142901
[_RMROLE_1_26_32] io.seata.rm.AbstractRMHandler : Branch committing: 172.17.0.8:8091:72680625000142901 72680625000142905 jdbc:postgresql://ip:port/schema {"autoCommit":false}
[_RMROLE_1_26_32] i.s.c.r.p.c.RmBranchCommitProcessor : rm client handle branch commit process:BranchCommitRequest{xid='172.17.0.8:8091:72680625000142901', branchId=72680625000142905, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema?currentSchema=schema', applicationData='{"autoCommit":false}'}
Microservice B
[ XNIO-1 task-7] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142907, lockKeys:tableName:pkid
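Note that the branch commit is logged on a different thread (_RMROLE_...) from the business thread (XNIO-1 ...). This shows that the TC notifies the branch asynchronously and the branch commits asynchronously.
1. The branch transaction hits an exception, but the global transaction does not roll back?
Suppose a method in microservice B opens a local transaction and executes, in order, insert a, update b, business logic c and update d. When some step (e.g. business logic c) throws, the local transaction is rolled back. (Whether it really rolls back depends on many factors, such as rollbackFor, propagation, the transaction proxy and the transaction context.)
Now suppose a method in microservice A starts a global transaction with @GlobalTransactional and calls B's method through Feign. When business logic c throws, the global transaction does not necessarily roll back.
Global transaction flow on exception
The logs show that after the branch transaction hit the exception, it was never submitted to the TC, so the TC did not learn about the branch failure.
Meanwhile, microservice A did not log any exception from microservice B and committed the global transaction normally.
Why didn't microservice A see microservice B's exception? Because a @RestControllerAdvice configured in microservice B caught the exception thrown by the service layer: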
@ExceptionHandler({BizException.class})
@ResponseBody
@ResponseStatus(HttpStatus.OK)
public Response handleGlobalException(Exception e) {
    String serviceName = this.environment.getProperty("spring.application.name");
    String errorMsg = String.format("系统异常,[%s],ex=%s", serviceName, e.getMessage());
    log.error(errorMsg, e);
    return Response.failed(e.getMessage());
}
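Because the response status is 200, Feign does not treat the call as failed and does not throw, so microservice A has no way of knowing about the exception.
The adjusted handler: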
@ExceptionHandler({BizException.class})
@ResponseBody
// Return 500 so that Feign's ErrorDecoder turns the response into a FeignException in the caller
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Response handleGlobalException(Exception e) {
    String serviceName = this.environment.getProperty("spring.application.name");
    String errorMsg = String.format("系统异常,[%s],ex=%s", serviceName, e.getMessage());
    log.error(errorMsg, e);
    return Response.failed(e.getMessage());
}
Microservice A now reports the following error:
feign.FeignException$InternalServerError: [500 Internal Server Error] during [POST] to [http://serviceName/url] [Client#method(List)]: [{"code":xxxx,"msg":"插入重复数据","data":null,"success":false}]
at feign.FeignException.serverErrorStatus(FeignException.java:259)
at feign.FeignException.errorStatus(FeignException.java:206)
at feign.FeignException.errorStatus(FeignException.java:194)
at feign.codec.ErrorDecoder$Default.decode(ErrorDecoder.java:103)
at feign.InvocationContext.decodeError(InvocationContext.java:126)
at feign.InvocationContext.proceed(InvocationContext.java:72)
at feign.ResponseHandler.handleResponse(ResponseHandler.java:63)
at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:114)
at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:70)
at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:99)
at org.springframework.cloud.openfeign.FeignCachingInvocationHandlerFactory$1.proceed(FeignCachingInvocationHandlerFactory.java:66)
at org.springframework.cache.interceptor.CacheInterceptor.lambda$invoke$0(CacheInterceptor.java:64)
at org.springframework.cache.interceptor.CacheAspectSupport.invokeOperation(CacheAspectSupport.java:416)
at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:401)
at org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:74)
at org.springframework.cloud.openfeign.FeignCachingInvocationHandlerFactory.lambda$create$1(FeignCachingInvocationHandlerFactory.java:53)
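The exception is raised in feign.InvocationContext#proceed, which hands the non-2xx response to the ErrorDecoder (see the decodeError and ErrorDecoder$Default.decode frames above).
Instead of changing the HTTP status from 200, microservice A can also check the status of the returned object and throw an exception itself, which the global transaction then catches.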
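A minimal sketch of the second approach. `RemoteResponse` is a hypothetical stand-in for the project's own `Response` wrapper. The JSON body in the trace above suggests it carries `code`, `msg`, `data` and `success`. `RemoteCallFailedException` is also a hypothetical name. What matters is that the check throws, and that the exception propagates out of the @GlobalTransactional method so TransactionalTemplate sees it and rolls back.

```java
// Hypothetical stand-in for the project's response wrapper.
class RemoteResponse<T> {
    final boolean success;
    final String msg;
    final T data;

    RemoteResponse(boolean success, String msg, T data) {
        this.success = success;
        this.msg = msg;
        this.data = data;
    }
}

// Hypothetical exception type; any exception covered by rollbackFor would do.
class RemoteCallFailedException extends RuntimeException {
    RemoteCallFailedException(String message) {
        super(message);
    }
}

final class ResponseChecks {
    private ResponseChecks() {}

    // Unwrap a Feign result, turning a "failed" body into an exception so that the
    // surrounding global transaction observes the failure instead of committing.
    static <T> T requireSuccess(RemoteResponse<T> response) {
        if (response == null || !response.success) {
            String msg = response == null ? "empty response" : response.msg;
            throw new RemoteCallFailedException("remote call failed: " + msg);
        }
        return response.data;
    }
}
```

In microservice A this would be used as `ResponseChecks.requireSuccess(client.method(list))` inside the @GlobalTransactional method (`client.method` is a placeholder). Catching and swallowing the exception there would bring the original problem back.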
Handling when the exception reaches microservice A:
Global transaction rollback:
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : transaction 172.17.0.7:8091:72652747696657983 will be rollback
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : transaction end, xid = 172.17.0.7:8091:72652747696657983
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : [172.17.0.7:8091:72652747696657983] rollback status: Rollbacked
Branch transaction rollback:
[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.8:8091:72652747696657983', branchId=72652747696657987, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.17.0.8:8091:72652747696657983 72652747696657987 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.17.0.8:8091:72652747696657983 branch 72652747696657987, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:172.17.0.8:8091:72652747696657983, branchId:72652747696657987
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
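Note: the rollback is also executed asynchronously (again on an _RMROLE_ thread).
Source code analysis
Global transaction handling logic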
io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction
io.seata.tm.api.TransactionalTemplate#execute
// 1. Get transactionInfo (the configuration from the annotation)
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
    throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
    switch (propagation) {
        case NOT_SUPPORTED:
            // If transaction is existing, suspend it.
            if (existingTransaction(tx)) {
                suspendedResourcesHolder = tx.suspend(false);
            }
            // Execute without transaction and return.
            return business.execute();
        case REQUIRES_NEW:
            // If transaction is existing, suspend it, and then begin new transaction.
            if (existingTransaction(tx)) {
                suspendedResourcesHolder = tx.suspend(false);
            }
            tx = GlobalTransactionContext.createNew();
            // Continue and execute with new transaction
            break;
        case SUPPORTS:
            // If transaction is not existing, execute without transaction.
            if (notExistingTransaction(tx)) {
                return business.execute();
            }
            // Continue and execute with new transaction
            break;
        case REQUIRED:
            // If current transaction is existing, execute with current transaction, else create
            // Same as the default REQUIRED of Spring @Transactional: join the existing transaction, or create one if none exists
            tx = GlobalTransactionContext.getCurrentOrCreate();
            break;
        case NEVER:
            // If transaction is existing, throw exception.
            if (existingTransaction(tx)) {
                throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                , tx.getXid()));
            } else {
                // Execute without transaction and return.
                return business.execute();
            }
        case MANDATORY:
            // If transaction is not existing, throw exception.
            if (notExistingTransaction(tx)) {
                throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
            }
            // Continue and execute with current transaction.
            break;
        default:
            throw new TransactionException("Not Supported Propagation:" + propagation);
    }
    // Global lock configuration
    // set current tx config to holder
    GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
    if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {
        LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());
    }
    try {
        // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
        // else do nothing. Of course, the hooks will still be triggered.
        // Begin a new global transaction (only when this method is the Launcher)
        beginTransaction(txInfo, tx);
        Object rs;
        try {
            // Do Your Business
            // Run the business code; the scenario described above is executed here
            rs = business.execute();
        } catch (Throwable ex) {
            // 3. The needed business exception to rollback.
            // Catch the exception and *complete* the transaction:
            // complete, not roll back, because not every exception requires a rollback
            completeTransactionAfterThrowing(txInfo, tx, ex);
            throw ex;
        }
        // 4. everything is fine, commit.
        commitTransaction(tx, txInfo);
        return rs;
    } finally {
        //5. clear
        resumeGlobalLockConfig(previousConfig);
        triggerAfterCompletion();
        cleanUp();
    }
} finally {
    // If the transaction is suspended, resume it.
    if (suspendedResourcesHolder != null) {
        tx.resume(suspendedResourcesHolder);
    }
}
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)
        throws TransactionalExecutor.ExecutionException, TransactionException {
    // Decide whether to roll back or commit
    if (txInfo != null && txInfo.rollbackOn(originalException)) {
        rollbackTransaction(tx, originalException);
    } else {
        // not roll back on this exception, so commit
        commitTransaction(tx, txInfo);
    }
}
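Whether completeTransactionAfterThrowing rolls back depends on `txInfo.rollbackOn(ex)`. The sketch below models that decision in the spirit of Spring's rollback rules. Each configured class is matched by walking up the thrown exception's superclass chain, and the closest match wins. It is a simplified assumption, not a copy of Seata's `TransactionInfo`. In this sketch an exception that matches no rollback rule leads to commit, which is the `else` branch above. `RollbackDecision` is a hypothetical name.

```java
import java.util.List;

// Simplified model of a rollback decision (not Seata's TransactionInfo).
final class RollbackDecision {
    private final List<Class<? extends Throwable>> rollbackFor;
    private final List<Class<? extends Throwable>> noRollbackFor;

    RollbackDecision(List<Class<? extends Throwable>> rollbackFor,
                     List<Class<? extends Throwable>> noRollbackFor) {
        this.rollbackFor = rollbackFor;
        this.noRollbackFor = noRollbackFor;
    }

    // Distance from the thrown class up to the configured class, or -1 if unrelated.
    private static int depth(Class<?> thrown, Class<?> configured) {
        int d = 0;
        for (Class<?> c = thrown; c != null; c = c.getSuperclass()) {
            if (c == configured) {
                return d;
            }
            d++;
        }
        return -1;
    }

    // Smallest depth among the matching rules, or -1 if none matches.
    private static int closest(Throwable ex, List<Class<? extends Throwable>> rules) {
        int best = -1;
        for (Class<? extends Throwable> rule : rules) {
            int d = depth(ex.getClass(), rule);
            if (d >= 0 && (best < 0 || d < best)) {
                best = d;
            }
        }
        return best;
    }

    // true = roll back, false = commit.
    boolean rollbackOn(Throwable ex) {
        int bestRollback = closest(ex, rollbackFor);
        int bestNoRollback = closest(ex, noRollbackFor);
        if (bestRollback < 0) {
            return false; // no rollback rule matches: this sketch commits
        }
        // The closer rule wins; a tie goes to noRollbackFor in this sketch.
        return bestNoRollback < 0 || bestRollback < bestNoRollback;
    }
}
```

With `rollbackFor = Exception.class`, an `Error` is not covered in this model and would be committed. Configure rollbackFor to cover every failure type that must undo the global transaction.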
Branch transaction handling logic
Commit phase
io.seata.rm.datasource.ConnectionProxy#commit
io.seata.rm.datasource.ConnectionProxy#doCommit
io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
io.seata.rm.AbstractResourceManager#branchRegister
BranchRegisterRequest request = new BranchRegisterRequest();
request.setXid(xid);
request.setLockKey(lockKeys);
request.setResourceId(resourceId);
request.setBranchType(branchType);
request.setApplicationData(applicationData);
BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
if (response.getResultCode() == ResultCode.Failed) {
    throw new RmTransactionException(response.getTransactionExceptionCode(),
            String.format("branch register failed, xid: %s, errMsg: %s ", xid, response.getMsg()));
}
if (LOGGER.isInfoEnabled()) {
    LOGGER.info("branch register success, xid:{}, branchId:{}, lockKeys:{}", xid, response.getBranchId(), lockKeys);
}
return response.getBranchId();
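The `lockKeys` string sent in BranchRegisterRequest is what the TC uses for global row locks. The logs above show it as `tableName:pkid;tableName:pkid,pkid`. Reading that format, entries for different tables are separated by `;`, a table name is separated from its primary keys by `:`, and multiple keys are separated by `,`. The parser below is a hypothetical helper built on that reading. It is not part of Seata.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split a lockKeys string into individual "table:pk" row keys.
final class LockKeyParser {
    private LockKeyParser() {}

    static List<String> rowKeys(String lockKeys) {
        List<String> result = new ArrayList<>();
        for (String tablePart : lockKeys.split(";")) {
            if (tablePart.isEmpty()) {
                continue;
            }
            int colon = tablePart.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("missing ':' in " + tablePart);
            }
            String table = tablePart.substring(0, colon);
            // Each primary key after the colon becomes its own row lock.
            for (String pk : tablePart.substring(colon + 1).split(",")) {
                result.add(table + ":" + pk);
            }
        }
        return result;
    }
}
```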
Rollback phase: after the TM triggers the global rollback, the TC asynchronously notifies each RM over Netty to roll back its branch.
io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler
io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#process // RM rollback
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback
io.seata.rm.AbstractRMHandler#doBranchRollback
io.seata.rm.datasource.DataSourceManager#branchRollback
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
    }
    try {
        // Roll back based on undo_log
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);
        }
    } catch (TransactionException te) {
        StackTraceLogger.error(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}
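2. Propagation
Does Spring @Transactional propagation stop working?
Spring defines seven propagation behaviors in the TransactionDefinition interface. REQUIRES_NEW suspends the current transaction if one exists and creates a new transaction to run the method annotated with REQUIRES_NEW. This keeps the inner transaction independent of the outer one.
However, when the global transaction rolls back, the REQUIRES_NEW transaction is rolled back too. Its new local transaction still runs under the same xid, so Seata registers it as another branch of the same global transaction.
The logs: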
[ XNIO-1 task-2] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615138, lockKeys:tableName:pkName
-- branch transaction newly opened by REQUIRES_NEW
[ XNIO-1 task-2] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615141, lockKeys:tableName:pkName
[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.7:8091:63655768332615130', branchId=63655768332615141, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='{"autoCommit":false}'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.17.0.7:8091:63655768332615130 63655768332615141 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.17.0.7:8091:63655768332615130 branch 63655768332615141, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615141
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
[h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.7:8091:63655768332615130', branchId=63655768332615138, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='{"autoCommit":false}'}
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.17.0.7:8091:63655768332615130 63655768332615138 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_2_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.17.0.7:8091:63655768332615130 branch 63655768332615138, undo_log deleted with GlobalFinished
[h_RMROLE_1_2_16] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615138
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
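The fix: in addition to Spring's REQUIRES_NEW, set the global transaction propagation to REQUIRES_NEW as well, so that a separate global transaction is started: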
@GlobalTransactional(rollbackFor = Exception.class, propagation = io.seata.tm.api.transaction.Propagation.REQUIRES_NEW)
@Transactional(rollbackFor = Exception.class, propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW)
// Global transaction A rolls back; the other (new) global transaction is not rolled back
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : suspending current transaction, xid = 172.17.0.8:8091:72652747696657983
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [172.17.0.8:8091:72652747696657984]
[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.8:8091:72652747696657983', branchId=72652747696657987, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.17.0.8:8091:72652747696657983 72652747696657987 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.17.0.8:8091:72652747696657983 branch 72652747696657987, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:172.17.0.8:8091:72652747696657983, branchId:72652747696657987
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
[h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchCommitProcessor : rm client handle branch commit process:BranchCommitRequest{xid='172.17.0.8:8091:72652747696657984', branchId=72652747696657985, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch committing: 172.17.0.8:8091:72652747696657984 72652747696657985 jdbc:postgresql://ip:port/schema null
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
Looking again at how io.seata.tm.api.TransactionalTemplate#execute handles global transaction propagation:
case REQUIRES_NEW:
    // If transaction is existing, suspend it, and then begin new transaction.
    if (existingTransaction(tx)) {
        suspendedResourcesHolder = tx.suspend(false);
    }
    tx = GlobalTransactionContext.createNew();
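The suspend/resume pair can be modeled with a `ThreadLocal`, which is roughly how the xid travels with the thread (Seata keeps it in `RootContext`). `XidHolder` and its methods are hypothetical names for this sketch. Suspending unbinds the outer xid and hands it back, so it can be restored after the new global transaction finishes.

```java
import java.util.function.Supplier;

// Hypothetical model of xid suspension for REQUIRES_NEW (not Seata's RootContext API).
final class XidHolder {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private XidHolder() {}

    static String current() {
        return CURRENT.get();
    }

    static void bind(String xid) {
        CURRENT.set(xid);
    }

    // Unbind the outer xid and return it so the caller can restore it later.
    static String suspend() {
        String outer = CURRENT.get();
        CURRENT.remove();
        return outer;
    }

    static void resume(String outer) {
        if (outer == null) {
            CURRENT.remove();
        } else {
            CURRENT.set(outer);
        }
    }

    // REQUIRES_NEW: run the work under a fresh xid, then restore the outer one even on failure.
    static <T> T requiresNew(String newXid, Supplier<T> work) {
        String outer = suspend();
        bind(newXid);
        try {
            return work.get();
        } finally {
            resume(outer);
        }
    }
}
```

Only the thread-bound xid is switched here. The database connection is a separate matter, which is why the Spring REQUIRES_NEW annotation is still needed.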
ShouldNeverHappenException
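When we first tried global transaction propagation, the method carried only @GlobalTransactional(rollbackFor = Exception.class, propagation = io.seata.tm.api.transaction.Propagation.REQUIRES_NEW), without @Transactional(rollbackFor = Exception.class, propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW). This threw the exception below. The xid being bound is the new global transaction's id, while the current xid is still the original one.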
Cause: java.sql.SQLException: io.seata.common.exception.ShouldNeverHappenException: bind xid: 172.17.0.7:8091:63655768332620147, while current xid: 172.17.0.7:8091:63655768332620141
Caused by: io.seata.common.exception.ShouldNeverHappenException: bind xid: 172.17.0.7:8091:63655768332620147, while current xid: 172.17.0.7:8091:63655768332620141
at io.seata.rm.datasource.ConnectionContext.bind(ConnectionContext.java:198)
at io.seata.rm.datasource.ConnectionProxy.bind(ConnectionProxy.java:85)
at io.seata.rm.datasource.exec.BaseTransactionalExecutor.execute(BaseTransactionalExecutor.java:120)
at io.seata.rm.datasource.exec.ExecuteTemplate.execute(ExecuteTemplate.java:145)
at io.seata.rm.datasource.PreparedStatementProxy.execute(PreparedStatementProxy.java:55)
at org.apache.ibatis.executor.statement.PreparedStatementHandler.update(PreparedStatementHandler.java:48)
at org.apache.ibatis.executor.statement.RoutingStatementHandler.update(RoutingStatementHandler.java:75)
at org.apache.ibatis.executor.SimpleExecutor.doUpdate(SimpleExecutor.java:50)
at org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:117)
at org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:76)
at com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor.intercept(MybatisPlusInterceptor.java:106)
at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:197)
When Seata executes SQL through its proxy, it checks whether the connection context already holds an xid:
org.apache.seata.rm.datasource.ConnectionContext#bind
void bind(String xid) {
    if (xid == null) {
        throw new IllegalArgumentException("xid should not be null");
    }
    if (!inGlobalTransaction()) {
        setXid(xid);
    } else {
        if (!this.xid.equals(xid)) {
            throw new ShouldNeverHappenException(String.format("bind xid: %s, while current xid: %s", xid, this.xid));
        }
    }
}
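Following org.apache.seata.rm.datasource.ConnectionContext#reset(), the xid is cleared only after the local transaction commits or rolls back. In other words, one branch (local) transaction cannot span different global transactions. Without Spring's REQUIRES_NEW, the inner method keeps using the outer local transaction's connection, whose context is still bound to the original xid, so binding the new xid fails. Spring's REQUIRES_NEW suspends the outer local transaction and gives the inner method its own connection.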
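The failure can be reproduced with a stripped-down copy of the `bind` logic above plus a `reset`. `BranchContext` is a hypothetical class, and `IllegalStateException` stands in for ShouldNeverHappenException. The two instances in the usage represent the connection of the outer local transaction and the new connection that Spring's REQUIRES_NEW obtains.

```java
// Stripped-down model of ConnectionContext#bind / #reset (hypothetical, not Seata's class).
final class BranchContext {
    private String xid;

    boolean inGlobalTransaction() {
        return xid != null;
    }

    void bind(String newXid) {
        if (newXid == null) {
            throw new IllegalArgumentException("xid should not be null");
        }
        if (!inGlobalTransaction()) {
            xid = newXid;
        } else if (!xid.equals(newXid)) {
            // Mirrors the ShouldNeverHappenException in the trace above.
            throw new IllegalStateException(
                    String.format("bind xid: %s, while current xid: %s", newXid, xid));
        }
    }

    // Called after the local transaction commits or rolls back.
    void reset() {
        xid = null;
    }
}
```

Binding a second xid to the outer context fails. A fresh context, or the outer one after `reset()`, accepts it.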
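When executing SQL, Seata picks an implementation of the abstract class AbstractDMLBaseExecutor according to the database type and DML statement. For example, insert, update and delete each build beforeImage and afterImage differently.
3. Global read uncommitted?
Undo log validation during rollback
During one branch rollback we ran into the following exception: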
[_RMROLE_1_13_32] i.seata.rm.datasource.DataSourceManager : branchRollback failed. branchType:[AT], xid:[172.17.0.8:8091:72641030088531502], branchId:[72641030088531508], resourceId:[jdbc:postgresql://ip:port/schema], applicationData:[{"autoCommit":false}]. reason:[Branch session rollback failed because of dirty undo log, please delete the relevant undolog after manually calibrating the data. xid = 172.17.0.8:8091:72641030088531502 branchId = 72641030088531508]
// Perform the rollback
org.apache.seata.rm.AbstractRMHandler#doBranchRollback
io.seata.rm.datasource.DataSourceManager#branchRollback
// Roll back based on the undo log
org.apache.seata.rm.datasource.undo.AbstractUndoLogManager#undo
org.apache.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn
// Validation before rollback: compare the current data with the before and after images
org.apache.seata.rm.datasource.undo.AbstractUndoExecutor#dataValidationAndGoOn
// If the current data matches neither the after image nor the before image in the undo log, an exception is thrown:
// the data may have been modified by another concurrent transaction and must be fixed manually
io.seata.rm.datasource.undo.SQLUndoDirtyException
protected boolean dataValidationAndGoOn(ConnectionProxy conn) throws SQLException {
    TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
    TableRecords afterRecords = sqlUndoLog.getAfterImage();
    // Compare current data with before data
    // No need undo if the before data snapshot is equivalent to the after data snapshot.
    Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
    if (beforeEqualsAfterResult.getResult()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Stop rollback because there is no data change " +
                    "between the before data snapshot and the after data snapshot.");
        }
        // no need continue undo.
        return false;
    }
    // Validate if data is dirty.
    TableRecords currentRecords = queryCurrentRecords(conn);
    // compare with current data and after image.
    Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
    if (!afterEqualsCurrentResult.getResult()) {
        // If current data is not equivalent to the after data, then compare the current data with the before
        // data, too. No need continue to undo if current data is equivalent to the before data snapshot
        Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
        if (beforeEqualsCurrentResult.getResult()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Stop rollback because there is no data change " +
                        "between the before data snapshot and the current data snapshot.");
            }
            // no need continue undo.
            return false;
        } else {
            if (LOGGER.isInfoEnabled()) {
                if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {
                    LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());
                }
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("check dirty data failed, old and new data are not equal, " +
                        "tableName:[" + sqlUndoLog.getTableName() + "]," +
                        "oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
                        "newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
            }
            throw new SQLUndoDirtyException("Has dirty records when undo.");
        }
    }
    return true;
}
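The decision in dataValidationAndGoOn reduces to three comparisons. In this simplified model a table snapshot is a `List<Map<String, Object>>`, and `Objects.equals` stands in for `DataCompareUtils.isRecordsEquals`. The real comparison works on `TableRecords` field by field. `UndoCheck` and `Outcome` are hypothetical names, and `IllegalStateException` stands in for SQLUndoDirtyException.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified model of the pre-undo validation (not Seata's AbstractUndoExecutor).
final class UndoCheck {
    enum Outcome { SKIP_NO_CHANGE, SKIP_ALREADY_RESTORED, UNDO }

    private UndoCheck() {}

    static Outcome validate(List<Map<String, Object>> before,
                            List<Map<String, Object>> after,
                            List<Map<String, Object>> current) {
        // 1. The branch did not actually change anything: nothing to undo.
        if (Objects.equals(before, after)) {
            return Outcome.SKIP_NO_CHANGE;
        }
        // 2. The data is still what this branch wrote: safe to restore the before image.
        if (Objects.equals(after, current)) {
            return Outcome.UNDO;
        }
        // 3. The data already equals the before image: nothing to undo.
        if (Objects.equals(before, current)) {
            return Outcome.SKIP_ALREADY_RESTORED;
        }
        // 4. The data matches neither image: another writer changed it in between.
        throw new IllegalStateException("Has dirty records when undo.");
    }
}
```

Case 4 is the "dirty undo log" error in the log above. It can only happen if something wrote to the row between phase one and the rollback without going through Seata's global locks.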
Recall the AT mode flow: the local transaction is already committed in phase one, right after the branch registers.
// Local transaction commit
org.apache.seata.rm.datasource.ConnectionProxy#commit
org.apache.seata.rm.datasource.ConnectionProxy#doCommit
org.apache.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
    try {
        // Register the branch with the TC
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        // Commit the local transaction (business data and undo_log together)
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        // Report the local commit result to the TC
        report(true);
    }
    // Reset the context so that a later xid check does not throw ShouldNeverHappenException
    context.reset();
}
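Rollback, however, happens asynchronously once the TC sends its notification. If another transaction reads the global transaction's data within this window, it sees data that is not yet globally committed. This is a global read uncommitted, i.e. a dirty read. (The Seata documentation states that, on top of a local isolation level of read committed or higher, the default global isolation level of AT mode is read uncommitted.)
How can global read committed be achieved?
Dirty writes
Recall this line from the earlier logs: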
[ XNIO-1 task-4] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142905, lockKeys:tableName:pkid;tableName:pkid,pkid
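That is, when the branch commits, it registers its lockKeys (table:primary key entries) and takes global locks on those rows.
But what if the same table is modified by a method outside this global transaction?
One option is to annotate those methods with @GlobalTransactional as well.
If a global transaction is not needed there, use @GlobalLock instead.
With either @GlobalTransactional or @GlobalLock, a lock conflict is only discovered when the branch commits, and the next step is decided at that point.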
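The two behaviors can be contrasted with a small in-memory lock table. Under @GlobalTransactional, the branch requests locks on all its row keys when it registers. Under @GlobalLock, the connection only checks for a conflicting holder and never takes the lock. `GlobalLockTable` and its methods are hypothetical names. This is a model of the idea, not Seata's TC lock manager.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the TC's global row locks (not Seata's LockManager).
final class GlobalLockTable {
    // row key ("table:pk") -> xid holding the lock
    private final Map<String, String> owners = new HashMap<>();

    // @GlobalTransactional branch: acquire all row locks or none.
    synchronized boolean tryAcquire(String xid, List<String> rowKeys) {
        for (String key : rowKeys) {
            String owner = owners.get(key);
            if (owner != null && !owner.equals(xid)) {
                return false; // conflict: another global transaction holds this row
            }
        }
        for (String key : rowKeys) {
            owners.put(key, xid);
        }
        return true;
    }

    // @GlobalLock: only check, never take the lock. xid may be null (no global transaction).
    synchronized boolean isLockedByOthers(String xid, List<String> rowKeys) {
        for (String key : rowKeys) {
            String owner = owners.get(key);
            if (owner != null && !owner.equals(xid)) {
                return true;
            }
        }
        return false;
    }

    // After phase two finishes (commit or rollback): release every lock held by the xid (non-null).
    synchronized void releaseAll(String xid) {
        owners.values().removeIf(xid::equals);
    }
}
```

Locks are held until phase two completes. A writer guarded this way cannot overwrite rows that a pending rollback still needs to restore.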
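Dirty reads
A colleague once ran into this: when an MQ message was retried after an exception, the consumer read data that was later rolled back, concluded the business had already completed, and stopped retrying.
So it is also necessary to check for the global lock while executing.
If the SQL is SELECT ... FOR UPDATE, Seata uses SelectForUpdateExecutor. If the method is annotated with @GlobalTransactional or @GlobalLock, it checks for the global lock. If a global lock exists, it rolls back the local transaction and keeps re-competing for the local lock and the global lock in a while loop.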
io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute
public T doExecute(Object... args) throws Throwable {
    Connection conn = statementProxy.getConnection();
    // ... ...
    try {
        // ... ...
        while (true) {
            try {
                // ... ...
                if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                    // Do the same thing under either @GlobalTransactional or @GlobalLock,
                    // that only check the global lock here.
                    statementProxy.getConnectionProxy().checkLock(lockKeys);
                } else {
                    throw new RuntimeException("Unknown situation!");
                }
                break;
            } catch (LockConflictException lce) {
                if (sp != null) {
                    conn.rollback(sp);
                } else {
                    conn.rollback();
                }
                // trigger retry
                lockRetryController.sleep(lce);
            }
        }
    } finally {
        // ...
    }
    // ... ...
}
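The retry loop can be sketched on its own. This sketch assumes a bounded number of attempts and a fixed sleep. In Seata the limits come from configuration such as `client.rm.lock.retryTimes` and `client.rm.lock.retryInterval`, and the values in the usage are illustrative. `LockChecker` is a hypothetical functional interface standing in for `checkLock`, and the `Runnable` stands in for rolling back the local transaction (or savepoint).

```java
// Hypothetical check: returns true when another global transaction holds the lock.
@FunctionalInterface
interface LockChecker {
    boolean conflicts();
}

final class LockRetryLoop {
    private LockRetryLoop() {}

    // Returns the attempt on which the lock was free; throws once maxAttempts is exhausted.
    static int run(LockChecker checker, Runnable rollbackLocal, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (!checker.conflicts()) {
                return attempt; // lock is free: the SELECT ... FOR UPDATE result can be used
            }
            // Release the local row lock so the global lock holder can finish phase two.
            rollbackLocal.run();
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for global lock", e);
            }
        }
        throw new IllegalStateException("global lock still held after " + maxAttempts + " attempts");
    }
}
```

Rolling back before sleeping matters. Otherwise this transaction would keep its local row lock while waiting, and the global transaction's rollback could never restore the row.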
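Other issues
Do not use a time zone-aware type (e.g. PostgreSQL timestamptz) for timestamp columns. In our case, rolling back a branch that touched such a column failed with: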
[_RMROLE_1_10_32] i.seata.rm.datasource.DataSourceManager : branchRollback failed. branchType:[AT], xid:[172.17.0.8:8091:72641030088531502], branchId:[72641030088531508], resourceId:[jdbc:postgresql://ip:port/schema], applicationData:[{"autoCommit":false}]. reason:[Branch session rollback failed and try again later xid = 172.17.0.8:8091:72641030088531502 branchId = 72641030088531508 Cannot cast an instance of java.sql.Timestamp to type Types.TIMESTAMP_WITH_TIMEZONE]
References
https://seata.apache.org/zh-cn/docs/dev/mode/at-mode/
https://seata.apache.org/zh-cn/blog/seata-datasource-proxy/
https://seata.apache.org/zh-cn/docs/overview/faq/#42
https://seata.apache.org/zh-cn/blog/seata-at-lock/
org.apache.seata.tm.api.TransactionalTemplate
io.seata.rm.AbstractResourceManager
org.apache.seata.rm.datasource.exec.BaseTransactionalExecutor
Subclasses: AbstractDMLBaseExecutor, SelectForUpdateExecutor
org.apache.seata.server.coordinator.DefaultCore