分布式事务Seata源码解析13:TCC事务模式实现原理

news2024/11/25 12:30:30

文章目录

  • 一、前言
  • 二、GlobalTransactionScanner
    • 1、判断某一个Bean是否需要做TCC动态代理
      • 1> 判断bean实现的接口中是否有接口标注了@LocalTCC注解
      • 2> 当Bean的某一个接口实现了@LocalTCC注解之后,解析相应接口中的TCC内容:
      • TCCResource数据样例
      • 3> TCCResource注册到TC的流程
        • 1)RM发起注册Resource请求
        • 2)TC接收注册Resource请求
  • 三、发起TCC分支事务
    • 1、ActionInterceptorHandler
      • 分支事务注册到全局事务
  • 四、TCC分支事务提交
  • 五、TCC分支事务回滚
  • 六、总结

一、前言

更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html

至此,seata系列的内容已出:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
  13. 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
  14. 分布式事务Seata源码解析九:分支事务如何注册到全局事务
  15. 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程
  16. 分布式事务Seata源码解析11:全局事务执行流程之两阶段全局事务提交
  17. 分布式事务Seata源码解析12:全局事务执行流程之全局事务回滚
  18. Spring Cloud整合Seata实现TCC分布式事务模式案例

二、GlobalTransactionScanner

在seata-client启动时会利用SpringBoot自动装配的特性加载GlobalTransactionScanner类。具体见博文:《Seata Client 启动时都做了什么?》

另外,在分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务一文中,笔者和大家聊了:

  • GlobalTransactionScanner类继承了AbstractAutoProxyCreatorAbstractAutoProxyCreator类又实现了BeanPostProcessor接口;因此GlobalTransactionScanner类也是BPP(BeanPostProcessor)。
  • AbstractAutoProxyCreator是Spring AOP中的一个抽象类,其主要功能是自动创建动态代理;因为其实现了BeanPostProcessor接口,所以在类加载到Spring容器之前,会进入到其wrapIfNecessary()方法对Bean进行代理包装,后续调用Bean之将委托给指定的拦截器。

GlobalTransactionScanner#wrapIfNecessary()方法关键部分:
在这里插入图片描述

所以在bean加载到Spring容器之前,都会先判断bean是否需要做TCC的动态代理,如果需要则增加相应拦截器TccActionInterceptor

1、判断某一个Bean是否需要做TCC动态代理

简单来讲,就是判断Bean实现的所有接口中是否有一个接口上标注了@LocalTCC注解;如果有,则当前Bean需要做TCC动态代理。

1> 判断bean实现的接口中是否有接口标注了@LocalTCC注解

代码执行如下:

在这里插入图片描述

其中:

  1. RemotingParser负责提取远程处理bean信息;其一共有四个具体实现:
    在这里插入图片描述
    • 针对普通SpringCloud项目的LocalTCCRemotingParser;
    • 针对阿里内部微服务架构HSF的HSFRemotingParser;
    • 针对Dubbo项目的DubboRemotingParser;
    • 针对SofaRpc框架的SofaRpcRemotingParser;
  2. LocalTCCRemotingParser而言,其isReference()方法和isService()方法的实现都一样的逻辑:判断当前bean实现的所有接口中是否有接口标注了@LocalTCC注解。

2> 当Bean的某一个接口实现了@LocalTCC注解之后,解析相应接口中的TCC内容:

在这里插入图片描述

DefaultRemotingParser#parserRemotingServiceInfo():

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    // 获取Bean实现的标注了@LocalTCC注解的接口信息:接口全路径类名、接口Class信息、实现接口的当前Bean(动态代理后的)
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    // 后续获取当前bean对应的TCC信息,走remotingServiceMap缓存
    remotingServiceMap.put(beanName, remotingBeanDesc);

    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    // 判断当前Bean是否为serviceBean,就LocalTCC而言,走到这里就一定是的。
    if (remotingParser.isService(bean, beanName)) {
        try {
            //service bean, registry resource
            Object targetBean = remotingBeanDesc.getTargetBean();
            // 遍历实现了@LocalTCC接口的所有方法,如果方法上标注了@TwoPhaseBusinessAction注解,则将一些TCC信息包装为TCCResource注册到TC(seata server)
            //    TCC信息包括:@TwoPhaseBusinessAction名称,目标类,try方法,commit方法、方法名称、方法入参(默认只有BusinessActionContext),rollback方法、方法名称、方法入参(默认只有BusinessActionContext),封装的两阶段方法的key
            //    将TCCResource注册到TC时,resourceId为@TwoPhaseBusinessAction名称,这也是@TwoPhaseBusinessAction名称要全局唯一的原因
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());
                    tccResource.setTargetBean(targetBean);
                    tccResource.setPrepareMethod(m);
                    tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                    tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),
                            twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                    tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),
                            twoPhaseBusinessAction.rollbackArgsClasses()));
                    // set argsClasses
                    tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());
                    tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());
                    // set phase two method's keys
                    tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),
                            twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),
                            twoPhaseBusinessAction.rollbackArgsClasses()));
                    // registry tcc resource,将当前TCCResource作为资源注册到TC(Seata Server)中。
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    if (remotingParser.isReference(bean, beanName)) {
        //reference bean, TCC proxy
        remotingBeanDesc.setReference(true);
    }
    return remotingBeanDesc;
}

解析TCC内容具体逻辑:

  1. 获取Bean实现的标注了@LocalTCC注解的接口信息:接口全路径类名、接口Class信息、实现接口的当前Bean(动态代理后的);
  2. 将当前bean的LocalTCC信息保存到Map缓存remotingServiceMap中,后续获取bean对应的LocalTCC信息,都走remotingServiceMap缓存;
  3. 遍历实现了@LocalTCC接口的所有方法,如果方法上标注了@TwoPhaseBusinessAction注解,则将一些TCC信息包装为TCCResource注册到TC(seata server);
    • TCC信息包括:@TwoPhaseBusinessAction名称,目标类,try方法,commit方法、方法名称、方法入参(默认只有BusinessActionContext),rollback方法、方法名称、方法入参(默认只有BusinessActionContext),封装的两阶段方法的key;
    • 将TCCResource注册到TC时,resourceId为@TwoPhaseBusinessAction名称,这也是@TwoPhaseBusinessAction名称要全局唯一的原因。

TCCResource数据样例

在这里插入图片描述

3> TCCResource注册到TC的流程

RM/TM 和 TC交互的详细流程见博文:分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信

1)RM发起注册Resource请求

在这里插入图片描述

请求为:RegisterRMRequest,其中resourceId信息最为关键;

RM在将TCCResource注册到TC时,会在以resourceId为key在本地缓存TCCResource的具体信息;

在这里插入图片描述

2)TC接收注册Resource请求

在seata-server启动时会注册各种请求类型对应的处理器;

由于RegisterRMRequest对应的MessgeType为TYPE_REG_RM
在这里插入图片描述

所以对应的处理器为RegRmProcessor:
在这里插入图片描述

RegRmProcessor处理时最终会将Resource数据(resourceId、clientIp)保存在TC的内存层面:
在这里插入图片描述

三、发起TCC分支事务

调用标注了@LocalTCC注解的接口的实现类的方法时,会进入到TccActionInterceptor#invoke()方法,这正是发起TCC分支事务的入口;

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    // 当前事务必须处于全局事务中
    if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
        //not in transaction, or this interceptor is disabled
        return invocation.proceed();
    }
    // 获取本次要调用的方法,在class接口中的体现
    Method method = getActionInterfaceMethod(invocation);
    TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
    // try method,方法必须要标注@TwoPhaseBusinessAction注解才会走TCC逻辑
    if (businessAction != null) {
        // save the xid
        String xid = RootContext.getXID();
        //save the previous branchType
        BranchType previousBranchType = RootContext.getBranchType();
        //if not TCC, bind TCC branchType
        if (BranchType.TCC != previousBranchType) {
            RootContext.bindBranchType(BranchType.TCC);
        }
        try {
            // Handler the TCC Aspect, and return the business result
            return actionInterceptorHandler.proceed(
                    method, // 目标业务方法
                    invocation.getArguments(), // 调用方法时传递进来的参数
                    xid, // 全局事务xid
                    businessAction, // 标注的@TwoPhaseBusinessAction注解内容
                    invocation::proceed); // 目标业务方法的执行
        } finally {
            //if not TCC, unbind branchType
            if (BranchType.TCC != previousBranchType) {
                RootContext.unbindBranchType();
            }
            //MDC remove branchId
            MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
        }
    }

    //not TCC try method
    return invocation.proceed();
}
  1. 如果当前事务不处于全局事务中 或 禁用了全局事务 或 当前事务模式为SAGA,则直接执行目标方法;
  2. 如果要执行的方法,没有@TwoPhaseBusinessAction注解标注,则直接执行目标方法;
  3. 否者,将TCC事务模式绑定到RootContext,将全局事务xid、方法的入参、@TwoPhaseBusinessAction注解内容、目标业务方法的执行传递到ActionInterceptorHandler#proceed()方法中进行TCC切面的处理、业务的执行。
  4. 最后清理RootContext的信息、包括TCC分支事务类型、分支事务branchId。

1、ActionInterceptorHandler

public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                   Callback<Object> targetCallback) throws Throwable {
    // Get action context from arguments, or create a new one and then reset to arguments
    BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);

    // Set the xid
    actionContext.setXid(xid);
    // Set the action name,TCC注解@TwoPhaseBusinessAction中定义的tcc业务动作名称
    String actionName = businessAction.name();
    actionContext.setActionName(actionName);
    // Set the delay report,延时report上报
    actionContext.setDelayReport(businessAction.isDelayReport());

    // Creating Branch Record,发起TCC分支事务的注册 并 创建分支事务的记录,拿到TC返回的branch_id
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);
    // MDC put branchId
    MDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);

    // save the previous action context
    BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext();
    try {
        //share actionContext implicitly
        BusinessActionContextUtil.setContext(actionContext);

        if (businessAction.useTCCFence()) {
            try {
                // Use TCC Fence, and return the business result
                return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                Throwable originException = e.getCause();
                if (originException instanceof FrameworkException) {
                    LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage());
                }
                throw originException;
            }
        } else {
            // 执行目标方法,Execute business, and return the business result
            return targetCallback.execute();
        }
    } finally {
        try {
            //to report business action context finally if the actionContext.getUpdated() is true
            BusinessActionContextUtil.reportContext(actionContext);
        } finally {
            if (previousActionContext != null) {
                // recovery the previous action context
                BusinessActionContextUtil.setContext(previousActionContext);
            } else {
                // clear the action context
                BusinessActionContextUtil.clear();
            }
        }
    }
}
  1. 尝试从方法入参中获取BusinessActionContext,如果获取不到,则创建一个BusinessActionContext
  2. 将@TwoPhaseBusinessAction注解中的name()作为TCC业务执行上下文的actionName、isDelayReport()作为延时上报标识。
  3. 根据@BusinessActionContextParameter从方法入参中提取出一些数据放到TCC业务执行上下文中,try方法开始时间、RM本地IP等信息也放入到TCC业务执行上下文。
  4. 向TC发起分支事务注册请求,获取到返回的branchId。
  5. 最后执行目标业务方法。

其中businessAction.useTCCFence()为TRUE时,标识seata对TCC幂等、悬挂、空回滚问题的处理。见下一篇文章。

分支事务注册到全局事务

解析入参中的@BusinessActionContextParameter注解、向TC注册分支事务均体现在doTccActionLogStore()方法:

protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
                                     BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    String xid = actionContext.getXid();

    //region fetch context and init action context

    // 根据@BusinessActionContextParameter从方法入参中提取出一些数据放到BusinessActionContext上下文中
    Map<String, Object> context = fetchActionRequestContext(method, arguments);
    // 记录TCC开始的时间
    context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());

    // Init business context,
    // 初始化业务上下文,将@TwoPhaseBusinessAction的参数(name()、commit()、rollback()、useTCCFence())放到上下文中
    initBusinessContext(context, method, businessAction);
    //I nit running environment context
    // 将本地ip添加到BusinessActionContext中
    initFrameworkContext(context);

    Map<String, Object> originContext = actionContext.getActionContext();
    if (CollectionUtils.isNotEmpty(originContext)) {
        //Merge context and origin context if it exists.
        //@since: above 1.4.2
        originContext.putAll(context);
        context = originContext;
    } else {
        actionContext.setActionContext(context);
    }

    //endregion

    //Init applicationData
    Map<String, Object> applicationContext = Collections.singletonMap(Constants.TCC_ACTION_CONTEXT, context);
    String applicationContextStr = JSON.toJSONString(applicationContext);
    try {
        // registry branch record
        // 向TC发起分支事务注册请求
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
                applicationContextStr, null);
        return String.valueOf(branchId);
    } catch (Throwable t) {
        String msg = String.format("TCC branch Register error, xid: %s", xid);
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}
  1. 调用fetchActionRequestContext()方法,根据@BusinessActionContextParameter从方法入参中提取出一些数据放到BusinessActionContext;
  2. 将try方法开始时间放入到BusinessActionContext;
  3. 调用initBusinessContext()方法,将@TwoPhaseBusinessAction的参数(name()、commit()、rollback()、useTCCFence())放到上下文中;
  4. 调用initFrameworkContext()方法,将本地ip添加到BusinessActionContext中;
  5. 将分支事务注册请求BranchRegisterRequest通过netty框架发送到TC,进行分支事务的注册。TC会将分支事务信息持久化。
    • 分支事务的BusinessActionContext信息会作为applicationContext的一部分持久化到branch_table的application_data字段中。

具体分支事务注册到全局事务的流程见博文:《分布式事务Seata源码解析九:分支事务如何注册到全局事务》

四、TCC分支事务提交

详细执行流程见博文: 分布式事务Seata源码解析11:全局事务执行流程之两阶段全局事务提交

分支事务的提交最终走到ResourceManager的具体实现类TCCResourceManager#branchCommit()方法:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // 根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    // 获取目标bean
    Object targetTCCBean = tccResource.getTargetBean();
    // 获取目标bean的commit()方法
    Method commitMethod = tccResource.getCommitMethod();
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        // BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
        Object ret;
        boolean result;
        // add idempotent and anti hanging
        if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
            try {
                result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                throw e.getCause();
            }
        } else {
            // 使用反射将BusinessActionContext作为入参对目标bean的commit()方法进行调用
            ret = commitMethod.invoke(targetTCCBean, args);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
        }
        LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}
  1. 先根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;进而获取到目标bean、目标bean的commit()方法;
    • BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
    • 再将xid、branchId都封装到BusinessActionContext中。
  2. 使用反射将BusinessActionContext作为入参对目标bean的commit()方法进行调用;根据commit()方法是否执行成功,来决定TC是否需要重试分支事务的提交。

五、TCC分支事务回滚

详细执行流程见博文:分布式事务Seata源码解析12:全局事务执行流程之全局事务回滚

和分支事务的提交类似,分支事务的提交最终走到ResourceManager的具体实现类TCCResourceManager#branchRollback()方法:

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    // 根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    // 获取目标bean
    Object targetTCCBean = tccResource.getTargetBean();
    // 获取目标bean的collback()方法
    Method rollbackMethod = tccResource.getRollbackMethod();
    if (targetTCCBean == null || rollbackMethod == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        // BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
        Object ret;
        boolean result;
        // add idempotent and anti hanging
        if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
            try {
                result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
                        args, tccResource.getActionName());
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                throw e.getCause();
            }
        } else {
            // 使用反射将BusinessActionContext作为入参对目标bean的commit()方法进行调用
            ret = rollbackMethod.invoke(targetTCCBean, args);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
        }
        LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
        return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}

这里和分支事务提交的逻辑是一样的!

  1. 先根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;进而获取到目标bean、目标bean的rollback()方法;
    • BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
    • 再将xid、branchId都封装到BusinessActionContext中。
  2. 使用反射将BusinessActionContext作为入参对目标bean的rollback()方法进行调用;根据rollback()方法是否执行成功,来决定TC是否需要重试分支事务的提交。

六、总结

1> seata-client(TM/RM)启动时:

  • 对标注了@LocalTCC注解接口的实现类多动态代理、并将@TwoPhaseBusinessAction注解的内容封装为TCCResource,保存在各个RM的TCCResourceManager中的Map缓存tccResourceCache中(key为resourceId(即:@TwoPhaseBusinessAction的name()));
  • 将resourceId注册到TC中,TC将其保存在内存中。

2> 执行分支事务:

调用标注了@LocalTCC注解的接口的实现类的方法时,会进入到TccActionInterceptor#invoke()方法:

  1. 如果当前事务不处于全局事务中 或 禁用了全局事务 或 当前事务模式为SAGA,则直接执行目标方法;
  2. 如果要执行的方法,没有@TwoPhaseBusinessAction注解标注,则直接执行目标方法;
  3. 否者,将TCC事务模式绑定到RootContext,将全局事务xid、方法的入参、@TwoPhaseBusinessAction注解内容、目标业务方法的执行传递到ActionInterceptorHandler#proceed()方法中进行TCC切面的处理、业务的执行。
    • 尝试从方法入参中获取BusinessActionContext,如果获取不到,则创建一个BusinessActionContext;
    • 将@TwoPhaseBusinessAction注解中的name()作为TCC业务执行上下文的actionName、isDelayReport()作为延时上报标识;
    • 根据@BusinessActionContextParameter从方法入参中提取出一些数据放到TCC业务执行上下文中,try方法开始时间、RM本地IP等信息也放入到TCC业务执行上下文;
    • 向TC发起分支事务注册请求,获取到返回的branchId;
      • 分支事务的BusinessActionContext信息会作为applicationContext的一部分持久化到branch_table的application_data字段中。
    • 最后执行目标业务方法;
  4. 最后清理RootContext的信息、包括TCC分支事务类型、分支事务branchId。

3> 提交分支事务:

  1. 先根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;进而获取到目标bean、目标bean的commit()方法;
    • BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
    • 再将xid、branchId都封装到BusinessActionContext中。
  2. 使用反射将BusinessActionContext作为入参对目标bean的commit()方法进行调用;根据commit()方法是否执行成功,来决定TC是否需要重试分支事务的提交。

4> 回滚分支事务:

  1. 先根据resourceId(也就是@TwoPhaseBusinessAction注解中的name())从内存层面的缓存Map中获取到对应的TCCResource;进而获取到目标bean、目标bean的rollback()方法;
    • BusinessActionContext数据会作为applicationData的一部分,由TC发送过来。需要从中提取出两阶段提交的参数(比如解析@BusinessActionContextParameter注解得到的数据);
    • 再将xid、branchId都封装到BusinessActionContext中。
  2. 使用反射将BusinessActionContext作为入参对目标bean的rollback()方法进行调用;根据rollback()方法是否执行成功,来决定TC是否需要重试分支事务的提交。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/4570.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql是怎么运行的-笔记

文章目录启动**MySQL**服务器程序 **1.3.1 UNIX**里启动服务器程序**Windows**里启动服务器程序服务器处理客户端请求常用存储引擎一些重要的字符集**MySQL**中的**utf8**和**utf8mb4**比较规则的查看MySQL有四个级别的字符集和比较规则**InnoDB**记录结构compact 行格式变长字…

【黄啊码】PHP压缩图片(简洁易懂版,不懂我下次不写)

大家好&#xff0c;我是黄啊码&#xff0c;今天我们来解决一件头疼的事情。作为技术人员&#xff0c;我们一般传图片都知道尽量传清晰和大小适中的图片&#xff0c;部署的时候当然也希望客户能按说明办事&#xff0c;但有的客户偏偏不听&#xff0c;就传大的&#xff0c;就传大…

Python标准库glob模块详解

glob是python中的内置模块&#xff0c;该模块主要是用来查找文件与目录的。glob模块是按照 Unix shell 所使用的规则找出所有匹配特定模式的路径名称。我们只需要了解该模块的匹配规则与常用函数&#xff0c;就会使文件查找&#xff0c;路径匹配变得非常快捷简单。 1.匹配规则…

【springboot整合ES】springboot整合ES

springboot整合ES 在Springboot整合ES提供了启动依赖jar。 该jar包封了一个类: RestHighLevelClient 该类可以对象ES中各个接口进行相应的操作。 1. 新建项目 创建springboot工程并导入相关的依赖 2.3.12.RELEASE。最新版spring boot2.7.5中RestHighLevelClient已过时 2. 创建…

python_爬虫

定时、定量、制定目标的数据搜集程序。 技术库 网络请求 urllibrequests&#xff08;三方&#xff09;/ urllib3selenium&#xff08;UI自动测试、动态js渲染&#xff09;appium&#xff08;手机App的爬虫或UI测试&#xff09; urllib requests 基于urllib和urllib3封装的…

什么是“孤岛效应”? ----防孤岛保护装置

防孤岛保护是对分布式光伏电站有着重要保护作用的。即当电网出现电压高、电压低、频率高、频率低故障时&#xff0c;光伏并网开关及时跳闸。当电网恢复供电并且电压和频率达到允许值时&#xff0c;并网开关要自动合闸。这样的目的是在为了国家电网不受太大影响的情况下&#xf…

理解透C语言一维数组,二维数组这一篇就够啦!

前言 &#x1f496;作者&#xff1a;龟龟不断向前 ✨简介&#xff1a;宁愿做一只不停跑的慢乌龟&#xff0c;也不想当一只三分钟热度的兔子。 &#x1f47b;专栏&#xff1a;C初阶知识点 &#x1f47b;工具分享&#xff1a; 刷题&#xff1a; 牛客网 leetcode笔记软件&#xff…

网站部署:使用Nginx部署vue项目到阿里云服务器

最近租了个阿里云的服务器&#xff0c;想使用Nginx把刚做好的网站部署上去 下载Nginx 目前yum已经有了Nginx的源&#xff0c;因此可以直接用yum下载和安装 yum -y install nginx默认的安装位置为/etc/nginx 默认的项目位置为/usr/share/nginx 如果安装失败检查是否安装了zli…

渗透测试-CTF文件类型操作

识别文件类型 文件分离 文件合并 识别文件类型 当文件没有后缀名或者有后缀名无法正常打开时&#xff0c;根据识别的文件类型来修改后缀名即可正常打开文件。 使用场景&#xff1a;不知道后缀名&#xff0c;无法打开文件。 第一种方式&#xff1a;kali中使用 file 文件名 f…

173:vue+openlayers:解决国内openstreetmap地图加载不出来的问题(代码示例)

第173个 点击查看专栏目录 近来写程序,发现openlayers用OSM方式来加载OpenStreetMap地图,一片爆红,瓦片加载不出来。 本示例的目的是介绍演示如何在vue+openlayers中解决OpenStreetMap地图在国内被DNS污染,加载不出来瓦片的问题,通常我们是直接引用OSM,这里采用的是XYZ方…

2022年11月第十四届蓝桥杯校模拟赛详解+代码(一)

“须知少时凌云志&#xff0c;自许人间第一流” 鄙人11月八号有幸参加学校蓝桥杯校选拔赛&#xff0c;题型为5道填空题&#xff0c;5道编程题&#xff0c;总时间为小时。奈何能力有限&#xff0c;只完成了5道填空和3道编程大题&#xff0c;现进行自省自纠&#xff0c;分享学习&…

国产ETL工具 BeeDI 产品“实时同步“之 高阶 功能组件

BeeDI 提供“ 实时”企业数据集成。实时组件通过实时处理和传输业务数据的能力&#xff0c;增强了BeeDI的批处理功能。为满足当下复杂的业务需求&#xff0c;IT部门需要实时集成以加快核心业务流程和信息流。使用实时功能&#xff0c;IT部门可以使用统一界面&#xff0c;从BeeD…

【Mybatis】mybatis使用与理解

1. mybatis基础环境搭建 若想使用mybatis&#xff0c;需要有如下两个jar包&#xff1a;①mybatis的核心jar包。②数据库驱动包。 &#xff08;想使用别人提供的服务就必须要有别人的jar包&#xff1b;mybatis是和数据库打交道的&#xff0c;那么你的程序中&#xff0c;数据库的…

【MySQL高级】MySQL的锁机制

目录 概述 MyISAM 表锁 InnoDB行锁 概述 锁是计算机协调多个进程或线程并发访问某一资源的机制&#xff08;避免争抢&#xff09;。 在数据库中&#xff0c;除传统的 计算资源&#xff08;如 CPU、RAM、I/O 等&#xff09;的争用以外&#xff0c;数据也是一种供许多用户共…

重装系统后要安装哪些驱动

​重装win10后需要安装驱动吗?win10系统比win7系统高级的地方在于系统内置了很多驱动&#xff0c;这可以省去用户很多安装时间&#xff0c;下面小编来告诉大家重装系统后要安装的驱动有哪些。 工具/原料&#xff1a; 系统版本&#xff1a;windows10系统 品牌型号&#xff1…

mysql8其它新特性

文章目录MySQL8.0新特性新特性1&#xff1a;窗口函数序号函数ROW_NUMBER()函数RANK()函数DENSE_RANK()函数分布函数PERCENT_RANK()CUME_DIST()函数前后函数LAG(expr,n)函数LEAD(expr,n)函数首尾函数FIRST_VALUE(expr)函数LAST_VALUE(expr)函数其它函数NTH_VALUE(expr,n)函数NTI…

Docker 常用命令大全

个人理解 docker中的镜像 就像是咱们java 中的Class &#xff0c;而容器呢 是基于这个镜像构建出的实例 类似于咱java 中 根据Class构造出的一个个实例对象 &#xff0c;本人是初学者 理解有误还请见谅&#xff0c;并麻烦您说说您的看法让彼此相互学习… 按我理解 简言之 doc…

Java-日期类,正则实验

1. 随机产生两个日期时间&#xff0c;输入按时间先后顺序输出 public class RandomDate {SuppressWarnings("deprecation")public static void main(String[] args) throws ParseException {SimpleDateFormat sdf new SimpleDateFormat("yyyy-MM-dd HH:mm:ss&…

二分查找 【模板+中间值问题】

全文目录&#x1f603;前言&#x1f615;二分查找动图演示&#x1f634;代码模板❗️ 使用哪个模板问题 ❗️&#x1f4a2; mid为何1问题 &#x1f4a2;&#x1f603;前言 二分查找也称折半查找&#xff08;Binary Search&#xff09;&#xff0c;它是一种效率较高的查找方法。…

[解决]github上传大文件卡住

0x00 需求 github目前的策略是超过50M的文件不允许上传&#xff0c;推荐使用lfs。 0x01 操作 再把之前提交的commit 回滚&#xff1a; git reset --hard commitId 在配置lfs&#xff1a; git lfs install git lfs track "*.zip" git lfs track "*.jar" git…