SpringCloud集成Seata saga模式案例

news2025/1/11 22:42:18

文章目录

  • 一、前言
  • 二、Seata saga模式介绍
    • 1、示例状态图
    • 2、“状态机”介绍
      • 1)“状态机”属性
      • 2)“状态”属性
      • 3)更多状态相关内容
  • 三、SpringCloud 集成 seata saga
    • 1、saga模式状态机相关信息
      • 1)状态机配置相关的三个表
      • 2)状态图
    • 2、项目代码
      • 0)pom.xml
      • 1)线程池配置 -- MyThreadFactory
      • 2)seata saga相关配置 -- SagaConfiguration
      • 3)库存服务 -- InventoryService
        • InventoryServiceImpl
      • 4)账户余额服务 -- BalanceService
        • BalanceServiceImpl
      • 5)启动类 -- SagaTradeApplication
      • 6) 状态图对应的JSON文件 -- reduce_inventory_and_balance.json
        • 状态图流程解析
      • 7)application.yml
      • 8)file.conf
      • 9)开启状态机入口 -- TradeController
    • 3、测试 / 验证
      • 1)启动seata-server服务;
      • 2)启动seata-client(saga-trade)
      • 3)事务提交
      • 4)事务回滚
  • 三、总结

一、前言

更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html

至此,seata系列的内容已出:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
  13. 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
  14. 分布式事务Seata源码解析九:分支事务如何注册到全局事务
  15. 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程
  16. 分布式事务Seata源码解析11:全局事务执行流程之两阶段全局事务提交
  17. 分布式事务Seata源码解析12:全局事务执行流程之全局事务回滚
  18. Spring Cloud整合Seata实现TCC分布式事务模式案例
  19. 分布式事务Seata源码解析13:TCC事务模式实现原理
  20. 分布式事务Seata TCC空回滚/幂等/悬挂问题、解决方案(seata1.5.1如何解决?)
  21. Seata XA模式概述+案例
  22. saga模式、Seata saga模式详介

至此,Seata常用的AT模式、TCC模式 和 XA模式已完结,SAGA模式也已做了基本介绍,本文接着聊Spring Cloud 如何集成Seata saga模式实现全局事务、分支事务

二、Seata saga模式介绍

官方文档地址:https://seata.io/zh-cn/docs/user/saga.html

Seata提供的Saga模式目前只能通过状态机引擎来实现,整体机制为:

  1. 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件;
    • 换言之,需要开发者手工的进行Saga业务流程绘制,并将其转换为JSON配置文件;
  2. 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点;
    • 注意: 异常发生时是否进行补偿也可由用户自定义决定,可以选择不配置;
  3. 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚;
    • 在程序启动时,会根据saga状态图加载业务处理流程(包括:服务补偿处理);
  4. 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能;

1、示例状态图

在这里插入图片描述

2、“状态机”介绍

seata saga的状态语言在一定程度上参考了AWS Step Functions

1)“状态机”属性

  • Name: 表示状态机的名称,必须唯一
  • Comment: 状态机的描述
  • Version: 状态机定义版本
  • StartState: 启动时运行的第一个"状态"
  • States: 状态列表,是一个map结构,key是"状态"的名称,在状态机内必须唯一
  • IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新
  • IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新

2)“状态”属性

  1. Type: “状态” 的类型,比如有:
    • ServiceTask: 执行调用服务任务
    • Choice: 单条件选择路由
    • CompensationTrigger: 触发补偿流程
    • Succeed: 状态机正常结束
    • Fail: 状态机异常结束
    • SubStateMachine: 调用子状态机
    • CompensateSubMachine: 用于补偿一个子状态机
  2. ServiceName: 服务名称,通常是服务的beanId(也就是Spring容器中的beanName)
    • 无论是SpringCloud,还是Dubbo、HSF…,最重要的就是配置这个beanId。
  3. ServiceMethod: 服务方法名称(也就是:Spring Bean中的某个方法名)
  4. CompensateState: 该"状态"的补偿"状态"
  5. Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行
  6. Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用 SpringEL, 如果是常量直接写值即可
  7. Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数
  8. Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值
  9. Catch: 捕获到异常后的路由
  10. Next: 服务执行完成后下一个执行的"状态"
  11. Choices: Choice类型的"状态"里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个"状态"
  12. ErrorCode: Fail类型"状态"的错误码
  13. Message: Fail类型"状态"的错误信息

3)更多状态相关内容

更多详细的状态语言使用示例见github:
https://github.com/seata/seata/tree/develop/test/src/test/java/io/seata/saga/engine

三、SpringCloud 集成 seata saga

官方提供的saga案例地址:https://github.com/seata/seata-samples/tree/master/saga

在这里插入图片描述

然而并没有提供SpringCloud与saga模式集成的案例;以下介绍SpringCloud与saga模式集成的案例。

1、saga模式状态机相关信息

1)状态机配置相关的三个表

首先,我们需要 在使用状态机开启saga分支事务的 服务对应的数据库连接中创建三个表(以MYSQL为例):

CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
    `id`               VARCHAR(32)  NOT NULL COMMENT 'id',
    `name`             VARCHAR(128) NOT NULL COMMENT 'name',
    `tenant_id`        VARCHAR(32)  NOT NULL COMMENT 'tenant id',
    `app_name`         VARCHAR(32)  NOT NULL COMMENT 'application name',
    `type`             VARCHAR(20)  COMMENT 'state language type',
    `comment_`         VARCHAR(255) COMMENT 'comment',
    `ver`              VARCHAR(16)  NOT NULL COMMENT 'version',
    `gmt_create`       DATETIME(3)  NOT NULL COMMENT 'create time',
    `status`           VARCHAR(2)   NOT NULL COMMENT 'status(AC:active|IN:inactive)',
    `content`          TEXT COMMENT 'content',
    `recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
    `id`                  VARCHAR(128)            NOT NULL COMMENT 'id',
    `machine_id`          VARCHAR(32)             NOT NULL COMMENT 'state machine definition id',
    `tenant_id`           VARCHAR(32)             NOT NULL COMMENT 'tenant id',
    `parent_id`           VARCHAR(128) COMMENT 'parent id',
    `gmt_started`         DATETIME(3)             NOT NULL COMMENT 'start time',
    `business_key`        VARCHAR(48) COMMENT 'business key',
    `start_params`        TEXT COMMENT 'start parameters',
    `gmt_end`             DATETIME(3) COMMENT 'end time',
    `excep`               BLOB COMMENT 'exception',
    `end_params`          TEXT COMMENT 'end parameters',
    `status`              VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `is_running`          TINYINT(1) COMMENT 'is running(0 no|1 yes)',
    `gmt_updated`         DATETIME(3) NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
    `id`                       VARCHAR(48)  NOT NULL COMMENT 'id',
    `machine_inst_id`          VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
    `name`                     VARCHAR(128) NOT NULL COMMENT 'state name',
    `type`                     VARCHAR(20)  COMMENT 'state type',
    `service_name`             VARCHAR(128) COMMENT 'service name',
    `service_method`           VARCHAR(128) COMMENT 'method name',
    `service_type`             VARCHAR(16) COMMENT 'service type',
    `business_key`             VARCHAR(48) COMMENT 'business key',
    `state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
    `state_id_retried_for`     VARCHAR(50) COMMENT 'state retried for',
    `gmt_started`              DATETIME(3)  NOT NULL COMMENT 'start time',
    `is_for_update`            TINYINT(1) COMMENT 'is service for update',
    `input_params`             TEXT COMMENT 'input parameters',
    `output_params`            TEXT COMMENT 'output parameters',
    `status`                   VARCHAR(2)   NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `excep`                    BLOB COMMENT 'exception',
    `gmt_updated`              DATETIME(3) COMMENT 'update time',
    `gmt_end`                  DATETIME(3) COMMENT 'end time',
    PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

数据库表的出处,见seata官方地址:https://github.com/seata/seata/blob/1.5.2/script/client/saga/db/mysql.sql

在这里插入图片描述

2)状态图

状态机设计器演示(在线画图工具)地址:http://seata.io/saga_designer/index.html

在这里插入图片描述

2、项目代码

整体代码结构:

在这里插入图片描述

此处案例和saga官方提供的一样,仅示范saga模式的使用,不涉及RPC、业务表操作,若读者想丰富案例,可在笔者的todo标注处自行添加。

0)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <version>0.0.1-SNAPSHOT</version>
    <groupId>com.saint</groupId>
    <artifactId>saga-trade</artifactId>

        <properties>
        <java.version>1.8</java.version>
        <druid.version>1.2.8</druid.version>
        <mysql.version>8.0.22</mysql.version>
        <!--seata1.5.2 版本源码验证-->
        <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
        <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
        <spring-cloud-alibaba.version>2.2.9.RELEASE</spring-cloud-alibaba.version>
        <druid.version>1.2.8</druid.version>
        <mysql.version>8.0.22</mysql.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.10</version>
        </dependency>
    </dependencies>

    <dependencyManagement>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--整合spring cloud-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--整合spring cloud alibaba-->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid-spring-boot-starter</artifactId>
                <version>${druid.version}</version>
            </dependency>

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

1)线程池配置 – MyThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义线程工厂
 */
public class MyThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNumber;
    private ThreadGroup group;
    private String namePrefix;

    public MyThreadFactory(String namePrefix) {
        this.threadNumber = new AtomicInteger(1);
        SecurityManager s = System.getSecurityManager();
        this.group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix + "_THREAD_";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(this.group, r, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
        return t;
    }
}

2)seata saga相关配置 – SagaConfiguration

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;
import io.seata.saga.rm.StateMachineEngineHolder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Saint
 */
@Configuration
public class SagaConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
        return new DruidDataSource();
    }

    @Bean
    public ThreadPoolExecutor sagaThreadPool() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
                20,
                30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2000),
                new MyThreadFactory("SAGA_ASYNC_EXE_"),
                new ThreadPoolExecutor.AbortPolicy());
        return executor;
    }

    @Bean
    public DbStateMachineConfig dbStateMachineConfig() {
        DbStateMachineConfig config = new DbStateMachineConfig();
        config.setDataSource(dataSource());
        config.setResources(new String[]{"statelang/*.json"});
        config.setEnableAsync(true);
        config.setApplicationId("saga-trade");
        config.setTxServiceGroup("saint-trade-tx-group");
        config.setThreadPoolExecutor(sagaThreadPool());
        return config;
    }

    @Bean
    public ProcessCtrlStateMachineEngine stateMachineEngine() {
        ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine();
        engine.setStateMachineConfig(dbStateMachineConfig());
        return engine;
    }

    @Bean
    public StateMachineEngineHolder stateMachineEngineHolder() {
        StateMachineEngineHolder holder = new StateMachineEngineHolder();
        holder.setStateMachineEngine(stateMachineEngine());
        return holder;
    }
}

3)库存服务 – InventoryService

InventoryService提供了两个方法:一个reduce()、一个reduce()对应的补偿方法compensateReduce()

package com.saint.saga.trade.service;

/**
 * Inventory Actions
 */
public interface InventoryService {

    /**
     * reduce
     *
     * @param businessKey 业务上的唯一标识
     * @param count
     * @return
     */
    boolean reduce(String businessKey, int count);

    /**
     * increase
     *
     * @param businessKey 业务上的唯一标识
     * @return
     */
    boolean compensateReduce(String businessKey);
}

InventoryServiceImpl

package com.saint.saga.trade.service.impl;

import com.saint.saga.trade.service.InventoryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * 库存
 *
 * @author Saint
 */
@Service(value = "inventoryService")
public class InventoryServiceImpl implements InventoryService {

    private static final Logger LOGGER = LoggerFactory.getLogger(InventoryActionImpl.class);

    @Override
    public boolean reduce(String businessKey, int count) {
        LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey);
        // todo rpc / http

        return true;
    }

    @Override
    public boolean compensateReduce(String businessKey) {
        LOGGER.info("compensate reduce inventory succeed, businessKey:" + businessKey);
        // todo rpc / http

        return true;
    }
}

4)账户余额服务 – BalanceService

BalanceService提供了两个方法:一个reduce()、一个reduce()对应的补偿方法compensateReduce()

package com.saint.saga.trade.service;

import java.math.BigDecimal;
import java.util.Map;

/**
 * Balance Actions
 */
public interface BalanceService {

    /**
     * reduce
     *
     * @param businessKey 业务上的唯一标识
     * @param amount
     * @param params
     * @return
     */
    boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);

    /**
     * compensateReduce
     *
     * @param businessKey 业务上的唯一标识
     * @param params
     * @return
     */
    boolean compensateReduce(String businessKey, Map<String, Object> params);

}

BalanceServiceImpl

package com.saint.saga.trade.service.impl;

import com.saint.saga.trade.service.BalanceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.Map;

/**
 * 账户余额
 *
 * @author Saint
 */
@Service(value = "balanceService")
public class BalanceServiceImpl implements BalanceService {

    private static final Logger LOGGER = LoggerFactory.getLogger(BalanceServiceImpl.class);

    @Override
    public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {
        if (params != null) {
            Object throwException = params.get("throwException");
            if (throwException != null && "true".equals(throwException.toString())) {
                throw new RuntimeException("reduce balance failed");
            }
        }
        LOGGER.info("reduce balance succeed, amount: " + amount + ", bizCode:" + businessKey);
        // todo rpc / http

        return true;
    }

    @Override
    public boolean compensateReduce(String businessKey, Map<String, Object> params) {
        if (params != null) {
            Object throwException = params.get("throwException");
            if (throwException != null && "true".equals(throwException.toString())) {
                throw new RuntimeException("compensate reduce balance failed");
            }
        }
        LOGGER.info("compensate reduce balance succeed, businessKey:" + businessKey);
        // todo rpc / http

        return true;
    }
}

5)启动类 – SagaTradeApplication

@SpringBootApplication
public class SagaTradeApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(SagaTradeApplication.class, args);
        InventoryService bean = run.getBean(InventoryService.class);
        BalanceService bean1 = run.getBean(BalanceService.class);
    }
}

6) 状态图对应的JSON文件 – reduce_inventory_and_balance.json

{
  "Name": "reduceInventoryAndBalance",
  "Comment": "reduce inventory then reduce balance in a transaction",
  "StartState": "ReduceInventory",
  "Version": "0.0.1",
  "States": {
    "ReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryService",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceInventory",
      "Next": "ChoiceState",
      "Input": [
        "$.[businessKey]",
        "$.[count]"
      ],
      "Output": {
        "reduceInventoryResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      }
    },
    "ChoiceState": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[reduceInventoryResult] == true",
          "Next": "ReduceBalance"
        }
      ],
      "Default": "Fail"
    },
    "ReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceService",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensateReduceBalance",
      "Input": [
        "$.[businessKey]",
        "$.[amount]",
        {
          "throwException": "$.[mockReduceBalanceFail]"
        }
      ],
      "Output": {
        "compensateReduceBalanceResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },
      "Catch": [
        {
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger"
        }
      ],
      "Next": "Succeed"
    },
    "CompensateReduceInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryService",
      "ServiceMethod": "compensateReduce",
      "Input": [
        "$.[businessKey]"
      ]
    },
    "CompensateReduceBalance": {
      "Type": "ServiceTask",
      "ServiceName": "balanceService",
      "ServiceMethod": "compensateReduce",
      "Input": [
        "$.[businessKey]"
      ]
    },
    "CompensationTrigger": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "Succeed": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "ErrorCode": "PURCHASE_FAILED",
      "Message": "purchase failed"
    }
  }
}

状态图流程解析

在这里插入图片描述

7)application.yml

server:
  port: 9099
spring:
  application:
    name: saga-trade
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/seata_saga?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    show-sql: true

seata:
  tx-service-group: saint-trade-tx-group

8)file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.saint-trade-tx-group = "seata-server-sh"
  #only support when registry.type=file, please don't set multiple addresses
  seata-server-sh.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

9)开启状态机入口 – TradeController

状态机支持两种执行方式:同步执行、异步执行;

在这里插入图片描述

  • 同步执行API:StateMachineEngine#startWithBusinessKey();
  • 异步执行API:StateMachineEngine#startWithBusinessKeyAsync(…, AsyncCallback)
    • 其中的AsyncCallback为异步执行结束之后的回调函数
package com.saint.saga.trade.controller;

import io.seata.saga.engine.AsyncCallback;
import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.proctrl.ProcessContext;
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateMachineInstance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/**
 * @author Saint
 */
@RestController
@RequestMapping("saga")
@Slf4j
public class TradeController {

    @Autowired
    private StateMachineEngine stateMachineEngine;

    /**
     * POST请求 http://localhost:9099/saga/commit?amount=50&count=2
     */
    @RequestMapping(value = "/commit", method = RequestMethod.POST)
    public String commit(Integer amount, Integer count) {
        String businessKey = String.valueOf(System.currentTimeMillis());
        Map<String, Object> startParams = generateStartParams(amount, count, false);

        // 1、sync test
        StateMachineInstance instance = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null,
                businessKey, startParams);

        // 2、async test
//        StateMachineInstance instance = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams,
//                CALL_BACK);
//        waitingForFinish(instance);

        // PS: instance is not null
        if (!ExecutionStatus.SU.equals(instance.getStatus())) {
            log.error("saga transaction execute failed. XID: {}", instance.getId());
            return "rollback";
        }

        log.info("saga transaction commit succeed. XID: {}", instance.getId());

        return "succeed";
    }

    /**
     * POST请求 http://localhost:9099/saga/rollback?amount=50&count=2
     */
    @RequestMapping(value = "/rollback", method = RequestMethod.POST)
    public String rollback(Integer amount, Integer count) {
        String businessKey = String.valueOf(System.currentTimeMillis());
        // unique difference is here
        Map<String, Object> startParams = generateStartParams(amount, count, true);

        // 1、sync test
        StateMachineInstance instance = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null,
                businessKey, startParams);

        // 2、async test
//        StateMachineInstance instance = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams,
//                CALL_BACK);
//        waitingForFinish(instance);

        // PS: instance is not null
        if (!ExecutionStatus.SU.equals(instance.getStatus())) {
            log.error("saga transaction execute failed. XID: {}", instance.getId());
            return "rollback";
        }

        log.info("saga transaction commit succeed. XID: {}", instance.getId());

        return "succeed";
    }

    /**
     * parameters to be used in the state machine(状态机需要用到的参数在这里组装)
     */
    private Map<String, Object> generateStartParams(Integer amount, Integer count, Boolean mockFail) {
        String businessKey = String.valueOf(System.currentTimeMillis());
        Map<String, Object> startParams = new HashMap<>(8);
        startParams.put("businessKey", businessKey);
        startParams.put("count", 10);
        startParams.put("amount", new BigDecimal(String.valueOf(amount)));
        if (mockFail)
            startParams.put("mockReduceBalanceFail", true);
        return startParams;
    }

    private static volatile Object lock = new Object();
    private static AsyncCallback CALL_BACK = new AsyncCallback() {
        @Override
        public void onFinished(ProcessContext context, StateMachineInstance stateMachineInstance) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

        @Override
        public void onError(ProcessContext context, StateMachineInstance stateMachineInstance, Exception exp) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    };

    private static void waitingForFinish(StateMachineInstance inst) {
        synchronized (lock) {
            if (!ExecutionStatus.RU.equals(inst.getStatus()))
                return;
            try {
                lock.wait();
            } catch (InterruptedException e) {
                log.error("occur exception, ", e);
            }
        }
    }
}

3、测试 / 验证

1)启动seata-server服务;

参考博文:超细的Spring Cloud 整合Seata实现分布式事务(排坑版)进行seata-server的配置和启动;

在这里插入图片描述

注意:本文使用的seata版本是1.5.2,切勿使用成参考博文中的1.3.0。

在这里插入图片描述

seata server1.5.2启动成功后控制台输出:

在这里插入图片描述

2)启动seata-client(saga-trade)

在这里插入图片描述

3)事务提交

执行 POST类型请求:http://localhost:9099/saga/commit?amount=50&count=2

在这里插入图片描述

saga-trade控制台输出:

在这里插入图片描述

seata-server日志:

在这里插入图片描述

4)事务回滚

执行 POST类型请求:http://localhost:9099/saga/commit?amount=50&count=2

在这里插入图片描述

saga-trade控制台输出:

在这里插入图片描述

seata-server日志:

在这里插入图片描述

三、总结

seata的saga模式适用于长流程 或 长事务场景。saga模式复杂的地方在于引入状态机,需要自己根据业务定义状态机的流程,然后把定义好的流程用json文件导入到工程中。

此外,saga模式需要开发者自定义回滚事件,并要考虑空补偿、悬挂、幂等三种问题,即:允许空补偿、做防悬挂控制、做幂等控制。读者可以参考TCC模式中的解决方案(分布式事务Seata TCC空回滚/幂等/悬挂问题、解决方案(seata1.5.1如何解决?))实现;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/423987.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

根据 cadence 设计图学习硬件知识 day01了解腾锐 D2000芯片

1. 首先了解 腾锐 D2000 1.介绍 腾锐D2000 芯片 D2000芯片集成8个飞腾自主研发的新一代高性能处理器内核FTC663&#xff0c;采用乱序四发射超标量流水线&#xff0c;兼容64位ARMV8指令集并支持ARM64和ARM32两种执行模式&#xff0c;支持单精度、双精度浮点运算指令和ASIMD处…

ASP.NET动态Web开发技术第6章

第6章ASP.NET状态管理一.预习笔记 1.ASP.NET状态管理概述 状态管理是在一个网页或者不同网页的多个访问请求中&#xff0c;维护网页状态和信息的过程。 状态管理包含视图状态(ViewState)、控件状态(ControlState)、隐藏域状态(HiddenField)、Cookie、查询字符串(QueryString…

24位AD和16位DA超高精度PID串级控制器在张力控制中的应用

摘要&#xff1a;针对目前张力控制器中普遍存在测量控制精度较差和无法实现串级控制这类高级复杂控制的问题&#xff0c;本文介绍了具有超高精度和多功能的新一代张力控制器。这种新一代张力控制器具有24位AD模数转换、16位DA数模转换、双精度浮点运算和0.01%的最小输出百分比&…

【教程】使用R语言绘制词云图

哈喽&#xff0c;大家好&#xff0c;我是木易巷~ 最近木易巷在了解R语言&#xff0c;今天给大家分享一下使用R语言绘制出词云图的教程。 什么是R语言 R语言是一个开源的数据分析环境&#xff0c;起初是由数位统计学家建立起来&#xff0c;以更好的进行统计计算和绘图。由于R可…

Android之AppWidget 开发浅析

什么是AppWidget AppWidget 即桌面小部件&#xff0c;也叫桌面控件&#xff0c;就是能直接显示在Android系统桌面上的小程序&#xff0c;先看图&#xff1a; 图中我用黄色箭头指示的即为AppWidget&#xff0c;一些用户使用比较频繁的程序&#xff0c;可以做成AppWidget&#x…

分布式系统概念和设计-分布式对象和远程调用

分布式系统概念和设计 分布式对象和远程调用 能够接收远程方法调用的对象称为远程对象&#xff0c;远程对象实现一个远程接口。 调用者和被调用对象分别存在不同的失败可能性&#xff0c;RMI和本地调用有不同的语义。 中间件 在进程和消息传递等基本构造模块之上提供编程模型的…

PDCA循环模型——如何用同样的时间做更多的事?【No.1 】

PDCA循环模型&#xff0c;又称戴明环&#xff0c;是一个持续改进模型。PDCA循环包括以下内容&#xff1a; Plan阶段&#xff1a;确认目标&#xff0c;制定计划Do阶段&#xff1a;执行措施和计划Check阶段&#xff1a;检查验证&#xff0c;评估效果Action阶段&#xff1a;有效措…

健哥MYSQL私房菜 - 基础与介绍

前言 从今天开始, 健哥就带各位小伙伴学习数据库技术。数据库技术是Java开发中必不可少的一部分知识内容。也是非常重要的技术。本系列教程由浅入深, 全面讲解数据库体系。 非常适合零基础的小伙伴来学习。 ------------------------------前戏已做完&#xff0c;精彩即开始---…

docker-compose详讲

一、概述 docker-compose 项目是docker官方的开源项目&#xff0c; 负责实现对docker容器集群的快速编排&#xff0c;来轻松高效的管理容器&#xff0c;定义运行多个容器。 docker-compose将所管理的容器分为三层&#xff0c; 分别是工程&#xff08;project&#xff09;&#…

C#,码海拾贝(19)——一般实矩阵的QR分解(QR Decomposition)方法之C#源代码,《C#数值计算算法编程》源代码升级改进版

1 实矩阵 实矩阵&#xff0c;指的是矩阵中所有的数都是实数的矩阵。如果一个矩阵中含有除实数以外的数&#xff0c;那么这个矩阵就不是实矩阵。 2 QR&#xff08;正交三角&#xff09;分解法 QR&#xff08;正交三角&#xff09;分解法是求一般矩阵全部特征值的最有效并广泛应…

基于Java+SpringBoot制作一个宿舍报修小程序

制作一个宿舍报修小程序&#xff0c;让学生实现快速报修&#xff0c;将流程进行精简&#xff0c; 便于管理部门有效响应。 微信小程序实战开发专栏 一、小程序1.1 项目创建1.2 首页iconfont图标引入1.3 报修管理报修提交报修记录报修溯源1.4 来访登记1.5 公告通知二、API2.1 Sp…

windows日志捕获工具-DebugView使用教程

debugview 是一款捕获windows桌面系统程序中由TRACE(debug版本)和OutputDebugString输出的信息。 1、双击打开DebugView.exe工具&#xff0c;看到如下界面&#xff1a; 其中这里 Include代表过滤想要的关键字&#xff0c;一般不会找自己想要的关键字日志就设置成星号*&#xf…

GDOUCTF WEB

web hate eat snake 游戏题找js&#xff0c;将判断语句删掉即可 ezweb 看源码找到/src路由&#xff0c;找到源码 import flaskapp flask.Flask(__name__)app.route(/, methods[GET]) def index():return flask.send_file(index.html)app.route(/src, methods[GET]) def so…

如何快速开发软件?这篇文章说明白了

随着经济迅速发展&#xff0c;传统软件开发模式存在研发周期长、需求转化困难、投入成本高等问题&#xff0c;无法适应当前业务发展速度,市场需要快速开发工具。快速开发软件可分为代码生成类、少代码类、零代码功能配置类。代码生成类相对灵活&#xff0c;但对用户要求高&…

MySQL group_concat设置group_concat_max_len

GROUP_CONCAT函数用于将多个字符串连接成一个字符串&#xff0c;在拼接成字符串时就会存在拼接长度的问题&#xff0c;mysql 默认的拼接最大长度为1024 个字节&#xff0c;由于1024个字节会出现不够用的情况&#xff0c;所以有时需要去根据情况进行修改&#xff0c;方式如下。 …

nginx启动、配置、测试(全网最全)

目录 一、要求 1.配置不同IP访问 2.配置不同端口访问 3.配置域名访问 二、前期准备 1.安装gcc g的依赖库 2.安装 pcre的依赖库 3.安装zlib的依赖库 4.安装openssl的依赖库 5.解压nginx的安装包 6.进入到解压的nginx安装目录里面 7.将nginx安装到/usr/local/下 8.编译 9.进入到…

从 Python 中的字典列表中删除重复项

要从字典列表中删除重复项&#xff1a; 使用字典推导来遍历列表。使用每个 id 属性的值作为键&#xff0c;使用字典作为值。使用 dict.values() 方法只获取唯一的字典。使用 list() 类将结果转换为列表。 list_of_dictionaries [{id: 1, site: jiyik.com},{id: 2, site: goo…

5G NR调制阶数与EVM关系以及对系统SNR要求分析

移动通信技术对数据传输速率要求越来越高。一种提高传输速率的思路是使用更高阶的QAM 调制方式&#xff0c;例如5G NR 的256QAM PDSCH&#xff0c;微波的1024QAM&#xff0c;2048QAM和4096QAM 调制。更高阶的QAM 调制方式对系统也提出了更高的要求。例如某个系统的EVM 测试结果…

微服务+springcloud+springcloud alibaba学习笔记【Hystrix(豪猪哥)的使用】(6/9)

Hystrix&#xff08;豪猪哥&#xff09;的使用 6/91、Hystrix熔断器概述2、HyStrix重要概念3、hystrix案例3.1 新建模块 Cloud-provider-hystrix-payment80013.2 创建带降级的order模块 Cloud-comsumer-feign-hystrix-order803.3 配置服务降级:3.3.1 服务降级 Cloud-provider-h…

企业做体系认证要警惕的8大问题,别再被不良认证机构忽悠啦!

企业资质认证要警惕这八大问题 企业资质真的很管用。我们都知道从事任何一个行业都需要准入证明&#xff0c;尤其是招投标企业&#xff0c;企业资质更是投标的准入门槛&#xff0c;并且在投标中还可以为企业加分。 有些资质如ISO三体系是企业必备的资质之一&#xff0c;也是常…