分布式事务一站式解决方案-
- 分布式事务一站式解决方案
- 分布式事务产生背景
- 三个概念
- Seata下载和安装
- 实际业务模拟演示
- 不加 @GlobalTransactional 注解,正常操作下单
- 不加 @GlobalTransactional 注解,下单过程出异常或者超时了
- 加 @GlobalTransactional 注解,下单过程出异常或者超时了
- 原理
- undo_log 表作用
- 二阶段提交原理
- 分布式事务的执行流程(下订单-减库存-账户更新)
分布式事务一站式解决方案
分布式事务产生背景
一般来说,如果是微服务架构,会采用分布式系统开发,既然是多个微服务,那肯定是有多个独立的数据库的,那问题来了,
所以迫切希望提供一种分布式事务,解决微服务架构下的分布式事务问题
三个概念
Seata下载和安装
下载就不说了,直接去官方网站下载最新版本即可,注意安装 Seata 之前需要启动 Nacos,下载 Nacos 后直接 startup.cmd -m standalone
启动即可。
解压 seata-server-2.0.0.zip ,然后进入 conf 目录,更改 application.yml 配置如下,注意 console.user.username
console.user.password
seata.security.secretKey
seata.security.tokenValidityInMilliseconds
这4个一定要配置,不然启动报错
20:37:31.390 WARN --- [ main] [letWebServerApplicationContext] [ refresh] [] : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'webSecurityConfig': Unsatisfied dependency expressed through field 'userDetailsService';
nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'customUserDetailsServiceImpl': Injection of autowired dependencies failed;
nested exception is java.lang.IllegalArgumentException: Could not resolve placeholder 'console.user.username' in value "${console.user.username}"
20:39:36.510 WARN --- [ main] [letWebServerApplicationContext] [ refresh] [] : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'webSecurityConfig': Unsatisfied dependency expressed through field 'tokenProvider';
nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'jwtTokenUtils': Injection of autowired dependencies failed;
nested exception is java.lang.IllegalArgumentException: Could not resolve placeholder 'seata.security.secretKey' in value "${seata.security.secretKey}
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${log.home:${user.home}/logs/seata}
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
security:
secretKey: 'seata'
tokenValidityInMilliseconds: 1000000
config:
# support: nacos 、 consul 、 apollo 、 zk 、 etcd3
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace:
group: SEATA_GROUP
username: nacos
password: nacos
context-path:
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key:
#secret-key:
# data-id: seataServer.properties
registry:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
cluster: default
username: nacos
password: nacos
context-path:
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key:
#secret-key:
store:
# support: file 、 db 、 redis 、 raft
mode: db
session:
mode: db
lock:
mode: db
file:
dir: sessionStore
max-branch-session-size: 16384
max-global-session-size: 512
file-write-buffer-cache-size: 16384
session-reload-read-size: 100
flush-disk-mode: async
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.133.128:3306/seata?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=GMT
user: root
password: root
min-conn: 10
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 1000
max-wait: 5000
启动成功便可以进入 Seata 前端管理页面
同时 Nacos 也可以看到 Seata 服务注册上来了
实际业务模拟演示
本次学习会用到如下三个模块,对应三个微服务,每个服务用的单独的数据库,其中 cloud-seata-order-service2001 是订单微服务、cloud-seata-storage-service2002 是库存微服务、cloud-seata-account-service2003 是账户微服务,具体的代码可以去 github 上获取
项目用的数据库及表如下
初始阶段,订单表、库存表、账户表数据如下:
不加 @GlobalTransactional 注解,正常操作下单
实际的业务需求就是 下订单-扣减库存-更新账户余额,由于现在是分布式系统,如何保证事务呢?
先看看正常操作下单,1号用户花费100块买了10个1号产品,发送如下请求 http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
, 此时数据库表数据是正常的
不加 @GlobalTransactional 注解,下单过程出异常或者超时了
假设,在下订单->扣减库存->更新账户余额,在更新账户余额这一步代码逻辑超时了或者出异常了,为什么这里超时设置的是65秒?因为 OpenFeign 远程调用的默认超时时间是 60秒
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {
@Resource
private AccountMapper accountMapper;
@Override
public void decrease(Long userId, Long money) {
log.info("------------->AccountService 开始扣减余额");
accountMapper.decrease(userId, money);
log.info("------------->AccountService 开始扣减余额");
// 超时异常
timeout();
// 抛出异常
// int i = 10 / 0;
}
private void timeout() {
try {
TimeUnit.SECONDS.sleep(65);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
重启项目再发送一次请求,发现页面提示超时了,被全局异常捕获
查看表数据,发现库存扣减了,余额扣减了,但是订单状态是创建中…,这显然是有问题的
加 @GlobalTransactional 注解,下单过程出异常或者超时了
同样的过程,有了 GlobalTransactional 后,经过测试,正常下单,没问题;下单过程出异常或者超时了,数据库正确回滚,结果符合预期
@Override
// 微服务项目可能会有多个方法需要保证分布式事务,name 可以很好的区分
@GlobalTransactional(name = "create-order-transaction", rollbackFor = Exception.class)
public void create(Order order) {
// xid全局事务检查
String xid = RootContext.getXID();
// 1. 新建订单
log.info("-------------> 开始新建订单, XID: {}", xid);
order.setStatus(0);
int result = orderMapper.insertSelective(order);
Order orderFromDB;
if (result > 0) {
orderFromDB = orderMapper.selectOne(order);
log.info("-------------> 新建订单成功, OrderInfo: {}", orderFromDB);
// 2. 扣减库存
log.info("-------------> 开始扣减库存");
storageFeignApi.decrease(orderFromDB.getProductId(), orderFromDB.getCount());
log.info("-------------> 扣减库存成功");
// 3. 扣减账户余额
log.info("-------------> 开始扣减余额");
accountFeignApi.decrease(order.getUserId(), order.getMoney());
log.info("-------------> 扣余额存成功");
// 4. 修改订单状态
log.info("-------------> 开始修改订单状态");
Example whereCondition = new Example(Order.class);
Example.Criteria criteria = whereCondition.createCriteria();
criteria.andEqualTo("id", orderFromDB.getId());
criteria.andEqualTo("status", 0);
orderFromDB.setStatus(1);
int updateResult = orderMapper.updateByExampleSelective(orderFromDB, whereCondition);
log.info("-------------> 修改订单状态成功");
}
log.info("-------------> 结束新建订单, XID: {}", xid);
}
原理
答案就是 二阶段提交,从日志输出也可以看到,PhaseTwo_Rollbacked->二阶段回滚了…
2024-08-07T21:59:43.441+08:00 INFO 26572 --- [seata-account-service] [nio-2003-exec-2] e.wong.service.impl.AccountServiceImpl : ------------->AccountService 开始扣减余额
2024-08-07T22:00:43.500+08:00 INFO 26572 --- [seata-account-service] [h_RMROLE_1_1_24] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='192.168.133.1:8091:2414480718769926145', branchId=2414480718769926148, branchType=AT, resourceId='jdbc:mysql://192.168.133.128:3306/seata_account', applicationData='null'}
2024-08-07T22:00:43.501+08:00 INFO 26572 --- [seata-account-service] [h_RMROLE_1_1_24] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 192.168.133.1:8091:2414480718769926145 2414480718769926148 jdbc:mysql://192.168.133.128:3306/seata_account
2024-08-07T22:00:43.542+08:00 INFO 26572 --- [seata-account-service] [h_RMROLE_1_1_24] i.s.r.d.undo.AbstractUndoLogManager : xid 192.168.133.1:8091:2414480718769926145 branch 2414480718769926148, undo_log deleted with GlobalFinished
2024-08-07T22:00:43.543+08:00 INFO 26572 --- [seata-account-service] [h_RMROLE_1_1_24] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:192.168.133.1:8091:2414480718769926145, branchId:2414480718769926148
2024-08-07T22:00:43.543+08:00 INFO 26572 --- [seata-account-service] [h_RMROLE_1_1_24] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
undo_log 表作用
而且上面你会发现每个微服务数据库都有张 undo_log 表,这个是做什么的呢?
在订单业务方法加了GlobalTransactional 注解后,打开 65 秒注释,目的是观察中间状态,重新请求 http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
,打开 undo_log 表,发现此时有数据
JSON 格式化后结构如下
当然此时在 Seata 管理页面也可以看到分布式事务中间状态的数据
不过 undo_log 表的数据在事务成功提交或者回滚之后在自动删除
二阶段提交原理
第一阶段
二阶段有分成两种情况:正常提交和异常回滚