【深入理解SpringCloud微服务】了解微服务的熔断、限流、降级,手写实现一个微服务熔断限流器
- 服务雪崩
- 熔断、限流、降级
- 熔断
- 降级
- 限流
- 手写实现一个微服务熔断限流器
- 架构设计
- 代码实现
- 整体逻辑
- ProtectorAspect#aroundMethod(ProceedingJoinPoint)具体实现
- 1、获取接口对应的类名、方法名、属性类型
- 2、如果接口启用了限流器,调用限流器进行验证是否需要限流
- 3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行
- 4、执行目标方法,并记录断路器(成功/失败)
- 降级回调
服务雪崩
服务雪崩时分布式系统中会遇到的一个问题,由于分布式系统存在服务间调用的关系,一个服务不可用,往往会影响到调用该服务的另外一个服务。
在分布式系统中,一个服务由于程序bug或流量过大导致不可用,进而导致调用该服务的另一个服务也不可用,然后这种不可用继续影响上游其他服务导致其不可用,这种不可用的情况不断放大的过程,就是服务雪崩。
而应对这种服务雪崩问题的解决方案就是熔断、限流、降级。
熔断、限流、降级
熔断
熔断的作用类似于电路中的保险丝。电路中正确安置保险丝,保险丝就会在电流异常升高到一定的高度和热度的时候,自身熔断切断电流,保护了电路安全运行。
熔断通过增加一个断路器实现。
断路器一般有三个状态:关闭、打开、半开。
- 关闭状态:如同一个开关处于闭合状态,此时接口可以执行正常的逻辑处理。
- 打开状态:如果一个开关处于打开状态,此时接口的所有请求都会被降级处理。
- 半开状态:处于半开状态的断路器,允许一个请求执行正常的逻辑处理,这相当于是试探性的调用一下,如果能正常返回结果,那么断路器恢复为闭合状态,否则变为打开状态。
断路器的三种状态间的转换如下图:
当断路器处于打开状态时,由于接口的正常逻辑无法执行,此时就是进行降级处理。
降级
降级是指原有的逻辑无法正常执行,转而执行的一段备用的有损的逻辑。
比如服务A的接口调用服务B的接口发生异常或超时,那么服务A的接口进行降级逻辑,调用本地的一个方法,返回默认的数据。
除此以外,断路器打开,那么也会执行降级逻辑,而不会走正常逻辑。
限流
限流也叫流控,也就是流量控制,作用是限制流入服务接口的流量大小。每一个服务或接口能扛住的流量大小都是有上限的,如果超过了这个上限,就有可能造成性能下降或服务不可用。
限流通过在服务或接口前添加限流器实现。如果流量没有超过限流器的阈值,那么请求可以正常通过;如果流量超过限流器的阈值,那么超过阈值的这一部分请求就要被拦截掉。
被流控的请求,可以报错返回错误信息,也可以走降级处理的逻辑。
限流器的限流算法包括:
- 简单时间窗算法
- 滑动时间窗算法
- 漏桶算法
- 令牌桶算法
这些算法在下面会用代码进行解析,这里先跳过。
手写实现一个微服务熔断限流器
架构设计
我们设计的熔断限流框架包括的组件有:
- FlowLimiter:限流器
- Breaker:断路器
- Fallback:降级逻辑
当我们的系统接进来一个请求时,先经过FlowLimiter(限流器)处理。如果FlowLimiter的当前流量还没有达到限流阈值,请求正常往下走;如果FlowLimiter的当前流量已经达到了限流阈值,那么就转而走Fallback(降级逻辑)进行降级处理。
如果一个请求成功通过了FlowLimiter后,就会到Breaker(断路器)的处理。断路器判断自身当前状态,如果是关闭状态或半开状态,请求都可以正常通过;如果断路器状态是打开状态,那么请求走Fallback(降级逻辑)进行降级处理。
如果一个请求成功通过了Breaker后,就会执行目标方法。如果目标方法正常执行,那么返回正常处理结果;如果目标方法执行异常或超时,那么请求走Fallback(降级逻辑)进行降级处理。
总体上,我们的框架是通过AOP给目标方法生成动态代理对象,在AOP的增强逻辑中实现的熔断、限流、降级等功能。
代码实现
整体逻辑
首先我们定义一个注解。
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Protected {
...
}
@Protected注解用于修饰指定接口或方法,表示这个接口或方法已经被我们的框架保护,具有熔断、限流、降级等效果。
然后需要定义一个切面类。
/**
* @author huangjunyi
* @date 2023/12/19 19:24
* @desc
*/
@Aspect
public class ProtectorAspect implements PriorityOrdered {
...
@Pointcut("@annotation(com.huangjunyi1993.simple.microservice.protector.aop.Protected)")
public void pointCut() {
}
@Around(value = "pointCut()")
public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
...
}
...
}
我们通过SpringAOP扫描被@Protected注解修饰的方法,给它们生成代理对象,并且通过SpringAOP的@Around注解定义了环绕增强逻辑,当请求被@Protected注解修饰的接口方法接收时,就会进入这里的增强逻辑,增强逻辑做的自然就是熔断、限流、降级等这些事情。
而使用的时候,只要在对应接口方法上添加@Protected注解,并指定对应的属性即可(@Protected注解中的各属性会在后面讲解)
@GetMapping("/flow/simple")
@Protected(name = "testFlowSimple", enableFlowLimiter = true, flowLimiterName = FlowLimiterNameConstant.SIMPLE_TIME_WINDOW, limit = 2) // 测试限流效果
public Map<String, Object> testFlowSimple() {
...
}
ProtectorAspect#aroundMethod(ProceedingJoinPoint)整体流程:
@Around(value = "pointCut()")
public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
// 1、获取接口对应的类名、方法名、属性类型
...
// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流
if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {
...
// 3、如果接口启用了断路器,检查断路器整体决定是否继续往下执行
// 如果断路器打开,执行fallback逻辑
if (annotation.enableBreaker()) {
...
}
// 4、执行目标方法,并记录断路器(成功/失败),
// 如果超时或异常,执行fallback逻辑
}
// 被流控了,执行fallback逻辑
}
下面就对ProtectorAspect#aroundMethod(ProceedingJoinPoint)方法中的具体实现进行讲解。
ProtectorAspect#aroundMethod(ProceedingJoinPoint)具体实现
1、获取接口对应的类名、方法名、属性类型
@Around(value = "pointCut()")
public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
// 1、获取接口对应的类名、方法名、属性类型
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
// 类名
String className = methodSignature.getDeclaringTypeName();
// 方法名
String methodName = methodSignature.getMethod().getName();
// 参数类型列表
Class[] parameterTypes = methodSignature.getParameterTypes();
// @Protected注解
Protected annotation = methodSignature.getMethod().getAnnotation(Protected.class);
// 取得@Protected注解的name属性,如果没有,则生成一个
String name = StringUtils.isNotBlank(annotation.name()) ? annotation.name() : EntryPointSymbolGenerator.get(className, methodName, parameterTypes);
// 如果@Protected注解指定了Fallback实现类全限定名,生成并缓存该Fallback实例
Fallback fallback = null;
if (StringUtils.isNotBlank(annotation.fallback())) {
fallback = FallbackCacheUtil.getInstance(annotation.fallback());
}
// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流
if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {
...
// 3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行
// 如果断路器打开,执行fallback逻辑
if (annotation.enableBreaker()) {
...
}
// 4、执行目标方法,并记录断路器(成功/失败),
// 如果超时或异常,执行fallback逻辑
}
// 被流控了,执行fallback逻辑
}
第一步没做什么特别的事情,就是获取类名className、方法名methodName、参数类型列表parameterTypes。
除此以外获取@Protected注解的name属性,下面会用这个name去生成限流器FlowLimiter和断路器Breaker,不同的FlowLimiter和Breaker之间,就是通过name区分。
然后还会判断一下@Protected注解是否声明了当前接口方法对应的Fallback实现类全限定名,也就是@Protected注解的fallback属性是否不为空,如果不为空,那么生成并缓存对应的Fallback实例。
FallbackCacheUtil.getInstance(annotation.fallback())生成并缓存Fallback实现类实例的逻辑:
/**
* @author huangjunyi
* @date 2023/12/27 15:26
* @desc
*/
public class FallbackCacheUtil {
private static final Map<String, Fallback> CALLBACK_CACHE_MAP = new HashMap<>();
public static Fallback getInstance(String fallbackClassName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
// 缓存Map中有,则从缓存中返回
if (CALLBACK_CACHE_MAP.containsKey(fallbackClassName)) {
return CALLBACK_CACHE_MAP.get(fallbackClassName);
}
// 反射创建并放入缓存Map
Fallback callback = (Fallback) Class.forName(fallbackClassName).newInstance();
CALLBACK_CACHE_MAP.put(fallbackClassName, callback);
return callback;
}
}
2、如果接口启用了限流器,调用限流器进行验证是否需要限流
然后接下来轮到限流器出场,就是这个if判断:
// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流
if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {
...
}
限流器的所有逻辑都在这个if判断中。
首先是判断@Protected注解的enableFlowLimiter属性是否为true,如果为true,表示启用限流器,才会有后面的checkCanPass(annotation, name)判断;如果enableFlowLimiter属性为false,表示不启用限流器,那么就直接通过了,也就是直接进入if分支内进行下一步处理。
private boolean checkCanPass(Protected annotation, String name) {
// 根据name通过限流器工厂获取对应的限流器
FlowLimiter flowLimiter = flowLimiterFactory.get(name);
if (flowLimiter == null) {
// 如果限流去工厂没有缓存的限流器,调用限流器工厂创建一个并缓存
flowLimiter = flowLimiterFactory.create(annotation);
}
// 调用限流器的canPass方法进行验证
return flowLimiter.canPass(name);
}
上面的flowLimiterFactory是FlowLimiterFactory类型,FlowLimiterFactory是一个限流器工厂接口。
/**
* @author huangjunyi
* @date 2024/1/2 19:39
* @desc
*/
public interface FlowLimiterFactory {
/**
* 根据资源名称获取限流器
* @param name 资源名称 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#name()}
* @return 限流器
*/
FlowLimiter get(String name);
/**
* 根据注解创建指定的限流器
* @param annotation Protected注解 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected}
* @return 限流器
*/
FlowLimiter create(Protected annotation);
}
用户可以实现自定义的FlowLimiterFactory,也可以使用默认的FlowLimiterFactory实现类。默认的FlowLimiterFactory是CacheFlowLimiterFactory,但CacheFlowLimiterFactory不是直接实现的FlowLimiterFactory接口,而是继承了SPIFlowLimiterFactory,SPIFlowLimiterFactory才实现了FlowLimiterFactory接口。
SPIFlowLimiterFactory通过Java的SPI机制创建FlowLimiter实例:
public abstract class SPIFlowLimiterFactory implements FlowLimiterFactory {
@Override
public FlowLimiter create(Protected annotation) {
// 通过ServiceLoader加载所有FlowLimiterBuilder
ServiceLoader<FlowLimiterBuilder> flowLimiterBuilders = ServiceLoader.load(FlowLimiterBuilder.class);
for (FlowLimiterBuilder builder : flowLimiterBuilders) {
// 获取FlowLimiterBuilder上修饰的@FlowLimiterName注解,
// 看是否与@Protected中的flowLimiterName属性匹配,
// 如果匹配,调用FlowLimiterBuilder的build方法创建
if (StringUtils.equals(builder.getClass().getAnnotation(FlowLimiterName.class).name(), annotation.flowLimiterName())) {
return builder.build(annotation);
}
}
throw new ProtectorException("Create flowLimiter failed");
}
}
比如@Protected注解声明了属性flowLimiterName = FlowLimiterNameConstant.SIMPLE_TIME_WINDOW,那么就匹配到简单时间窗算法的FlowLimiterBuilder,它会创建一个SimpleTimeWindowFlowLimiter(简单时间窗限流器)
/**
* @author huangjunyi
* @date 2024/1/3 19:34
* @desc
*/
@FlowLimiterName(name = SIMPLE_TIME_WINDOW)
public class SimpleTimeWindowFlowLimiterBuilder implements FlowLimiterBuilder {
@Override
public FlowLimiter build(Protected annotation) {
if (annotation.limit() <= 0) {
throw new ProtectorException("Protected annotation parameter invalid");
}
return new SimpleTimeWindowFlowLimiter(annotation.limit());
}
}
SPIFlowLimiterFactory是抽象类,CacheFlowLimiterFactory继承了SPIFlowLimiterFactory,添加了缓存已创建的FlowLimiter的逻辑,避免重复创建。
/**
* @author huangjunyi
* @date 2024/1/2 19:43
* @desc
*/
public class CacheFlowLimiterFactory extends SPIFlowLimiterFactory{
private static final Map<String, FlowLimiter> FLOW_LIMITER_MAP = new ConcurrentHashMap<>();
@Override
public FlowLimiter get(String name) {
// 直接从map缓存中取
return FLOW_LIMITER_MAP.get(name);
}
@Override
public FlowLimiter create(Protected annotation) {
// 调用父类SPIFlowLimiterFactory进行创建,然后缓存到map中
FlowLimiter flowLimiter = super.create(annotation);
FLOW_LIMITER_MAP.put(annotation.name(), flowLimiter);
return flowLimiter;
}
}
用户可以实现自己的FlowLimiter和FlowLimiterBuilder,只要FlowLimiterBuilder上的@FlowLimiterName注解与接口方法上的@Protected注解声明的属性flowLimiterName匹配,自定义的FlowLimiter就会生效。
然后flowLimiter.canPass(name)就是调用断路器FlowLimiter的检查方法,检查当前请求是否需要被限流。canPass方法返回true,表示请求可通过,无需限流;否则返回false,表示请求不可通过,需要限流。
FlowLimiter也是一个接口,提供了简单时间窗、滑动时间窗、漏桶、令牌桶四种限流算法的FlowLimiter实现,这些算法打算在下一篇文章分析。用户也可以实现自己的FlowLimiter自定义限流流算法。
3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行
@Around(value = "pointCut()")
public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
// 1、获取接口对应的类名、方法名、属性类型
...
// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流
if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {
...
// 3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行
// 如果断路器打开,执行fallback逻辑
if (annotation.enableBreaker()) {
// 从断路器工厂BreakerFactory中获取断路器
breaker = breakerFactory.get(name);
if (breaker == null) {
// 如果没有获取到,创建断路器
breaker = createBreaker(breakerFactory, name, annotation);
}
// 调用断路器进行检验
if (!breaker.canPass()) {
throw new ProtectorException("breaker has tripped");
}
}
// 4、执行目标方法,并记录断路器(成功/失败),
// 如果超时或异常,执行fallback逻辑
}
// 被流控了,执行fallback逻辑
}
private Breaker createBreaker(BreakerFactory breakerFactory, String name, Protected annotation) {
// 调用断路器工厂创建断路器并缓存
// @Protected的timeSpan()属性表示统计失败率的时间跨度
// @Protected的maxFailedNum()属性表示断路器最大容忍失败数
// @Protected的openTime()属性表示断路器打开时的持续时间
return breakerFactory.create(name, annotation.timeSpan(), annotation.maxFailedNum(), annotation.openTime());
}
断路器创建的流程基本跟限流器一样,先看看BreakerFactoryd的get方法有没有,没有就调工厂的create方法创建。
BreakerFactory是一个接口:
public interface BreakerFactory {
/**
* 获取断路器
* @param name 资源名称 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#name()}
* @return 断路器
*/
Breaker get(String name);
/**
* 创建断路器
* @param name 资源名称 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#name()}
* @param timeSpan 时间跨度 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#timeSpan()}
* @param maxFailedNum 时间跨度内最大允许失败数 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#maxFailedNum()}
* @param openTime 断路器开路时间 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#openTime()}
* @return 断路器
*/
Breaker create(String name, int timeSpan, int maxFailedNum, long openTime);
}
我们提供的默认实现类是BasicBreakerFactory,当然用户也可以自己实现自定义BreakerFactory。
/**
* @author huangjunyi
* @date 2023/12/28 11:02
* @desc
*/
public class BasicBreakerFactory implements BreakerFactory {
private static final Map<String, Breaker> BREAKER_MAP = new ConcurrentHashMap<>();
@Override
public Breaker get(String name) {
// 从缓存map中取
return BREAKER_MAP.get(name);
}
@Override
public Breaker create(String name, int timeSpan, int maxFailedNum, long openTime) {
// BasicBreakerFactory默认创建的就是BasicBreaker
Breaker breaker = new BasicBreaker(timeSpan, maxFailedNum, openTime);
// 放入缓存
BREAKER_MAP.put(name, breaker);
return breaker;
}
}
BasicBreaker中的断路器算法,也是放在后面的文章分析。
4、执行目标方法,并记录断路器(成功/失败)
```java
@Around(value = "pointCut()")
public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
// 1、获取接口对应的类名、方法名、属性类型
...
// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流
if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {
...
// 3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行
// 如果断路器打开,执行fallback逻辑
if (annotation.enableBreaker()) {
...
}
// 4、执行目标方法,并记录断路器(成功/失败),
// 如果超时或异常,执行fallback逻辑
if (annotation.enableTimeout() && annotation.timeout() > 0) {
Fallback finalFallback = fallback;
try {
Breaker finalBreaker = breaker;
// 如果@Protected中的enableTimeout属性为true,表示开启超时保护机制,
// 并且@Protected设置的timeout超时时间大于零
// 在CompletableFuture#supplyAsync()中执行
// CompletableFuture#get(timeout)阻塞等待timeout指定的事件长度,超过了就不等了,作超时处理
return CompletableFuture
.supplyAsync(() -> execute(joinPoint, finalFallback, finalBreaker, annotation.enableBreaker()))
.get(annotation.timeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOGGER.error("ProtectorAspect aroundMethod timeout:", e);
if (annotation.enableBreaker()) {
// 启用了断路器,记录断路器一次失败
breaker.failed();
}
if (finalFallback != null) {
// Fallback不为空,执行Fallback的降级方法
return finalFallback.process(joinPoint.getArgs());
}
// Fallback为空,则抛异常
throw new ProtectorException(String.format("%s.%s timeout", className, methodName));
} catch (InterruptedException|ExecutionException e) {...}
}
// 接口未开启超时保护,直接执行
return execute(joinPoint, fallback, breaker, annotation.enableBreaker());
}
// 被流控了,执行fallback逻辑
}
如果@Protected中的enableTimeout属性为true,并且@Protected设置的timeout超时时间大于零,那么在CompletableFuture的supplyAsync方法中执行,并且get(annotation.timeout(), TimeUnit.MILLISECONDS)等待指定时长。
如果@Protected中的enableTimeout属性为false,或者@Protected设置的timeout超时时间等于零,那么就直接执行。
private Object execute(ProceedingJoinPoint joinPoint, Fallback fallback, Breaker breaker, boolean enableBreaker) {
try {
Object result = doExecute(joinPoint, fallback);
if (enableBreaker) {
// 启用了断路器,记录一个成功计数
breaker.success();
}
return result;
} catch (Throwable throwable) {
LOGGER.error("ProtectorAspect aroundMethod error:", throwable);
if (enableBreaker) {
// 启用了断路器,记录一个失败计数
breaker.failed();
}
if (fallback != null) {
// fallback不为null,调用降级逻辑
return fallback.process(joinPoint.getArgs());
}
// fallback为null就抛异常
throw new ProtectorException(throwable.getMessage());
}
}
private Object doExecute(ProceedingJoinPoint joinPoint, Fallback fallback) throws Throwable {
// 执行目标方法
return joinPoint.proceed();
}
execute方法中执行目标方法,如果成功,则记录断路器一个成功计数,如果失败,则计数断路器一个失败计数。
如果执行失败的话(也就是超时或发生异常),如果fallback不为null(如果@Protected注解指定了Fallback实现类全限定名,就不为null),那么会执行Fallback实现类的降级处理方法。
降级回调
降级处理逻辑抽象成了一个接口Fallback。
/**
* @author huangjunyi
* @date 2023/12/18 19:51
* @desc
*/
@FunctionalInterface
public interface Fallback {
Object process(Object[] args);
}
Fallback的process就是降级处理逻辑,需要用户自定义Fallback实现类并实现process方法中自定义的降级处理逻辑。
当调用一个接口方法时,如果被流控,或者断路器处于打开状态,或者执行目标方法时超时或抛出异常,并且接口设置了降级回调,那么执行对应Fallback实现类的降级处理方法。
@GetMapping("/fallback")
@Protected(name = "fallback", fallback = "com.huangjunyi1993.simple.microservice.protector.example.fallback.TestFallback") // 测试降级效果
public Map<String, Object> fallback() throws Exception {
throw new Exception("test fallback");
}
比如@Protected的fallback属性设置了Fallback的实现类TestFallback的全限定名,那么当执行目标方法时超时或抛出异常,就会执行TestFallback的process(Object[] args)方法。
代码以提交到gitee,可以自行下载阅读。
https://gitee.com/huang_junyi/simple-microservice/tree/master/simple-microservice-protector