固定窗口算法(计数器法)
算法介绍
计数器法是限流算法里最简单也是最容易实现的一种算法。比如我们规定,对于A接口来说,我们1秒的访问次数不能超过10次。那么我们可以这么做:在一开始的时候,我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于10并且该请求与第一个请求的间隔时间还在1秒之内,那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1秒,那么就重置counter。
代码实现
固定时间窗口算法的代码实现如下:
package com.morris.user.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 固定时间窗口算法
*/
@Slf4j
public class WindowDemo {
public static void main(String[] args) {
Window window = new Window(2);
for (int i = 0; i < 30; i++) {
int finalI = i;
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(finalI + "--> " + window.canPass());
}).start();
}
}
public static class Window {
// 时间窗口默认1s
public static final int WINDOWS_TIME = 1000;
// 计数器
public int qps = 0;
// 窗口的开始时间
public long windowStartTime = System.currentTimeMillis();
// 时间窗口内允许的最大请求数
private final int maxCount;
public Window(int maxCount) {
this.maxCount = maxCount;
}
public synchronized boolean canPass() {
long currentTime = System.currentTimeMillis();
// 重置窗口
if (currentTime - this.windowStartTime > WINDOWS_TIME) {
this.qps = 0;
this.windowStartTime = currentTime;
}
if (this.qps + 1 > this.maxCount) {
return false;
}
this.qps++;
return true;
}
}
}
缺点
固定窗口计数器限流算法实现起来虽然很简单,但是有一个十分致命的问题,那就是临界突刺问题:前一秒的后半段和后一秒的前半段流量集中一起,会出现大量流量。
计数器的限制数量判断是按时间段的,在两个时间段的交界时间点,限制数量的当前值会发生一个抖动的变化,从而使瞬间流量突破了我们期望的限制。例如以下的情况:
可以看到在0:59的时候,如果突然来了100个请求,这时候当前值是100,而到了1:00的时候,因为是下一个时间段了,当前值陡降到0,这时候又进来100个请求,都能通过限流判断,虽然两个时间段平均下来还是没超过限制,但是在临界时间点的请求量却达到了两倍之多,这种情况下就可能压垮我们的系统。
滑动窗口算法
算法介绍
上面会出现突刺的问题其实就在于固定窗口算法的窗口时间跨度太大,且是固定不变的,为了解决突刺的问题,我们就有了滑动窗口计数器限流算法。
滑动窗口算法是固定窗口算法的优化版,主要有两个特点:
- 划分多个小的时间段,各时间段各自进行计数。
- 根据当前时间,动态往前滑动来计算时间窗口范围,合并计算总数。
可以看到,每次时间往后,窗口也会动态往后滑动,从而丢弃一些更早期的计数数据,从而实现总体计数的平稳过度。当滑动窗口划分的格子越多,那么滑动窗口的滑动就越平滑,限流的统计就会越精确。事实上,固定窗口算法就是只划分成一个格子的滑动窗口算法。
为了避免突刺,尽量缩小时间窗口,只要时间窗口足够小,小到只允许一个请求通过,这就是漏桶算法。
代码实现1
滑动时间窗口算法的具体代码实现如下:
package com.morris.user.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 滑动时间窗口算法
*/
@Slf4j
public class SideWindowDemo {
public static void main(String[] args) {
SideWindow sideWindow = new SideWindow(2);
for (int i = 0; i < 30; i++) {
int finalI = i;
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(finalI + "--> " + sideWindow.canPass());
}).start();
}
}
public static class SideWindow {
// 时间窗口内允许的最大请求数
public int maxCount;
// 每个窗口的长度 100ms
public static final int WINDOW_LENGTH = 100;
// 1s划分为10个窗口
private final Window[] array = new Window[10];
public SideWindow(int maxCount) {
this.maxCount = maxCount;
}
public Window currentWindow() {
// 获取当前时间
long now = System.currentTimeMillis();
// 当前窗口的下标
int currentIndex = (int) (now / WINDOW_LENGTH % array.length);
// 获取当前窗口
Window currentWindow = array[currentIndex];
if (Objects.isNull(currentWindow)) {
// 初始化窗口
currentWindow = new Window(now);
array[currentIndex] = currentWindow;
} else if (now - currentWindow.startTime >= WINDOW_LENGTH) {
// 重置窗口
currentWindow.reset(now);
}
return currentWindow;
}
public int qps() {
int qps = 0;
for (Window window : array) {
if(Objects.isNull(window)) {
continue;
}
qps += window.qps;
}
return qps;
}
public synchronized boolean canPass() {
Window currentWindow = currentWindow();
int qps = qps();
if(qps + 1 > maxCount) {
return false;
}
currentWindow.qps++;
return true;
}
}
public static class Window {
// 这个窗口开始的时间
long startTime;
// 这个窗口的QPS
long qps;
public Window(long startTime) {
this.startTime = startTime;
this.qps = 0;
}
public void reset(long startTime) {
this.startTime = startTime;
this.qps = 0;
}
}
}
代码实现2
滑动时间窗口算法的第二种实现具体代码实现如下:
package com.morris.user.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 滑动时间窗口算法
*/
@Slf4j
public class SideWindowDemo2 {
public static void main(String[] args) {
SideWindow sideWindow = new SideWindow(2);
for (int i = 0; i < 30; i++) {
int finalI = i;
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(finalI + "--> " + sideWindow.canPass());
}).start();
}
}
public static class SideWindow {
// 统计周期 1000ms
public static final int TIME_WINDOW = 1000;
private static final LinkedList<Long> list = new LinkedList<>();
// 时间窗口内允许的最大请求数
private final int maxCount;
public SideWindow(int maxCount) {
this.maxCount = maxCount;
}
public synchronized boolean canPass() {
// 获取当前时间
long nowTime = System.currentTimeMillis();
// 如果队列还没满,则允许通过,并添加当前时间戳到队列开始位置
if (list.size() < maxCount) {
list.addFirst(nowTime);
return true;
}
// 队列已满(达到限制次数),则获取队列中最早添加的时间戳
Long farTime = list.getLast();
// 用当前时间戳 减去 最早添加的时间戳
if (nowTime - farTime <= TIME_WINDOW) {
// 若结果小于等于timeWindow,则说明在timeWindow内,通过的次数大于count
// 不允许通过
return false;
} else {
// 若结果大于timeWindow,则说明在timeWindow内,通过的次数小于等于count
// 允许通过,并删除最早添加的时间戳,将当前时间添加到队列开始位置
list.removeLast();
list.addFirst(nowTime);
return true;
}
}
}
}
优缺点
优点:
- 简单易懂
- 精度高(通过调整时间窗口的大小来实现不同的限流效果)
- 可扩展性强(可以非常容易地与其他限流算法结合使用)
缺点:突发流量无法处理(无法应对短时间内的大量请求,但是一旦到达限流后,请求都会直接暴力被拒绝。这样我们会损失一部分请求,这其实对于产品来说,并不太友好),需要合理调整时间窗口大小。
滑动窗口限流算法虽然可以保证任意时间窗口内接口请求次数都不会超过最大限流值,但是相对来说对系统的瞬时处理能力还是没有考虑到,无法防止在更细的时间粒度上访问过于集中的问题,例如在同一时刻(同一秒)大量请求涌入,还是可能会超过系统负荷能力。
滑动窗口算法在Sentinel中的使用
滑动窗口算法主要用于流控规则中流控效果为直接拒绝,可用于针对线程数和QPS直接限流。
限流统计数据的使用
在流控规则校验时如果流控效果为直接拒绝,会调用DefaultController.canPass()此方法来校验请求是否允许通过。
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 拿到当前时间窗口的线程数或QPS
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
// prioritized默认为false,不会进入
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
// 请求数大于阈值直接返回false,抛出FlowException
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
// 如果是根据线程数限流,取node.curThreadNum()
// 如果是根据QPS限流,取node.passQps()
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
DefaultController.canPass()主要是拿到当前窗口的线程数或QPS,然后进行判断。
QPS的计算是通过rollingCounterInSecond.pass()方法得来的。
com.alibaba.csp.sentinel.node.StatisticNode#passQps
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
pass()会遍历所有的窗口,累加每个窗口的QPS。
com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#pass
public long pass() {
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
限流统计数据的统计时机
在目标方法调用完成之后由sentinel责任链中的StatisticSlot进行统计。
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 目标方法调用完成之后进行统计
// Request passed, add thread count and pass count.
// DefaultNode和ClusterNode 线程数+1
node.increaseThreadNum();
// DefaultNode和ClusterNode QPS+count
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
// OriginNode 线程数+1
// 为什么不是context.getOriginNode()
// context.getCurEntry().getOriginNode() == context.getOriginNode()
context.getCurEntry().getOriginNode().increaseThreadNum();
// OriginNode QPS+count
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
// 记录整个系统的线程数和QPS,可用于系统规则
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// 扩展点,回调
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
限流统计数据的核心组件介绍
统计数据的存储与计算是通过StatisticNode来实现的。
public class StatisticNode implements Node {
/**
* Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
* by given {@code sampleCount}.
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
StatisticNode的底层核心是使用ArrayMetric。
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
ArrayMetric底层使用OccupiableBucketLeapArray。
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
查看OccupiableBucketLeapArray父类LeapArray的构造方法:
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
// 窗口的长度500ms
this.windowLengthInMs = intervalInMs / sampleCount;
// 时间间隔,单位为ms
this.intervalInMs = intervalInMs;
// 时间间隔,单位为s
this.intervalInSecond = intervalInMs / 1000.0;
// 时间窗口个数
this.sampleCount = sampleCount;
// 存放窗口的统计数据
this.array = new AtomicReferenceArray<>(sampleCount);
}
到这里可以看出sentinel将1秒划分为2个时间窗口,每个窗口的长度为500ms。
滑动时间窗口算法的实现
统计数据保存时肯定要先拿到当前时间所在的窗口,然后将数据添加到当前窗口上。
com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#addPass
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
要怎么获得该时间窗口,如何判断该获取哪个时间窗口呢?
com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow()
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算当前时间戳在数组中的位置
// timeMillis / windowLengthInMs % array.length()
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间戳所在窗口的开始时间
// Calculate current bucket start time.
// timeMillis - timeMillis % windowLengthInMs
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
// 当前时间戳所在窗口无数据
// 初始化窗口数据
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
// 当前时间戳在这个窗口内
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
/**
* @see OccupiableBucketLeapArray#resetWindowTo(com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap, long)
*/
// 当前时间戳不在这个窗口内,重置窗口数据
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}