【源码解析】流控框架Sentinel源码解析

news2024/9/22 17:28:44

Sentinel简介

Sentinel是阿里开源的一款面向分布式、多语言异构化服务架构的流量治理组件。

主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。

核心概念

资源

资源是Sentinel中一个非常重要的概念,资源就是Sentinel所保护的对象。

资源可以是一段代码,又或者是一个接口,Sentinel中并没有什么强制规定,但是实际项目中一般以一个接口为一个资源,比如说一个http接口,又或者是rpc接口,它们就是资源,可以被保护。

资源是通过Sentinel的API定义的,每个资源都有一个对应的名称,比如对于一个http接口资源来说,Sentinel默认的资源名称就是请求路径。

规则

规则也是一个重要的概念,规则其实比较好理解,比如说要对一个资源进行限流,那么限流的条件就是规则,后面在限流的时候会基于这个规则来判定是否需要限流。

Sentinel的规则分为流量控制规则、熔断降级规则以及系统保护规则,不同的规则实现的效果不一样。

快速入门

引入依赖

       <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
            <version>2021.1</version>
        </dependency>

设置流控规则

@Configuration
public class FLowRulesConfig {

    /**
     * 定义限流规则
     */
    @PostConstruct
    public void initFLowRules() {
        //1.创建限流规则
        List<FlowRule> rules = new ArrayList<>();
        FlowRule rule = new FlowRule();

        // 定义资源,表示Sentinel会对哪个资源生效
        rule.setResource("hello");

        // 定义限流规则类型,QPS限流类型
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        // 定义QPS每秒能通过的请求个数
        rule.setCount(2);
        rules.add(rule);

        // 2、加载限流规则
        FlowRuleManager.loadRules(rules);
    }
}

编写Controller,进行测试

@Slf4j
@RestController
public class HelloController {

    @GetMapping("/hello")
    public String startHello() {
        // 进行限流
        try (Entry entry = SphU.entry("hello")) { // 加载限流规则,若果存在才会往下执行
            // 被保护的资源
            return "hello Sentinel";
        } catch (BlockException e) {
            e.printStackTrace();
            // 被限流或者被降级的操作
            return "系统繁忙";
        }
    }
}

源码拆解

CtSph

核心是Entry entry = SphU.entry("hello")

CtSph#entryWithPriority(),获取Context,如果不存在,则使用默认的。构造责任链,执行。

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }

                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

ContextUtil静态方法,会初始化一个默认的EntranceNode

    static {
        // Cache the entrance node for default context.
        initDefaultContext();
    }

    private static void initDefaultContext() {
        String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
        EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
        Constants.ROOT.addChild(node);
        contextNameNodeMap.put(defaultContextName, node);
    }

ContextUtil

ContextUtil#enter(),当执行该方法的时候,会往线程变量ThreadLocal存入当前的Context

    public static Context enter(String name, String origin) {
        if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
            throw new ContextNameDefineException(
                "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
        }
        return trueEnter(name, origin);
    }

    protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();
        if (context == null) {
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                Constants.ROOT.addChild(node);

                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }

        return context;
    }

ProcessorSlotChain

SlotChainProvider#newSlotChain,按照默认的DefaultSlotChainBuilder生成责任链。

    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        // Resolve the slot chain builder SPI.
        slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
                + slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }

DefaultSlotChainBuilder#build,根据spi加载ProcessorSlot

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // Note: the instances of ProcessorSlot should be different, since they are not stateless.
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}

NodeSelectorSlot

NodeSelectorSlot,处理同一个resource,使用同一个node

@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * {@link DefaultNode}s of the same resource in different context.
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

ClusterBuilderSlot

对于每一个 resource,会对应一个 ClusterNode 实例,如果不存在,就创建一个实例。当设置了 origin 的时候,会额外生成一个 StatisticsNode 实例,挂在 ClusterNode 上。

  • 不同入口的访问数据是DefaultNode
  • 统计所有入口访问数据之和是ClusterNode
  • 来自某个服务的访问数据是OriginNode
@SpiOrder(-9000)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);

        /*
         * if context origin is set, we should get or create a new {@link Node} of
         * the specific origin.
         */
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

LogSlot

对于下面的节点如果有抛出阻塞异常,进行捕捉,并且打印日志。记录哪些接口被规则挡住了。

@SpiOrder(-8000)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        try {
            fireEntry(context, resourceWrapper, obj, count, prioritized, args);
        } catch (BlockException e) {
            EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
                context.getOrigin(), count);
            throw e;
        } catch (Throwable e) {
            RecordLog.warn("Unexpected entry exception", e);
        }

    }
}

StatisticSlot

负责进行数据统计,为限流降级提供数据支持的。

@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }
}

AuthoritySlot

AuthoritySlot做权限控制,根据 origin 做黑白名单的控制:

@SpiOrder(-6000)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        checkBlackWhiteAuthority(resourceWrapper, context);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

        if (authorityRules == null) {
            return;
        }

        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
            return;
        }

        for (AuthorityRule rule : rules) {
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }
}

AuthorityRuleChecker#passCheck

对origin进行规则校验

final class AuthorityRuleChecker {

    static boolean passCheck(AuthorityRule rule, Context context) {
        String requester = context.getOrigin();

        // Empty origin or empty limitApp will pass.
        if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
            return true;
        }

        // Do exact match with origin name.
        int pos = rule.getLimitApp().indexOf(requester);
        boolean contain = pos > -1;

        if (contain) {
            boolean exactlyMatch = false;
            String[] appArray = rule.getLimitApp().split(",");
            for (String app : appArray) {
                if (requester.equals(app)) {
                    exactlyMatch = true;
                    break;
                }
            }

            contain = exactlyMatch;
        }

        int strategy = rule.getStrategy();
        if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
            return false;
        }

        if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
            return false;
        }

        return true;
    }

    private AuthorityRuleChecker() {}
}

SystemSlot

根据整个系统运行的统计数据来限流的,防止当前系统负载过高。它支持入口qps、线程数、响应时间、cpu使用率、负载5个限流的维度。

@SpiOrder(-5000)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        SystemRuleManager.checkSystem(resourceWrapper);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

}

Sentinel 针对所有的入口流量,使用了一个全局的 ENTRY_NODE 进行统计,所以我们也要知道,系统保护规则是全局的,和具体的某个资源没有关系。

    public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
        if (resourceWrapper == null) {
            return;
        }
        // Ensure the checking switch is on.
        if (!checkSystemStatus.get()) {
            return;
        }

        // for inbound traffic only
        if (resourceWrapper.getEntryType() != EntryType.IN) {
            return;
        }

        // total qps
        double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
        if (currentQps > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // total thread
        int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

        double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // load. BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }

        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }

FlowSlot

流控的核心处理类

@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    public FlowSlot() {
        this(new FlowRuleChecker());
    }

    /**
     * Package-private for test.
     *
     * @param checker flow rule checker
     * @since 1.6.1
     */
    FlowSlot(FlowRuleChecker checker) {
        AssertUtil.notNull(checker, "flow checker should not be null");
        this.checker = checker;
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
        void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }
}

FlowRuleChecker#checkFlow

public class FlowRuleChecker {

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
    
    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
}

DegradeSlot

降级策略。Sentinel支持三种熔断策略:慢调用比例、异常比例 、异常数。

Sentinel会为每个设置的规则都创建一个熔断器,熔断器有三种状态,OPEN(打开)、HALF_OPEN(半开)、CLOSED(关闭)

  • 当处于CLOSED状态时,可以访问资源,访问之后会进行慢调用比例、异常比例、异常数的统计,一旦达到了设置的阈值,就会将熔断器的状态设置为OPEN
  • 当处于OPEN状态时,会去判断是否达到了熔断时间,如果没到,拒绝访问,如果到了,那么就将状态改成HALF_OPEN,然后访问资源,访问之后会对访问结果进行判断,符合规则设置的要求,直接将熔断器设置为CLOSED,关闭熔断器,不符合则还是改为OPEN状态
  • 当处于HALF_OPEN状态时,直接拒绝访问资源
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }
}

SentinelWebInterceptor

SentinelWebInterceptor继承AbstractSentinelInterceptor,当有请求访问,会执行AbstractSentinelInterceptor#preHandle

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        try {
            String resourceName = this.getResourceName(request);
            if (StringUtil.isEmpty(resourceName)) {
                return true;
            } else if (this.increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
                return true;
            } else {
                String origin = this.parseOrigin(request);
                String contextName = this.getContextName(request);
                ContextUtil.enter(contextName, origin);
                Entry entry = SphU.entry(resourceName, 1, EntryType.IN);
                request.setAttribute(this.baseWebMvcConfig.getRequestAttributeName(), entry);
                return true;
            }
        } catch (BlockException var12) {
            BlockException e = var12;

            try {
                this.handleBlockException(request, response, e);
            } finally {
                ContextUtil.exit();
            }

            return false;
        }
    }

InitFunc

SphU#entry(),执行该方法会调用Env的静态变量,Env进行初始化。

    public static Entry entry(String name) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }

Env执行静态方法。

public class Env {

    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }

}

InitExecutor#doInit

使用 SPI 加载 InitFunc 的实现

    public static void doInit() {
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        try {
            ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);
            List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
            for (InitFunc initFunc : loader) {
                RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
                insertSorted(initList, initFunc);
            }
            for (OrderWrapper w : initList) {
                w.func.init();
                RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
                    w.func.getClass().getCanonicalName(), w.order));
            }
        } catch (Exception ex) {
            RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
            ex.printStackTrace();
        } catch (Error error) {
            RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
            error.printStackTrace();
        }
    }

HeartbeatSenderInitFunc,使用HeartbeatSenderProvider获取加载HeartbeatSender,定时执行。

@InitOrder(-1)
public class HeartbeatSenderInitFunc implements InitFunc {
    private ScheduledExecutorService pool = null;

    public HeartbeatSenderInitFunc() {
    }

    private void initSchedulerIfNeeded() {
        if (this.pool == null) {
            this.pool = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("sentinel-heartbeat-send-task", true), new DiscardOldestPolicy());
        }

    }

    public void init() {
        HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
        if (sender == null) {
            RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded", new Object[0]);
        } else {
            this.initSchedulerIfNeeded();
            long interval = this.retrieveInterval(sender);
            this.setIntervalIfNotExists(interval);
            this.scheduleHeartbeatTask(sender, interval);
        }
    }

    private boolean isValidHeartbeatInterval(Long interval) {
        return interval != null && interval > 0L;
    }

    private void setIntervalIfNotExists(long interval) {
        SentinelConfig.setConfig("csp.sentinel.heartbeat.interval.ms", String.valueOf(interval));
    }

    long retrieveInterval(HeartbeatSender sender) {
        Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs();
        if (this.isValidHeartbeatInterval(intervalInConfig)) {
            RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval in Sentinel config property: " + intervalInConfig, new Object[0]);
            return intervalInConfig;
        } else {
            long senderInterval = sender.intervalMs();
            RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in config property or invalid, using sender default: " + senderInterval, new Object[0]);
            return senderInterval;
        }
    }

    private void scheduleHeartbeatTask(final HeartbeatSender sender, long interval) {
        this.pool.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    sender.sendHeartbeat();
                } catch (Throwable var2) {
                    RecordLog.warn("[HeartbeatSender] Send heartbeat error", var2);
                }

            }
        }, 5000L, interval, TimeUnit.MILLISECONDS);
        RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: " + sender.getClass().getCanonicalName(), new Object[0]);
    }
}

HeartbeatSenderProvider静态方法块进行初始化,根据SPI获取HeartbeatSender

public final class HeartbeatSenderProvider {
    private static HeartbeatSender heartbeatSender = null;

    private static void resolveInstance() {
        HeartbeatSender resolved = (HeartbeatSender)SpiLoader.loadHighestPriorityInstance(HeartbeatSender.class);
        if (resolved == null) {
            RecordLog.warn("[HeartbeatSenderProvider] WARN: No existing HeartbeatSender found", new Object[0]);
        } else {
            heartbeatSender = resolved;
            RecordLog.info("[HeartbeatSenderProvider] HeartbeatSender activated: " + resolved.getClass().getCanonicalName(), new Object[0]);
        }

    }

    public static HeartbeatSender getHeartbeatSender() {
        return heartbeatSender;
    }

    private HeartbeatSenderProvider() {
    }

    static {
        resolveInstance();
    }
}

StatisticNode

ClusterNodeDefaultNodeEntranceNode都继承StatisticNode

StatisticNode#addPassRequest,增加通过的请求数。

        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

	@Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

ArrayMetric的data属性默认实现OccupiableBucketLeapArray

   	private final LeapArray<MetricBucket> data; 
	
	public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

LeapArray#currentWindow(long)。添加数据的时候,我们要先获取操作的目标窗口,也就是 currentWindow 这个方法。

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }

        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                return old;
            } else if (windowStart > old.windowStart()) {
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

LeapArray#values(long),获取有效的窗口中的数据。

    public List<T> values(long timeMillis) {
        if (timeMillis < 0) {
            return new ArrayList<T>();
        }
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                continue;
            }
            result.add(windowWrap.value());
        }
        return result;
    }

TrafficShapingController

FlowRuleUtil#generateRater,根据规则生成对应的TrafficShapingController

    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        return new DefaultController(rule.getCount(), rule.getGrade());
    }

DefaultController#canPass(),超过流量阈值的会直接拒绝。

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

SentinelResourceAspect

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        Method originMethod = resolveMethod(pjp);

        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            Object result = pjp.proceed();
            return result;
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }

            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/480205.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【分布式】数据冗余

当我们拥有了许多的存储服务器&#xff0c;且通过将数据在网关进行一致性哈希或者哈希桶的分发之后&#xff0c;我们拥有了一个具有基本负载均衡的系统&#xff0c;但是&#xff0c;此时我们又有新的问题产生了&#xff1a;我们所有的数据只有一份&#xff0c;如果这一份数据丢…

OD工具之动态逆向分析技术实例分析

OD工具之动态逆向分析技术实例分析 vscode等编写cmp.cOD工具打开cmp.exe 卧槽垃圾高级软件工程真是烦人还是记录一下吧那么简单的几行没有手册搞半天都无力吐槽了 vscode等编写cmp.c 在vscode等编辑器中编写cmp.c文件&#xff1a; #include<stdio.h> int main() {int …

手机信息管理系统【控制台+MySQL】(Java课设)

系统类型 控制台类型Mysql数据库存储数据 使用范围 适合作为Java课设&#xff01;&#xff01;&#xff01; 部署环境 jdk1.8Mysql8.0Idea或eclipsejdbc 运行效果 本系统源码地址&#xff1a;https://download.csdn.net/download/qq_50954361/87737284 更多系统资源库地…

CTFshow Web入门 命令执行

目录 web29 web30 web31 web32 web33 web34 web35 web36 web37 web38 web39 web40 web41 web42 web43 web44 web45 web46 web47 web48 web49 web50 web51 web52 web53 web54 web55 web56 web57 web58 web59 web60-65 web66 web67 web68 we…

Spring Boot相关概念、创建与运行

文章目录 一、Spring Boot的创建与使用&#xff08;一&#xff09;为什么要学习 Spring Boot&#xff1f;&#xff08;二&#xff09;Spring Boot的优势&#xff08;三&#xff09;Spring Boot的创建1. 安装插件2. 创建项目3. 配置国内源4. 删除无效目录 &#xff08;四&#x…

祝大家劳动节快乐

文章目录 为我的笔记整理了一个小目录 Python基础 Python基础——0.Python环境的安装 Python基础——1.变量和简单数据类型 Python基础——2.列表简介 Python基础——3.操作列表 Python基础——4.if语句 Python基础——5.字典 Python基础——6.用户输入和while循环 Python基…

【王道·计算机网络】第二章 物理层【未完】

一、通信基础 1. 基本概念 1.1 物理层接口特性 物理层解决如何在连接各种计算机的传输媒体上传输比特流&#xff0c;不指定具体的传输媒体主要任务&#xff1a;确定与传输媒体接口有关的一些特性 → 定义标准接口特性&#xff1a; 机械特性&#xff1a;定义物理连接的特性&a…

十分钟点亮iCLed35

文章目录 前言iCLed35整体介绍iCLed概念iCLed系列产品优势iCLed35(6pin)的特性&#xff1a; iCLed35(6pin)的硬件设计iCLed35(6pin)的软件配置通讯时序&#xff1a;通讯协议介绍&#xff1a;整体的数据结构&#xff1a;睡眠模式&#xff1a; 点亮iCLed35(6pin)S32K144EVB配置驱…

linux入门---软硬链接

软链接 使用指令ln -s 被链接的文件 生成的软链接文件 便可以创建软连接文件&#xff0c;ln是link的简写表明当前要创建链接文件&#xff0c;s是soft的简写表明当前创建的链接文件为软链接文件&#xff0c;然后加上被链接的文件&#xff0c;最后写上生成的链接文件的文件名比如…

【谷粒商城之服务认证OAuth2.0】

本笔记内容为尚硅谷谷粒商城服务认证OAuth2.0部分 目录 一、OAuth 2.0 二、微博登录测试 1、微博登陆准备工作 2、获取微博Access Token 3、登录测试 1.添加HttpUtils工具类 2.controller 3.service 4.vo 总结 一、OAuth 2.0 OAuth&#xff1a; OAuth&#xff08;开…

【账号激活】

由于注册时会遇到诸多错误提示&#xff01;所以出此详细教程。 重点是要巧妙的运用无痕浏览窗口。 步骤分为两步&#xff0c;sign up 激活邮箱以及 log in 短信验证码验证。 右上角新建无痕浏览窗口。新建完后记得关闭此有痕浏览窗口。 成功创建。完成第一步。 若要连续操作切记…

双极性信号、正交信号和PAM信号通信系统matlab程序+仿真

资源地址&#xff1a; 双极性信号、正交信号和PAM信号通信系统MATLAB程序仿真资源-CSDN文库 部分程序及仿真图&#xff1a; clear all EbN00:10; %SNR的范围 for ii1:length(EbN0) SNREbN0(ii); %赋值给AWGN信道模块中的SNR sim(ex5); %运行仿…

【Java笔试强训 24】

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点! 欢迎志同道合的朋友一起加油喔&#x1f93a;&#x1f93a;&#x1f93a; 目录 一、选择题 二、编程题 &#x1f525;年终奖 …

一般对称性和轮换对称性

一般对称性 一元函数的对称性 几何意义是所围图形的面积绝对值 【注】使用对称性的时候&#xff0c;首先抓积分区域关于哪个轴对称&#xff0c;其次抓被积函数是为另一轴的奇&#xff08;偶函数&#xff09;。 二元函数的对称性(奇偶性) 【注】在一般对称性中&#xff0c;(x…

MCU固件升级系列1(STM32)

本系列将从升级流程、boot代码编写、APP代码编写以及固件打包来介绍&#xff0c;硬件选用STM32F407ZGT6&#xff08;手里只有&#xff09;&#xff0c;来完成这系列教程。 前言 为什么需要固件升级: 功能更新&#xff1a;随着产品的迭代和用户需求的变化&#xff0c;可能需要…

【DM】达梦数据库与MySQL适配

一、达梦入门技术文档 新手直接看达梦入门技术文档即可 二、达梦数据库 1、介绍 达梦数据库管理系统是达梦公司推出的具有完全自主知识产权的高性能数据库管理系统&#xff0c;简称DM&#xff0c;它具有如下特点&#xff1a;通用性、高性能、高可用、跨平台、高可扩展 2、与…

网络安全:windows批处理写病毒的一些基本命令.

网络安全&#xff1a;windows批处理一些命令. echo off一般都写在批处理的最上面&#xff0c;用于关闭回显&#xff0c;意思是 关闭回显&#xff1a; 没有关闭回显: 所以&#xff0c;意思就是将输入指令的过程隐藏起来。 set是设置的意思&#xff0c;作业是打印、创建和修改变…

【算法】求最短路径算法

文章目录 一、迪杰斯特拉算法1.1 算法介绍1.2 算法步骤1.3 应用场景 二、弗洛伊德算法2.1 算法介绍2.2 算法步骤2.3 应用场景 一、迪杰斯特拉算法 1.1 算法介绍 从某顶点出发&#xff0c;沿图的边到达另一顶点所经过的路径中&#xff0c;各边上权值之和最小的一条路径叫做最短…

Hausdorff distance

Hausdorff距离量度度量空间中紧子集之间的距离 定义 设 X X X和 Y Y Y是度量空间 M M M的两个紧子集 d H ( X , Y ) max ⁡ { sup ⁡ x ∈ X inf ⁡ y ∈ Y d ( x , y ) , sup ⁡ y ∈ Y inf ⁡ x ∈ X d ( x , y ) } d_H\left(X, Y\right) \max \left\{\sup_{x\in X} \in…

Linux — 多线程的互斥与同步,信号量

1.线程互斥 进程线程间的互斥相关背景概念 临界资源&#xff1a;多线程执行流共享的资源就叫做临界资源。临界区&#xff1a;每个线程内部&#xff0c;访问临界资源的代码&#xff0c;就叫做临界区。互斥&#xff1a;任何时刻&#xff0c;互斥保证有且只有一个执行流进入临界区…