文章目录
- 一、前言
- 二、GlobalTransactionScanner
- 1、判断某一个Bean是否需要做TCC动态代理
- 1> 判断bean实现的接口中是否有接口标注了@LocalTCC注解
- 2> 当Bean的某一个接口实现了@LocalTCC注解之后,解析相应接口中的TCC内容:
- TCCResource数据样例
- 3> TCCResource注册到TC的流程
- 1)RM发起注册Resource请求
- 2)TC接收注册Resource请求
- 三、发起TCC分支事务
- 1、ActionInterceptorHandler
- 分支事务注册到全局事务
- 四、TCC分支事务提交
- 五、TCC分支事务回滚
- 六、总结
一、前言
更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html
至此,seata系列的内容已出:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
- 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
- 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
- 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
- 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
- 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
- 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
- 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
- 分布式事务Seata源码解析九:分支事务如何注册到全局事务
- 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程
- 分布式事务Seata源码解析11:全局事务执行流程之两阶段全局事务提交
- 分布式事务Seata源码解析12:全局事务执行流程之全局事务回滚
- Spring Cloud整合Seata实现TCC分布式事务模式案例
二、GlobalTransactionScanner
在seata-client启动时会利用SpringBoot自动装配的特性加载GlobalTransactionScanner类。具体见博文:《Seata Client 启动时都做了什么?》
另外,在分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务一文中,笔者和大家聊了:
GlobalTransactionScanner
类继承了AbstractAutoProxyCreator
,AbstractAutoProxyCreator
类又实现了BeanPostProcessor
接口;因此GlobalTransactionScanner
类也是BPP
(BeanPostProcessor)。AbstractAutoProxyCreator
是Spring AOP中的一个抽象类,其主要功能是自动创建动态代理;因为其实现了BeanPostProcessor
接口,所以在类加载到Spring容器之前,会进入到其wrapIfNecessary()
方法对Bean进行代理包装,后续调用Bean之将委托给指定的拦截器。
GlobalTransactionScanner#wrapIfNecessary()
方法关键部分:
所以在bean加载到Spring容器之前,都会先判断bean是否需要做TCC的动态代理,如果需要则增加相应拦截器TccActionInterceptor
。
1、判断某一个Bean是否需要做TCC动态代理
简单来讲,就是判断Bean实现的所有接口中是否有一个接口上标注了@LocalTCC注解;如果有,则当前Bean需要做TCC动态代理。
1> 判断bean实现的接口中是否有接口标注了@LocalTCC注解
代码执行如下:
其中:
RemotingParser
负责提取远程处理bean信息;其一共有四个具体实现:
- 针对普通SpringCloud项目的LocalTCCRemotingParser;
- 针对阿里内部微服务架构HSF的HSFRemotingParser;
- 针对Dubbo项目的DubboRemotingParser;
- 针对SofaRpc框架的SofaRpcRemotingParser;
- 就
LocalTCCRemotingParser
而言,其isReference()
方法和isService()
方法的实现都一样的逻辑:判断当前bean实现的所有接口中是否有接口标注了@LocalTCC
注解。
2> 当Bean的某一个接口实现了@LocalTCC注解之后,解析相应接口中的TCC内容:
DefaultRemotingParser#parserRemotingServiceInfo():
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
// 获取Bean实现的标注了@LocalTCC注解的接口信息:接口全路径类名、接口Class信息、实现接口的当前Bean(动态代理后的)
RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
if (remotingBeanDesc == null) {
return null;
}
// 后续获取当前bean对应的TCC信息,走remotingServiceMap缓存
remotingServiceMap.put(beanName, remotingBeanDesc);
Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
// 判断当前Bean是否为serviceBean,就LocalTCC而言,走到这里就一定是的。
if (remotingParser.isService(bean, beanName)) {
try {
//service bean, registry resource
Object targetBean = remotingBeanDesc.getTargetBean();
// 遍历实现了@LocalTCC接口的所有方法,如果方法上标注了@TwoPhaseBusinessAction注解,则将一些TCC信息包装为TCCResource注册到TC(seata server)
// TCC信息包括:@TwoPhaseBusinessAction名称,目标类,try方法,commit方法、方法名称、方法入参(默认只有BusinessActionContext),rollback方法、方法名称、方法入参(默认只有BusinessActionContext),封装的两阶段方法的key
// 将TCCResource注册到TC时,resourceId为@TwoPhaseBusinessAction名称,这也是@TwoPhaseBusinessAction名称要全局唯一的原因
for (Method m : methods) {
TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
if (twoPhaseBusinessAction != null) {
TCCResource tccResource = new TCCResource();
tccResource.setActionName(twoPhaseBusinessAction.name());
tccResource.setTargetBean(targetBean);
tccResource.setPrepareMethod(m);
tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),
twoPhaseBusinessAction.commitArgsClasses()));
tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),
twoPhaseBusinessAction.rollbackArgsClasses()));
// set argsClasses
tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());
tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());
// set phase two method's keys
tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),
twoPhaseBusinessAction.commitArgsClasses()));
tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),
twoPhaseBusinessAction.rollbackArgsClasses()));
// registry tcc resource,将当前TCCResource作为资源注册到TC(Seata Server)中。
DefaultResourceManager.get().registerResource(tccResource);
}
}
} catch (Throwable t) {
throw new FrameworkException(t, "parser remoting service error");
}
}
if (remotingParser.isReference(bean, beanName)) {
//reference bean, TCC proxy
remotingBeanDesc.setReference(true);
}
return remotingBeanDesc;
}
解析TCC内容具体逻辑:
- 获取Bean实现的标注了@LocalTCC注解的接口信息:接口全路径类名、接口Class信息、实现接口的当前Bean(动态代理后的);
- 将当前bean的LocalTCC信息保存到Map缓存
remotingServiceMap
中,后续获取bean对应的LocalTCC信息,都走remotingServiceMap缓存;- 遍历实现了@LocalTCC接口的所有方法,如果方法上标注了@TwoPhaseBusinessAction注解,则将一些TCC信息包装为TCCResource注册到TC(seata server);
- TCC信息包括:@TwoPhaseBusinessAction名称,目标类,try方法,commit方法、方法名称、方法入参(默认只有BusinessActionContext),rollback方法、方法名称、方法入参(默认只有BusinessActionContext),封装的两阶段方法的key;
- 将TCCResource注册到TC时,resourceId为@TwoPhaseBusinessAction名称,这也是@TwoPhaseBusinessAction名称要全局唯一的原因。
TCCResource数据样例
3> TCCResource注册到TC的流程
RM/TM 和 TC交互的详细流程见博文:分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
1)RM发起注册Resource请求
请求为:RegisterRMRequest,其中resourceId信息最为关键;
RM在将TCCResource注册到TC时,会在以resourceId为key在本地缓存TCCResource的具体信息;
2)TC接收注册Resource请求
在seata-server启动时会注册各种请求类型对应的处理器;
由于RegisterRMRequest对应的MessgeType为TYPE_REG_RM
:
所以对应的处理器为RegRmProcessor:
RegRmProcessor
处理时最终会将Resource数据(resourceId、clientIp)保存在TC的内存层面:
三、发起TCC分支事务
调用标注了@LocalTCC注解的接口的实现类的方法时,会进入到TccActionInterceptor#invoke()方法,这正是发起TCC分支事务的入口;
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 当前事务必须处于全局事务中
if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
//not in transaction, or this interceptor is disabled
return invocation.proceed();
}
// 获取本次要调用的方法,在class接口中的体现
Method method = getActionInterfaceMethod(invocation);
TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
// try method,方法必须要标注@TwoPhaseBusinessAction注解才会走TCC逻辑
if (businessAction != null) {
// save the xid
String xid = RootContext.getXID();
//save the previous branchType
BranchType previousBranchType = RootContext.getBranchType();
//if not TCC, bind TCC branchType
if (BranchType.TCC != previousBranchType) {
RootContext.bindBranchType(BranchType.TCC);
}
try {
// Handler the TCC Aspect, and return the business result
return actionInterceptorHandler.proceed(
method, // 目标业务方法
invocation.getArguments(), // 调用方法时传递进来的参数
xid, // 全局事务xid
businessAction, // 标注的@TwoPhaseBusinessAction注解内容
invocation::proceed); // 目标业务方法的执行
} finally {
//if not TCC, unbind branchType
if (BranchType.TCC != previousBranchType) {
RootContext.unbindBranchType();
}
//MDC remove branchId
MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
}
}
//not TCC try method
return invocation.proceed();
}
- 如果当前事务不处于全局事务中 或 禁用了全局事务 或 当前事务模式为SAGA,则直接执行目标方法;
- 如果要执行的方法,没有@TwoPhaseBusinessAction注解标注,则直接执行目标方法;
- 否者,将TCC事务模式绑定到RootContext,将全局事务xid、方法的入参、@TwoPhaseBusinessAction注解内容、目标业务方法的执行传递到
ActionInterceptorHandler#proceed()
方法中进行TCC切面的处理、业务的执行。- 最后清理RootContext的信息、包括TCC分支事务类型、分支事务branchId。
1、ActionInterceptorHandler
public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
Callback<Object> targetCallback) throws Throwable {
// Get action context from arguments, or create a new one and then reset to arguments
BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);
// Set the xid
actionContext.setXid(xid);
// Set the action name,TCC注解@TwoPhaseBusinessAction中定义的tcc业务动作名称
String actionName = businessAction.name();
actionContext.setActionName(actionName);
// Set the delay report,延时report上报
actionContext.setDelayReport(businessAction.isDelayReport());
// Creating Branch Record,发起TCC分支事务的注册 并 创建分支事务的记录,拿到TC返回的branch_id
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);
// MDC put branchId
MDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);
// save the previous action context
BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext();
try {
//share actionContext implicitly
BusinessActionContextUtil.setContext(actionContext);
if (businessAction.useTCCFence()) {
try {
// Use TCC Fence, and return the business result
return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
Throwable originException = e.getCause();
if (originException instanceof FrameworkException) {
LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage());
}
throw originException;
}
} else {
// 执行目标方法,Execute business, and return the business result
return targetCallback.execute();
}
} finally {
try {
//to report business action context finally if the actionContext.getUpdated() is true
BusinessActionContextUtil.reportContext(actionContext);
} finally {
if (previousActionContext != null) {
// recovery the previous action context
BusinessActionContextUtil.setContext(previousActionContext);
} else {
// clear the action context
BusinessActionContextUtil.clear();
}
}
}
}
- 尝试从方法入参中获取BusinessActionContext,如果获取不到,则创建一个BusinessActionContext
- 将@TwoPhaseBusinessAction注解中的name()作为TCC业务执行上下文的actionName、isDelayReport()作为延时上报标识。
- 根据@BusinessActionContextParameter从方法入参中提取出一些数据放到TCC业务执行上下文中,try方法开始时间、RM本地IP等信息也放入到TCC业务执行上下文。
- 向TC发起分支事务注册请求,获取到返回的branchId。
- 最后执行目标业务方法。
其中businessAction.useTCCFence()
为TRUE时,标识seata对TCC幂等、悬挂、空回滚问题的处理。见下一篇文章。
分支事务注册到全局事务
解析入参中的@BusinessActionContextParameter注解、向TC注册分支事务均体现在doTccActionLogStore()
方法:
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
BusinessActionContext actionContext) {
String actionName = actionContext.getActionName();
String xid = actionContext.getXid();
//region fetch context and init action context
// 根据@BusinessActionContextParameter从方法入参中提取出一些数据放到BusinessActionContext上下文中
Map<String, Object> context = fetchActionRequestContext(method, arguments);
// 记录TCC开始的时间
context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());
// Init business context,
// 初始化业务上下文,将@TwoPhaseBusinessAction的参数(name()、commit()、rollback()、useTCCFence())放到上下文中
initBusinessContext(context, method, businessAction);
//I nit running environment context
// 将本地ip添加到BusinessActionContext中
initFrameworkContext(context);
Map<String, Object> originContext = actionContext.getActionContext();
if (CollectionUtils.isNotEmpty(originContext)) {
//Merge context and origin context if it exists.
//@since: above 1.4.2
originContext.putAll(context);
context = originContext;
} else {
actionContext.setActionContext(context);
}
//endregion
//Init applicationData
Map<String, Object> applicationContext = Collections.singletonMap(Constants.TCC_ACTION_CONTEXT, context);
String applicationContextStr = JSON.toJSONString(applicationContext);
try {
// registry branch record
// 向TC发起分支事务注册请求
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
applicationContextStr, null);
return String.valueOf(branchId);
} catch (Throwable t) {
String msg = String.format("TCC branch Register error, xid: %s", xid);
LOGGER.error(msg, t);
throw new FrameworkException(t, msg);
}
}
- 调用
fetchActionRequestContext()
方法,根据@BusinessActionContextParameter从方法入参中提取出一些数据放到BusinessActionContext;- 将try方法开始时间放入到BusinessActionContext;
- 调用
initBusinessContext()
方法,将@TwoPhaseBusinessAction的参数(name()、commit()、rollback()、useTCCFence())放到上下文中;- 调用
initFrameworkContext()
方法,将本地ip添加到BusinessActionContext中;- 将分支事务注册请求
BranchRegisterRequest
通过netty框架发送到TC,进行分支事务的注册。TC会将分支事务信息持久化。
- 分支事务的BusinessActionContext信息会作为applicationContext的一部分持久化到branch_table的application_data字段中。
具体分支事务注册到全局事务的流程见博文:《分布式事务Seata源码解析九:分支事务如何注册到全局事务》。
四、TCC分支事务提交
详细执行流程见博文: 分布式事务Seata源码解析11:全局事务执行流程之两阶段全局事务提交
分支事务的提交最终走到ResourceManager
的具体实现类TCCResourceManager
#branchCommit()
方法:
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
// 根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
// 获取目标bean
Object targetTCCBean = tccResource.getTargetBean();
// 获取目标bean的commit()方法
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
// BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
Object ret;
boolean result;
// add idempotent and anti hanging
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
// 使用反射将BusinessActionContext作为入参对目标bean的commit()方法进行调用
ret = commitMethod.invoke(targetTCCBean, args);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
}
LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
}
- 先根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;进而获取到目标bean、目标bean的commit()方法;
- BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
- 再将xid、branchId都封装到BusinessActionContext中。
- 使用反射将BusinessActionContext作为入参对目标bean的commit()方法进行调用;根据commit()方法是否执行成功,来决定TC是否需要重试分支事务的提交。
五、TCC分支事务回滚
详细执行流程见博文:分布式事务Seata源码解析12:全局事务执行流程之全局事务回滚
和分支事务的提交类似,分支事务的提交最终走到ResourceManager
的具体实现类TCCResourceManager
#branchRollback()
方法:
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
// 根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
// 获取目标bean
Object targetTCCBean = tccResource.getTargetBean();
// 获取目标bean的collback()方法
Method rollbackMethod = tccResource.getRollbackMethod();
if (targetTCCBean == null || rollbackMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
// BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
Object ret;
boolean result;
// add idempotent and anti hanging
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
args, tccResource.getActionName());
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
// 使用反射将BusinessActionContext作为入参对目标bean的commit()方法进行调用
ret = rollbackMethod.invoke(targetTCCBean, args);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
}
LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
这里和分支事务提交的逻辑是一样的!
- 先根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;进而获取到目标bean、目标bean的rollback()方法;
- BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
- 再将xid、branchId都封装到BusinessActionContext中。
- 使用反射将BusinessActionContext作为入参对目标bean的rollback()方法进行调用;根据rollback()方法是否执行成功,来决定TC是否需要重试分支事务的提交。
六、总结
1> seata-client(TM/RM)启动时:
- 对标注了@LocalTCC注解接口的实现类多动态代理、并将@TwoPhaseBusinessAction注解的内容封装为TCCResource,保存在各个RM的
TCCResourceManager
中的Map缓存tccResourceCache
中(key为resourceId(即:@TwoPhaseBusinessAction的name())); - 将resourceId注册到TC中,TC将其保存在内存中。
2> 执行分支事务:
调用标注了@LocalTCC注解的接口的实现类的方法时,会进入到TccActionInterceptor#invoke()方法:
- 如果当前事务不处于全局事务中 或 禁用了全局事务 或 当前事务模式为SAGA,则直接执行目标方法;
- 如果要执行的方法,没有@TwoPhaseBusinessAction注解标注,则直接执行目标方法;
- 否者,将TCC事务模式绑定到RootContext,将全局事务xid、方法的入参、@TwoPhaseBusinessAction注解内容、目标业务方法的执行传递到
ActionInterceptorHandler#proceed()
方法中进行TCC切面的处理、业务的执行。
- 尝试从方法入参中获取BusinessActionContext,如果获取不到,则创建一个BusinessActionContext;
- 将@TwoPhaseBusinessAction注解中的name()作为TCC业务执行上下文的actionName、isDelayReport()作为延时上报标识;
- 根据@BusinessActionContextParameter从方法入参中提取出一些数据放到TCC业务执行上下文中,try方法开始时间、RM本地IP等信息也放入到TCC业务执行上下文;
- 向TC发起分支事务注册请求,获取到返回的branchId;
- 分支事务的BusinessActionContext信息会作为applicationContext的一部分持久化到branch_table的application_data字段中。
- 最后执行目标业务方法;
- 最后清理RootContext的信息、包括TCC分支事务类型、分支事务branchId。
3> 提交分支事务:
- 先根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;进而获取到目标bean、目标bean的commit()方法;
- BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
- 再将xid、branchId都封装到BusinessActionContext中。
- 使用反射将BusinessActionContext作为入参对目标bean的commit()方法进行调用;根据commit()方法是否执行成功,来决定TC是否需要重试分支事务的提交。
4> 回滚分支事务:
- 先根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;进而获取到目标bean、目标bean的rollback()方法;
- BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
- 再将xid、branchId都封装到BusinessActionContext中。
- 使用反射将BusinessActionContext作为入参对目标bean的rollback()方法进行调用;根据rollback()方法是否执行成功,来决定TC是否需要重试分支事务的提交。