Seata
本专栏学习内容来自尚硅谷周阳老师的视频
有兴趣的小伙伴可以点击视频地址观看
分布式事务问题
在使用分布式之前,一般都是单机单库或者是单机多库的情况,一个服务对应一个数据库或者多个数据库,这样事务的问题可以通过@Transaction
解决。而在微服务应用中,原来的一个服务被拆分成了三个独立的应用,分别使用三个独立的数据源,业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性问题没法保证。
一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题
Seata简介
Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。官网地址
分布式事务过程
一ID
- Transaction ID XID:全局唯一的事务ID
三组件
-
Transaction Coordinator (TC) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
-
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
-
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
处理过程
- TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID;
- XID在微服务调用链路的上下文中传播
- RM向TC注册分支事务,将其纳入XID对应全局事务的管辖
- TM向TC发起针对XID的全局提交或回滚决议
- TC调度XID下管辖的全部分支事务完成提交或回滚要求
玩法
只需要使用一个@GlobalTransaction
注解在业务方法上即可
Seata Server安装
小黄使用的是1.6.1版本 下载地址
下载完成后,解压压缩包,得到这么一个目录
修改配置文件
修改conf/applicatoin.yml
文件
不重要的就不贴出来了,之后要整合Nacos
seata.config
:修改为nacos,并配置,后续回去nacos上读取配置文件seata.registry
:修改为nacos,表示将服务注册到nacos中seata.store
:修改为db,并配置MySQL地址
rver:
port: 7091
spring:
application:
name: seata-server
console:
user:
username: seata
password: seata
seata:
config:
# support: nacos, consul, apollo, zk, etcd3
type: nacos
nacos:
server-addr: 127.0.0.1:8848 # nacos的ip端口
group: DEFAULT_GROUP # 对应的组,默认为DEFAULT_GROUP
namespace: 4eed638d-b9c7-4b34-bead-7d376a0946a8 # 对应的命名空间,在nacos中配置
username: nacos
password: nacos
data-id: seataServer.properties # nacos中存放seata的配置文件,后面会提该文件的使用方式,相当于seata服务启动的时候需要注册到nacos,并使用nacos中的配置文件
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
namespace: 4eed638d-b9c7-4b34-bead-7d376a0946a8
group: DEFAULT_GROUP
cluster: default
username: nacos
password: nacos
store:
# support: file 、 db 、 redis
mode: db
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:13306/seata?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=GMT
user: root
password: 123456
min-conn: 5
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 100
max-wait: 5000
创建数据表
首先创建一个seata
数据库,需要注意的是,seata不同版本的数据库不相同,下载地址
配置Nacos
在配置文件中,配置了data-id
,seata启动时,将服务注册到nacos,并且使用读取seataServer.properties
文件进行配置
在相应的命名空间下,创建seataServer.properties
#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
#Transaction routing rules configuration, only for the client
# 此处的yellowstargroup名字可以自定义,只修改这个值即可
service.vgroupMapping.yellowstargroup=default
#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h
#Log rule configuration, for client and server
log.exceptionRate=100
#Transaction storage configuration, only for the server. The file, db, and redis configuration values are optional.
# 默认为file,一定要改为db,我们自己的服务启动会连接不到seata
store.mode=db
store.lock.mode=db
store.session.mode=db
#Used for password encryption
#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
# 修改mysql的配置
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
# 指定seata的数据库,下面会提
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=false
#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
启动
双击运行/bin/seata-server.bat
,seata可视化界面
并且可以看到服务也注册到了Nacos中
Seata实战
业务说明
创建三个服务:订单、库存、账户
当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。
该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题
创建数据库
存储订单的数据库
CREATE DATABASE seata_order;
CREATE TABLE t_order (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`count` INT(11) DEFAULT NULL COMMENT '数量',
`money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
`status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结'
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
SELECT * FROM t_order;
存储库存的数据库
CREATE DATABASE seata_storage;
CREATE TABLE t_storage (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`total` INT(11) DEFAULT NULL COMMENT '总库存',
`used` INT(11) DEFAULT NULL COMMENT '已用库存',
`residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`)
VALUES ('1', '1', '100', '0', '100');
SELECT * FROM t_storage;
存储账户的数据库
CREATE DATABASE seata_account;
CREATE TABLE t_account (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
`used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
`residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO seata_account.t_account(`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');
SELECT * FROM t_account;
回滚日志表
订单-库存-账户三个库下都需要创建各自的回滚日志表
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
创建模块
业务代码
这里咱们主要关心的就不是业务代码了,而是SpringCloud与Seata的整合,业务代码大家可以去小黄的gitee上自行下载,下载链接
我们通过OpenFeign实现远程调用,具体业务代码如下
- 调用2001本服务
orderMapper.insert(order)
新建一个订单 - 远程调用2002库存服务
storageService.decrease
减少库存 - 远程调用2003账户服务
accountService.decrease
减少账户余额 - 调用2001本服务
orderMapper.updateById
修改订单状态
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
@Autowired
OrderMapper orderMapper;
@Autowired
AccountService accountService;
@Autowired
StorageService storageService;
@Override
//@GlobalTransactional(rollbackFor = Exception.class) //全局事务注解,目前还没用
public void create(Order order) {
log.info("-----> 创建新订单");
order.setStatus(0);
orderMapper.insert(order);
log.info("-----> 开始减库存");
storageService.decrease(order.getProductId(),order.getCount());
log.info("-----> 结束减库存");
log.info("-----> 开始减余额");
accountService.decrease(order.getUserId(),order.getMoney());
log.info("-----> 结束减余额");
log.info("-----> 开始更新订单状态");
order.setStatus(1);
orderMapper.updateById(order);
log.info("-----> 结束更新订单状态");
}
}
Seata整合代码
Nacos配置
需要在nacos中创建一个配置文件
名称对应着三个地方,是有所关联的
对于Seata来说,三个服务的配置都是一样的
相关依赖
首先是依赖,本地安装的哪个版本的Seata,就要引入哪个版本的jar包
<!--nacos依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--seata依赖需要排除掉,用了什么版本的seata就用那个版本的依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.6.1</version>
</dependency>
配置文件
只显示Seata相关配置文件
spring:
application:
name: seata-order-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
namespace: 4eed638d-b9c7-4b34-bead-7d376a0946a8
alibaba:
seata:
tx-service-group: yellowstargroup
seata:
service:
vgroup-mapping:
yellowstargroup: default # key是事务分组名称 value要和服务端的机房名称保持一致
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
namespace: 4eed638d-b9c7-4b34-bead-7d376a0946a8
group: DEFAULT_GROUP
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: 4eed638d-b9c7-4b34-bead-7d376a0946a8
group: DEFAULT_GROUP
这里的坑非常多,启动一直找不到service.vgroupMapping.seata-order-service-fescar-service-group
这是因为小黄一开始是使用如下配置,经过对源码的解读,才知道这个必须配在spring.cloud.alibaba.seta.tx-service-group
下
组别配好之后,启动依旧是找不到service.vgroupMapping.yellowstargroup
这是因为我们创建的配置文件属于DEFAULT_GROUP
分组,最坑的来了,网上很多关于Seata的配置都是默认组别的,而Seata默认组别是SEATA_GROUP
!
Seata接管数据库
Seata需要对数据库进行接管和代理,需要一个配置文件
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
测试
我们故意破坏远程调用服务,这时候发现订单加了,数量减少了,但是钱没扣掉,这时候只需要在业务方法上加上@GlobalTransactional(rollbackFor = Exception.class)
注解即可
Seata原理
TC/TM/RM三大组件
分布式事务的执行流程
- TM 开启分布式事务(TM 向 TC 注册全局事务记录)
- 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 )
- TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务)
- TC 汇总事务信息,决定分布式事务是提交还是回滚
- TC 通知所有 RM 提交/回滚 资源,事务二阶段结束
这样说估计大家都不是很理解,小黄也看得糊里糊涂的,接下来通过上述案例更清晰的了解一下什么是TC/TM/RM
AT模式
Seata提供了非常多的模式供我们选择,而默认是AT模式
整体机制
AT模式分为两个阶段
一阶段:加载
在一阶段,Seata 会拦截“业务 SQL”,
- 解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,
- 执行“业务 SQL”更新业务数据,在业务数据更新之后,
- 其保存成“after image”,最后生成行锁。
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
二阶段
二阶段如果是顺利提交的话,因为业务SQL在一阶段已经提交给数据库,所以Seata只需要将一阶段保存的快照数据和行锁删除即可
二阶段如果是回滚的话,Seata就需要回滚一阶段已经执行的业务SQL,还原业务数据
回滚方式使用before image
还原业务数据,但是在还原前要先校验脏写,对比数据库数据与快照数据,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,这种情况,需要根据配置策略来做处理。