深入理解Sentinel系列-2.Sentinel原理及核心源码分析

news2024/11/27 21:01:23
  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码、Kafka原理、分布式技术原理
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • 流量控制
    • 限流核心因素
    • 并发线程数
    • QPS
      • 直接拒绝
      • warm up(冷启动)
      • 匀速排队
    • 基于调用关系来做流量控制
    • Sentinel控制台
    • 动态限流规则
      • Nacos DataSource(接口)
      • 基于配置文件的动态限流
    • 集群限流
    • 熔断降级
      • 熔断指标
      • 熔断规则
      • 熔断时间窗口
      • 熔断实践
    • sentinel的实现
    • 分析sentinel源码实现
      • 树的构建 NodeSelectorSlot
      • 统计监控 StatisticSlot
      • 流量控制 FlowSlot

流量控制

sentinel中有几个比较重要的东西:

  • 资源 - 针对哪一个方法,可以在接口层面 ,也可以在类层面
  • 规则 - 根据资源配置限流的规则
  • Entry -> 请求

限流核心因素

  • resource 资源
  • count 阈值
  • grade 限流的类型

并发线程数

我们所有的请求它会去基于业务线程也好,容器分配的线程也好,其都是通过线程的分配来处理。(像dubbo就有默认200的线程大小,其实也就是服务端接收到线程以后,会分配一个业务线程去处理这个请求,其最大的线程数设置的是200。)在sentinel中,关于并发线程数的策略指的是对于并发请求的处理策略。这包括了对于同时处理的请求数量的限制、超时处理、以及资源的保护等方面的策略。这些策略旨在保证系统在高并发情况下能够正常运行,同时避免资源被过度消耗或者出现系统崩溃的情况。通过设置合适的并发线程数策略,可以有效地管理系统的资源和保证系统的稳定性。

QPS

在Sentinel中,关于QPS的策略指的是对于每秒查询次数的限制策略。这些策略可以用来控制系统的并发请求量,防止系统被过度压力而导致性能下降或系统崩溃。通过设置QPS的限制策略,可以限制系统在单位时间内能够处理的请求次数,从而保护系统免受过多请求的影响,确保系统在承受合理负载的情况下能够正常运行。这些策略可以根据系统的实际情况和需求来进行调整,以保证系统的稳定性和可靠性。


当触发了上述的两种限流,就会产生一种行为,也就是所谓流量控制的策略

  • 直接拒绝
  • warm up(冷启动)
  • 匀速排队

这个策略主要是通过ControlBehavior属性来控制策略的

直接拒绝

flowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);

warm up(冷启动)

在这里插入图片描述

所谓冷启动,就是我们的系统在启动的时候长期处于低水位(整个系统的吞吐量非常低,并发量很低),当突然出现这种瞬时流量,系统有一个启动的过程,并不是一下子将所有的流量都放进来,而是逐步的拉伸整个系统的水位直到通过一个时间维度达到最高峰的阈值。(因为一下子将所有流量放进来,系统很可能在一瞬间被压垮,所以匀速提升整个系统的预热,可以在一定情况下逐渐增加处理上限)

匀速排队

在这里插入图片描述

总体来说,在限流里面,可分为比较重要的就是 资源规则 ,我们可以通过配置不同的资源去设置不同的规则,从而控制整个请求流量的行为。

基于调用关系来做流量控制

在整个调用链路里面,针对不同请求来源去做(FlowRule.limitApp)

  • 根据资源

基于资源的流量控制是指根据被调用的具体资源(比如接口、方法等)来进行流量控制。在Sentinel中,可以通过配置规则来限制不同资源的访问流量,比如可以设置某个接口的最大访问次数或者并发数。这种方式适用于对特定资源进行限流的场景,可以保护资源不被过度访问。

  • 根据来源

基于来源的流量控制是指根据调用方的来源(比如IP地址、用户ID等)来进行流量控制。在Sentinel中,可以通过配置规则来限制不同来源的访问流量,比如可以设置某个IP地址的最大访问次数或者限制某个用户的访问频率。这种方式适用于对不同来源的调用方进行个性化的限流控制。

  • 根据参数

基于参数的流量控制是指根据调用方传递的参数来进行流量控制。在Sentinel中,可以通过配置规则来限制不同参数组合的访问流量,比如可以设置某个接口某个参数的最大访问次数。这种方式适用于对特定参数组合进行限流的场景,可以根据业务需求对不同参数进行限流控制。

Sentinel控制台

java -Dserver.port=7777 -Dcsp.sentinel.dashboard.server=localhost:7777 -
Dproject.name=sentinel-dashboard-1.8.0 -jar sentinel-dashboard-1.8.0.jar

在这里插入图片描述

在这里插入图片描述

上图可以清晰的展示到sentinel可以提供哪些方向上的规则配置。

需要注意的一点就是,单机情况下,如果配置阈值的话,可以通过单机压测 压测出来。

动态限流规则

在这里插入图片描述

实际上就是动态的规则感知进行 (pull、push)

Nacos DataSource(接口)

对于整个Sentinel的动态数据源来说,必须要去做扩展。

这个接口提供了动态配置感知的能力 和 动态配置存储的能力。

// sentinel 提供的spi扩展
/*
SPI(Service Provider Interface)扩展是Java中一种用于扩展框架的机制。在SPI中,框架定义了一组接口(接口或抽象类),并允许外部的实现者按照这些接口的规范来提供具体的实现。这种机制允许系统的扩展功能可以通过在类路径下放置实现者提供的JAR包来实现,而无需修改框架的源代码。

SPI扩展机制的优点是能够实现框架和扩展的解耦,框架可以在不修改代码的情况下,通过加载外部的实现类来扩展功能。同时,SPI扩展也提供了一种简单的插件机制,允许开发者通过编写实现类来扩展框架的功能。
*/

// 扩展之后必须在 resource/META-INF/services/com.alibaba.csp.sentinel.init.initFunc 里面写入com.gupaoedu.example.springbootsentinel.FlowRuleInitFunc 对应的路径

// 扩展之后,sentinel在初始化的时候,就会去加载这个方法

public class FlowRuleInitFunc implements InitFunc{
    
    @Override
    public void init() throws Exception {
        List<FlowRule> rules=new ArrayList<>();
        FlowRule flowRule=new FlowRule();
        flowRule.setResource("read"); //针对那个资源设置规则
        flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);//QPS或者并发数
        flowRule.setCount(5); //QPS=5
        //直接拒绝:
        flowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        rules.add(flowRule);

        FlowRuleManager.loadRules(rules);
    }
   
}
# Sentinel 控制台地址 将当前的应用接入到控制台
spring.cloud.sentinel.transport.dashboard=192.168.216.128:7777
# 取消Sentinel控制台懒加载
# 默认情况下 Sentinel 会在客户端首次调用的时候进行初始化,开始向控制台发送心跳包
# 配置 sentinel.eager=true 时,取消Sentinel控制台懒加载功能
spring.cloud.sentinel.eager=true
@RestController
public class SentinelController {

    @Autowired
    TestService testService;

    @GetMapping("/hello/{name}")
    public String sayHello(@PathVariable("name") String name){
        return testService.doTest(name);
    }
}

@Service
public class TestService {

    @SentinelResource(value = "doTest") //声明限流的资源
    public String doTest(String name){
        return "hello , "+name;
    }

}

此时我们已经将 sentinel 接入到 sentinel控制台了,然后我们使用jmeter来进行测试

配置http请求 和 线程数

在这里插入图片描述

此时就可以看到 在sentinel控制台中对应接口的 监控情况

在这里插入图片描述

在前面起到了,只要依赖了 sentinel这个包,那么项目就会自动的去集成流量的一个过滤,比如/hello/{name},因为没有设置规则,所以都能通过。

接下来做一个动态规则。

		<dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-nacos</artifactId>
            <version>1.8.0</version>
        </dependency>
public class FlowRuleInitFunc implements InitFunc{
    private final String nacosAddress="192.168.216.128:8848";
    private final String groupId="SENTINEL_GROUP";
    private final String dataId="-flow-rules";

    private final String appName="App-Test";

    @Override
    public void init() throws Exception {
        registerFlowRule();
    }
    

    private void registerFlowRule(){
        // 从远程服务器上加载规则
        ReadableDataSource<String,List<FlowRule>> flowRuleDs=
                new NacosDataSource<List<FlowRule>>(nacosAddress,groupId,appName+dataId,
                                                         // 当数据发生变化后,会回调这个方法
                        source-> JSON.parseObject(source,new TypeReference<List<FlowRule>>(){}));
   	// 这行代码将上面创建的数据源注册到中。是Sentinel中管理流控规则的组件,方法用于将提供的(属性)注册到。这样,每当Nacos中的配置发生变化时,会获取最新的规则,并通知更新内存中的流控规则。
        FlowRuleManager.register2Property(flowRuleDs.getProperty());
    }
}

此时在nacos中修改对应的配置,就能够监听到,然后更新到内存中的流控规则中

在这里插入图片描述

  • 通过访问测试,即可看到被限流的效果。
  • 也可以在 ${用户}/logs/csp/sentinel-record.log.2020-09-22 文件中看到sentinel启动过程中动态数据源的加载过程。

基于配置文件的动态限流

spring.cloud.sentinel.transport.clientIp=192.168.216.128:7777
spring.cloud.sentinel.datasource.nacos.nacos.serverAddr=192.168.216.128:8848
spring.cloud.sentinel.datasource.nacos.nacos.dataId=com.gupaoedu.sentinel.demo.flow.rule
spring.cloud.sentinel.datasource.nacos.nacos.groupId=SENTINEL_GROUP
spring.cloud.sentinel.datasource.nacos.nacos.dataType=json
spring.cloud.sentinel.datasource.nacos.nacos.ruleType=flow
spring.cloud.sentinel.datasource.nacos.nacos.username=nacos
spring.cloud.sentinel.datasource.nacos.nacos.password=nacos

在这里的配置会出现一个问题,就是修改 sentinel 控制台配置,和nacos上的配置不一致,这样的话最简单的就是修改sentinel的源码,当sentinel控制台变化的时候,动态去修改nacos上的配置即可。

集群限流

token-client代表我们的应用,当一个请求过来以后,经过token-client会进行一个动态的判断,token-server里面采用的是一个动态数据源,此时就能够实现集群限流的思想了,当来一个请求的时候,token-client先去token-server中去判断是否能够通过,如果通过才能执行,否则限流。

当token-server挂了之后,能够直接连接nacos配置中心的规则进行一个处理

在这里插入图片描述

token -server

public static void main(String[] args) throws Exception {
        ClusterTokenServer tokenServer=new SentinelDefaultTokenServer();
        //手动载入namespace和serverTransportConfig的配置到ClusterServerConfigManager
        //集群限流服务端通信相关配置
        ClusterServerConfigManager.loadGlobalTransportConfig(
                new ServerTransportConfig().setIdleSeconds(600).setPort(9999));
        //加载namespace集合列表() , namespace也可以放在配置中心
        ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton("App-Test"));
        tokenServer.start();
        //Token-client会上报自己的project.name到token-server。Token-server会根据namespace来统计连接数
    }


// 主要作用是从 Nacos 配置中心获取流控规则,并将其应用到 Sentinel 中,以实现动态的流控规则管理。
public class FlowRuleInitFunc implements InitFunc{
    private final String nacosAddress="192.168.216.128:8848";
    private final String groupId="SENTINEL_GROUP";
    private String dataId="-flow-rules";

    @Override
    public void init() throws Exception {
        ClusterFlowRuleManager.setPropertySupplier(namespace->{
            ReadableDataSource<String,List<FlowRule>> flowRuleDs=
                    new NacosDataSource<List<FlowRule>>(nacosAddress,groupId,namespace+dataId,
                            source-> JSON.parseObject(source,new TypeReference<List<FlowRule>>(){}));
            return flowRuleDs.getProperty();
        });
    }
}

在这里插入图片描述

token-client

public class FlowRuleInitFunc implements InitFunc{
    private final String nacosAddress="192.168.216.128:8848";
    private final String groupId="SENTINEL_GROUP";
    private final String dataId="-flow-rules";

    private final String clusterServerHost="localhost";
    private final int clusterServerPort=9999;
    private final int requestTimeOut=20000;
    private final String appName="App-Test";

    @Override
    public void init() throws Exception {
        loadClusterConfig();
        registerFlowRule();
    }
    private void loadClusterConfig(){
        ClusterClientAssignConfig assignConfig=new ClusterClientAssignConfig();
        assignConfig.setServerHost(clusterServerHost); //放到配置中心
        assignConfig.setServerPort(clusterServerPort);
        ClusterClientConfigManager.applyNewAssignConfig(assignConfig);
        ClusterClientConfig clientConfig=new ClusterClientConfig();
        clientConfig.setRequestTimeout(requestTimeOut);  //放到配置中心
        ClusterClientConfigManager.applyNewConfig(clientConfig);
    }

    private void registerFlowRule(){
        ReadableDataSource<String,List<FlowRule>> flowRuleDs=
                new NacosDataSource<List<FlowRule>>(nacosAddress,groupId,appName+dataId,
                        source-> JSON.parseObject(source,new TypeReference<List<FlowRule>>(){}));
        FlowRuleManager.register2Property(flowRuleDs.getProperty());
    }
}

以上就是一个集群流控的核心实现了。

在这里插入图片描述

总结:集群限流配置一个总的qps,多个节点可以配置权重,或者均摊的方式来起到限流的作用。

熔断降级

Sentinel的熔断降级是一种服务保护机制,用于在系统出现异常或超载时,自动限制对受影响服务的访问,防止其继续受到压力而导致系统崩溃。熔断降级可以通过设置阈值和规则来实现,一旦达到设定的条件,系统将自动停止对服务的访问,直到服务恢复正常或超载情况解除。这样可以保护系统免受过载和异常情况的影响,确保系统的稳定性和可靠性。

熔断指标

怎么判断这个服务是一个不稳定的状态?

异常数:

平均响应时间:

异常比例数:10s内,20次请求里面,有百分之五十及以上的错误率,就认为其要触发熔断,而hystrix只有这一种

熔断规则

规则,也就是要设计对应的指标,指标设计为多少才会熔断!

比如说:

  • 1min内,异常数量超过50%,触发熔断
  • 1s 5个请求,平均响应时间超过一个阈值(1000ms)
  • 1min内,超过了多少个异常数量

熔断时间窗口

当处于熔断状态的时候,多长时间内,请求都不会发送到服务端,熔断窗口也是一个需要设置的阈值

熔断实践

public class DataSourceInitFunc implements InitFunc{
    public void init() throws Exception{
        List<DegradeRule> rules = new ArrayList<>();
        DegradeRule rule = new DegradeRule();
        rule.setResource("com.gupaodu.springcloud.dubbo.ISayHelloService");// 表示针对那个服务或者方法的熔断
        // 针对这个类下面所有的接口进行资源的判断,当这个类下面任何一个方法触发了熔断,那么调用这个类下面的任何一个接口都会自动触发降级。
        // 设置模式
        rule.setGrade(RuleConstant.Degrade.DEGRADE_CRADE_EXCEPTION_COUNT)// 错误数 超时时间 错误率 指标
        rule.setCount(3);// 阈值
        rule.setTimeWindow(100)// 时间窗口 100s
        rules.add(rule);
        DegradeRuleManager.loadRules(rules);
    }
}


public interface ISayHelloService{
    
    String sayHello(String msg);
    
	String exceptionTest();
}

@DubboService
public class SayHelloServiceImpl implement ISayHelloService{
    public String sayHello(String msg){
        return "";
    }
    
    // 用来触发测试
    public String exceptionTest(){
        throw new RuntimeException("biz exception");
    }
}

@RestController
public class SentinelController {
    @DubboReference(mock="......SayHelloServiceMock")
    ISayHelloService sayHelloService;
    
    @GetMapping("/say")
    public String say(){
        return sayHelloService.sayHello("");
    }
    
    @GetMapping("/exception")
    public String exception(){
        return sayHelloService.exceptionTest();
    }
}

public class SayHelloServiceMock implements ISayHelloService{
    public String sayHello(String msg){
        return "触发了降级,返回默认数据";
    }
    
    // 用来触发测试
    public String exceptionTest(){
        return "触发了降级,返回默认数据"
    }
}

其实本质就是 当触发熔断之后,该接口下其他方法在时间窗口内 也不能访问,只能被降级。

sentinel的实现

构建一个资源

设置一个规则

指标范围:

  • 当前的qps
  • 当前的总的并发数线程数
  • 当前失败请求数是多少?

在这里插入图片描述

此图为官方提供的图,一个请求过来,它会在 调用链路构建处 构建一个 Invocation Tree结构,也就是将我们请求的资源构建成一个节点,以树形的方式去构建。然后就是针对某个节点进行监控统计,主要是以环形数组的方式哦存,其实也就是每一个node维护了一个滑动窗口,其统计了针对这个资源总的信息去进行指标统计,然后通过一个判断责任链去鉴别触发了那些规则。

分析sentinel源码实现

public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
    }

public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException {
        return this.entryWithType(name, resourceType, entryType, count, false, args);
    }

public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
        return this.entryWithPriority(resource, count, prioritized, args);
    }

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    // 获取线程上下文
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
        } else {
            if (context == null) {
                context = CtSph.InternalContextUtil.internalEnter("sentinel_default_context");
            }

            // 是否开启或关闭流控
            if (!Constants.ON) {
                return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
            } else {
                // 构建了一个调用链
                ProcessorSlot<Object> chain = this.lookProcessChain(resourceWrapper);
------------------------------------------------------------------------------
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized(LOCK) {
                chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
                if (chain == null) {
                    // 超过阈值就直接返回null
                    if (chainMap.size() >= 6000) {
                        return null;
                    }
					// 单例模式创建调用链
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap(chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                    
                    // 如果直接添加进去的话,涉及到扩容,所以采用创建之后赋值的方法
                }
            }
        }

        return chain;
    }
    
public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        } else {
            slotChainBuilder = (SlotChainBuilder)SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
            if (slotChainBuilder == null) {
                RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default", new Object[0]);
                slotChainBuilder = new DefaultSlotChainBuilder();
            } else {
                RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", new Object[]{slotChainBuilder.getClass().getCanonicalName()});
            }

            return slotChainBuilder.build();
        }
    }
                
 public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        Iterator var3 = sortedSlotList.iterator();
		// 然后在这里进行一个遍历
        while(var3.hasNext()) {
            ProcessorSlot slot = (ProcessorSlot)var3.next();
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain", new Object[0]);
            } else {
                chain.addLast((AbstractLinkedProcessorSlot)slot);
            }
        }

        return chain;
    }
                
// 在这里面涉及到了SpiLoader
// 其实也就是在这个链路中,可以进入到我们自己的 自定义的slot 中

在这里插入图片描述

在这里插入图片描述

if (chain == null) {
                    return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
                } else {
                    CtEntry e = new CtEntry(resourceWrapper, chain, context);
					
                    try {
                        // 构建好以后然后通过链式去处理
                        chain.entry(context, resourceWrapper, (Object)null, count, prioritized, args);
                    } catch (BlockException var9) {
                        e.exit(count, args);
                        throw var9;
                    } catch (Throwable var10) {
                        RecordLog.info("Sentinel unexpected exception", var10);
                    }

                    return e;
                }
            }
        }
    }

在这里插入图片描述

这两个就是前面说的滑动窗口要去统计的东西

前面传进来的资源包装成一个 resourceWrapper对象

在这里插入图片描述

当我们打算进入 chain.entry 的时候,发现其实就是一系列的slot

在这里插入图片描述

树的构建 NodeSelectorSlot

public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        DefaultNode node = (DefaultNode)this.map.get(context.getName());
    // 双重检查锁 + 缓存
        if (node == null) {
            synchronized(this) {
                node = (DefaultNode)this.map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, (ClusterNode)null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap(this.map.size());
                    cacheMap.putAll(this.map);
                    cacheMap.put(context.getName(), node);
                    this.map = cacheMap;
                    ((DefaultNode)context.getLastNode()).addChild(node);
                }
            }
        }

        context.setCurNode(node);
    // 当上述构建操作完成后
    // 释放entry
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
// 释放的本质其实就是执行下一个
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        if (this.next != null) {
            this.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }

    }

统计监控 StatisticSlot

实际上是并发量比较大的情况下做数据统计

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        Iterator var8;
        ProcessorSlotEntryCallback handler;
        try {
            // 先交给后续的slot进行处理
            this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
            // 然后开始统计资源
            // node代表当前传过来的资源
            node.increaseThreadNum();
            node.addPassRequest(count);
-----------------------------------------------------------------------------------
public void addPassRequest(int count) {
        super.addPassRequest(count);
        this.clusterNode.addPassRequest(count);
    }
public void addPassRequest(int count) {
        this.rollingCounterInSecond.addPass(count); // 秒级别的统计
        this.rollingCounterInMinute.addPass(count); // 分钟级别的统计
    }
    
this.rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
            
SampleCountProperty.SAMPLE_COUNT = 2;
IntervalProperty.INTERVAL = 1000;

// 上面有点像滑动窗口的概念
public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = this.data.currentWindow();
    // 根据某一个指标加进去 这里面的本质就是通过当前的时间去得到一个窗口,将其加进去
    // LongAdder的思想 其底层就是数组累加的思想 !!!
        ((MetricBucket)wrap.value()).addPass(count);
    }
 // 当最终计数完成后,相当于形成了一开始的那个图的样子
            
            
private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
            
            
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }

            // super(sampleCount, intervalInMs);
public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
        this.windowLengthInMs = intervalInMs / sampleCount; // 每个窗口的时间长度
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = (double)intervalInMs / 1000.0D;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray(sampleCount); // sampleCount = 2
    	// 也就代表着窗口的大小是两个
    }
            
            
protected final AtomicReferenceArray<WindowWrap<T>> array;

在这里插入图片描述

           
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            Iterator var13 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

            while(var13.hasNext()) {
                ProcessorSlotEntryCallback<DefaultNode> handler = (ProcessorSlotEntryCallback)var13.next();
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException var10) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
            }

            var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

            while(var8.hasNext()) {
                handler = (ProcessorSlotEntryCallback)var8.next();
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException var11) {
            BlockException e = var11;
            context.getCurEntry().setBlockError(var11);
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

            while(var8.hasNext()) {
                handler = (ProcessorSlotEntryCallback)var8.next();
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable var12) {
            context.getCurEntry().setError(var12);
            throw var12;
        }

    }

流量控制 FlowSlot

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        this.checkFlow(resourceWrapper, context, node, count, prioritized);
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        this.checker.checkFlow(this.ruleProvider, resource, context, node, count, prioritized);
    }

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider != null && resource != null) {
            // 获取针对这个资源的限流规则
            Collection<FlowRule> rules = (Collection)ruleProvider.apply(resource.getName());
            if (rules != null) {
                Iterator var8 = rules.iterator();

                while(var8.hasNext()) {
                    FlowRule rule = (FlowRule)var8.next();
                    if (!this.canPassCheck(rule, context, node, count, prioritized)) {
                        throw new FlowException(rule.getLimitApp(), rule);
                    }
                }
            }

        }
    }

public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        } else {
            return rule.isClusterMode() ? passClusterCheck(rule, context, node, acquireCount, prioritized) : passLocalCheck(rule, context, node, acquireCount, prioritized);
        }
    }

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    	// 在这里再根据不同的策略去决定拒绝的要怎么办
        return selectedNode == null ? true : rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();
    	// 根据不同的来源进行限流
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            return strategy == 0 ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        } else if ("default".equals(limitApp)) {
            return (Node)(strategy == 0 ? node.getClusterNode() : selectReferenceNode(rule, context, node));
        } else if ("other".equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            return strategy == 0 ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        } else {
            return null;
        }
    }


public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    	// 根据资源获取当前的使用量
        int curCount = this.avgUsedTokens(node);
    	// 如果当前使用量 + 请求 > 阈值 则按照策略进行限流
        if ((double)(curCount + acquireCount) > this.count) {
            if (prioritized && this.grade == 1) {
                long currentTime = TimeUtil.currentTimeMillis();
                long waitInMs = node.tryOccupyNext(currentTime, acquireCount, this.count);
                if (waitInMs < (long)OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    this.sleep(waitInMs);
                    throw new PriorityWaitException(waitInMs);
                }
            }

            return false;
        } else {
            return true;
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1291486.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CAP理论详解

引言 随着分布式系统在现代应用中的广泛应用&#xff0c;工程师们不得不面对诸如数据一致性、可用性和分区容错性等问题。CAP定理作为分布式系统设计的基石之一&#xff0c;为我们提供了在这些问题之间做出权衡的理论依据。本文将深入探讨CAP定理的技术细节、先进性&#xff0…

深度学习记录--广播(Broadcasting)

什么是广播&#xff1f; 广播(Broadcasting)&#xff0c;在python中是一种矩阵初等运算的手段&#xff0c;用于将一个常数扩展成一个矩阵&#xff0c;使得运算可行 广播的作用 比如&#xff1a; 一个1*n的矩阵要和常数b相加&#xff0c;广播使得常数b扩展成一个1*n的矩阵 …

mysql的几种索引

mysql索引的介绍可以mysql官网的词汇表中搜索&#xff1a; https://dev.mysql.com/doc/refman/8.0/en/glossary.html mysql可以在表的一列、或者多列上创建索引&#xff0c;索引的类型可以选择&#xff0c;如下&#xff1a; 普通索引&#xff08;KEY&#xff09; 普通索引可…

SQLserver截取字符串

当我们存的数据是json的时候可以全部取出在模糊查询但是有多个重复数据的时候就没办法准确的模糊出来这个时候我们就需要用的字符串截取 --创建函数create FUNCTION [dbo].[Fmax] (str varchar(50),start VARCHAR(50),length VARCHAR(50)) RETURNS varchar(max) AS BEGINDEC…

McBSP接口概念和使用

copy McBSP包括一个数据通道和一个控制通道&#xff0c;通过7个引脚与外部设备连接。数据发送引脚DX负责数据的发送&#xff0c;数据接收引脚DR负责数据的接收&#xff0c;发送时钟引脚CLKX&#xff0c;接收时钟引脚CLKR&#xff0c;发送帧同步引脚FSX和接收帧同步引脚FSR提供…

Python语言基础知识(一)

文章目录 1、Python内置对象介绍2、标识符与变量3、数据类型—数字4、数据类型—字符串与字节串5、数据类型—列表、元组、字典、集合6、运算符和表达式7、运算符和表达式—算术运算符8、运算符和表达式—关系运算符9.1、运算符和表达式— 成员测试运算符in9.2、运算符和表达式…

外贸平台自建站的教程?做海洋建站的好处?

外贸平台自建站怎么做&#xff1f;搭建网站的具体流程有哪些&#xff1f; 作为外贸从业者&#xff0c;借助互联网平台自建站点已经成为推广业务、拓展市场的一种重要手段。海洋建站将为您提供一份详尽的外贸平台自建站的教程&#xff0c;助您在网络空间中展现您的企业魅力。 …

RocketMQ-RocketMQ⾼性能核⼼原理分析

一、源码环境搭建 1、主要功能模块 ​ RocketMQ的官方Git仓库地址&#xff1a;https://github.com/apache/rocketmq 可以用git把项目clone下来或者直接下载代码包。 ​ 也可以到RocketMQ的官方网站上下载指定版本的源码&#xff1a; http://rocketmq.apache.org/dowloading/…

新能源车交直流充电解释

交流充电&#xff1a; 国家电网输出的电都是交流电&#xff0c;如下图所示&#xff0c;具有正弦切换规律的 而电动车的电池只能接受直流电&#xff0c;因此需要首先把交流电转换成直流电才能充进汽车电池&#xff0c;这就需要到了转换器OBC&#xff08;on-board Charger&#…

PPOCRv3检测模型和识别模型的训练和推理

PPOCRv3检测模型和识别模型的训练和推理 文章目录 PPOCRv3检测模型和识别模型的训练和推理前言一、环境安装1&#xff0c;官方推荐环境&#xff1a;2&#xff0c;本机GPU环境 二、Conda虚拟环境1.Win10安装Anaconda32.使用conda创建虚拟环境 三、安装PPOCR环境1&#xff0c;安装…

基于ssm人事管理信息系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本人事管理信息系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息…

跨境电商危机公关:应对负面舆情的策略优化

随着跨境电商的快速发展&#xff0c;企业在全球市场中面临的竞争与挑战也日益复杂。在这个数字时代&#xff0c;负面舆情一旦爆发&#xff0c;可能对企业形象和经营造成深远影响。 因此&#xff0c;跨境电商企业需要建立有效的危机公关策略&#xff0c;以迅速、果断、有效地应…

Vue + Element 实现按钮指定间隔时间点击

1、业务需求 需要加一个按钮&#xff0c;调用第三方API&#xff0c;按钮十分钟之内只能点击一次&#xff0c;刷新页面也只能点击一次 2、思路 加一个本地缓存的时间戳&#xff0c;通过时间戳计算指定时间内不能点击按钮 3、实现 1&#xff09;vue页面 <template>&l…

angular项目怎么给iframe动态赋值

前段时间在做项目的时候&#xff0c;给项目嵌入了一个第三方的ai链接&#xff0c;之前写成一个死的链接&#xff0c;测试都正常&#xff0c;但是后期迭代的时候将链接后面动态添加了一个参数&#xff0c;发现iframe不出来&#xff0c;并且查看dom结构&#xff0c;直接src对应的…

python中通过print输出信息

直接输出信息 例如&#xff1a; print("hello python")通过逗号将变量分割 例如&#xff1a; age 12 print("age is ", age)输出&#xff1a; 使用%占位符 例如&#xff1a; age 12 print("age is %d" %age)输出&#xff1a; 用f格式…

备战2024前端春招,你准备好了吗?(附10万字答案解析)

1. 前端知识体系 在说前端面试体系之前&#xff0c;先来看一下之前整理的前端知识体系图&#xff08;可能不太完整&#xff0c;毕竟我只是一个刚毕业一个多月的小菜鸡&#xff09;&#xff0c;这只是一个基础版的前端知识体系图&#xff0c;适合刚入门前端的小伙伴参考&#x…

openeuler安装深度桌面dde

安装深度开发的桌面dde sudo dnf makecachesudo dnf install ddesudo systemctl set-default graphical.target重启系统 reboot

JPA与MySQL锁实战

前言&#xff1a;最近使用jpa和mysql时&#xff0c;遇到了死锁问题。在解决后将一些排查过程中新学到和复习到的知识点再总结整理一下。首先对InnoDB中锁相关的概念进行介绍&#xff0c;然后展示如何利用JPA提供的排他锁来实现想要的功能&#xff0c;最后对死锁问题进行讨论。 …

【EI会议征稿】第三届密码学、网络安全和通信技术国际会议(CNSCT 2024)

第三届密码学、网络安全和通信技术国际会议&#xff08;CNSCT 2024&#xff09; 2024 3rd International Conference on Cryptography, Network Security and Communication Technology 随着互联网和网络应用的不断发展&#xff0c;网络安全在计算机科学中的地位越来越重要&…

solidity案例详解(六)服务评价合约

有服务提供商和用户两类实体&#xff0c;其中服务提供商部署合约&#xff0c;默认诚信为true&#xff0c;用户负责使用智能合约接受服务及评价&#xff0c;服务提供商的评价信息存储在一个映射中&#xff0c;可以根据服务提 供商的地址来查找评价信息。用户评价信息&#xff0c…