文章目录
- 一、认识Seata
- 1.1 Seata 是什么?
- 1.2 了解AT、TCC、SAGA事务模式?
- AT 模式
- 前提
- 整体机制
- 如何实现写隔离
- 如何实现读隔离
- TCC 模式
- Saga 模式
- Saga 模式适用场景
- Saga 模式优势
- Saga 模式缺点
- 二、Seata安装
- 2.1 下载
- 2.2 创建所需数据表
- 2.2.1 创建 分支表、全局表、锁表
- 2.2.2 创建 UNDO_LOG 表
- 2.3 修改配置文件
- 2.3.1 修改 registry.conf 文件
- 2.3.2 修改 file.conf 文件
- 2.4 启动seata
- 三、Seata的应用
- 3.1 springcloud项目整合seata
- 3.1.1 服务架构
- 3.1.2 创建仓储服务
- 3.1.3 创建仓储和订单数据库及数据表
- 3.1.4 仓储服务相关实现
- 3.1.5 订单服务相关实现
- 3.2 模拟异常测试分布式事务
- 3.2.1 测试正常流程
- 3.2.2 模拟失败流程
- 3.2.2 添加分布式事务注解
一、认识Seata
1.1 Seata 是什么?
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
1.2 了解AT、TCC、SAGA事务模式?
AT 模式
前提
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
整体机制
两阶段提交协议的演变:
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
- 二阶段:
提交异步化,非常快速地完成。
回滚通过一阶段的回滚日志进行反向补偿。
如何实现写隔离
过程:
- 一阶段本地事务提交前,需要确保先拿到 全局锁 。
- 拿不到 全局锁 ,不能提交本地事务。
- 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
举个栗子:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
下面来看下官方的两张图来加深下理解:
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
如何实现读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
见官方图:
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
TCC 模式
一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
- 一阶段 prepare 行为
- 二阶段 commit 或 rollback 行为
TCC 模式,不依赖于底层数据资源的事务支持:
- 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
- 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
- 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
Saga 模式
Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
官方图:
Saga 模式适用场景
- 业务流程长、业务流程多
- 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
Saga 模式优势
- 一阶段提交本地事务,无锁,高性能
- 事件驱动架构,参与者可异步执行,高吞吐
- 补偿服务易于实现
Saga 模式缺点
- 不保证隔离性
二、Seata安装
2.1 下载
下载地址:
- https://github.com/seata/seata/releases
- https://github.com/seata/seata/releases/download/v1.4.2/seata-server-1.4.2.zip
我这边下载的是v1.4.2版本,大家下载时需要注意下seata版本和springcloud alibaba的版本,根据自己的alibaba的版本选择对应的seata
给大家贴出组件版本关系对应:
2.2 创建所需数据表
2.2.1 创建 分支表、全局表、锁表
首先需要创建 分支表、全局表、锁表
创建sql如下
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
2.2.2 创建 UNDO_LOG 表
SEATA AT 模式需要 UNDO_LOG 表
创建sql如下:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2.3 修改配置文件
2.3.1 修改 registry.conf 文件
conf目录下找到registry.conf文件
首先将type类型改为 nacos,type默认file (这种方式需要把源码的file.conf文件复制到项目中,比较麻烦不推荐) ,然后修改seata的注册中心的相关配置
其次修改seata的配置中心的相关配置,同样type类型改为nacos
2.3.2 修改 file.conf 文件
下面还需要修改seata的DB类型
我们在conf目录下找到file.conf文件
mode类型改为db
然后修改自己的数据库配置
2.4 启动seata
在bin目录下找到 seata-server.bat 双击启动
看到日志输出 Server started 应该就启动成功了
查看nacos注册中心
观察服务列表,发现seata服务已经成功注册
对nacos还不了解的可以看这里
三、Seata的应用
3.1 springcloud项目整合seata
我们简单模拟下用户从下单到扣减库存的流程,来看看seata在项目中是如何应用的
3.1.1 服务架构
先看下我的项目的整体模块架构
springcloud版本
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring-cloud.version>2021.0.1</spring-cloud.version>
<spring-cloud-alibaba.version>2021.0.1.0</spring-cloud-alibaba.version>
</properties>
因为我的项目中已经有order的相关服务了, 为了故事的延续性我在建一个仓储的服务用来扣减库存
想参考我的项目架构的同学可以点击下面的地址
mdx-shop gitee地址
3.1.2 创建仓储服务
创建一个maven模块
为服务添加启动类配置文件和seata依赖等
seata依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>${spring-cloud-alibaba.version}</version>
</dependency>
仓储服务application.yml文件
server:
port: 9092
spring:
application:
name: mdx-shop-storage
cloud:
nacos:
discovery:
server-addr: localhost:8848
namespace: mdx
group: mdx
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://localhost:3306/mdx_storage?autoRec&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driverClassName: com.mysql.cj.jdbc.Driver
username: root
password: Bendi+Ceshi+
jpa:
show-sql: true #打印执行的sql语句,false则不打印sql
properties:
hibernate:
ddl-auto: none
dialect: org.hibernate.dialect.MySQL5InnoDBDialect
open-in-view: true
seata:
tx-service-group: my_test_tx_group
enabled: true
registry:
type: nacos
nacos:
application: mdx-seata-server #注册在nacos服务名
server-addr: localhost:8848
group : mdx
namespace: mdx #注册在nacos命名空间
3.1.3 创建仓储和订单数据库及数据表
我们为仓储服务和订单服务分别创建数据表
数据库自己提前建好
// 仓储
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`count` int(11) NULL DEFAULT 0,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `commodity_code`(`commodity_code`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of storage_tbl
-- ----------------------------
INSERT INTO `storage_tbl` VALUES (1, 'S123434455666777', 10);
// 订单
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.1.4 仓储服务相关实现
这里只贴出仓储服务的主要几个方法,具体的项目结构可以参考 https://gitee.com/Ji_Agang/mdx-shop
对于数据库的操作我们使用Spring Data Jpa来实现
依赖参考
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
启动类
注意启动类一定要加 @EnableAutoDataSourceProxy 注解,来开启数据源代理
/**
* @author : jiagang
* @date : Created in 2022/7/1 11:25
*/
@SpringBootApplication
@EnableFeignClients
@EnableAutoDataSourceProxy
public class MdxShopStorageApplication {
public static void main(String[] args) {
SpringApplication.run(MdxShopStorageApplication.class, args);
}
}
controller
/**
* @author : jiagang
* @date : Created in 2022/7/1 18:42
*/
@RestController
@RequestMapping("/storage")
public class StorageController {
@Autowired
private StorageService service;
@GetMapping("/deduct")
public CommonResponse deduct(String commodityCode, int count){
service.deduct(commodityCode, count);
return CommonResponse.success();
}
}
接口
/**
* @author : jiagang
* @date : Created in 2022/7/1 18:40
*/
public interface StorageService {
/**
* 扣除存储数量
*/
void deduct(String commodityCode, int count);
}
实现类
/**
* @author : jiagang
* @date : Created in 2022/7/1 18:42
*/
@Service
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageRepository storageRepository;
/**
* 扣减库存
* @param commodityCode
* @param count
*/
@Override
public void deduct(String commodityCode, int count) {
StorageTbl storageTbl = storageRepository.findByCommodityCode(commodityCode);
if (storageTbl == null){
throw new BizException("storageTbl is null");
}
// 这里先不考虑超卖的情况
storageTbl.setCount(storageTbl.getCount() - count);
// 使用jpa 存在就更新
storageRepository.save(storageTbl);
}
}
数据层
/**
* @author : jiagang
* @date : Created in 2023/1/16 15:44
*/
@Repository
public interface StorageRepository extends JpaRepository<StorageTbl,Integer> {
/**
* 通过商品code查询库存
* @param commodityCode
* @return
*/
@Query
StorageTbl findByCommodityCode(String commodityCode);
}
3.1.5 订单服务相关实现
这里只贴出订单服务的主要几个方法,具体的项目结构可以参考 https://gitee.com/Ji_Agang/mdx-shop
对于数据库的操作我们同样使用Jpa来实现
application.yml 配置文件
server:
port: 9091
spring:
application:
name: mdx-shop-order
cloud:
nacos:
discovery:
server-addr: localhost:8848
namespace: mdx
group: mdx
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://localhost:3306/mdx_order?autoRec&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driverClassName: com.mysql.cj.jdbc.Driver
username: root
password: Bendi+Ceshi+
jpa:
show-sql: true #打印执行的sql语句,false则不打印sql
properties:
hibernate:
ddl-auto: none
dialect: org.hibernate.dialect.MySQL5InnoDBDialect
open-in-view: true
seata:
tx-service-group: my_test_tx_group
enabled: true
registry:
type: nacos
nacos:
application: mdx-seata-server #注册在nacos服务名
server-addr: localhost:8848
group : mdx
namespace: mdx #注册在nacos命名空间
feign:
sentinel:
enabled: true
启动类
注意启动类一定要加 @EnableAutoDataSourceProxy 注解,来开启数据源代理
/**
* @author : jiagang
* @date : Created in 2022/7/1 11:25
*/
@SpringBootApplication
@EnableFeignClients
@EnableAutoDataSourceProxy
public class MdxShopOrderApplication {
public static void main(String[] args) {
SpringApplication.run(MdxShopOrderApplication.class, args);
}
}
controller
/**
* @author : jiagang
* @date : Created in 2022/7/1 18:42
*/
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
/**
* 用户下单接口
* @param userId
* @param commodityCode
* @return
*/
@PostMapping("createOrder")
public CommonResponse<String> createOrder(String userId, String commodityCode){
return CommonResponse.success(orderService.createOrder(userId,commodityCode));
}
}
接口
/**
* @author : jiagang
* @date : Created in 2022/7/1 18:40
*/
public interface OrderService {
/**
* 下单接口
* @param userId 用户id
* @param commodityCode 商品代码
* @return
*/
String createOrder(String userId, String commodityCode);
}
实现类
/**
* @author : jiagang
* @date : Created in 2022/7/1 18:42
*/
@Service
public class OrderServiceImpl implements OrderService {
@Resource
private OrderRepository orderRepository;
@Resource
private StorageFeign storageFeign;
/**
* 下单接口
* @param userId 用户id
* @param commodityCode 商品代码
* @return
*/
@Override
public String createOrder(String userId, String commodityCode) {
try {
System.out.println("事务id---------------------->" + RootContext.getXID());
// 创建订单
OrderTbl orderTbl = new OrderTbl();
orderTbl.setUserId(userId);
orderTbl.setCommodityCode(commodityCode);
orderTbl.setCount(1); // 假设为1件
orderTbl.setMoney(10); // 假设为十元
// 保存订单
orderRepository.save(orderTbl);
// 保存订单成功后扣减库存
storageFeign.deduct(commodityCode,orderTbl.getCount());
return "success";
}catch (Exception e){
throw new BizException("创建订单失败");
}
}
}
数据层
/**
* @author : jiagang
* @date : Created in 2023/1/16 15:44
*/
@Repository
public interface OrderRepository extends JpaRepository<OrderTbl,Integer> {
}
feign接口
对微服务之间使用feign来调用还不熟悉的同学可以点下面的链接
springcloud alibaba微服务 – openfeign的使用(保姆级)
/**
* @author : jiagang
* @date : Created in 2022/7/4 10:26
*/
@FeignClient(value = "mdx-shop-storage")
@Component
public interface StorageFeign {
/**
* 扣减库存
* @param commodityCode
* @param count
* @return
*/
@GetMapping("storage/deduct")
String deduct(@RequestParam String commodityCode,@RequestParam Integer count);
}
3.2 模拟异常测试分布式事务
在进行测试之前,我们先来看下业务逻辑,我们使用postman来调用下单接口进行下单,接口地址为 http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777 (POST请求) ,然后下单接口保存订单,并通过feign接口调用仓储服务扣减库存。
3.2.1 测试正常流程
正常流程下用户下单,订单数据库增加订单,仓储数据库为下单的商品扣减库存。
首先看一下订单数据库order_tbl表和仓储数据库storage_tbl表
订单表没有数据
仓储表有一条商品,库存为10
正常情况下,下单之后(我们只买一件商品)订单增加一条数据,仓储的S123434455666777商品库存减1 为 9
POST 请求调用 下单接口 http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777
postman提示成功
看一下数据库
订单新增成功
库存减为9
3.2.2 模拟失败流程
先将数据库订单表清空,仓储表库存继续设置为10(这里不操作也行,大家记住之前的状态就可以了)
我们在仓储服务的扣减库存的方法中手动写一个异常,异常如下
System.out.println(1 / 0);
/**
* 扣减库存
* @param commodityCode
* @param count
*/
@Override
public void deduct(String commodityCode, int count) {
System.out.println("事务id---------------------->" + RootContext.getXID());
StorageTbl storageTbl = storageRepository.findByCommodityCode(commodityCode);
if (storageTbl == null){
throw new BizException("storageTbl is null");
}
// 模拟异常
System.out.println(1 / 0);
// 这里先不考虑超卖的情况
storageTbl.setCount(storageTbl.getCount() - count);
// 使用jpa 存在就更新
storageRepository.save(storageTbl);
}
此时,再来调用下单接口
http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777
可以看到服务报错,提示创建订单失败
再来观察一下数据库
发现订单依然创建成功,但是库存缺没有减少,还是10,这就导致了用户下单成功了,但是没给人减库存,造成数据不一致,可能会发生超卖。
3.2.2 添加分布式事务注解
先将数据库订单表清空(这里不操作也行,大家记住之前的状态就可以了)
为了解决上面的问题,我们为创建订单方法增加seata的分布式注解
@GlobalTransactional
/**
* 下单接口
* @param userId 用户id
* @param commodityCode 商品代码
* @return
*/
@Override
@GlobalTransactional
public String createOrder(String userId, String commodityCode) {
try {
System.out.println("事务id---------------------->" + RootContext.getXID());
// 创建订单
OrderTbl orderTbl = new OrderTbl();
orderTbl.setUserId(userId);
orderTbl.setCommodityCode(commodityCode);
orderTbl.setCount(1); // 假设为1件
orderTbl.setMoney(10); // 假设为十元
// 保存订单
orderRepository.save(orderTbl);
// 保存订单成功后扣减库存
storageFeign.deduct(commodityCode,orderTbl.getCount());
return "success";
}catch (Exception e){
throw new BizException("创建订单失败");
}
}
加上注解之后重启服务继续调用下单接口
可以看到创建订单失败
然后再来观察下数据库
发现订单表没有此商品的订单,库存也没变,那就表示事务已经成功回滚了,不会再出现订单创建成功了但库存没减的情况。
这篇文章查阅资料、测试、发现问题、解决问题花了大约两天时间,创作不易,点个赞吧👍
最后的最后送大家一句话
白驹过隙,沧海桑田
与君共勉