SpringCloud之Resilience4j熔断器源码解析

news2024/11/14 19:39:00

Hystrix官方已经停止开发了,Hystrix官方推荐使用新一代熔断器作为Resilience4j。作为新一代的熔断器,Resilience4j有很多优势,比如依赖少,模块化程度较好等优势。

Resilience4j是受Hystrix启发而做的熔断器,通过管理远程调用的容错处理来帮助实现一个健壮的系统。resilience4j提供了更好用的API,并且提供了很多其他功能比如Rate Limiter(限流器)、Bulkhead(舱壁隔离)、熔断器、重试、缓存、限时器等。

  1. 限时器【TimeLimiterAutoConfiguration】
    引入resilience4j-timelimiter[6]依赖。可以使用TimeLimiter限制调用远程服务所花费的时间。

  2. 重试【RetryAutoConfiguration】
    引入resilience4j-retry[4]库。

    用户可参与的设置:

    • 最大尝试数。
    • 重试前等待时间。
    • 自定义函数,用于修改故障后的等待间隔。
    • 自定义谓词,用于评估异常是否应重试。
  3. 舱壁隔离【BulkheadAutoConfiguration】
    引入resilience4j-bulkhead[3]依赖。可以限制对特定服务的并发调用数。

    用户可参与的设置:允许的最大并行数、线程等待的最大时间。

  4. 限流器【RateLimiterAutoConfiguration】
    引入resilience4j-ratelimiter[2]依赖。可以允许限制对某些服务的访问。用户可参与的设置:limit刷新周期、刷新周期的权限限制、默认等待权限持续时间。

  5. 熔断器【CircuitBreakerAutoConfiguration】

    熔断器有三种可能状态:

    • 关闭:服务正常,不需要进行短路。
    • 打开:远程服务宕机,所有请求都短路。
    • 半开:进入打开状态一段时间后,熔断器允许检查远程服务是否恢复。

    用户可参与的设置:

    • 熔断器进入打开状态的阈值。
    • 等待时间,即熔断器进入打开状态到半开状态需要等待的时间。
    • 熔断器半开或者闭合时,ring buffer的大小。
    • 处理自定义事件的监听器。
    • 自定义谓词,用于评估异常是否应算作故障,从而提高故障率。

FeignAutoConfiguration核心类中熔断器功能的引导类之Targeter子类FeignCircuitBreakerTargeter。

feign.circuitbreaker.enabled【false】:则通过DefaultTargeter路由至抽象类Feign完成Feign客户端的代理【ReflectiveFeign利用InvocationHandler之FeignInvocationHandler】。

DefaultTargeter情况下读超时、连接超时的配置属性完全由FeignEncoderProperties类控制。注意与低版本的区别

feign.circuitbreaker.enabled【true】:则通过FeignCircuitBreakerTargeter路由至FeignCircuitBreaker完成【Feign客户端】的代理【ReflectiveFeign利用InvocationHandler之FeignCircuitBreakerInvocationHandler、MethodHandler之SynchronousMethodHandler】。

SynchronousMethodHandler内部是通过客户端FeignBlockingLoadBalancerClient完成请求的执行。

1.resilience4j涉及的相关配置类

public class CommonProperties {

    Map<String, String> tags = new HashMap<>();
    public Map<String, String> getTags() {
        return tags;
    }
    public void setTags(Map<String, String> tags) {
        this.tags = tags;
    }
}

父类CommonProperties涉及的子类包含RateLimiterConfigurationProperties、CircuitBreakerConfigurationProperties、RetryConfigurationProperties、TimeLimiterConfigurationProperties等。

1.1.熔断器相关配置

通过CircuitBreakerConfiguration初始化类CircuitBreakerRegistry,该类存储了全部CircuitBreaker实例。并且CircuitBreaker实例所需的配置信息是通过CommonProperties的子类转化为CircuitBreakerConfig后得到的。

类CircuitBreakerConfig存在的配置项只是CommonProperties的子类中部分配置项内容。

public class CircuitBreakerConfigurationProperties extends CommonProperties {
	//这个是可以针对特定业务线配置,但是其name值如下图中ID属性值。目前本人尚未解决是否可以简化name值的套路
    private Map<String, InstanceProperties> instances = new HashMap<>();
    private Map<String, InstanceProperties> configs = new HashMap<>();

    public Optional<InstanceProperties> findCircuitBreakerProperties(String name) {
        InstanceProperties instanceProperties = instances.get(name);
        if (instanceProperties == null) {
            instanceProperties = configs.get("default");
        }
        return Optional.ofNullable(instanceProperties);
    }

    public CircuitBreakerConfig createCircuitBreakerConfig(String backendName,
        InstanceProperties instanceProperties,
        CompositeCustomizer<CircuitBreakerConfigCustomizer> compositeCircuitBreakerCustomizer) {
        if (StringUtils.isNotEmpty(instanceProperties.getBaseConfig())) {
            InstanceProperties baseProperties = configs.get(instanceProperties.getBaseConfig());
            if (baseProperties == null) {
                throw new ConfigurationNotFoundException(instanceProperties.getBaseConfig());
            }
            return buildConfigFromBaseConfig(instanceProperties, baseProperties,
                compositeCircuitBreakerCustomizer,
                backendName);
        }
        return buildConfig(custom(), instanceProperties, compositeCircuitBreakerCustomizer,
            backendName);
    }
}

如上配置初始化过程只是将公共配置项复制到最终的CircuitBreakerConfig中,在属性configs中还是配置文件中配置的原始内容。
在这里插入图片描述

resilience4j: 
  timelimiter: 
    configs: 
      default: 
        timeoutDuration: 1s
  circuitbreaker: 
    configs: 
      myfans: 
        baseConfig: common // 公共配置项
        registerHealthIndicator: true // 独有配置项
      default:
        baseConfig: common
        registerHealthIndicator: true // 独有配置项
      common: 
        minimumNumberOfCalls: 3
        slidingWindowSize: 10
        automaticTransitionFromOpenToHalfOpenEnabled: true

如上所示,即类CircuitBreakerConfigurationProperties中configs属性中对应三个名为myfans、default、common的配置项内容。初始化CircuitBreakerConfig中配置时并不会将common中内容添加到myfans、default中。

2.FeignCircuitBreakerInvocationHandler

class FeignCircuitBreakerInvocationHandler implements InvocationHandler {
	private final boolean c;
	@Override
	public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
		...
		String circuitName = circuitBreakerNameResolver.resolveCircuitBreakerName(feignClientName, target, method);
		// 如果存在链路追踪sleuth选择TraceCircuitBreaker,否则选择Resilience4JCircuitBreaker
		CircuitBreaker circuitBreaker = c ? factory.create(circuitName, feignClientName): factory.create(circuitName);
		Supplier<Object> supplier = asSupplier(method, args);
		if (this.nullableFallbackFactory != null) {
			Function<Throwable, Object> fallbackFunction = throwable -> {
				Object fallback = this.nullableFallbackFactory.create(throwable);
				// 执行降级逻辑
				return this.fallbackMethodMap.get(method).invoke(fallback, args);
			};
			//通过TraceCircuitBreaker的CircuitBreaker类型的属性之delegate取值Resilience4JCircuitBreaker完成下游服务的调用
			return circuitBreaker.run(supplier, fallbackFunction);
		}
		return circuitBreaker.run(supplier);
	}
		
	private Supplier<Object> asSupplier(final Method method, final Object[] args) {
		final RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
		final Thread caller = Thread.currentThread();
		return () -> {
			// 利用SynchronousMethodHandler完成目标方法的调用
			return dispatch.get(method).invoke(args);
		};
	}
}

2.1.Resilience4JCircuitBreaker

public class Resilience4JCircuitBreaker implements CircuitBreaker {
	private final ExecutorService executorService;
	public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
		io.vavr.collection.Map<String, String> tags = io.vavr.collection.HashMap.of("group",this.groupName);
		//ID:标识某个Feign客户端中某个方法。返回接口CircuitBreaker的子类为CircuitBreakerStateMachine
		CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(this.id,this.circuitBreakerConfig, tags);
		circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(defaultCircuitBreaker));
		TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter(id, timeLimiterConfig, tags);
		if (bulkheadProvider != null) {
			return bulkheadProvider.run(this.groupName, toRun, fallback, defaultCircuitBreaker, timeLimiter, tags);
		}else {
			if (executorService != null) {
				// 异步触发Ribbon调用下游服务
				Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);
				Callable restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
				Callable<T> callable = CircuitBreaker.decorateCallable(defaultCircuitBreaker, restrictedCall);
				
				return Try.of(callable::call)// 第一步:触发decorateCallable的Lambda表达式执行
							  .recover(fallback).get();// 第一步中存在异常则在recover内部回调fallback执行降级逻辑
			}else {Supplier<T> decorator = CircuitBreaker.decorateSupplier(defaultCircuitBreaker, toRun);
				return Try.of(decorator::get).recover(fallback).get();
			}
		}
	}
}

CircuitBreakerStateMachine:初始化维护当前ID对应的熔断器的状态。初始状态为ClosedState

public interface CircuitBreaker {

	static <T> Callable<T> decorateCallable(CircuitBreaker circuitBreaker, Callable<T> callable) {
	    return () -> {
	        circuitBreaker.acquirePermission();
	        final long start = circuitBreaker.getCurrentTimestamp();
	        try {
	            T result = callable.call();//第二步:触发 TimeLimiter#decorateFutureSupplier的Lambda表达式执行
	            long duration = circuitBreaker.getCurrentTimestamp() - start;//下游服务响应花费的时间
	            circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);
	            return result;// 成功返回响应结果
	        } catch (Exception exception) {
	            long duration = circuitBreaker.getCurrentTimestamp() - start;
	            circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);
	            throw exception;
	        }
	    };
	}
}

3.限时器之TimeLimiter

public class TimeLimiterImpl implements TimeLimiter {
	
	private final TimeLimiterConfig timeLimiterConfig;
	
	public TimeLimiterImpl(String name, TimeLimiterConfig config,io.vavr.collection.Map<String, String> tags) {
        this.name = name;
        this.tags = Objects.requireNonNull(tags, "Tags must not be null");
        this.timeLimiterConfig = config;
        this.eventProcessor = new TimeLimiterEventProcessor();
    }
	public <T, F extends Future<T>> Callable<T> decorateFutureSupplier(Supplier<F> futureSupplier) {
	    return () -> {
	        Future<T> future = futureSupplier.get();// 第三步:触发 Lambda表达式之Supplier执行,即触发Ribbon调用下游服务
	        try {
	            // resilience4j.timelimiter.configs.default.timeoutDuration:默认为1秒。
	        	// 从timeLimiterConfig获取限时时间,如果在限时时间内没有得到下游服务的响应,则降级处理
	            T result = future.get(getTimeLimiterConfig().getTimeoutDuration().toMillis(),TimeUnit.MILLISECONDS);
	            onSuccess();
	            return result;// 成功返回响应结果
	        } catch (TimeoutException e) {//限时时间内没有得到下游服务的响应,则降级处理
	            TimeoutException timeoutException = TimeLimiter.createdTimeoutExceptionWithName(name, e);
	            onError(timeoutException);
	            if (getTimeLimiterConfig().shouldCancelRunningFuture()) {
	                future.cancel(true);
	            }
	            throw timeoutException;
	        }
	    };
	}
}

resilience4j.timelimiter.configs.default.timeoutDuration:下游服务响应超时时间。
resilience4j.timelimiter.configs.default.cancelRunningFuture:默认为true。表示出现异常之后是否中断当前请求的线程。

4.熔断策略

在这里插入图片描述
通过上述得知,每个CircuitBreakerStateMachine维护某个ID对应的熔断状态。

每个【状态CircuitBreakerState】实例化时都伴随一个与其对应的类CircuitBreakerMetrics的实例化。如下所示,CircuitBreakerMetrics类的作用:

  • 维护了可以统计当前ID运行相关指标的接口,即基于计数的滑动窗口 FixedSizeSlidingWindowMetrics 和基于时间滑动窗口 SlidingTimeWindowMetrics
  • 可以通过CircuitBreakerConfig的属性之SlidingWindowType选择不同的指标统计方式。

4.1.触发熔断时机

public interface CircuitBreaker {
	// circuitBreaker:CircuitBreakerStateMachine
	static <T> Callable<T> decorateCallable(CircuitBreaker circuitBreaker, Callable<T> callable) {
	    return () -> {
	        circuitBreaker.acquirePermission();//时机1 
	        final long start = circuitBreaker.getCurrentTimestamp();
	        try {
	            T result = callable.call();
	            long duration = circuitBreaker.getCurrentTimestamp() - start;
	            circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);//时机2
	            return result;
	        } catch (Exception exception) {
	            long duration = circuitBreaker.getCurrentTimestamp() - start;
	            circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);//时机3
	            throw exception;
	        }
	    };
	}
}

时机1:根据当前的熔断状态选择继续访问下游服务还是直接降级处理。

  • 如果是OpenState,则当前时间超过waitIntervalFunctionInOpenState则继续访问下游服务,否则直接降级处理。
  • 如果是HalfOpenState,如果permittedNumberOfCallsInHalfOpenState【默认为10】即运行放过的请求达到0则直接降级处理,否则继续访问下游服务。
  • 如果是ForcedOpenState,则所有请求直接降级处理。
  • 如果是DisabledState,则所有请求任何时候都访问下游服务。

时机2 & 时机3:目的是改变熔断器的状态,从而影响时机1的执行。一旦时机1执行过程中发生异常则请求就会放弃调用下游服务,直接选择降级逻辑。

注意:时机2、时机3如果不符合执行条件或者没有影响熔断器初始状态,则时机1永远是执行熔断器状态为关闭情况下的逻辑,此时对请求没有任何影响。

时机1 & 时机2 & 时机3:三者不会影响请求在执行过程中是否执行降级逻辑。

4.2.CircuitBreakerStateMachine

CircuitBreakerStateMachine内部维护不同类型的熔断状态。

public final class CircuitBreakerStateMachine implements CircuitBreaker {
	
	private final AtomicReference<CircuitBreakerState> stateReference;

	private CircuitBreakerMetrics(int sws,SlidingWindowType swt,CircuitBreakerConfig config,Clock clock) {
        if (swt == config.SlidingWindowType.COUNT_BASED) {
            this.metrics = new FixedSizeSlidingWindowMetrics(sws);
            this.minimumNumberOfCalls = Math.min(circuitBreakerConfig.getMinimumNumberOfCalls(), sws);
        } else {
            this.metrics = new SlidingTimeWindowMetrics(sws, clock);
            this.minimumNumberOfCalls = config.getMinimumNumberOfCalls();
        }
        this.failureRateThreshold = config.getFailureRateThreshold();
        this.slowCallRateThreshold = config.getSlowCallRateThreshold();
        this.slowCallDurationThresholdInNanos = config.getSlowCallDurationThreshold().toNanos();
        this.numberOfNotPermittedCalls = new LongAdder();
    }
	
	public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
        if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
            Throwable cause = throwable.getCause();
            handleThrowable(duration, durationUnit, cause);
        } else {
            handleThrowable(duration, durationUnit, throwable);//error1
        }
    }
	
	public void onSuccess(long duration, TimeUnit durationUnit) {
        publishSuccessEvent(duration, durationUnit);
        stateReference.get().onSuccess(duration, durationUnit);
    }
	
	private void handleThrowable(long duration, TimeUnit durationUnit, Throwable throwable) {
		 // resilience4j.circuitbreaker.configs.default.ignoreExceptionPredicate:判断是否忽略异常
        if (circuitBreakerConfig.getIgnoreExceptionPredicate().test(throwable)) {
            releasePermission();
            publishCircuitIgnoredErrorEvent(name, duration, durationUnit, throwable);
         // resilience4j.circuitbreaker.configs.default.recordExceptionPredicate:判断是否记录异常【默认方式】
        } else if (circuitBreakerConfig.getRecordExceptionPredicate().test(throwable)) {
            publishCircuitErrorEvent(name, duration, durationUnit, throwable);
            //首次触发时,其熔断器的状态为ClosedState
            stateReference.get().onError(duration, durationUnit, throwable);//error2
        } else {
            publishSuccessEvent(duration, durationUnit);
            stateReference.get().onSuccess(duration, durationUnit);
        }
    }
	
	public void transitionToOpenState() {
        stateTransition(OPEN,currentState -> new OpenState(currentState.attempts() + 1, currentState.getMetrics()));
    }
	//改变属性stateReference持有实例为OpenState
	private void stateTransition(State newState,
        UnaryOperator<CircuitBreakerState> newStateGenerator) {
        CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
            StateTransition.transitionBetween(getName(), currentState.getState(), newState);
            currentState.preTransitionHook();
            return newStateGenerator.apply(currentState);
        });
        ...
    }
	
	public void transitionToHalfOpenState() {
        stateTransition(HALF_OPEN, currentState -> new HalfOpenState(currentState.attempts()));
    }
}

stateReference#onSuccess || stateReference#onError:其实是调用具体某个熔断器状态中对应的方法,其目的是尽量改变熔断器的状态。

4.3.CircuitBreakerMetrics

内部维护的Metrics可以触发指标统计的相关算法。

class CircuitBreakerMetrics implements CircuitBreaker.Metrics {

	rivate final Metrics metrics;
    private final float failureRateThreshold;
    private final float slowCallRateThreshold;
    private final long slowCallDurationThresholdInNanos;
    private final LongAdder numberOfNotPermittedCalls;
    private int minimumNumberOfCalls;
    
	// 计数滑动窗口之FixedSizeSlidingWindowMetrics || 基于时间滑动窗口之SlidingTimeWindowMetrics
	private final Metrics metrics;

	public Result onSuccess(long duration, TimeUnit durationUnit) {
	    Snapshot snapshot;
	    // resilience4j.circuitbreaker.configs.default.slowCallDurationThresholdInNanos
	    if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {
	        snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_SUCCESS);
	    } else {
	        //FixedSizeSlidingWindowMetrics || SlidingTimeWindowMetrics
	        snapshot = metrics.record(duration, durationUnit, Outcome.SUCCESS);
	    }
	    return checkIfThresholdsExceeded(snapshot);
	}
	
	public Result onError(long duration, TimeUnit durationUnit) {//error4
	    Snapshot snapshot;
	    // resilience4j.circuitbreaker.configs.default.slowCallDurationThreshold:判断是否为慢请求【6000000000】。
	    if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {
	        snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_ERROR);// 慢请求
	    } else {
	    	//FixedSizeSlidingWindowMetrics || SlidingTimeWindowMetrics
	        snapshot = metrics.record(duration, durationUnit, Outcome.ERROR);// 快请求
	    }
	    return checkIfThresholdsExceeded(snapshot);
	}
	
	
	private Result checkIfThresholdsExceeded(Snapshot snapshot) {
	    float failureRateInPercentage = getFailureRate(snapshot);
	    float slowCallsInPercentage = getSlowCallRate(snapshot);
		
	    if (failureRateInPercentage == -1 || slowCallsInPercentage == -1) {
	        return Result.BELOW_MINIMUM_CALLS_THRESHOLD;//表示失败次数没有达到阈值
	    }
	    //resilience4j.circuitbreaker.configs.default.failureRateThreshold:失败的比率,默认为50
	     //resilience4j.circuitbreaker.configs.default.slowCallRateThreshold:慢调用失败的比率,默认为100
	    if (failureRateInPercentage >= failureRateThreshold && slowCallsInPercentage >= slowCallRateThreshold) {
	        return Result.ABOVE_THRESHOLDS;
	    }
	    if (failureRateInPercentage >= failureRateThreshold) {
	        return Result.FAILURE_RATE_ABOVE_THRESHOLDS;// 条件只打到失败比率
	    }
	
	    if (slowCallsInPercentage >= slowCallRateThreshold) {
	        return Result.SLOW_CALL_RATE_ABOVE_THRESHOLDS;// 条件只打到慢调用比率
	    }
	    return Result.BELOW_THRESHOLDS;
	}
	
	private float getSlowCallRate(Snapshot snapshot) {
		//resilience4j.circuitbreaker.configs.default.totalNumberOfCalls
        int bufferedCalls = snapshot.getTotalNumberOfCalls();
        if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {
            return -1.0f;
        }
        return snapshot.getSlowCallRate();
    }
	
    private float getFailureRate(Snapshot snapshot) {
    	//resilience4j.circuitbreaker.configs.default.totalNumberOfCalls:当前ID调用的实时总次数
        int bufferedCalls = snapshot.getTotalNumberOfCalls();
        //resilience4j.circuitbreaker.configs.default.minimumNumberOfCalls:当前ID配置的最少调用次数
        if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {
            return -1.0f;
        }
        return snapshot.getFailureRate();
    }
	
	enum Result {
        BELOW_THRESHOLDS,
        FAILURE_RATE_ABOVE_THRESHOLDS,
        SLOW_CALL_RATE_ABOVE_THRESHOLDS,
        ABOVE_THRESHOLDS,
        BELOW_MINIMUM_CALLS_THRESHOLD;
		
        public static boolean hasExceededThresholds(Result result) {
            return hasFailureRateExceededThreshold(result) || hasSlowCallRateExceededThreshold(result);
        }

        public static boolean hasFailureRateExceededThreshold(Result result) {
            return result == ABOVE_THRESHOLDS || result == FAILURE_RATE_ABOVE_THRESHOLDS;
        }

        public static boolean hasSlowCallRateExceededThreshold(Result result) {
            return result == ABOVE_THRESHOLDS || result == SLOW_CALL_RATE_ABOVE_THRESHOLDS;
        }
    }
}

hasExceededThresholds:失败比率、慢调用比率两者存在超过阈值的情况或者两者都超过阈值。
minimumNumberOfCalls:调用次数小于该变量配置值的情况下是不会触发熔断,但是可以降级。此时熔断状态就是始终处于关闭状态。

public class SnapshotImpl implements Snapshot {

    private final long totalDurationInMillis;
    private final int totalNumberOfSlowCalls;
    private final int totalNumberOfSlowFailedCalls;
    private final int totalNumberOfFailedCalls;
    private final int totalNumberOfCalls;

    ...
    @Override
    public float getSlowCallRate() {
        if (totalNumberOfCalls == 0) {
            return 0;
        }
        return totalNumberOfSlowCalls * 100.0f / totalNumberOfCalls;
    }

    @Override
    public float getFailureRate() {
        if (totalNumberOfCalls == 0) {
            return 0;
        }
        //失败总次数占用调用总次数的比例
        return totalNumberOfFailedCalls * 100.0f / totalNumberOfCalls;
    }

    @Override
    public Duration getAverageDuration() {
        if (totalNumberOfCalls == 0) {
            return Duration.ZERO;
        }
        return Duration.ofMillis(totalDurationInMillis / totalNumberOfCalls);
    }
}

5.熔断器之OpenState

public final class CircuitBreakerStateMachine implements CircuitBreaker {

	OpenState(final int attempts, CircuitBreakerMetrics circuitBreakerMetrics) {
	    this.attempts = attempts;
	    // 经过waitIntervalFunctionInOpenState毫秒后尝试将熔断器状态修改为半开状态
	    // resilience4j.circuitbreaker.configs.default.waitIntervalFunctionInOpenState:默认6秒【60000】。
	    final long waitDurationInMillis = circuitBreakerConfig.getWaitIntervalFunctionInOpenState().apply(attempts);
	    this.retryAfterWaitDuration = clock.instant().plus(waitDurationInMillis, MILLIS);
	    this.circuitBreakerMetrics = circuitBreakerMetrics;
	    // 只有当前开关automaticTransitionFromOpenToHalfOpenEnabled为true,才会尝试将熔断器状态修改为半开状态
	    //resilience4j.circuitbreaker.configs.default.automaticTransitionFromOpenToHalfOpenEnabled:默认值false
	    if (circuitBreakerConfig.isAutomaticTransitionFromOpenToHalfOpenEnabled()) {
	        ScheduledExecutorService scheduledExecutorService = schedulerFactory.getScheduler();
	        transitionToHalfOpenFuture = scheduledExecutorService
	            .schedule(this::toHalfOpenState, waitDurationInMillis, TimeUnit.MILLISECONDS);
	    } else {
	        transitionToHalfOpenFuture = null;
	    }
	    isOpen = new AtomicBoolean(true);
	}
	
	private void toHalfOpenState() {
        if (isOpen.compareAndSet(true, false)) {
            transitionToHalfOpenState();//CircuitBreakerStateMachine#transitionToHalfOpenState
        }
    }
	
	 @Override
     public boolean tryAcquirePermission() {
     	// 当前请求的时间达到retryAfterWaitDuration秒后,说明熔断器状态需要修改为半开状态
         if (clock.instant().isAfter(retryAfterWaitDuration)) {
             toHalfOpenState();// 修改为半开状态
             return true;// 请求继续访问下游服务
         }
         circuitBreakerMetrics.onCallNotPermitted();
         return false;// 直接降级处理
     }
	
	@Override
    public void acquirePermission() {
        if (!tryAcquirePermission()) {
            throw CallNotPermittedException.createCallNotPermittedException(CircuitBreakerStateMachine.this);
        }
    }
}

即使automaticTransitionFromOpenToHalfOpenEnabled为false,但是waitIntervalFunctionInOpenState配置值一定存在。如上得知经过waitIntervalFunctionInOpenState时间后照样可以将熔断状态从打开状态变更为半开状态。

6.熔断器之HalfOpenState

public final class CircuitBreakerStateMachine implements CircuitBreaker {

    private final CircuitBreakerConfig circuitBreakerConfig;
    private final Clock c;

	private class HalfOpenState implements CircuitBreakerState {

        private final AtomicInteger permittedNumberOfCalls;
        private final AtomicBoolean isHalfOpen;
        private final int attempts;
        private final CircuitBreakerMetrics circuitBreakerMetrics;
        @Nullable
        private final ScheduledFuture<?>  transitionToOpenFuture;

        HalfOpenState(int attempts) {
        	// 半开状态时允许请求通过继续调用下游服务的个数。默认为10个请求
            // resilience4j.circuitbreaker.configs.default.permittedNumberOfCallsInHalfOpenState:默认10。
            int permittedNumber =circuitBreakerConfig.getPermittedNumberOfCallsInHalfOpenState();
            this.circuitBreakerMetrics = CircuitBreakerMetrics.forHalfOpen(permittedNumber,circuitBreakerConfig, c);
            this.permittedNumberOfCalls = new AtomicInteger(permittedNumber);
            this.isHalfOpen = new AtomicBoolean(true);
            this.attempts = attempts;
            // 经过 maxWaitDurationInHalfOpenState 秒之后尝试将 半开状态修改为打开状态
             // resilience4j.circuitbreaker.configs.default.maxWaitDurationInHalfOpenState:默认0。
			Duration maxWaitDurationInHalfOpenState = circuitBreakerConfig.getMaxWaitDurationInHalfOpenState();
            final long maxWaitDurationInHalfOpenState = maxWaitDurationInHalfOpenState.toMillis();
            if (maxWaitDurationInHalfOpenState >= 1) {
                ScheduledExecutorService scheduledExecutorService = schedulerFactory.getScheduler();
                transitionToOpenFuture = scheduledExecutorService
                    .schedule(this::toOpenState, maxWaitDurationInHalfOpenState, TimeUnit.MILLISECONDS);
            } else {
                transitionToOpenFuture = null;
            }
        }

        @Override
        public boolean tryAcquirePermission() {
            if (permittedNumberOfCalls.getAndUpdate(current -> current == 0 ? current : --current) > 0) {
                return true;// 请求继续访问下游服务
            }
            circuitBreakerMetrics.onCallNotPermitted();
            return false;// 直接降级处理
        }

        @Override
        public void acquirePermission() {
            if (!tryAcquirePermission()) {//只要条件满足则抛出异常,就会执行降级策略
                throw CallNotPermittedException.createCallNotPermittedException(CircuitBreakerStateMachine.this);
            }
        }

        private void toOpenState() {
            if (isHalfOpen.compareAndSet(true, false)) {
                transitionToOpenState();
            }
        }

        @Override
        public void releasePermission() {
            permittedNumberOfCalls.incrementAndGet();
        }

        @Override
        public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
            checkIfThresholdsExceeded(circuitBreakerMetrics.onError(duration, durationUnit));
        }

        @Override
        public void onSuccess(long duration, TimeUnit durationUnit) {
            checkIfThresholdsExceeded(circuitBreakerMetrics.onSuccess(duration, durationUnit));
        }
      
        private void checkIfThresholdsExceeded(Result result) {
            if (Result.hasExceededThresholds(result)) {
                if (isHalfOpen.compareAndSet(true, false)) {
                    transitionToOpenState();
                }
            }
            if (result == BELOW_THRESHOLDS) {// 慢调用 & 错误次数均没有达到上限
                if (isHalfOpen.compareAndSet(true, false)) {
                    transitionToClosedState();
                }
            }
        }
		...
    }
}

如果在半开状态下存在请求访问成功则将熔断状态修改为关闭或者打开状态。

7.熔断器之ClosedState

public final class CircuitBreakerStateMachine implements CircuitBreaker {

	private class ClosedState implements CircuitBreakerState {
		
		private final CircuitBreakerMetrics circuitBreakerMetrics;
		
		public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
	        checkIfThresholdsExceeded(circuitBreakerMetrics.onError(duration, durationUnit));//error3
	    }
	
		private void checkIfThresholdsExceeded(Result result) {
            if (Result.hasExceededThresholds(result)) {// Result特定取值才会触发熔断器状态的改变
                if (isClosed.compareAndSet(true, false)) {
                    publishCircuitThresholdsExceededEvent(result, circuitBreakerMetrics);
                    transitionToOpenState();//将熔断器关闭状态切换为open状态
                }
            }
        }
	}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1074956.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Stm32_标准库_9_TIM

频率(HZ)是频率的基本单位1HZ是1s的倒数 STM32F103C8T6一般情况给定时器的内部时钟都是72MHz&#xff08;系统主频率&#xff09; TIM基本构成 计数器、预分频器、自动化重装 // 都是16位其中计数器、自动化重装&#xff0c;都是16位换算成10进制范围为[0, 655536] 时间 1 /…

【Es基础入门必看】

Es基础入门 1. ElasticSearch的认知1.1 搜索1.2 倒排索引1.3 Lucene1.4 ES基本概念 1. ElasticSearch的认知 ElasticSearch&#xff08;以下简称ES&#xff09;是什么&#xff1f;按照官网的定义&#xff0c;ElasticSearch是一个分布式的、RESTFUL风格的搜索引擎。 1.1 搜索…

浅谈go语言的错误处理

前言 本文是探讨的是"go语言中如何优雅地处理错误" 观察go语言源码 在go语言的内置函数中,很多的函数都会返回一个error,特别是在与文件读写操作的相关的函数的时候,基本上都会有返回error,返回这个的好处是用来辨别是否出错,把结果直接告诉你,以便你进行下一步操…

64.最小路径和

法&#xff1a;动态规划 第一行的元素&#xff0c;只有通过左侧右移才能到达&#xff1b;第一列的元素&#xff0c;只有通过上方的下移才能到达。其他位置元素&#xff1a;比较从上方元素向下移动的路径和和左侧元素向右移动的路径和的较小值dp[i][j]&#xff1a;到达(i,j)位置…

【数据分类】基于麻雀搜索算法优化支持向量机的数据分类方法 SSA-SVM分类算法【Matlab代码#61】

文章目录 【可更换其他群智能算法&#xff0c;获取资源请见文章第6节&#xff1a;资源获取】1. 麻雀搜索算法&#xff08;SSA&#xff09;2. 支持向量机&#xff08;SVM&#xff09;3. SSA-SVM分类模型4. 部分代码展示5. 仿真结果展示6. 资源获取 【可更换其他群智能算法&#…

Jmeter 链接MySQL测试

1.环境部署 1.1官网下载MySQL Connector https://dev.mysql.com/downloads/connector/j/ 1.2 解压后&#xff0c;将jar放到jmeter/lib目录下 1.3 在测试计划中添加引用 2.脚本设置 2.1设置JDBC Connection Configuration 先添加一个setUp线程中&#xff0c;在setUp中添加“…

Git知识整理(持续更新)

一、跨系统配置之CSLF和LF Windows系统中&#xff0c;从第n行到第n1行&#xff0c;用的是回车\r加换行\n&#xff0c;即Carriage Return和Line Feed。 Mac和Linux系统中&#xff0c;从第n行到第n1行&#xff0c;只用了换行\n&#xff0c;即Line Feed。 git有CRLF机制&#xf…

基于springboot实现准妈妈孕期交流平台项目【项目源码+论文说明】

基于springboot实现准妈妈孕期交流平台演示 摘要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;准妈妈孕期交流平台当然也不能排除在外。准妈妈孕期交流平台是以实际运用为开…

【UE5 Cesium】16-Cesium for Unreal 建立飞行跟踪器(1)

目录 步骤 一、关卡准备 二、UE读取存有航线点迹的CSV文件 三、程序化CesiumCartographicPolygon的点 步骤 一、关卡准备 1. 在Cesium For Unreal官网找到“San Francisco international Airport”的经纬度如下&#xff1a; 在“CesiumGeoreference”中设置相应的经纬度 …

软件开发进度的有效管控方法和技巧

对于软件开发而言&#xff0c;进度把控十分重要。如果无法即时跟踪进度&#xff0c;就无法获知项目计划的进展&#xff0c;团队通常会在此过程中失去动力。 Zoho Projects 结合IT行业特点运用项目管理技术、理念和方法&#xff0c;包括9大知识领域&#xff08;项目综合、范围、…

各类高危漏洞介绍及验证方式教程(一)

本期整理的漏洞验证教程约包含50多类漏洞&#xff0c;分多个章节编写&#xff0c;可从以下链接获取全文&#xff1a; 各类高危漏洞验证方式.docx (访问密码: 1455) 搭建dvwa测试环境基础教程.docx(访问密码: 1455) web逻辑漏洞挖掘快速入门基础教程.docx(访问密码: 1455) 01 Ca…

【面试算法——动态规划 21】不同的子序列(hard) 通配符匹配(hard)

115. 不同的子序列 给你两个字符串 s 和 t &#xff0c;统计并返回在 s 的 子序列 中 t 出现的个数&#xff0c;结果需要对 109 7 取模。 链接&#xff1a;&#xff1a;https://leetcode.cn/problems/distinct-subsequences/ 示例 1&#xff1a; 输入&#xff1a;s “rab…

FreeRTOS自我救赎1之基本知识

每次写博客都是先用完了知识再写&#xff0c;也是很无奈&#xff0c;因为面试的时候我只会操作但我不会说&#xff0c;多写博客才能会“吹”。 1.目录结构 不好评价&#xff0c;我的目录结构是cubemx生成的&#xff0c;其中BSP_Device存储了一些模块化后的中间层代码 1.1核心…

【Java 进阶篇】使用Druid数据库连接池工具类进行测试

在前面的博客中&#xff0c;我们已经介绍了如何配置和使用Druid数据库连接池。现在&#xff0c;让我们来学习如何编写测试代码&#xff0c;以确保Druid连接池的正常运行。 步骤1&#xff1a;创建测试表 首先&#xff0c;我们需要创建一个测试用的数据库表&#xff0c;以便在示…

MongoDB——centOS7安装mongodb5.0.21版本服务端(图解版)

目录 一、mongodb官网下载地址二、安装步骤2.1、上传安装包并解压2.2、配置环境变量2.3、创建目录并授权2.4、创建配置文件2.5、启动MongoDB 三、开放端口四、客户端连接 一、mongodb官网下载地址 mongodb官网下载地址&#xff1a;https://www.mongodb.com/try/download/commu…

Android启动式服务

Github: https://github.com/MADMAX110/Joke 服务是与活动类似的应用组件&#xff0c;只不过服务没有用户界面。 使用服务可以一直在后台做某些事情&#xff0c;比如下载一个大文件&#xff0c;播放一段音乐或者监听来自服务器的一个消息。 有三种类型的服务&#xff1a; 1、启…

【UiPath】解决办法:There are no Unattended runtimes configured on this machine.

本文收录于【#摸鱼玩 UiPath】专栏中&#xff0c;记录在 RPA&#xff08;UiPath&#xff09;使用过程中&#xff0c;遇到的问题以及解决办法。 更多关于 RPA 技术内容敬请关注&#xff1a;【#摸鱼玩 UiPath】专栏。 文章目录 问题描述分析原因解决办法文末参考RPA 技术 UiPath…

【自动控制原理】Simulink搭建仿真模型(信号发生器、比较点、传递函数、示波器)

目录 一、前言 二、Simulink~自动控制原理 1、Simulink官方教程 2、开环系统、闭环系统 a. 开环系统 加干扰 b. 闭环系统 三、Simulink模型仿真 1、信号发生器 a. 阶跃信号发生器 b. 脉冲信号发生器 2、比较点 3、传递函数 a. 比例环节&#xff08;Proportional …

Xilisoft Video Converter Ultimate for Mac:让音视频转换变得更简单

无论是在工作还是娱乐中&#xff0c;我们都会遇到音视频格式不兼容的问题。这时候&#xff0c;一个好用的音视频格式转换工具就显得尤为重要。Xilisoft Video Converter Ultimate for Mac&#xff08;曦力音视频转换&#xff09;就是这样一款让您的音视频转换变得更简单的工具。…