源码仓库:https://gitee.com/haijun1998/seata.git 分支source-read-1.5.0
1. GlobalTransactional
@GlobalTransactional 注解,提供给客户端来创建一个全局事务,@GlobalTransactional 注解由 GlobalTransactionScanner 进行扫描, 通过 GlobalTransactionalInterceptor 对其进行拦截增强
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD,ElementType.TYPE})
@Inherited
public @interface GlobalTransactional {
/**
* 全局事务超时时间,如果配置了 client.tm.default-global-transaction-timeout,将会替换默认值
* 默认时间:60000毫秒
*/
int timeoutMills() default DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
/**
* 设置全局事务的名称
*/
String name() default "";
/**
* 指定回滚的的异常类
*/
Class<? extends Throwable>[] rollbackFor() default {};
/**
* 指定事务回滚的class名称
*/
String[] rollbackForClassName() default {};
/**
* 指定不需要回滚的异常
*/
Class<? extends Throwable>[] noRollbackFor() default {};
/**
* 指定不需要回滚的异常名称
*/
String[] noRollbackForClassName() default {};
/**
* 事务的传播机制
*/
Propagation propagation() default Propagation.REQUIRED;
/**
* 全局锁的重试间隔时间,默认0的话使用全局配置
*/
int lockRetryInterval() default 0;
/**
* customized global lock retry interval(unit: ms)
* you may use this to override global config of "client.rm.lock.retryInterval"
* note: 0 or negative number will take no effect(which mean fall back to global config)
*
* @return int
*/
@Deprecated
@AliasFor("lockRetryInterval")
int lockRetryInternal() default 0;
/**
* 全局锁获取的重试次数,0或者-1不生效,使用默认全局配置
*/
int lockRetryTimes() default -1;
}
2. GlobalTransactionScanner
全局事务注解扫描器,继承了 AbstractAutoProxyCreator 用于在 bean 对象在创建时,对打了 @GlobalTransactional 注解的类添加 Aop 支持;
其中 GlobalTransactionScanner 实现了 wrapIfNecessary() 方法;这个方法是在 spring 初始化完毕之后通过 postProcessAfterInitialization() 中进行调用,在 spring aop 进行增强时所用到的两个接口:
这里会在 bean 初始化之后调用 wrapIfNecessary() 方法对 bean 添加 aop 切面进行增强
public interface BeanPostProcessor {
//在spring调用 InitializingBean接口、自定义初始化方法之前执行
default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
//在spring调用 InitializingBean接口、自定义初始化方法之后执行
@Nullable
default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
}
public interface InstantiationAwareBeanPostProcessor extends BeanPostProcessor {
//在执行创建bean之前执行
@Nullable
default Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException {
return null;
}
//在执行 postProcessBeforeInstantiation() 方法如果返回值为null时执行
default boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException {
return true;
}
//处理 PropertyValues,对bean通过 PropertyValues 类型进行处理,通过属性注入进行实现
default PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
return null;
}
}
wrapIfNecessary()
其中会判断当前采用的事务模式,目前就是 TCC、AT
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 检查当前bean是否是 FactoryBean类型,是否已经被代理过了,是否是需要剔除的bean
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
//加锁防止多线程问题
synchronized (PROXYED_SET) {
//如果存在代理类中直接返回
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
/**
* 根据4个类来进行解析 TCC 的调用方式
* io.seata.rm.tcc.remoting.parser.DubboRemotingParser:是否是dubbo调用远程的方法
* io.seata.rm.tcc.remoting.parser.LocalTCCRemotingParser:是否具有 @LocalTCC 注解
* io.seata.rm.tcc.remoting.parser.SofaRpcRemotingParser
* io.seata.rm.tcc.remoting.parser.HSFRemotingParser
* 根据 @TwoPhaseBusinessAction 注解注册对应的方式到资源管理器中 (DefaultResourceManager)
*/
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// init tcc fence clean task if enable useTccFence
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
//获取到当前bean的class类,如果是代理对象需要获取到目标类型
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
//获取到接口
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判断当前bean或者父类接口中是否有 @GlobalTransactional、@GlobalLock 注解
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (globalTransactionalInterceptor == null) {
//创建一个全局的事务增强器
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
bean.getClass().getName(), beanName, interceptor.getClass().getName());
//判断当前bean是否是代理对象,如果不是代理类就去调用 AbstractAutoProxyCreator.wrapIfNecessary() 对bean进行代理
if (!AopUtils.isAopProxy(bean)) {
//其中又调用了 getAdvicesAndAdvisorsForBean() 方法,在当前类进行了覆写,其中直接返回了 interceptor 切面
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
//当前bean已经是代理类,通过反射获取到两种不同代理类的其中的属性值 AdvisedSupport (spring在对其bean代理时创建的代理会有一个属性 advised属性,这个字段里面存储了切面等数据;例如:JdkDynamicAopProxy)
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
//将获取出来的 Advisor 进行包装
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
//将bean设置到代理对象集当中,表明当前对象已经被代理了
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
getAdvicesAndAdvisorsForBean()
覆写的父类方法,在 AbstractAutoProxyCreator.wrapIfNecessary() 中创建代理对象时进行调用,这里直接返回了 interceptor 类型,目前这里返回以下两种:
- GlobalTransactionalInterceptor:全局的事务增强器
- TccActionInterceptor:TCC模式的增强器
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource)
throws BeansException {
return new Object[]{interceptor};
}
initClient()
初始化了以下两个远程调用客户端,用于发起请求和监听结果
- TmNettyRemotingClient:事务管理器客户端
- RmNettyRemotingClient:资源管理器客户端
private void initClient() {
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//初始化TM客户端
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
//初始化RM客户端
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//注册spring容器中的钩子函数
registerSpringShutdownHook();
}
3. GlobalTransactionalInterceptor
invoke()
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//目标源类的class对象
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
//当前执行的方法
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//获取到当前执行方法的 @GlobalTransactional 注解
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
//获取到当前执行方法的 @GlobalLock 注解
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
//判断是否关闭事务或者降低检查等级
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
//获取到注解中的数据转换成 AspectTransactional 对象
if (globalTransactionalAnnotation != null) {
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes());
} else {
transactional = this.aspectTransactional;
}
//处理全局事务
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
//处理全局锁注解,会设置 KEY_GLOBAL_LOCK_FLAG 标识在 RootContext当中
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
//执行本体方法
return methodInvocation.proceed();
}
handleGlobalTransaction()
当前方法中,通过 transactionalTemplate.execute() 执行事务,其中创建了一个 TransactionalExecutor 来进行处理之后的回调,在 catch 方法块中对异常进行捕获,对对应的异常code进行判断然后执行对应的操作
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final AspectTransactional aspectTransactional) throws Throwable {
boolean succeed = true;
try {
//通过事务执行器,等待前置任务执行完成之后在执行回调函数中的方法
return transactionalTemplate.execute(new TransactionalExecutor() {
/**
* 直接执行本地方法
*/
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
/**
* 获取事务的名称
*/
public String name() {
String name = aspectTransactional.getName();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
/**
* 获取到事务的信息
*/
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(aspectTransactional.getPropagation());
transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getRollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
//根据对应的失败码,执行对应的操作
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
//发布一个时间消息
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
4. TransactionalTemplate
execute()
根据设置定的事务传播机制来选择是否需要创建事务等操作,然后在执行本地业务方法前开启一个事务,事务的传播类型:
- NOT_SUPPORTED:不支持事务
- REQUIRES_NEW:存在事务,将原来的事务暂停,然后创建一个新的事务
- SUPPORTS:支持事务,不存在事务就直接执行
- REQUIRED:如果存在事务就以当前事务执行,不存在创建一个新的
- NEVER:不支持事务
- MANDATORY:如果不存在事务抛出异常,如果存在继续执行
public Object execute(TransactionalExecutor business) throws Throwable {
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
//获取到全局事务信息,RootContext.getXID() 全局事务id,如果不为空,创建一个 DefaultGlobalTransaction类型,角色是事务的参与者
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
//事务的传播机制
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
//判断事务的传播类型
switch (propagation) {
case NOT_SUPPORTED:
//如果存在事务,需要将当前事务停止
if (existingTransaction(tx)) {
//解绑xid
suspendedResourcesHolder = tx.suspend();
}
//执行方法
return business.execute();
case REQUIRES_NEW:
// 如果存在事务,将原来的事务暂停,然后创建一个新的事务
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
//创建一个事务的发起者
tx = GlobalTransactionContext.createNew();
}
break;
case SUPPORTS:
// 支持事务,如果不存在事务,直接执行方法;如果存在继续执行后续的方法
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// 如果当前事务存在继续执行,如果不存在,下面会创建一个新的事务
break;
case NEVER:
// 如果存在事务抛出异常
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// 不存在事务直接执行
return business.execute();
}
case MANDATORY:
// 如果事务不存在直接抛出异常
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 继续执行当前事务
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 如果不存在事务,那么就创建一个事务发起者的新事务
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// 创建一个全局锁的配置信息,这里返回的是被配置信息替换之前的信息
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
//开启一个事务,发送事务的请求到 TC(事务协调者,也就是Server端)
beginTransaction(txInfo, tx);
Object rs;
try {
// 执行方法,在方法执行时,通过获取到数据源的代理类 DataSourceProxy,执行后续的sql解析创建前后镜像的逻辑
rs = business.execute();
} catch (Throwable ex) {
//抛出异常之后需要进行操作
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
//没有异常就提交事务
commitTransaction(tx);
return rs;
} finally {
//最后清除全局锁的配置信息
resumeGlobalLockConfig(previousConfig);
//触发完成之后的钩子函数
triggerAfterCompletion();
//清除钩子函数
cleanUp();
}
} finally {
// 如果事务是暂停的状态,需要恢复它
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
beginTransaction()
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
//触发开启事务之前的钩子函数
triggerBeforeBegin();
//开启事务,通过 TM 申请一个全局XID
tx.begin(txInfo.getTimeOut(), txInfo.getName());
//事务开启之后的钩子函数
triggerAfterBegin();
} catch (TransactionException txe) {
//抛出一个 开始事务失败的异常信息,方便外层获取到了之后进行对应的操作
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
5. GlobalTransaction
全局事务接口,用于定义一些标准方法,通过子类 DefaultGlobalTransaction 实现,上面会调用 begin() 方法
public interface GlobalTransaction {
/**
* 开启全局事务
*/
void begin() throws TransactionException;
void begin(int timeout) throws TransactionException;
void begin(int timeout, String name) throws TransactionException;
/**
* 提交事务
*/
void commit() throws TransactionException;
/**
* 回滚全局事务
*/
void rollback() throws TransactionException;
/**
* 暂停全局事务
*/
SuspendedResourcesHolder suspend() throws TransactionException;
/**
* 回复一个全局事务
*/
void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException;
/**
* 查询当前事务的状态
*/
GlobalStatus getStatus() throws TransactionException;
/**
* 获取XID
*/
String getXid();
/**
* 上报全局事务的状态信息
*/
void globalReport(GlobalStatus globalStatus) throws TransactionException;
/**
* 获取本地事务状态
*/
GlobalStatus getLocalStatus();
/**
* 获取当前全局事务的角色,是事务参与者还是事务发起者
*/
GlobalTransactionRole getGlobalTransactionRole();
}
begin()
开启一个全局事务,会对当前事务的角色进行判断,是否应该存在 XID
public void begin(int timeout, String name) throws TransactionException {
//如果角色不为事务发起者的话,判断XID是否为null
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
//如果角色为事务的发起者,那么XID需要为空
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
//通过TM 发起rpc请求获取到全局的XID
xid = transactionManager.begin(null, null, name, timeout);
//事务状态为开启
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}