文章目录
- 零:前置操作 --- 搭建Seata服务
- 一:介绍说明
- 二:添加undolog表
- 三:框架整合Seata相关依赖
- 3.1:引入公共SEATA POM依赖
- 3.2:业务服务引入SEATA公共组件依赖
- 3.3:yml文件配置Seata客户端和注册信息
- 四:框架整合OpenFeign RPC服务调用相关依赖
- 4.1:OpenFeign RPC POM依赖
- 4.2:OpenFeign RPC 库存服务接口整合
- 4.2:订单业务服务整合OpenFeign组件
- 4.2.1:OpenFeign组件POM引入
- 五:启动订单服务和库存服务 ---- (库存服务和订单服务搭建过程一样,只不过库和服务名不一样)
- 5.1:启动类注解说明 ---- @EnableFeignClients
- 5.2:启动服务查看输出日志
- 5.2.1:订单服务打印日志如下:
- 5.2.2:库存服务打印日志如下:
- 六:发送订单服务请求,跨服务调用,实现Seata分布式事务
- 6.1:请求订单服务
- 6.1.1:OrderController.class
- 6.1.2:OrderServiceImpl.class
- 6.1.2.1:查看订单请求对应Seata信息
- 6.1.2.1.1:订单库里undolog表信息
- 查看rollback_info字段信息
- 6.1.2.1.2:seata-config配置库里branch_table,global_table,lock_table表信息
- branch_table
- global_table
- 6.2:订单服务进入到库存服务
- 6.2.1 StockServiceImpl.class
- chain-stock.undo_log
- branch_table
- lock_table
- 6.2.2 库存服务抛异常-成功回滚
- 库存服务回滚日志
- 订单服务回滚日志
零:前置操作 — 搭建Seata服务
之前博客整合步骤:
https://blog.csdn.net/Abraxs/article/details/128425499?spm=1001.2014.3001.5502
一:介绍说明
两微服务:
一个order下单服务,一个stock库存服务
下单同时调用stock减少库存
二:添加undolog表
在各业务库中添加事务日志表,相当于各服务都有个事务分支管理
-- Seata AT-mode undo log table. Create one copy in every business database
-- that takes part in a global transaction (order DB, stock DB, ...).
-- The table-level AUTO_INCREMENT value left over from a schema dump has been
-- removed so a freshly created table starts counting from 1.
CREATE TABLE `undo_log` (
  -- surrogate primary key
  `id` bigint NOT NULL AUTO_INCREMENT,
  -- branch transaction id
  `branch_id` bigint NOT NULL,
  -- global transaction id (XID)
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  -- undo log context (e.g. which serializer wrote rollback_info)
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  -- serialized before/after images used to roll the branch back
  `rollback_info` longblob NOT NULL,
  -- undo log status
  `log_status` int NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  -- optional extension field
  `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  -- one undo record per (global transaction, branch)
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC;
三:框架整合Seata相关依赖
3.1:引入公共SEATA POM依赖
Seata事务相关依赖抽出,作为一公共组件供其他服务引用
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Shared module that bundles the Seata client dependencies for all business services. -->
    <parent>
        <artifactId>europa-platform</artifactId>
        <groupId>com.europa</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>commons-global-tx</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
            <!-- <version>1.2.0</version>-->
        </dependency>

        <!-- Distributed transaction solution (Seata) -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <!-- Drop the transitive starter; the version declared explicitly below is used instead. -->
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.2</version>
        </dependency>
    </dependencies>
</project>
3.2:业务服务引入SEATA公共组件依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Order business service: pulls in the shared DB, OpenFeign and Seata modules. -->
    <parent>
        <artifactId>modules</artifactId>
        <groupId>com.europa</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>europa-tx</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--
            No explicit version: the original pinned 5.3.3 here while spring-test was
            pinned to 5.3.21, which mixes Spring Framework versions on one classpath.
            The version is left to the same dependency management that already
            resolves spring-boot-starter-web above.
        -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- <dependency>-->
        <!--     <groupId>org.springframework.boot</groupId>-->
        <!--     <artifactId>spring-boot-starter-amqp</artifactId>-->
        <!-- </dependency>-->

        <!-- Project-internal shared modules -->
        <dependency>
            <groupId>com.europa</groupId>
            <artifactId>commons-db</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.europa</groupId>
            <artifactId>commons-support-api</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.europa</groupId>
            <artifactId>commons-global-tx</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- <dependency>-->
        <!--     <groupId>com.alibaba.cloud</groupId>-->
        <!--     <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>-->
        <!-- </dependency>-->

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.11</version>
            <scope>compile</scope>
        </dependency>

        <!-- Test-only libraries: scoped to test so they do not leak into the runtime classpath. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
3.3:yml文件配置Seata客户端和注册信息
# Seata client settings. Nesting restored: the flattened form repeats keys such as
# `nacos`, `username` and `password` at one level and is not valid YAML.
# NOTE: the server-addr values below are masked placeholders; replace them with
# the real Nacos address before use.
seata:
  enabled: true
  application-id: ${spring.application.name}
  # Transaction service group
  tx-service-group: my_test_tx_group
  # Automatically proxy the DataSource
  enable-auto-data-source-proxy: true
  # DataSource proxy mode (distributed transaction mode)
  data-source-proxy-mode: AT
  # Must match the vgroupMapping configured in Nacos
  service:
    vgroup-mapping:
      my_test_tx_group: default
  # Nacos as the Seata config center
  config:
    type: nacos
    nacos:
      server-addr: 192..101:8848
      group: SEATA_GROUP
      namespace: d4874eb0-1917-45fd-8dc1-34f6f3f5265a
      data-id: seataServer.properties
      username: nacos
      password: nacos
  # Nacos as the Seata registry
  registry:
    type: nacos
    nacos:
      server-addr: 192..101:8848 # Nacos address where seata-server is registered
      application: seata-server # default name is seata-server; may be omitted if unchanged
      group: SEATA_GROUP # default group is SEATA_GROUP; may be omitted if unchanged
      username: nacos
      password: nacos
      namespace: d4874eb0-1917-45fd-8dc1-34f6f3f5265a
下面是整个YML
# Complete application.yml of the order service. Nesting restored: the flattened
# form is not valid YAML. Host/IP values are masked placeholders; replace them
# with real addresses before use.
server:
  port: 8092
spring:
  application:
    name: order-server # service name
  rabbitmq:
    host: 192.101
    port: 5672
    username: root
    password: 123456
  cloud:
    nacos:
      config:
        server-addr: 101.62:8848
        file-extension: yaml
      discovery:
        server-addr: 101.62:8848 # Nacos service-registry address
    # sentinel:
    #   transport:
    #     dashboard: 192.168.56.104:8887
    # config:
    #   # whether to enable the config center, default true
    #   enabled: true
    #   server-addr: 101.62:8848 # Nacos service-registry address
    #   # config file extension
    #   file-extension: yml
    #   # group the config belongs to
    #   group: SEATA_GROUP
    #   # Nacos auth user
    #   username: nacos
    #   # Nacos auth password
    #   password: nacos
    #   # Supports multiple shared Data Ids; lower priority than extension-configs.
    #   # The property is a list of Config POJOs, each with three attributes:
    #   # dataId, group and refresh.
    #   shared-configs[0]:
    #     data-id: seata-client.yaml # config file name (Data Id)
    #     group: SEATA_GROUP # defaults to DEFAULT_GROUP
    #     refresh: false # whether to refresh dynamically, default false
    #   shared-configs:
    #     - data-id: seata-client.yaml
    #       group: SEATA_GROUP
    #       refresh: true
  datasource: # data source connection
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.:3306/chain-order?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2b8
    username: root
    password: 123456
# Distributed transactions (Seata)
seata:
  enabled: true
  application-id: ${spring.application.name}
  # Client and server must use the same transaction group
  tx-service-group: my_test_tx_group
  # Automatically proxy the DataSource
  enable-auto-data-source-proxy: true
  # DataSource proxy mode (distributed transaction mode)
  data-source-proxy-mode: AT
  # Transaction group mapping; the value is the TC cluster name and must match the server side
  service:
    vgroup-mapping:
      my_test_tx_group: default
  # Nacos as the Seata config center
  config:
    type: nacos
    nacos:
      server-addr: 192..101:8848
      group: SEATA_GROUP
      namespace: d4874eb0-1917-45fd-8dc1-34f6f3f5265a
      data-id: seataServer.properties
      # optional
      username: nacos
      # optional
      password: nacos
  # Nacos as the Seata registry
  registry:
    type: nacos
    nacos:
      server-addr: 192..101:8848 # Nacos address where seata-server is registered
      application: seata-server # default name is seata-server; may be omitted if unchanged
      group: SEATA_GROUP # default group is SEATA_GROUP; may be omitted if unchanged
      username: nacos
      password: nacos
      namespace: d4874eb0-1917-45fd-8dc1-34f6f3f5265a
四:框架整合OpenFeign RPC服务调用相关依赖
4.1:OpenFeign RPC POM依赖
openfeign相关配置依赖单独抽出作为公共组件供其他服务引用
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Shared module that bundles the OpenFeign dependencies and the cross-service client interfaces. -->
    <parent>
        <artifactId>commons</artifactId>
        <groupId>com.europa</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>commons-support-api</artifactId>

    <dependencies>
        <!-- Declarative HTTP client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <!-- OkHttp transport for Feign -->
        <dependency>
            <groupId>io.github.openfeign</groupId>
            <artifactId>feign-okhttp</artifactId>
        </dependency>
        <!-- Client-side load balancing -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>
    </dependencies>
</project>
4.2:OpenFeign RPC 库存服务接口整合
场景:进入订单服务后,订单服务会调用库存服务,因此需要把库存服务的接口定义抽取到OpenFeign公共组件中
package com.europa.support.provider;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
 * OpenFeign client describing the stock service's HTTP API.
 *
 * <p>Declared with the client name {@code stock-server} and a fixed base
 * {@code url} of {@code http://localhost:8001/}.
 *
 * <p>NOTE(review): with an explicit {@code url}, requests presumably go straight to
 * that address rather than being resolved through service discovery — confirm this
 * is intended outside local testing.
 */
@Component
@FeignClient(value = "stock-server", url = "http://localhost:8001/")
public interface IStockProvider {
/**
 * Sends a POST to {@code /reduceStock} with the two values as request parameters.
 *
 * @param productId   sent as request parameter {@code productId}
 * @param totalAmount sent as request parameter {@code totalAmount}
 * @return the response body as a string
 */
@PostMapping("/reduceStock")
String reduceStock(@RequestParam("productId") Integer productId, @RequestParam("totalAmount") Integer totalAmount);
/**
 * Local implementation of {@link IStockProvider} whose {@code reduceStock} returns {@code null}.
 *
 * <p>NOTE(review): the {@code @FeignClient} declaration above does not reference this
 * class through a {@code fallback} attribute, so it does not appear to be used as a
 * Feign fallback — confirm. Also note that a fallback returning normally would hide
 * the remote failure from the caller's global transaction.
 */
@Component
class IStockProviderFallback implements IStockProvider {
@Override
public String reduceStock(Integer productId, Integer totalAmount) {
return null;
}
}
}
4.2:订单业务服务整合OpenFeign组件
4.2.1:OpenFeign组件POM引入
<!-- Shared OpenFeign module (Feign client interfaces and their dependencies) -->
<dependency>
    <groupId>com.europa</groupId>
    <artifactId>commons-support-api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
五:启动订单服务和库存服务 ---- (库存服务和订单服务搭建过程一样,只不过库和服务名不一样)
5.1:启动类注解说明 ---- @EnableFeignClients
该注解的作用:应用启动时扫描OpenFeign接口所在的包路径;跨服务接口并没有具体实现类,而是由Feign通过动态代理生成实现
package com.europa.tx;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
 * Boot entry point of the order service.
 *
 * <p>Enables service discovery, scans {@code com.europa.support.provider} for Feign
 * client interfaces and {@code com.europa.tx.mapper} for MyBatis mappers.
 */
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan(basePackages = {"com.europa.tx.mapper"})
@EnableFeignClients(basePackages = "com.europa.support.provider")
public class EurtxApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(EurtxApplication.class, args);
    }
}
5.2:启动服务查看输出日志
可以看到打印信息,资源管理器RM 注入成功,对应的事务组也成功配置打印
5.2.1:订单服务打印日志如下:
2022-12-28 01:48:13.257 INFO 8488 --- [ main] i.s.c.rpc.netty.RmNettyRemotingClient : RM will register :jdbc:mysql://192..104:3306/chain-order
2022-12-28 01:48:13.260 INFO 8488 --- [ main] i.s.core.rpc.netty.NettyPoolableFactory : NettyPool create channel to transactionRole:RMROLE,address:192.:8091,msg:< RegisterRMRequest{resourceIds='jdbc:mysql://192..104:3306/chain-order', applicationId='order-server', transactionServiceGroup='my_test_tx_group'} >
2022-12-28 01:49:10.975 INFO 8488 --- [eoutChecker_1_1] i.s.core.rpc.netty.NettyPoolableFactory : NettyPool create channel to transactionRole:TMROLE,address:192..102:8091,msg:< RegisterTMRequest{applicationId='order-server', transactionServiceGroup='my_test_tx_group'} >
2022-12-28 01:49:10.987 INFO 8488 --- [eoutChecker_1_1] i.s.c.rpc.netty.TmNettyRemotingClient : register TM success. client version:1.3.0, server version:1.4.2,channel:[id: 0x318597e3, L:/192..1:52260 - R:/192..102:8091]
2022-12-28 01:49:10.987 INFO 8488 --- [eoutChecker_1_1] i.s.core.rpc.netty.NettyPoolableFactory : register success, cost 5 ms, version:1.4.2,role:TMROLE,channel:[id: 0x318597e3, L:/192.1:52260 - R:/192.102:8091]
5.2.2:库存服务打印日志如下:
六:发送订单服务请求,跨服务调用,实现Seata分布式事务
6.1:请求订单服务
http://localhost:8092/syncOrder
6.1.1:OrderController.class
OrderController.class
package com.europa.tx.controller;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.core.MessageProperties;
//import org.springframework.amqp.rabbit.core.RabbitTemplate;
import com.europa.tx.entity.BizOrder;
import com.europa.tx.service.OrderService;
import org.glassfish.jersey.message.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
/**
 * REST endpoints of the order service used to drive the Seata demo.
 */
@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    /**
     * Creates a fixed demo order (product 1, status "0", total amount 100) through
     * {@link OrderService#saveLock}.
     *
     * <p>The original always answered "下单成功", even when the service threw and the
     * global transaction was rolled back. A failure is now reported to the caller.
     *
     * @return "下单成功" when the order was saved, "下单失败" when saving returned
     *         {@code false} or threw
     */
    @RequestMapping("/syncOrder")
    public String syncOrder() {
        BizOrder order = new BizOrder();
        order.setProductId(1);
        order.setStatus("0");
        order.setTotalAmount(100);
        try {
            if (!orderService.saveLock(order)) {
                return "下单失败";
            }
        } catch (Exception e) {
            // The exception has already propagated out of the transactional service
            // method; here it is only reported and turned into a failure response.
            e.printStackTrace();
            return "下单失败";
        }
        return "下单成功";
    }

    /**
     * Placeholder endpoint. The RabbitMQ delayed-message demo that used to live here
     * (random {@code x-delay} header, exchange {@code delayed-exchange}, routing key
     * {@code delay}) is disabled, so the endpoint only returns an empty string.
     *
     * @return an empty string
     */
    @RequestMapping("/delayedSend")
    public String delayedSend() {
        return "";
    }
}
6.1.2:OrderServiceImpl.class
package com.europa.tx.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.europa.support.provider.IStockProvider;
import com.europa.tx.entity.BizOrder;
import com.europa.tx.mapper.OrderMapper;
import com.europa.tx.service.OrderService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.UUID;
/**
 * Order service implementation that starts the Seata global transaction and calls
 * the stock service through OpenFeign.
 */
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, BizOrder> implements OrderService {

    @Resource
    private OrderMapper orderMapper;

    @Autowired
    private IStockProvider iStockProvider;

    /**
     * Updates one existing order, inserts the given order and then asks the stock
     * service to reduce stock, all inside one global transaction.
     *
     * @param order the order to insert; its id, product id and status are overwritten here
     * @return always {@code true} when no exception is thrown
     * @throws Exception any failure; triggers a rollback of the global transaction
     */
    @Override
    @GlobalTransactional(name = "my_test_tx_group", rollbackFor = Exception.class)
    public boolean saveLock(BizOrder order) throws Exception {
        System.out.println("seata全局事务id====================>" + RootContext.getXID());

        // Simulate a change to an already existing order (produces an UPDATE undo log).
        BizOrder existOrder = orderMapper.selectById("048b2d2836b641aa93b16d410f682db0");
        if (ObjectUtils.isNotEmpty(existOrder)) {
            existOrder.setTotalAmount(50);
            baseMapper.updateById(existOrder);
        }

        // Place the order (produces an INSERT undo log).
        order.setId(UUID.randomUUID().toString().replace("-", ""));
        order.setProductId(1);
        order.setStatus("看到改变了,说明失效了哦");
        orderMapper.insert(order);

        // Reduce stock in the stock service. The second argument is the order's total
        // amount; the original passed the product id twice by mistake.
        iStockProvider.reduceStock(order.getProductId(), order.getTotalAmount());

        // To demo a rollback that originates in the order service, throw an exception here.
        return true;
    }
}
6.1.2.1:查看订单请求对应Seata信息
6.1.2.1.1:订单库里undolog表信息
注意:订单服务实现类两条对数据库的操作,修改和新增,下面看undo_log信息
baseMapper.updateById(existOrder);
int a = orderMapper.insert(order);
查看rollback_info字段信息
-- Show each undo log row with rollback_info decoded from BLOB to readable text.
SELECT
  CONVERT(t.rollback_info USING utf8mb4),
  t.*
FROM undo_log t
可以看到生成了两条branch_id分支记录,分别对应两次数据库操作的回滚信息:一条是update更新类型,一条是insert类型。每条记录都包含beforeImage和afterImage镜像信息,记录了SQL执行前后的数据值,方便后面抛异常时TC事务协调器通知RM资源管理器进行回滚操作。
{"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"192..102:8091:18358011811149491","branchId":18358011811149494,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"UPDATE","tableName":"biz_order","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"biz_order","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":12,"value":"048b2d2836b641aa93b16d410f682db0"},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"product_id","keyType":"NULL","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"total_amount","keyType":"NULL","type":4,"value":100},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"status","keyType":"NULL","type":12,"value":"看到改变了,说明失效了哦"}]]}]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"biz_order","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":12,"value":"048b2d2836b641aa93b16d410f682db0"},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"product_id","keyType":"NULL","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"total_amount","keyType":"NULL","type":4,"value":50},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"status","keyType":"NULL","type":12,"value":"看到改变了,说明失效了哦"}]]}]]}}]]}
BEFOREIMAGE
AFTERIMAGE
6.1.2.1.2:seata-config配置库里branch_table,global_table,lock_table表信息
branch_table
global_table
lock_table
6.2:订单服务进入到库存服务
6.2.1 StockServiceImpl.class
断点打在库存服务抛异常的那一行(在执行sql语句之后);上面订单服务的断点位置在sql语句之前,据此对undo_log和seata-config库中的数据做比较
package com.europa.tx.stock.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.europa.tx.stock.entity.Stock;
import com.europa.tx.stock.mapper.StockMapper;
import com.europa.tx.stock.service.StockService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Random;
/**
 * Stock service implementation used as the second participant in the Seata demo.
 */
@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements StockService {

    /**
     * Decrements the stock count of the given product by one and then fails on
     * purpose, so the caller's global transaction is rolled back.
     *
     * <p>NOTE(review): {@code totalAmount} is not used; the count is always reduced
     * by 1 — confirm whether that is intended.
     *
     * @param productId   product whose stock row is looked up
     * @param totalAmount currently unused
     * @throws IllegalStateException if no stock row exists for {@code productId}
     * @throws Exception always, after the update, to demonstrate the rollback
     */
    @GlobalTransactional(name = "my_test_tx_group", rollbackFor = Exception.class)
    @Override
    public void reduceStock(Integer productId, Integer totalAmount) throws Exception {
        System.out.println("seata全局事务id====================>" + RootContext.getXID());

        LambdaQueryWrapper<Stock> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(Stock::getProductId, productId);
        Stock result = this.getOne(queryWrapper);
        if (result == null) {
            // Fail with a clear message instead of a NullPointerException below.
            throw new IllegalStateException("No stock row found for productId=" + productId);
        }

        result.setCount(result.getCount() - 1);
        this.updateById(result);

        // Deliberate failure after the SQL has run: this is the rollback scenario the article walks through.
        throw new Exception("");
    }
}