Seata流程源码梳理上篇-TM、RM处理

news2024/12/24 21:22:52
这一篇我们主要来分析下Seata的AT模式的流程处理。

一、流程案例

1、案例源码

​ 我们本地流程梳理用的是基于spring-cloud框架,注册中心是eurak,服务间调用的是feign,源码下载的是官网的(当然你如果对dubbo更熟悉,也可以使用dubbo结构的),案例地址–https://github.com/seata/seata-samples。

在这里插入图片描述

在这里插入图片描述

​ 这个我本地的话,将其他的模块都删除了。然后启动的话,注意数据库连接信息的修改以及脚本初始化。

​ 这个业务是一个用户账号(account)下订单(order),然后减库存(stock),整个业务的发起就是在business发起的。

在这里插入图片描述

在这里插入图片描述

2、服务器源码

​ 然后seata服务端,我下载的1.4.2版本的源码在本地启动。

在这里插入图片描述

​ 对于服务端的启动,用的是db模式

在这里插入图片描述

​ 注册的话,修改为eurka

在这里插入图片描述

​ 注意运行对应的数据库脚本。

二、一些前置概念的介绍

​ 可以先再看下其官网的介绍–http://seata.io/zh-cn/docs/overview/what-is-seata.html。然后我们结合源码再来说下。

1、整个案例的TM、RM、TC业务逻辑

在这里插入图片描述

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

​ 这里主要我们需要明白TM、TC的概念,TM其实就是全局事务的发起者,TC就是协调者,例如如果全局事务需要回滚或者提交,就是TM去通知TC,然后TC再获取、调用该全局事务下的各个分支,通知他们去进行提交或回滚。

​ TC其实就是我们的seata服务端,如果我们要开启全局事务,通过TM(对于我们代码来说,TM也就是TransactionManager接口)发起(发起也就是调用begin方法,获取到本次分布式事务唯一全局标识xid),也就是我们在调用businessService.purchase("U100000", "C100000", 30)方法时,就会通过拦截器拦截,进行对应分布式事务的逻辑织入,然后在里面调用TC,告诉它我们要开始分布式事务了,然后TC就是在global_table添加一条记录,就是本次分布式事务的唯一标识记录:

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint DEFAULT NULL,
  `status` tinyint NOT NULL,
  `application_id` varchar(64) DEFAULT NULL,
  `transaction_service_group` varchar(64) DEFAULT NULL,
  `transaction_name` varchar(64) DEFAULT NULL,
  `timeout` int DEFAULT NULL,
  `begin_time` bigint DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

在这里插入图片描述

​ 然后TC(也就是seata服务端)将xid返回,之后就会具体来调用到业务方法,也就是purchase(...)方法的业务逻辑:

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
    stockFeignClient.deduct(commodityCode, orderCount);

    orderFeignClient.create(userId, commodityCode, orderCount);

    if (!validData()) {
        throw new RuntimeException("账户或库存不足,执行回滚");
    }
}

​ 然后我们就会通过feign去调用stock服务以及order服务,在调用的时候,将xid加在请求头部中。

stock接受到调用后,seata就会通过拓展spring提供的HandlerInterceptor接口(具体可以去了解下这个接口)实现SeataHandlerInterceptor,从头部中获取到xid,然后绑定到当前线程中,然后去执行对应的业务。

​ 对应的业务方法就会去操作DB,而这个时候,seata同样有个织入逻辑,也就是代理spring本身的DataSource-DataSourceProxy类,当操作数据库时,通过DataSource获取Connection时,其又会返回ConnectionProxy类来代理本身的Connection,然后在操作数据库之前、之后,添加对应的seata逻辑。例如:

​ 1、记录undo_log表的数据,用于回滚时候的恢复重做

CREATE TABLE `undo_log` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `branch_id` bigint NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=147 DEFAULT CHARSET=utf8mb3;

在这里插入图片描述

​ 2、通过RM(也就是ResourceManager接口)进行分支注册(调用ResourceManager接口的branchRegister方法),也就是调用TC告诉它,然后TC就会在branch_table表添加一条记录,这里的一条记录,就是一个TM对应的数据

CREATE TABLE `branch_table` (
  `branch_id` bigint NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `lock_key` varchar(128) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

在这里插入图片描述

​ 然后当purchase(String userId, String commodityCode, int orderCount)方法调用orderstock成功完成后,这个时候就会在进入到TM也就是对应的拦截器逻辑中,如果成功执行分布式事务提交操作,失败就会捕获到异常,进行全局事务的回滚操作。例如TM调用TC告诉它完成分布式事务的提交,然后TC就会通过xid获取到当前事务下有几个分支,也就是对应RM完成提交(ResourceManagerbranchCommit方法),这个就是整个seata定义分布式事务的基本逻辑。

2、TM(TransactionManager)

public interface TransactionManager {

    /**
     * 开启一个全局事务(也就是调用),返回xid,也就是本次全局事务的标识
     */
    String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException;

    /**
     * Global commit.
     *
     * @param xid XID of the global transaction.
     * @return Status of the global transaction after committing.
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    GlobalStatus commit(String xid) throws TransactionException;

    /**
     * Global rollback.
     *
     * @param xid XID of the global transaction
     * @return Status of the global transaction after rollbacking.
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    GlobalStatus rollback(String xid) throws TransactionException;
	.........
}

3、SeataHandlerInterceptor

public class SeataHandlerInterceptor implements HandlerInterceptor {

   private static final Logger log = LoggerFactory
         .getLogger(SeataHandlerInterceptor.class);

    /**
     *	从头部中获取xid,绑定到当前线程
    **/
   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
         Object handler) {

      String xid = RootContext.getXID();
       //public static final String KEY_XID = "TX_XID";
      String rpcXid = request.getHeader(RootContext.KEY_XID);
      if (log.isDebugEnabled()) {
         log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
      }

      if (xid == null && rpcXid != null) {
         RootContext.bind(rpcXid);
         if (log.isDebugEnabled()) {
            log.debug("bind {} to RootContext", rpcXid);
         }
      }
      return true;
   }

    /**
     *	业务执行完后,删除
    **/
   @Override
   public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
         Object handler, Exception e) {

      String rpcXid = request.getHeader(RootContext.KEY_XID);

      if (StringUtils.isEmpty(rpcXid)) {
         return;
      }

      String unbindXid = RootContext.unbind();
      if (log.isDebugEnabled()) {
         log.debug("unbind {} from RootContext", unbindXid);
      }
      if (!rpcXid.equalsIgnoreCase(unbindXid)) {
         log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
         if (unbindXid != null) {
            RootContext.bind(unbindXid);
            log.warn("bind {} back to RootContext", unbindXid);
         }
      }
   }

}

4、RM(ResourceManager)

在这里插入图片描述

三、TM相关流程源码

​ 我们上面简单介绍了seata分布式事务的简单流程,下面我们就来具体分析下源码的流转。将上面的各个应用启动后,我们调用http://localhost:8084/purchase/commit,正式执行我们的业务流程。

1、拦截对应的使用了全局事务的方法

​ 我们在purchase(...)方法使用的@GlobalTransactional注解,就会有对应的拦截器来解析对应的逻辑增强,加入对应的全局事务。

在这里插入图片描述

​ 如果没有对应全局事务的标签,就不处理,直接通过methodInvocation.proceed()执行对应的方法。我们这里是有全局事务的,就通过handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation)来开始对应的逻辑处理。

2、处理全局事务

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }
			.......
            @Override
            public TransactionInfo getTransactionInfo() {
                // reset the value of timeout
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }
				// 这个是本次事务的对应信息、例如传播级别、超时时间
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
               	.........
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        	.........
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}

​ 这里的逻辑主要是给到TransactionalTemplate来处理对应的逻辑。

3、全局事务的整个流程逻辑

public class TransactionalTemplate {
		..........
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
               			............
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
              		..........
                return rs;
            } finally {
                //5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

​ 这里其实主要分为两部分:

​ 一部分是分布式事务其本身也是一个事务,所以前面的switch (propagation)里面的逻辑,就是遵循spring本身的事务传播机制。例如如果是NOT_SUPPORTED,其不需要事务,就会将原来的事务挂起,然后直接执行对应的业务,而如果是REQUIRES_NEW的话,其就需要先将原来的事务挂起,再开启一个新事务来继续处理往下的逻辑。

​ 另一部分就是具体的分布式事务的处理了。

 // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
    if (tx == null) {
        tx = GlobalTransactionContext.createNew();
    }

    // set current tx config to holder
    GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

    try {
        // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
        //    else do nothing. Of course, the hooks will still be triggered.
        beginTransaction(txInfo, tx);

        Object rs;
        try {
            // Do Your Business
            rs = business.execute();
        } catch (Throwable ex) {
            // 3. The needed business exception to rollback.
            completeTransactionAfterThrowing(txInfo, tx, ex);
            throw ex;
        }

        // 4. everything is fine, commit.
        commitTransaction(tx);

        return rs;
    } finally {
        //5. clear
        resumeGlobalLockConfig(previousConfig);
        triggerAfterCompletion();
        cleanUp();
    }
} finally {
    // If the transaction is suspended, resume it.
    if (suspendedResourcesHolder != null) {
        tx.resume(suspendedResourcesHolder);
    }
}

1、获取DefaultGlobalTransaction

​ 这里首先就是通过GlobalTransactionContext.createNew()获取DefaultGlobalTransaction

public static GlobalTransaction createNew() {
    return new DefaultGlobalTransaction();
}
DefaultGlobalTransaction() {
    this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}

/**
 * Instantiates a new Default global transaction.
 *
 * @param xid    the xid
 * @param status the status
 * @param role   the role
 */
DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
    this.transactionManager = TransactionManagerHolder.get();
    this.xid = xid;
    this.status = status;
    this.role = role;
}

​ 这个类里面可以获取到TransactionManager事务管理器,也就是TMtransactionManager是单例模式创建的。

2、开启事务(调用TC获取xid)

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin();
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);

    }
}

​ 然后我们会开启全局事务的流程,也就是首先获取xid。本次全局事务的状态设置为GlobalStatus.Begin,然后将xid绑定到当前线程上面。

public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
       	........
        return;
    }
    assertXIDNull();
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists," +
            " can't begin a new global transaction, currentXid = " + currentXid);
    }
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
}

3、执行业务(成功则提交、失败则回滚)

​ 我们获取到xid开始全局事务后,就执行我们的业务。

beginTransaction(txInfo, tx);

Object rs;
try {
    // Do Your Business
    rs = business.execute();
} catch (Throwable ex) {
    // 3. The needed business exception to rollback.
    completeTransactionAfterThrowing(txInfo, tx, ex);
    throw ex;
}

// 4. everything is fine, commit.
commitTransaction(tx);

4、执行成功TM全局事务提交

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeCommit();
        tx.commit();
        triggerAfterCommit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);
    }
}
public void commit() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        return;
    }
    assertXIDNotNull();
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
}

​ 这里我们可以看到当业务正常完成后,其会通过TM(TransactionManager)去提交事务,也就是调TC提交本次全局事务,让TC去通知各个RM本次事务已正常完成。

public GlobalStatus commit(String xid) throws TransactionException {
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}

5、执行失败TM全局事务回滚

private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
    triggerBeforeRollback();
    tx.rollback();
    triggerAfterRollback();
    // 3.1 Successfully rolled back
    throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
        ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
public void rollback() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    assertXIDNotNull();

    int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.rollback(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global rollback", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
}

​ 这个与提交一样的逻辑,只是这里调用的是TM的回滚接口。

四、RM相关流程源码

在这里插入图片描述

​ 我们再回到这张图片,以及对应的方法调用:

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
    stockFeignClient.deduct(commodityCode, orderCount);

    orderFeignClient.create(userId, commodityCode, orderCount);

    if (!validData()) {
        throw new RuntimeException("账户或库存不足,执行回滚");
    }
}

​ 我们前面梳理了,TM开启事务主要是通过拦截器在调用purchase(...)进行对应全局事务开始处理,然后在purchase(...)完成后,再通过拦截器来解析TM的提交或者回滚。然后RM的逻辑有两个,一个是在purchase(...)方法通过feign调用stockorder是,其的处理,另一个是TC来解析全局事务提交、或者回滚的调用。

1、purchase业务方法调用stockFeignClient.deduct

@FeignClient(name = "stock-service", url = "127.0.0.1:8081")
public interface StockFeignClient {

    @GetMapping("/deduct")
    void deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);

}

​ 这里是通过feign去调用到stock服务,然后seata在这里做了个处理,也是代理模式,代理本身的feignClient,然后增强,也就是将xid加到header里面:

public class SeataFeignClient implements Client {

   private final Client delegate;
   private final BeanFactory beanFactory;
   private static final int MAP_SIZE = 16;

   SeataFeignClient(BeanFactory beanFactory) {
      this.beanFactory = beanFactory;
      this.delegate = new Client.Default(null, null);
   }
	.........

   @Override
   public Response execute(Request request, Request.Options options) throws IOException {

      Request modifiedRequest = getModifyRequest(request);
      return this.delegate.execute(modifiedRequest, options);
   }

   private Request getModifyRequest(Request request) {

      String xid = RootContext.getXID();
	 .........
      Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
      headers.putAll(request.headers());

      List<String> fescarXid = new ArrayList<>();
      fescarXid.add(xid);
      headers.put(RootContext.KEY_XID, fescarXid);

      return Request.create(request.method(), request.url(), headers, request.body(),
            request.charset());
   }

}

2、stock服务取xid(SeataHandlerInterceptor)

​ 在business服务调用时添加了xid到同步中,在stock肯定会取出来,其的获取也是拓展spring的HandlerInterceptor接口。

在这里插入图片描述

public class SeataHandlerInterceptor implements HandlerInterceptor {

   private static final Logger log = LoggerFactory
         .getLogger(SeataHandlerInterceptor.class);

   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
         Object handler) {

      String xid = RootContext.getXID();
    	........
      if (xid == null && rpcXid != null) {
         RootContext.bind(rpcXid);
      }
      return true;
   }

   @Override
   public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
         Object handler, Exception e) {

      String rpcXid = request.getHeader(RootContext.KEY_XID);

      if (StringUtils.isEmpty(rpcXid)) {
         return;
      }
      String unbindXid = RootContext.unbind();
  
      if (!rpcXid.equalsIgnoreCase(unbindXid)) {
         if (unbindXid != null) {
            RootContext.bind(unbindXid);
         }
      }
   }

}

​ 这里就是在DispatcherServlet调用实际Controller业务方法其重头部中获取然后绑定到当前线程中,在业务方法完成后再unbind。在这里处理后,就会进入到controller的业务逻辑中。

3、stock的DB相关逻辑(主要织入分支注册、undo日志记录)

在这里插入图片描述

​ 然后就就会进行DB操作,同时我们可以看到,Seata的在这里也是通过代理模式处理了DataSource

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    ..........
	@Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

​ 主要的逻辑增强处理就是在ConnectionProxy中。并且seata的处理不单是这个代理了,还有PreparedStatement这些,我们在这里就不是梳理其的具体流转了,有点绕。我们自己看seata在进行DB操作前做了对应的哪些处理。

1)、判断处理是否设置自动提交

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

​ 在执行前,判断是否为自动提交,如果为自动提交,是的话调用executeAutoCommitTrue,这个方法主要是将自动提交设置为不自动提交,然后调用executeAutoCommitFalse(args),再进行提交connectionProxy.commit():

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.changeAutoCommit();
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args);
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}

2)、处理业务执行

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

​ 这个我们可以看到逻辑,在执行业务sql前获取修改前的逻辑,执行后以及获取修改后的数据,保存到undo日志

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
		.........
    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    String lockKeys = buildLockKey(lockKeyRecords);
    if (null != lockKeys) {
        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}

在这里插入图片描述

3)、DB提交(RM分支事务注册、undo数据记录)

private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    context.reset();
}

​ 在提交的时候主要有四个处理:

1、作为RM向TC注册分支事务

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
        null, context.getXid(), null, context.buildLockKeys());
    context.setBranchId(branchId);
}
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest request = new BranchRegisterRequest();
        request.setXid(xid);
        request.setLockKey(lockKeys);
        request.setResourceId(resourceId);
        request.setBranchType(branchType);
        request.setApplicationData(applicationData);

        BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
        }
        return response.getBranchId();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    } catch (RuntimeException rex) {
        throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
    }
}

​ 这个就是告诉TC直接是本次全局事务的一个分支事务。然后TC收到后就会在branch_table中添加一条记录。

2、undo表记录插入

public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
    ConnectionContext connectionContext = cp.getContext();
    if (!connectionContext.hasUndoLog()) {
        return;
    }
    String xid = connectionContext.getXid();
    long branchId = connectionContext.getBranchId();
    BranchUndoLog branchUndoLog = new BranchUndoLog();
    branchUndoLog.setXid(xid);
    branchUndoLog.setBranchId(branchId);
    branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
    UndoLogParser parser = UndoLogParserFactory.getInstance();
    byte[] undoLogContent = parser.encode(branchUndoLog);
		.........
    insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
}
protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
                                       Connection conn) throws SQLException {
    insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
}
private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
                           State state, Connection conn) throws SQLException {
    try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
        pst.setLong(1, branchId);
        pst.setString(2, xid);
        pst.setString(3, rollbackCtx);
        pst.setBytes(4, undoLogContent);
        pst.setInt(5, state.getValue());
        pst.executeUpdate();
    } catch (Exception e) {
        if (!(e instanceof SQLException)) {
            e = new SQLException(e);
        }
        throw (SQLException) e;
    }
}

3、通过Connection正式进行本地事务提交

4、作为RM向TC进行本地事务完成状态的提交

private void report(boolean commitDone) throws SQLException {
    if (context.getBranchId() == null) {
        return;
    }
    int retry = REPORT_RETRY_COUNT;
    while (retry > 0) {
        try {
            DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
            return;
        } catch (Throwable ex) {
            retry--;
            if (retry == 0) {
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}

​ 可以看到,我们如果完成的话,就报告为BranchStatus.PhaseOne_Done,也就是本地分支事务完成提交,如果失败就是BranchStatus.PhaseOne_Failed

4、本地分支事务的提交或回滚

​ 如果全局事务的各个分支正常完成,然后TM就会向TC进行全局事务的提交,然后TC就通过xid获取当前全局事务下面的各个分支,让它们进行提交,回滚从流程上来说是类似的,所以我们本次就梳理commit的流程。

1、提交

​ seata在调用是使用的nio,我们直接进入到具体的处理。

在这里插入图片描述

​ 这里是我们RM被调用提交:

在这里插入图片描述

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId);
}

​ 可以看到其在ResourceManager中是异步处理的。

public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
    Phase2Context context = new Phase2Context(xid, branchId, resourceId);
    addToCommitQueue(context);
    return BranchStatus.PhaseTwo_Committed;
}

​ 添加到异步提交队列后就直接返回BranchStatus.PhaseTwo_Committed,也就是二阶段提交完成。

​ 那关于第二阶段完成的具体处理是什么呢?我们知道具体数据库的数据其实在第一阶段以及做了修改(并做了undo备份),所以二阶段的处理,不用处理数据部分了,只要将次xid对应的undo记录删除就可以了。

private void doBranchCommit() {
  	............
    groupedContexts.forEach(this::dealWithGroupedContexts);
}
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
   	...........
    Connection conn;
    try {
        conn = dataSourceProxy.getPlainConnection();
    } catch (SQLException sqle) {
        return;
    }
    UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
	..........
    splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition));
}
private void deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
    Set<String> xids = new LinkedHashSet<>(contexts.size());
 		.............
    try {
        undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    } catch (SQLException e) {
        LOGGER.error("Failed to batch delete undo log", e);
        try {
            conn.rollback();
        } catch (SQLException rollbackEx) {
            LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
        }
    } finally {
        try {
            conn.close();
        } catch (SQLException closeEx) {
            LOGGER.error("Failed to close JDBC resource after deleting undo log", closeEx);
        }
    }
}

2、回滚

​ 关于回滚,其主要就是根据undo查询恢复数据的提交

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId);
}

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        StackTraceLogger.info(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;

}

​ 我们可以看到这个,就是恢复到前面的数据,具体里面的处理我们就不梳理了。

UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/148499.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSDN博客之星年度评选活动 - 2022

文章目录一、2022年CSDN博客之星评选活动报名二、2022年CSDN博客之星评选活动流程线上评分流程争议&#xff08;官方最后证实公布后会更新&#xff09;三、2022年CSDN博客之星评选规则四、2022年CSDN博客之星评分规则五、2022年CSDN博客之星活动奖品「博客之星」奖品「博客新星…

CInternetSession OpenURL没反应,不能捕获异常

本文迁移自本人网易博客&#xff0c;写于2013年10月22日CString sFileName;CInternetSession iSession;BOOL bRet FALSE;CStdioFile* pFileDown NULL;try{pFileDown iSession.OpenURL(szURL, 1, INTERNET_FLAG_TRANSFER_BINARY|INTERNET_FLAG_DONT_CACHE);}catch(...){CStri…

2023/1/8 Vue学习笔记-4-脚手架及相关属性配置

1 创建脚手架 &#xff08;1&#xff09;CLI就是 command line interface 的缩写。Vue CLI官网&#xff1a;Vue CLI &#xff08;2&#xff09;安装过程&#xff1a; &#xff08;PS&#xff1a; 提前安装过node.js了&#xff0c;没有安装的可以打开这个&#xff1a;Download …

什么是布隆过滤器?——超详细解析【建议收藏】

目录 1、什么是布隆过滤器&#xff1f; 2、实现原理 2.1、回顾哈希函数 2.1.1、哈希函数概念 2.1.2、散列函数的基本特性&#xff1a; 2.2、布隆过滤器数据结构 3、特点 3.1、支持删除吗&#xff1f; 3.2、优点 3.3、缺点 3.4、误判率 4、如何选择哈希函数个数和布…

3 机器学习之聚类

学习笔记自&#xff0c;慕课网 《Python3 入门人工智能》 https://coding.imooc.com/lesson/418.html#mid32716 分类问题 1. 无监督学习 机器学习的一种方法&#xff0c;没有给定事先标记过的训练示例&#xff0c;自动对输入的数据进行分类或分群 优点&#xff1a; 1&#xf…

今年十八,喜欢CTF-杂项

目录 前言 菜狗杯杂项签到 我吐了你随意 损坏的压缩包 misc4 misc5 前言 &#x1f340;作者简介&#xff1a;被吉师散养、喜欢前端、学过后端、练过CTF、玩过DOS、不喜欢java的不知名学生。 &#x1f341;个人主页&#xff1a;被吉师散养的职业混子 &#x1fad2;每日emo&am…

Rad Studio 11.2 安装 QuickBurro 7.21 中间件组件教程

背景 QuickBurro 官方网址&#xff1a;http://www.quickburro.org/ 系统环境&#xff1a;Rad Studio 11.2 安装其他的组件操作和这个一样的&#xff0c;同样可以参考 开始配置 先打开 Rad Studio 11&#xff0c;依次点击 File–Open Project… 然后找到你解压的 qbcn 目录下的…

React 环境搭建以及创建项目工程(二)

创建工程 首先创建一个工程 npx create-react-app weibo cd移动到当前创建的文件下 cd weibo 安装 React 路由 npm install react-router react-router-dom5.2.0 --save 安装 npm install 安依赖包 npm install antd --save npm install axios --save 安装less和less-…

PyCharm安装步骤

以64位的Windows10系统为例&#xff1a; 下载链接&#xff1a;Thank you for downloading PyCharm! 下载并打开安装包 在 Installation Options&#xff08;安装选项&#xff09;页面按下图所示勾选相应内容&#xff1a; 等待电脑自动安装完成 在PyCharm里编写程序 第1步&a…

【python】天平最少砝码设计

题目 有一架天平&#xff0c;砝码的种类和个数要你来设计。给定一个整数n&#xff0c;则待称重的物品的重量可能是 [1,n] 之间的整数&#xff0c;砝码可以放在左盘也可以放在右盘&#xff0c;要能称出所有 [1,n] 重量的物品&#xff0c;请问如何设计砝码的种类和个数&#xff…

Unreal UFUNCTION函数宏标记

BlueprintCallable,使C中的函数在蓝图中能被调用,新建C类CustomActor继承AActor,并分别声明public、protected、private方法:拖拽一个CustomActor到Map中,打开关卡蓝图,可以到无法在蓝图中调出C中的方法:我们为这三个方法添加BlueprintCallable标记:然后在蓝图中调用:可以发现,…

驱动程序开发:多点电容触摸屏

驱动程序开发&#xff1a;多点电容触摸屏一、编写驱动前的知识准备1、CST340触摸屏芯片寄存器2、CST340触摸屏的硬件原理图3、电容触摸屏驱动是由几种linux驱动框架组成的4、linux多点电容触摸的(Multi-touch&#xff0c;简称 MT)协议二、驱动程序的编写1、修改设备树2、驱动程…

Spring AOP【AOP的基本实现与动态代理JDK Proxy 和 CGLIB区别】

Spring AOP【AOP的基本实现与动态代理JDK Proxy 和 CGLIB区别】&#x1f34e;一. Spring AOP&#x1f352;1.1 什么是Spring AOP&#x1f352;1.2 Spring AOP的作用&#x1f352;1.3 AOP的组成&#x1f349;1.3.1 切面&#xff08;Aspect&#xff09;&#x1f349;1.3.2 连接点…

大数据NiFi(十一):NiFi入门案例一

文章目录 NiFi入门案例一 一、配置“GetFile”处理器

Elastic-Job分布式任务调度

一.什么是任务调度 **任务调度&#xff1a;**是指系统为了自动完成特点任务&#xff0c;在约定的特定时刻去执行任务的过程。有了任务调度就不需要人力去实现&#xff0c;系统可以在某个时间自动执行任务。 二&#xff0c;任务调度的实现方式&#xff1a; 1.**多线程方式实现…

【博客579】netfilter network flow 和 routing decision的网络流处理交互关系

netfilter网络流转&#xff08;network flow&#xff09;与路由决策&#xff08;routing decision&#xff09;的网络流处理交互关系 1、场景&#xff1a; 我们可以通过iptables来基于netfilter机制下发我们的hook处理函数&#xff0c;那么我们平时iptables的四表五链与报文的…

JDBC简介

大家好&#xff0c;今天给大家分享jdbc 首先我们要知道什么是jdbc JDBC(Java DataBase Connectivity) &#xff1a;Java数据库连接技术&#xff1a;具体讲就是通过Java连接广泛的数据库&#xff0c;并对表中数据执行增、删、改、查等操作的技术 看它的架构图 或者看这个图…

flowable工作流架构分析

flowable工作流目录概述需求&#xff1a;设计思路实现思路分析1.复杂的状态的或者状态的维度增加的状的条件极为复杂2.工作流3.BPMN2.0协议4.协议的元素5.互斥网关包容性网关&#xff08;Inclusive Gateway&#xff09;参考资料和推荐阅读Survive by day and develop by night.…

车载以太网 - DoIP诊断消息处理逻辑 - 05

前面我们已经介绍了DoIP信息头部处理逻辑和路由激活处理,那根据DoIP报文的顺序,在路由激活处理完成后,接下来我们就需要发送真正的DoIP诊断信息了,那今天我们就来介绍下DoIP诊断消息的处理逻辑。 诊断消息处理逻辑 DoIP诊断报文结构: 上面表格对于DoIP诊断报文的…

Android 11 SystemUI(状态/导航栏)-图标按键的深浅色

概述 自 Android 5.0 版本&#xff0c;Android 带来了沉浸式系统 bar&#xff08;状态栏和导航栏&#xff09;&#xff0c;Android 的视觉效果进一步提高&#xff0c;各大 app 厂商也在大多数场景上使用沉浸式效果。6.0开始提供了View.SYSTEM_UI_FLAG_LIGHT_STATUS_BAR标志位&a…