脑图
核心
Seata三大角色
- TC :事务协调者,netty server(服务器)
- TM :事务管理器,netty client(客户端)
- RM: 资源管理器,netty client(客户端)
@GlobalTransactional(name = "fsp-create-order" rollbackFor = Exception.class)
public void create(Order order) {
orderMapper.create(order);
//feign调用
accountService.decrease(order.getuserId(),order.getMoney());
}
- 只要方法上加了@GlobalTransactional,Seata通过aop检测到之后,就会使用TM和TC通信,注册全局事务。
- 在@GlobalTransactional涵括的代码中,不管是本服务中的sql操作,还是feign调用别的服务的sql操作,只要sql操作满足如下:insert操作,delete操作,update操作,select for update操作。 就会被seata增强,使用RM与TC通信,注册分支事务。
@GlobalTransactional 源码解析
GlobalTransactionScanner继承自AbstractAutoProxyCreator,在这里拦截到加了@GlobalTransactional的方法。
GlobalTransactionScanner
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAwareDisposableBean {}
我们需要关注的方法如下:
- AbstractAutoProxyCreator:wrapIfNecessary(aop的核心),getAdvicesAndAdvisorsForBean(拦截器)
- InitializingBean:afterPropertiesSet(初始化TM,RM)
Spring生命周期回调
@Override
public void afterPropertiesSet() {
//是否禁止了全局事务
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
//初始化netty 客户端(TM RM)
initClient();
}
初始化TM,RM
private void initClient() {
//init TM
TMClient.init(applicationId, txServiceGroup);
//init RM
RMClient.init(applicationId, txServiceGroup);
}
代码最终会调用到RpcClientBootstrap -> start
bootstrap.handler(
ChannetPipeline pipeline = ch.pipeline();
pipeline.addLast(
new IdleStateHandler(
nettyclientConfig.getChannelMaxReadIdleSeconds(), //15s
nettyclientConfig.getChannelMaxWriteIdleSeconds(),//15s
nettyctientconfig.getchannelMaxATTIdTeSeconds()))
.addLast(new ProtocolV1Decoder()) //Seata白定义编解码器,这里可以看Seata的协议RpcMessage
.addLast(new ProtocoTV1Encoder());
if (null != channelHandlers){
addChannetPipelineLast(ch,channelHandlers);
}
});
wrapIfNecessary
//判断此类中所有方法是否存在@GLobalTransactional或者@GTobalLock注解
if(!existsAnnotation(new class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean; //有GlobalTransactional注解的方法才会被下面的interceptor拦截
}
//如果存在@GTobalTransactional或者GLobalLock注解
if (interceptor == null) {
//实例化一个新的全局事务拦截器。
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
//根据配置文件配置的配置中心 实例不同的ConfiqurationFactory
ConfigurationFactory.getInstance().addConfiglistener(ConfigurationkKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener) interceptor);
}
}
GlobalTransactionalInterceptor -> invoke
@override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetclass = methodInvocation.getThis() != nULL ? Aoputils.getTargetClass(methodInvocation.getthis()) : null;
Method specificMethod = Classutils.getMostSpecificMethod(methodInvocation.getMethod(), targetclass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//拿到@GLobalTransactional注解的元数据
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
//拿到@GLobalLock注解的元数据
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (!disable && globalTransactionalAnnotation != null) {
//处理@GLobalTransactional
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); //我们只关注这里
}else if(!disable && oloballockAnnotation != null){
//处理@GLobalLock
return handleGlobalLock(methodInvocation);
}else {
//直接调用原始方法
return methodInvocation.proceed();
}
}
GlobalTransactionalInterceptor#handleGlobalTransaction -> TransactionalTemplate#execute
//开始事务,tm获得xid,tc负责向global_table插入数据
beginTransaction(txInfo,tx); //开启全局事务,TM与TC通信,插入数据到global table表
Object rs = null;
try {
//[/做你的事情,回调加了@GLobalTransaction注解的那个方法
rs = business.execute() ; //执行加了@Globaltransactional的那个方法
} catch (Throwable ex) {
//上面方法抛异常了回滚全局事务
//.rollback全局事务
completeTransactionAfterThrowing(txInfo, tx , ex);
//不执行下面的throw,上面这行就会抛出回滚失败异常
//到这里说明回滚成功,抛出业务执行的异常
throw ex; //抛异常,回滚全局事务,TM与TC通信,TC收到后删除global table数据去branch tabel找到所有分支事务并通知的分支事务, 即给所有RM发信息RM收到信息后找到对应的undolog反向补偿.(正常回滚后应该是TC端删除global table,global lock,branch table RM删除undolog,反向补偿成功)
}
//一切0K提交
commitTransaction(tx); //提交全局事务,TM与TC通信,TC收到后删除global table, 删除branch table,删除global lock, 通知所有RM, RM收到后只需要将undo log删除即可
return rs;
全局事务的处理是通过spring aop来增强的,下面我们来看看分支事务是如何处理的。
分支事务
代理数据源配置
@Bean
public DataSource dataSource(){
DruidDataSource dataSource = new DruidDataSource();
dataSource.setDriverclassName("com.mysql.cj.jdbc.Driver");
dataSource.setPassword("poot");
dataSource.setUsername("root");
datasource.setUrl("jdbc:mysgl://127.0.0.1:3506/account?usenicode=true&characterEncoding=utf-8&serverTinezone=UTc"):
dataSource.setMaxActive(2000)dataSource.setMaxWait(20000);
return dataSource;
}
@Bean
public DataSourceProxy datasourceProxy(DataSource dataSource) {
return new DatasourceProxy(datasource);
}
@Bean
public SqlSessionFactory salSessionFactoryBean(DataSourceProxy datasourceProxy) throws Exception (
SqlSessionFactoryBean sqlSessionFactoryBean = new SglSessionFactoryBean();
sqlsessionFactoryBean.setDataSource(dataSourceProxy);
sqlSesionFactoryBean.setlapperlocations(new PathMatchingResourcePatternResolver().getResources("classpath:maper/*.xml");
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlsessionFactoryBean.getobject();
}
DataSourceProxy -> getConnection
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
ConnectionProxy -> doCommit
private void doCommit() throws SQLException {
//处理@GlobalTransaction的分支事务
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
//处理@GlobalLock,即检查一下是否可以获取全局锁
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
ConnectionProxy -> processGlobalTransactionCommit
正常情况注册分支事务还会往lock_tabel插入一条记录,代表某个表的某行记录被seata用全局锁锁住了
private void processGlobalTransactionCommit() throws SQLException{
try{
//注册分支事务
register();
}catch (TransactionException e) {
//全局锁中突异常
recognizeLockKeyConflictException(e, context.buildLockKeys()); //注册分支事务,RM与TC通信,如果可以获取全局锁,那么TC往branch_table插入一条记录
}
try{
//向undolog表插入数据
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); //插入undo_log,前后置镜像不是此处生成,而是DMLExecutor生成的
//本地数据和undo_log表插入的数据一起提交
targetConnection.commit(); //我们代码的sql和这个undolog一起提交事务
}catch (Throwable ex) {
L06GER.error("process connectionProxy commit error: (", ex.getMessage(), ex);
//如果本地事务提交失败告诉seata服务器
report(false);
throw new SQLException(ex); //抛异常了,RM报告seata分支事务本地事务执行失败,TC收到后会删除alobal_table,global通知所有分支事务回滚,其他RM收到通知后用undolog反向回滚,删除undolog
}
1F (IS_REPORT_SUCCESS_ENABLE)(
//向seata服务端报告本地事务提交成功
report(true); //RM报告tc本地事务执行成功
}
context.reset();
}
快问快答
全局锁怎么实现的?
全局锁使用数据库表实现,lock_table。
为什么需要全局锁?
全局锁用于读写隔离,如果有多个分布式事务同时操作同一行数据库记录,那么可以保证数据的正确性。
lock_table什么时候插入记录,什么时候删除?
注册分支事务的时候会插入lock_table记录(正常情况),全局事务提交的时候会删除lock_table。
读写隔离?
- 写隔离,如果要用分布式事务,那么对于同一张表更新时建议全使用@GlobalTransaction
- 读隔离,使用@GlobalTransactional+select for update 或者 @GlobalLock+@Transactional+select for update