文章目录
- 初始化TM和RM
- 数据源代理
由于我们一般都是在Spring Boot中使用Seata,而分析与Spring Boot集成的组件时,通常先看starter的spring.factories文件,了解它的自动装配。
这里面主要关注SeataAutoConfiguration和SeataDataSourceAutoConfiguration。
SeataAutoConfiguration
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    /**
     * Registers the fallback failure handler.
     *
     * <p>Only contributed when the context holds no other {@link FailureHandler} bean.
     *
     * @return a new {@link DefaultFailureHandlerImpl}
     */
    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        final FailureHandler fallbackHandler = new DefaultFailureHandlerImpl();
        return fallbackHandler;
    }

    /**
     * Registers the global transaction scanner, the bean that looks for
     * {@code GlobalTransactional} and {@code GlobalLock} annotations and creates proxies for them.
     *
     * @param seataProperties source of the application id and the transaction service group
     * @param failureHandler  handler handed to the scanner
     * @return the scanner built from the configured properties
     */
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        final String applicationId = seataProperties.getApplicationId();
        final String txServiceGroup = seataProperties.getTxServiceGroup();
        return new GlobalTransactionScanner(applicationId, txServiceGroup, failureHandler);
    }
}
该配置类要生效的条件是seata.enabled值为true,该值默认就是为true,所以该配置类默认是生效的。
类中创建了两个bean,一个是FailureHandler,该类是client用来处理全局事务失败的程序。另一个就是核心的bean,全局事务扫描器,用来扫描 GlobalTransactional 和 GlobalLock 注解,当在方法上标注了GlobalTransactional 和 GlobalLock 注解,就会为该类生成代理,在方法的执行前后添加上seata的事务逻辑。
GlobalTransactionScanner继承了AbstractAutoProxyCreator类,而AbstractAutoProxyCreator是aop里面一个把目标对象转换成代理对象的后置处理器,用来生成AOP代理的。
生成代理的逻辑就在wrapIfNecessary方法中
/**
 * Decides whether {@code bean} needs a Seata interceptor and, if so, either creates a proxy
 * for it or adds the Seata advisors to the proxy it already has.
 *
 * <p>The whole decision runs while holding the lock on {@code PROXYED_SET}; bean names that
 * were already handled are returned unchanged.
 *
 * @param bean     the bean instance under inspection
 * @param beanName the bean's name, also used as the key in {@code PROXYED_SET}
 * @param cacheKey passed through to the superclass implementation
 * @return the original bean, or the value returned by the superclass when a new proxy is requested
 * @throws RuntimeException wrapping any {@link Exception} raised while processing the bean
 */
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) {
// Already handled under this bean name: nothing more to do.
if (PROXYED_SET.contains(beanName)) {
return bean;
}
// Reset the shared interceptor field before choosing one for this bean.
interceptor = null;
// Check whether this bean should be proxied in TCC mode.
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
// Resolve the bean's target class and the interfaces found on it.
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// Leave the bean untouched when neither the class nor its interfaces pass existsAnnotation
// (per the original author, the GlobalTransactional / GlobalLock check).
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
// The GlobalTransactionalInterceptor is created lazily, once, and reused for every
// matching bean; it carries the enhancement logic.
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
// Not an AOP proxy yet: let the superclass build one.
// NOTE(review): the superclass presumably calls getAdvicesAndAdvisorsForBean to pick up
// the interceptor chosen above - confirm against AbstractAutoProxyCreator.
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// The bean is already an AOP proxy (for example because another annotation on the class
// caused one to be created), so no extra proxy layer is added; instead the Seata
// advisors are inserted into the advisor list of the existing proxy.
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// Wrap the chosen interceptor as advisors and insert each one at index 0.
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
// Remember the bean name so it is not processed again.
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
当调用被@GlobalTransactional或@GlobalLock注解修饰的方法时,会调到代理对象,而增强逻辑在GlobalTransactionalInterceptor类的invoke方法里。至于具体是如何增强的以及事务是如何执行的,放在另一篇专门讲解。
初始化TM和RM
GlobalTransactionScanner还实现了InitializingBean接口,所以在初始化阶段还会调用afterPropertiesSet方法
/**
 * Initialization callback: starts the Seata clients once, unless global transactions are disabled.
 *
 * <p>When disabled, this instance only registers itself as a listener on the
 * {@code DISABLE_GLOBAL_TRANSACTION} configuration key and does not initialize any client.
 */
@Override
public void afterPropertiesSet() {
    if (!disableGlobalTransaction) {
        // The atomic flag ensures initClient() runs at most once.
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
        return;
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global transaction is disabled.");
    }
    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
        (ConfigurationChangeListener)this);
}
这里会对TM和RM进行初始化,本质上都是创建一个netty客户端,然后向tc注册
/**
 * Initializes the TM client, then the RM client, and finally registers the Spring shutdown hook.
 *
 * @throws IllegalArgumentException if {@code applicationId} or {@code txServiceGroup} is null or empty
 */
private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }

    // Both identifiers are mandatory; fail fast before touching any client.
    final boolean applicationIdMissing = StringUtils.isNullOrEmpty(applicationId);
    if (applicationIdMissing || StringUtils.isNullOrEmpty(txServiceGroup)) {
        final String detail = String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup);
        throw new IllegalArgumentException(detail);
    }

    // Transaction manager client first.
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    // Resource manager client second.
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }

    registerSpringShutdownHook();
}
初始化 TM
/**
 * Obtains the TM remoting client for the given identity and initializes it.
 *
 * @param applicationId           application identifier
 * @param transactionServiceGroup transaction service group
 * @param accessKey               access key passed to the client factory
 * @param secretKey               secret key passed to the client factory
 */
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    TmNettyRemotingClient
        .getInstance(applicationId, transactionServiceGroup, accessKey, secretKey)
        .init();
}
在获取TM客户端实例时,会创建netty客户端,但还未启动
/**
 * Registers the message processors, then runs the superclass initialization once.
 *
 * <p>Processor registration happens on every call; only {@code super.init()} is guarded
 * by the {@code initialized} flag.
 */
@Override
public void init() {
    registerProcessor();
    if (!initialized.compareAndSet(false, true)) {
        // Another call already performed the superclass initialization.
        return;
    }
    super.init();
}
注册两个处理器,用来处理TC返回给TM的响应
/**
 * Registers the processors this TM client uses for incoming messages: one shared response
 * processor for all result message types, plus a heartbeat processor.
 */
private void registerProcessor() {
    // 1. Response processor: handles the server's replies to requests this client sent.
    //    It is built from the merge-message map and the pending futures, which is how a
    //    reply gets matched to its request.
    final ClientOnResponseProcessor responseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, responseProcessor, null);

    // 2. Heartbeat processor: handles heartbeat messages coming back from the server.
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, new ClientHeartbeatProcessor(), null);
}
在父类AbstractNettyRemotingClient中启动TM的netty客户端,netty相关的不是关注的重点,所以不用深入分析。
/**
 * Starts the client: schedules the periodic reconnect task, optionally starts the
 * merged-send worker, runs the superclass initialization and finally starts the bootstrap.
 */
@Override
public void init() {
    // Periodically ask the channel manager to reconnect for this transaction service group
    // (per the original author, this is what re-sends the TM/RM registration request).
    final Runnable reconnectTask = new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    };
    timerExecutor.scheduleAtFixedRate(reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

    // Batch sending is optional; when enabled, a fixed-size pool runs the merged-send loop.
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        final NamedThreadFactory threadFactory = new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD);
        mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            workQueue,
            threadFactory);
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }

    super.init();

    // Start the netty client last.
    clientBootstrap.start();
}
初始化RM
初始化过程跟TM一样,下面只贴出相关代码
/**
 * Obtains the RM remoting client, wires in the default resource manager and
 * transaction message handler, and initializes it.
 *
 * @param applicationId           application identifier
 * @param transactionServiceGroup transaction service group
 */
public static void init(String applicationId, String transactionServiceGroup) {
    final RmNettyRemotingClient client = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    // Collaborators must be set before init() because init() reads them.
    client.setResourceManager(DefaultResourceManager.get());
    client.setTransactionMessageHandler(DefaultRMHandler.get());
    client.init();
}
/**
 * Registers the message processors, then runs the superclass initialization once.
 *
 * <p>If resources were already registered with the resource manager before this first
 * initialization and a transaction service group is set, a reconnect is triggered immediately.
 */
@Override
public void init() {
    registerProcessor();
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    super.init();

    // Found one or more resources that were registered before initialization
    final boolean hasManagedResources =
        resourceManager != null && !resourceManager.getManagedResources().isEmpty();
    if (hasManagedResources && StringUtils.isNotBlank(transactionServiceGroup)) {
        getClientChannelManager().reconnect(transactionServiceGroup);
    }
}
/**
 * Registers the processors this RM client uses for incoming messages: three server-initiated
 * request processors (branch commit, branch rollback, undo-log deletion), one shared response
 * processor for result message types, and a heartbeat processor.
 */
private void registerProcessor() {
    // 1. Branch commit requests initiated by the server.
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT,
        new RmBranchCommitProcessor(getTransactionMessageHandler(), this), messageExecutor);

    // 2. Branch rollback requests initiated by the server.
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK,
        new RmBranchRollbackProcessor(getTransactionMessageHandler(), this), messageExecutor);

    // 3. Undo-log deletion requests initiated by the server.
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG,
        new RmUndoLogProcessor(getTransactionMessageHandler()), messageExecutor);

    // 4. Response processor: handles the server's replies to requests this client sent.
    //    It is built from the merge-message map and the pending futures, which is how a
    //    reply gets matched to its request.
    final ClientOnResponseProcessor responseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, responseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, responseProcessor, null);

    // 5. Heartbeat processor: handles heartbeat messages coming back from the server.
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, new ClientHeartbeatProcessor(), null);
}
/**
 * Starts the client: schedules the periodic reconnect task, optionally starts the
 * merged-send worker, runs the superclass initialization and finally starts the bootstrap.
 */
@Override
public void init() {
    // Periodically ask the channel manager to reconnect for this transaction service group
    // (per the original author, this is what re-sends the TM/RM registration request).
    final Runnable reconnectTask = new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    };
    timerExecutor.scheduleAtFixedRate(reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

    // Batch sending is optional; when enabled, a fixed-size pool runs the merged-send loop.
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        final NamedThreadFactory threadFactory = new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD);
        mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            workQueue,
            threadFactory);
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }

    super.init();

    // Start the netty client last.
    clientBootstrap.start();
}
总结来说初始化TM和RM做的事就是分别注册几个处理器以及启动各自的Netty客户端。
数据源代理
SeataDataSourceAutoConfiguration
@ConditionalOnBean(DataSource.class)
@ConditionalOnExpression("${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
public class SeataDataSourceAutoConfiguration {

    /**
     * Registers the post-processor that handles {@code DataSource} beans, unless one already exists.
     *
     * @param seataProperties source of the exclusion list and the proxy mode
     * @return the configured post-processor
     */
    @Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
    @ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
    public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
        final String[] excludedTypes = seataProperties.getExcludesForAutoProxying();
        final String proxyMode = seataProperties.getDataSourceProxyMode();
        return new SeataDataSourceBeanPostProcessor(excludedTypes, proxyMode);
    }

    /**
     * Registers the auto-proxy creator for {@code DataSource} beans, unless one already exists.
     * According to the original author it proxies every DataSource in the context so that
     * SQL execution can be intercepted by Seata.
     *
     * @param seataProperties source of the JDK-proxy flag, the exclusion list and the proxy mode
     * @return the configured proxy creator
     */
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        final boolean jdkProxy = seataProperties.isUseJdkProxy();
        final String[] excludedTypes = seataProperties.getExcludesForAutoProxying();
        final String proxyMode = seataProperties.getDataSourceProxyMode();
        return new SeataAutoDataSourceProxyCreator(jdkProxy, excludedTypes, proxyMode);
    }
}
该配置类要生效的条件是${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}这几个配置都为true。但是我在配置文件中把它们都设置为true后仍然没有生效,暂时没有找到原因,所以换一种方式:在启动类上添加@EnableAutoDataSourceProxy注解。
/**
 * Type-level annotation that imports {@link AutoDataSourceProxyRegistrar}.
 * Its attributes configure how data sources are proxied.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(AutoDataSourceProxyRegistrar.class)
public @interface EnableAutoDataSourceProxy {

    /**
     * Selects JDK dynamic proxies instead of CGLIB proxies. Defaults to {@code false}.
     *
     * @return {@code true} to use JDK proxies
     */
    boolean useJdkProxy() default false;

    /**
     * Lists the datasource beans that must not be auto-proxied. Defaults to none.
     *
     * @return the excluded entries
     */
    String[] excludes() default {};

    /**
     * Chooses the data source proxy mode, AT or XA. Defaults to {@code "AT"}.
     *
     * @return the proxy mode name
     */
    String dataSourceProxyMode() default "AT";
}
该注解上导入了另一个类AutoDataSourceProxyRegistrar
/**
 * Reads the attributes of {@code EnableAutoDataSourceProxy} from the importing class and
 * registers the two data-source proxy beans if no definitions exist under their names.
 */
public class AutoDataSourceProxyRegistrar implements ImportBeanDefinitionRegistrar {

    private static final String ATTRIBUTE_KEY_USE_JDK_PROXY = "useJdkProxy";
    private static final String ATTRIBUTE_KEY_EXCLUDES = "excludes";
    private static final String ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE = "dataSourceProxyMode";

    public static final String BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR = "seataDataSourceBeanPostProcessor";
    public static final String BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR = "seataAutoDataSourceProxyCreator";

    /**
     * Registers {@code SeataDataSourceBeanPostProcessor} and {@code SeataAutoDataSourceProxyCreator}
     * bean definitions, each only when the registry has no definition under that bean name.
     *
     * @param importingClassMetadata metadata of the class carrying the annotation
     * @param registry               registry that receives the bean definitions
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        final Map<String, Object> attrs = importingClassMetadata.getAnnotationAttributes(EnableAutoDataSourceProxy.class.getName());
        final boolean jdkProxy = Boolean.parseBoolean(attrs.get(ATTRIBUTE_KEY_USE_JDK_PROXY).toString());
        final String[] excludedTypes = (String[]) attrs.get(ATTRIBUTE_KEY_EXCLUDES);
        final String proxyMode = (String) attrs.get(ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE);

        //register seataDataSourceBeanPostProcessor bean def
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)) {
            registry.registerBeanDefinition(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR,
                postProcessorDefinition(excludedTypes, proxyMode));
        }

        //register seataAutoDataSourceProxyCreator bean def
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)) {
            registry.registerBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR,
                proxyCreatorDefinition(jdkProxy, excludedTypes, proxyMode));
        }
    }

    /** Builds the definition of the data-source bean post-processor (constructor args: excludes, mode). */
    private static AbstractBeanDefinition postProcessorDefinition(String[] excludedTypes, String proxyMode) {
        final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SeataDataSourceBeanPostProcessor.class);
        builder.addConstructorArgValue(excludedTypes);
        builder.addConstructorArgValue(proxyMode);
        return builder.getBeanDefinition();
    }

    /** Builds the definition of the auto-proxy creator (constructor args: useJdkProxy, excludes, mode). */
    private static AbstractBeanDefinition proxyCreatorDefinition(boolean jdkProxy, String[] excludedTypes, String proxyMode) {
        final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SeataAutoDataSourceProxyCreator.class);
        builder.addConstructorArgValue(jdkProxy);
        builder.addConstructorArgValue(excludedTypes);
        builder.addConstructorArgValue(proxyMode);
        return builder.getBeanDefinition();
    }
}
AutoDataSourceProxyRegistrar实现了ImportBeanDefinitionRegistrar接口,这样我们就知道该类额外注册了BeanDefinition。通过源码可知,该类注册了两个bean,分别是SeataDataSourceBeanPostProcessor和SeataAutoDataSourceProxyCreator。
SeataDataSourceBeanPostProcessor
该类是一个BeanPostProcessor,主要就是用来生成数据源代理的
/**
 * Post-processor that, for every {@link DataSource} bean, stores a Seata proxy in
 * {@code DataSourceProxyHolder} while returning the un-proxied data source as the bean.
 */
public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

    private final List<String> excludes;
    private final BranchType dataSourceProxyMode;

    /**
     * @param excludes            class names of data sources that must not be proxied
     * @param dataSourceProxyMode "XA" (case-insensitive) selects XA; any other value selects AT
     */
    public SeataDataSourceBeanPostProcessor(String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        if (BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode)) {
            this.dataSourceProxyMode = BranchType.XA;
        } else {
            this.dataSourceProxyMode = BranchType.AT;
        }
    }

    /** Returns the bean unchanged. */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        return bean;
    }

    /**
     * Stores a proxy for non-excluded data sources and unwraps beans that are already Seata proxies.
     *
     * @param bean     the initialized bean
     * @param beanName the bean's name (unused)
     * @return the target data source when {@code bean} is a {@code SeataDataSourceProxy}, otherwise {@code bean}
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!(bean instanceof DataSource)) {
            return bean;
        }

        //When not in the excludes, put and init proxy.
        final boolean excluded = excludes.contains(bean.getClass().getName());
        if (!excluded) {
            // The proxy is only created and stored here; it is not returned as the bean.
            // Callers that need it retrieve it from the holder later.
            DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
        }

        // A bean that is itself a Seata proxy is replaced by the data source it wraps.
        if (bean instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the bean of the data source," +
                " and return the original data source to replace the data source proxy.");
            final SeataDataSourceProxy wrapped = (SeataDataSourceProxy) bean;
            return wrapped.getTargetDataSource();
        }
        return bean;
    }
}
DataSourceProxyHolder是用来存放代理数据源的,如果当前bean是DataSource,则会为该DataSource生成一个代理DataSource。在putDataSource方法中,会进行数据源代理类的创建,当然,该方法除了创建数据源代理,获取数据源代理也是调用这个方法。
/**
 * Returns the Seata proxy for the given data source, creating and caching one if needed.
 *
 * <p>A data source that is already a Seata proxy of the requested mode is returned as is;
 * a proxy of a different mode is unwrapped first and its target is used as the cache key.
 *
 * @param dataSource          a plain data source or an existing Seata proxy
 * @param dataSourceProxyMode the branch type the returned proxy should have
 * @return the matching proxy, either passed in, cached, or newly created
 */
public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
    DataSource target = dataSource;
    if (dataSource instanceof SeataDataSourceProxy) {
        final SeataDataSourceProxy incoming = (SeataDataSourceProxy) dataSource;
        // Already the kind of proxy the caller wants.
        if (incoming.getBranchType() == dataSourceProxyMode) {
            return incoming;
        }
        // Different mode: continue with the wrapped data source.
        target = incoming.getTargetDataSource();
    }

    // First lookup without holding the lock.
    SeataDataSourceProxy proxy = dataSourceProxyMap.get(target);
    if (proxy != null) {
        return proxy;
    }

    synchronized (dataSourceProxyMap) {
        // Look again under the lock before creating and caching a new proxy.
        proxy = dataSourceProxyMap.get(target);
        if (proxy == null) {
            proxy = createDsProxyByMode(dataSourceProxyMode, target);
            dataSourceProxyMap.put(target, proxy);
        }
        return proxy;
    }
}
XA模式就创建DataSourceProxyXA,其他模式创建DataSourceProxy。
/**
 * Creates a proxy around {@code originDs}: {@code DataSourceProxyXA} for XA mode,
 * {@code DataSourceProxy} for every other mode.
 *
 * @param mode     requested branch type
 * @param originDs data source to wrap
 * @return the newly created proxy
 */
private SeataDataSourceProxy createDsProxyByMode(BranchType mode, DataSource originDs) {
    if (BranchType.XA == mode) {
        return new DataSourceProxyXA(originDs);
    }
    return new DataSourceProxy(originDs);
}
SeataAutoDataSourceProxyCreator
上面为每个数据源生成了seata的代理对象,但是该代理对象并不能通过AOP切入,所以还是需要一个AOP代理对象。SeataAutoDataSourceProxyCreator也是继承了AbstractAutoProxyCreator类,以前解析过AOP源码,可以知道继承该类就可以对指定的bean生成AOP代理。
/**
 * Auto-proxy creator that applies a single introduction advisor, built around
 * {@code SeataAutoDataSourceProxyAdvice}, to eligible {@link DataSource} beans.
 */
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {

    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);

    private final List<String> excludes;
    private final Advisor advisor;

    /**
     * @param useJdkProxy         when {@code false}, target-class proxying is switched on
     * @param excludes            class names of data sources that must not be proxied
     * @param dataSourceProxyMode proxy mode name passed to the advice
     */
    public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        final SeataAutoDataSourceProxyAdvice advice = new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode);
        this.advisor = new DefaultIntroductionAdvisor(advice);
        setProxyTargetClass(!useJdkProxy);
    }

    /** Logs the bean name and returns the one shared advisor for every bean that is not skipped. */
    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Auto proxy of [{}]", beanName);
        }
        return new Object[]{advisor};
    }

    /**
     * Skips everything except data sources that are neither Seata proxies nor excluded.
     *
     * @return {@code true} when the bean class must not be proxied
     */
    @Override
    protected boolean shouldSkip(Class<?> beanClass, String beanName) {
        // Only DataSource implementations are candidates.
        if (!DataSource.class.isAssignableFrom(beanClass)) {
            return true;
        }
        // Types that are already SeataProxy are left alone.
        if (SeataProxy.class.isAssignableFrom(beanClass)) {
            return true;
        }
        // Finally honour the configured exclusion list.
        return excludes.contains(beanClass.getName());
    }
}
从shouldSkip方法可知,只会对DataSource生成代理,而它添加的增强逻辑在SeataAutoDataSourceProxyAdvice内。
/**
 * Advice applied to DataSource beans: redirects DataSource method calls to the
 * Seata proxy of the invoked data source when the current context calls for it.
 *
 * @author xingfudeshi@gmail.com
 */
public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {

    private final BranchType dataSourceProxyMode;
    private final Class<? extends SeataDataSourceProxy> dataSourceProxyClazz;

    /**
     * Resolves the proxy mode and publishes it as the default branch type in {@code RootContext}.
     *
     * @param dataSourceProxyMode "AT" or "XA", compared case-insensitively
     * @throws IllegalArgumentException for any other value
     */
    public SeataAutoDataSourceProxyAdvice(String dataSourceProxyMode) {
        final BranchType resolvedMode;
        final Class<? extends SeataDataSourceProxy> resolvedClazz;
        if (BranchType.AT.name().equalsIgnoreCase(dataSourceProxyMode)) {
            resolvedMode = BranchType.AT;
            resolvedClazz = DataSourceProxy.class;
        } else if (BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode)) {
            resolvedMode = BranchType.XA;
            resolvedClazz = DataSourceProxyXA.class;
        } else {
            throw new IllegalArgumentException("Unknown dataSourceProxyMode: " + dataSourceProxyMode);
        }
        this.dataSourceProxyMode = resolvedMode;
        this.dataSourceProxyClazz = resolvedClazz;

        //Set the default branch type in the RootContext.
        RootContext.setDefaultBranchType(this.dataSourceProxyMode);
    }

    /**
     * Routes the call to the Seata proxy when a global lock is required or the context's
     * branch type equals this advice's mode; otherwise proceeds with the original invocation.
     *
     * @param invocation the intercepted call
     * @return the result of the proxy method or of the original method
     * @throws Throwable whatever the invoked method throws
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Neither a global-lock context nor a matching branch type: call the original method.
        if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
            return invocation.proceed();
        }

        final Method invoked = invocation.getMethod();
        final Object[] arguments = invocation.getArguments();
        final Method proxyMethod = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, invoked.getName(), invoked.getParameterTypes());

        // Redirect only methods the proxy class declares and whose declaring class is a DataSource type.
        final boolean redirect = proxyMethod != null && DataSource.class.isAssignableFrom(invoked.getDeclaringClass());
        if (!redirect) {
            return invocation.proceed();
        }

        // Fetch (or create) the Seata proxy for the invoked data source and call the same method on it.
        final SeataDataSourceProxy seataProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
        return proxyMethod.invoke(seataProxy, arguments);
    }

    /** Introduces the {@code SeataProxy} interface on advised beans. */
    @Override
    public Class<?>[] getInterfaces() {
        return new Class[]{SeataProxy.class};
    }
}
当调用DataSource的方法时,就会通过AOP代理对象调用到SeataDataSourceProxy实现类的方法,即seata的代理。