一、前言
通过以下系列章节:
docker-compose 实现Seata Server高可用部署 | Spring Cloud 51
Seata AT 模式理论学习、事务隔离及部分源码解析 | Spring Cloud 52
Spring Boot集成Seata利用AT模式分布式事务示例 | Spring Cloud 53
Seata XA 模式理论学习、使用及注意事项 | Spring Cloud54
Seata TCC 模式理论学习、生产级使用示例搭建及注意事项 | Spring Cloud55
Seata TCC 模式下解决幂等、悬挂、空回滚问题 | Spring Cloud56
Seata Saga 模式理论学习、生产级使用示例搭建及注意事项(一) | Spring Cloud57
我们对Seata及其AT、XA、TCC、Saga事务模式的理论、使用有了深入的了解,今天专注学习Seata的Saga事务模式代码实现;并区别于官网,我们利用openfeign进行生产级示例搭建,降低入门难度。
二、示例搭建
书接上回,本示例是一个商品下单的案例,一共有三个服务和一个公共模块:
order-saga:业务服务,用户下单操作将在这里完成。
account-saga:账户服务,可以查询/修改用户的账户信息。
storage-saga:仓储服务,可以查询/修改商品的库存数量。
common-saga:公共模块,包含:实体类、openfeign接口、统一异常处理等。
2.1 业务表结构
2.1.1 订单表
-- Order table. Columns: user_id, commodity_code, count, money, business_key.
-- Dropped and recreated so the script can be re-run from scratch.
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`count` int NULL DEFAULT 0,
`money` decimal(10, 2) NULL DEFAULT 0.00,
-- NOTE(review): presumably the Saga business key, as used in t_account / t_storage — confirm against the order service.
`business_key` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 23 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;
2.1.2 库存表
-- ----------------------------
-- Table structure for t_storage
-- Stock table: one row per commodity (commodity_code is unique).
-- ----------------------------
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage` (
`id` bigint NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`count` int NULL DEFAULT 0,
-- Written by the storage service's deduct and matched by its compensateDeduct.
`business_key` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `commodity_code`(`commodity_code` ASC) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of t_storage
-- Seed row in column order (id, commodity_code, count, business_key): 3 units of 'iphone'.
-- ----------------------------
INSERT INTO `t_storage` VALUES (1, 'iphone', 3, '');
2.1.3 账户表
-- ----------------------------
-- Table structure for t_account
-- Account table holding each user's balance.
-- ----------------------------
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '用户ID',
`money` decimal(10, 2) NULL DEFAULT 0.00 COMMENT '账户余额',
-- Written by the account service's deduct and matched by its compensateDeduct.
`business_key` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '业务标识',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of t_account
-- Seed row in column order (id, user_id, money, business_key): 'user1' with a balance of 500.00.
-- ----------------------------
INSERT INTO `t_account` VALUES (1, 'user1', 500.00, '');
2.2 项目总体结构
2.3 common-saga 搭建
2.3.1 实体类
com/gm/seata/openfeign/entity/Account.java
:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;
/**
 * Persistent entity mapped to the {@code t_account} table.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
@TableName("t_account")
public class Account {
// NOTE(review): the t_account DDL declares id as AUTO_INCREMENT while this mapping uses
// IdType.ASSIGN_ID — confirm which id strategy is intended.
@TableId(type = IdType.ASSIGN_ID)
private long id;
// Column user_id: identifies the account owner.
private String userId;
// Column money: the account balance.
private BigDecimal money;
// Column business_key: written by deduct and matched by compensateDeduct.
private String businessKey;
}
com/gm/seata/openfeign/entity/Order.java
:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;
/**
 * Persistent entity mapped to the {@code t_order} table.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
@TableName("t_order")
public class Order {
// NOTE(review): the t_order DDL declares id as AUTO_INCREMENT while this mapping uses
// IdType.ASSIGN_ID — confirm which id strategy is intended.
@TableId(type = IdType.ASSIGN_ID)
private long id;
// Column user_id.
private String userId;
// Column commodity_code.
private String commodityCode;
// Column count.
private int count;
// Column money.
private BigDecimal money;
// Column business_key. NOTE(review): presumably the Saga business key — confirm against the order service.
private String businessKey;
}
com/gm/seata/openfeign/entity/Storage.java
:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
 * Persistent entity mapped to the {@code t_storage} table.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
@TableName("t_storage")
public class Storage {
// NOTE(review): the t_storage DDL declares id as AUTO_INCREMENT while this mapping uses
// IdType.ASSIGN_ID — confirm which id strategy is intended.
@TableId(type = IdType.ASSIGN_ID)
private long id;
// Column commodity_code (unique in the table).
private String commodityCode;
// Column count: remaining stock.
private int count;
// Column business_key: written by deduct and matched by compensateDeduct.
private String businessKey;
}
2.3.2 feign接口
com/gm/seata/openfeign/feign/AccountServiceApi.java
:
import com.gm.seata.openfeign.util.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import java.math.BigDecimal;
/**
 * OpenFeign client for the {@code account-saga} service.
 */
@FeignClient(value = "account-saga")
public interface AccountServiceApi {
/**
 * Deducts money from a user's account balance.
 *
 * @param businessKey business key sent as the {@code businessKey} request parameter
 * @param userId      id of the account owner
 * @param money       amount to deduct
 * @return the remote response wrapping the boolean result of the deduction
 */
@RequestMapping(value = "deduct", method = RequestMethod.GET)
R<Boolean> deduct(@RequestParam("businessKey") String businessKey, @RequestParam("userId") String userId, @RequestParam("money") BigDecimal money);
/**
 * Compensates (gives back) a previously deducted amount.
 *
 * @param businessKey business key sent as the {@code businessKey} request parameter
 * @param userId      id of the account owner
 * @param money       amount to give back
 * @return the remote response wrapping the boolean result of the compensation
 */
@RequestMapping(value = "compensateDeduct", method = RequestMethod.GET)
R<Boolean> compensateDeduct(@RequestParam("businessKey") String businessKey, @RequestParam("userId") String userId, @RequestParam("money") BigDecimal money);
}
com/gm/seata/openfeign/feign/StorageServiceApi.java
:
import com.gm.seata.openfeign.util.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
/**
 * OpenFeign client for the {@code storage-saga} service.
 */
@FeignClient(value = "storage-saga")
public interface StorageServiceApi {
/**
 * Deducts stock for a commodity.
 *
 * @param businessKey   business key sent as the {@code businessKey} request parameter
 * @param commodityCode code of the commodity
 * @param count         number of units to deduct
 * @return the remote response wrapping the boolean result of the deduction
 */
@RequestMapping(value = "deduct", method = RequestMethod.GET)
R<Boolean> deduct(@RequestParam("businessKey") String businessKey, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);
/**
 * Compensates (gives back) previously deducted stock.
 *
 * @param businessKey   business key sent as the {@code businessKey} request parameter
 * @param commodityCode code of the commodity
 * @param count         number of units to give back
 * @return the remote response wrapping the boolean result of the compensation
 */
@RequestMapping(value = "compensateDeduct", method = RequestMethod.GET)
R<Boolean> compensateDeduct(@RequestParam("businessKey") String businessKey, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);
}
2.3.3 feign 微服务调用异常处理
com/gm/seata/openfeign/handle/FeignErrorDecoder.java
:
import com.alibaba.fastjson.JSONObject;
import com.gm.seata.openfeign.util.ErrorEnum;
import feign.Response;
import feign.RetryableException;
import feign.Util;
import feign.codec.ErrorDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.Charset;
@Slf4j
@Configuration
public class FeignErrorDecoder extends ErrorDecoder.Default {
@Override
public Exception decode(String methodKey, Response response) {
try {
// 可以自定义一些逻辑
String message = Util.toString(response.body().asReader(Charset.forName("utf8")));
JSONObject jsonObject = JSONObject.parseObject(message);
int code = jsonObject.getInteger("code");
ErrorEnum errorEnum = ErrorEnum.getEnumByCode(code);
// 包装成自己自定义的异常
return new RuntimeException(String.valueOf(errorEnum.getCode()));
} catch (Exception e) {
log.error("非已知异常", e.getMessage(), e);
}
Exception exception = super.decode(methodKey, response);
// 如果是RetryableException,则返回继续重试
if (exception instanceof RetryableException) {
return exception;
}
return new RuntimeException(String.valueOf(ErrorEnum.UNKNOWN_EXCEPTION.getCode()));
}
}
2.3.4 Controller 统一异常处理
com/gm/seata/openfeign/handle/GlobalBizExceptionHandler.java
:
import com.gm.seata.openfeign.util.ErrorEnum;
import com.gm.seata.openfeign.util.R;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;
/**
* 全局异常处理器
*/
/**
 * Global exception handler that converts any uncaught exception into an {@link R} response.
 *
 * <p>Known business failures are thrown as exceptions whose message is the numeric
 * {@link ErrorEnum} code; those are translated back to the matching code and title.
 */
@Slf4j
@Order(10000)
@RestControllerAdvice
public class GlobalBizExceptionHandler {

    /**
     * Handles every exception that reaches the controller layer.
     *
     * <p>Resolution order: a {@code feign.FeignException} is always reported as
     * {@link ErrorEnum#UNKNOWN_EXCEPTION}; otherwise a message that is a known error code is
     * reported with that code and title; otherwise a generic failure carrying the original
     * message is returned.
     *
     * @param e the exception being handled
     * @return the error response body
     */
    @ExceptionHandler(Exception.class)
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public R handleGlobalException(Exception e) {
        log.error("全局异常信息 ex={}", e.getMessage(), e);
        if (e instanceof feign.FeignException) {
            ErrorEnum unknown = ErrorEnum.UNKNOWN_EXCEPTION;
            return R.restResult(null, unknown.getCode(), unknown.getTitle());
        }
        ErrorEnum errorEnum = resolveKnownError(e.getLocalizedMessage());
        if (errorEnum != null) {
            return R.restResult(null, errorEnum.getCode(), errorEnum.getTitle());
        }
        return R.failed(e.getLocalizedMessage());
    }

    /**
     * Interprets an exception message as a numeric {@link ErrorEnum} code.
     *
     * <p>A {@code null} or non-numeric message is an ordinary exception, not a known business
     * error, so it yields {@code null} instead of letting {@link NumberFormatException} escape
     * from the handler.
     */
    private static ErrorEnum resolveKnownError(String message) {
        if (message == null) {
            return null;
        }
        try {
            return ErrorEnum.getEnumByCode(Integer.parseInt(message));
        } catch (NumberFormatException ignored) {
            // Message is free text rather than an error code.
            return null;
        }
    }
}
2.3.4 Controller 统一异常处理
com/gm/seata/openfeign/handle/GlobalBizExceptionHandler.java
:
import com.gm.seata.openfeign.util.ErrorEnum;
import com.gm.seata.openfeign.util.R;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;
/**
* 全局异常处理器
*/
/**
 * Global exception handler that converts any uncaught exception into an {@link R} response.
 *
 * <p>Known business failures are thrown as exceptions whose message is the numeric
 * {@link ErrorEnum} code; those are translated back to the matching code and title.
 */
@Slf4j
@Order(10000)
@RestControllerAdvice
public class GlobalBizExceptionHandler {

    /**
     * Handles every exception that reaches the controller layer.
     *
     * <p>Resolution order: a {@code feign.FeignException} is always reported as
     * {@link ErrorEnum#UNKNOWN_EXCEPTION}; otherwise a message that is a known error code is
     * reported with that code and title; otherwise a generic failure carrying the original
     * message is returned.
     *
     * @param e the exception being handled
     * @return the error response body
     */
    @ExceptionHandler(Exception.class)
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public R handleGlobalException(Exception e) {
        log.error("全局异常信息 ex={}", e.getMessage(), e);
        if (e instanceof feign.FeignException) {
            ErrorEnum unknown = ErrorEnum.UNKNOWN_EXCEPTION;
            return R.restResult(null, unknown.getCode(), unknown.getTitle());
        }
        ErrorEnum errorEnum = resolveKnownError(e.getLocalizedMessage());
        if (errorEnum != null) {
            return R.restResult(null, errorEnum.getCode(), errorEnum.getTitle());
        }
        return R.failed(e.getLocalizedMessage());
    }

    /**
     * Interprets an exception message as a numeric {@link ErrorEnum} code.
     *
     * <p>A {@code null} or non-numeric message is an ordinary exception, not a known business
     * error, so it yields {@code null} instead of letting {@link NumberFormatException} escape
     * from the handler.
     */
    private static ErrorEnum resolveKnownError(String message) {
        if (message == null) {
            return null;
        }
        try {
            return ErrorEnum.getEnumByCode(Integer.parseInt(message));
        } catch (NumberFormatException ignored) {
            // Message is free text rather than an error code.
            return null;
        }
    }
}
2.3.5 已知异常枚举类
com/gm/seata/openfeign/util/ErrorEnum.java
:
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Catalogue of known business errors, each with a numeric code and a display title.
 *
 * <p>Getters and the all-args constructor are generated by Lombok.
 */
@Getter
@AllArgsConstructor
public enum ErrorEnum {
    NO_SUCH_COMMODITY(3000, "无此商品"),
    STORAGE_LOW_PREPARE(3001, "库存不足,预扣库存失败"),
    STORAGE_LOW_COMMIT(3002, "库存不足,扣库存失败"),
    NO_SUCH_ACCOUNT(4000, "无此账户"),
    ACCOUNT_LOW_PREPARE(4001, "余额不足,预扣款失败"),
    ACCOUNT_LOW_COMMIT(4002, "余额不足,扣款失败"),
    UNKNOWN_EXCEPTION(9999, "远程方法调用异常");

    /** Numeric error code. */
    private final Integer code;

    /** Display title of the error. */
    private final String title;

    /**
     * Looks up the constant that carries the given code.
     *
     * @param code numeric error code to search for
     * @return the matching constant, or {@code null} when no constant has that code
     */
    public static ErrorEnum getEnumByCode(int code) {
        ErrorEnum match = null;
        for (ErrorEnum candidate : values()) {
            if (candidate.getCode().intValue() == code) {
                match = candidate;
                break;
            }
        }
        return match;
    }
}
2.3.6 响应信息结构体
com/gm/seata/openfeign/util/R.java
:
import lombok.*;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* 响应信息主体
*
*/
/**
 * Generic response envelope holding a status code, an optional message and an optional payload.
 *
 * <p>Accessors are generated by Lombok; setters are chainable because of
 * {@code @Accessors(chain = true)}.
 *
 * @param <T> payload type
 */
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class R<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Code used by the {@code ok(...)} factories. */
    private static final Integer SUCCESS = 0;

    /** Code used by the {@code failed(...)} factories. */
    private static final Integer FAIL = 1;

    private int code;
    private String msg;
    private T data;

    /** Success response without payload or message. */
    public static <T> R<T> ok() {
        return restResult(null, SUCCESS, null);
    }

    /** Success response carrying {@code data}. */
    public static <T> R<T> ok(T data) {
        return restResult(data, SUCCESS, null);
    }

    /** Success response carrying {@code data} and {@code msg}. */
    public static <T> R<T> ok(T data, String msg) {
        return restResult(data, SUCCESS, msg);
    }

    /** Failure response without payload or message. */
    public static <T> R<T> failed() {
        return restResult(null, FAIL, null);
    }

    /** Failure response carrying {@code msg}. */
    public static <T> R<T> failed(String msg) {
        return restResult(null, FAIL, msg);
    }

    /** Failure response carrying {@code data}. */
    public static <T> R<T> failed(T data) {
        return restResult(data, FAIL, null);
    }

    /** Failure response carrying {@code data} and {@code msg}. */
    public static <T> R<T> failed(T data, String msg) {
        return restResult(data, FAIL, msg);
    }

    /**
     * Builds a response from its three parts.
     *
     * @param data payload, may be {@code null}
     * @param code status code
     * @param msg  message, may be {@code null}
     * @return a new response instance
     */
    public static <T> R<T> restResult(T data, int code, String msg) {
        return new R<T>()
                .setCode(code)
                .setData(data)
                .setMsg(msg);
    }
}
2.3.7 自动配置实现
在src/main/resources/META-INF/spring路径下新建文件org.springframework.boot.autoconfigure.AutoConfiguration.imports,内容如下:
com.gm.seata.openfeign.handle.GlobalBizExceptionHandler
com.gm.seata.openfeign.handle.FeignErrorDecoder
新建文件org.springframework.cloud.openfeign.FeignClient.imports,内容如下:
com.gm.seata.openfeign.feign.AccountServiceApi
com.gm.seata.openfeign.feign.OrderServiceApi
com.gm.seata.openfeign.feign.StorageServiceApi
通过上述方式实现自动配置。
2.4 account-saga 搭建
2.4.1 完整依赖
seata/openfeign-saga/account-saga/pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the account-saga module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Inherits from the openfeign-saga parent; dependencies without a version below rely on
     version management declared outside this file. -->
<parent>
<artifactId>openfeign-saga</artifactId>
<groupId>com.gm</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>account-saga</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Shared module of this demo (common-saga). -->
<dependency>
<groupId>com.gm</groupId>
<artifactId>common-saga</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- Spring Cloud: bootstrap context, client-side load balancing, OpenFeign. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- Nacos service discovery. -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Persistence: MySQL JDBC driver and MyBatis-Plus. -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.4.2 配置文件
src/main/resources/bootstrap.yml
:
# Bootstrap configuration of the account service.
# NOTE(review): the @...@ tokens look like Maven resource-filtering placeholders — confirm that
# filtering is enabled for this module (it is not configured in the module's own pom).
server:
port: 3011
spring:
application:
name: @artifactId@
cloud:
nacos:
username: @nacos.username@
password: @nacos.password@
discovery:
# Nacos address; host and port can be overridden through NACOS_HOST / NACOS_PORT.
server-addr: ${NACOS_HOST:nacos1.kc}:${NACOS_PORT:8848}
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.0.35:3306/seata-saga-demo?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&allowMultiQueries=true&serverTimezone=Asia/Shanghai
# NOTE(review): database credentials are hard-coded for the demo; externalize them for real use.
username: root
password: '1qaz@WSX'
# mybatis-plus: print complete SQL statements, including parameters, to the console
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
2.4.3 功能搭建
2.4.3.1 启动类
com/gm/seata/openfeign/AccountTCCApplication.java
:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
 * Spring Boot entry point of the account service, with service discovery enabled.
 */
@SpringBootApplication
@EnableDiscoveryClient
public class AccountTCCApplication {
/**
 * Starts the Spring application.
 *
 * @param args command-line arguments forwarded to Spring Boot
 */
public static void main(String[] args) {
SpringApplication.run(AccountTCCApplication.class, args);
}
}
2.4.3.2 Mapper类
com/gm/seata/openfeign/mapper/AccountMapper.java
:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.gm.seata.openfeign.entity.Account;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
 * MyBatis-Plus mapper for {@link Account} rows in {@code t_account}.
 */
@Mapper
public interface AccountMapper extends BaseMapper<Account> {
/**
 * Fetches at most one account row for the given user.
 *
 * @param userId value matched against the {@code user_id} column
 * @return the account, or {@code null} when no row matches (callers check for {@code null})
 */
@Select("SELECT * FROM t_account WHERE user_id = #{userId} limit 1")
Account getAccountByUserId(@Param("userId") String userId);
}
2.4.3.3 Service类
com/gm/seata/openfeign/service/AccountService.java
:
import java.math.BigDecimal;
/**
 * Account operations used by the Saga flow: a forward deduction and its compensation.
 */
public interface AccountService {
/**
 * Deducts money from a user's account balance.
 *
 * @param businessKey business key recorded with the deduction
 * @param userId      id of the account owner
 * @param money       amount to deduct
 * @return whether the deduction was applied
 */
boolean deduct(String businessKey, String userId, BigDecimal money);
/**
 * Compensates (gives back) a previously deducted amount.
 *
 * @param businessKey business key of the deduction being compensated
 * @param userId      id of the account owner
 * @param money       amount to give back
 * @return whether a compensation was applied
 */
boolean compensateDeduct(String businessKey, String userId, BigDecimal money);
}
com/gm/seata/openfeign/service/impl/AccountServiceImpl.java
:
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.gm.seata.openfeign.entity.Account;
import com.gm.seata.openfeign.mapper.AccountMapper;
import com.gm.seata.openfeign.service.AccountService;
import com.gm.seata.openfeign.util.ErrorEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
/**
 * Database-backed {@link AccountService}: deducts from, and compensates, the balance stored in
 * {@code t_account}.
 */
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    AccountMapper accountMapper;

    /**
     * Deducts {@code money} from the user's balance and stamps the row with {@code businessKey}.
     *
     * @param businessKey business key stored on the account row
     * @param userId      id of the account owner
     * @param money       amount to deduct
     * @return {@code true} when exactly one row was updated
     * @throws RuntimeException with the {@link ErrorEnum#NO_SUCH_ACCOUNT} code as message when
     *                          the account does not exist, or the
     *                          {@link ErrorEnum#ACCOUNT_LOW_PREPARE} code when the balance is
     *                          lower than {@code money}
     */
    @Override
    public boolean deduct(String businessKey, String userId, BigDecimal money) {
        Account account = accountMapper.getAccountByUserId(userId);
        if (account == null) {
            // The numeric code is the message so the global handler can map it back.
            throw new RuntimeException(String.valueOf(ErrorEnum.NO_SUCH_ACCOUNT.getCode()));
        }
        // Reject the deduction when the balance is smaller than the requested amount.
        if (account.getMoney().compareTo(money) < 0) {
            throw new RuntimeException(String.valueOf(ErrorEnum.ACCOUNT_LOW_PREPARE.getCode()));
        }
        account.setMoney(account.getMoney().subtract(money));
        account.setBusinessKey(businessKey);
        QueryWrapper<Account> query = new QueryWrapper<>();
        query.eq("user_id", userId);
        int i = accountMapper.update(account, query);
        log.info("{} 账户余额扣除 {} 元", userId, money);
        return i == 1;
    }

    /**
     * Gives {@code money} back to the user's balance, but only when the row still carries the
     * given {@code businessKey}.
     *
     * <p>A missing account is treated as "nothing to compensate" instead of failing with a
     * {@link NullPointerException}.
     *
     * <p>NOTE(review): the business key is left on the row after compensation, so calling this
     * twice with the same key would add the amount twice — confirm whether callers can retry.
     *
     * @param businessKey business key of the deduction being compensated
     * @param userId      id of the account owner
     * @param money       amount to give back
     * @return {@code true} when exactly one row was updated, {@code false} when there was
     *         nothing to compensate
     */
    @Override
    public boolean compensateDeduct(String businessKey, String userId, BigDecimal money) {
        Account account = accountMapper.getAccountByUserId(userId);
        if (account == null) {
            log.info("{} 账户无需补偿", userId);
            return false;
        }
        account.setMoney(account.getMoney().add(money));
        QueryWrapper<Account> query = new QueryWrapper<>();
        query.eq("user_id", userId);
        // Only rows stamped by the matching deduction are compensated.
        query.eq("business_key", businessKey);
        int i = accountMapper.update(account, query);
        if (i == 1) {
            log.info("{} 账户补偿余额 {} 元", userId, money);
        } else {
            log.info("{} 账户无需补偿", userId);
        }
        return i == 1;
    }
}
2.4.3.4 Controller类
com/gm/seata/openfeign/controller/AccountController.java
:
import com.gm.seata.openfeign.service.AccountService;
import com.gm.seata.openfeign.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.math.BigDecimal;
/**
 * HTTP endpoints of the account service; each delegates to {@link AccountService} and wraps the
 * result in a successful {@link R}.
 */
@RestController
public class AccountController {
    @Autowired
    AccountService accountService;

    /**
     * Deducts money from a user's account balance.
     *
     * @param businessKey business key of the deduction
     * @param userId      id of the account owner
     * @param money       amount to deduct
     * @return success response wrapping the service result
     */
    @RequestMapping(value = "deduct", method = RequestMethod.GET)
    public R<Boolean> deduct(@RequestParam("businessKey") String businessKey, @RequestParam("userId") String userId, @RequestParam("money") BigDecimal money) {
        final Boolean deducted = accountService.deduct(businessKey, userId, money);
        return R.ok(deducted);
    }

    /**
     * Compensates a previously deducted amount.
     *
     * @param businessKey business key of the deduction being compensated
     * @param userId      id of the account owner
     * @param money       amount to give back
     * @return success response wrapping the service result
     */
    @RequestMapping(value = "compensateDeduct", method = RequestMethod.GET)
    public R<Boolean> compensateDeduct(@RequestParam("businessKey") String businessKey, @RequestParam("userId") String userId, @RequestParam("money") BigDecimal money) {
        final Boolean compensated = accountService.compensateDeduct(businessKey, userId, money);
        return R.ok(compensated);
    }
}
对外提供两个方法,分别对应Saga状态机中ServiceTask节点AccountService-deduct的远程调用,以及Compensation补偿节点AccountService-compensateDeduct的远程调用,如下图:
2.5 storage-saga 搭建
2.5.1 完整依赖
为减少重复内容,请参考 2.4.1 部分,自行修改
2.5.2 配置文件
为减少重复内容,请参考 2.4.2 部分,自行修改
2.5.3 功能搭建
2.5.3.1 启动类
com/gm/seata/openfeign/StorageTCCApplication.java
:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
 * Spring Boot entry point of the storage service, with service discovery enabled.
 */
@SpringBootApplication
@EnableDiscoveryClient
public class StorageTCCApplication {
/**
 * Starts the Spring application.
 *
 * @param args command-line arguments forwarded to Spring Boot
 */
public static void main(String[] args) {
SpringApplication.run(StorageTCCApplication.class, args);
}
}
2.5.3.2 Mapper类
com/gm/seata/openfeign/mapper/StorageMapper.java
:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.gm.seata.openfeign.entity.Storage;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
 * MyBatis-Plus mapper for {@link Storage} rows in {@code t_storage}.
 */
@Mapper
public interface StorageMapper extends BaseMapper<Storage> {
/**
 * Fetches at most one stock row for the given commodity.
 *
 * @param commodityCode value matched against the {@code commodity_code} column
 * @return the stock row, or {@code null} when no row matches (callers check for {@code null})
 */
@Select("SELECT * FROM t_storage WHERE commodity_code = #{commodityCode} limit 1")
Storage getStorageByCommodityCode(@Param("commodityCode") String commodityCode);
}
2.5.3.3 Service类
com/gm/seata/openfeign/service/StorageService.java
:
/**
 * Stock operations used by the Saga flow: a forward deduction and its compensation.
 */
public interface StorageService {
/**
 * Deducts stock for a commodity.
 *
 * @param businessKey   business key recorded with the deduction
 * @param commodityCode code of the commodity
 * @param count         number of units to deduct
 * @return whether the deduction was applied
 */
boolean deduct(String businessKey, String commodityCode, Integer count);
/**
 * Compensates (gives back) previously deducted stock.
 *
 * @param businessKey   business key of the deduction being compensated
 * @param commodityCode code of the commodity
 * @param count         number of units to give back
 * @return whether a compensation was applied
 */
boolean compensateDeduct(String businessKey, String commodityCode, Integer count);
}
com/gm/seata/openfeign/service/impl/StorageServiceImpl.java
:
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.gm.seata.openfeign.entity.Storage;
import com.gm.seata.openfeign.mapper.StorageMapper;
import com.gm.seata.openfeign.service.StorageService;
import com.gm.seata.openfeign.util.ErrorEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Database-backed {@link StorageService}: deducts from, and compensates, the stock stored in
 * {@code t_storage}.
 */
@Slf4j
@Service
public class StorageServiceImpl implements StorageService {
    @Autowired
    StorageMapper storageMapper;

    /**
     * Deducts {@code count} units from the commodity's stock and stamps the row with
     * {@code businessKey}.
     *
     * @param businessKey   business key stored on the stock row
     * @param commodityCode code of the commodity
     * @param count         number of units to deduct
     * @return {@code true} when exactly one row was updated
     * @throws RuntimeException with the {@link ErrorEnum#NO_SUCH_COMMODITY} code as message when
     *                          the commodity does not exist, or the
     *                          {@link ErrorEnum#STORAGE_LOW_PREPARE} code when the stock is
     *                          lower than {@code count}
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean deduct(String businessKey, String commodityCode, Integer count) {
        Storage storage = storageMapper.getStorageByCommodityCode(commodityCode);
        if (storage == null) {
            // The numeric code is the message so the global handler can map it back.
            throw new RuntimeException(String.valueOf(ErrorEnum.NO_SUCH_COMMODITY.getCode()));
        }
        if (storage.getCount() < count) {
            throw new RuntimeException(String.valueOf(ErrorEnum.STORAGE_LOW_PREPARE.getCode()));
        }
        storage.setCount(storage.getCount() - count);
        storage.setBusinessKey(businessKey);
        QueryWrapper<Storage> query = new QueryWrapper<>();
        query.eq("commodity_code", commodityCode);
        int i = storageMapper.update(storage, query);
        log.info("{} 商品库存扣除 {} 个", commodityCode, count);
        return i == 1;
    }

    /**
     * Gives {@code count} units back to the commodity's stock, but only when the row still
     * carries the given {@code businessKey}.
     *
     * <p>A missing commodity is treated as "nothing to compensate" instead of failing with a
     * {@link NullPointerException}, and the log line states whether a row was actually
     * compensated.
     *
     * <p>NOTE(review): the business key is left on the row after compensation, so calling this
     * twice with the same key would add the units twice — confirm whether callers can retry.
     *
     * @param businessKey   business key of the deduction being compensated
     * @param commodityCode code of the commodity
     * @param count         number of units to give back
     * @return {@code true} when exactly one row was updated, {@code false} when there was
     *         nothing to compensate
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean compensateDeduct(String businessKey, String commodityCode, Integer count) {
        Storage storage = storageMapper.getStorageByCommodityCode(commodityCode);
        if (storage == null) {
            log.info("{} 商品无需补偿", commodityCode);
            return false;
        }
        storage.setCount(storage.getCount() + count);
        QueryWrapper<Storage> query = new QueryWrapper<>();
        query.eq("commodity_code", commodityCode);
        // Only rows stamped by the matching deduction are compensated.
        query.eq("business_key", businessKey);
        int i = storageMapper.update(storage, query);
        if (i == 1) {
            log.info("{} 商品补偿库存 {} 个", commodityCode, count);
        } else {
            log.info("{} 商品无需补偿", commodityCode);
        }
        return i == 1;
    }
}
2.5.3.4 Controller类
com/gm/seata/openfeign/controller/StorageController.java
:
import com.gm.seata.openfeign.service.StorageService;
import com.gm.seata.openfeign.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * HTTP endpoints of the storage service; each delegates to {@link StorageService} and wraps the
 * result in a successful {@link R}.
 */
@RestController
public class StorageController {
    @Autowired
    StorageService storageService;

    /**
     * Deducts stock for a commodity.
     *
     * @param businessKey   business key of the deduction
     * @param commodityCode code of the commodity
     * @param count         number of units to deduct
     * @return success response wrapping the service result
     */
    @RequestMapping(value = "deduct", method = RequestMethod.GET)
    public R<Boolean> deduct(@RequestParam("businessKey") String businessKey, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
        final Boolean deducted = storageService.deduct(businessKey, commodityCode, count);
        return R.ok(deducted);
    }

    /**
     * Compensates previously deducted stock.
     *
     * @param businessKey   business key of the deduction being compensated
     * @param commodityCode code of the commodity
     * @param count         number of units to give back
     * @return success response wrapping the service result
     */
    @RequestMapping(value = "compensateDeduct", method = RequestMethod.GET)
    public R<Boolean> compensateDeduct(@RequestParam("businessKey") String businessKey, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
        final Boolean compensated = storageService.compensateDeduct(businessKey, commodityCode, count);
        return R.ok(compensated);
    }
}
对外提供两个方法,分别对应Saga状态机中ServiceTask节点StorageService-deduct的远程调用,以及Compensation补偿节点StorageService-compensateDeduct的远程调用,如下图:
2.6 order-saga 搭建
2.6.1 完整依赖
seata/openfeign-saga/order-saga/pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the order-saga module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Inherits from the openfeign-saga parent; dependencies and plugins without a version below
     rely on version management declared outside this file. -->
<parent>
<artifactId>openfeign-saga</artifactId>
<groupId>com.gm</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>order-saga</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Shared module of this demo (common-saga). -->
<dependency>
<groupId>com.gm</groupId>
<artifactId>common-saga</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Make sure to pull in the Spring Cloud flavour of Seata here, not the Spring Boot one. -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<!-- Exclude the Seata version bundled by Spring Cloud Alibaba to avoid version mismatches. -->
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- The bundled Seata version was excluded above; add the Seata starter that matches the seata-server version. -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.6.1</version>
</dependency>
<!--<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</dependency>-->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3.1</version>
</dependency>
<!-- NOTE(review): common-tcc looks like a leftover from the TCC demo; common-saga is already
     declared above. Confirm and remove if it is not needed. -->
<dependency>
<groupId>com.gm</groupId>
<artifactId>common-tcc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
</plugin>
</plugins>
<!-- Resources are filtered (placeholder substitution) except Excel files, which are copied
     unfiltered by the second resource entry. -->
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
<excludes>
<exclude>**/*.xlsx</exclude>
<exclude>**/*.xls</exclude>
</excludes>
</resource>
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
<includes>
<include>**/*.xlsx</include>
<include>**/*.xls</include>
</includes>
</resource>
</resources>
</build>
</project>
2.6.2 配置文件
src/main/resources/bootstrap.yml
:
# Bootstrap configuration of the order service: HTTP port, application name, Nacos discovery
# and the JDBC data source. The @...@ tokens are placeholders substituted by Maven resource
# filtering (enabled in this module's pom).
server:
port: 3012
spring:
application:
name: @artifactId@
cloud:
nacos:
username: @nacos.username@
password: @nacos.password@
discovery:
# Nacos address; host and port can be overridden through NACOS_HOST / NACOS_PORT.
server-addr: ${NACOS_HOST:nacos1.kc}:${NACOS_PORT:8848}
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.0.35:3306/seata-saga-demo?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&allowMultiQueries=true&serverTimezone=Asia/Shanghai
# NOTE(review): database credentials are hard-coded for the demo; externalize them for real use.
username: root
password: '1qaz@WSX'
seata:
  # Whether Spring Boot auto-configuration is enabled (seata-spring-boot-starter only); default true.
  enabled: true
  # Whether the data source is proxied automatically (seata-spring-boot-starter only);
  # on by default, switched off here.
  enable-auto-data-source-proxy: false
  # Custom transaction group name; must match the key under service.vgroup-mapping below.
  # The client looks up service.vgroupMapping through the configured config center.
  tx-service-group: mygroup
  config: # client-side configuration is read from the Nacos config center
    type: nacos
    nacos:
      server-addr: ${NACOS_HOST:nacos1.kc}:${NACOS_PORT:8848}
      group: DEFAULT_GROUP
      namespace: a4c150aa-fd09-4595-9afe-c87084b22105
      dataId: seataServer.properties
      username: @nacos.username@
      # Fixed: the password was filled from @nacos.username@.
      password: @nacos.password@
  registry: # the seata-server address is resolved through service discovery
    type: nacos
    nacos:
      # Note: serverAddr and namespace must match the server side; clusterName must match the server's cluster.
      application: seata-server # must equal the seata-server application name for discovery to work
      group: DEFAULT_GROUP
      server-addr: ${NACOS_HOST:nacos1.kc}:${NACOS_PORT:8848}
      userName: @nacos.username@
      # Fixed: the password was filled from @nacos.username@.
      password: @nacos.password@
      namespace: a4c150aa-fd09-4595-9afe-c87084b22105
  service:
    # The client looks up service.vgroupMapping.[transaction group] through the configured config center.
    vgroup-mapping:
      # Transaction group [mygroup] maps to TC cluster [default]; must match
      # seata.registry.nacos.cluster on the Seata server.
      mygroup: default
    # Global transaction switch, default false: false means enabled, true means disabled.
    disable-global-transaction: false
  client:
    rm:
      report-success-enable: true
# Expose all actuator web endpoints.
management:
endpoints:
web:
exposure:
include: '*'
# Log level for the io.seata packages.
logging:
level:
io.seata: info
# mybatis-plus: print complete SQL statements, including parameters, to the console
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
状态机文件src/main/resources/statelang/order.json
:
{
"nodes": [
{
"type": "node",
"size": "80*72",
"shape": "flow-rhombus",
"color": "#13C2C2",
"label": "AccountService-deduct-Choice",
"stateId": "AccountService-deduct-Choice",
"stateType": "Choice",
"x": 467.875,
"y": 286.5,
"id": "c11238b3",
"stateProps": {
"Type": "Choice",
"Choices": [
{
"Expression": "[deductResult] == true",
"Next": "StorageService-deduct"
}
],
"Default": "Fail"
},
"index": 6
},
{
"type": "node",
"size": "39*39",
"shape": "flow-circle",
"color": "red",
"label": "BService-save-catch",
"stateId": "BService-save-catch",
"stateType": "Catch",
"x": 524.875,
"y": 431.5,
"id": "053ac3ac",
"index": 7
},
{
"type": "node",
"size": "72*72",
"shape": "flow-circle",
"color": "#FA8C16",
"label": "Start",
"stateId": "Start",
"stateType": "Start",
"stateProps": {
"StateMachine": {
"Name": "order",
"Comment": "经典的分布式调用",
"Version": "0.0.1"
},
"Next": "AService"
},
"x": 467.875,
"y": 53,
"id": "973bd79e",
"index": 11
},
{
"type": "node",
"size": "110*48",
"shape": "flow-rect",
"color": "#1890FF",
"label": "AccountService-deduct",
"stateId": "AccountService-deduct",
"stateType": "ServiceTask",
"stateProps": {
"Type": "ServiceTask",
"ServiceName": "accountService",
"Next": "AccountService-deduct-Choice",
"ServiceMethod": "deduct",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[commodityCode]",
"$.[count]"
],
"Output": {
"deductResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"CompensateState": "AccountService-compensateDeduct",
"Retry": []
},
"x": 467.875,
"y": 172,
"id": "e17372e4",
"index": 12
},
{
"type": "node",
"size": "110*48",
"shape": "flow-rect",
"color": "#1890FF",
"label": "StorageService-deduct",
"stateId": "StorageService-deduct",
"stateType": "ServiceTask",
"stateProps": {
"Type": "ServiceTask",
"ServiceName": "storageService",
"ServiceMethod": "deduct",
"CompensateState": "StorageService-compensateDeduct",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[commodityCode]",
"$.[count]"
],
"Output": {
"deductResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Next": "StorageService-deduct-Choice"
},
"x": 467.125,
"y": 411,
"id": "a6c40952",
"index": 13
},
{
"type": "node",
"size": "110*48",
"shape": "flow-capsule",
"color": "#722ED1",
"label": "AccountService-compensateDeduct",
"stateId": "AccountService-compensateDeduct",
"stateType": "Compensation",
"stateProps": {
"Type": "Compensation",
"ServiceName": "accountService",
"ServiceMethod": "compensateDeduct",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[commodityCode]",
"$.[count]"
]
},
"x": 260.625,
"y": 172.5,
"id": "3b348652",
"index": 14
},
{
"type": "node",
"size": "110*48",
"shape": "flow-capsule",
"color": "#722ED1",
"label": "StorageService-compensateDeduct",
"stateId": "StorageService-compensateDeduct",
"stateType": "Compensation",
"stateProps": {
"Type": "Compensation",
"ServiceName": "storageService",
"ServiceMethod": "compensateDeduct",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[commodityCode]",
"$.[count]"
]
},
"x": 262.125,
"y": 411,
"id": "13b600b1",
"index": 15
},
{
"type": "node",
"size": "72*72",
"shape": "flow-circle",
"color": "#05A465",
"label": "Succeed",
"stateId": "Succeed",
"stateType": "Succeed",
"x": 466.625,
"y": 795,
"id": "690e5c5e",
"stateProps": {
"Type": "Succeed"
},
"index": 16
},
{
"type": "node",
"size": "110*48",
"shape": "flow-capsule",
"color": "red",
"label": "Compensation\nTrigger",
"stateId": "CompensationTrigger",
"stateType": "CompensationTrigger",
"x": 881.625,
"y": 430.5,
"id": "757e057f",
"stateProps": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"index": 17
},
{
"type": "node",
"size": "72*72",
"shape": "flow-circle",
"color": "red",
"label": "Fail",
"stateId": "Fail",
"stateType": "Fail",
"stateProps": {
"Type": "Fail",
"ErrorCode": "FAILED",
"Message": "buy failed"
},
"x": 881.125,
"y": 285.5,
"id": "0131fc0c",
"index": 18
},
{
"type": "node",
"size": "39*39",
"shape": "flow-circle",
"color": "red",
"label": "AccountService-deduct-catch",
"stateId": "AccountService-deduct-catch",
"stateType": "Catch",
"x": 518.125,
"y": 183,
"id": "0955401d"
},
{
"type": "node",
"size": "80*72",
"shape": "flow-rhombus",
"color": "#13C2C2",
"label": "StorageService-deduct-Choice",
"stateId": "StorageService-deduct-Choice",
"stateType": "Choice",
"x": 466.875,
"y": 545.5,
"id": "27978f5d",
"stateProps": {
"Type": "Choice",
"Choices": [
{
"Expression": "[deductResult] == true",
"Next": "OrderService-createOrder"
}
],
"Default": "Fail"
}
},
{
"type": "node",
"size": "110*48",
"shape": "flow-rect",
"color": "#1890FF",
"label": "OrderService-createOrder",
"stateId": "OrderService-createOrder",
"stateType": "ServiceTask",
"stateProps": {
"Type": "ServiceTask",
"ServiceName": "orderService",
"ServiceMethod": "createOrder",
"CompensateState": "OrderService-compensateOrder",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[commodityCode]",
"$.[count]"
],
"Output": {
"createOrderResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Next": "Succeed"
},
"x": 466.625,
"y": 676,
"id": "9351460d"
},
{
"type": "node",
"size": "110*48",
"shape": "flow-capsule",
"color": "#722ED1",
"label": "OrderService-compensateOrder",
"stateId": "OrderService-compensateOrder",
"stateType": "Compensation",
"stateProps": {
"Type": "Compensation",
"ServiceName": "orderService",
"ServiceMethod": "compensateOrder",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[commodityCode]",
"$.[count]"
]
},
"x": 261.625,
"y": 675.5,
"id": "b2789952"
},
{
"type": "node",
"size": "39*39",
"shape": "flow-circle",
"color": "red",
"label": "OrderService-createOrder-catch",
"stateId": "OrderService-createOrder-catch",
"stateType": "Catch",
"x": 523.125,
"y": 696,
"id": "466cf242"
}
],
"edges": [
{
"source": "973bd79e",
"sourceAnchor": 2,
"target": "e17372e4",
"targetAnchor": 0,
"id": "f0a9008f",
"index": 0
},
{
"source": "e17372e4",
"sourceAnchor": 2,
"target": "c11238b3",
"targetAnchor": 0,
"id": "cd8c3104",
"index": 2,
"label": "执行结果",
"shape": "flow-smooth"
},
{
"source": "c11238b3",
"sourceAnchor": 2,
"target": "a6c40952",
"targetAnchor": 0,
"id": "e47e49bc",
"stateProps": {},
"label": "执行成功",
"shape": "flow-smooth",
"index": 3
},
{
"source": "c11238b3",
"sourceAnchor": 1,
"target": "0131fc0c",
"targetAnchor": 3,
"id": "e3f9e775",
"stateProps": {},
"label": "执行失败",
"shape": "flow-smooth",
"index": 4
},
{
"source": "053ac3ac",
"sourceAnchor": 1,
"target": "757e057f",
"targetAnchor": 3,
"id": "3f7fe6ad",
"stateProps": {
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
},
"label": "StorageService-deduct异常触发补偿",
"shape": "flow-smooth",
"index": 5
},
{
"source": "e17372e4",
"sourceAnchor": 3,
"target": "3b348652",
"targetAnchor": 1,
"id": "52a2256e",
"style": {
"lineDash": "4"
},
"index": 8,
"label": "",
"shape": "flow-smooth"
},
{
"source": "a6c40952",
"sourceAnchor": 3,
"target": "13b600b1",
"targetAnchor": 1,
"id": "474512d9",
"style": {
"lineDash": "4"
},
"index": 9
},
{
"source": "757e057f",
"sourceAnchor": 0,
"target": "0131fc0c",
"targetAnchor": 2,
"id": "1abf48fa",
"index": 10
},
{
"source": "0955401d",
"sourceAnchor": 1,
"target": "757e057f",
"targetAnchor": 1,
"id": "654280aa",
"shape": "flow-polyline-round",
"stateProps": {
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
},
"label": "AccountService-deduct异常触发补偿"
},
{
"source": "a6c40952",
"sourceAnchor": 2,
"target": "27978f5d",
"targetAnchor": 0,
"id": "f25a12eb",
"shape": "flow-polyline-round",
"label": "执行结果"
},
{
"source": "27978f5d",
"sourceAnchor": 2,
"target": "9351460d",
"targetAnchor": 0,
"id": "99d78285",
"shape": "flow-smooth",
"stateProps": {},
"label": "执行成功"
},
{
"source": "9351460d",
"sourceAnchor": 2,
"target": "690e5c5e",
"targetAnchor": 0,
"id": "82670a92",
"shape": "flow-polyline-round"
},
{
"source": "9351460d",
"sourceAnchor": 3,
"target": "b2789952",
"targetAnchor": 1,
"id": "5db6a545",
"shape": "flow-polyline-round",
"style": {
"lineDash": "4",
"endArrow": false
},
"type": "Compensation"
},
{
"source": "466cf242",
"sourceAnchor": 1,
"target": "757e057f",
"targetAnchor": 2,
"id": "a9f55df2",
"shape": "flow-polyline-round",
"stateProps": {
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
},
"label": "OrderService-createOrder异常触发补偿"
},
{
"source": "27978f5d",
"sourceAnchor": 1,
"target": "0131fc0c",
"targetAnchor": 1,
"id": "c303cae6",
"shape": "flow-polyline-round",
"stateProps": {},
"label": "执行失败"
}
]
}
2.6.3 功能搭建
2.6.3.1 启动类
com/gm/seata/openfeign/OrderSagaApplication.java
:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
 * Spring Boot entry point of the order-saga service.
 *
 * <p>Enables service discovery and scans {@code com.gm.seata.openfeign.feign}
 * for OpenFeign client interfaces.
 */
@EnableFeignClients(basePackages = "com.gm.seata.openfeign.feign")
@EnableDiscoveryClient
@SpringBootApplication
public class OrderSagaApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(OrderSagaApplication.class, args);
    }
}
2.6.3.2 状态机配置类
com/gm/seata/openfeign/config/StateMachineEngineConfig.java
:
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;
import io.seata.saga.rm.StateMachineEngineHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sql.DataSource;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Spring configuration for the Seata Saga state machine engine.
 *
 * <p>Declares the database-backed state machine configuration, the engine
 * that uses it, the holder that exposes the engine, and the thread pool
 * handed to the configuration for asynchronous execution.
 */
@Configuration
public class StateMachineEngineConfig {

    @Autowired
    DataSource dataSource;

    /**
     * Builds the state machine configuration: definitions are loaded from
     * {@code statelang/*.json} and the injected {@link DataSource} is used
     * as its data source.
     *
     * @return the configured {@link DbStateMachineConfig}
     */
    @Bean
    public DbStateMachineConfig dbStateMachineConfig() {
        DbStateMachineConfig config = new DbStateMachineConfig();
        config.setApplicationId("seata-server");
        config.setTxServiceGroup("mygroup");
        config.setDataSource(dataSource);
        config.setResources(new String[]{"statelang/*.json"});
        // Async execution is enabled and runs on the pool declared below.
        config.setThreadPoolExecutor(threadExecutor());
        config.setEnableAsync(true);
        return config;
    }

    /**
     * Creates the state machine engine on top of {@link #dbStateMachineConfig()}.
     *
     * @return the engine used to start state machine instances
     */
    @Bean
    public ProcessCtrlStateMachineEngine stateMachineEngine() {
        ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine();
        engine.setStateMachineConfig(dbStateMachineConfig());
        return engine;
    }

    /**
     * Registers the engine with a {@link StateMachineEngineHolder}.
     *
     * @return the holder wrapping {@link #stateMachineEngine()}
     */
    @Bean
    public StateMachineEngineHolder stateMachineEngineHolder() {
        StateMachineEngineHolder holder = new StateMachineEngineHolder();
        holder.setStateMachineEngine(stateMachineEngine());
        return holder;
    }

    /**
     * Builds the thread pool passed to the state machine configuration.
     *
     * @return the underlying {@link ThreadPoolExecutor} of an initialized
     *         {@link ThreadPoolTaskExecutor}
     */
    @Bean
    public ThreadPoolExecutor threadExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Prefix used for the names of the pool's threads.
        taskExecutor.setThreadNamePrefix("SAGA_ASYNC_EXE_");
        // Core pool size.
        taskExecutor.setCorePoolSize(1);
        // Maximum pool size.
        taskExecutor.setMaxPoolSize(20);
        // Capacity of the task queue.
        taskExecutor.setQueueCapacity(99999);
        // Rejection policy for tasks the pool cannot accept:
        // CallerRunsPolicy runs the task in the submitting thread instead of a pool thread.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // The pool must be initialized before its ThreadPoolExecutor can be obtained.
        taskExecutor.initialize();
        return taskExecutor.getThreadPoolExecutor();
    }
}
2.6.3.3 Mapper类
com/gm/seata/openfeign/mapper/OrderMapper.java
:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.gm.seata.openfeign.entity.Order;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis-Plus mapper for {@link Order}.
 *
 * <p>Declares no methods of its own; all operations come from {@link BaseMapper}.
 */
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
2.6.3.4 Service类
com/gm/seata/openfeign/service/AccountService.java
:
/**
 * Account operations called by the "order" saga state machine: the forward
 * balance deduction and its compensation.
 */
public interface AccountService {
/**
 * Deducts the account balance.
 *
 * @param businessKey   business key of the saga instance
 * @param userId        id of the user whose balance is deducted
 * @param commodityCode code of the purchased commodity
 * @param count         number of items purchased
 * @return the outcome flag; the state machine definition maps {@code true} to SU and {@code false} to FA
 */
boolean deduct(String businessKey, String userId, String commodityCode, Integer count);
/**
 * Compensates (restores) the account balance.
 *
 * @param businessKey   business key of the saga instance
 * @param userId        id of the user whose balance is restored
 * @param commodityCode code of the purchased commodity
 * @param count         number of items purchased
 * @return the outcome flag of the compensation
 */
boolean compensateDeduct(String businessKey, String userId, String commodityCode, Integer count);
}
com/gm/seata/openfeign/service/OrderService.java
:
/**
 * Order operations called by the "order" saga state machine: order creation
 * and its compensation.
 */
public interface OrderService {
/**
 * Creates an order.
 *
 * @param businessKey   business key of the saga instance
 * @param userId        id of the purchasing user
 * @param commodityCode code of the purchased commodity
 * @param count         number of items purchased
 * @return the outcome flag; the state machine definition maps {@code true} to SU and {@code false} to FA
 */
boolean createOrder(String businessKey, String userId, String commodityCode, Integer count);
/**
 * Compensates (removes) an order that was already created.
 *
 * @param businessKey   business key of the saga instance
 * @param userId        id of the purchasing user
 * @param commodityCode code of the purchased commodity
 * @param count         number of items purchased
 * @return the outcome flag of the compensation
 */
boolean compensateOrder(String businessKey, String userId, String commodityCode, Integer count);
}
com/gm/seata/openfeign/service/StorageService.java
:
/**
 * Storage operations called by the "order" saga state machine: the forward
 * stock deduction and its compensation.
 */
public interface StorageService {
/**
 * Deducts stock.
 *
 * @param businessKey   business key of the saga instance
 * @param userId        id of the purchasing user
 * @param commodityCode code of the commodity whose stock is deducted
 * @param count         number of items to deduct
 * @return the outcome flag; the state machine definition maps {@code true} to SU and {@code false} to FA
 */
boolean deduct(String businessKey, String userId, String commodityCode, Integer count);
/**
 * Compensates (restores) the deducted stock.
 *
 * @param businessKey   business key of the saga instance
 * @param userId        id of the purchasing user
 * @param commodityCode code of the commodity whose stock is restored
 * @param count         number of items to restore
 * @return the outcome flag of the compensation
 */
boolean compensateDeduct(String businessKey, String userId, String commodityCode, Integer count);
}
com/gm/seata/openfeign/service/impl/AccountServiceImpl.java
:
import com.gm.seata.openfeign.feign.AccountServiceApi;
import com.gm.seata.openfeign.service.AccountService;
import com.gm.seata.openfeign.util.R;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@Slf4j
@Service(value = "accountService")
public class AccountServiceImpl implements AccountService {
@Autowired
AccountServiceApi accountServiceApi;
@Override
public boolean deduct(String businessKey, String userId, String commodityCode, Integer count) {
log.info("saga状态机调用AccountService的deduct方法,参数businessKey={},userId={},commodityCode={},count={}", businessKey, userId, commodityCode, count);
try {
R<Boolean> result = accountServiceApi.deduct(businessKey, userId, new BigDecimal(count * 100.0));
return result.getData();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
@Override
public boolean compensateDeduct(String businessKey, String userId, String commodityCode, Integer count) {
log.info("saga状态机调用AccountService的compensateDeduct方法,参数businessKey={},userId={},commodityCode={},count={}", businessKey, userId, commodityCode, count);
try {
R<Boolean> result = accountServiceApi.compensateDeduct(businessKey, userId, new BigDecimal(count * 100.0));
return result.getData();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
}
注意:通过
@Service(value = "accountService")
设置服务的beanId
(需与状态机文件中对应节点的ServiceName保持一致),并在正常业务方法及补偿业务方法中发起远程服务调用。
com/gm/seata/openfeign/service/impl/OrderServiceImpl.java
:
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.gm.seata.openfeign.entity.Order;
import com.gm.seata.openfeign.mapper.OrderMapper;
import com.gm.seata.openfeign.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
/**
 * {@link OrderService} implementation that persists orders locally through
 * {@link OrderMapper}.
 *
 * <p>Registered under the bean id {@code orderService}.
 */
@Slf4j
@Service(value = "orderService")
public class OrderServiceImpl implements OrderService {

    /** Price of a single item; this example uses a fixed price of 100. */
    private static final BigDecimal UNIT_PRICE = BigDecimal.valueOf(100);

    @Autowired
    OrderMapper orderMapper;

    /**
     * Inserts an order row for the given user and commodity with an amount of {@code count * 100}.
     *
     * @param businessKey   business key of the saga instance (only logged here)
     * @param userId        id of the purchasing user
     * @param commodityCode code of the purchased commodity
     * @param count         number of items purchased
     * @return {@code true} if exactly one row was inserted
     */
    @Override
    public boolean createOrder(String businessKey, String userId, String commodityCode, Integer count) {
        log.info("saga状态机调用OrderService的createOrder方法,参数businessKey={},userId={},commodityCode={},count={}", businessKey, userId, commodityCode, count);
        Order order = new Order();
        order.setCount(count);
        order.setCommodityCode(commodityCode);
        order.setUserId(userId);
        order.setMoney(amountFor(count));
        int inserted = orderMapper.insert(order);
        log.info("{} 用户购买的 {} 商品共计 {} 件,订单创建成功", userId, commodityCode, count);
        return inserted == 1;
    }

    /**
     * Removes the order created by {@link #createOrder}, matching on commodity code,
     * user id and amount.
     *
     * @param businessKey   business key of the saga instance (only logged here)
     * @param userId        id of the purchasing user
     * @param commodityCode code of the purchased commodity
     * @param count         number of items purchased
     * @return {@code true} if exactly one row was deleted
     */
    @Override
    public boolean compensateOrder(String businessKey, String userId, String commodityCode, Integer count) {
        log.info("saga状态机调用OrderService的compensateOrder方法,参数businessKey={},userId={},commodityCode={},count={}", businessKey, userId, commodityCode, count);
        QueryWrapper<Order> query = new QueryWrapper<>();
        query.eq("commodity_code", commodityCode);
        // Filter on the real column (user_id) with the user's id; the previous code
        // compared a non-existent "userId" column against the commodity code.
        query.eq("user_id", userId);
        query.eq("money", amountFor(count));
        // NOTE(review): the order row is not tagged with businessKey, so this can match
        // several orders of the same user/commodity/amount. t_order has a business_key
        // column; consider storing and filtering on it if the Order entity exposes it.
        int deleted = orderMapper.delete(query);
        log.info("{} 用户购买的 {} 商品共计 {} 件,订单补偿(移除)成功", userId, commodityCode, count);
        return deleted == 1;
    }

    /** Computes the order amount with exact decimal arithmetic (no intermediate double). */
    private static BigDecimal amountFor(Integer count) {
        return UNIT_PRICE.multiply(BigDecimal.valueOf(count.longValue()));
    }
}
com/gm/seata/openfeign/service/impl/StorageServiceImpl.java
:
import com.gm.seata.openfeign.feign.StorageServiceApi;
import com.gm.seata.openfeign.service.StorageService;
import com.gm.seata.openfeign.util.R;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service(value = "storageService")
public class StorageServiceImpl implements StorageService {
@Autowired
StorageServiceApi storageService;
@Override
public boolean deduct(String businessKey, String userId, String commodityCode, Integer count) {
log.info("saga状态机调用StorageService的deduct方法,参数businessKey={},userId={},commodityCode={},count={}", businessKey, userId, commodityCode, count);
try {
R<Boolean> result = storageService.deduct(businessKey, commodityCode, count);
return result.getData();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
@Override
public boolean compensateDeduct(String businessKey, String userId, String commodityCode, Integer count) {
log.info("saga状态机调用StorageService的compensateDeduct方法,参数businessKey={},userId={},commodityCode={},count={}", businessKey, userId, commodityCode, count);
try {
R<Boolean> result = storageService.compensateDeduct(businessKey, commodityCode, count);
return result.getData();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
}
2.6.3.5 Controller类
com/gm/seata/openfeign/controller/OrderController.java
:
import com.gm.seata.openfeign.util.ErrorEnum;
import com.gm.seata.openfeign.util.R;
import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateMachineInstance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * REST entry point that places an order by running the "order" saga state machine.
 */
@Slf4j
@RestController
public class OrderController {

    @Autowired
    StateMachineEngine stateMachineEngine;

    /**
     * Places an order for a commodity.
     *
     * <p>Starts the "order" state machine with a fresh business key and the request
     * parameters, then maps the instance status to a response.
     *
     * @param userId        id of the purchasing user
     * @param commodityCode code of the commodity to buy
     * @param count         number of items to buy
     * @return a success response when the instance status is SU, otherwise a failure response
     */
    @RequestMapping(value = "buy", method = RequestMethod.GET)
    public R<String> buy(@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
        // Unique business key. A millisecond timestamp can collide when two requests
        // arrive in the same millisecond, so a random UUID is used instead.
        // NOTE(review): Seata's state-machine instance table is expected to enforce
        // uniqueness on the business key - confirm against the deployed schema.
        String businessKey = UUID.randomUUID().toString();

        Map<String, Object> startParams = new HashMap<>();
        startParams.put("businessKey", businessKey);
        startParams.put("userId", userId);
        startParams.put("commodityCode", commodityCode);
        startParams.put("count", count);

        // Synchronous execution.
        StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("order", null, businessKey, startParams);
        if (ExecutionStatus.SU.equals(inst.getStatus())) {
            log.info("成功,saga transaction execute Succeed. XID: {},Status: {},CompensationStatus: {}", inst.getId(), inst.getStatus(), inst.getCompensationStatus());
            return R.ok("下单成功");
        }

        log.info("失败,saga transaction execute failed. XID: {},Status: {},CompensationStatus: {}", inst.getId(), inst.getStatus(), inst.getCompensationStatus());
        return failureResponse(inst);
    }

    /**
     * Builds the failure response for a state machine instance that did not succeed.
     *
     * <p>The instance may carry no exception (for example when a service returned
     * {@code false} and the flow ended in the Fail state), so the exception and its
     * message are read null-safely.
     */
    private R<String> failureResponse(StateMachineInstance inst) {
        Exception cause = inst.getException();
        String message = cause == null ? null : cause.getMessage();
        if (message == null) {
            return R.failed("下单失败", "状态机执行失败,状态:" + inst.getStatus());
        }
        try {
            // The service implementations rethrow the remote error with its message
            // unchanged; when that message is a numeric code it is mapped to an ErrorEnum title.
            int code = Integer.parseInt(message);
            return R.restResult("下单失败", code, ErrorEnum.getEnumByCode(code).getTitle());
        } catch (Exception e) {
            // Not a numeric code (or unknown code): fall back to the raw message.
            return R.failed("下单失败", message);
        }
    }
}
注意:通过
stateMachineEngine.startWithBusinessKey("order", null, businessKey, startParams);
开启状态机
更多状态机
API
请见官网:http://seata.io/zh-cn/docs/user/saga.html
三、示例测试
3.1 正常下单
由2.1章节可知:账户余额500,库存3,每件商品单价100元。
请求地址:http://127.0.0.1:3012/buy?userId=user1&count=2&commodityCode=iphone
每请求一次,扣除余额200元,扣除库存2个,已知可正常下单1次
order-saga
系统日志:
http
请求返回:
数据库记录:
3.2 余额或库存不足下单失败
storage-saga
系统日志:
order-saga
系统日志:
全局事务回滚成功
数据库记录: