Seata - @GlobalTransactional源码解析

news2024/11/23 18:31:42

脑图

在这里插入图片描述

核心

Seata三大角色

  • TC :事务协调者,netty server(服务器)
  • TM :事务管理器,netty client(客户端)
  • RM: 资源管理器,netty client(客户端)
@GlobalTransactional(name = "fsp-create-order" rollbackFor = Exception.class)
public void create(Order order) {
	orderMapper.create(order);
	//feign调用
	accountService.decrease(order.getuserId(),order.getMoney());
}
  • 只要方法上加了@GlobalTransactional,Seata通过aop检测到之后,就会使用TM和TC通信,注册全局事务。
  • 在@GlobalTransactional涵括的代码中,不管是本服务中的sql操作,还是feign调用别的服务的sql操作,只要sql操作满足如下:insert操作,delete操作,update操作,select for update操作。 就会被seata增强,使用RM与TC通信,注册分支事务。

@GlobalTransactional 源码解析

在这里插入图片描述
GlobalTransactionScanner继承自AbstractAutoProxyCreator,在这里拦截到加了@GlobalTransactional的方法。

GlobalTransactionScanner

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
			implements InitializingBean, ApplicationContextAwareDisposableBean {}

我们需要关注的方法如下:

  • AbstractAutoProxyCreator:wrapIfNecessary(aop的核心),getAdvicesAndAdvisorsForBean(拦截器)
  • InitializingBean:afterPropertiesSet(初始化TM,RM)

Spring生命周期回调

	@Override
    public void afterPropertiesSet() {
        //是否禁止了全局事务
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        //初始化netty 客户端(TM  RM)
        initClient();
    }

初始化TM,RM

 private void initClient() {  
        //init TM
        TMClient.init(applicationId, txServiceGroup);
        
        //init RM
        RMClient.init(applicationId, txServiceGroup);
    }

代码最终会调用到RpcClientBootstrap -> start

	bootstrap.handler(
	 ChannetPipeline pipeline = ch.pipeline();
		pipeline.addLast(
			new IdleStateHandler(
			nettyclientConfig.getChannelMaxReadIdleSeconds(),  //15s
			nettyclientConfig.getChannelMaxWriteIdleSeconds(),//15s
			nettyctientconfig.getchannelMaxATTIdTeSeconds()))
			.addLast(new ProtocolV1Decoder())		//Seata白定义编解码器,这里可以看Seata的协议RpcMessage
			.addLast(new ProtocoTV1Encoder());
			if (null != channelHandlers){
				addChannetPipelineLast(ch,channelHandlers);
			}
	});

wrapIfNecessary

//判断此类中所有方法是否存在@GLobalTransactional或者@GTobalLock注解
	if(!existsAnnotation(new class[]{serviceInterface})
		&& !existsAnnotation(interfacesIfJdk)) {
			return bean;			//有GlobalTransactional注解的方法才会被下面的interceptor拦截
   }
	
	//如果存在@GTobalTransactional或者GLobalLock注解
	if (interceptor == null) {
		//实例化一个新的全局事务拦截器。
		interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
		//根据配置文件配置的配置中心 实例不同的ConfiqurationFactory
		ConfigurationFactory.getInstance().addConfiglistener(ConfigurationkKeys.DISABLE_GLOBAL_TRANSACTION(ConfigurationChangeListener) interceptor);
		}
	}	

GlobalTransactionalInterceptor -> invoke

	@override
	public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
		Class<?> targetclass = methodInvocation.getThis() != nULL ? Aoputils.getTargetClass(methodInvocation.getthis()) : null;

		Method specificMethod = Classutils.getMostSpecificMethod(methodInvocation.getMethod(), targetclass);
		final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
		//拿到@GLobalTransactional注解的元数据
		final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
		//拿到@GLobalLock注解的元数据
		final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
		if (!disable && globalTransactionalAnnotation != null) {
			//处理@GLobalTransactional
			return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); //我们只关注这里
		}else if(!disable && oloballockAnnotation != null){
			//处理@GLobalLock
			return handleGlobalLock(methodInvocation);
		}else {
			//直接调用原始方法
			return methodInvocation.proceed();
		}
	}	

GlobalTransactionalInterceptor#handleGlobalTransaction -> TransactionalTemplate#execute

		//开始事务,tm获得xid,tc负责向global_table插入数据
		beginTransaction(txInfo,tx);	//开启全局事务,TM与TC通信,插入数据到global table表

		Object rs = null;
		try {
			//[/做你的事情,回调加了@GLobalTransaction注解的那个方法
			rs = business.execute() ;		//执行加了@Globaltransactional的那个方法
		} catch (Throwable ex) {
			//上面方法抛异常了回滚全局事务
			//.rollback全局事务
			completeTransactionAfterThrowing(txInfo, tx , ex);
			//不执行下面的throw,上面这行就会抛出回滚失败异常
			//到这里说明回滚成功,抛出业务执行的异常
			throw ex;		//抛异常,回滚全局事务,TM与TC通信,TC收到后删除global table数据去branch tabel找到所有分支事务并通知的分支事务, 即给所有RM发信息RM收到信息后找到对应的undolog反向补偿.(正常回滚后应该是TC端删除global table,global lock,branch table RM删除undolog,反向补偿成功)
		}

		//一切0K提交
		commitTransaction(tx);	//提交全局事务,TM与TC通信,TC收到后删除global table, 删除branch table,删除global lock, 通知所有RM, RM收到后只需要将undo log删除即可
		
		return rs;	

全局事务的处理是通过spring aop来增强的,下面我们来看看分支事务是如何处理的。

分支事务

代理数据源配置

	@Bean
	public DataSource dataSource(){
		DruidDataSource dataSource = new DruidDataSource();
		dataSource.setDriverclassName("com.mysql.cj.jdbc.Driver");
		dataSource.setPassword("poot");
		dataSource.setUsername("root");
		datasource.setUrl("jdbc:mysgl://127.0.0.1:3506/account?usenicode=true&characterEncoding=utf-8&serverTinezone=UTc"):
		dataSource.setMaxActive(2000)dataSource.setMaxWait(20000);
		return dataSource;
	}

	@Bean
	public DataSourceProxy datasourceProxy(DataSource dataSource) { 	
		return new DatasourceProxy(datasource);
	}

	@Bean
	public SqlSessionFactory salSessionFactoryBean(DataSourceProxy datasourceProxy) throws Exception (
		SqlSessionFactoryBean sqlSessionFactoryBean = new SglSessionFactoryBean();
		sqlsessionFactoryBean.setDataSource(dataSourceProxy);
		sqlSesionFactoryBean.setlapperlocations(new PathMatchingResourcePatternResolver().getResources("classpath:maper/*.xml");
		sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
		return sqlsessionFactoryBean.getobject();
	}

DataSourceProxy -> getConnection

	@Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

ConnectionProxy -> doCommit

	private void doCommit() throws SQLException {
		//处理@GlobalTransaction的分支事务
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) { 
        	//处理@GlobalLock,即检查一下是否可以获取全局锁
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

ConnectionProxy -> processGlobalTransactionCommit
正常情况注册分支事务还会往lock_tabel插入一条记录,代表某个表的某行记录被seata用全局锁锁住了

private void processGlobalTransactionCommit() throws SQLException{
	try{
		//注册分支事务
		register();
	}catch (TransactionException e) {
		//全局锁中突异常
		recognizeLockKeyConflictException(e, context.buildLockKeys());	//注册分支事务,RM与TC通信,如果可以获取全局锁,那么TC往branch_table插入一条记录
	}
	
	try{
		//向undolog表插入数据
		UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);		//插入undo_log,前后置镜像不是此处生成,而是DMLExecutor生成的
		//本地数据和undo_log表插入的数据一起提交
		targetConnection.commit();	//我们代码的sql和这个undolog一起提交事务
	}catch (Throwable ex) {
		L06GER.error("process connectionProxy commit error: (", ex.getMessage(), ex);
		//如果本地事务提交失败告诉seata服务器
		report(false);	
		throw new SQLException(ex);		//抛异常了,RM报告seata分支事务本地事务执行失败,TC收到后会删除alobal_table,global通知所有分支事务回滚,其他RM收到通知后用undolog反向回滚,删除undolog
	}

	1F (IS_REPORT_SUCCESS_ENABLE)(
		//向seata服务端报告本地事务提交成功
		report(true);	//RM报告tc本地事务执行成功
	}
	
	context.reset();
}	

快问快答

全局锁怎么实现的?
全局锁使用数据库表实现,lock_table。

为什么需要全局锁?
全局锁用于读写隔离,如果有多个分布式事务同时操作同一行数据库记录,那么可以保证数据的正确性。

lock_table什么时候插入记录,什么时候删除?
注册分支事务的时候会插入lock_table记录(正常情况),全局事务提交的时候会删除lock_table。

读写隔离?

  • 写隔离,如果要用分布式事务,那么对于同一张表更新时建议全使用@GlobalTransaction
  • 读隔离,使用@GlobalTransactional+select for update 或者 @GlobalLock+@Transactional+select for update

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/91522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【静脉检测】手指静脉图像检测【含Matlab源码 1654期】

⛄一、简介 手指静脉识别系统的性能非常依赖于采集图像的质量,但是采集设备在成像和传输时产生的各类噪声,以及开放式使用场景下设备镜面上存在脏污、用户手指存在蜕皮情况等因素都会对图像质量造成极大的影响,增大后续特征提取的难度,最终影响整个系统的识别性能。针对目前现…

Zabbix 6.2 监控 PostgreSQL13 数据库

Zabbix 6.2 监控 PostgreSQL13 数据库 文章目录Zabbix 6.2 监控 PostgreSQL13 数据库官方模版地址1.pgsql新建监控用户2. 编辑 pg_hba.conf 文件并重启3.拷贝监控脚本到var/lib/zabbix下4.zabbix监控导入模版5.主机配置模版6.验证监控数据官方模版地址 https://git.zabbix.com…

一文带你读懂何为 macOS App 公证,以及如何自动化实现

前言 在上篇文章「macOS App 自动化分发 App Store 探索与实践」中讲解了如何通过 Shell 脚本实现 macOS App 自动化分发 App Store。相信&#xff0c;看过的同学都或多或少对 macOS App 构建、分发 App Store 相关的知识都具备了一定的认知。 而对于开发者来说&#xff0c;我…

RCE(远程代码/命令执行漏洞)原理及靶场练习

目录 PHP-RCE涉及函数 基础命令符 靶场练习 PHP-RCE涉及函数 代码注入 eval() 把字符串 code 作为PHP代码执行 assert() 检查一个断言是否为 false preg_replace() 执行一个正则表达式的搜索和替换 create_function() 创建一个匿名函数并且返回函数名创 call_user_func()/ca…

Android进程启动流程

一.Android 系统架构图 ​虽然 Android 系统非常庞大且错综复杂&#xff0c;但整体架构设计清晰。Android 底层内核空间以 Linux Kernel 作为基石&#xff0c;上层用户空间由 Native系统库、虚拟机运行环境、框架层组成&#xff0c;通过系统调用(Syscall)连通系统的内核空间 与…

浅谈hudi 的callback回调机制

浅谈hudi 的callback回调机制 关于hudi的write operations,hudi有4种类型,分别为upsert/insert/bulk_insert/delete[软删除/硬删除]。 了解hudi的都知道,hudi有一个核心的机制就是timeline,hudi的instantDTO包含action(动作),ts(时间),state(状态)。 action主要包括: commits…

Linux常用命令总结(建议收藏)

文章目录一、文件管理1、cat&#xff1a;查看文件内容案例1&#xff1a;输出内容行数2、chmod&#xff1a;是控制用户对文件的权限的命令案例1&#xff1a;&#xff1a;将user文件修改成用户、组、其他用户都可以读写可执行的权限3、diff&#xff1a;用于比较文件的差异4、find…

ELK (一)部署ELK+Filebeat日志收集分析系统

说明&#xff1a;此安装流程只适用于8.0.0以下的版本 1. ElasticSearch 部署 1.1 下载ElasticSearch的wget指令&#xff1a; wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.13.4-linux-x86_64.tar.gz1.2 解压安装包到指定目录 指定解压缩到 …

【指纹识别】指纹识别【含GUI Matlab源码 586期】

⛄一、指纹识别简介 1 指纹识别的引入和原理 1.1 指纹的基本知识 指纹&#xff0c;由于其具有终身不变性、唯一性和方便性&#xff0c;已几乎成为生物特征识别的代名词。指纹是指人的手指末端正面皮肤上凸凹不平产生的纹线。纹线有规律的排列形成不同的纹型。纹线的起点、终点…

SpringBoot+Vue实现前后端分离的旅游网站

文末获取源码 开发语言&#xff1a;Java 使用框架&#xff1a;spring boot 前端技术&#xff1a;JavaScript、Vue.js 、css3 开发工具&#xff1a;IDEA/MyEclipse/Eclipse、Visual Studio Code 数据库&#xff1a;MySQL 5.7/8.0 数据库管理工具&#xff1a;phpstudy/Navicat JD…

【Effective_Objective-C_2对象,消息,运行期1】

文章目录前言6 理解”属性“这一概念定义变量不兼容现象的出现解决不兼容现象-Property使用属性更便捷属性特质原子性读写权限内存管理语义方法名原子性和非原子性要点总结7 在对象内部尽量直接访问实例变量要点总结8 理解“对象等同性” 和 isEqual“” 判断的依据“isEuqalTo…

RTL8380M/82M管理型交换机系统软件操作指南四:QoS/服务质量

接下来对QoS进行详细的描述&#xff0c;主要包括以下七大内容&#xff1a;QoS概述、功能简介、拥塞管理、策略分类、调度方式、优先级映射配置、QoS端口配置. 1.1 QoS概述 QoS&#xff08;Quality of Service&#xff0c;服务质量&#xff09;是用各种手段解决网络延迟和阻塞等…

[附源码]Python计算机毕业设计SSM基于Java动漫论坛系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

业务模型设计

业务模型设计业务模型设计统一语言、术语统一单词业务数据表模型规范数据库范式几个经验业务模型索引主键&#xff1a; 自增id、雪花id、和uuid 差别创建表字符集设置myisam 和 innodb 区别业务模型设计 统一语言、术语 定义&#xff1a;需求分析的过程&#xff08;系统目标、…

001、【C语言编程题目】猴子吃桃问题

001、【题目】猴子吃桃问题 猴子吃桃问题&#xff1a;猴子第一天吃了若干个桃子&#xff0c;当即吃了一半&#xff0c;还不解馋&#xff0c;又多吃了一个&#xff1b; 第二天&#xff0c;吃剩下的桃子的一半&#xff0c;还不过瘾&#xff0c;又多吃了一个&#xff1b;以后每天…

艾美捷针对性检测—游离维多珠单抗ADA水平检测试剂盒

艾美捷游离维多珠单抗ADA水平检测试剂盒可靠地测定游离ADA针对维多利单抗&#xff08;如ENTYVIO&#xff09;). 风湿因子的联合测定或排除不规则抗体。连同确定维多利单抗的活性物质浓度由IDK监测仪指示 维多利单抗免费ADA ELISA&#xff0c;主治医师有可能陪同治疗并在早期阶段…

用Python作一条已知曲线的等距曲线

参考资料&#xff1a; 该如何作一条已知曲线的等距曲线&#xff1f; - 知乎 等距线_百度百科 目录 1.等距线 2.数学推导 3.示例 4.代码与结果 1.等距线 等距线&#xff08;equidistant line&#xff09;亦称平行曲线&#xff0c;一种平面曲线&#xff0c;即由一已知曲线…

准备Plan B 如何设计兜底方案

对于很多秒杀系统而言&#xff0c;在诸如双十一这样的大流量的迅猛冲击下&#xff0c;都曾经或多或少发生过宕机的情况。当一个系统面临的大流量时&#xff0c;它其实很难单靠自身调整来恢复状态&#xff0c;你必须等待流量自然下降或者认人为地把流量切走才行&#xff0c;这无…

Android OpenGL ES 学习(九) – 坐标系统和实现3D效果

OpenGL 学习教程 Android OpenGL ES 学习(一) – 基本概念 Android OpenGL ES 学习(二) – 图形渲染管线和GLSL Android OpenGL ES 学习(三) – 绘制平面图形 Android OpenGL ES 学习(四) – 正交投影 Android OpenGL ES 学习(五) – 渐变色 Android OpenGL ES 学习(六) – 使用…

高通Ride软件开发包使用指南(7)

高通Ride软件开发包使用指南&#xff08;7&#xff09;6.5 构建 x86 Ubuntu SDK6.6端到端可视化6.7 x86 Ubuntu上的功能验证6.7.1简单比特率验证6.7.2在x86笔记本上用 8xCams HEVC格式 录制 FPS6.8记录仪6.5 构建 x86 Ubuntu SDK 构建 x86 ubuntu ~/src/qride/stack-sdk$ ./ex…