大纲
1.DegradeSlot实现熔断降级的原理与源码
2.Sentinel数据指标统计的滑动窗口算法
1.DegradeSlot实现熔断降级的原理与源码
(1)熔断降级规则DegradeRule的配置Demo
(2)注册熔断降级监听器和加载熔断降级规则
(3)DegradeSlot根据熔断降级规则对请求进行验证
(1)熔断降级规则DegradeRule的配置Demo
首先熔断降级规则的应用场景有如下两种:
场景一:在微服务架构中,当一个服务出现问题时,可以通过配置熔断降级规则,防止故障扩散,保护整个系统的稳定性。
场景二:在调用第三方API时,可以配置熔断降级规则,避免因第三方API不稳定导致自身系统不稳定。
然后从下图可知,熔断降级规则包含以下属性:
属性一:熔断策略(grade)
这表示的是熔断降级规则的类型,取值范围分别是:
RuleConstant.DEGRADE_GRADE_RT(慢调用比例)
RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO(异常比例)
RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT(异常数)
其中,默认下的熔断降级规则是基于慢调用比例策略的,也就是默认值为:
RuleConstant.DEGRADE_GRADE_RT
属性二:熔断降级的阈值(count)
DegradeRule.count属性的具体含义取决于DegradeRule.grade属性的值。
如果grade为慢调用比例,则count表示慢调用比例阈值。
如果grade为异常比例,则count表示异常比例阈值。
如果grade为异常数,则count表示异常数阈值。
属性三:熔断时长(timeWindow)
这表示的是熔断降级发生后的降级持续时间,在这段时间内对应的资源将被降级。
属性四:最小请求数(minRequestAmount)
这表示的是熔断降级统计周期内的最小请求总数。仅当周期内的请求总数达到此值时,才会根据grade和count进行熔断降级。默认值为:
RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT
属性五:慢调用比例阈值(slowRatioThreshold)
该属性当grade为慢调用比例时生效,取值范围为0到1之间的小数,表示慢调用请求占总请求的比例。
属性六:统计时长(statIntervalMs)
这表示的是熔断降级统计周期(单位:毫秒),默认值为1000毫秒(1秒)。在这个周期内,Sentinel会对请求进行统计,以判断是否要进行熔断降级。
public class DegradeRule extends AbstractRule {
//熔断策略,表示的是熔断降级规则的类型
private int grade = RuleConstant.DEGRADE_GRADE_RT;
//熔断降级的阈值,具体含义取决于DegradeRule.grade属性的值
//如果grade为慢调用比例,则count表示慢调用比例阈值
//如果grade为异常比例,则count表示异常比例阈值
//如果grade为异常数,则count表示异常数阈值
private double count;
//熔断时长,即熔断降级发生后的降级持续时间,在这段时间内对应的资源将被降级
private int timeWindow;
//最小请求数,仅当周期内的请求总数达到此值时,才会根据grade和count进行熔断降级
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
//慢调用比例阈值,仅当grade为慢调用比例时生效
private double slowRatioThreshold = 1.0d;
//统计时长,熔断降级统计周期,在这个周期内,Sentinel会对请求进行统计,以判断是否要进行熔断降级
private int statIntervalMs = 1000;
...
}
接着如下便是DegradeRule的配置Demo:
//Run this demo, and the output will be like:
//1529399827825,total:0, pass:0, block:0
//1529399828825,total:4263, pass:100, block:4164
//1529399829825,total:19179, pass:4, block:19176 // circuit breaker opens
//1529399830824,total:19806, pass:0, block:19806
//1529399831825,total:19198, pass:0, block:19198
//1529399832824,total:19481, pass:0, block:19481
//1529399833826,total:19241, pass:0, block:19241
//1529399834826,total:17276, pass:0, block:17276
//1529399835826,total:18722, pass:0, block:18722
//1529399836826,total:19490, pass:0, block:19492
//1529399837828,total:19355, pass:0, block:19355
//1529399838827,total:11388, pass:0, block:11388
//1529399839829,total:14494, pass:104, block:14390 // After 10 seconds, the system restored
//1529399840854,total:18505, pass:0, block:18505
//1529399841854,total:19673, pass:0, block:19676
public class SlowRatioCircuitBreakerDemo {
private static final String KEY = "some_method";
private static volatile boolean stop = false;
private static int seconds = 120;
private static AtomicInteger total = new AtomicInteger();
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
public static void main(String[] args) throws Exception {
initDegradeRule();
registerStateChangeObserver();
startTick();
int concurrency = 8;
for (int i = 0; i < concurrency; i++) {
Thread entryThread = new Thread(() -> {
while (true) {
Entry entry = null;
try {
entry = SphU.entry(KEY);
pass.incrementAndGet();
//RT: [40ms, 60ms)
sleep(ThreadLocalRandom.current().nextInt(40, 60));
} catch (BlockException e) {
block.incrementAndGet();
sleep(ThreadLocalRandom.current().nextInt(5, 10));
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
});
entryThread.setName("sentinel-simulate-traffic-task-" + i);
entryThread.start();
}
}
private static void registerStateChangeObserver() {
EventObserverRegistry.getInstance().addStateChangeObserver("logging",
(prevState, newState, rule, snapshotValue) -> {
if (newState == State.OPEN) {
System.err.println(String.format("%s -> OPEN at %d, snapshotValue=%.2f", prevState.name(), TimeUtil.currentTimeMillis(), snapshotValue));
} else {
System.err.println(String.format("%s -> %s at %d", prevState.name(), newState.name(), TimeUtil.currentTimeMillis()));
}
}
);
}
private static void initDegradeRule() {
List<DegradeRule> rules = new ArrayList<>();
DegradeRule rule = new DegradeRule(KEY)
.setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType())//Max allowed response time
.setCount(50)
.setTimeWindow(10)//Retry timeout (in second)
.setSlowRatioThreshold(0.6)//Circuit breaker opens when slow request ratio > 60%
.setMinRequestAmount(100)
.setStatIntervalMs(20000);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
System.out.println("Degrade rule loaded: " + rules);
}
private static void sleep(int timeMs) {
try {
TimeUnit.MILLISECONDS.sleep(timeMs);
} catch (InterruptedException e) {
// ignore
}
}
private static void startTick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-tick-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("Begin to run! Go go go!");
System.out.println("See corresponding metrics.log for accurate statistic data");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
sleep(1000);
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total: " + total.get() + ", pass:" + pass.get() + ", block:" + block.get());
System.exit(0);
}
}
}
(2)注册熔断降级监听器和加载熔断降级规则
一.Sentinel监听器模式的核心代码回顾
二.注册熔断降级监听器和加载熔断降级规则
三.用于实现Sentinel熔断降级功能的熔断器接口
一.Sentinel监听器模式的核心代码回顾
Sentinel监听器模式会包含三大角色:
角色一:监听器PropertyListener
角色二:监听器管理器SentinelProperty
角色三:规则管理器RuleManager
首先,规则管理器RuleManager在初始化时,会调用监听器管理器SentinelProperty的addListener()方法将监听器PropertyListener注册到监听器管理器SentinelProperty上。
然后,使用方使用具体的规则时,可以通过调用规则管理器RuleManager的loadRules()方法加载规则。加载规则时会调用监听器管理器SentinelProperty的updateValue()方法通知每一个监听器PropertyListener,即通过监听器PropertyListener的configUpdate()方法把规则加载到规则管理器的本地中。
二.注册熔断降级监听器和加载熔断降级规则
DegradeRuleManager中有两个全局的HashMap:一个是用于存放资源和熔断器的对应关系的HashMap—circuitBreakers,另一个是用于存放资源和熔断规则的对应关系的HashMap—ruleMap。
其中熔断器是由熔断策略DegradeRule.grade来决定的。如果熔断策略是慢调用比例,则熔断器是ResponseTimeCircuitBreaker。如果熔断策略是异常比例和异常数,则熔断器是ExceptionCircuitBreaker。
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();
private T value = null;
...
//添加监听器到集合
@Override
public void addListener(PropertyListener<T> listener) {
listeners.add(listener);
//回调监听器的configLoad()方法初始化规则配置
listener.configLoad(value);
}
//更新值
@Override
public boolean updateValue(T newValue) {
//如果值没变化,直接返回
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);
//如果值发生了变化,则遍历监听器,回调监听器的configUpdate()方法更新对应的值
value = newValue;
for (PropertyListener<T> listener : listeners) {
listener.configUpdate(newValue);
}
return true;
}
...
}
public final class DegradeRuleManager {
//用于存放资源和熔断器的对应关系的HashMap,其中熔断器是由熔断策略DegradeRule.grade来决定的
private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();
//用于存放资源和熔断规则的对应关系的HashMap
private static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();
private static final RulePropertyListener LISTENER = new RulePropertyListener();
private static SentinelProperty<List<DegradeRule>> currentProperty = new DynamicSentinelProperty<>();
static {
currentProperty.addListener(LISTENER);
}
...
private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {
@Override
public void configUpdate(List<DegradeRule> conf) {
reloadFrom(conf);
RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: {}", ruleMap);
}
@Override
public void configLoad(List<DegradeRule> conf) {
reloadFrom(conf);
RecordLog.info("[DegradeRuleManager] Degrade rules loaded: {}", ruleMap);
}
private synchronized void reloadFrom(List<DegradeRule> list) {
//构建熔断器
Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());
for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {
assert e.getValue() != null && !e.getValue().isEmpty();
Set<DegradeRule> rules = new HashSet<>(e.getValue().size());
for (CircuitBreaker cb : e.getValue()) {
rules.add(cb.getRule());
}
rm.put(e.getKey(), rules);
}
DegradeRuleManager.circuitBreakers = cbs;
DegradeRuleManager.ruleMap = rm;
}
private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
Map<String, List<CircuitBreaker>> cbMap = new HashMap<>(8);
if (list == null || list.isEmpty()) {
return cbMap;
}
for (DegradeRule rule : list) {
if (!isValidRule(rule)) {
RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
CircuitBreaker cb = getExistingSameCbOrNew(rule);
if (cb == null) {
RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: {}", rule);
continue;
}
String resourceName = rule.getResource();
List<CircuitBreaker> cbList = cbMap.get(resourceName);
if (cbList == null) {
cbList = new ArrayList<>();
cbMap.put(resourceName, cbList);
}
cbList.add(cb);
}
return cbMap;
}
}
private static CircuitBreaker getExistingSameCbOrNew(DegradeRule rule) {
List<CircuitBreaker> cbs = getCircuitBreakers(rule.getResource());
if (cbs == null || cbs.isEmpty()) {
return newCircuitBreakerFrom(rule);
}
for (CircuitBreaker cb : cbs) {
if (rule.equals(cb.getRule())) {
//Reuse the circuit breaker if the rule remains unchanged.
return cb;
}
}
return newCircuitBreakerFrom(rule);
}
static List<CircuitBreaker> getCircuitBreakers(String resourceName) {
return circuitBreakers.get(resourceName);
}
//Create a circuit breaker instance from provided circuit breaking rule.
private static CircuitBreaker newCircuitBreakerFrom(DegradeRule rule) {
switch (rule.getGrade()) {
case RuleConstant.DEGRADE_GRADE_RT:
return new ResponseTimeCircuitBreaker(rule);
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
return new ExceptionCircuitBreaker(rule);
default:
return null;
}
}
}
三.用于实现Sentinel熔断降级功能的熔断器接口
进行熔断降级规则验证时,就会根据熔断降级规则的熔断策略,选择对应的熔断器CircuitBreaker。再通过熔断器CircuitBreaker的tryPass()接口,尝试通过当前请求。
//熔断器接口,用于实现Sentinel的熔断降级功能
public interface CircuitBreaker {
//Get the associated circuit breaking rule.
//获取当前熔断器对应的熔断降级规则
DegradeRule getRule();
//Acquires permission of an invocation only if it is available at the time of invoking.
//尝试通过熔断器
//如果熔断器处于关闭状态(CLOSED),则允许请求通过;
//如果处于打开状态(OPEN),则拒绝请求;
//如果处于半开状态(HALF_OPEN),则根据规则允许部分请求通过;
boolean tryPass(Context context);
//Get current state of the circuit breaker.
//获取当前熔断器的状态(OPEN, HALF_OPEN, CLOSED)
State currentState();
//Record a completed request with the context and handle state transformation of the circuit breaker.
//Called when a passed invocation finished.
//在请求完成后调用此方法,用于更新熔断器的统计数据
void onRequestComplete(Context context);
//Circuit breaker state.
enum State {
//In OPEN state, all requests will be rejected until the next recovery time point.
//表示熔断器处于打开状态,此时会拒绝所有请求
OPEN,
//In HALF_OPEN state, the circuit breaker will allow a "probe" invocation.
//If the invocation is abnormal according to the strategy (e.g. it's slow),
//the circuit breaker will re-transform to the OPEN state and wait for the next recovery time point;
//otherwise the resource will be regarded as "recovered" and the circuit breaker will cease cutting off requests and transform to CLOSED state.
//表示熔断器处于半开状态,此时允许部分请求通过,以检测系统是否已经恢复正常
HALF_OPEN,
//In CLOSED state, all requests are permitted.
//When current metric value exceeds the threshold, the circuit breaker will transform to OPEN state.
//表示熔断器处于关闭状态,此时允许所有请求通过
CLOSED
}
}
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {
...
...
}
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
...
...
}
(3)DegradeSlot根据熔断降级规则对请求进行验证
一.entry()方法会对请求进行熔断降级的规则验证
二.exit()方法会触发改变熔断器的状态
开始对请求进行规则验证时,需要调用SphU的entry()方法。完成对请求的规则验证后,也需要调用Entry的exit()方法。
一.entry()方法会对请求进行熔断降级的规则验证
在DegradeSlot的entry()方法中,执行熔断降级规则验证的是DegradeSlot的performChecking()方法。该方法首先会根据资源名称从DegradeRuleManager中获取熔断器,然后调用每个熔断器的tryPass()方法判断熔断器开关是否已打开来验证。如果验证通过,则返回true表示放行请求。如果验证不通过,则返回false表示拦截请求。
在判断熔断器开关是否已打开的AbstractCircuitBreaker的tryPass()方法中,首先会判断熔断器是否是关闭状态。如果是关闭状态,则代表没打开熔断器,于是会直接返回true放行请求。如果是打开状态,则要继续判断当前请求是否已达熔断器恢复时间。如果当前请求已达熔断器恢复时间,也就是当前时间大于下次尝试恢复的时间,且成功将熔断器状态从OPEN变为HALF_OPEN,则放行请求,否则拒绝。
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
//验证熔断规则的逻辑
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
//先根据资源名称获取对应的熔断器,也就是从DegradeRuleManager中的Map<String, List<CircuitBreaker>>类型的circuitBreakers获取
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
//调用每个熔断器的tryPass()方法验证当前请求是否可以通过
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
...
}
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
protected final DegradeRule rule;
private final EventObserverRegistry observerRegistry;
//熔断器当前的开关状态
protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
//下一次尝试恢复的时间
protected volatile long nextRetryTimestamp;
...
@Override
public boolean tryPass(Context context) {
//首先判断熔断器是否是关闭状态
//如果是关闭状态则代表根本没打开熔断器,也就不涉及熔断了,因此直接返回true放行当前请求
if (currentState.get() == State.CLOSED) {
return true;
}
//如果熔断器是打开状态,那么就要进行如下逻辑判断
if (currentState.get() == State.OPEN) {
//如果当前系统时间大于等于下一次尝试恢复的时间,即已到达可尝试恢复的时间且成功设置当前熔断器状态为半开启,则可以放行当前请求
//也就是如果此次请求已达到了熔断器恢复时间,并且将熔断器的状态从打开变为半开启,则放行,反之拒绝
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
protected boolean retryTimeoutArrived() {
//当前时间是否大于下一次尝试恢复的时间
return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
}
protected boolean fromOpenToHalfOpen(Context context) {
//将当前熔断器的状态从OPEN变为HALF_OPEN
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
//通过观察者模式通知各个观察者
notifyObservers(State.OPEN, State.HALF_OPEN, null);
...
return true;
}
return false;
}
private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prevState, newState, rule, snapshotValue);
}
}
...
}
二.exit()方法会触发改变熔断器的状态
问题一:熔断器的tryPass()方法一开始就会判断熔断器的状态,那么熔断器何时打开、何时关闭?
答:这个会在配置熔断降级规则DegradeRule时指定,如下图就指定了:如果最近10000ms内,10个请求中有2个是异常的,则触发熔断。
问题二:如果熔断器状态为打开,判断时会比较下一次尝试恢复时间和当前时间。如果当前时间大于下一次尝试恢复的时间,意味着请求已超过熔断时间,即当前请求不再处于熔断时间段内,因此可以放行。那么下一次尝试恢复的时间nextRetryTimestamp会在何时更新?
异常数据的采集和下次恢复时间的更新会由DegradeSlot的exit()方法触发。因为开始对请求进行规则验证时,会调用SphU的entry()方法。但完成对请求的规则验证后,则会调用Entry的exit()方法,而Entry的exit()方法最终就会执行到DegradeSlot的exit()方法。
在DegradeSlot的exit()方法中,就会调用熔断器的onRequestComplete()方法来进行计数,并改变熔断器的状态。
比如在执行ExceptionCircuitBreaker的onRequestComplete()方法中,会先统计异常数errorCount和总请求数totalCount,然后根据熔断降级的规则判断是否达到打开或关闭熔断器的阈值,最后执行比如AbstractCircuitBreaker的transformToOpen()方法打开熔断器。
AbstractCircuitBreaker.transformToOpen()方法的主要工作是:首先将当前熔断器状态变更为OPEN,然后更新下一次尝试恢复时间nextRetryTimestamp,最后通过观察者设计模式通知各个观察者。
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
...
@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}
//如果没报错,那么就调用熔断器的onRequestComplete()方法来计数
if (curEntry.getBlockError() == null) {
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(context);
}
}
fireExit(context, r, count, args);
}
...
}
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
...
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
//获取当前值
SimpleErrorCounter counter = stat.currentWindow().value();
//如果此次请求报错了,则将errorCount + 1
if (error != null) {
counter.getErrorCount().add(1);
}
//将totalCount总数 + 1,用于计算异常比例
counter.getTotalCount().add(1);
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
//如果当前熔断器已经打开了,则直接返回
if (currentState.get() == State.OPEN) {
return;
}
//如果当前熔断器是半开启状态
if (currentState.get() == State.HALF_OPEN) {
//如果本次请求没出现异常,则代表可以关闭熔断器了
if (error == null) {
//调用AbstractCircuitBreaker.fromHalfOpenToClose()关闭熔断器
fromHalfOpenToClose();
} else {
//如果本次请求还是异常,就继续熔断
//即调用AbstractCircuitBreaker.fromHalfOpenToOpen()方法打开熔断器
fromHalfOpenToOpen(1.0d);
}
return;
}
List<SimpleErrorCounter> counters = stat.values();
//异常数量
long errCount = 0;
//请求总数
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
//如果请求总数没超过最小请求数,那直接放行
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
//熔断策略为异常比例
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
//计算百分比
curCount = errCount * 1.0d / totalCount;
}
//当错误率或者错误数大于阈值,则打开熔断器
if (curCount > threshold) {
//调用AbstractCircuitBreaker.transformToOpen()方法打开熔断器
transformToOpen(curCount);
}
}
...
}
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
private final EventObserverRegistry observerRegistry;
//熔断器当前的开关状态
protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
//下一次尝试恢复的时间
protected volatile long nextRetryTimestamp;
...
protected void transformToOpen(double triggerValue) {
State cs = currentState.get();
switch (cs) {
case CLOSED:
fromCloseToOpen(triggerValue);
break;
case HALF_OPEN:
fromHalfOpenToOpen(triggerValue);
break;
default:
break;
}
}
protected boolean fromHalfOpenToClose() {
if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
resetStat();
notifyObservers(State.HALF_OPEN, State.CLOSED, null);
return true;
}
return false;
}
protected boolean fromHalfOpenToOpen(double snapshotValue) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
updateNextRetryTimestamp();
notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
return true;
}
return false;
}
private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prevState, newState, rule, snapshotValue);
}
}
...
}
(4)总结
一.Sentinel熔断降级的两种熔断策略
二.Sentinel熔断降级的流程
三.熔断器的三个状态
四.熔断器三个状态之间的流转过程
一.Sentinel熔断降级的两种熔断策略
策略一:异常熔断器ExceptionCircuitBreaker
异常熔断器关注错误数量、错误比例,其核心功能是在请求结束时更新计数器,统计异常数和总请求数。当达到阈值时,熔断器的状态从CLOSED变为OPEN。
策略二:慢调用比例熔断器ResponseTimeCircuitBreaker
慢调用比例熔断器关注响应时间RT。它计算请求结束时间与请求开始时间的差值,然后与用户设置的阈值比较。若达到阈值,熔断器状态将从CLOSED变为OPEN。
二.Sentinel熔断降级的流程
步骤一:计数
比如统计错误数、响应时间rt。
步骤二:对比
将计数结果和用户设置的熔断阈值做对比,如达到阈值则打开熔断器。
步骤三:验证
请求进来时就可以直接判断熔断器是否是打开状态。如果是打开状态,则直接拒绝请求。如果是关闭状态,则直接放行请求。如果是半打开状态,则进行二次验证,看看是否能放行请求。
三.熔断器的三个状态
状态一:CLOSED,关闭状态
当熔断器处于CLOSED状态时,表示系统正常运行,没有发生熔断。此时,熔断器会对请求进行正常计数和统计。如果统计结果表明异常数/比例或者慢调用比例超过了预设阈值,熔断器将切换至OPEN状态,触发熔断。
状态二:OPEN,打开状态
当熔断器处于OPEN状态时,系统进入熔断状态。在这个状态下,熔断器会拒绝所有新的请求,直接返回预定义的降级策略。在熔断器打开一段时间后(通常由用户设置),熔断器会尝试从OPEN状态切换到HALF_OPEN状态,看系统是否已恢复。
状态三:HALF_OPEN,半开启状态
当熔断器处于HALF_OPEN状态时,系统将允许有限数量的请求通过。如果这些请求成功,熔断器将认为系统已经恢复,然后切回CLOSED状态。如果这些请求异常,熔断器会认为系统未恢复,切回OPEN状态继续熔断。
四.熔断器三个状态之间的流转过程
过程一:从CLOSED到OPEN
当异常数/比例或慢调用比例超过阈值时。
过程二:从OPEN到HALF_OPEN
在熔断器打开一段时间后,尝试恢复系统。
过程三:从HALF_OPEN到CLOSED
当允许的有限数量请求成功时。
过程四:从HALF_OPEN到OPEN
当允许的有限数量请求仍然出现异常时,HALF_OPEN就好比一个中间态。
这种流转机制确保了在系统出现问题时,熔断器能够自动进行熔断保护,同时在系统恢复后能够及时恢复正常运行。