Sentine 源码分析之--ParamFlowSlot

news2024/9/23 7:33:58

前言:

上一篇我对 Sentinel 中的 AuthoritySlot、SystemSlot、GatewayFlowSlot 的相关源码进行了分析,本篇我们开始分析 ParamFlowSlot 相关的源码。

Sentinel 系列文章传送门:

Sentinel 初步认识及使用

Sentinel 核心概念和工作流程详解

Spring Cloud 整合 Nacos、Sentinel、OpenFigen 实战【微服务熔断降级实战】

Sentinel 源码分析入门【Entry、Chain、Context】

Sentine 源码分析之–NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot

Sentine 源码分析之–AuthoritySlot、SystemSlot、GatewayFlowSlot

在这里插入图片描述

ParamFlowSlot

ParamFlowSlot 是负责热点参数限流规则,我们在 Sentinel 控制台配置限流规则如下:

![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/3946df15c7e54d4490c8e354dd0c1578.png
当前规则是以 QPS 限流。

  • 单机阀值:就是最大令牌数量 maxCount。
  • 统计窗口时长:就是统计时长 duration。

ParamFlowSlot#entry 方法源码解析

ParamFlowSlot#entry 方法逻辑非常简单,先判断当前规则是否有热点参数限流规则,如果没有直接进入下一个 Slot,如果有限流规则,则执行限流规则判断后进入下一个 Slot,我们重点分析执行限流规则源码。


//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#entry
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
				  boolean prioritized, Object... args) throws Throwable {
	//当前资源是否设置热点限流规则
	if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
		//没有设置热点限流规则 直接放行 进入下一个 slot
		fireEntry(context, resourceWrapper, node, count, prioritized, args);
		return;
	}
	//热点规则判断
	checkFlow(resourceWrapper, count, args);
	//进入下一个 slot
	fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

ConfigCacheService#dumpBeta 方法源码解析

ParamFlowSlot#checkFlow 方法主要做了一下几件事:

  • 校验参数及规则,如果参数为空或者没有设置热点参数规则,则直接返回。
  • 获取当前资源的热点规则进行遍历处理。
  • 计算热点规则参数的索引
  • 初始化参数指标
  • 进行热点参数规则校验,规则校验通过不做处理,规则校验不通过则抛出异常,会被 StatisticSlot 捕获。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#checkFlow
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
	//参数为空判断
	if (args == null) {
		return;
	}
	//当前资源是否设置热点规则
	if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
		return;
	}
	//获取当前资源的热点参数规则
	List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
	//遍历规则
	for (ParamFlowRule rule : rules) {
		//计算热点规则参数的索引
		applyRealParamIdx(rule, args.length);

		// Initialize the parameter metrics.
		//初始化参数指标
		ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
		//规则校验
		if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
			//没有通过规则校验
			//触发参数 也就是热点参数
			String triggeredParam = "";
			//判断 参数的长度是否大于 规则中的参数索引
			if (args.length > rule.getParamIdx()) {
				//热点规则赋值
				Object value = args[rule.getParamIdx()];
				triggeredParam = String.valueOf(value);
			}
			 //抛出异常 ParamFlowException 集成了 BlockException 会被 StatisticSlot 的 catch 捕获
			throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
		}
	}
}

ParamFlowSlot#applyRealParamIdx 方法源码解析

ParamFlowSlot#applyRealParamIdx 主要是处理参数索引不规范的情况,例如参数索引为负数。

//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#applyRealParamIdx
void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) {
	//获取参数的索引值
	int paramIdx = rule.getParamIdx();
	//是否小于0
	if (paramIdx < 0) {
		if (-paramIdx <= length) {
			//设置索引
			rule.setParamIdx(length + paramIdx);
		} else {
			// Illegal index, give it a illegal positive value, latter rule checking will pass.
			//非法索引 给一个负数 后面检查的时候会通过
			rule.setParamIdx(-paramIdx);
		}
	}
}

ParameterMetricStorage#initParamMetricsFor 方法源码解析

ParameterMetricStorage#initParamMetricsFor 方法主要是创建参数统计 ParameterMetric 指标,然后根据资源名称添加到缓存 metricsMap中,然后执行初始化操作。

//com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetricStorage#initParamMetricsFor
public static void initParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) {
	//资源判断
	if (resourceWrapper == null || resourceWrapper.getName() == null) {
		return;
	}
	//获取资源名称
	String resourceName = resourceWrapper.getName();
	//参数指标
	ParameterMetric metric;
	// Assume that the resource is valid.
	//指标为空判断
	if ((metric = metricsMap.get(resourceName)) == null) {
		//指标为空 加锁
		synchronized (LOCK) {
			//再次指标为空判断 double check
			if ((metric = metricsMap.get(resourceName)) == null) {
				//创建指标
				metric = new ParameterMetric();
				//加入指标缓存中
				metricsMap.put(resourceWrapper.getName(), metric);
				RecordLog.info("[ParameterMetricStorage] Creating parameter metric for: " + resourceWrapper.getName());
			}
		}
	}
	//指标初始化
	metric.initialize(rule);
}


ParameterMetric#initialize 方法源码解析

ParameterMetric#initialize 方法主要初始化了三个 Map,具体如下:

  • ruleTimeCounters:记录令牌桶的最后添加时间,用于QPS限流。
  • ruleTokenCounter :记录令牌桶的令牌数量,用于QPS限流。
  • threadCountMap :用于记录线程数量。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric#initialize
public void initialize(ParamFlowRule rule) {
	//ruleTimeCounters 记录热点参数上次添加令牌的时间
	//ruleTimeCounters 是否包括 规则 rule
	if (!ruleTimeCounters.containsKey(rule)) {
		//加锁
		synchronized (lock) {
			//判断 rule 在 ruleTimeCounters 中是否为 null
			if (ruleTimeCounters.get(rule) == null) {
				//为空
				long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
				//加入 ruleTimeCounters
				ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
			}
		}
	}

	//ruleTokenCounter 记录热点参数剩余的令牌数
	//ruleTokenCounter 是否包含 rule
	if (!ruleTokenCounter.containsKey(rule)) {
		//加锁
		synchronized (lock) {
			//rule 在 ruleTokenCounter 是否为 null
			if (ruleTokenCounter.get(rule) == null) {
				//为空
				long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
				//加入 ruleTokenCounter
				ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
			}
		}
	}

	//threadCountMap 记录线程数量 用于线程级别限流
	//threadCountMap 是否包含热点参数索引
	if (!threadCountMap.containsKey(rule.getParamIdx())) {
		//加锁
		synchronized (lock) {
			//为空判断
			if (threadCountMap.get(rule.getParamIdx()) == null) {
				//加入 threadCountMap
				threadCountMap.put(rule.getParamIdx(),
					new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY));
			}
		}
	}
}


private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();

private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();

private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();

ParamFlowChecker#passCheck 方法源码解析

ParamFlowChecker#passCheck 方法主要做了两件事,如下:

  • 对参数值及参数索引进行判断,是否为空及符合标准。
  • 判断规则的模式,根据判断结果去执行集群模式的逻辑或者单机模式的逻辑。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passCheck
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
						 Object... args) {
	//参数为空判断
	if (args == null) {
		return true;
	}
	//获取索引
	int paramIdx = rule.getParamIdx();
	//参数长度小于参数索引 直接返回
	if (args.length <= paramIdx) {
		return true;
	}

	// Get parameter value.
	//获取参数值
	Object value = args[paramIdx];

	// Assign value with the result of paramFlowKey method
	//参数值是否是 ParamFlowArgument 类型
	if (value instanceof ParamFlowArgument) {
		value = ((ParamFlowArgument) value).paramFlowKey();
	}
	// If value is null, then pass
	//参数值为 null 直接返回
	if (value == null) {
		return true;
	}
	//是否集群模式
	if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
		//集群模式规则校验
		return passClusterCheck(resourceWrapper, rule, count, value);
	}
	//非集群模式规则校验
	return passLocalCheck(resourceWrapper, rule, count, value);
}

ParamFlowChecker#passClusterCheck 方法源码解析

ParamFlowChecker#passClusterCheck 该方法是集群模式的热点参数规则判断,具体如下:

  • 获取集群的客户端或者服务端的 TokenService,优先获取客户端,如果 TokenService 为空,则执行 ParamFlowChecker#fallbackToLocalOrPass 方法。
  • 请求客户端或服务端进行进行热点参数规则校验,并处理校验结果。
  • 如果这个过程发生异常也,则执行 ParamFlowChecker#fallbackToLocalOrPass 方法。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passClusterCheck
private static boolean passClusterCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
										Object value) {
	try {
		//参数值转换为集合
		Collection<Object> params = toCollection(value);
		//获取集群客户端或者服务端 优先客户端
		TokenService clusterService = pickClusterService();
		if (clusterService == null) {
			//没有获取到集群客户端和服务端 回退到本地或者通过
			// No available cluster client or server, fallback to local or
			// pass in need.
			return fallbackToLocalOrPass(resourceWrapper, rule, count, params);
		}
		//请求令牌规则校验
		TokenResult result = clusterService.requestParamToken(rule.getClusterConfig().getFlowId(), count, params);
		switch (result.getStatus()) {
			case TokenResultStatus.OK:
				//返回 ok 通过
				return true;
			case TokenResultStatus.BLOCKED:
				//返回 false 不通过
				return false;
			default:
				//默认 回退到本地或者通过
				return fallbackToLocalOrPass(resourceWrapper, rule, count, params);
		}
	} catch (Throwable ex) {
		RecordLog.warn("[ParamFlowChecker] Request cluster token for parameter unexpected failed", ex);
		//异常也是 回退到本地或者通过
		return fallbackToLocalOrPass(resourceWrapper, rule, count, value);
	}
}

ParamFlowChecker#fallbackToLocalOrPass 方法源码解析

ParamFlowChecker#fallbackToLocalOrPass 方法的逻辑十分简单,就是判断是执行单机模式模式的热点规则校验,默认是直接通过规则,而不执行单机模式的热点规则校验。

 //com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#fallbackToLocalOrPass
private static boolean fallbackToLocalOrPass(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
											 Object value) {
	//判断规则配置 失败时候是否回退到本地 也就是单机模式
	//private boolean fallbackToLocalWhenFail = false;
	if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
		//回退到本地
		return passLocalCheck(resourceWrapper, rule, count, value);
	} else {
		// The rule won't be activated, just pass.
		//直接通过
		return true;
	}
}


ParamFlowChecker#passLocalCheck 方法源码解析

ParamFlowChecker#passLocalCheck 方法的作用主要是判断参数值的类型,然后调用 ParamFlowChecker#passSingleValueCheck 方法进行规则校验。

//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passLocalCheck
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
									  Object value) {
	try {
		if (Collection.class.isAssignableFrom(value.getClass())) {
			//集合
			for (Object param : ((Collection)value)) {
				//热点参数检查
				if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
					return false;
				}
			}
		} else if (value.getClass().isArray()) {
			//数组
			int length = Array.getLength(value);
			for (int i = 0; i < length; i++) {
				Object param = Array.get(value, i);
				//热点参数检查
				if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
					return false;
				}
			}
		} else {
			//object
			//热点参数检查
			return passSingleValueCheck(resourceWrapper, rule, count, value);
		}
	} catch (Throwable e) {
		RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
	}

	return true;
}

ParamFlowChecker#passSingleValueCheck 方法源码解析

ParamFlowChecker#passSingleValueCheck 方法主要做了一下几件事:

  • 对限流规则类型进行区分(有 QPS 限流和线程数限流两种类型)。
  • 对于 QPS 限流会对限流的算法进行区分(有漏桶算法和令牌桶算法)。
  • 对于线程数限流会对比当前参数的已使用线程数和阀值进行比较,返回比较结果。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passSingleValueCheck
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
									Object value) {
	if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
		//QPS 限流
		//private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
		//public static final int CONTROL_BEHAVIOR_DEFAULT = 0
		if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
			//匀速限流 漏桶算法
			return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
		} else {
			//默认限流 令牌桶算法
			return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
		}
	} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
		//线程限流
		//热点参数集合
		Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
		//获取参数 value 线程数
		long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
		if (exclusionItems.contains(value)) {
			//热点参数包含 value
			//获取 value 的规则阀值
			int itemThreshold = rule.getParsedHotItems().get(value);
			//比较线程参数和规则阀值
			return ++threadCount <= itemThreshold;
		}
		//获取规则阀值
		long threshold = (long)rule.getCount();
		//比较线程参数和规则阀值
		return ++threadCount <= threshold;
	}

	return true;
}

ParamFlowChecker#passThrottleLocalCheck 方法源码解析

ParamFlowChecker#passThrottleLocalCheck 方法是 QPS 限流的漏桶算法的实现,主要逻辑如下:

  • 根据 ResourceWrapper 获取令牌计数器,如果令牌计数器为空直接返回规则通过。
  • 获取规则中的热点参数的令牌阀值,如果令牌阀值为 0 直接返回规则不通过。
  • 当前请求的令牌数量数量需要多久生成,也就是当前请求通过需要消耗的时间。
  • 初始化当前热点参数的时间计数器,如果返回的时间计数器为空,表示第一次访问,直接返回规则通过。
  • 计算本次请求通过期望的时间,也就是上次请求通过的时间加上本次请求的令牌的生成时间。
  • 比较本次请求通过的期望时间和当前时间的对比,如果小于当前时间或者期望通过的时间减去当前时间大于规则允许等待的时间,则获取令牌成功允许规则通过,并更新最后通过时间,否则不允许规则通过。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passThrottleLocalCheck
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
									  Object value) {
	//获取参数指标
	ParameterMetric metric = getParameterMetric(resourceWrapper);
	//根据rule 获得最后添加令牌的时间记录map
	CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
	//从令牌桶中获取的 map 为空 返回 true 通过
	if (timeRecorderMap == null) {
		return true;
	}

	// Calculate max token count (threshold)
	//获取热门参数key
	Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
	//获取规则中的令牌数 阀值
	long tokenCount = (long)rule.getCount();
	if (exclusionItems.contains(value)) {
		//热门参数包含 value
		//获取热点参数中的令牌数量
		tokenCount = rule.getParsedHotItems().get(value);
	}

	if (tokenCount == 0) {
		//令牌数量为 0 返回不通过
		return false;
	}
	//根据rule配置的每多少秒可以通过多少请求来计算出一个请求需要多少毫秒 也就是当前请求的令牌数量数量需要多久生成
	getDurationInSec()  统计窗口时间长度 默认1秒
	//private long durationInSec = 1;
	long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
	while (true) {
		//获取当前时间
		long currentTime = TimeUtil.currentTimeMillis();
		//timeRecorderMap 中存在 value 的记录替换  不存在 新增 并返回之前的记录  也就是初始化或者更新时间记录器
		AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
		if (timeRecorder == null) {
			//之前的记录为空 表示是第一次访问 返回 true 通过
			return true;
		}
		//AtomicLong timeRecorder = timeRecorderMap.get(value);
		//上次请求通过的时间  也就是上次请求令牌的时间
		long lastPassTime = timeRecorder.get();
		//期望的本次请求通过的时间 costTime 一个请求通过需要的时间
		long expectedTime = lastPassTime + costTime;

		if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
			//期望的下次请求通过的时间 小于当前时间 或者
			//期望的下次请求通过的时间 - 当前时间 小于 规则中的排队时间
			//从 timeRecorderMap 中获取最后通过时间
			AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
			//cas 修改 最后通过时间为当前时间
			if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
				//修改成功 等待时间 等于 期望的下次请求通过的时间 - 当前时间
				long waitTime = expectedTime - currentTime;
				if (waitTime > 0) {
					//等待时间大于0  直接设置为期望通过时间
					lastPastTimeRef.set(expectedTime);
					try {
						//睡眠等待时间
						TimeUnit.MILLISECONDS.sleep(waitTime);
					} catch (InterruptedException e) {
						RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
					}
				}
				//返回true 通过
				return true;
			} else {
				//线程让步 但是当前线程还是可以再次执行
				Thread.yield();
			}
		} else {
			//触发规则 不通过
			return false;
		}
	}
}


ParamFlowChecker#passDefaultLocalCheck 方法源码解析

ParamFlowChecker#passDefaultLocalCheck 方法QPS 限流的令牌桶算法的实现,主要逻辑如下:

  • 根据 ResourceWrapper 获取令牌计数器和最新生成的令牌时间计数器,如果两者有一个为空接返回规则通过。
  • 获取规则中的热点参数的令牌阀值,如果令牌阀值为 0 直接返回规则不通过。
  • 判断当前申请的令牌数量是否大于最大令牌(令牌数量+突发流量)数量,如果是返回规则不通过。
  • 获取热点参数的令牌最后更新时间,如果最后更新时间为空,则往令牌计数器中添加令牌并立刻消耗令牌,其实是一个初始化动作,最后返回规则通过
  • 如果有最后一次令牌的更新时间,计算最后一次更新时间和当前时间的差值,用计算出来的差值和规则的时间窗口比较大小,如果差值大于时间窗口,则去令牌计数器中获取令牌数量,如果获取的令牌数量为空,表示在这段时间还没有获取过,规则可以通过并更新令牌时间计数器,如果有获取当令牌,则表示这段时间其他线程有来获取过令牌,需要计算剩余的令牌是否大于等于当前申请的令牌数量,如果大于等于则令牌申请成功规则通过,并更新令牌的最后获取时间,否则返回规则不通过。
  • 如果有最后一次令牌的更新时间和当前时间的差值小于规则的时间窗口,获取热点参数旧的令牌数也就是当前时间窗口剩下的令牌数量,如果这个数量大于等于则令牌申请成功规则通过,并更新令牌的最后获取时间,否则返回规则不通过。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passDefaultLocalCheck
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
									 Object value) {
	//根据资源获取指标
	ParameterMetric metric = getParameterMetric(resourceWrapper);
	//获取规则令牌计数器
	CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
	//获取规则最新生产的令牌时间计数器
	CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
	//为空判断
	if (tokenCounters == null || timeCounters == null) {
		//有一个为空 就返回true
		return true;
	}

	// Calculate max token count (threshold)
	//获取热门参数集合
	Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
	//获取规则中的令牌数量
	long tokenCount = (long)rule.getCount();
	if (exclusionItems.contains(value)) {
		//获取热点参数的令牌数量
		tokenCount = rule.getParsedHotItems().get(value);
	}
	//令牌数量为0判断
	if (tokenCount == 0) {
		//令牌数量为0 直接返回false
		return false;
	}
	//令牌数量+突发流量
	long maxCount = tokenCount + rule.getBurstCount();
	if (acquireCount > maxCount) {
		//acquireCount 申请的令牌数量>最大令牌数量 返回 false
		return false;
	}

	while (true) {
		//获取当前时间
		long currentTime = TimeUtil.currentTimeMillis();
		//获取热点参数的最后令牌更新时间  存在更新 不存在加入
		AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
		if (lastAddTokenTime == null) {
			// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
			//资源之前没有令牌 就添加令牌 并立刻消耗令牌
			tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
			return true;
		}

		// Calculate the time duration since last token was added.
		//计算当前时间和添加最后一个令牌的时间的差值
		long passTime = currentTime - lastAddTokenTime.get();
		// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
		//rule.getDurationInSec() 时间窗口 默认 1秒
		//当前时间和添加最后一个令牌的时间的差值 是否大于 规则时间窗口
		if (passTime > rule.getDurationInSec() * 1000) {
			//当前时间和添加最后一个令牌的时间的差值 大于时间窗口
			//超过时间窗口了 能获取的最大令牌数为 maxCount
			AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
			if (oldQps == null) {
				// Might not be accurate here.
				//缓存里面没有旧的令牌 就认为令牌获取成功 然后设置最后一次获取令牌时间
				lastAddTokenTime.set(currentTime);
				return true;
			} else {
				//已经有令牌了 可能是其他线程申请的令牌 需要判断本次是否能够获取到令牌
				//获取剩余令牌数量
				long restQps = oldQps.get();
				//计算新产生的令牌数量  tokenCount是每个时间窗口的令牌数量
				long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
				//计算新的令牌剩余数量
				long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
					: (restQps + toAddCount - acquireCount);
				//判断令牌剩余数量
				if (newQps < 0) {
					//令牌剩余数量小于0 表示已有令牌量不足以满足此次申请的令牌数量 返回 false
					return false;
				}
				//更新剩余令牌数量
				if (oldQps.compareAndSet(restQps, newQps)) {
					//更新最后获取令牌时间
					lastAddTokenTime.set(currentTime);
					//返回通过
					return true;
				}
				//释放线程 CPU
				Thread.yield();
			}
		} else {
			//当前时间和最后一次获取令牌时间小于时间窗口
			//根据 value 获取 旧的令牌
			AtomicLong oldQps = tokenCounters.get(value);
			//旧的令牌 空判断
			if (oldQps != null) {
				//旧的令牌不为空 可有是其他线程获取的
				//获取已有的令牌数量
				long oldQpsValue = oldQps.get();
				if (oldQpsValue - acquireCount >= 0) {
					//已有的令牌数量-本次需要获取的令牌数量 大于等于0 表示可以获取到令牌
					if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
						//更新令牌数量 返回 true
						return true;
					}
				} else {
					//已有的令牌数量-本次需要获取的令牌数量小于 0 返回false 获取令牌失败
					return false;
				}
			}
			//释放线程 CPU
			Thread.yield();
		}
	}
}


欢迎提出建议及对错误的地方指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1982263.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实时数仓分层架构详解

首先&#xff0c;我们从数据仓库说起。 数据仓库的概念可以追溯到20世纪80年代&#xff0c;当时IBM的研究人员提出了商业数据仓库的概念。数据仓库概念的提出&#xff0c;是为了解决和数据流相关的各种问题&#xff0c;特别是多重数据复制带来的高成本问题。 数据仓库之父Bill …

敏捷与DevOps有什么不同?

在软件开发领域&#xff0c;敏捷和DevOps因其对效率、协作和交付高质量产品的关注而受到欢迎。尽管它们有不同的目标&#xff0c;敏捷和DevOps常常被交替使用。本文旨在阐明这些方法之间的区别和共性&#xff0c;展示它们如何无缝协同以产生结果。 图源自Browser Stack 了解敏捷…

uniapp开发微信小程序出现【错误: xx.js 已被代码依赖分析忽略,无法被其他模块引用】解决办法

报错信息 VM5208:9 app.js错误:Error: config.js 已被代码依赖分析忽略&#xff0c;无法被其他模块引用。你可根据控制台中的【代码依赖分析】告警信息修改代码&#xff0c;或关闭【过滤无依赖文件】功能。详情请查看&#xff1a;https://developers.weixin.qq.com/community/…

滑台模组如何满足特定生产需求?

滑台模组是一种可以进行近乎直线运动的自动化设备&#xff0c;用于将物体沿着平面上的轨道滑动。由导轨、传动结构和导向结构等组成。滑台模组可以通过电机驱动、气动驱动或液压驱动等方式进行移动。滑台模组的结构紧凑、操作简便&#xff0c;具有高精度、高稳定性和高可靠性的…

dijkstra其实是bfs?--重新定义dijkstra

dijkstra其实是bfs?--重新定义dijkstra 1前言2最短路径问题3没有边权的最短路--bfs算法4边权的加入5优先队列与dijkstra6后记 1前言 本文将介绍dijkstra算法全新的理解方式 建议新手对dijkstra有建议了解&#xff0c;强烈推荐这篇文章&#xff0c;无比详细 2最短路径问题 最…

sql注入sqli-labs第二-四关

目录 sql注入sqli-labs第二关 1、了解表的列数 2、连表查询 3、注入管理员账号密码 sql注入sqli-labs第三关 1、逃脱单引号&#xff0c;括号 ​编辑 2、了解表的列数 3、连表查询 4、注入管理员账号密码 sql注入sqli-labs第四关 1、逃脱双引号&#xff0c;括号 2、了…

3GPP入门

官网地址 3GPP – The Mobile Broadband Standard 协议下载链接 Directory Listing /ftp/specs/archive 总纲 重点series Signalling protocols ("stage 3") - user equipment to network24 series信令Radio aspects25 series3G 基础LTE (Evolved UTRA), LTE-Adva…

RCNA | RGOS日常管理和Windows常用网络命令

RCNA | RGOS日常管理和Windows常用网络命令 一、RGOS日常管理操作1. RGOS平台平台概述2. 常用登陆方式3. CLI模式 二、Windows常用网络命令1. ICMP协议2. Ping命令3. Tracert命令4. Windows其他命令 一、RGOS日常管理操作 RGOS操作系统最主要的三大特性是模块化、安全性、开放性…

anaconda下载库的方法

首先打开anaconda prompt&#xff08;桌面搜索&#xff09;&#xff0c;输入 conda activate &#xff08;项目名字&#xff09;然后pip install

ARMxy工控机使用Node-Red教程:开发环境、应用场景(1)

开发环境 Windows 开发环境&#xff1a;Windows 7 64bit 、Windows 10 64bit Linux 开发环境&#xff1a;Ubuntu18.04.4 64bit U-Boot&#xff1a;U-Boot 2018 Kernel &#xff1a;Linux-4.9.170 LinuxSDK&#xff1a;LinuxSDK-[版本号].tar.gz&#xff08;基于全志官方&a…

最新!2024年—华为认证HCIA考试报名攻略分享

HCIA HCIA是华为初级认证。HCIA认证定位于中小型网络的设计、实施和维护&#xff0c;也是三种级别认证中最初级的认证。 HCIA方向 HCIA认证条件 无 HCIA认证考试 考试代码: H12-811 考试类型: 笔试&#xff08;一科&#xff09; 试卷题型: 单选题、多选题、判断题、填空题…

DevOps 的起源

注&#xff1a;机翻&#xff0c;未校。 The Origins of DevOps: What’s in a Name? As DevOps prepares for its second decade of existence, it might be worth a stroll down memory lane to revisit the origins of DevOps methods—and even the term itself. 随着DevO…

时光不等人:java每日一练

题目 选自牛客网 1.final方法等同于private方法。&#xff08; &#xff09; A.正确 B.错误 正确答案&#xff1a;B final方法和private方法并不等同。final修饰的方法表示该方法不能被子类覆盖&#xff08;override&#xff09;&#xff0c;但仍然可以被访问。而private修饰…

Redis02——缓存(缓存更新策略、缓存穿透、缓存雪崩、缓存击穿、缓存工具封装)

目录 缓存概念 添加Redis缓存 业务场景 缓存作用模型 java代码 缓存更新策略 主动更新的三种策略 主动更新——Cache Aside Pattern 实际应用 缓存穿透 概念 解决方法 实际应用 缓存雪崩 概念 解决方法 缓存击穿 互斥锁 介绍 实际应用 逻辑过期 介绍 实际…

【单片机毕业设计选题24101】-基于单片机的车载事故报警系统

系统功能: 系统上电后&#xff0c;OLED显示“欢迎使用请稍后”两秒后显示“Wait SIM900A”, SIM900A模块初始化OK后进入正常界面显示。 第一行显示采集到的温湿度值 第二行显示系统状态&#xff08;OK或Alarm&#xff09; 第三行显示经度值 第四行显示纬度值 注意经纬度信…

dll文件丢失怎么恢复?超简单的5个方法,1分钟搞定dll文件修复!

DLL&#xff0c;或称动态链接库&#xff0c;是一种重要的文件类型&#xff0c;包含了一系列用于运行几乎所有程序的指令&#xff0c;这些程序在win11、win10、win8和win7系统中都广泛使用。如果Windows操作系统中的dll文件丢失&#xff0c;您可能无法正常启动所需的程序或应用。…

劳易测高防护等级的读码系统提升仓储效率

在现代物流与仓储管理领域&#xff0c;条码识别和数据交换系统已经成为实现智能仓储管理的关键技术。面对特定的工业环境挑战&#xff0c;比如腐蚀性气雾等恶劣条件&#xff0c;具备高防护等级和抗腐蚀能力的条码系统显得尤为重要。今天&#xff0c;小易将为您带来创新的解决方…

Zabbix中文乱码问题解决方案

WinR打开运行&#xff0c;输入fonts&#xff0c;回车进入Windows字体目录&#xff0c;找到微软雅黑-常规字体&#xff0c;复制出来将文件名修改为msyh.ttf&#xff0c;注意后缀ttf 将msyh.ttf上传到服务器zabbix字体目录中&#xff1a;/usr/share/zabbix/fonts/ 注意文件权限 …

代码随想录训练营第五十二天 孤岛的总面积

第一题&#xff1a;孤岛的总面积 第二题&#xff1a;沉没孤岛 思路&#xff1a; 将所有在边界的岛屿所在的visited数组位置都置为true&#xff0c;剩下的visited[i][j] true && grid[i][j] 1的位置就是孤岛&#xff0c;将其置为1即可。 代码如下 #include <io…

WiFi to Ethernet: 树莓派共享无线连接至有线网口,自动通过Captive Poartal网页登录认证

物联网开发系列&#xff1a;物联网开发之旅① WiFi to Ethernet: 树莓派共享无线连接至有线网口&#xff0c;自动通过Captive Poartal验证物联网开发番外篇之 Captive Portal验证原理 文章目录 背景实现工具实现细节一、将无线连接共享到以太网1. 配置静态IP地址2. 启用IP转发3…