【分布式事务】Seata 之 @GlobalTransactional 在TM侧的核心逻辑

news2025/4/18 8:39:34

文章目录

  • 一、概述
  • 二、@GlobalTransactional核心逻辑
  • 三、@GlobalTransactional核心源码解读
  • 四、事务能力的启停
    • 运行期的开关变更

一、概述

Seata 依赖 Spring 的注解机制,实现声明式事务,即开发者给 Bean 使用@GlobalTransactional注解 ,Seata 通过GlobalTransactionScanner重写此类 Bean 生命周期的三个阶段以完成分布式事务能力,这 3 步为:

  1. Bean 初始化阶段(`afterPropertiesSet()``)

    • 初始化 TM RM 客户端,建立与 TC 的长连接
  2. Bean 初始化后阶段(`wrapIfNecessary()``)

    • wrapIfNecessary 见名知意,如果有必要就包裹(wrap)起来;所以如果 Bean 的方法上有注解@GlobalTransactional(@GlobalLock,@TwoPhaseBusinessAction 注解本篇暂不提及),则给这类 Bean 生成代理类,目标方法的代理逻辑中实现分布式事务中 TM 角色的能力
  3. Bean 销毁阶段(ShutdownHook)

    • 当 Bean 被销毁的时候关闭 TM、RM 客户端

本篇 RM、RM 客户端的初始化,暂不多说,后续会结合注册中心和重连机制再提,其中跟 Netty 相关的部分逻辑在《Seata 高性能 RPC 通信的实现- 巧用 reactor 模式》中有提及

二、@GlobalTransactional核心逻辑

1)GlobalTransactionScanner#wrapIfNecessary扫描 spring bean 时,判断是否有@GlobalTransactional注解(@GlobalLock,此处不提),识别到方法上的@GlobalTransactional注解后,给 bean 加上 AOP 拦截器GlobalTransactionalInterceptor。TCC 模式下方法被@TwoPhaseBusinessAction修饰时,相应的Advice为TccActionInterceptor

2)当目标方法被调用时,就先进入到了GlobalTransactionalInterceptor#invoke中,此方法中首先判断是否在运行时停用分布式事务能力(1.动态配置关闭事务,2.因 Seata 异常降级事务能力);如果仍是启用状态,则后续逻辑交给handleGlobalTransaction来完成。

3)handleGlobalTransaction 中的关键逻辑是 2 步,第 1 步是通过GlobalTransactionalInterceptor#transactionalTemplate执行全局事务;第二步是对第一步结果异常时的处理,在第二部会有failureHandler的调用,开发者据此回调得知事务是错在哪里了。

4)在GlobalTransactionalInterceptor#transactionalTemplate中定义的是 TM 发起者执行开启全局事务、提交或回滚全局事务的核心逻辑,这些方法执行时会判断若角色是 TM 参与者则不做什么,有可能一个执行链路中有多个方法被@GlobalTransaction修饰,调用链路中第一个@GlobalTransaction才是 TM 发起者,剩余的都是 TM 参与者,此方法中的的关键逻辑如下:

  • 从上下文中获取获取当前全局事务对象 GlobalTransaction(有可能一个执行链路中有多个方法被@GlobalTransaction 修饰)
  • 根据@GlobalTransactional 注解中 propagation 的值和当前全局事务对象的情况,决策事务传播策略,参考 Spring 文档了解事务传播行为
  • 如果当前全局事务对象是空,则新建一个全局事务对象,角色是 TM 发起者
  • 完成全局事务:开始全局事务,执行业务逻辑(AT 模式下,各分支事务的执行就在其中),提交或者回滚全局事务。

三、@GlobalTransactional核心源码解读

1) GlobalTransactionScanner#wrapIfNecessary扫描 spring bean 时,判断方法上是否有@GlobalTransactional注解,如果有则给这个 bean,添加拦截器GlobalTransactionalInterceptor,也就是说被 @GlobalTransactional 和 @GlobalLock 标注后,Seata 通过 AOP 增强提供的分布式事务能力在 GlobalTransactionalInterceptor 中

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
       // ... TCC 部分暂略

            Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
            Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

            // 判断类或方法上是否有@GlobalTransactional 注解
            // 判断方法上有否有 @GlobalLock 注解
            if (!existsAnnotation(new Class[]{serviceInterface})
                && !existsAnnotation(interfacesIfJdk)) {
                return bean;
            }

            if (globalTransactionalInterceptor == null) {
                // 构建AOP的拦截器 GlobalTransactionalInterceptor
                globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                // 运行时监听是否禁用分布式事务,如果禁用,那么拦截器中就不再使用分布式事务的能力
                ConfigurationCache.addConfigListener(
                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
            }
            // 下方getAdvicesAndAdvisorsForBean 方法中,就返回这个interceptor,
            // 也就是说被 @GlobalTransactional 和 @GlobalLock 标注后,Seata通过AOP增强提供的分布式事务能力在 GlobalTransactionalInterceptor中
            interceptor = globalTransactionalInterceptor;
        }

        LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
        // 如果是普通的bean,走父类的方法生成代理类即可
        if (!AopUtils.isAopProxy(bean)) {
            bean = super.wrapIfNecessary(bean, beanName, cacheKey);
        } else {
            // 如果已经是代理类,获取到advisor后,添加到该集合即可
            AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
            // 根据上面的interceptor生成advisor
            Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
            int pos;
            for (Advisor avr : advisor) {
                // Find the position based on the advisor's order, and add to advisors by pos
                pos = findAddSeataAdvisorPosition(advised, avr);
                advised.addAdvisor(pos, avr);
            }
        }
        PROXYED_SET.add(beanName);
        return bean;
    }
} catch (Exception exx) {
    throw new RuntimeException(exx);
}

2) 拦截器GlobalTransactionalInterceptor的invoke方法中,判断分布式事务能力未被禁用的情况下,将标注了@GlobalTransactional的方法,交给handleGlobalTransaction(xxx)处理

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    //通过 methodInvocation.getThis() 获取当前方法调用的所属对象
    //通过 AopUtils.getTargetClass(xx) 获取当前对象的Class
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;

    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        // BridgeMethodResolver.findBridgedMethod https://cloud.tencent.com/developer/article/1656258
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // 获取目标方法上 @GlobalTransactional 的信息
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        // 获取目标方法上 @GlobalLock 的信息,@GlobalTransactional 和 @GlobalLock 不该同时存在
        // @GlobalTransactional 是开启全局事务
        // @GlobalLock 是按照全局事务的隔离级别查看数据
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        // 禁用了,或者 开启了分布式事务能力降级,并且触发了降级的阈值
        boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    // 通过 @GlobalTransactional的信息构建 全局事务的核心配置
                    transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                        globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                        globalTransactionalAnnotation.rollbackForClassName(),
                        globalTransactionalAnnotation.noRollbackFor(),
                        globalTransactionalAnnotation.noRollbackForClassName(),
                        globalTransactionalAnnotation.propagation(),
                        globalTransactionalAnnotation.lockRetryInterval(),
                        globalTransactionalAnnotation.lockRetryTimes(),
                        globalTransactionalAnnotation.lockStrategyMode());
                } else {
                    transactional = this.aspectTransactional;
                }
                // 处理全局事务
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                // 处理全局锁
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}

3)handleGlobalTransaction 中的关键逻辑是 2 步,第 1 步是通过GlobalTransactionalInterceptor#transactionalTemplate执行全局事务;第 2 步是对第 1 步结果异常时的处理,在第 2 部会有failureHandler的调用,开发者据此回调得知事务是错在哪里了。先看第 2 步的逻辑,因为比较清晰,明确要捕获的异常类型为:TransactionalExecutor.ExecutionException,根据异常中的不同 code 的值,做不同的处理(failureHandler 回调的方法不同,开发者通过这个回调感知事务发生了什么异常。即使 Seata 会捕获异常,但根据异常信息做出跟事务相关的异常处理后,仍是将原始的异常上抛,让开发者仍按照未接入事务的情况处理。

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
  final AspectTransactional aspectTransactional) throws Throwable {
  boolean succeed = true;
  try {
      return transactionalTemplate.execute(new TransactionalExecutor() {...});
   } catch (TransactionalExecutor.ExecutionException e) {
      TransactionalExecutor.Code code = e.getCode();
      // code不同,处理逻辑不通
      switch (code) {
          // 遇到异常,但TM正常完成了TM回滚,将原始异常抛出,业务逻辑仍只关注原始异常,知道异常时事务回滚了,但无需操心回滚的细节
          // 对应TC侧GlobalStatus状态为 Rollbacked (11)
          case RollbackDone:
              throw e.getOriginalException();
          // 开始全局事务异常,
          case BeginFailure:
              succeed = false;
              failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
              throw e.getCause();
          // 提交事务失败
          case CommitFailure:
              succeed = false;
              failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
              throw e.getCause();
          // 回滚失败了
          // 对应TC侧GlobalStatus状态有 RollbackFailed  TimeoutRollbackFailed RollbackRetryTimeout
          case RollbackFailure:
              failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
              throw e.getOriginalException();
          // 回滚重试中
          // 对应TC侧GlobalStatus状态有 Rollbacking  RollbackRetrying RollbackRetryTimeout
          case RollbackRetrying:
              failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
              throw e.getOriginalException();
          // 因超时而回滚了
          // 对应TC侧GlobalStatus状态有 TimeoutRollbacking  TimeoutRollbackRetrying TimeoutRollbacked
          case TimeoutRollback:
              failureHandler.onTimeoutRollback(e.getTransaction(), e.getOriginalException());
              throw e.getCause();
          default:
              throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
      }
  } finally {
      if (ATOMIC_DEGRADE_CHECK.get()) {
          EVENT_BUS.post(new DegradeCheckEvent(succeed));
      }
  }
}

这其中关于回滚的错误处理部分稍稍有点复杂,下图列出了 TC 侧在回滚时所使用的GlobalStatus回滚状态 在 client 端 TM 侧对应的TransactionalExecutor.Code,通过这个映射关系梳理服务端逻辑时,会更方便理解。
在这里插入图片描述

4)handleGlobalTransaction 中第 1 步是通过GlobalTransactionalInterceptor#transactionalTemplate执行全局事务。
在这里插入图片描述

其execute()方法中的methodInvocation.proceed();是业务逻辑方法。在这个业务逻辑方法的执行前后,加上了 TM 的事务管理能力。这个类中定义的是 TM 发起者执行开启全局事务、提交或回滚全局事务的核心逻辑,这些方法执行时会判断若角色是 TM 参与者则不做什么

  1. 从上下文中获取获取当前全局事务对象GlobalTransaction(有可能一个执行链路中有多个方法被@GlobalTransaction修饰)
  2. 根据@GlobalTransactional注解中propagation的值和当前全局事务对象的情况,决策事务传播策略,参考 Spring 文档了解事务传播行为
  3. 如果当前全局事务对象是空,则新建一个全局事务对象,角色是 TM 发起者
  4. 完成全局事务:开始全局事务,执行业务逻辑(AT 模式下,各分支事务的执行就在其中),提交或者回滚全局事务。
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    /**
     * 从上下文中,获取当前事务对象
     * 1.当前事务getcurrent为空:则当前是事务的发起者TM(Launcher)
     * 2.当前事务getCurrent不为空:则当前是事务参与者;
     *      事务嵌套的情况下(如:A和B两个方法都标注了@GlobalTransactional,A方法中会调用B方法),
     *      对于A来说是TM,而对于B来说,因为全局事务A不为空,那么B就是参与者(GlobalTransactionRole.Participant)
     *      那么此处返回的GlobalTransaction中xid是事务A的xid,B的角色是GlobalTransactionRole.Participant
     */
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 下面是处理事务的传播特性,如果没有指定,默认是REQUIRED
    // REQUIRED:如果本来有事务,则加入该事务,如果没有事务,则创建新的事务
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        //事务的传播机制, 根据不同的传播行为,执行不同的逻辑
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        // 如果tx为空,表示是事务的发起者TM,则创建一个角色为Launcher的GlobalTransaction
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        // 应对事物嵌套的场景,Participant的某些配置覆盖Launcher的配置,
        // 待participant的事务处理完毕后,仍需要恢复Launcher中的配置
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 开启事务,如果是TM 发起者(GlobalTransactionRole.Launcher),那么才会向TC发送开启全局事务的RPC请求
            // 开启事务是在,DefaultTransactionManager.begin方法中向TC同步发送 GlobalBeginRequest
            // 但如果是参与者GlobalTransactionRole.Participant,则不向TC发请求,仅承担分支事务的职责,但需注意hooks仍是被调用的
            // TC 端收到请求,开启全局事务成功后生成并返回一个全局唯一的 XID。
            // TM 将 XID 保存到 ThreadLocal 中,后续RPC调用中这个XID也会被透传
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // 执行业务方法,如果有RPC调用,则发起RPC调用时携带上xid
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // 异常时,如匹配到异常规则才执行回滚;
                // 否则内部还是执行事务提交,需注意即使事务提交成功了,接下来还有异常抛出
                // 也就是说Seata关注异常,根据异常信息来抉择分布式事务该你怎么处理,但并不会因为自己对异常处理了就把异常吞掉。
                // 回滚请求是在DefaultTransactionManager.rollback 中向TC同步发送 GlobalRollbackRequest,
                // 发送回滚请求有重试机制,默认5次,可通过 client.tm.rollbackRetryCount 调整
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 提交事务
            // 但如果如果检测到已超时,则执行回滚事务
            // 若未超时,才执行事务提交
            // 提交请求是在DefaultTransactionManager.commit 中向TC同步发送 GlobalCommitRequest,有重试机制
            // 发送提交请求有重试机制,默认5次,可通过 client.tm.commitRetryCount 调整
            commitTransaction(tx, txInfo);

            return rs;
        } finally {
            //5. clear
            // 恢复原事务配置,触发hook回执,清理hook
            resumeGlobalLockConfig(previousConfig);
            // 触发 afterHook
            triggerAfterCompletion();
            // 清除hook,
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        // 如果有被挂起的事务,这里将其恢复
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}

TransactionalTemplate#execute中在开始事务,提交或者回滚事务 时都有 hook 的回调,但是从 1.6.1 版本的代码来,并没有看到注册 hook 的地方,为何TransactionHookManager#registerHook没用调用?

四、事务能力的启停

GlobalTransactionalInterceptor#invoke有一行很关键的代码,用于判断当前服务调用是 TM 是否使用分布式事务能力(开启事务+提交|回滚事务),若不开启则只执行原始的业务逻辑。

boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
  1. disable 用于在启动时和运行期控制是否禁用分布式事务能力

    • 默认值为 false,对应配置 key : service.disableGlobalTransaction,可在配置中心动态变更
  2. (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes) 其作用是如果开启降级检测,并且事务连续失败次数达到阈值,则自动先行禁用事务能力;同时还有一个定时任务会间歇性的通过模拟全局事务开启+提交的方式来试探,当连续试探成功次数达到阈值后,自动激活事务能力。

    • ATOMIC_DEGRADE_CHECK 通过 client.tm.degradeCheck 配置是否开启事务降级检查,默认为false,不开启事务降级检查
    • 只有开启事务降级检查,以下两个配置才有意义;
    • degradeCheckAllowTimes指定了降级检查允许的次数,通过 client.tm.degradeCheckAllowTimes 指定,
    • degradeCheckPeriod 指定了模拟试探的频率,通过client.tm.degradeCheckPeriod 指定,只有当开启事务降级检查,这个配置才有意义;

运行期的开关变更

1)监听ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION的变更
在GlobalTransactionScanner#wrapIfNecessary中,创建globalTransactionalInterceptor时添加了监听ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION的变更。

if (globalTransactionalInterceptor == null) {
    // 构建AOP的拦截器 GlobalTransactionalInterceptor
    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
    // 运行时监听是否禁用分布式事务,如果禁用,那么拦截器中就不再使用分布式事务的能力
    ConfigurationCache.addConfigListener(
            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)globalTransactionalInterceptor);
}

2)监听ConfigurationKeys.CLIENT_DEGRADE_CHECK的变更
在 GlobalTransactionalInterceptor的构造函数中,读取配置,对各个关键配置做了初始化,并添加了监听器监听ConfigurationKeys.CLIENT_DEGRADE_CHECK的变更

public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
    this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
    // 初始化 disable 的值,读取配置service.disableGlobalTransaction
    this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
        DEFAULT_DISABLE_GLOBAL_TRANSACTION);
    this.order =
        ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
    // 需要注意,degradeCheckPeriod 和 degradeCheckAllowTimes 在首次启动后读取配置后赋值,之后不再感知变更。
    boolean degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
        DEFAULT_TM_DEGRADE_CHECK);
    degradeCheckPeriod = ConfigurationFactory.getInstance()
            .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
    degradeCheckAllowTimes = ConfigurationFactory.getInstance()
            .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
    // 通过 GuavaEventBus 来监听事务成功还是失败,以调整统计计数
    EVENT_BUS.register(this);
    // 如果满足条件则开启降级检测
    if (degradeCheck && degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
        startDegradeCheck();
    }
    // 监听配置项 CLIENT_DEGRADE_CHECK 的变更。
    ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
    this.initDefaultGlobalTransactionTimeout();
}

3) 在运行时监听到配置变更后,调整降级检测能力

事务是否被禁用,对应的配置 key 为:service.disableGlobalTransaction
是否启用客户端的降级检测 对应的配置 key 为:client.tm.degradeCheck

@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
    if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
        LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                disable, event.getNewValue());
        disable = Boolean.parseBoolean(event.getNewValue().trim());
    } else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {
        boolean degradeCheck = Boolean.parseBoolean(event.getNewValue());
        // 如果禁用降级检测,关闭降级检测
        if (!degradeCheck) {
            degradeNum = 0;
            stopDegradeCheck();
        } else if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
            // 如果开启检测,并且满足条件,则开启降级检测
            startDegradeCheck();
        }
    }
}

4)开启降级检测,当开启事务能力降级检测是,打开标志开关,并会创建单线程的线程池,并以此线程池执行事务能力检测。模拟一套全局事务的开启和提交,用于检测 TC 是否正常服务,事务的名字是 degradeCheck

private static void startDegradeCheck() {
    if (!ATOMIC_DEGRADE_CHECK.compareAndSet(false, true)) {
        return;
    }
    if (executor != null && !executor.isShutdown()) {
        return;
    }
    // 如果启动降级检测,就创建一个单线程的线程池,线程名称前缀为degradeCheckWorker
    executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
    // 定时任务的频率默认值是2000毫秒,通过client.tm.degradeCheckPeriod配置
    executor.scheduleAtFixedRate(() -> {
        // 定时任务也有条件,会判断当前是否要做TC事务能力试探
        if (ATOMIC_DEGRADE_CHECK.get()) {
            try {
                // 模拟一套全局事务的开启和提交,用于检测TC是否正常服务,事务的名字是degradeCheck
                String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                TransactionManagerHolder.get().commit(xid);
                // 如果模拟的事务正常提交了,则投递降级检测成功的消息,onDegradeCheck中做计数梳理
                EVENT_BUS.post(new DegradeCheckEvent(true));
            } catch (Exception e) {
                // 如果模拟的事务遇到的问题,则投递降级检测失败的消息,onDegradeCheck中做计数梳理
                EVENT_BUS.post(new DegradeCheckEvent(false));
            }
        }
    }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
}

5)若停止事务能力降级检测,关闭标志开关,销毁线程池。

private static void stopDegradeCheck() {
    // 关闭标志开关
    if (!ATOMIC_DEGRADE_CHECK.compareAndSet(true, false)) {
        return;
    }
    // 关闭定时任务的线程池,其中的定时任务自然也被销毁
    if (executor != null && !executor.isShutdown()) {
        executor.shutdown();
    }
}

6)检测统计
借助了 Guava 中的 EventBus 这个的事件处理机制(是观察者模式(生产/消费模型)的一种实现)来实现降级检测相关的统计计算。当执行真实事务或模拟事务的时候,会根据事务结果投递成功或失败的事件信息,如下

EVENT_BUS.post(new DegradeCheckEvent(true));
EVENT_BUS.post(new DegradeCheckEvent(false));

监听事务成功还是失败的事件,以调整统计计数。观察者的注册发生在GlobalTransactionalInterceptor中,其构造函数中有EVENT_BUS.register(this),用于注册观察者,也即事件的消费者,那消费的逻辑在哪里呢?在GlobalTransactionalInterceptor#onDegradeCheck方法之上有@Subscribe注解,表明此方法是事件的消费处理主体。其核心逻辑如下:

  • 这里有 1 个阈值 degradeCheckAllowTimes 和两个计数器 reachNum、degradeNum,这两个计数器都是阈值比较
  • degradeNum 是记录了连续失败次数,当失败次数未达到阈值的时候,遇到一次成功就把 degradeNum 技术恢复成 0
  • 当连续失败次数 degradeNum 达到阈值的时候,事务就被被禁用了,业务逻辑中不会使用再使用事务,
  • 之后就依靠定时任务里的默认事务试探 TC 是否正常,频率默认值是 2000 毫秒,通过 client.tm.degradeCheckPeriod 配置
  • 当 executor 定时任务在试探过程中一旦遇到一次失败,就把试探连续成功的计数 reachNum 重置为 0
  • 当 executor 定时任务中的试探事务连续成功次数达到阈值后,才会重新激活事务能力。
@Subscribe // @Subscribe监听 EVENT_BUS 的的事件,
public static void onDegradeCheck(DegradeCheckEvent event) {
    if (event.isRequestSuccess()) {
        // 当 degradeNum >= degradeCheckAllowTimes 时,实际是事务已经被禁用了
        // 那什么情况下,事务能力被重新激活呢?
        // 当降级激活后,executor定时任务试探事务要连续成功次数达到阈值后,才会重新激活事务能力。
        if (degradeNum >= degradeCheckAllowTimes) {
            reachNum++;
            if (reachNum >= degradeCheckAllowTimes) {
                reachNum = 0;
                degradeNum = 0;
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("the current global transaction has been restored");
                }
            }
        } else if (degradeNum != 0) {
            // 当失败次数未达到阈值的时候,遇到一次成功就把degradeNum技术恢复成0,意味着degradeNum是记录了连续失败次数。
            degradeNum = 0;
        }
    } else {
        if (degradeNum < degradeCheckAllowTimes) {
            degradeNum++;
            // 当连续失败达到阈值后,打印warn日志,the current global transaction has been automatically downgraded
            // 并且会激活降级
            if (degradeNum >= degradeCheckAllowTimes) {
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.warn("the current global transaction has been automatically downgraded");
                }
            }
            //当降级激活后,会有定时任务试探事务能力是否正常,在试探过程中一旦遇到一次失败,就把试探连续成功的计数reachNum重置为0
            //也就是说当降级激活后,定时任务试探事务要连续成功次数达到阈值后,才会重新激活事务能力。
        } else if (reachNum != 0) {
            reachNum = 0;
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/518634.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习03-卷积神经网络(CNN)

简介 CNN&#xff0c;即卷积神经网络&#xff08;Convolutional Neural Network&#xff09;&#xff0c;是一种常用于图像和视频处理的深度学习模型。与传统神经网络相比&#xff0c;CNN 有着更好的处理图像和序列数据的能力&#xff0c;因为它能够自动学习图像中的特征&…

GIS 根据投影坐标点获取投影坐标所属的投影坐标系EPSG

什么是EPSG? EPSG&#xff08;The European Petroleum Survey Group,官网 http://www.epsg.org/&#xff09;维护着空间参照对象的数据集&#xff0c;OGC标准中空间参照系统的SRID&#xff08;Spatial Reference System Identifier&#xff09;与EPSG的空间参照系统ID相一致。…

冒泡数组实现和冒泡数组的改进以及插入法排序

概念 在数组排序的过程中&#xff0c;每次比较相邻的两个数&#xff0c;并且把大的数放在后面 例如{1&#xff0c;3&#xff0c;5&#xff0c;7&#xff0c;9&#xff0c;2&#xff0c;4&#xff0c;6&#xff0c;8&#xff0c;10} 实现 #include<iostream> using names…

修改ro.debuggable用于调试安卓应用

一.Root 网上有很多root教程,这里推荐: 玩机必看&#xff01;带你入坑安卓刷机&#xff0c;小白也能看懂的ROOT基础指南来啦&#xff01; 很详细的介绍了Root的原理和方法,强烈推荐使用Magisk工具 可以使用命令adb shell getprop ro.debuggable查看ro.debuggable的值 二.Magi…

[MYSQL / Mariadb]数据库学习-表结构、键值(普通索引、主键、外键)

数据库学习-表结构、键值 回顾数据类型表结构字段约束条件&#xff08;限制字段赋值的&#xff09;例&#xff0c;建表并设置约束条件设置default列为0 修改表结构例添加新字段&#xff0c;并设置位置。修改字段类型change 修改字段名注意&#xff1a;在修改字段名或字段类型时…

Java中的 stop the world是什么?

一、概述&#xff1b; 从字面上讲&#xff0c;就是停止这个世界&#xff0c;看到这个字眼&#xff0c;就觉得这是可怕的事情&#xff0c;那到底什么是stop-the-world&#xff1f; stop-the-world&#xff0c;简称 STW&#xff0c;指的是 GC 事件发生过程中&#xff0c;会产生…

面试题30天打卡-day26

1、什么是 AOP&#xff1f;Spring AOP 和 AspectJ AOP 有什么区别&#xff1f;有哪些实现 AOP 的方式&#xff1f; AOP&#xff08;Aspect-Oriented Programming&#xff0c;面向切面编程&#xff09;是一种编程思想&#xff0c;可以在不修改原有业务逻辑代码的情况下&#xf…

srs 直播连麦环境搭建

一、简介 二、修改conf/rtc.conf 三、两个客户端加入房间 四、合流 4.1分别拉流尝试 4.2合流推流 4.3拉取合流 一、简介 直播连麦是指在one2one或one2many进行音视频通话&#xff0c;此时把他们的音视频流合在一起&#xff0c;通过rtmp等协议推送给大量用户做直播。 因此首选需…

Java面试(3)基础语法

基础语法 1. 标识符和关键字的区别是什么&#xff1f; 在我们编写程序的时候&#xff0c;需要大量地为程序、类、变量、方法等取名字&#xff0c;于是就有了 标识符 。简单来说&#xff0c; 标识符就是一个名字 。 有一些标识符&#xff0c;Java 语言已经赋予了其特殊的含义…

python带你用最简单嘚词云图分析出最热话题

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 平常我们爬的评论、弹幕等等&#xff0c;数量又多又密&#xff0c;根本看不过来&#xff0c; 这时候数据分析的作用来了&#xff0c; 今天我们就试试用Python根据这些数据&#xff0c;来绘制词云图进行热词分析。 目录…

JUC-线程Callable使用与FutureTask源码阅读

JUC-线程Callable使用与FutureTask源码阅读 Callable简单使用 带返回值的线程(实现implements Callable<返回值类型>)&#xff0c;使用示例 import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Fut…

STC32G12K128单片机的 moubus-rtu 主机测试工程

简介 STC32G12K128 是STC 推出的一款32位的 C251 的单片机。最近拿到一块官方申请的 屠龙刀-STC32G开发板&#xff0c;就用它的提供的库函数&#xff0c;查考安富莱提供的 modbus 例程移植了一个 modbus-rtu 主站的工程。 modbus-rtu host 移植注意点 modbus-rtu 功能配置 …

MapReduce序列化【用户流量使用统计】

目录 什么是序列化和反序列化&#xff1f; 序列化 反序列化 为什么要序列化&#xff1f; 序列化的主要应用场景 MapReduce实现序列化 自定义bean对象实现Writable接口 1.实现Writable接口 2.无参构造 3.重写序列化方法 4.重写反序列化方法 5.顺序一致 6.重写toStri…

您应该查看的5个ChatGPT WordPress插件

要创建免费网站&#xff1f;从易服客建站平台免费开始 500M免费空间&#xff0c;可升级为20GB电子商务网站 您应该查看的5个ChatGPT WordPress插件 发布于 2023年4月1日 ChatGPT 席卷了数字世界。作为内容创建者或营销者&#xff0c;您可能希望通过在您的网站上使用ChatGPT…

JimuReport - 积木报表(一款免费Web报表工具)

一款免费的数据可视化报表&#xff0c;含报表和大屏设计&#xff0c;像搭建积木一样在线设计报表&#xff01;功能涵盖&#xff0c;数据报表、打印设计、图表报表、大屏设计等&#xff01; Web 版报表设计器&#xff0c;类似于excel操作风格&#xff0c;通过拖拽完成报表设计。…

扫雷【C语言】

用C语言实现一个9X9的扫雷 test.c 测试部分 game.c 游戏实现部分 game.h 游戏声明部分 菜单部分 游戏部分 游戏部分包括创建一个扫雷的区域&#xff0c;在其中埋雷&#xff0c;玩家进行扫雷&#xff0c;判断扫雷是否成功 这里我i们定义行列&#xff0c;便于以后将其…

2023.05.12-使用Transformers Agents来一键调用千万个AI模型

1. 简介 简单来说&#xff0c;就是以前想要实现某一个AI功能&#xff0c;需要自己去网上搜索对应的模型、下载对应的权重才能使用。现在可以把中间的这些个环节都砍了&#xff0c;我们只需要告诉模型我们想要对某段文字或者某张图片进行什么操作&#xff0c;transformer就会自…

【经验贴】项目风险管理的有效方法

你遇见过“最奇葩”的项目风险是什么&#xff1f; 中级项目经理小李&#xff1a;我比较幸运? 项目一开始发现客户的需求不太明确&#xff0c;就识别出可能会有范围无限蔓延的风险&#xff0c;制定了一系列的应对措施&#xff0c;不出所料出了问题&#xff0c;最终还是将风险遏…

springboot整合redis,MongoDB,Elasticsearch(ES)

目录 springboot整合redis 连接Redis 字符串操作 哈希表操作 列表操作 集合操作 有序集合操作 lettcus与jedis的区别 springboot整合MongoDB 新增数据 查询数据 更新数据 删除数据 springboot整合Elasticsearch&#xff08;ES&#xff09; 创建ElasticsearchRepo…

区间预测 | MATLAB实现QRCNN-BiLSTM卷积双向长短期记忆神经网络分位数回归时间序列区间预测

区间预测 | MATLAB实现QRCNN-BiLSTM卷积双向长短期记忆神经网络分位数回归时间序列区间预测 目录 区间预测 | MATLAB实现QRCNN-BiLSTM卷积双向长短期记忆神经网络分位数回归时间序列区间预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 1.Matlab实现基于QRCNN…