目录
- 一、网关服务限流熔断降级
- 二、Seata--分布式事务
- 1、分布式事务基础
- ①事务
- ②本地事物
- ③分布式事务
- ④分布式事务的场景
- 2、分布式事务解决方案
- ①全局事务
- ②最大努力通知
- ③TCC事务
- 3、Seata介绍
- 4、Seata实现分布式事务控制
- ①案例基本代码(异常模拟)
- ②启动Seata
- 下载seata
- 修改配置文件及初始化
- ③使用Seata实现事务控制
- ④seata运行流程分析
一、网关服务限流熔断降级
第1步:启动sentinel-dashboard控制台和Nacos注册中心服务
第2步:在网关服务中引入sentinel依赖
<!-- sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
第3步:在网关服务application.yml中配置sentinel
spring:
application:
name: zmall-gateway
cloud:
nacos:
discovery:
server-addr: localhost:8848
sentinel:
transport:
port: 9998 #跟控制台交流的端口,随意指定一个未使用的端口即可
dashboard: localhost:8080 # 指定控制台服务的地址
eager: true #当服务启动时是否与sentinel建立连接
web-context-unify: false # 关闭URL PATH聚合
第4步:通过域名直接访问商品服务,并登陆到sentinel控制台配置服务流控等信息
打开浏览器输入地址:http://zmall.com/index.html
进入sentinel控制台,选中簇点链路。在搜索框中输入搜索关键字index
这个时候会发现流控操作只能针对具体服务资源链路,而不能针对具体整个服务本身进行流控操作。所以,阿里特此推出了网关限流方式来解决以上问题。
第5步:重新在网关服务模块pom.xml中加入依赖
<!-- sentinel gateway -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
</dependency>
第6步:重新刷新商品服务,再进入sentinel控制台查看链路情况
这是直接针对该微服务进行网关限流等操作。直接点击流控,设置QPS=1、流控模式=直接(默认)、流控效果=快速失败(默认)等,最后快速刷新商品服务地址即可查看流控效果。同时,也可以配置流控的流控效果为排队等待方式,当流量多大时以排队等待方式慢慢去消化请求,从而可以起到一个流量削锋的目的。
二、Seata–分布式事务
1、分布式事务基础
①事务
事务指的就是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作
都成功,要么所有的操作都被撤销。简单地说,事务提供一种“要么什么都不做,要么做全套”机制。
②本地事物
本地事物其实可以认为是数据库提供的事务机制。说到数据库事务就不得不说,数据库事务中的四
大特性:
- A:原子性(Atomicity),一个事务中的所有操作,要么全部完成,要么全部不完成
- C:一致性(Consistency),在一个事务执行之前和执行之后数据库都必须处于一致性状态
- I:隔离性(Isolation),在并发环境中,当不同的事务同时操作相同的数据时,事务之间互不影响
- D:持久性(Durability),指的是只要事务成功结束,它对数据库所做的更新就必须永久的保存下来
数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单
元中的所有操作要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚
③分布式事务
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布
式系统的不同节点之上。
简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同
的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
④分布式事务的场景
单体系统访问多个数据库
一个服务需要调用多个数据库实例完成数据的增删改操作
多个微服务访问同一个数据库
多个服务需要调用一个数据库实例完成数据的增删改操作
多个微服务访问多个数据库
多个服务需要调用一个数据库实例完成数据的增删改操作
2、分布式事务解决方案
①全局事务
全局事务基于DTP模型实现。DTP是由X/Open组织提出的一种分布式事务模型——X/Open
Distributed Transaction Processing Reference Model。它规定了要实现分布式事务,需要三种角色:
- AP: Application 应用系统 (微服务)
- TM: Transaction Manager 事务管理器 (全局事务管理)
- RM: Resource Manager 资源管理器 (数据库)
整个事务分成两个阶段:
-
阶段一: 表决阶段,所有参与者都将本事务执行预提交,并将能否成功的信息反馈发给协调者。
-
阶段二: 执行阶段,协调者根据所有参与者的反馈,通知所有参与者,步调一致地执行提交或者回
滚。
优点 -
提高了数据一致性的概率,实现成本较低
缺点
-
单点问题: 事务协调者宕机
-
同步阻塞: 延迟了提交时间,加长了资源阻塞时间
-
数据不一致: 提交第二阶段,依然存在commit结果未知的情况,有可能导致数据不一致
可靠消息服务
基于可靠消息服务的方案是通过消息中间件保证上、下游应用数据操作的一致性。假设有A和B两个
系统,分别可以处理任务A和任务B。此时存在一个业务流程,需要将任务A和任务B在同一个事务中处
理。就可以使用消息中间件来实现这种分布式事务。
第一步: 消息由系统A投递到中间件
- 在系统A处理任务A前,首先向消息中间件发送一条消息
- 消息中间件收到后将该条消息持久化,但并不投递。持久化成功后,向A回复一个确认应答
- 系统A收到确认应答后,则可以开始处理任务A
- 任务A处理完成后,向消息中间件发送Commit或者Rollback请求。该请求发送完成后,对系统A而
言,该事务的处理过程就结束了 - 如果消息中间件收到Commit,则向B系统投递消息;如果收到Rollback,则直接丢弃消息。但是如果消息中间件收不到Commit和Rollback指令,那么就要依靠"超时询问机制"。
超时询问机制
系统A除了实现正常的业务流程外,还需提供一个事务询问的接口,供消息中间件调
用。当消息中间件收到发布消息便开始计时,如果到了超时没收到确认指令,就会主动调用
系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果,中间件根据三
种结果做出不同反应:
提交:将该消息投递给系统B
回滚:直接将条消息丢弃
处理中:继续等待
第二步: 消息由中间件投递到系统B
消息中间件向下游系统投递完消息后便进入阻塞等待状态,下游系统便立即进行任务的处理,任务
处理完成后便向消息中间件返回应答。
- 如果消息中间件收到确认应答后便认为该事务处理完毕
- 如果消息中间件在等待确认应答超时之后就会重新投递,直到下游消费者返回消费成功响应为止。
一般消息中间件可以设置消息重试的次数和时间间隔,如果最终还是不能成功投递,则需要手工干
预。这里之所以使用人工干预,而不是使用让A系统回滚,主要是考虑到整个系统设计的复杂度问
题。
基于可靠消息服务的分布式事务,前半部分使用异步,注重性能;后半部分使用同步,注重开发成本。
②最大努力通知
最大努力通知也被称为定期校对,其实是对第二种解决方案的进一步优化。它引入了本地消息表来
记录错误消息,然后加入失败消息的定期校对功能,来进一步保证消息会被下游系统消费。
第一步: 消息由系统A投递到中间件
- 处理业务的同一事务中,向本地消息表中写入一条记录
- 准备专门的消息发送者不断地发送本地消息表中的消息到消息中间件,如果发送失败则重试
第二步: 消息由中间件投递到系统B
- 消息中间件收到消息后负责将该消息同步投递给相应的下游系统,并触发下游系统的任务执行
- 当下游系统处理成功后,向消息中间件反馈确认应答,消息中间件便可以将该条消息删除,从而该
事务完成 - 对于投递失败的消息,利用重试机制进行重试,对于重试失败的,写入错误消息表
- 消息中间件需要提供失败消息的查询接口,下游系统会定期查询失败消息,并将其消费
这种方式的优缺点:
- 优点: 一种非常经典的实现,实现了最终一致性。
- 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。
③TCC事务
TCC即为Try Confirm Cancel,它属于补偿型分布式事务。TCC实现分布式事务一共有三个步骤:
-
Try:尝试待执行的业务
这个过程并未执行业务,只是完成所有业务的一致性检查,并预留好执行所需的全部资源 -
Confirm:确认执行业务
确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。通常情况下,采用TCC
则认为 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的
出错了,需引入重试机制或人工处理。 -
Cancel:取消待执行的业务
取消Try阶段预留的业务资源。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若
Cancel阶段真的出错了,需引入重试机制或人工处理。
TCC两阶段提交与XA两阶段提交的区别是:
XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。
TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁。
TCC事务的优缺点: -
优点:把数据库层的二阶段提交上提到了应用层来实现,规避了数据库层的2PC性能低下问题。
-
缺点:TCC的Try、Confirm和Cancel操作功能需业务提供,开发成本高。
3、Seata介绍
2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And
Rollback),其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们
遇到的分布式事务方面的所有难题。后来更名为 Seata,意为:Simple Extensible Autonomous
Transaction Architecture,是一套分布式事务解决方案。
Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。
它把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分
支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据
库的本地事务。
2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。
Seata主要由三个重要组件组成:
- TC:Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局性事务的提交
和回滚。 - TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。
- RM:Resource Manager 资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分
支事务的状态,接受TC的命令来提交或者回滚分支事务。
用例说明:
用户购买商品的业务逻辑:
-
Storage服务:对给定的商品扣除仓储数量。
-
Order服务:根据采购需求创建订单。
-
Account服务:从用户帐户中扣除余额。
-
Business服务:创建订单的同时需完成对商品库存扣减及用户账号余额扣除操作。
Seata的执行流程如下:
-
Business业务服务TM申请向TC开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID
-
Storage服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖
-
Storage服务执行分支事务,向数据库做操作,对给定的商品扣除仓储数量
-
Order服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖
-
Order服务执行分支事务,向数据做操作,创建订单
-
Order服务开始远程调用Account服务,此时XID会在微服务的调用链上传播
-
Account服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
-
Account服务执行分支事务,向数据库做操作,从用户账户中扣除余额
-
全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚
-
TC协调其管辖之下的所有分支事务, 决定是否回滚
Seata实现2PC与传统2PC的差别:
- 架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM本质上就是数据库自身,通过XA协
议实现,而 Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。 - 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保
持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2
持锁的时间,整体提高效率。
4、Seata实现分布式事务控制
本示例通过Seata中间件实现分布式事务,模拟电商中的下单和扣库存的过程
我们通过订单微服务执行下单操作,然后由订单微服务调用商品微服务扣除库存
①案例基本代码(异常模拟)
修改product微服务
IProductService接口
public interface IProductService extends IService<Product> {
void updateStock(Integer pid,Integer num);
}
ProductServiceImpl
@Service
public class ProductServiceImpl extends ServiceImpl<ProductMapper, Product> implements IProductService {
@Transactional
@Override
public void updateStock(Integer pid, Integer num) {
//根据商品pid获取商品信息
Product product = this.getById(pid);
//判断库存商品是否大于购物商品数量
if(product.getStock()>num){
product.setStock(product.getStock()-num);
//根据商品pid修改商品数量
this.updateById(product);
}else{
throw new RuntimeException("库存不足");
}
}
}
ProductController
@Controller
public class ProductController {
@RequestMapping("/updateStock/{pid}/{num}")
@ResponseBody
public void updateStock(@PathVariable("pid") Integer pid,
@PathVariable("num") Integer num){
productService.updateStock(pid,num);
}
}
修改order微服务
order微服务启动类配置上配置@EnableFeignClients
ApiProductService
@FeignClient("zmall-product")
public interface ApiProductService {
@RequestMapping("/updateStock/{pid}/{num}")
void updateStock(@PathVariable("pid") Integer pid,
@PathVariable("num") Integer num);
}
IOrderService
public interface IOrderService extends IService<Order> {
Order createOrder(Integer pid,Integer num);
}
OrderServiceImpl
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
@Autowired
private ApiProductService productService;
@Transactional
@Override
public Order createOrder(Integer pid, Integer num) {
//根据商品ID修改商品对应的库存
productService.updateStock(pid,num);
//新增订单
Order order=new Order();
//此处只是做模拟操作
this.save(order);
return order;
}
}
OrderController
public class OrderController {
@Autowired
private IOrderService orderService;
@RequestMapping("/createOrder/{pid}/{num}")
@ResponseBody
public Order createOrder(@PathVariable("pid") Integer pid,
@PathVariable("num") Integer num){
return orderService.createOrder(pid,num);
}
}
异常模拟
在OrderServiceImpl的代码中模拟一个异常
@Transactional
@Override
public Order createOrder(Integer pid, Integer num) {
//根据商品ID修改商品对应的库存
productService.updateStock(pid,num);
//异常模拟
int i = 1 / 0;
//新增订单
Order order=new Order();
//此处只是做模拟操作
this.save(order);
return order;
}
zmall-order启动类
@EnableFeignClients
@EnableDiscoveryClient
@MapperScan({"com.zking.zmall.mapper"})
@SpringBootApplication
public class ZmallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(ZmallOrderApplication.class, args);
}
}
测试地址
http://order.zmall.com/createOrder/733/1
会发现事务不一致
②启动Seata
下载seata
下载地址:https://github.com/seata/seata/releases/v1.3.0/
修改配置文件及初始化
- seata-server-1.4.0.zip
将下载得到的seata-server-1.4.0.zip(非源码包)压缩包进行解压,进入conf目录
修改file.conf
创建数据库Seata并初始化数据表
解压seata-1.4.0源码包,并进入到seata-1.4.0\script\server\db目录,复制运行mysql.sql脚本完成seata服务端数据库初始化工作。
修改registry.conf
registry {
type = “nacos” #这里使用nacos为注册中心,将type修改成nacos
nacos {
application = “seata-server” #注册的服务名
serverAddr = “127.0.0.1:8848” #nacos注册中心地址及端口
group = “SEATA_GROUP” #服务注册分组
namespace = “” #namespace是服务注册时的命名空间,可不填,不填默认public
cluster = “default” #默认即可
username = “nacos” #nacos的登录账号
password = “nacos” #nacos的登录密码
}
}
config {
type = “nacos”
nacos {
serverAddr = “127.0.0.1:8848”
namespace = “”
group = “SEATA_GROUP”
username = “nacos”
password = “nacos”
}
}
registry:指定注册中心,将seata-server注册到指定位置
config: 指定配置中心
- 配置seata-1.4.0.zip(源码包)
修改seata-1.4.0中script/config-center目录中的config.txt配置:
store.mode=db #修改存储方式为db
…
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true #修改数据库名
store.db.user=root #修改数据库账号
store.db.password=1234 #修改数据库密码
初始化Seata配置到Nacos中
在seata-1.4.0\script\config-center\nacos目录中右键选择git bash here,运行git命令窗口。并输入以下命令:
sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -u nacos -w nacos
执行成功后可以打开Nacos的控制台,在配置列表中,可以看到初始化了很多Group为SEATA_GROUP
的配置。
附录:seata_gc.log
Java HotSpot™ 64-Bit Server VM (25.144-b01) for windows-amd64 JRE (1.8.0_144-b01), built on Jul 21 2017 21:57:33 by “java_re” with MS VC++ 10.0 (VS2010)
Memory: 4k page, physical 8068836k(2311236k free), swap 9993956k(3072052k free)
CommandLine flags: -XX:CMSInitiatingOccupancyFraction=75 -XX:+CMSParallelRemarkEnabled -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=E:\ziliao\seata\seata\bin\…/logs/java_heapdump.hprof -XX:InitialHeapSize=536870912 -XX:MaxDirectMemorySize=1073741824 -XX:MaxHeapSize=536870912 -XX:MaxMetaspaceSize=268435456 -XX:MaxNewSize=536870912 -XX:MetaspaceSize=134217728 -XX:NewSize=536870912 -XX:-OmitStackTraceInFastThrow -XX:+PrintGC -XX:+PrintGCTimeStamps -XX:SurvivorRatio=10 -XX:ThreadStackSize=512 -XX:-UseAdaptiveSizePolicy -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:-UseLargePagesIndividualAllocation -XX:+UseParallelGC
675.339: [GC (Allocation Failure) 436736K->22212K(480768K), 0.0448588 secs]
解决方案:
我的MySQL是8.0的版本,5.7的版本不会出现上面错误;
这是seata自带的MySQL驱动,mysql-connector-java-5.1.35.jar版本过低,导致服务始终起不来;
得换成mysql-connector-java-5.1.44.jar,才可以运行
也可能是内存问题,修改启动文件内的大小即可
启动seata服务
直接进入seata服务的seata\bin目录下,双击运行seata-server.bat文件即可。或者使用以下命令方式运行:
cd bin
seata-server.bat -p 9000 -m file
seata-server.bat -h ip地址 -p 9000 -m file
-p 9000:指定监听端口,默认为8091
-m file: 模式
启动后在 Nacos 的服务列表下面可以看到一个名为 serverAddr 的服务。
③使用Seata实现事务控制
初始化数据表
进入源码包seata-1.4.0\script\client\at\db目录,复制并运行mysql.sql数据库脚本完成undo_log表创建,这是Seata记录事务日志要用到的表。
添加配置
- 添加依赖
在zmall-order和zmall-product模块中分别引入以下依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!-- 如果已经添加了nacos配置中心依赖,则可以不加入 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
DataSourceProxyConfig
Seata 是通过代理数据源实现事务分支的,所以需要配置io.seata.rm.datasource.DataSourceProxy 的Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。请在zmall-order中添加DruidDataSource配置类,具体如下所示:
package com.zking.zmall.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import io.seata.rm.datasource.xa.DataSourceProxyXA;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean("dataSourceProxy")
public DataSource dataSource(DruidDataSource druidDataSource) {
//AT模式
return new DataSourceProxy(druidDataSource);
//XA模式
//return new DataSourceProxyXA(druidDataSource);
}
}
在启动类上排除DataSource数据源自动配置类
@EnableFeignClients
@EnableDiscoveryClient
@MapperScan({“com.zking.zmall.mapper”})
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class ZmallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(ZmallOrderApplication.class, args);
}
}
在需要进行分布式事务的微服务中进行下面几项配置:
application.yml
在需要进行分布式事务的各个微服务中的application.yml数据源更新成阿里巴巴的DruidDataSource
registry.conf
在微服务模块中的resources目录下添加seata的配置文件 registry.conf,该配置文件来自于seata-server/conf目录下的配置文件。
bootstrap.yaml
请注意spring.cloud.alibaba.seata.tx-service-group=my_test_txgroup配置。这里的my_test_tx_group名称必须与seata源码包中的config.txt配置文件中的名字一致。重要,重要,重要!!!
在order微服务开启全局事务
@GlobalTransactional //seata全局事务控制
@Transactional
@Override
public Order createOrder(Integer pid, Integer num) {
//根据商品ID修改商品对应的库存
productService.updateStock(pid,num);
//模拟程序执行报错
int i=1 / 0;
//新增订单
Order order=new Order();
//此处只是做模拟操作
this.save(order);
return order;
}
测试
再次下单测试,打开浏览器输入测试地址:http://order.zmall.com/createOrder/733/1
这时控制台中的order订单服务窗口已经产生错误信息,如下:
然后再次查看product商品服务窗口可以发现商品库存扣减的SQL语句已经产生了,但是由于seata的分布式事务的干预数据库中对应商品的库存数量并没有扣减成功,则证明seata分布式事务配置成功。
这里可以在订单的业务方法处设置debug断点,查看undo_log表中的数据情况。释放断点,出现异常,事务回滚,undo_log中的临时数据清空。
④seata运行流程分析
要点说明:
1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连
接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务
操作就一定有undo_log。
2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成
就已经将分支事务提交,也就释放了锁资源。
3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支
事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事
务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应
的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失
败则会重试回滚操作。