Sentinel源码—6.熔断降级和数据统计的实现一

news2025/4/25 17:43:14

大纲

1.DegradeSlot实现熔断降级的原理与源码

2.Sentinel数据指标统计的滑动窗口算法

1.DegradeSlot实现熔断降级的原理与源码

(1)熔断降级规则DegradeRule的配置Demo

(2)注册熔断降级监听器和加载熔断降级规则

(3)DegradeSlot根据熔断降级规则对请求进行验证

(1)熔断降级规则DegradeRule的配置Demo

首先熔断降级规则的应用场景有如下两种:

场景一:在微服务架构中,当一个服务出现问题时,可以通过配置熔断降级规则,防止故障扩散,保护整个系统的稳定性。

场景二:在调用第三方API时,可以配置熔断降级规则,避免因第三方API不稳定导致自身系统不稳定。

然后从下图可知,熔断降级规则包含以下属性:

属性一:熔断策略(grade)

这表示的是熔断降级规则的类型,取值范围分别是:

RuleConstant.DEGRADE_GRADE_RT(慢调用比例)
RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO(异常比例)
RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT(异常数)

其中,默认下的熔断降级规则是基于慢调用比例策略的,也就是默认值为:

RuleConstant.DEGRADE_GRADE_RT

属性二:熔断降级的阈值(count)

DegradeRule.count属性的具体含义取决于DegradeRule.grade属性的值。

如果grade为慢调用比例,则count表示慢调用比例阈值。
如果grade为异常比例,则count表示异常比例阈值。
如果grade为异常数,则count表示异常数阈值。

属性三:熔断时长(timeWindow)

这表示的是熔断降级发生后的降级持续时间,在这段时间内对应的资源将被降级。

属性四:最小请求数(minRequestAmount)

这表示的是熔断降级统计周期内的最小请求总数。仅当周期内的请求总数达到此值时,才会根据grade和count进行熔断降级。默认值为:

RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT

属性五:慢调用比例阈值(slowRatioThreshold)

该属性当grade为慢调用比例时生效,取值范围为0到1之间的小数,表示慢调用请求占总请求的比例。

属性六:统计时长(statIntervalMs)

这表示的是熔断降级统计周期(单位:毫秒),默认值为1000毫秒(1秒)。在这个周期内,Sentinel会对请求进行统计,以判断是否要进行熔断降级。

public class DegradeRule extends AbstractRule {
    //熔断策略,表示的是熔断降级规则的类型
    private int grade = RuleConstant.DEGRADE_GRADE_RT;

    //熔断降级的阈值,具体含义取决于DegradeRule.grade属性的值
    //如果grade为慢调用比例,则count表示慢调用比例阈值
    //如果grade为异常比例,则count表示异常比例阈值
    //如果grade为异常数,则count表示异常数阈值
    private double count;

    //熔断时长,即熔断降级发生后的降级持续时间,在这段时间内对应的资源将被降级
    private int timeWindow;

    //最小请求数,仅当周期内的请求总数达到此值时,才会根据grade和count进行熔断降级
    private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;

    //慢调用比例阈值,仅当grade为慢调用比例时生效
    private double slowRatioThreshold = 1.0d;

    //统计时长,熔断降级统计周期,在这个周期内,Sentinel会对请求进行统计,以判断是否要进行熔断降级
    private int statIntervalMs = 1000;
    ...
}

接着如下便是DegradeRule的配置Demo:

//Run this demo, and the output will be like:
//1529399827825,total:0, pass:0, block:0
//1529399828825,total:4263, pass:100, block:4164
//1529399829825,total:19179, pass:4, block:19176 // circuit breaker opens
//1529399830824,total:19806, pass:0, block:19806
//1529399831825,total:19198, pass:0, block:19198
//1529399832824,total:19481, pass:0, block:19481
//1529399833826,total:19241, pass:0, block:19241
//1529399834826,total:17276, pass:0, block:17276
//1529399835826,total:18722, pass:0, block:18722
//1529399836826,total:19490, pass:0, block:19492
//1529399837828,total:19355, pass:0, block:19355
//1529399838827,total:11388, pass:0, block:11388
//1529399839829,total:14494, pass:104, block:14390 // After 10 seconds, the system restored
//1529399840854,total:18505, pass:0, block:18505
//1529399841854,total:19673, pass:0, block:19676
public class SlowRatioCircuitBreakerDemo {
    private static final String KEY = "some_method";
    private static volatile boolean stop = false;
    private static int seconds = 120;
    private static AtomicInteger total = new AtomicInteger();
    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        initDegradeRule();
        registerStateChangeObserver();
        startTick();

        int concurrency = 8;
        for (int i = 0; i < concurrency; i++) {
            Thread entryThread = new Thread(() -> {
                while (true) {
                    Entry entry = null;
                    try {
                        entry = SphU.entry(KEY);
                        pass.incrementAndGet();
                        //RT: [40ms, 60ms)
                        sleep(ThreadLocalRandom.current().nextInt(40, 60));
                    } catch (BlockException e) {
                        block.incrementAndGet();
                        sleep(ThreadLocalRandom.current().nextInt(5, 10));
                    } finally {
                        total.incrementAndGet();
                        if (entry != null) {
                            entry.exit();
                        }
                    }
                }
            });
            entryThread.setName("sentinel-simulate-traffic-task-" + i);
            entryThread.start();
        }
    }

    private static void registerStateChangeObserver() {
        EventObserverRegistry.getInstance().addStateChangeObserver("logging",
            (prevState, newState, rule, snapshotValue) -> {
                if (newState == State.OPEN) {
                    System.err.println(String.format("%s -> OPEN at %d, snapshotValue=%.2f", prevState.name(), TimeUtil.currentTimeMillis(), snapshotValue));
                } else {
                    System.err.println(String.format("%s -> %s at %d", prevState.name(), newState.name(), TimeUtil.currentTimeMillis()));
                }
            }
        );
    }

    private static void initDegradeRule() {
        List<DegradeRule> rules = new ArrayList<>();
        DegradeRule rule = new DegradeRule(KEY)
            .setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType())//Max allowed response time
            .setCount(50)
            .setTimeWindow(10)//Retry timeout (in second)
            .setSlowRatioThreshold(0.6)//Circuit breaker opens when slow request ratio > 60%
            .setMinRequestAmount(100)
            .setStatIntervalMs(20000);
        rules.add(rule);
        DegradeRuleManager.loadRules(rules);
        System.out.println("Degrade rule loaded: " + rules);
    }

    private static void sleep(int timeMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(timeMs);
        } catch (InterruptedException e) {
            // ignore
        }
    }

    private static void startTick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-tick-task");
        timer.start();
    }

    static class TimerTask implements Runnable {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("Begin to run! Go go go!");
            System.out.println("See corresponding metrics.log for accurate statistic data");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;

            while (!stop) {
                sleep(1000);

                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock);
                if (seconds-- <= 0) {
                    stop = true;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total: " + total.get() + ", pass:" + pass.get() + ", block:" + block.get());
            System.exit(0);
        }
    }
}

(2)注册熔断降级监听器和加载熔断降级规则

一.Sentinel监听器模式的核心代码回顾

二.注册熔断降级监听器和加载熔断降级规则

三.用于实现Sentinel熔断降级功能的熔断器接口

一.Sentinel监听器模式的核心代码回顾

Sentinel监听器模式会包含三大角色:

角色一:监听器PropertyListener

角色二:监听器管理器SentinelProperty

角色三:规则管理器RuleManager

首先,规则管理器RuleManager在初始化时,会调用监听器管理器SentinelProperty的addListener()方法将监听器PropertyListener注册到监听器管理器SentinelProperty上。

然后,使用方使用具体的规则时,可以通过调用规则管理器RuleManager的loadRules()方法加载规则。加载规则时会调用监听器管理器SentinelProperty的updateValue()方法通知每一个监听器PropertyListener,即通过监听器PropertyListener的configUpdate()方法把规则加载到规则管理器的本地中。

二.注册熔断降级监听器和加载熔断降级规则

DegradeRuleManager中有两个全局的HashMap:一个是用于存放资源和熔断器的对应关系的HashMap—circuitBreakers,另一个是用于存放资源和熔断规则的对应关系的HashMap—ruleMap。

其中熔断器是由熔断策略DegradeRule.grade来决定的。如果熔断策略是慢调用比例,则熔断器是ResponseTimeCircuitBreaker。如果熔断策略是异常比例和异常数,则熔断器是ExceptionCircuitBreaker。

public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
    protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();
    private T value = null;
    ...
    //添加监听器到集合
    @Override
    public void addListener(PropertyListener<T> listener) {
        listeners.add(listener);
        //回调监听器的configLoad()方法初始化规则配置
        listener.configLoad(value);
    }
    
    //更新值
    @Override
    public boolean updateValue(T newValue) {
        //如果值没变化,直接返回
        if (isEqual(value, newValue)) {
            return false;
        }
        RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);

        //如果值发生了变化,则遍历监听器,回调监听器的configUpdate()方法更新对应的值
        value = newValue;
        for (PropertyListener<T> listener : listeners) {
            listener.configUpdate(newValue);
        }
        return true;
    }
    ...
}

public final class DegradeRuleManager {
    //用于存放资源和熔断器的对应关系的HashMap,其中熔断器是由熔断策略DegradeRule.grade来决定的
    private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();
    //用于存放资源和熔断规则的对应关系的HashMap
    private static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();
    private static final RulePropertyListener LISTENER = new RulePropertyListener();
    private static SentinelProperty<List<DegradeRule>> currentProperty = new DynamicSentinelProperty<>();

    static {
        currentProperty.addListener(LISTENER);
    }
    ...
    
    private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {
        @Override
        public void configUpdate(List<DegradeRule> conf) {
            reloadFrom(conf);
            RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: {}", ruleMap);
        }
       
        @Override
        public void configLoad(List<DegradeRule> conf) {
            reloadFrom(conf);
            RecordLog.info("[DegradeRuleManager] Degrade rules loaded: {}", ruleMap);
        }
       
        private synchronized void reloadFrom(List<DegradeRule> list) {
            //构建熔断器
            Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
            Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());
            for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {
                assert e.getValue() != null && !e.getValue().isEmpty();
                Set<DegradeRule> rules = new HashSet<>(e.getValue().size());
                for (CircuitBreaker cb : e.getValue()) {
                    rules.add(cb.getRule());
                }
                rm.put(e.getKey(), rules);
            }
            DegradeRuleManager.circuitBreakers = cbs;
            DegradeRuleManager.ruleMap = rm;
        }

        private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
            Map<String, List<CircuitBreaker>> cbMap = new HashMap<>(8);
            if (list == null || list.isEmpty()) {
                return cbMap;
            }
            for (DegradeRule rule : list) {
                if (!isValidRule(rule)) {
                    RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
                    continue;
                }
                if (StringUtil.isBlank(rule.getLimitApp())) {
                    rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
                }
                CircuitBreaker cb = getExistingSameCbOrNew(rule);
                if (cb == null) {
                    RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: {}", rule);
                    continue;
                }
                String resourceName = rule.getResource();
                List<CircuitBreaker> cbList = cbMap.get(resourceName);
                if (cbList == null) {
                    cbList = new ArrayList<>();
                    cbMap.put(resourceName, cbList);
                }
                cbList.add(cb);
            }
            return cbMap;
        }
    }

    private static CircuitBreaker getExistingSameCbOrNew(DegradeRule rule) {
        List<CircuitBreaker> cbs = getCircuitBreakers(rule.getResource());
        if (cbs == null || cbs.isEmpty()) {
            return newCircuitBreakerFrom(rule);
        }
        for (CircuitBreaker cb : cbs) {
            if (rule.equals(cb.getRule())) {
                //Reuse the circuit breaker if the rule remains unchanged.
                return cb;
            }
        }
        return newCircuitBreakerFrom(rule);
    }
    
    static List<CircuitBreaker> getCircuitBreakers(String resourceName) {
        return circuitBreakers.get(resourceName);
    }
    
    //Create a circuit breaker instance from provided circuit breaking rule.
    private static CircuitBreaker newCircuitBreakerFrom(DegradeRule rule) {
        switch (rule.getGrade()) {
            case RuleConstant.DEGRADE_GRADE_RT:
                return new ResponseTimeCircuitBreaker(rule);
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
                return new ExceptionCircuitBreaker(rule);
            default:
                return null;
        }
    }
}

三.用于实现Sentinel熔断降级功能的熔断器接口

进行熔断降级规则验证时,就会根据熔断降级规则的熔断策略,选择对应的熔断器CircuitBreaker。再通过熔断器CircuitBreaker的tryPass()接口,尝试通过当前请求。

//熔断器接口,用于实现Sentinel的熔断降级功能
public interface CircuitBreaker {
    //Get the associated circuit breaking rule.
    //获取当前熔断器对应的熔断降级规则
    DegradeRule getRule();

    //Acquires permission of an invocation only if it is available at the time of invoking.
    //尝试通过熔断器
    //如果熔断器处于关闭状态(CLOSED),则允许请求通过;
    //如果处于打开状态(OPEN),则拒绝请求;
    //如果处于半开状态(HALF_OPEN),则根据规则允许部分请求通过;
    boolean tryPass(Context context);

    //Get current state of the circuit breaker.
    //获取当前熔断器的状态(OPEN, HALF_OPEN, CLOSED)
    State currentState();

    //Record a completed request with the context and handle state transformation of the circuit breaker.
    //Called when a passed invocation finished.
    //在请求完成后调用此方法,用于更新熔断器的统计数据
    void onRequestComplete(Context context);

    //Circuit breaker state.
    enum State {
        //In OPEN state, all requests will be rejected until the next recovery time point.
        //表示熔断器处于打开状态,此时会拒绝所有请求
        OPEN,
       
        //In HALF_OPEN state, the circuit breaker will allow a "probe" invocation.
        //If the invocation is abnormal according to the strategy (e.g. it's slow), 
        //the circuit breaker will re-transform to the OPEN state and wait for the next recovery time point;
        //otherwise the resource will be regarded as "recovered" and the circuit breaker will cease cutting off requests and transform to CLOSED state. 
        //表示熔断器处于半开状态,此时允许部分请求通过,以检测系统是否已经恢复正常
        HALF_OPEN,
        
        //In CLOSED state, all requests are permitted. 
        //When current metric value exceeds the threshold, the circuit breaker will transform to OPEN state.
        //表示熔断器处于关闭状态,此时允许所有请求通过
        CLOSED
    }
}

public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {
    ...
    ...
}

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
    ...
    ...
}

(3)DegradeSlot根据熔断降级规则对请求进行验证

一.entry()方法会对请求进行熔断降级的规则验证

二.exit()方法会触发改变熔断器的状态

开始对请求进行规则验证时,需要调用SphU的entry()方法。完成对请求的规则验证后,也需要调用Entry的exit()方法。

一.entry()方法会对请求进行熔断降级的规则验证

在DegradeSlot的entry()方法中,执行熔断降级规则验证的是DegradeSlot的performChecking()方法。该方法首先会根据资源名称从DegradeRuleManager中获取熔断器,然后调用每个熔断器的tryPass()方法判断熔断器开关是否已打开来验证。如果验证通过,则返回true表示放行请求。如果验证不通过,则返回false表示拦截请求。

在判断熔断器开关是否已打开的AbstractCircuitBreaker的tryPass()方法中,首先会判断熔断器是否是关闭状态。如果是关闭状态,则代表没打开熔断器,于是会直接返回true放行请求。如果是打开状态,则要继续判断当前请求是否已达熔断器恢复时间。如果当前请求已达熔断器恢复时间,也就是当前时间大于下次尝试恢复的时间,且成功将熔断器状态从OPEN变为HALF_OPEN,则放行请求,否则拒绝。

@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        //验证熔断规则的逻辑
        performChecking(context, resourceWrapper);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        //先根据资源名称获取对应的熔断器,也就是从DegradeRuleManager中的Map<String, List<CircuitBreaker>>类型的circuitBreakers获取
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        //调用每个熔断器的tryPass()方法验证当前请求是否可以通过
        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }
    ...
}

public abstract class AbstractCircuitBreaker implements CircuitBreaker {
    protected final DegradeRule rule;
    private final EventObserverRegistry observerRegistry;
    //熔断器当前的开关状态
    protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
    //下一次尝试恢复的时间
    protected volatile long nextRetryTimestamp;
    ...
    
    @Override
    public boolean tryPass(Context context) {
        //首先判断熔断器是否是关闭状态
        //如果是关闭状态则代表根本没打开熔断器,也就不涉及熔断了,因此直接返回true放行当前请求
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        //如果熔断器是打开状态,那么就要进行如下逻辑判断
        if (currentState.get() == State.OPEN) {
            //如果当前系统时间大于等于下一次尝试恢复的时间,即已到达可尝试恢复的时间且成功设置当前熔断器状态为半开启,则可以放行当前请求
            //也就是如果此次请求已达到了熔断器恢复时间,并且将熔断器的状态从打开变为半开启,则放行,反之拒绝
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }
    
    protected boolean retryTimeoutArrived() {
        //当前时间是否大于下一次尝试恢复的时间
        return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
    }
    
    protected boolean fromOpenToHalfOpen(Context context) {
        //将当前熔断器的状态从OPEN变为HALF_OPEN
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
            //通过观察者模式通知各个观察者
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            ...
            return true;
        }
        return false;
    }
    
    private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
        for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
            observer.onStateChange(prevState, newState, rule, snapshotValue);
        }
    }
    ...
}

二.exit()方法会触发改变熔断器的状态

问题一:熔断器的tryPass()方法一开始就会判断熔断器的状态,那么熔断器何时打开、何时关闭?

答:这个会在配置熔断降级规则DegradeRule时指定,如下图就指定了:如果最近10000ms内,10个请求中有2个是异常的,则触发熔断。

问题二:如果熔断器状态为打开,判断时会比较下一次尝试恢复时间和当前时间。如果当前时间大于下一次尝试恢复的时间,意味着请求已超过熔断时间,即当前请求不再处于熔断时间段内,因此可以放行。那么下一次尝试恢复的时间nextRetryTimestamp会在何时更新?

异常数据的采集和下次恢复时间的更新会由DegradeSlot的exit()方法触发。因为开始对请求进行规则验证时,会调用SphU的entry()方法。但完成对请求的规则验证后,则会调用Entry的exit()方法,而Entry的exit()方法最终就会执行到DegradeSlot的exit()方法。

在DegradeSlot的exit()方法中,就会调用熔断器的onRequestComplete()方法来进行计数,并改变熔断器的状态。

比如在执行ExceptionCircuitBreaker的onRequestComplete()方法中,会先统计异常数errorCount和总请求数totalCount,然后根据熔断降级的规则判断是否达到打开或关闭熔断器的阈值,最后执行比如AbstractCircuitBreaker的transformToOpen()方法打开熔断器。

AbstractCircuitBreaker.transformToOpen()方法的主要工作是:首先将当前熔断器状态变更为OPEN,然后更新下一次尝试恢复时间nextRetryTimestamp,最后通过观察者设计模式通知各个观察者。

@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    ...
    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }

        //如果没报错,那么就调用熔断器的onRequestComplete()方法来计数
        if (curEntry.getBlockError() == null) {
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                circuitBreaker.onRequestComplete(context);
            }
        }

        fireExit(context, r, count, args);
    }
    ...
}

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
    ...
    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        //获取当前值
        SimpleErrorCounter counter = stat.currentWindow().value();
        //如果此次请求报错了,则将errorCount + 1
        if (error != null) {
            counter.getErrorCount().add(1);
        }
        //将totalCount总数 + 1,用于计算异常比例
        counter.getTotalCount().add(1);

        handleStateChangeWhenThresholdExceeded(error);
    }

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        //如果当前熔断器已经打开了,则直接返回
        if (currentState.get() == State.OPEN) {
            return;
        }
        //如果当前熔断器是半开启状态
        if (currentState.get() == State.HALF_OPEN) {
            //如果本次请求没出现异常,则代表可以关闭熔断器了
            if (error == null) {
                //调用AbstractCircuitBreaker.fromHalfOpenToClose()关闭熔断器
                fromHalfOpenToClose();
            } else {
                //如果本次请求还是异常,就继续熔断
                //即调用AbstractCircuitBreaker.fromHalfOpenToOpen()方法打开熔断器
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
    
        List<SimpleErrorCounter> counters = stat.values();
        //异常数量
        long errCount = 0;
        //请求总数
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        //如果请求总数没超过最小请求数,那直接放行
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        //熔断策略为异常比例
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            //计算百分比
            curCount = errCount * 1.0d / totalCount;
        }
        //当错误率或者错误数大于阈值,则打开熔断器
        if (curCount > threshold) {
            //调用AbstractCircuitBreaker.transformToOpen()方法打开熔断器
            transformToOpen(curCount);
        }
    }
    ...
}

public abstract class AbstractCircuitBreaker implements CircuitBreaker {
    private final EventObserverRegistry observerRegistry;
    //熔断器当前的开关状态
    protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
    //下一次尝试恢复的时间
    protected volatile long nextRetryTimestamp;
    ...
    
    protected void transformToOpen(double triggerValue) {
        State cs = currentState.get();
        switch (cs) {
            case CLOSED:
                fromCloseToOpen(triggerValue);
                break;
            case HALF_OPEN:
                fromHalfOpenToOpen(triggerValue);
                break;
            default:
                break;
        }
    }
    
    protected boolean fromHalfOpenToClose() {
        if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
            resetStat();
            notifyObservers(State.HALF_OPEN, State.CLOSED, null);
            return true;
        }
        return false;
    }
    
    protected boolean fromHalfOpenToOpen(double snapshotValue) {
        if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
            updateNextRetryTimestamp();
            notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
            return true;
        }
        return false;
    }
    
    private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
        for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
            observer.onStateChange(prevState, newState, rule, snapshotValue);
        }
    }
    ...
}

(4)总结

一.Sentinel熔断降级的两种熔断策略

二.Sentinel熔断降级的流程

三.熔断器的三个状态

四.熔断器三个状态之间的流转过程

一.Sentinel熔断降级的两种熔断策略

策略一:异常熔断器ExceptionCircuitBreaker

异常熔断器关注错误数量、错误比例,其核心功能是在请求结束时更新计数器,统计异常数和总请求数。当达到阈值时,熔断器的状态从CLOSED变为OPEN。

策略二:慢调用比例熔断器ResponseTimeCircuitBreaker

慢调用比例熔断器关注响应时间RT。它计算请求结束时间与请求开始时间的差值,然后与用户设置的阈值比较。若达到阈值,熔断器状态将从CLOSED变为OPEN。

二.Sentinel熔断降级的流程

步骤一:计数

比如统计错误数、响应时间rt。

步骤二:对比

将计数结果和用户设置的熔断阈值做对比,如达到阈值则打开熔断器。

步骤三:验证

请求进来时就可以直接判断熔断器是否是打开状态。如果是打开状态,则直接拒绝请求。如果是关闭状态,则直接放行请求。如果是半打开状态,则进行二次验证,看看是否能放行请求。

三.熔断器的三个状态

状态一:CLOSED,关闭状态

当熔断器处于CLOSED状态时,表示系统正常运行,没有发生熔断。此时,熔断器会对请求进行正常计数和统计。如果统计结果表明异常数/比例或者慢调用比例超过了预设阈值,熔断器将切换至OPEN状态,触发熔断。

状态二:OPEN,打开状态

当熔断器处于OPEN状态时,系统进入熔断状态。在这个状态下,熔断器会拒绝所有新的请求,直接返回预定义的降级策略。在熔断器打开一段时间后(通常由用户设置),熔断器会尝试从OPEN状态切换到HALF_OPEN状态,看系统是否已恢复。

状态三:HALF_OPEN,半开启状态

当熔断器处于HALF_OPEN状态时,系统将允许有限数量的请求通过。如果这些请求成功,熔断器将认为系统已经恢复,然后切回CLOSED状态。如果这些请求异常,熔断器会认为系统未恢复,切回OPEN状态继续熔断。

四.熔断器三个状态之间的流转过程

过程一:从CLOSED到OPEN

当异常数/比例或慢调用比例超过阈值时。

过程二:从OPEN到HALF_OPEN

在熔断器打开一段时间后,尝试恢复系统。

过程三:从HALF_OPEN到CLOSED

当允许的有限数量请求成功时。

过程四:从HALF_OPEN到OPEN

当允许的有限数量请求仍然出现异常时,HALF_OPEN就好比一个中间态。

这种流转机制确保了在系统出现问题时,熔断器能够自动进行熔断保护,同时在系统恢复后能够及时恢复正常运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2342597.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Volcano 实战快速入门 (一)

一、技术背景 随着大型语言模型&#xff08;LLM&#xff09;的蓬勃发展&#xff0c;其在 Kubernetes (K8s) 环境下的训练和推理对资源调度与管理提出了前所未有的挑战。这些挑战主要源于 LLM 对计算资源&#xff08;尤其是 GPU&#xff09;的巨大需求、分布式任务固有的复杂依…

用交换机连接两台电脑,电脑A读取/写电脑B的数据

1、第一步&#xff0c;打开控制面板中的网络和共享中心&#xff0c;如下图配置&#xff0c;电脑A和电脑B均要配置&#xff1b; 注意&#xff1a;要保证电脑A和电脑B在同一子网掩码下&#xff0c;不同的IP地址&#xff1b; 2、在电脑上同时按‘CommandR’&#xff0c;在弹出的输…

问道数码兽 怀旧剧情回合手游源码搭建教程(反查重优化版)

本文将对"问道数码兽"这一经典卡通风格回合制手游的服务端部署与客户端调整流程进行详细拆解&#xff0c;适用于具备基础 Windows 运维和手游源码调试经验的开发者参考使用。教程以实战为导向&#xff0c;基于原始说明内容重构优化&#xff0c;具备较高的内容查重避重…

WLAN共享给以太网后以太网IP为169.254.xx.xx以及uboot无法使用nfs下载命令的的解决方案

WLAN共享网络给以太网&#xff0c;实际上是把以太网口当作一个路由器&#xff0c;这个路由器的IP是由WLAN给他分配的&#xff0c;169.254.xx.xx是windows设定的ip&#xff0c;当网络接口无法从上一级网络接口获得ip时&#xff0c;该网络接口的ip被设置为169.254 &#xff0c;所…

ROS 快速入门教程03

8.编写Subscriber订阅者节点 8.1 创建订阅者节点 cd catkin_ws/src/ catkin_create_pkg atr_pkg rospy roscpp std_msgs ros::Subscriber sub nh.subscribe(话题名, 缓存队列长度, 回调函数) 回调函数通常在你创建订阅者时定义。一个订阅者会监听一个话题&#xff0c;并在有…

在 macOS 上合并 IntelliJ IDEA 的项目窗口

在使用 IntelliJ IDEA 开发时&#xff0c;可能会打开多个项目窗口&#xff0c;这可能会导致界面变得混乱。为了提高工作效率&#xff0c;可以通过合并项目窗口来简化界面。本文将介绍如何在 macOS 上合并 IntelliJ IDEA 的项目窗口。 操作步骤 打开 IntelliJ IDEA: 启动你的 I…

基于多用户商城系统的行业资源整合模式与商业价值探究

随着电子商务的蓬勃发展&#xff0c;传统的单一商家电商模式逐渐显现出一定的局限性。为了解决商家成本过高、市场竞争激烈等问题&#xff0c;多用户商城系统应运而生&#xff0c;成为一种新型的电商平台模式。通过整合行业资源&#xff0c;这种模式不仅极大地提升了平台和商家…

Three.js + React 实战系列 : 从零搭建 3D 个人主页

可能你对tailiwindcss毫不了解&#xff0c;别紧张&#xff0c;记住我们只是在学习&#xff0c;学习的是作者的思想和技巧&#xff0c;并不是某一行代码。 在之前的几篇文章中&#xff0c;我们已经熟悉了 Three.js 的基本用法&#xff0c;并通过 react-three-fiber 快速构建了一…

如何用大模型技术重塑物流供应链

摘要 在数字化转型加速的背景下&#xff0c;大模型技术凭借其强大的数据分析、逻辑推理和决策优化能力&#xff0c;正成为物流供应链领域的核心驱动力。本文深入探讨大模型如何通过需求预测、智能调度、供应链协同、风险管控等关键环节&#xff0c;推动物流行业从 "经验驱…

【银河麒麟高级服务器操作系统】磁盘只读问题分析

系统环境及配置 系统环境 物理机/虚拟机/云/容器 虚拟机 网络环境 外网/私有网络/无网络 私有网络 硬件环境 机型 KVM Virtual Machine 处理器 Kunpeng-920 内存 32 GiB 整机类型/架构 arm64 固件版本 EFI Development Kit II / OVMF 软件环境 具体操作系统版…

机器视觉的智能手机屏贴合应用

在智能手机制造领域&#xff0c;屏幕贴合工艺堪称"微米级的指尖芭蕾"。作为影响触控灵敏度、显示效果和产品可靠性的关键工序&#xff0c;屏幕贴合精度直接决定了用户体验。传统人工对位方式已无法满足全面屏时代对极窄边框和超高屏占比的严苛要求&#xff0c;而Mast…

AIM Robotics电动胶枪:智能分配,让机器人点胶涂胶精准无误

在现代工业自动化和智能制造领域&#xff0c;精确的液体分配技术正成为提升生产效率和产品质量的重要因素。AIM Robotics作为这一领域的创新者&#xff0c;提供了多种高效、灵活的点胶涂胶分配解决方案。本文将带您了解AIM Robotics的核心技术、产品系列以及在各行业的成功应用…

负环-P3385-P2136

通过选择标签&#xff0c;洛谷刷一个类型的题目还是很方便的 模版题P3385 P3385 【模板】负环 - 洛谷 Tint(input())def bellman(n,edges,sta):INFfloat(inf)d[INF]*(n1)d[sta]0for i in range(n-1):for u,v,w in edges:ncostd[u]wif ncost<d[v]:d[v]ncostfor u,v,w in e…

抖音的逆向工程获取弹幕(websocket和protobuf解析)

目录 声明前言第一节 获取room_id和ttwid值第二节 signture值逆向python 实现signature第三节 Websocket实现长链接请求protubuf反序列化pushFrame反序列化Response解压和反序列化消息体Message解析应答ack参考博客声明 本文章中所有内容仅供学习交流使用,不用于其他任何目的…

WPF 图片文本按钮 自定义按钮

效果 上面图片,下面文本 样式 <!-- 图片文本按钮样式 --> <Style x:Key="ImageTextButtonStyle" TargetType="Button"><Setter Property="Background" Value="Transparent"/><Setter Property="BorderTh…

Diffusion inversion后的latent code与标准的高斯随机噪音不一样

可视化latents_list如下; 可视化最后一步与标准的噪声&#xff1a; 能隐约看出到最后一步还是会有“马”的形状 整个代码&#xff08;及可视化代码如下&#xff09;&#xff1a; ## 参考freeprompt(FPE)的代码 import os import torch import torch.nn as nn import torch.n…

江湖密码术:Rust中的 bcrypt 加密秘籍

前言 江湖险恶,黑客如雨,昔日密码“123456”早被各路大侠怒斥为“纸糊轻功”。若还执迷不悟,用明文密码闯荡江湖,无异于身披藏宝图在集市上狂奔,目标大到闪瞎黑客双眼。 为护你安然度过每一场数据风波,特献上一门绝学《Rust加密神功》。核心招式正是传说中的 bcrypt 密…

Milvus(3):数据库、Collections说明

1 数据库 Milvus 在集合之上引入了数据库层&#xff0c;为管理和组织数据提供了更有效的方式&#xff0c;同时支持多租户。 1.1 什么是数据库 在 Milvus 中&#xff0c;数据库是组织和管理数据的逻辑单元。为了提高数据安全性并实现多租户&#xff0c;你可以创建多个数据库&am…

【Hive入门】Hive数据模型与存储格式深度解析:从理论到实践的最佳选择

目录 1 Hive数据模型全景图 2 Hive存储架构解析 3 存储格式对比矩阵 4 存储格式选择决策树 5 ORC文件结构剖析 6 Parquet与ORC技术对比 7 最佳实践指南 7.1 建表示例模板 7.2 性能优化 8 总结 1 Hive数据模型全景图 模型核心组件解析&#xff1a; Database&#xff1…

2025能源网络安全大赛CTF --- Crypto wp

文章目录 前言simpleSigninNumberTheory 前言 大半年以来写的第一篇文章&#xff01;&#xff01;&#xff01; simpleSignin 题目&#xff1a; from Crypto.Util.number import * from gmpy2 import * import osflag bxxx p next_prime(bytes_to_long(os.urandom(128))…