文章目录
- 下载安装Seata
- 创建对应数据库
- 修改`application.yml`相应配置
- 启动Seata
- PmHub 实战——添加任务事务管理
- 业务库添加undo_log 表
- 对应服务加上对应的seata依赖
- Nacos 配置文件 pmhub-project-dev.yml 添加 seata 配置:
- 接口添加 @GlobalTransactional 注解
- 涉及数据表
- PmHub 实战——审批状态新建/更新
- Nacos服务 pmhub-workflow-dev.yml 配置新增:
- 具体代码实现
- 涉及数据表
- PmHub 实战——Seata 实战测试验证
- 服务相关服务
- ( 正常情况 )用前端访问Pmhub, 添加任务测试![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/ea279907e22448bba0d87a048834a74e.png)
- 查看数据库数据是否正常新增
- ( 异常情况 )超时, 没有@GlobalTransactional回滚
- 查看数据库表的数据情况
- 解决办法, 添加GlobalTransactional![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/f288e38e33184517bd4ae3e88fe100b7.png)
-
本地事务解决的是单数据源的数据一致性问题,分布式事务解决的是多数据源数据一致性问题。
-
在单体应用中的事务其实都属于本地事务,比如在 springboot 中在方法上加 @Transactional 注解,其实走的也是一种本地事务。
-
官网地址:https://seata.apache.org/zh-cn/
-
开源地址:https://github.com/apache/incubator-seata
- 有了 Seata,我们只需要在需要使用分布式事务的地方加上 @GlobalTransactional 注解即可。
- Seata 支持 3 种模式,AT 模式、TCC 模式、Saga 模式
模式 | 描述 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
AT模式 | 自动补偿事务, 通过代理自动管理事务的提交回滚, 适合简单场景 | 易于使用, 开发成本低, 自动管理事务 | 依赖于数据库支持, 适合简单场景, 不适合复杂业务逻辑 | 简单的业务场景, 如单表操作, 小型微服务 |
TCC模式 | 开发者手动实现业务逻辑的Try/Confirm/Cancel三个阶段 | 提供强一致性, 适用于需要严格事务的场景 | 实现复杂, 开发成本高, 需要手动管理事务的各个阶段 | 对一致性要求高且业务操作较短的场景, 如支付, 交易等涉及资金的系统 |
Saga模式 | 长事务模式, 通过一些列的子事务来完成主事务, 子事务之间独立运行,如果某个子事务失败, 通过补偿事务进行回滚 | 无需全局锁, 高性能, 适用于长事务场景 | 需要开发补偿逻辑, 可能无法保证强一致性 | 业务流程长, 跨多个系统或服务的场景, 如订单管理, 供应链管理 |
下载安装Seata
- 下载地址:https://seata.apache.org/zh-cn/unversioned/download/seata-server/
创建对应数据库
- seata使用mysql作为db高可用数据库,在mysql创建一个 pmhub-seata库,并导入数据库脚本。
CREATE DATABASE `pmhub-seata` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
USE `pmhub-seata`;
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
SET FOREIGN_KEY_CHECKS = 1;
修改application.yml
相应配置
- 修改Nacos的部署地址
- 修改Mysql数据库的相关配置
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${log.home:${user.home}/logs/seata}
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
service:
vgroup-mapping:
default_tx_group: default
disable-global-transaction: true
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace:
group: DEFAULT_GROUP
username: nacos
password: nacos
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: DEFAULT_GROUP
namespace:
cluster: default
username: nacos
password: nacos
store:
mode: db
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/pmhub-seata?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
user: root
password: 123456
min-conn: 10
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 1000
max-wait: 5000
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**
启动Seata
- windows启动
**seata-server.bat**
- mac启动
**seata-server.sh **
- 访问 http://localhost:7091/**
- 查看Nacos中Seata是否成功注册上去
PmHub 实战——添加任务事务管理
- 创建项目任务的时候,需要添加或更新审批设置,
- 这里需要跨库调用,且涉及到修改不同的数据库,
- 在不同的微服务上,所以就要用到分布式事务处理
业务库添加undo_log 表
- 如果跟着前面的教程来进行的话, 这一步添加
**undo_log**
表就不用做了 - 因为这里使用的是 Seata 的 AT 模式,故需要在各自的业务数据库中新建 undo_log 回滚日志表,
- 这里主要是 pmhub-project 库和pmhub-workflow 库
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
对应服务加上对应的seata依赖
<!--分布式事务-->
<dependency>
<groupId>com.laigeoffer.pmhub-cloud</groupId>
<artifactId>pmhub-base-seata</artifactId>
</dependency>
Nacos 配置文件 pmhub-project-dev.yml 添加 seata 配置:
# 分布式事务
seata:
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: ""
group: DEFAULT_GROUP
application: seata-server
tx-service-group: default_tx_group # 事务组,由它获得TC服务的集群名称
service:
vgroup-mapping:
default_tx_group: default # 事务组与TC服务集群的映射关系
data-source-proxy-mode: AT
接口添加 @GlobalTransactional 注解
@Override
@GlobalTransactional(name = "pmhub-project-addTask",rollbackFor = Exception.class) //seata分布式事务,AT模式
public String add(TaskReqVO taskReqVO) {
// xid 全局事务id的检查(方便查看)
String xid = RootContext.getXID();
log.info("---------------开始新建任务: "+"\t"+"xid: "+xid);
if (ProjectStatusEnum.PAUSE.getStatus().equals(projectTaskMapper.queryProjectStatus(taskReqVO.getProjectId()))) {
throw new ServiceException("归属项目已暂停,无法新增任务");
}
// 1、添加任务
ProjectTask projectTask = new ProjectTask();
if (StringUtils.isNotBlank(taskReqVO.getTaskId())) {
projectTask.setTaskPid(taskReqVO.getTaskId());
}
BeanUtils.copyProperties(taskReqVO, projectTask);
projectTask.setCreatedBy(SecurityUtils.getUsername());
projectTask.setCreatedTime(new Date());
projectTask.setUpdatedBy(SecurityUtils.getUsername());
projectTask.setUpdatedTime(new Date());
projectTaskMapper.insert(projectTask);
// 2、添加任务成员
insertMember(projectTask.getId(), 1, SecurityUtils.getUserId());
// 3、添加日志
saveLog("addTask", projectTask.getId(), taskReqVO.getProjectId(), taskReqVO.getTaskName(), "参与了任务", null);
// 将执行人加入
if (taskReqVO.getUserId() != null && !Objects.equals(taskReqVO.getUserId(), SecurityUtils.getUserId())) {
insertMember(projectTask.getId(), 0, taskReqVO.getUserId());
// 添加日志
saveLog("invitePartakeTask", projectTask.getId(), taskReqVO.getProjectId(), taskReqVO.getTaskName(), "邀请 " + getSysUserList(Collections.singletonList(taskReqVO.getUserId())).get(0).getNickName() + " 参与任务", taskReqVO.getUserId());
}
// 4、任务指派消息提醒
extracted(taskReqVO.getTaskName(), taskReqVO.getUserId(), SecurityUtils.getUsername(), projectTask.getId());
// 5、添加或更新审批设置(远程调用 pmhub-workflow 微服务)
ApprovalSetDTO approvalSetDTO = new ApprovalSetDTO(projectTask.getId(), ProjectStatusEnum.TASK.getStatusName(),
taskReqVO.getApproved(), taskReqVO.getDefinitionId(), taskReqVO.getDeploymentId());
R<Boolean> result = wfDeployService.insertOrUpdateApprovalSet(approvalSetDTO, SecurityConstants.INNER);
if (Objects.isNull(result) || Objects.isNull(result.getData())
|| R.fail().equals(result.getData())) {
throw new ServiceException("远程调用审批服务失败");
}
log.info("---------------结束新建任务: "+"\t"+"xid: "+xid);
return projectTask.getId();
}
涉及数据表
**pmhub_project_task**
**pmhub_project_member**
**pmhub_project_log**
PmHub 实战——审批状态新建/更新
Nacos服务 pmhub-workflow-dev.yml 配置新增:
# 分布式事务配置
seata:
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: ""
group: DEFAULT_GROUP
application: seata-server
tx-service-group: default_tx_group # 事务组,由它获得TC服务的集群名称
service:
vgroup-mapping:
default_tx_group: default # 事务组与TC服务集群的映射关系
data-source-proxy-mode: AT
具体代码实现
@Override
public boolean insertOrUpdateApprovalSet(String extraId, String type, String approved, String definitionId, String deploymentId) {
LambdaQueryWrapper<WfApprovalSet> qw = new LambdaQueryWrapper<>();
qw.eq(WfApprovalSet::getExtraId, extraId).eq(WfApprovalSet::getType, type);
WfApprovalSet mas = wfApprovalSetMapper.selectOne(qw);
if (mas != null) {
mas.setApproved(approved);
mas.setDefinitionId(definitionId);
mas.setDeploymentId(deploymentId);
mas.setUpdatedBy(SecurityUtils.getUsername());
mas.setUpdatedTime(new Date());
wfApprovalSetMapper.updateById(mas);
} else {
WfApprovalSet wfApprovalSet = new WfApprovalSet();
wfApprovalSet.setExtraId(extraId);
wfApprovalSet.setType(type);
wfApprovalSet.setApproved(approved);
wfApprovalSet.setDefinitionId(definitionId);
wfApprovalSet.setDeploymentId(deploymentId);
wfApprovalSet.setCreatedBy(SecurityUtils.getUsername());
wfApprovalSet.setCreatedTime(new Date());
wfApprovalSet.setUpdatedBy(SecurityUtils.getUsername());
wfApprovalSet.setUpdatedTime(new Date());
wfApprovalSetMapper.insert(wfApprovalSet);
}
return true;
}
涉及数据表
- pmhub-workflow -> pmhub_wf_approval_set
PmHub 实战——Seata 实战测试验证
服务相关服务
- pmhub-gateway
- pmhub-auth
- pmhub-system
- pmhub-project
- pmhub-workflow
( 正常情况 )用前端访问Pmhub, 添加任务测试
查看数据库数据是否正常新增
-
pmhub_project_task
-
pmhub_project_member
- pmhub_project_log
( 异常情况 )超时, 没有@GlobalTransactional回滚
-
修改代码, 让线程睡眠10秒
-
继续添加任务, 体验超时异常
查看数据库表的数据情况
-
pmhub_project_task
-
pmhub_project_member
- pmhub_project_log
- 可以看到项目数据库中的几个表都正常创建了,也就是流程服务超时,
- 并没有阻止之前其他服务创建数据的操作,这不是妥妥的脏数据吗?
- 我们再来看看流程表的数据
**pmhub_wf_approval_set**
- 前台都提示响应超时了, 这些数据本应该不存在的, 现在全都存在了, 绝不可以!
解决办法, 添加GlobalTransactional
- 再次测试就会发现数据并没有新增到数据库了, 分布式事务回滚成功!
**undo_log**
** 中存下了插入表格的所有信息状态, 是为了方便分布式事务回滚的。**- 当任务执行完,
**undo_log**
** 中数据会被清空,**undo_log**
只是暂时记录一下回滚信息,** - 出现异常回滚玩,记录自然而然也就不见了。