0. 像喝点东西,但不知道喝什么
先来段源码,看一下 我们在dashboard 录入的降级规则,都映射到哪些字段上
package com.alibaba.csp.sentinel.slots.block.degrade;
public class DegradeRule extends AbstractRule {
public DegradeRule(String resourceName) {
setResource(resourceName);
}
/**
* Circuit breaking strategy (0: average RT, 1: exception ratio, 2: exception count).
*/
private int grade = RuleConstant.DEGRADE_GRADE_RT;
/**
* Threshold count.
*/
private double count;
/**
* Recovery timeout (in seconds) when circuit breaker opens. After the timeout, the circuit breaker will
* transform to half-open state for trying a few requests.
*/
private int timeWindow;
/**
* Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
*
* @since 1.7.0
*/
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
/**
* The threshold of slow request ratio in RT mode.
*/
private double slowRatioThreshold = 1.0d;
private int statIntervalMs = 1000;
}
1. sentinel 的断路器实现
- 效果跟 netflix.hystrix 差不离
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
protected final DegradeRule rule;
protected final int recoveryTimeoutMs;
private final EventObserverRegistry observerRegistry;
protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
protected volatile long nextRetryTimestamp;
public AbstractCircuitBreaker(DegradeRule rule) {
this(rule, EventObserverRegistry.getInstance());
}
AbstractCircuitBreaker(DegradeRule rule, EventObserverRegistry observerRegistry) {
AssertUtil.notNull(observerRegistry, "observerRegistry cannot be null");
if (!DegradeRuleManager.isValidRule(rule)) {
throw new IllegalArgumentException("Invalid DegradeRule: " + rule);
}
this.observerRegistry = observerRegistry;
this.rule = rule;
this.recoveryTimeoutMs = rule.getTimeWindow() * 1000;
}
// true, 成功获得令牌
@Override
public boolean tryPass(Context context) {
// 断路器关闭
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
// 断路器开启
if (currentState.get() == State.OPEN) {
// 半开状态,允许通过1个请求来尝试,可行的话,即 true
// For half-open state we allow a request for probing.
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
// 当前系统时间 >= 下一次重试时间
protected boolean retryTimeoutArrived() {
return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
}
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
// 通知订阅者: 状态的变化 开 -> 半开
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
// 过程中断时的回调,回滚状态
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
// Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
// Without the hook, the circuit breaker won't recover from half-open state in some circumstances
// when the request is actually blocked by upcoming rules (not only degrade rules).
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}
private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prevState, newState, rule, snapshotValue);
}
}
}
从 DegradeRule.grade 可知道:
- 默认的降级策略,即 响应时长
- 除此之外,还支持 异常 的触发方式
下面分别借助两个 AbstractCircuitBreaker 的实现类来说明实现细节
1.1 ResponseTimeCircuitBreaker(RT)
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
// 根据请求的响应时间(慢调用比例)
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {
private static final double SLOW_REQUEST_RATIO_MAX_VALUE = 1.0d;
private final long maxAllowedRt;
private final double maxSlowRequestRatio;
private final int minRequestAmount;
private final LeapArray<SlowRequestCounter> slidingCounter;
public ResponseTimeCircuitBreaker(DegradeRule rule) {
this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));
}
ResponseTimeCircuitBreaker(DegradeRule rule, LeapArray<SlowRequestCounter> stat) {
super(rule);
AssertUtil.isTrue(rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT, "rule metric type should be RT");
AssertUtil.notNull(stat, "stat cannot be null");
this.maxAllowedRt = Math.round(rule.getCount());
this.maxSlowRequestRatio = rule.getSlowRatioThreshold();
this.minRequestAmount = rule.getMinRequestAmount();
this.slidingCounter = stat;
}
@Override
public void onRequestComplete(Context context) {
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - entry.getCreateTimestamp();
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
counter.totalCount.add(1);
handleStateChangeWhenThresholdExceeded(rt);
}
private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
} else {
fromHalfOpenToClose();
}
return;
}
List<SlowRequestCounter> counters = slidingCounter.values();
long slowCount = 0;
long totalCount = 0;
for (SlowRequestCounter counter : counters) {
slowCount += counter.slowCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double currentRatio = slowCount * 1.0d / totalCount;
if (currentRatio > maxSlowRequestRatio) {
transformToOpen(currentRatio);
}
if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&
Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
transformToOpen(currentRatio);
}
}
static class SlowRequestCounter {
private LongAdder slowCount;
private LongAdder totalCount;
}
}
1.2 ExceptionCircuitBreaker
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
// 策略:异常比例、异常数
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
private final int strategy; // 策略的枚举值
private final int minRequestAmount; // 最小请求数
private final double threshold; // 设置的阈值
private final LeapArray<SimpleErrorCounter> stat;
public ExceptionCircuitBreaker(DegradeRule rule) {
this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
}
ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {
super(rule);
this.strategy = rule.getGrade();
boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
AssertUtil.notNull(stat, "stat cannot be null");
this.minRequestAmount = rule.getMinRequestAmount();
this.threshold = rule.getCount();
this.stat = stat;
}
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
// 异常发生了,累加
if (error != null) {
counter.getErrorCount().add(1);
}
// 总数(异常+非异常),同样累加
counter.getTotalCount().add(1);
// step into ...
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
// 断路器早已启动? 好吧,后面不用看了
if (currentState.get() == State.OPEN) {
return;
}
// 半开状态,试探一下
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}
// 把这个异常计数传播到整个时间窗(LeapArray)的计数器中
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
// 虽然有异常,但是比配置的最小请求数还小,那不需要使用断路器
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
// 如果策略是:按照异常率的话,计算概率
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio
curCount = errCount * 1.0d / totalCount;
}
// 这里开启断路器
if (curCount > threshold) {
transformToOpen(curCount);
}
}
static class SimpleErrorCounter {
private LongAdder errorCount;
private LongAdder totalCount;
public SimpleErrorCounter() {
this.errorCount = new LongAdder();
this.totalCount = new LongAdder();
}
public LongAdder getErrorCount() {
return errorCount;
}
public LongAdder getTotalCount() {
return totalCount;
}
public SimpleErrorCounter reset() {
errorCount.reset();
totalCount.reset();
return this;
}
@Override
public String toString() {
return "SimpleErrorCounter{" +
"errorCount=" + errorCount +
", totalCount=" + totalCount +
'}';
}
}
}