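What is AT mode
AT mode is a non-intrusive distributed transaction solution that guarantees eventual consistency, and it is Seata's default mode. In AT mode, users only need to care about their own "business SQL": the business SQL runs as phase one, and the Seata framework automatically generates the phase-two commit and rollback operations.
How AT mode works
AT mode is essentially an evolution of two-phase commit (2PC).
For background on two-phase commit, see https://cloud.tencent.com/developer/article/2363585?areaId=106001
The main flow of AT mode is:
Phase one: the business data and the rollback log (undo log) are committed in the same local transaction, after which the local lock and connection resources are released.
Phase two:
Commit is asynchronous and completes very quickly.
Rollback is performed as reverse compensation, using the rollback log written in phase one.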
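The two phases can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions: the class AtBranchSketch, its UndoRecord type, and the map standing in for a database table are all hypothetical, and real Seata stores before and after images in an undo_log table rather than in memory.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of one AT-mode branch; every name here is hypothetical, none of it is Seata code. */
class AtBranchSketch {
    /** One undo-log entry: the row key and its value before the update (null means the row did not exist). */
    static final class UndoRecord {
        final String key;
        final Integer beforeImage;

        UndoRecord(String key, Integer beforeImage) {
            this.key = key;
            this.beforeImage = beforeImage;
        }
    }

    private final Map<String, Integer> table = new HashMap<>();
    private final Deque<UndoRecord> undoLog = new ArrayDeque<>();

    /** Phase one: record the before image and apply the business update as one step. */
    void update(String key, int newValue) {
        undoLog.push(new UndoRecord(key, table.get(key)));
        table.put(key, newValue);
    }

    /** Phase two, commit: the data is already in place, so only the undo log is discarded. */
    void phaseTwoCommit() {
        undoLog.clear();
    }

    /** Phase two, rollback: replay the undo log newest-first and restore each before image. */
    void phaseTwoRollback() {
        while (!undoLog.isEmpty()) {
            UndoRecord entry = undoLog.pop();
            if (entry.beforeImage == null) {
                table.remove(entry.key);
            } else {
                table.put(entry.key, entry.beforeImage);
            }
        }
    }

    Integer get(String key) {
        return table.get(key);
    }
}
```

The sketch shows why the phase-two commit is cheap (it only drops log entries) while the rollback has to do the compensating writes.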
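Creating the GlobalTransactional proxy at startup
When a Spring Boot project starts, auto-configuration registers GlobalTransactionScanner as a bean. Its wrapIfNecessary method then performs the enhancement: it creates a proxy for each bean that carries the @GlobalTransactional annotation and builds the shared globalTransactionalInterceptor.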
GlobalTransactionScanner
// Called for each scanned bean, together with its beanName.
// Checks whether the class, or any of its methods, is annotated with @GlobalTransactional, and creates a dynamic proxy for the class if so.
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// doCheckers runs a few preliminary checks; not worth spending much time on
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
// check TCC proxy: dynamic proxy for TCC beans
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// init tcc fence clean task if enable useTccFence
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener) interceptor);
} else {
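// First resolve the target class and the interfaces of the bean
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// existsAnnotation() reports whether the class or any of its methods is annotated with @GlobalTransactional, which decides whether the class needs a dynamic proxy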
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (globalTransactionalInterceptor == null) {
// Build the single, shared global transaction interceptor
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener) globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
// If the bean is not already an AOP proxy
if (!AopUtils.isAopProxy(bean)) {
// Let Spring AOP's auto-proxy creator build the global transaction dynamic proxy for this class
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
// i.e. locate the position where the Seata advisor should be inserted
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
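The core idea of wrapIfNecessary (proxy a bean only when an annotation is found, otherwise return it untouched) can be sketched with a plain JDK dynamic proxy. Everything below is an assumption made for illustration: the @DemoGlobalTx annotation, the OrderService and PlainService interfaces, and the helper names are hypothetical, and the sketch ignores Spring AOP, TCC beans, and advisor ordering.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxySketch {
    /** Hypothetical stand-in for @GlobalTransactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoGlobalTx {}

    interface OrderService {
        @DemoGlobalTx
        String createOrder(String item);
    }

    interface PlainService {
        String ping();
    }

    /** Names of the annotated methods that passed through the interceptor. */
    static final List<String> intercepted = new ArrayList<>();

    /** Returns true if any public method of the type carries the demo annotation. */
    static boolean existsAnnotation(Class<?> type) {
        for (Method m : type.getMethods()) {
            if (m.isAnnotationPresent(DemoGlobalTx.class)) {
                return true;
            }
        }
        return false;
    }

    /** Proxies the bean only when the annotation is present; otherwise returns it unchanged. */
    @SuppressWarnings("unchecked")
    static <T> T wrapIfNecessary(T bean, Class<T> iface) {
        if (!existsAnnotation(iface)) {
            return bean;
        }
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
            (proxy, method, args) -> {
                if (method.isAnnotationPresent(DemoGlobalTx.class)) {
                    // A real interceptor would begin the global transaction here.
                    intercepted.add(method.getName());
                }
                try {
                    return method.invoke(bean, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause(); // surface the business exception, not the reflection wrapper
                }
            });
    }
}
```

A bean whose interface has no annotated method comes back as the same object, which mirrors the early `return bean` in the method above.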
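Executing the global transaction interceptor
Within a global transaction, the role that starts the global transaction is the TM (Transaction Manager).
GlobalTransactionalInterceptor implements the MethodInterceptor interface, so every method call on a bean that has the GlobalTransactionalInterceptor attached enters its invoke() method, which overrides MethodInterceptor.invoke():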
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
// Resolve the target class
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
// Resolve the target method being invoked
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
// Skip methods declared on Object
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
// Read the @GlobalTransactional annotation, if present
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
// Read the @GlobalLock annotation, if present (this does not acquire a lock yet)
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null) {
// Handle the global transaction
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
}
}
}
// Otherwise invoke the original method directly
return methodInvocation.proceed();
}
GlobalTransactionalInterceptor
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
boolean succeed = true;
try {
// Delegate to TransactionalTemplate, which calls back into the overridden methods below
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
// Transaction name: the name defined on @GlobalTransactional, or the formatted method signature as a fallback
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
// Populate the TransactionInfo from the annotation attributes
@Override
public TransactionInfo getTransactionInfo() {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
Handling transaction propagation
TransactionalTemplate
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.2 Handle the Transaction propagation and the branchType
// Propagation behavior; the default is REQUIRED
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
// Run without a transaction; suspend the current one if it exists
case NOT_SUPPORTED:
suspendedResourcesHolder = tx.suspend(true);
return business.execute();
// Always start a new transaction; suspend the current one if it exists
case REQUIRES_NEW:
suspendedResourcesHolder = tx.suspend(true);
break;
// Join the current transaction; run without a transaction if there is none
case SUPPORTS:
if (!existingTransaction()) {
return business.execute();
}
break;
// Join the current transaction; start a new one if there is none
case REQUIRED:
break;
// Run without a transaction; throw if a transaction currently exists
case NEVER:
if (existingTransaction()) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
,RootContext.getXID()));
} else {
return business.execute();
}
// Join the current transaction; throw if there is none
case MANDATORY:
if (!existingTransaction()) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
......
} finally {
tx.resume(suspendedResourcesHolder);
}
}
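The switch above boils down to a decision table over two inputs: the propagation setting and whether a global transaction already exists. A minimal sketch of that table follows; the Action enum and the decide method are hypothetical names introduced only for illustration, and "join" here means that the later begin() call becomes a no-op for a participant.

```java
class PropagationSketch {
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    /** Hypothetical summary of what the template ends up doing. */
    enum Action {
        JOIN_EXISTING,
        BEGIN_NEW,
        SUSPEND_THEN_BEGIN_NEW,
        RUN_WITHOUT_TX,
        SUSPEND_THEN_RUN_WITHOUT_TX
    }

    /** Maps (propagation, existing transaction?) to an action, or throws for illegal combinations. */
    static Action decide(Propagation propagation, boolean existing) {
        switch (propagation) {
            case NOT_SUPPORTED:
                return existing ? Action.SUSPEND_THEN_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return existing ? Action.SUSPEND_THEN_BEGIN_NEW : Action.BEGIN_NEW;
            case SUPPORTS:
                return existing ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case REQUIRED:
                return existing ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case NEVER:
                if (existing) {
                    throw new IllegalStateException("existing transaction found for propagation 'never'");
                }
                return Action.RUN_WITHOUT_TX;
            case MANDATORY:
                if (!existing) {
                    throw new IllegalStateException("no existing transaction found for propagation 'mandatory'");
                }
                return Action.JOIN_EXISTING;
            default:
                throw new IllegalArgumentException("Not supported propagation: " + propagation);
        }
    }
}
```

For example, decide(Propagation.REQUIRED, false) yields BEGIN_NEW, while decide(Propagation.MANDATORY, false) throws.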
Starting the global transaction
......
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business: run the business code
rs = business.execute();
} catch (Throwable ex) {
// 3. the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
......
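The shape of this fragment is a classic template method: begin, run the business callback, roll back on failure, commit on success, always clean up. The sketch below reproduces only that control flow. TxTemplateSketch and its steps list are hypothetical, and unlike completeTransactionAfterThrowing, which takes txInfo and can therefore consult the rollback rules, this sketch rolls back on every exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TxTemplateSketch {
    /** Records the order of the lifecycle steps so the flow can be inspected. */
    final List<String> steps = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        try {
            steps.add("begin");
            T result;
            try {
                result = business.get();      // run the business code
            } catch (RuntimeException ex) {
                steps.add("rollback");        // simplified: always roll back on failure
                throw ex;
            }
            steps.add("commit");              // everything went fine
            return result;
        } finally {
            steps.add("cleanup");             // runs on both the success and the failure path
        }
    }
}
```

A successful call records begin, commit, cleanup; a failing one records begin, rollback, cleanup and rethrows the original exception to the caller.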
Getting the global transaction
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
if (tx == null) {
return createNew();
}
return tx;
}
Beginning the global transaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// Pre-begin hook extension point
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
// Post-begin hook extension point
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
DefaultGlobalTransaction
@Override
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
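// Begin the transaction; the xid is created by the server (TC) and returned
xid = transactionManager.begin(null, null, name, timeout);
// Set the status to Begin
status = GlobalStatus.Begin;
// Bind the xid to the current context; called services receive it through an RPC interceptor (the Feign one, for example) and use it on their side
RootContext.bind(xid);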
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
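Two things happen in begin(): only the launcher actually asks the coordinator for a new XID, and the XID is then bound to the calling thread. A minimal sketch of that behavior follows, assuming a plain ThreadLocal as a stand-in for RootContext and a Supplier as a stand-in for the remote call; XidContextSketch and all of its members are hypothetical.

```java
import java.util.function.Supplier;

class XidContextSketch {
    enum Role { LAUNCHER, PARTICIPANT }

    /** Hypothetical stand-in for RootContext: one XID per thread. */
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) {
        XID.set(xid);
    }

    static String getXid() {
        return XID.get();
    }

    static void unbind() {
        XID.remove();
    }

    /** A launcher obtains a fresh XID from the coordinator; a participant reuses the bound one. */
    static String begin(Role role, Supplier<String> coordinator) {
        if (role != Role.LAUNCHER) {
            if (getXid() == null) {
                throw new IllegalStateException("participant has no XID bound");
            }
            return getXid(); // begin is ignored: the caller is only involved in the transaction
        }
        if (getXid() != null) {
            throw new IllegalStateException("an XID is already bound to this thread");
        }
        String xid = coordinator.get(); // stands in for the remote call to the TC
        bind(xid);
        return xid;
    }
}
```

Because the binding is per thread, propagating the XID to another service requires carrying it explicitly in the request, which is the job of the RPC interceptors.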
DefaultTransactionManager
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// Build the request
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
// Send the request to the TC over Netty
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
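How the TC handles the TM's begin-global-transaction request
How Netty receives and dispatches the request is not covered here; if you are interested, you can step through it in a debugger yourself.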
AbstractTCInboundHandler
@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
GlobalBeginResponse response = new GlobalBeginResponse();
exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
@Override
public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
try {
doGlobalBegin(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore,
String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
e);
}
}
}, request, response);
return response;
}
DefaultCoordinator
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
// Begin the global transaction and set its id (xid) on the response
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
DefaultCore
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// Create a global session
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
// Put the xid into the logging MDC
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// Begin the session
session.begin();
// transaction start event
MetricsPublisher.postSessionDoingEvent(session, false);
return session.getXid();
}
GlobalSession
@Override
public void begin() throws TransactionException {
// Update the status
this.status = GlobalStatus.Begin;
// Record the begin time
this.beginTime = System.currentTimeMillis();
this.active = true;
// Notify every session lifecycle listener
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onBegin(this);
}
}
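The session does not persist itself; it notifies its lifecycle listeners, and the session manager registered as a listener does the storing. This is the observer pattern, sketched below under simplifying assumptions: the class and interface names are hypothetical, the status is a plain string, and a map replaces the real store.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SessionLifecycleSketch {
    /** Hypothetical, reduced version of a session lifecycle listener. */
    interface LifecycleListener {
        void onBegin(Session session);
    }

    static class Session {
        final String xid;
        String status = "UnKnown";
        long beginTime;
        boolean active;
        private final List<LifecycleListener> listeners = new ArrayList<>();

        Session(String xid) {
            this.xid = xid;
        }

        void addListener(LifecycleListener listener) {
            listeners.add(listener);
        }

        /** Updates the session state, then notifies every registered listener. */
        void begin() {
            status = "Begin";
            beginTime = System.currentTimeMillis();
            active = true;
            for (LifecycleListener listener : listeners) {
                listener.onBegin(this);
            }
        }
    }

    /** Plays the role of the session manager: it stores the session when notified. */
    static class InMemorySessionManager implements LifecycleListener {
        final Map<String, Session> stored = new LinkedHashMap<>();

        @Override
        public void onBegin(Session session) {
            stored.put(session.xid, session);
        }
    }
}
```

Keeping storage behind a listener is what lets the same session code work with different store implementations.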
AbstractSessionManager
@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {
addGlobalSession(globalSession);
}
AbstractSessionManager
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
}
writeSession(LogOperation.GLOBAL_ADD, session);
}
Writing the global transaction to the transaction table (global_table)
AbstractSessionManager
private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to store global session");
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to update global session");
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to remove global session");
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to store branch session");
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to update branch session");
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to remove branch session");
} else {
throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
"Unknown LogOperation:" + logOperation.name());
}
}
}
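There are three kinds of transactionStoreManager: file-based, database-based, and Redis-based. They are loaded at startup, when the Server's start method calls SessionHolder's init method. The mode is set through store.mode in the configuration file. I configured db here, so execution goes into the DataBaseTransactionStoreManager logic below.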
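Choosing the store implementation from a configuration value is a small strategy selection. The sketch below only illustrates that idea: StoreModeSketch, its StoreManager interface, and the forMode factory are hypothetical and do not reflect how SessionHolder actually loads its managers.

```java
import java.util.Locale;

class StoreModeSketch {
    /** Hypothetical, reduced store abstraction. */
    interface StoreManager {
        String describe();
    }

    /** Picks a store implementation from the configured store.mode value. */
    static StoreManager forMode(String mode) {
        switch (mode.toLowerCase(Locale.ROOT)) {
            case "file":
                return () -> "file-based store";
            case "db":
                return () -> "database store";
            case "redis":
                return () -> "redis store";
            default:
                throw new IllegalArgumentException("Unknown store.mode: " + mode);
        }
    }
}
```

With store.mode set to db, forMode("db") would return the database-backed variant, which corresponds to the path followed in the rest of this walkthrough.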
DataBaseTransactionStoreManager
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {
throw new StoreException("Unknown LogOperation:" + logOperation.name());
}
}
LogStoreDataBaseDAO
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
// SQL for inserting a row into global_table
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
int index = 1;
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(index++, globalTransactionDO.getXid());
ps.setLong(index++, globalTransactionDO.getTransactionId());
ps.setInt(index++, globalTransactionDO.getStatus());
ps.setString(index++, globalTransactionDO.getApplicationId());
ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ?
transactionName.substring(0, transactionNameColumnSize) :
transactionName;
ps.setString(index++, transactionName);
ps.setInt(index++, globalTransactionDO.getTimeout());
ps.setLong(index++, globalTransactionDO.getBeginTime());
ps.setString(index++, globalTransactionDO.getApplicationData());
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}
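This completes the TM's part of beginning a global transaction.
Summary
1. At startup, Spring Boot auto-configuration registers GlobalTransactionScanner in the Spring container; it enhances the matching beans and injects a global transaction interceptor.
2. When a method annotated with @GlobalTransactional is executed, the interceptor intercepts the call and begins a global transaction.
3. Before the global transaction begins, the call is handled according to its transaction propagation behavior.
4. The TM then sends a begin-global-transaction request to the TC over Netty.
5. On receiving the request, the TC handles it according to the configured store mode. I use db with MySQL here, so it sets the transaction status to Begin and inserts a global transaction row into global_table in MySQL.
6. The TC returns the XID to the TM as the ID of the global transaction.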