背景
日常开发离不开分布式系统,自然避免不了分布式事务问题,seata 是一款阿里开源的主流分布式事务解决方案,但实际工作引入seata感觉有点重,本人之前在商业银行做开发对数据一致性要求很高,但很少团队使用。几年前曾研究过seata, 今天趁有空做个demo脚手架,留以后有机会再用。
Seata组成
我们看下 Seata 中存在几种重要角色:
TC(Transaction Coordinator):事务协调者。管理全局的分支事务的状态,用于全局性事务的提交和回滚。
TM(Transaction Manager):事务管理者。用于开启、提交或回滚事务。
RM(Resource Manager):资源管理器。用于分支事务上的资源管理,向 TC 注册分支事务,上报分支事务的状态,接收 TC 的命令来提交或者回滚分支事务。
执行流程是这样的:
服务A中的 TM 向 TC 申请开启一个全局事务,TC 就会创建一个全局事务并返回一个唯一的 XID
服务A中的 RM 向 TC 注册分支事务,然后将这个分支事务纳入 XID 对应的全局事务管辖中
服务A开始执行分支事务
服务A开始远程调用B服务,此时 XID 会根据调用链传播
服务B中的 RM 也向 TC 注册分支事务,然后将这个分支事务纳入 XID 对应的全局事务管辖中
服务B开始执行分支事务
全局事务调用处理结束后,TM 会根据有误异常情况,向 TC 发起全局事务的提交或回滚
TC 协调其管辖之下的所有分支事务,决定是提交还是回滚
前置工作
seata下载安装
官网下载地址:
https://seata.apache.org/zh-cn/unversioned/download/seata-server
本人下载安装的是seata-server-1.4.2版本
修改/seata-server-1.4.2/conf/file.conf配置文件
改为db, 改成自己数据库用户名、密码
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
mode = "db"
## rsa decryption public key
publicKey = ""
db {
datasource = "druid"
dbType = "mysql"
driverClassName = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"
user = "用户名"
password = "密码"
minConn = 5
maxConn = 100
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
}
修改/seata-server-1.4.2/conf/registry.conf配置文件,写上nacos的配置参数
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "DEFAULT_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
file {
name = "file.conf"
}
}
请务必留意上文配置中的dataId = seataServer.properties
这个dataId需要配到nacos中,下面我们开始配置这个
由于namespace未指定特殊值,留空表示使用public
我们在nacos配置列表项的public命名空间中,新增配置文件"seataServer.properties",配置类型选择为"Properties",新建时确保名称如下:
Data ID: seataServer.properties
Group: DEFAULT_GROUP
完事后启动seata的server服务
sh seata-server.sh
然后在编辑区贴入示例内容页中的内容。
贴入的内容有部分关键地方需要修改,请留意
nacos下载安装
从官网下载nacos,解压后,配置standlone模式运行,完毕后环境如下:
访问地址:http://192.168.5.210:8848/nacos
用户名:nacos
密码:nacos
配置内容:
#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
#Transaction routing rules configuration, only for the client
service.vgroupMapping.order-seata-service-group=default
service.vgroupMapping.goods-seata-service-group=default
#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h
#Log rule configuration, for client and server
log.exceptionRate=100
#Transaction storage configuration, only for the server. The file, DB, and redis configuration values are optional.
store.mode=db
store.lock.mode=file
store.session.mode=file
#Used for password encryption
store.publicKey=
#If `store.mode,store.lock.mode,store.session.mode` are not equal to `file`, you can remove the configuration block.
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=rootroot
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
#These configurations are required if the `store mode` is `redis`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `redis`, you can remove the configuration block.
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100
#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
准备数据库表
我用的本地mysql数据库
/*
Navicat Premium Data Transfer
Source Server : localhost
Source Server Type : MySQL
Source Server Version : 80033 (8.0.33)
Source Host : localhost:3306
Source Schema : mail_order
Target Server Type : MySQL
Target Server Version : 80033 (8.0.33)
File Encoding : 65001
Date: 25/04/2024 07:23:32
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键id',
`order_no` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '订单编号',
`goods_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '商品名称',
`goods_total_count` int DEFAULT NULL COMMENT '下单商品数量',
`sku_number` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '商品sku编码',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`modify_time` datetime DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Records of t_order
-- ----------------------------
BEGIN;
COMMIT;
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) NOT NULL COMMENT 'global transaction id',
`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='AT transaction mode undo table';
-- ----------------------------
-- Records of undo_log
-- ----------------------------
BEGIN;
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
/*
Navicat Premium Data Transfer
Source Server : localhost
Source Server Type : MySQL
Source Server Version : 80033 (8.0.33)
Source Host : localhost:3306
Source Schema : mail_goods
Target Server Type : MySQL
Target Server Version : 80033 (8.0.33)
File Encoding : 65001
Date: 25/04/2024 07:24:42
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_goods
-- ----------------------------
DROP TABLE IF EXISTS `t_goods`;
CREATE TABLE `t_goods` (
`id` bigint NOT NULL COMMENT '主键id',
`goods_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '商品名称',
`goods_price` int NOT NULL COMMENT '商品价格',
`goods_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '商品类型',
`goods_stock` int NOT NULL COMMENT '商品库存',
`sku_number` varchar(255) NOT NULL COMMENT 'sku编码',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- ----------------------------
-- Records of t_goods
-- ----------------------------
BEGIN;
INSERT INTO `t_goods` (`id`, `goods_name`, `goods_price`, `goods_type`, `goods_stock`, `sku_number`) VALUES (1, '可乐', 2, 'DRINK', 100, '022000');
COMMIT;
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) NOT NULL COMMENT 'global transaction id',
`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='AT transaction mode undo table';
-- ----------------------------
-- Records of undo_log
-- ----------------------------
BEGIN;
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
详细步骤
源码参考地址:
https://github.com/mikewuhao/springclould-seata/
项目结构
基于springBoot快速搭建3个微服务,网关gateway,订单服务order,商品服务goods,,RPC之间通讯用的openfeign,浏览器会先通过网关访问订单服务,商品服务。
引入seata相关jar包依赖
订单,商品相关业务服务,引入seata依赖
<!-- seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.2.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
自定义SeataInterceptor
package com.goods.interceptor;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Copyright 2022 skyworth
*
* @Author: wuhao
* @CreateTime: 2023-06-22 10:52
* @Description: 事务传播拦截器 从请求header中获取远程调用xid
* @Version: 1.0
**/
@Slf4j
public class SeataInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String xid = RootContext.getXID();
// 获取全局事务编号
String rpcXid = request.getHeader("TX_XID");
log.info("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
if (xid == null && rpcXid != null) {
//设置全局事务编号, 应用开启一个全局事务后,RootContext会自动绑定当前事务的XID,事务结束后也会自动解绑XID。所以在应用运行的过程中可以直接调用 RootContext.getXID()方法获取全局事务的唯一标识。
//将XID和当前进程绑定
RootContext.bind(rpcXid);
log.info("bind[{}] to RootContext", rpcXid);
}
return true;
}
}
自定义WebMvcConfig
、package com.goods.config;
import com.goods.interceptor.SeataInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* Copyright 2022 skyworth
*
* @Author: wuhao
* @CreateTime: 2023-06-22 10:57
* @Description: 启动的时候添加一个拦截器 拦截针对的路径为:/**
* @Version: 1.0
**/
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new SeataInterceptor())
.addPathPatterns("/**")
.excludePathPatterns("/login");
}
}
核心业务代码:从order服务发起一个请求,在一个函数内执行下订单场景:先创建订单,再扣减库存,然后mock异常,让seata进行数据回滚
注解配置
调用方上配置@GlobalTransactional即可。被调用方不需要添加事务注解。因为主要是通过数据库中表undo_log来发起的数据库回滚。没有利用数据库本身的事务回滚功能。
/**
* 下订单场景:先创建订单,再扣减库存,
* @return
*/
//@Transactional(rollbackFor = Exception.class)
@GlobalTransactional
public String addOrder() {
log.info("创建订单,开始");
Order order = new Order();
order.setOrderNo("20230617");
order.setGoodsTotalCount(10);
order.setGoodsName("可乐");
order.setSkuNumber("33220011");
order.setModifyTime(LocalDateTime.now());
order.setCreateTime(LocalDateTime.now());
orderDao.save(order);
log.info("创建订单,结束");
ReduceGoodsRequest request = ReduceGoodsRequest.builder().reduceCount(10).id(1L).build();
log.info("商品扣减库存,开始,rpc调用商品服务,入参:{}",JSONObject.toJSONString(request));
ReduceGoodsResponse response = goodsFeignClient.reduce(request);
log.info("商品扣减库存,完成,rpc调用商品服务,出参:{}",JSONObject.toJSONString(response));
//mock异常,检查让分布式事务全回滚
int i = 10/0;
return "执行完成";
}
测试
启动gateway, order, goods, 3个微服务,执行上述下订单函数,异常时,是否数据回滚。
查看order服务日志得出:RmBranchRollbackProcessor已经对全局事务xid:2711681301535092746的事务分支进行回滚操作了
branchType=AT,这表示分支事务的类型是AT(可能是基于补偿的自动事务模式)。resourceId=jdbc:mysql://127.0.0.1:3306/mail_order,这表示分支事务关联的资源ID
再回头看mysql数据库:订单表,商品表无数据变更,证明seata已经自动把数据回滚了。
总结
seata的AT使用方式上,还是非常简单的。核心是保证xid能够传递到每一个微服务中。发生回滚操作时,通过TC来进行rpc通知。发起本地的事务回滚操作。
参考文献
seata官网:https://seata.apache.org/zh-cn/docs/overview/what-is-seata/
从分布式事务解决到Seata使用,一梭子给你整明白了 链接:https://juejin.cn/post/6944882663148748807
Seata(一) AT 模式探索 https://juejin.cn/post/7208084427393613882?log_from=b0b7edd30e6ae_1714050031728