Hystrix官方已经停止开发了,Hystrix官方推荐使用新一代熔断器作为Resilience4j。作为新一代的熔断器,Resilience4j有很多优势,比如依赖少,模块化程度较好等优势。
Resilience4j是受Hystrix启发而做的熔断器,通过管理远程调用的容错处理来帮助实现一个健壮的系统。resilience4j提供了更好用的API,并且提供了很多其他功能比如Rate Limiter(限流器)、Bulkhead(舱壁隔离)、熔断器、重试、缓存、限时器等。
-
限时器
引入resilience4j-timelimiter[6]依赖。可以使用TimeLimiter限制调用远程服务所花费的时间。 -
重试
引入resilience4j-retry[4]库。用户可参与的设置:
- 最大尝试数。
- 重试前等待时间。
- 自定义函数,用于修改故障后的等待间隔。
- 自定义谓词,用于评估异常是否应重试。
-
舱壁隔离
引入resilience4j-bulkhead[3]依赖。可以限制对特定服务的并发调用数。用户可参与的设置:允许的最大并行数、线程等待的最大时间。
-
限流器
引入resilience4j-ratelimiter[2]依赖。可以允许限制对某些服务的访问。用户可参与的设置:limit刷新周期、刷新周期的权限限制、默认等待权限持续时间。 -
熔断器
熔断器有三种可能状态:
- 关闭:服务正常,不需要进行短路。
- 打开:远程服务宕机,所有请求都短路。
- 半开:进入打开状态一段时间后,熔断器允许检查远程服务是否恢复。
用户可参与的设置:
- 熔断器进入打开状态的阈值。
- 等待时间,即熔断器进入打开状态到半开状态需要等待的时间。
- 熔断器半开或者闭合时,ring buffer的大小。
- 处理自定义事件的监听器。
- 自定义谓词,用于评估异常是否应算作故障,从而提高故障率。
FeignAutoConfiguration、BulkheadAutoConfiguration【舱壁隔离的核心类】、RetryAutoConfiguration【重试功能的核心类】、CircuitBreakerAutoConfiguration【熔断器的核心类】、RateLimiterAutoConfiguration【限流器的核心类】、TimeLimiterAutoConfiguration【限时器的核心类】。
在FeignAutoConfiguration
核心类中熔断器功能的引导类之Targeter子类FeignCircuitBreakerTargeter。
feign.circuitbreaker.enabled【false】:则通过DefaultTargeter路由至抽象类Feign完成Feign客户端的代理【ReflectiveFeign利用InvocationHandler之FeignInvocationHandler】。
feign.circuitbreaker.enabled【true】:则通过FeignCircuitBreakerTargeter
路由至FeignCircuitBreaker完成【Feign客户端】的代理【ReflectiveFeign利用InvocationHandler之FeignCircuitBreakerInvocationHandler
、MethodHandler之SynchronousMethodHandler】。
SynchronousMethodHandler内部是通过客户端FeignBlockingLoadBalancerClient
完成请求的执行。
1.resilience4j涉及的相关配置类
public class CommonProperties {
Map<String, String> tags = new HashMap<>();
public Map<String, String> getTags() {
return tags;
}
public void setTags(Map<String, String> tags) {
this.tags = tags;
}
}
父类CommonProperties涉及的子类包含RateLimiterConfigurationProperties、CircuitBreakerConfigurationProperties、RetryConfigurationProperties、TimeLimiterConfigurationProperties等。
2.FeignCircuitBreakerInvocationHandler
class FeignCircuitBreakerInvocationHandler implements InvocationHandler {
private final boolean c;
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
...
String circuitName = circuitBreakerNameResolver.resolveCircuitBreakerName(feignClientName, target, method);
// 如果存在链路追踪sleuth选择TraceCircuitBreaker,否则选择Resilience4JCircuitBreaker
CircuitBreaker circuitBreaker = c ? factory.create(circuitName, feignClientName): factory.create(circuitName);
Supplier<Object> supplier = asSupplier(method, args);
if (this.nullableFallbackFactory != null) {
Function<Throwable, Object> fallbackFunction = throwable -> {
Object fallback = this.nullableFallbackFactory.create(throwable);
// 执行降级逻辑
return this.fallbackMethodMap.get(method).invoke(fallback, args);
};
//通过TraceCircuitBreaker的CircuitBreaker类型的属性之delegate取值Resilience4JCircuitBreaker完成下游服务的调用
return circuitBreaker.run(supplier, fallbackFunction);
}
return circuitBreaker.run(supplier);
}
private Supplier<Object> asSupplier(final Method method, final Object[] args) {
final RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
final Thread caller = Thread.currentThread();
return () -> {
// 利用SynchronousMethodHandler完成目标方法的调用
return dispatch.get(method).invoke(args);
};
}
}
2.1.Resilience4JCircuitBreaker
public class Resilience4JCircuitBreaker implements CircuitBreaker {
private final ExecutorService executorService;
public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
io.vavr.collection.Map<String, String> tags = io.vavr.collection.HashMap.of("group",this.groupName);
CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(this.id,this.circuitBreakerConfig, tags);
circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(defaultCircuitBreaker));
TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter(id, timeLimiterConfig, tags);
if (bulkheadProvider != null) {
return bulkheadProvider.run(this.groupName, toRun, fallback, defaultCircuitBreaker, timeLimiter, tags);
}else {
if (executorService != null) {
// 异步触发Ribbon调用下游服务
Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);
Callable restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
Callable<T> callable = CircuitBreaker.decorateCallable(defaultCircuitBreaker, restrictedCall);
return Try.of(callable::call)// 第一步:触发decorateCallable的Lambda表达式执行
.recover(fallback).get();// 第一步中存在异常则在recover内部回调fallback执行降级逻辑
}else {Supplier<T> decorator = CircuitBreaker.decorateSupplier(defaultCircuitBreaker, toRun);
return Try.of(decorator::get).recover(fallback).get();
}
}
}
}
public interface CircuitBreaker {
static <T> Callable<T> decorateCallable(CircuitBreaker circuitBreaker, Callable<T> callable) {
return () -> {
circuitBreaker.acquirePermission();
final long start = circuitBreaker.getCurrentTimestamp();
try {
T result = callable.call();//第二步:触发 TimeLimiter#decorateFutureSupplier的Lambda表达式执行
long duration = circuitBreaker.getCurrentTimestamp() - start;//下游服务响应花费的时间
circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);
return result;// 成功返回响应结果
} catch (Exception exception) {
long duration = circuitBreaker.getCurrentTimestamp() - start;
circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);
throw exception;
}
};
}
}
3.限时器之TimeLimiter
public class TimeLimiterImpl implements TimeLimiter {
private final TimeLimiterConfig timeLimiterConfig;
public TimeLimiterImpl(String name, TimeLimiterConfig config,io.vavr.collection.Map<String, String> tags) {
this.name = name;
this.tags = Objects.requireNonNull(tags, "Tags must not be null");
this.timeLimiterConfig = config;
this.eventProcessor = new TimeLimiterEventProcessor();
}
public <T, F extends Future<T>> Callable<T> decorateFutureSupplier(Supplier<F> futureSupplier) {
return () -> {
Future<T> future = futureSupplier.get();// 第三步:触发 Lambda表达式之Supplier执行,即触发Ribbon调用下游服务
try {
// 从timeLimiterConfig获取限时时间,如果在限时时间内没有得到下游服务的响应,则降级处理
T result = future.get(getTimeLimiterConfig().getTimeoutDuration().toMillis(),TimeUnit.MILLISECONDS);
onSuccess();
return result;// 成功返回响应结果
} catch (TimeoutException e) {//限时时间内没有得到下游服务的响应,则降级处理
TimeoutException timeoutException = TimeLimiter.createdTimeoutExceptionWithName(name, e);
onError(timeoutException);
if (getTimeLimiterConfig().shouldCancelRunningFuture()) {
future.cancel(true);
}
throw timeoutException;
}
};
}
}