Learning Seata: What the @GlobalTransactional Annotation Does
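1. The auto-configuration class SeataAutoConfiguration
Once the Seata Spring Boot integration package is on the classpath, Spring Boot's auto-configuration mechanism registers SeataAutoConfiguration in the Spring container.
SeataAutoConfiguration in turn registers a GlobalTransactionScanner bean in the container.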
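2. The core client class GlobalTransactionScanner
Through its inheritance chain, GlobalTransactionScanner indirectly implements the BeanPostProcessor interface. Its parent class AbstractAutoProxyCreator overrides BeanPostProcessor's postProcessAfterInitialization method, and Spring calls this method for every bean in the container after that bean has been initialized (not just once for GlobalTransactionScanner itself).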
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
Object cacheKey = this.getCacheKey(bean.getClass(), beanName);
if (this.earlyProxyReferences.remove(cacheKey) != bean) {
return this.wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}
That method calls this.wrapIfNecessary(). Because GlobalTransactionScanner overrides wrapIfNecessary, the call is dispatched to GlobalTransactionScanner.wrapIfNecessary():
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// do checkers
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// init tcc fence clean task if enable useTccFence
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// check whether a GlobalTransactional annotation is present
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (globalTransactionalInterceptor == null) {
// create the interceptor
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
// is the current bean already an AOP proxy?
if (!AopUtils.isAopProxy(bean)) {
// not yet proxied: create the proxy
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// already proxied: build the advisors and add them to the existing proxy
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
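The hook mechanism used here can be illustrated without Spring. The following is a minimal sketch, not Spring's or Seata's code: DemoPostProcessor, DemoContainer and Wrapped are hypothetical stand-ins, and the "ends with Service" rule is an assumption that merely replaces the real annotation check. It shows that the post-processor sees every bean and decides per bean whether to return it unchanged or wrapped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PostProcessorSketch {

    // Hypothetical, simplified analogue of a bean post-processor.
    interface DemoPostProcessor {
        Object postProcessAfterInitialization(Object bean, String beanName);
    }

    // Hypothetical mini container: every registered bean passes through the processor.
    static final class DemoContainer {
        private final Map<String, Object> beans = new LinkedHashMap<>();
        private final DemoPostProcessor processor;

        DemoContainer(DemoPostProcessor processor) {
            this.processor = processor;
        }

        void register(String name, Object bean) {
            // The processor may hand back the bean itself or a wrapper around it.
            beans.put(name, processor.postProcessAfterInitialization(bean, name));
        }

        Object get(String name) {
            return beans.get(name);
        }
    }

    // Stand-in for a proxy: it only remembers what it wraps.
    static final class Wrapped {
        final Object target;

        Wrapped(Object target) {
            this.target = target;
        }
    }

    static DemoContainer demo() {
        // Assumed rule for illustration: wrap beans whose name ends with "Service".
        DemoPostProcessor scanner =
                (bean, name) -> name.endsWith("Service") ? new Wrapped(bean) : bean;
        DemoContainer container = new DemoContainer(scanner);
        container.register("orderService", new Object());
        container.register("dataSource", "jdbc-placeholder");
        return container;
    }

    public static void main(String[] args) {
        DemoContainer container = demo();
        System.out.println(container.get("orderService") instanceof Wrapped);
        System.out.println(container.get("dataSource"));
    }
}
```

In the real scanner the decision is made by doCheckers and existsAnnotation, and the wrapper is a Spring AOP proxy rather than a plain holder object.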
Assume the default AT mode is in use, so the else branch is taken.
First, resolve the target class of the current bean:
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
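Then check whether that class, or any of its methods, carries the GlobalTransactional annotation (existsAnnotation also accepts GlobalLock on a method):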
if (!this.existsAnnotation(new Class[]{serviceInterface}) && !this.existsAnnotation(interfacesIfJdk)) {
return bean;
}
private boolean existsAnnotation(Class<?>[] classes) {
if (CollectionUtils.isNotEmpty(classes)) {
for (Class<?> clazz : classes) {
// does the class itself carry GlobalTransactional? (null entries are skipped first)
if (clazz == null) {
continue;
}
GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
if (trxAnno != null) {
return true;
}
Method[] methods = clazz.getMethods();
for (Method method : methods) {
// does the current method carry GlobalTransactional?
trxAnno = method.getAnnotation(GlobalTransactional.class);
if (trxAnno != null) {
return true;
}
GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
if (lockAnno != null) {
return true;
}
}
}
}
return false;
}
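The same reflective scan can be tried in isolation. This is a minimal sketch using only the JDK: DemoTransactional is a hypothetical stand-in for Seata's GlobalTransactional, and the two service classes are invented for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class AnnotationScanSketch {

    // Hypothetical stand-in for Seata's @GlobalTransactional.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface DemoTransactional {}

    // No annotation anywhere: should not be proxied.
    static class PlainService {
        public void query() {}
    }

    // One annotated method is enough for the whole class to qualify.
    static class OrderService {
        public void query() {}

        @DemoTransactional
        public void placeOrder() {}
    }

    // Returns true if any class, or any public method of it, carries the annotation.
    static boolean existsAnnotation(Class<?>... classes) {
        if (classes == null) {
            return false;
        }
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            if (clazz.getAnnotation(DemoTransactional.class) != null) {
                return true;
            }
            for (Method method : clazz.getMethods()) {
                if (method.getAnnotation(DemoTransactional.class) != null) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(existsAnnotation(PlainService.class));
        System.out.println(existsAnnotation(OrderService.class));
    }
}
```

Note that the annotation must have RUNTIME retention, otherwise getAnnotation would never find it.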
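The proxy is created for the whole class. Even if only one method of the class carries the GlobalTransactional annotation, calls to every other method also pass through the interceptor.
Seata therefore has to check on each invocation whether the invoked method actually carries the annotation; this check is in GlobalTransactionalInterceptor.invoke (section 3).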
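A class-level proxy with a per-invocation check can be sketched with a JDK dynamic proxy. This is an illustration under assumptions, not Seata's interceptor: DemoTransactional and OrderService are hypothetical, and the annotation is placed on the interface method because a JDK proxy hands the handler the interface's Method object (the real interceptor resolves the most specific method on the target class first).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class PerMethodCheckSketch {

    // Hypothetical stand-in for Seata's @GlobalTransactional.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoTransactional {}

    interface OrderService {
        @DemoTransactional
        String placeOrder();

        String query();
    }

    static class OrderServiceImpl implements OrderService {
        public String placeOrder() {
            return "placed";
        }

        public String query() {
            return "queried";
        }
    }

    // Records what the handler did for each call.
    static final List<String> LOG = new ArrayList<>();

    static OrderService proxy(OrderService target) {
        InvocationHandler handler = (proxyObject, method, args) -> {
            // Every call reaches the handler; only annotated methods get the extra steps.
            if (method.getAnnotation(DemoTransactional.class) != null) {
                LOG.add("begin " + method.getName());
                Object result = method.invoke(target, args);
                LOG.add("commit " + method.getName());
                return result;
            }
            LOG.add("passthrough " + method.getName());
            return method.invoke(target, args);
        };
        return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                handler);
    }

    public static void main(String[] args) {
        OrderService service = proxy(new OrderServiceImpl());
        service.placeOrder();
        service.query();
        System.out.println(LOG);
    }
}
```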
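Next, create the global transaction interceptor. The failureHandlerHook argument is a callback hook that is invoked when transaction processing fails. It defaults to DefaultFailureHandlerImpl and can be replaced with a custom implementation where needed, for example to send an email alert when a transaction fails.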
this.globalTransactionalInterceptor = new GlobalTransactionalInterceptor(this.failureHandlerHook);
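When a method annotated with @GlobalTransactional is called, GlobalTransactionalInterceptor intercepts the call and begins a global transaction (or joins an existing one). After the method finishes, the interceptor commits or rolls back the transaction depending on the outcome. In other words, the interceptor wraps the whole invocation and acts both before the method starts and after it ends.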
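The idea of a custom failure hook can be sketched as follows. Everything here is hypothetical: DemoFailureHandler is a reduced stand-in rather than Seata's FailureHandler interface, the transaction is represented only by an XID string, and the "alert" is a list entry instead of an email.

```java
import java.util.ArrayList;
import java.util.List;

final class FailureHookSketch {

    // Hypothetical, reduced analogue of a failure callback interface.
    interface DemoFailureHandler {
        void onBeginFailure(String xid, Throwable cause);

        void onCommitFailure(String xid, Throwable cause);

        void onRollbackFailure(String xid, Throwable cause);
    }

    // Collects alert messages; a real hook might send mail or page someone instead.
    static final class AlertingFailureHandler implements DemoFailureHandler {
        final List<String> alerts = new ArrayList<>();

        public void onBeginFailure(String xid, Throwable cause) {
            alerts.add("begin failed: " + xid + " (" + cause.getMessage() + ")");
        }

        public void onCommitFailure(String xid, Throwable cause) {
            alerts.add("commit failed: " + xid + " (" + cause.getMessage() + ")");
        }

        public void onRollbackFailure(String xid, Throwable cause) {
            alerts.add("rollback failed: " + xid + " (" + cause.getMessage() + ")");
        }
    }

    // Hypothetical failure codes, loosely mirroring the idea of an error code enum.
    enum Code { BEGIN_FAILURE, COMMIT_FAILURE, ROLLBACK_FAILURE }

    // Routes a failure to the matching callback.
    static void dispatch(Code code, String xid, Throwable cause, DemoFailureHandler handler) {
        switch (code) {
            case BEGIN_FAILURE:
                handler.onBeginFailure(xid, cause);
                break;
            case COMMIT_FAILURE:
                handler.onCommitFailure(xid, cause);
                break;
            case ROLLBACK_FAILURE:
                handler.onRollbackFailure(xid, cause);
                break;
            default:
                throw new IllegalStateException("Unknown code: " + code);
        }
    }

    public static void main(String[] args) {
        AlertingFailureHandler handler = new AlertingFailureHandler();
        dispatch(Code.COMMIT_FAILURE, "xid-1", new RuntimeException("TC unreachable"), handler);
        System.out.println(handler.alerts);
    }
}
```

The dispatch method corresponds to the switch over the error code that appears later in handleGlobalTransaction.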
If the bean is not already an AOP proxy, the parent implementation AbstractAutoProxyCreator.wrapIfNecessary is called:
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
return bean;
} else if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
return bean;
} else if (!this.isInfrastructureClass(bean.getClass()) && !this.shouldSkip(bean.getClass(), beanName)) {
// fetch the interceptors for this bean (here: the GlobalTransactionalInterceptor)
Object[] specificInterceptors = this.getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, (TargetSource)null);
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
Object proxy = this.createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
} else {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
} else {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
}
The proxy itself is created by this line:
Object proxy = this.createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
3. The global transaction interceptor GlobalTransactionalInterceptor
The entry point is the invoke method:
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
// resolve the target class that the invoked method belongs to
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
// resolve the most specific Method on the target class
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
// skip methods declared on Object
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
// resolve the bridged method
Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
// read the GlobalTransactional annotation for this method
GlobalTransactional globalTransactionalAnnotation = (GlobalTransactional)this.getAnnotation(method, targetClass, GlobalTransactional.class);
// read the GlobalLock annotation for this method
GlobalLock globalLockAnnotation = (GlobalLock)this.getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = this.disable || ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes;
if (!localDisable) {
// requires a GlobalTransactional annotation (or a configured aspectTransactional); otherwise fall through to the original method
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
// build the transaction aspect configuration from the annotation attributes
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.rollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes(), globalTransactionalAnnotation.lockStrategyMode());
} else {
transactional = this.aspectTransactional;
}
// enhanced path: handle the global transaction
return this.handleGlobalTransaction(methodInvocation, transactional);
}
if (globalLockAnnotation != null) {
return this.handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
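The localDisable expression in the listing mixes || and && without parentheses. Since && binds tighter than || in Java, it reads as "disabled outright, or degrade checking is on and the failure count has reached the limit". A minimal sketch with hypothetical parameter names makes the grouping explicit:

```java
final class DegradeCheckSketch {

    // Same logic as the listing's expression, with the implicit grouping spelled out.
    static boolean localDisable(boolean disable, boolean degradeCheckEnabled,
                                int degradeNum, int degradeCheckAllowTimes) {
        return disable || (degradeCheckEnabled && degradeNum >= degradeCheckAllowTimes);
    }

    public static void main(String[] args) {
        System.out.println(localDisable(false, true, 10, 10));
        System.out.println(localDisable(false, false, 99, 10));
    }
}
```

When the result is true, invoke skips both the transaction and the lock handling and simply proceeds with the original method.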
4. The method that actually handles the global transaction: handleGlobalTransaction
Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable {
boolean succeed = true;
Object var4;
try {
// delegate to the transaction template's execute method
var4 = this.transactionalTemplate.execute(new TransactionalExecutor() {
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
public String name() {
String name = aspectTransactional.getName();
return !StringUtils.isNullOrEmpty(name) ? name : GlobalTransactionalInterceptor.this.formatMethod(methodInvocation.getMethod());
}
public TransactionInfo getTransactionInfo() {
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == 60000) {
timeout = GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout;
}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(this.name());
transactionInfo.setPropagation(aspectTransactional.getPropagation());
transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
transactionInfo.setLockStrategyMode(aspectTransactional.getLockStrategyMode());
Set<RollbackRule> rollbackRules = new LinkedHashSet();
Class[] var4 = aspectTransactional.getRollbackFor();
int var5 = var4.length;
int var6;
Class rbRule;
for(var6 = 0; var6 < var5; ++var6) {
rbRule = var4[var6];
rollbackRules.add(new RollbackRule(rbRule));
}
String[] var8 = aspectTransactional.getRollbackForClassName();
var5 = var8.length;
String rbRulex;
for(var6 = 0; var6 < var5; ++var6) {
rbRulex = var8[var6];
rollbackRules.add(new RollbackRule(rbRulex));
}
var4 = aspectTransactional.getNoRollbackFor();
var5 = var4.length;
for(var6 = 0; var6 < var5; ++var6) {
rbRule = var4[var6];
rollbackRules.add(new NoRollbackRule(rbRule));
}
var8 = aspectTransactional.getNoRollbackForClassName();
var5 = var8.length;
for(var6 = 0; var6 < var5; ++var6) {
rbRulex = var8[var6];
rollbackRules.add(new NoRollbackRule(rbRulex));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException var9) {
// invoke the failure callback that matches the error code
TransactionalExecutor.Code code = var9.getCode();
switch (code) {
case RollbackDone:
throw var9.getOriginalException();
case BeginFailure:
succeed = false;
this.failureHandler.onBeginFailure(var9.getTransaction(), var9.getCause());
throw var9.getCause();
case CommitFailure:
succeed = false;
this.failureHandler.onCommitFailure(var9.getTransaction(), var9.getCause());
throw var9.getCause();
case RollbackFailure:
this.failureHandler.onRollbackFailure(var9.getTransaction(), var9.getOriginalException());
throw var9.getOriginalException();
case RollbackRetrying:
this.failureHandler.onRollbackRetrying(var9.getTransaction(), var9.getOriginalException());
throw var9.getOriginalException();
case TimeoutRollback:
this.failureHandler.onTimeoutRollback(var9.getTransaction(), var9.getOriginalException());
throw var9.getCause();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (ATOMIC_DEGRADE_CHECK.get()) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
return var4;
}
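One detail in getTransactionInfo is easy to miss: a timeout of 0 or less, or exactly 60000, is replaced by the configured default. The sketch below isolates that rule. The constant name is hypothetical, and treating 60000 as "the annotation attribute was left at its default" is an assumption drawn from the literal in the listing.

```java
final class TimeoutSketch {

    // Literal taken from the listing; assumed to be the annotation's default value.
    static final int ANNOTATION_DEFAULT_MILLIS = 60000;

    // Returns the timeout that ends up in the transaction settings.
    static int resolveTimeout(int annotationTimeoutMillis, int configuredDefaultMillis) {
        if (annotationTimeoutMillis <= 0 || annotationTimeoutMillis == ANNOTATION_DEFAULT_MILLIS) {
            return configuredDefaultMillis;
        }
        return annotationTimeoutMillis;
    }

    public static void main(String[] args) {
        System.out.println(resolveTimeout(5000, 30000));
        System.out.println(resolveTimeout(60000, 30000));
    }
}
```

A consequence of this rule is that an explicitly written timeout of 60000 cannot be told apart from an unset one, so the configured default wins in that case too.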
5. The transaction template class TransactionalTemplate
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
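The propagation switch in the listing boils down to a decision table over two inputs: the propagation setting and whether a global transaction already exists. The following sketch encodes that table as read from the listing; the Decision enum and its names are hypothetical and exist only for this illustration.

```java
final class PropagationSketch {

    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    // Hypothetical outcome labels for the branches of the switch.
    enum Decision {
        JOIN_EXISTING,
        BEGIN_NEW,
        SUSPEND_AND_BEGIN_NEW,
        SUSPEND_AND_RUN_WITHOUT_TX,
        RUN_WITHOUT_TX,
        FAIL
    }

    static Decision decide(Propagation propagation, boolean txExists) {
        switch (propagation) {
            case NOT_SUPPORTED:
                return txExists ? Decision.SUSPEND_AND_RUN_WITHOUT_TX : Decision.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return txExists ? Decision.SUSPEND_AND_BEGIN_NEW : Decision.BEGIN_NEW;
            case SUPPORTS:
                return txExists ? Decision.JOIN_EXISTING : Decision.RUN_WITHOUT_TX;
            case REQUIRED:
                return txExists ? Decision.JOIN_EXISTING : Decision.BEGIN_NEW;
            case NEVER:
                return txExists ? Decision.FAIL : Decision.RUN_WITHOUT_TX;
            case MANDATORY:
                return txExists ? Decision.JOIN_EXISTING : Decision.FAIL;
            default:
                throw new IllegalArgumentException("Not supported propagation: " + propagation);
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": exists=" + decide(p, true) + ", none=" + decide(p, false));
        }
    }
}
```

In the listing, FAIL corresponds to throwing a TransactionException, and the two "suspend" outcomes correspond to tx.suspend(), which is undone by tx.resume() in the outer finally block.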
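In summary, execute performs the following main steps:
- Get the transaction settings object, TransactionInfo
- Get the current global transaction object; if the global transaction context already holds one, the current call joins it as a participant (GlobalTransactionRole.Participant)
- Handle the transaction propagation behavior
- If the global transaction object is still null, create a new global transaction (role GlobalTransactionRole.Launcher)
- Begin the global transaction
- Execute the target method
- Commit the transaction or, if the business method throws, hand the exception to completeTransactionAfterThrowing for rollback handling
- (Always executed) restore the previous global lock configuration, trigger the after-completion hooks, and clean up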
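The begin / execute / commit-or-rollback / cleanup shape of these steps can be sketched in a few lines. This is a simplified illustration, not TransactionalTemplate itself: it records events in a list instead of talking to the TC, and it always rolls back on an exception, whereas the real completeTransactionAfterThrowing also receives the TransactionInfo that carries the rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class TemplateSketch {

    // Records the order in which the template steps run.
    static final List<String> EVENTS = new ArrayList<>();

    static <T> T execute(Supplier<T> business) {
        try {
            EVENTS.add("begin");
            T result;
            try {
                result = business.get();
            } catch (RuntimeException ex) {
                // Business failure: roll back, then rethrow to the caller.
                EVENTS.add("rollback");
                throw ex;
            }
            // No exception: commit.
            EVENTS.add("commit");
            return result;
        } finally {
            // Runs on both paths, like the cleanup in the inner finally block.
            EVENTS.add("cleanup");
        }
    }

    public static void main(String[] args) {
        System.out.println(execute(() -> "ok"));
        System.out.println(EVENTS);
    }
}
```

The finally block is what guarantees that the cleanup step runs whether the business method returns normally or throws.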
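6. The two roles in a global transaction
A global transaction has two roles:
- Launcher (transaction initiator): begins the global transaction and creates a new XID
- Participant (transaction participant): joins an existing global transaction and binds to its XID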
public enum GlobalTransactionRole {
/**
* The Launcher.
*/
// The one begins the current global transaction.
Launcher,
/**
* The Participant.
*/
// The one just joins into a existing global transaction.
Participant
}
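How the role follows from the presence of an XID can be sketched with a thread-local context. This is an illustration under assumptions: CURRENT_XID and DemoTransaction are hypothetical, and the XID is generated locally with a random UUID purely so the example is self-contained.

```java
import java.util.UUID;

final class RoleSketch {

    enum Role { LAUNCHER, PARTICIPANT }

    // Hypothetical stand-in for the context that carries the XID on the current thread.
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    static final class DemoTransaction {
        final String xid;
        final Role role;

        DemoTransaction(String xid, Role role) {
            this.xid = xid;
            this.role = role;
        }
    }

    // An existing XID means "join as participant"; otherwise create one and act as launcher.
    static DemoTransaction getOrCreate() {
        String xid = CURRENT_XID.get();
        if (xid != null) {
            return new DemoTransaction(xid, Role.PARTICIPANT);
        }
        xid = UUID.randomUUID().toString(); // assumption: locally generated for the sketch
        CURRENT_XID.set(xid);
        return new DemoTransaction(xid, Role.LAUNCHER);
    }

    // Unbinds the XID from the current thread.
    static void clear() {
        CURRENT_XID.remove();
    }

    public static void main(String[] args) {
        DemoTransaction outer = getOrCreate();
        DemoTransaction inner = getOrCreate();
        System.out.println(outer.role + " " + inner.role + " " + outer.xid.equals(inner.xid));
        clear();
    }
}
```

As the comments in the TransactionalTemplate listing indicate, only the Launcher sends the begin request to the TC, while a Participant does nothing at that step apart from triggering hooks.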