Seata AT模式源码解析二(Seata Client端启动流程)

news2024/11/24 3:19:50

文章目录

  • 初始化TM和RM
  • 数据源代理

由于我们一般都是在springboot中使用的,而与springboot集成的我们一般就先看starter的spring.factories文件,看看它的自动装配
在这里插入图片描述
在这里插入图片描述
这里面主要关注SeataAutoConfiguration和SeataDataSourceAutoConfiguration。

SeataAutoConfiguration

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    /**
     * 默认的失败处理程序
     */
    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    /**
     * 全局事务扫描器,用来扫描 GlobalTransactional 和 GlobalLock 注解,生成代理的
     */
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}

该配置类要生效的条件是seata.enabled值为true,该值默认就是为true,所以该配置类默认是生效的。
类中创建了两个bean,一个是FailureHandler,该类是client用来处理全局事务失败的程序。另一个就是核心的bean,全局事务扫描器,用来扫描 GlobalTransactional 和 GlobalLock 注解,当在方法上标注了GlobalTransactional 和 GlobalLock 注解,就会为该类生成代理,在方法的执行前后添加上seata的事务逻辑。
GlobalTransactionScanner继承了AbstractAutoProxyCreator类,而AbstractAutoProxyCreator时aop里面一个把目标对象转换成代理对象的一个后置处理器,用来生成AOP代理的。
生成代理的逻辑就在wrapIfNecessary方法中

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            // 是否已代理过
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            // 每次生成代理对象时先置空
            interceptor = null;
            // 判断是否是 TCC 模式
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                                     (ConfigurationChangeListener)interceptor);
            } else {
                // 获取bean的原始类
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // 判断bean中是否有 GlobalTransactional 和 GlobalLock 注解
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                // GlobalTransactionalInterceptor 实现了 MethodInterceptor 接口,
                // 可以看做一个advisor,包含了增强逻辑
                if (globalTransactionalInterceptor == null) {
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(
                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            // 如果bean不是代理对象,则直接调用父类的wrapIfNecessary生成代理对象,
            // 在父类中会调用getAdvicesAndAdvisorsForBean获取到上面定义的interceptor
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // 如果bean已经是代理对象,比如类里也有@Transaction注解,已经生成了代理,
                // 此时无需再代理一层,只需将增强逻辑(advisor)添加到代理对象中即可

                // 获取bean中的ProxyFactory,因为ProxyFactory中含有可以应用于该bean的所有advisor集合,
                // 这里需要将seata的advisor添加到该集合中
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                // 将GlobalTransactionalInterceptor封装为advisor,添加到AdvisedSupport中
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            // 将beanName放到已代理过的集合中
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

当调用被@GlobalTransactional或@GlobalLock注解修饰的方法时,会调到代理对象,而增强逻辑在GlobalTransactionalInterceptor类的invoke方法里。而具体是如何增强的以及事务时如何执行的放在另一篇专门讲解。

初始化TM和RM

GlobalTransactionScanner还实现了InitializingBean接口,所以在初始化阶段还会调用afterPropertiesSet方法

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                             (ConfigurationChangeListener)this);
        return;
    }
    // 如果seata客户端还未初始化,则进行初始化
    if (initialized.compareAndSet(false, true)) {
        initClient();
    }
}

这里会对TM和RM进行初始化,本质上都是创建一个netty客户端,然后向tc注册

private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    // 初始化 TM,本质就是创建一个tm的netty客户端,然后向tc注册
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    // 初始化 RM,本质就是创建一个rm的netty客户端,然后向tc注册
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();

}

初始化 TM

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    // 获取TM客户端实例
    TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
    // 初始化TM的netty客户端
    tmNettyRemotingClient.init();
}

在获取TM客户端实例时,会创建netty客户端,但还未启动

@Override
public void init() {
    // 注册相关处理器
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        // 调用父类初始化方法
        super.init();
    }
}

注册两个处理器,用来处理TC返回给TM的响应

private void registerProcessor() {
    // 1.registry TC response processor
    // 注册Seata-Server返回Response的处理Processor,用于处理由Client主动发起Request,
    // Seata-Server返回的Response。
    // ClientOnResponseProcessor负责把Client发送的Request和Seata-Server
    // 返回的Response对应起来,从而实现Rpc
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    // 2.registry heartbeat message processor
    // 处理Seata-Server返回的心跳消息
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

在父类AbstractNettyRemotingClient中启动TM的netty客户端,netty相关的不是关注的重点,所以不用深入分析。

@Override
public void init() {
    // 定时重新发送 RegisterTMRequest(RM 客户端会发送 RegisterRMRequest)请求尝试连接服务端
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    // 启动netty 客户端
    clientBootstrap.start();
}

初始化RM
初始化过程跟TM一样,下面只贴出相关代码

public static void init(String applicationId, String transactionServiceGroup) {
    // 创建RM的netty客户端
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    // 设置RM进去
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    // 初始化
    rmNettyRemotingClient.init();
}
@Override
public void init() {
    // 注册处理器
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();

        // Found one or more resources that were registered before initialization
        if (resourceManager != null
            && !resourceManager.getManagedResources().isEmpty()
            && StringUtils.isNotBlank(transactionServiceGroup)) {
            getClientChannelManager().reconnect(transactionServiceGroup);
        }
    }
}
private void registerProcessor() {
    // 1.registry rm client handle branch commit processor
    // 注册Seata-Server发起branchCommit的处理Processor
    RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);

    // 2.registry rm client handle branch commit processor
    // 注册Seata-Server发起branchRollback的处理Processor
    RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);

    // 3.registry rm handler undo log processor
    // 注册Seata-Server发起删除undoLog的处理Processor
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);

    // 4.registry TC response processor
    // 注册Seata-Server返回Response的处理Processor,用于处理由Client主动发起Request,
    // Seata-Server返回的Response。
    // ClientOnResponseProcessor负责把Client发送的Request和Seata-Server
    // 返回的Response对应起来,从而实现Rpc
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);

    // 5.registry heartbeat message processor
    // 处理Seata-Server返回的心跳消息
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
@Override
public void init() {
    // 定时重新发送 RegisterTMRequest(RM 客户端会发送 RegisterRMRequest)请求尝试连接服务端
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    // 启动netty 客户端
    clientBootstrap.start();
}

总结来说初始化TM和RM做的事就是分别注册几个处理器以及启动各自的Netty客户端。

数据源代理

SeataDataSourceAutoConfiguration

@ConditionalOnBean(DataSource.class)
@ConditionalOnExpression("${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
public class SeataDataSourceAutoConfiguration {

    /**
     * The bean seataDataSourceBeanPostProcessor.
     */
    @Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
    @ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
    public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
        return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    }

    /**
     * 负责为Spring中的所有DataSource生成代理对象,从而拦截SQL的执行,在SQL执行前后实现seata的逻辑
     */
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
            seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    }
}

该配置类要生效的条件是${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}这几个配置都为true,但是我在配置文件中都设置为true后也没生效,不知道哪里问题,所以我换一种方式,在启动类上添加@EnableAutoDataSourceProxy注解。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(AutoDataSourceProxyRegistrar.class)
@Documented
public @interface EnableAutoDataSourceProxy {
    /**
     * Whether use JDK proxy instead of CGLIB proxy
     *
     * @return useJdkProxy
     */
    boolean useJdkProxy() default false;

    /**
     * Specifies which datasource bean are not eligible for auto-proxying
     *
     * @return excludes
     */
    String[] excludes() default {};

    /**
     * Data source proxy mode, AT or XA
     *
     * @return dataSourceProxyMode
     */
    String dataSourceProxyMode() default "AT";
}

该注解上导入了另一个类AutoDataSourceProxyRegistrar

public class AutoDataSourceProxyRegistrar implements ImportBeanDefinitionRegistrar {
    private static final String ATTRIBUTE_KEY_USE_JDK_PROXY = "useJdkProxy";
    private static final String ATTRIBUTE_KEY_EXCLUDES = "excludes";
    private static final String ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE = "dataSourceProxyMode";

    public static final String BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR = "seataDataSourceBeanPostProcessor";
    public static final String BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR = "seataAutoDataSourceProxyCreator";

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(EnableAutoDataSourceProxy.class.getName());

        boolean useJdkProxy = Boolean.parseBoolean(annotationAttributes.get(ATTRIBUTE_KEY_USE_JDK_PROXY).toString());
        String[] excludes = (String[]) annotationAttributes.get(ATTRIBUTE_KEY_EXCLUDES);
        String dataSourceProxyMode = (String) annotationAttributes.get(ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE);

        //register seataDataSourceBeanPostProcessor bean def
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)) {
            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder
                .genericBeanDefinition(SeataDataSourceBeanPostProcessor.class)
                .addConstructorArgValue(excludes)
                .addConstructorArgValue(dataSourceProxyMode)
                .getBeanDefinition();
            registry.registerBeanDefinition(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR, beanDefinition);
        }

        //register seataAutoDataSourceProxyCreator bean def
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)) {
            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder
                .genericBeanDefinition(SeataAutoDataSourceProxyCreator.class)
                .addConstructorArgValue(useJdkProxy)
                .addConstructorArgValue(excludes)
                .addConstructorArgValue(dataSourceProxyMode)
                .getBeanDefinition();
            registry.registerBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR, beanDefinition);
        }
    }
}

AutoDataSourceProxyRegistrar实现了ImportBeanDefinitionRegistrar接口,这样我们就知道该类额外注册了BeanDefinition。通过源码可知,该类注册了两个bean,分别是SeataDataSourceBeanPostProcessor和SeataAutoDataSourceProxyCreator。

SeataDataSourceBeanPostProcessor
该类是一个BeanPostProcessor,主要就是用来生成数据源代理的

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

    private final List<String> excludes;
    private final BranchType dataSourceProxyMode;

    public SeataDataSourceBeanPostProcessor(String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        this.dataSourceProxyMode = BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode) ? BranchType.XA : BranchType.AT;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DataSource) {
            //When not in the excludes, put and init proxy.
            if (!excludes.contains(bean.getClass().getName())) {
                // 这里只是生成代理,并不返回代理,返回的还是真实数据源,
                // 毕竟不是每个sql都需要代理,在需要使用代理的时候再取出来
                DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
            }

            // 如果是代理数据源,则返回真实数据源
            if (bean instanceof SeataDataSourceProxy) {
                LOGGER.info("Unwrap the bean of the data source," +
                    " and return the original data source to replace the data source proxy.");
                return ((SeataDataSourceProxy) bean).getTargetDataSource();
            }
        }
        return bean;
    }
}

DataSourceProxyHolder是用来存放代理数据源的,如果当前bean是DataSource,则会为该DataSource生成一个代理DataSource。在putDataSource方法中,会进行数据源代理类的创建,当然,该方法除了创建数据源代理,获取数据源代理也是调用这个方法。

public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
    DataSource originalDataSource;
    // 如果已经是代理数据源并且事务模式也跟想要的一样,则直接返回了
    if (dataSource instanceof SeataDataSourceProxy) {
        SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;

        // 就是想要的代理类就直接返回了
        if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
            return (SeataDataSourceProxy) dataSource;
        }

        // 获取原数据源,下面根据该数据源创建或获取数据源代理类
        originalDataSource = dataSourceProxy.getTargetDataSource();
    } else {
        originalDataSource = dataSource;
    }
    // 从缓存中获取真实数据源对应的代理
    SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
    if (dsProxy == null) {
        synchronized (dataSourceProxyMap) {
            dsProxy = dataSourceProxyMap.get(originalDataSource);
            if (dsProxy == null) {
                // 没获取到就根据事务模式和真实数据源创建一个代理
                dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
                // 放进缓存
                dataSourceProxyMap.put(originalDataSource, dsProxy);
            }
        }
    }
    return dsProxy;
}

XA模式就创建DataSourceProxyXA,其他模式创建DataSourceProx。

private SeataDataSourceProxy createDsProxyByMode(BranchType mode, DataSource originDs) {
    return BranchType.XA == mode ? new DataSourceProxyXA(originDs) : new DataSourceProxy(originDs);
}

SeataAutoDataSourceProxyCreator
上面为每个数据源生成了seata的代理对象,但是该代理对象并不能通过AOP切入,所以还是需要一个AOP代理对象。SeataAutoDataSourceProxyCreator也是继承了AbstractAutoProxyCreator类,以前解析过AOP源码,可以知道继承该类就可以对指定的bean生成AOP代理 。

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
    private final List<String> excludes;
    private final Advisor advisor;

    public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        this.advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode));
        setProxyTargetClass(!useJdkProxy);
    }

    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Auto proxy of [{}]", beanName);
        }
        return new Object[]{advisor};
    }

    @Override
    protected boolean shouldSkip(Class<?> beanClass, String beanName) {
        // 这个类只对DataSource生成代理
        return !DataSource.class.isAssignableFrom(beanClass) ||
            SeataProxy.class.isAssignableFrom(beanClass) ||
            excludes.contains(beanClass.getName());
    }
}

从shouldSkip方法可知,只会对DataSource生成代理,而它添加的增强逻辑在SeataAutoDataSourceProxyAdvice内。

/**
 * 对DataSource进行增强,代理DataSource中的方法
 *
 * @author xingfudeshi@gmail.com
 */
public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {

    private final BranchType dataSourceProxyMode;
    private final Class<? extends SeataDataSourceProxy> dataSourceProxyClazz;

    public SeataAutoDataSourceProxyAdvice(String dataSourceProxyMode) {
        if (BranchType.AT.name().equalsIgnoreCase(dataSourceProxyMode)) {
            this.dataSourceProxyMode = BranchType.AT;
            this.dataSourceProxyClazz = DataSourceProxy.class;
        } else if (BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode)) {
            this.dataSourceProxyMode = BranchType.XA;
            this.dataSourceProxyClazz = DataSourceProxyXA.class;
        } else {
            throw new IllegalArgumentException("Unknown dataSourceProxyMode: " + dataSourceProxyMode);
        }

        //Set the default branch type in the RootContext.
        RootContext.setDefaultBranchType(this.dataSourceProxyMode);
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // 如果不是在@GlobalLock方法或事务模式跟当前的不匹配,则直接调用原方法
        if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
            return invocation.proceed();
        }

        Method method = invocation.getMethod();
        Object[] args = invocation.getArguments();
        Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
        if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
            // 获取seata创建的代理数据源,调用代理数据源的方法
            SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
            return m.invoke(dataSourceProxy, args);
        } else {
            return invocation.proceed();
        }
    }

    @Override
    public Class<?>[] getInterfaces() {
        return new Class[]{SeataProxy.class};
    }
}

当调用DataSource的方法时,就会通过AOP代理对象调用到SeataDataSourceProxy实现类的方法,即seata的代理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/571895.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C# WPF窗体设计器显示以及App.xaml文件打不开

问题描述&#xff1a; 在项目中遇到了App.xaml设计器打不开以及窗体设计器不显示&#xff0c;只有代码&#xff0c;如图所示&#xff1a; 可以明显的看见左下角的设计器不见&#xff0c;但是用户控件又有设计器 解决方法&#xff1a; ①清理项目 ②将不能正常打开的文件右…

Android Studio 2022.3 新版 flamingo 安装步骤及遇到的问题

下载地址: https://developer.android.google.cn/studio D盘中新建一个 Android 文件夹, 用来存储 Android studio 和 SDK 文件. 下载好之后, 运行 exe 文件, 点击 next 注意这个路径最好不要有空格,比如 program files这种目录,不然后面安装sdk的时候会有问题. 点击 instal…

【TI毫米波雷达笔记】IWR6843AOPEVM-G的DCA1000EVM模式配置及避坑

【TI毫米波雷达笔记】IWR6843AOPEVM-G的DCA1000EVM模式配置及避坑 IWR6843AOPEVM-G版本可以直接与DCA1000EVM连接 进行数据获取 不需要连接MMWAVEICBOOST版 直接使用 DCA1000mmWave Studio 软件进行数据采集 在官方手册中 User’s Guide 60GHz 毫米波传感器EVM 有相关模式的开…

基于RetinaNet和TensorFlow Object Detection API实现目标检测(附源码)

文章目录 一、RetinaNet原理二、RetinaNet实现1. tf.train.CheckPoint简介2. RetinaNet的TensorFlow源码 一、RetinaNet原理 待补充 二、RetinaNet实现 1. tf.train.CheckPoint简介 待补充 2. RetinaNet的TensorFlow源码 Step 1&#xff1a;安装Tensorflow 2 Object Detect…

ORB-SLAM3整体流程详解

0. 简介 在之前&#xff0c;作者曾经转过一篇《一文详解ORB-SLAM3》的文章。那篇文章中提到了ORB-SLAM3是一个支持视觉、视觉加惯导、混合地图的SLAM系统&#xff0c;可以在单目&#xff0c;双目和RGB-D相机上利用针孔或者鱼眼模型运行。与ORB-SLAM2相比&#xff0c;ORB-SLAM3…

软件系统三基座之一:权限管理

软件系统三基座包含&#xff1a;权限管理、组织架构、用户管理。 何为基座&#xff0c;即是有了这些基础&#xff0c;任一相关的“建筑”就能逐步搭建起来。 万丈高楼平地起 一、为什么要权限管理 权限管理&#xff0c;一般指根据系统设置的安全规则或者安全策略&#xff0c;…

集成chatgpt4和midjourney的超强镜像站

昨天发现一个镜像站&#xff0c;和之前发的镜像站不一样&#xff0c;这个集成了midjourney和chatgpt&#xff0c;且免翻&#xff0c;相信给很多很多用户都提供了便利吧&#xff01; 先把网站贴出来&#xff0c;有兴趣的伙伴可以玩一玩 http://mtw.so/5EoyYy http://mtw.so/5E…

如何在上架App之前设置证书并上传应用

App上架教程 在上架App之前想要进行真机测试的同学&#xff0c;请查看《iOS- 最全的真机测试教程》&#xff0c;里面包含如何让多台电脑同时上架App和真机调试。 P12文件的使用详解 注意&#xff1a; 同样可以在Build Setting 的sign中设置证书&#xff0c;但是有点麻烦&…

浅析 Redis 中 String 数据类型及其底层编码

从 RedisObject 说起 在 Redis 中&#xff0c;任意数据类型的键和值都会被封装为一个 RedisObject &#xff0c;也叫做Redis对象&#xff0c;源码如下 c 复制代码 /*server.h*/ typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS;…

springboot+vue之java学习平台(java项目源码+文档)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的java学习平台。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 &#x1f495;&#x1f495;作者&#xff1a;风歌&a…

档案库房太乱了怎么办?这个方法秒变高级!

全国有数以万计的大大小小的档案馆&#xff0c;其中有许多非常重要的机要档案&#xff0c;其历史和社会价值非常高&#xff0c;而档案保存的质量、档案的物理寿命、档案的防虫防霉都与库房的空气质量、温湿度息息相关。 解决档案高效管理及利用的安全问题越来越迫切&#xff0c…

在Ubuntu22.04上安装QQ~Linux

在Ubuntu22.04上安装QQ~Linux 0. 前言1. 下载deb安装包2. 使用dpkg安装deb包3. 安装完成&#xff0c;启动QQ3.1 点击图标打开3.2 使用命令行的方式打开 0. 前言 换Ubuntu当主力生产力了&#xff0c;并不是太喜欢vmware&#xff0c;所以我直接装到了硬盘里边&#xff0c;需要移…

SSM 如何使用 Kafka 实现消息队列?

SSM 如何使用 Kafka 实现消息队列&#xff1f; Kafka 是一个高性能、可扩展、分布式的消息队列系统&#xff0c;它支持多种数据格式和多种操作&#xff0c;可以用于实现数据传输、消息通信、日志处理等场景。在 SSM&#xff08;Spring Spring MVC MyBatis&#xff09;开发中…

iOS-最全的App上架教程

App上架教程 在上架App之前想要进行真机测试的同学&#xff0c;请查看《iOS- 最全的真机测试教程》&#xff0c;里面包含如何让多台电脑同时上架App和真机调试。 P12文件的使用详解 注意&#xff1a; 同样可以在Build Setting 的sign中设置证书&#xff0c;但是有点麻烦&…

软件开发项目成本控制的4大策略

1、构建责权利相结合的成本控制机制 需要对每个部门与个人的工作范围和工作职业有明确的界定&#xff0c;并赋予相应的权利以充分履行职责。在责任支配下高效完成工作进度时&#xff0c;需要给予一定的物质奖励。通过这样层层落实&#xff0c;逐级负责&#xff0c;从而做到责权…

VanillaNet:深度学习极简主义的力量

摘要 基础模型的核心是“更多不同”的理念&#xff0c;计算机视觉和自然语言处理方面的出色表现就是例证。然而&#xff0c;Transformer模型的优化和固有复杂性的挑战要求范式向简单性转变。在本文中&#xff0c;我们介绍了VanillaNET&#xff0c;这是一种设计优雅的神经网络架…

学会提问,ChatGPT可以帮你写出高质量论文

前言 ChatGPT 很火&#xff0c;火到大家以为他可以上天入地&#xff0c;上到天文&#xff0c;下到地理无所不能&#xff0c;但实际使用大家是不是会遇到如下的情况。 写论文步骤 今天&#xff0c;我们来探讨下怎样问ChatGPT&#xff0c;才能帮你写出一篇优秀的论文&#xff0c;…

【Java-Crawler】爬取动态页面(HtmlUnit、WebMagic)

爬取动态页面&#xff08;WebMagic、HtmlUnit&#xff09; 一、HtmlUnit的基本使用引入依赖一般使用步骤WebClient 的一些配置&#xff08;上述一般步骤中的第二步&#xff09; 二、案例&#xff08;爬取CSDN首页&#xff09;测试&#xff08;WebMagicHtmlUnit&#xff09;三、…

人机交互技术在车管所的应用探索

车管所作为交通管理的重要机构&#xff0c;承担着车辆登记、驾驶证办理、年检等重要职责&#xff0c;其工作效率和服务质量对于保障道路交通安全和畅通至关重要。而人机交互技术作为一种新兴的技术手段&#xff0c;可以为车管所提供更加高效、便捷的服务。因此&#xff0c;本文…

ESD防静电监控系统后台实时掌控现场静电防护情况

当静电积累到一定程度时&#xff0c;它可能会产生电击&#xff0c;从而对工人造成伤害。因此&#xff0c;工厂应该采取必要的预防措施&#xff0c;如提供防静电鞋和衣服&#xff0c;以保护工人免受静电伤害。 ESD防静电监控系统实现工业4.0技术要求&#xff0c;ESD物联技术稳定…