AT 模式
概述
Seata AT 模式是一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。
为什么要检查全局锁呢,这是由于 Seata AT 模式的事务隔离是建立在分支事务的本地隔离级别基础之上的,在数据库本地隔离级别读已提交或以上的前提下,Seata 设计了由事务协调器维护的全局写排他锁,来保证事务间的写隔离,同时,将全局事务默认定义在读未提交的隔离级别上。
Seata 的事务是一个全局事务,它包含了若干个分支本地事务,在全局事务执行过程中(全局事务还没执行完),某个本地事务提交了,如果 Seata 没有采取任何措施,则会导致已提交的本地事务被读取,造成脏读,如果数据在全局事务提交前已提交的本地事务被修改,则会造成脏写。
一阶段:业务数据和回滚日志在同一个本地事务提交,释放本地锁和连接资源。
二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志反向补偿。
Seata AT模式隔离级别解读
适用场景
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
原理
一阶段
1、解析 SQL。得到 SQL 的类型(UPDATE)、表(product)、条件(where name = ‘TXC’)等相关的信息。
2、查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = 'TXC';
得到前镜像:
id | name | since |
---|---|---|
1 | TXC | 2014 |
3、执行业务 SQL。更新这条记录的 name 为 ‘GTS’。
4、查询后镜像:根据前镜像的结果,通过 主键 定位数据。
得到后镜像:
id | name | since |
---|---|---|
1 | GTS | 2014 |
5、插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG
表中。
{
"branchId": 641789253,
"undoItems": [{
"afterImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "GTS"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"beforeImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "TXC"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"sqlType": "UPDATE"
}],
"xid": "xid:xxx"
}
6、提交前,向 TC 注册分支:申请 product
表中,主键值等于 1 的记录的 全局锁 。
7、本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
8、将本地事务提交的结果上报给 TC。
二阶段
回滚
1、收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
2、通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
3、数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
4、根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
5、提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
提交
1、收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
2、异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
回滚日志表如下:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
写隔离
一阶段本地事务提交之前,需要确保先拿到全局锁,这样才能提交本地事务。
尝试获取全局锁如果超时,则放弃尝试,然后回滚本地事务并释放本地锁。
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
使用
假设 Bussiness 服务调用 Stock 服务和 Order 服务。
1、配置 Bussiness 服务。
-
配置 GlobalTransactionScanner 的applicationId 和 txServiceGroup(事务分组)。
-
使用 @GlobalTransactional 注解
-
可以使用 RootContext.getXID() 方法获取到 XID
-
按需修改 file.conf、registry.conf 文件的内容
@Configuration
public class SeataAutoConfig {
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
// 指定 applicationId、txServiceGroup
return new GlobalTransactionScanner("dubbo-gts-seata-example", "my_test_tx_group");
}
}
@Service
public class BusinessServiceImpl implements BusinessService {
@Reference(version = "1.0.0")
private StockDubboService stockDubboService;
@Reference(version = "1.0.0")
private OrderDubboService orderDubboService;
private boolean flag;
@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-seata-example")
public ObjectResponse handleBusiness(BusinessDTO businessDTO) {
System.out.println("开始全局事务,XID = " + RootContext.getXID());
ObjectResponse<Object> objectResponse = new ObjectResponse<>();
//1、扣减库存
CommodityDTO commodityDTO = new CommodityDTO();
commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
commodityDTO.setCount(businessDTO.getCount());
ObjectResponse stockResponse = stockDubboService.decreaseStock(commodityDTO);
//2、创建订单
OrderDTO orderDTO = new OrderDTO();
orderDTO.setUserId(businessDTO.getUserId());
orderDTO.setCommodityCode(businessDTO.getCommodityCode());
orderDTO.setOrderCount(businessDTO.getCount());
orderDTO.setOrderAmount(businessDTO.getAmount());
ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO);
//打开注释测试事务发生异常后,全局回滚功能
// if (!flag) {
// throw new RuntimeException("测试抛异常后,分布式事务回滚!");
// }
if (stockResponse.getStatus() != 200 || response.getStatus() != 200) {
throw new DefaultException(RspStatusEnum.FAIL);
}
objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode());
objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage());
objectResponse.setData(response.getData());
return objectResponse;
}
}
2、 配置 Stock 服务。
- 配置 DataSource、DataSourceProxy、SqlSessionFactory、GlobalTransactionScanner
- 可以使用 RootContext.getXID() 方法获取到 XID
- 按需修改 file.conf、registry.conf 文件的内容
- 对应数据库配置 UNDO_LOG 表
@Configuration
public class SeataAutoConfig {
@Autowired
private DataSourceProperties dataSourceProperties;
@Bean
@Primary
public DruidDataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
return druidDataSource;
}
@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSourceProxy);
factoryBean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper/*.xml"));
factoryBean.setTransactionFactory(new JdbcTransactionFactory());
return factoryBean.getObject();
}
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("stock-gts-seata-example", "my_test_tx_group");
}
}
3、配置 Order 服务
- 配置 DataSource、DataSourceProxy、SqlSessionFactory、GlobalTransactionScanner
- 可以使用 RootContext.getXID() 方法获取到 XID
- 按需修改 file.conf、registry.conf 文件的内容
- 对应数据库配置 UNDO_LOG 表
@Configuration
public class SeataAutoConfig {
@Autowired
private DataSourceProperties dataSourceProperties;
@Bean
@Primary
public DruidDataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
return druidDataSource;
}
@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSourceProxy);
factoryBean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper/*.xml"));
factoryBean.setTransactionFactory(new JdbcTransactionFactory());
return factoryBean.getObject();
}
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("order-gts-seata-example", "my_test_tx_group");
}
}