前言:
上一篇我对 Sentinel 中的 AuthoritySlot、SystemSlot、GatewayFlowSlot 的相关源码进行了分析,本篇我们开始分析 ParamFlowSlot 相关的源码。
Sentinel 系列文章传送门:
Sentinel 初步认识及使用
Sentinel 核心概念和工作流程详解
Spring Cloud 整合 Nacos、Sentinel、OpenFigen 实战【微服务熔断降级实战】
Sentinel 源码分析入门【Entry、Chain、Context】
Sentine 源码分析之–NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot
Sentine 源码分析之–AuthoritySlot、SystemSlot、GatewayFlowSlot
ParamFlowSlot
ParamFlowSlot 是负责热点参数限流规则,我们在 Sentinel 控制台配置限流规则如下:
当前规则是以 QPS 限流。
- 单机阀值:就是最大令牌数量 maxCount。
- 统计窗口时长:就是统计时长 duration。
ParamFlowSlot#entry 方法源码解析
ParamFlowSlot#entry 方法逻辑非常简单,先判断当前规则是否有热点参数限流规则,如果没有直接进入下一个 Slot,如果有限流规则,则执行限流规则判断后进入下一个 Slot,我们重点分析执行限流规则源码。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#entry
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//当前资源是否设置热点限流规则
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
//没有设置热点限流规则 直接放行 进入下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
//热点规则判断
checkFlow(resourceWrapper, count, args);
//进入下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ConfigCacheService#dumpBeta 方法源码解析
ParamFlowSlot#checkFlow 方法主要做了一下几件事:
- 校验参数及规则,如果参数为空或者没有设置热点参数规则,则直接返回。
- 获取当前资源的热点规则进行遍历处理。
- 计算热点规则参数的索引
- 初始化参数指标
- 进行热点参数规则校验,规则校验通过不做处理,规则校验不通过则抛出异常,会被 StatisticSlot 捕获。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#checkFlow
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
//参数为空判断
if (args == null) {
return;
}
//当前资源是否设置热点规则
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
//获取当前资源的热点参数规则
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
//遍历规则
for (ParamFlowRule rule : rules) {
//计算热点规则参数的索引
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
//初始化参数指标
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
//规则校验
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
//没有通过规则校验
//触发参数 也就是热点参数
String triggeredParam = "";
//判断 参数的长度是否大于 规则中的参数索引
if (args.length > rule.getParamIdx()) {
//热点规则赋值
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
//抛出异常 ParamFlowException 集成了 BlockException 会被 StatisticSlot 的 catch 捕获
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
ParamFlowSlot#applyRealParamIdx 方法源码解析
ParamFlowSlot#applyRealParamIdx 主要是处理参数索引不规范的情况,例如参数索引为负数。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#applyRealParamIdx
void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) {
//获取参数的索引值
int paramIdx = rule.getParamIdx();
//是否小于0
if (paramIdx < 0) {
if (-paramIdx <= length) {
//设置索引
rule.setParamIdx(length + paramIdx);
} else {
// Illegal index, give it a illegal positive value, latter rule checking will pass.
//非法索引 给一个负数 后面检查的时候会通过
rule.setParamIdx(-paramIdx);
}
}
}
ParameterMetricStorage#initParamMetricsFor 方法源码解析
ParameterMetricStorage#initParamMetricsFor 方法主要是创建参数统计 ParameterMetric 指标,然后根据资源名称添加到缓存 metricsMap中,然后执行初始化操作。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetricStorage#initParamMetricsFor
public static void initParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) {
//资源判断
if (resourceWrapper == null || resourceWrapper.getName() == null) {
return;
}
//获取资源名称
String resourceName = resourceWrapper.getName();
//参数指标
ParameterMetric metric;
// Assume that the resource is valid.
//指标为空判断
if ((metric = metricsMap.get(resourceName)) == null) {
//指标为空 加锁
synchronized (LOCK) {
//再次指标为空判断 double check
if ((metric = metricsMap.get(resourceName)) == null) {
//创建指标
metric = new ParameterMetric();
//加入指标缓存中
metricsMap.put(resourceWrapper.getName(), metric);
RecordLog.info("[ParameterMetricStorage] Creating parameter metric for: " + resourceWrapper.getName());
}
}
}
//指标初始化
metric.initialize(rule);
}
ParameterMetric#initialize 方法源码解析
ParameterMetric#initialize 方法主要初始化了三个 Map,具体如下:
- ruleTimeCounters:记录令牌桶的最后添加时间,用于QPS限流。
- ruleTokenCounter :记录令牌桶的令牌数量,用于QPS限流。
- threadCountMap :用于记录线程数量。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric#initialize
public void initialize(ParamFlowRule rule) {
//ruleTimeCounters 记录热点参数上次添加令牌的时间
//ruleTimeCounters 是否包括 规则 rule
if (!ruleTimeCounters.containsKey(rule)) {
//加锁
synchronized (lock) {
//判断 rule 在 ruleTimeCounters 中是否为 null
if (ruleTimeCounters.get(rule) == null) {
//为空
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
//加入 ruleTimeCounters
ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
//ruleTokenCounter 记录热点参数剩余的令牌数
//ruleTokenCounter 是否包含 rule
if (!ruleTokenCounter.containsKey(rule)) {
//加锁
synchronized (lock) {
//rule 在 ruleTokenCounter 是否为 null
if (ruleTokenCounter.get(rule) == null) {
//为空
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
//加入 ruleTokenCounter
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
//threadCountMap 记录线程数量 用于线程级别限流
//threadCountMap 是否包含热点参数索引
if (!threadCountMap.containsKey(rule.getParamIdx())) {
//加锁
synchronized (lock) {
//为空判断
if (threadCountMap.get(rule.getParamIdx()) == null) {
//加入 threadCountMap
threadCountMap.put(rule.getParamIdx(),
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY));
}
}
}
}
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();
ParamFlowChecker#passCheck 方法源码解析
ParamFlowChecker#passCheck 方法主要做了两件事,如下:
- 对参数值及参数索引进行判断,是否为空及符合标准。
- 判断规则的模式,根据判断结果去执行集群模式的逻辑或者单机模式的逻辑。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passCheck
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
//参数为空判断
if (args == null) {
return true;
}
//获取索引
int paramIdx = rule.getParamIdx();
//参数长度小于参数索引 直接返回
if (args.length <= paramIdx) {
return true;
}
// Get parameter value.
//获取参数值
Object value = args[paramIdx];
// Assign value with the result of paramFlowKey method
//参数值是否是 ParamFlowArgument 类型
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
// If value is null, then pass
//参数值为 null 直接返回
if (value == null) {
return true;
}
//是否集群模式
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//集群模式规则校验
return passClusterCheck(resourceWrapper, rule, count, value);
}
//非集群模式规则校验
return passLocalCheck(resourceWrapper, rule, count, value);
}
ParamFlowChecker#passClusterCheck 方法源码解析
ParamFlowChecker#passClusterCheck 该方法是集群模式的热点参数规则判断,具体如下:
- 获取集群的客户端或者服务端的 TokenService,优先获取客户端,如果 TokenService 为空,则执行 ParamFlowChecker#fallbackToLocalOrPass 方法。
- 请求客户端或服务端进行进行热点参数规则校验,并处理校验结果。
- 如果这个过程发生异常也,则执行 ParamFlowChecker#fallbackToLocalOrPass 方法。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passClusterCheck
private static boolean passClusterCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
//参数值转换为集合
Collection<Object> params = toCollection(value);
//获取集群客户端或者服务端 优先客户端
TokenService clusterService = pickClusterService();
if (clusterService == null) {
//没有获取到集群客户端和服务端 回退到本地或者通过
// No available cluster client or server, fallback to local or
// pass in need.
return fallbackToLocalOrPass(resourceWrapper, rule, count, params);
}
//请求令牌规则校验
TokenResult result = clusterService.requestParamToken(rule.getClusterConfig().getFlowId(), count, params);
switch (result.getStatus()) {
case TokenResultStatus.OK:
//返回 ok 通过
return true;
case TokenResultStatus.BLOCKED:
//返回 false 不通过
return false;
default:
//默认 回退到本地或者通过
return fallbackToLocalOrPass(resourceWrapper, rule, count, params);
}
} catch (Throwable ex) {
RecordLog.warn("[ParamFlowChecker] Request cluster token for parameter unexpected failed", ex);
//异常也是 回退到本地或者通过
return fallbackToLocalOrPass(resourceWrapper, rule, count, value);
}
}
ParamFlowChecker#fallbackToLocalOrPass 方法源码解析
ParamFlowChecker#fallbackToLocalOrPass 方法的逻辑十分简单,就是判断是执行单机模式模式的热点规则校验,默认是直接通过规则,而不执行单机模式的热点规则校验。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#fallbackToLocalOrPass
private static boolean fallbackToLocalOrPass(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
//判断规则配置 失败时候是否回退到本地 也就是单机模式
//private boolean fallbackToLocalWhenFail = false;
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
//回退到本地
return passLocalCheck(resourceWrapper, rule, count, value);
} else {
// The rule won't be activated, just pass.
//直接通过
return true;
}
}
ParamFlowChecker#passLocalCheck 方法源码解析
ParamFlowChecker#passLocalCheck 方法的作用主要是判断参数值的类型,然后调用 ParamFlowChecker#passSingleValueCheck 方法进行规则校验。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passLocalCheck
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {
//集合
for (Object param : ((Collection)value)) {
//热点参数检查
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else if (value.getClass().isArray()) {
//数组
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
//热点参数检查
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else {
//object
//热点参数检查
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
ParamFlowChecker#passSingleValueCheck 方法源码解析
ParamFlowChecker#passSingleValueCheck 方法主要做了一下几件事:
- 对限流规则类型进行区分(有 QPS 限流和线程数限流两种类型)。
- 对于 QPS 限流会对限流的算法进行区分(有漏桶算法和令牌桶算法)。
- 对于线程数限流会对比当前参数的已使用线程数和阀值进行比较,返回比较结果。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passSingleValueCheck
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//QPS 限流
//private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
//public static final int CONTROL_BEHAVIOR_DEFAULT = 0
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
//匀速限流 漏桶算法
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
//默认限流 令牌桶算法
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
//线程限流
//热点参数集合
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
//获取参数 value 线程数
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
//热点参数包含 value
//获取 value 的规则阀值
int itemThreshold = rule.getParsedHotItems().get(value);
//比较线程参数和规则阀值
return ++threadCount <= itemThreshold;
}
//获取规则阀值
long threshold = (long)rule.getCount();
//比较线程参数和规则阀值
return ++threadCount <= threshold;
}
return true;
}
ParamFlowChecker#passThrottleLocalCheck 方法源码解析
ParamFlowChecker#passThrottleLocalCheck 方法是 QPS 限流的漏桶算法的实现,主要逻辑如下:
- 根据 ResourceWrapper 获取令牌计数器,如果令牌计数器为空直接返回规则通过。
- 获取规则中的热点参数的令牌阀值,如果令牌阀值为 0 直接返回规则不通过。
- 当前请求的令牌数量数量需要多久生成,也就是当前请求通过需要消耗的时间。
- 初始化当前热点参数的时间计数器,如果返回的时间计数器为空,表示第一次访问,直接返回规则通过。
- 计算本次请求通过期望的时间,也就是上次请求通过的时间加上本次请求的令牌的生成时间。
- 比较本次请求通过的期望时间和当前时间的对比,如果小于当前时间或者期望通过的时间减去当前时间大于规则允许等待的时间,则获取令牌成功允许规则通过,并更新最后通过时间,否则不允许规则通过。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passThrottleLocalCheck
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
//获取参数指标
ParameterMetric metric = getParameterMetric(resourceWrapper);
//根据rule 获得最后添加令牌的时间记录map
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
//从令牌桶中获取的 map 为空 返回 true 通过
if (timeRecorderMap == null) {
return true;
}
// Calculate max token count (threshold)
//获取热门参数key
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
//获取规则中的令牌数 阀值
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
//热门参数包含 value
//获取热点参数中的令牌数量
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
//令牌数量为 0 返回不通过
return false;
}
//根据rule配置的每多少秒可以通过多少请求来计算出一个请求需要多少毫秒 也就是当前请求的令牌数量数量需要多久生成
getDurationInSec() 统计窗口时间长度 默认1秒
//private long durationInSec = 1;
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
//获取当前时间
long currentTime = TimeUtil.currentTimeMillis();
//timeRecorderMap 中存在 value 的记录替换 不存在 新增 并返回之前的记录 也就是初始化或者更新时间记录器
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
//之前的记录为空 表示是第一次访问 返回 true 通过
return true;
}
//AtomicLong timeRecorder = timeRecorderMap.get(value);
//上次请求通过的时间 也就是上次请求令牌的时间
long lastPassTime = timeRecorder.get();
//期望的本次请求通过的时间 costTime 一个请求通过需要的时间
long expectedTime = lastPassTime + costTime;
if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
//期望的下次请求通过的时间 小于当前时间 或者
//期望的下次请求通过的时间 - 当前时间 小于 规则中的排队时间
//从 timeRecorderMap 中获取最后通过时间
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
//cas 修改 最后通过时间为当前时间
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
//修改成功 等待时间 等于 期望的下次请求通过的时间 - 当前时间
long waitTime = expectedTime - currentTime;
if (waitTime > 0) {
//等待时间大于0 直接设置为期望通过时间
lastPastTimeRef.set(expectedTime);
try {
//睡眠等待时间
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
//返回true 通过
return true;
} else {
//线程让步 但是当前线程还是可以再次执行
Thread.yield();
}
} else {
//触发规则 不通过
return false;
}
}
}
ParamFlowChecker#passDefaultLocalCheck 方法源码解析
ParamFlowChecker#passDefaultLocalCheck 方法QPS 限流的令牌桶算法的实现,主要逻辑如下:
- 根据 ResourceWrapper 获取令牌计数器和最新生成的令牌时间计数器,如果两者有一个为空接返回规则通过。
- 获取规则中的热点参数的令牌阀值,如果令牌阀值为 0 直接返回规则不通过。
- 判断当前申请的令牌数量是否大于最大令牌(令牌数量+突发流量)数量,如果是返回规则不通过。
- 获取热点参数的令牌最后更新时间,如果最后更新时间为空,则往令牌计数器中添加令牌并立刻消耗令牌,其实是一个初始化动作,最后返回规则通过
- 如果有最后一次令牌的更新时间,计算最后一次更新时间和当前时间的差值,用计算出来的差值和规则的时间窗口比较大小,如果差值大于时间窗口,则去令牌计数器中获取令牌数量,如果获取的令牌数量为空,表示在这段时间还没有获取过,规则可以通过并更新令牌时间计数器,如果有获取当令牌,则表示这段时间其他线程有来获取过令牌,需要计算剩余的令牌是否大于等于当前申请的令牌数量,如果大于等于则令牌申请成功规则通过,并更新令牌的最后获取时间,否则返回规则不通过。
- 如果有最后一次令牌的更新时间和当前时间的差值小于规则的时间窗口,获取热点参数旧的令牌数也就是当前时间窗口剩下的令牌数量,如果这个数量大于等于则令牌申请成功规则通过,并更新令牌的最后获取时间,否则返回规则不通过。
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passDefaultLocalCheck
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
//根据资源获取指标
ParameterMetric metric = getParameterMetric(resourceWrapper);
//获取规则令牌计数器
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
//获取规则最新生产的令牌时间计数器
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
//为空判断
if (tokenCounters == null || timeCounters == null) {
//有一个为空 就返回true
return true;
}
// Calculate max token count (threshold)
//获取热门参数集合
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
//获取规则中的令牌数量
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
//获取热点参数的令牌数量
tokenCount = rule.getParsedHotItems().get(value);
}
//令牌数量为0判断
if (tokenCount == 0) {
//令牌数量为0 直接返回false
return false;
}
//令牌数量+突发流量
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
//acquireCount 申请的令牌数量>最大令牌数量 返回 false
return false;
}
while (true) {
//获取当前时间
long currentTime = TimeUtil.currentTimeMillis();
//获取热点参数的最后令牌更新时间 存在更新 不存在加入
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
//资源之前没有令牌 就添加令牌 并立刻消耗令牌
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
// Calculate the time duration since last token was added.
//计算当前时间和添加最后一个令牌的时间的差值
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
//rule.getDurationInSec() 时间窗口 默认 1秒
//当前时间和添加最后一个令牌的时间的差值 是否大于 规则时间窗口
if (passTime > rule.getDurationInSec() * 1000) {
//当前时间和添加最后一个令牌的时间的差值 大于时间窗口
//超过时间窗口了 能获取的最大令牌数为 maxCount
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
//缓存里面没有旧的令牌 就认为令牌获取成功 然后设置最后一次获取令牌时间
lastAddTokenTime.set(currentTime);
return true;
} else {
//已经有令牌了 可能是其他线程申请的令牌 需要判断本次是否能够获取到令牌
//获取剩余令牌数量
long restQps = oldQps.get();
//计算新产生的令牌数量 tokenCount是每个时间窗口的令牌数量
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
//计算新的令牌剩余数量
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
//判断令牌剩余数量
if (newQps < 0) {
//令牌剩余数量小于0 表示已有令牌量不足以满足此次申请的令牌数量 返回 false
return false;
}
//更新剩余令牌数量
if (oldQps.compareAndSet(restQps, newQps)) {
//更新最后获取令牌时间
lastAddTokenTime.set(currentTime);
//返回通过
return true;
}
//释放线程 CPU
Thread.yield();
}
} else {
//当前时间和最后一次获取令牌时间小于时间窗口
//根据 value 获取 旧的令牌
AtomicLong oldQps = tokenCounters.get(value);
//旧的令牌 空判断
if (oldQps != null) {
//旧的令牌不为空 可有是其他线程获取的
//获取已有的令牌数量
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
//已有的令牌数量-本次需要获取的令牌数量 大于等于0 表示可以获取到令牌
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
//更新令牌数量 返回 true
return true;
}
} else {
//已有的令牌数量-本次需要获取的令牌数量小于 0 返回false 获取令牌失败
return false;
}
}
//释放线程 CPU
Thread.yield();
}
}
}
欢迎提出建议及对错误的地方指出纠正。