漏桶算法
漏桶算法介绍
漏桶算法,又称leaky bucket。
从图中我们可以看到,整个算法其实十分简单。首先,我们有一个固定容量的桶,有水流进来,也有水流出去。对于流进来的水来说,我们无法预计一共有多少水会流进来,也无法预计水流的速度。但是对于流出去的水来说,这个桶可以固定水流出的速率。而且,当桶满了之后,多余的水将会溢出。
我们将算法中的水换成实际应用中的请求,我们可以看到漏桶算法天生就限制了请求的速度。当使用了漏桶算法,我们可以保证接口会以一个常速速率来处理请求。所以漏桶算法天生不会出现临界问题。
漏桶算法的实现
漏桶算法具体的实现代码如下:
package com.morris.user.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 漏桶算法
*/
@Slf4j
public class LeakyBucketDemo {
public static void main(String[] args) {
LeakyBucket sideWindow = new LeakyBucket(2);
for (int i = 0; i < 30; i++) {
int finalI = i;
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(30000));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(finalI + "--> " + sideWindow.canPass());
}).start();
}
}
public static class LeakyBucket {
// 桶的容量
private static final long CAPACITY = 10;
// 水漏出的速度(每秒系统能处理的请求数)
private final long rate;
// 当前水量(当前累积请求数)
private double water = 0;
// 上一次请求的时间
private long lastRequestTime = System.currentTimeMillis();
/**
* @param maxCount 1秒内允许的最大请求数
*/
public LeakyBucket(int maxCount) {
// 每秒允许的最大请求数
this.rate = maxCount;
}
public synchronized boolean canPass() {
long now = System.currentTimeMillis();
// 去除从lastRequestTime~now时间段应该从桶中漏出的水
this.water = Math.max(0, water - (now - lastRequestTime) * 1.0D / 1000 * this.rate);
this.lastRequestTime = now;
if (this.water + 1 > CAPACITY) {
// 桶满了
return false;
}
this.water++;
return true;
}
}
}
漏桶算法的实现2
漏桶算法的另一种具体实现代码如下:
package com.morris.user.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 漏桶算法
*/
@Slf4j
public class LeakyBucketDemo2 {
public static void main(String[] args) {
LeakyBucket sideWindow = new LeakyBucket(2);
for (int i = 0; i < 30; i++) {
int finalI = i;
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(finalI + "--> " + sideWindow.canPass());
}).start();
}
}
public static class LeakyBucket {
// 上一次请求的时间
private long lastRequestTime = System.currentTimeMillis();
// 两次请求之间的最小时间间隔
private final int interval;
public LeakyBucket(int maxCount) {
this.interval = 1000 / maxCount;
}
public synchronized boolean canPass() {
long now = System.currentTimeMillis();
// 两次请求之间的时间间隔
if (now - lastRequestTime >= interval) {
lastRequestTime = now;
return true;
}
return false;
}
}
}
漏桶算法的优缺点
优点:
- 可以平滑限制请求的处理速度,避免瞬间请求过多导致系统崩溃或者雪崩。
- 可以控制请求的处理速度,使得系统可以适应不同的流量需求,避免过载或者过度闲置。
- 可以通过调整桶的大小和漏出速率来满足不同的限流需求,可以灵活地适应不同的场景。
缺点:
- 需要对请求进行缓存,会增加服务器的内存消耗。
- 对于流量波动比较大的场景,需要较为灵活的参数配置才能达到较好的效果。
- 但是面对突发流量的时候,漏桶算法还是循规蹈矩地处理请求,这不是我们想看到的啦。流量变突发时,我们肯定希望系统尽量快点处理请求,提升用户体验嘛。
漏桶算法的缺点主要是由于服务和漏下来的请求之间没有交互,要么服务太忙,要么服务太闲,这就引出了令牌桶算法。
漏桶算法在sentinel中的应用
漏桶算法主要用于流控规则中流控效果为排队等待。
com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
// 当前时间
long currentTime = TimeUtil.currentTimeMillis();
// 计算两个请求中间允许的时间间隔
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
// 下一次请求期待的访问时间
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// 当前时间大于下一次请求期待的访问时间,允许通过
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
// 计算当前线程需要等待的时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
// 等待时间大于设置的最大的等待超时时间
return false;
} else {
// latestPassedTime+costTime就是下一次请求期待的访问时间
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
// 说明latestPassedTime被另一个线程改了
// 把时间改回去,然后请求不允许通过
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
// 线程休眠,排队等待
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}