写在前面
本文看下流量整形相关算法。
目前流量整形算法主要有三种,计数器,漏桶,令牌桶。分别看下咯!
1:计数器
1.1:描述
单位时间内只允许指定数量的请求,如果是时间区间内超过指定数量,则直接拒绝,如果时间区间结束,则重置计数器,开始下一个时间区间。
1.2:程序
package com.dahuyou.algrithm.triffic.shaper.counter;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
// 计速器 限速
public class CounterLimiter {
// 起始时间
private static long startTime = System.currentTimeMillis();
// 时间区间的时间间隔 ms
private static long interval = 1000;
// 每interval时间内限制数量
private static long maxCount = 2;
//累加器
private static AtomicLong accumulator = new AtomicLong();
// 计数判断, 是否超出限制
private static long tryAcquire(long taskId, int turn) {
long nowTime = System.currentTimeMillis();
//在时间区间之内
if (nowTime < startTime + interval) {
long count = accumulator.incrementAndGet();
if (count <= maxCount) {
System.out.println("taskId: " + taskId + " 正常执行!");
return count;
} else {
// 返回-1说明时间区间内被限制了
// return -count;
System.out.println("时区内达到次数咯!");
return -1;
}
} else {
//在时间区间之外
synchronized (CounterLimiter.class) {
System.out.println("新时间区到了,taskId:" + taskId + ", turn {}.." + turn);
// 再一次判断,防止重复初始化
if (nowTime > startTime + interval) {
accumulator.set(0);
startTime = nowTime;
}
}
return 0;
}
}
final int threads = 1;
//线程池,用于多线程模拟测试
// private ExecutorService pool = Executors.newFixedThreadPool(10);
private ExecutorService pool = Executors.newFixedThreadPool(threads);
@Test
public void testLimit() {
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
// 线程数
// final int threads = 2;
// 每条线程的执行轮数
final int turns = 20;
// 同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < turns; j++) {
long taskId = Thread.currentThread().getId();
long index = tryAcquire(taskId, j);
// if (index <= 0) {
if (index == -1) {
// 被限制的次数累积
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有线程结束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
System.out.println("限制的次数为:" + limited.get() +
",通过的次数为:" + (threads * turns - limited.get()));
System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
System.out.println("运行的时长为:" + time);
}
}
输出:
1.3:优缺点
- 优点
简单
- 缺点
无法处理流量分配不均匀的情况,可能导致大量的请求被拒绝
1.4:适用场景
流量比较平稳业务场景。比如我司的机器人外呼业务,因为是程序在跑,所以流量很稳定,一旦业务配置导致流量增高,则可以使用该算法进行限流。
但对于突发流量场景,可能会因为很短时间内的突发流量就导致计数器达到最大值,从而时间区间内的剩余时间所有请求全部丢弃,这也存在着被攻击的风险。
2:漏桶
2.1:描述
水(对应请求)从进水口进入到漏桶里,漏桶以一定的速度出水(请求放行),当水流入速度过大,桶内的总水量大于桶容量会直接溢出,请求被拒绝,如图所示:
2.2:程序
package com.dahuyou.algrithm.triffic.shaper.counter;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
// 漏桶 限流
public class LeakBucketLimiter {
// 计算的起始时间
private static long lastOutTime = System.currentTimeMillis();
// 流出速率 每100毫秒漏2次
private static int leakRate = 1;
// private static int leakRate = 2000;
// 桶的容量
private static int capacity = 5;
//剩余的水量
private static AtomicInteger water = new AtomicInteger(0);
//返回值说明:
// false 没有被限制到
// true 被限流
public static synchronized boolean isLimit(long taskId, int turn) {
// 如果是空桶,就当前时间作为漏出的时间
if (water.get() == 0) {
lastOutTime = System.currentTimeMillis();
water.addAndGet(1);
return false;
}
// 执行漏水
// int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 100)) * leakRate;
// 计算剩余水量,当前的量减去漏出去的量就是剩余的量
int waterLeft = water.get() - waterLeaked;
// 要注意:剩余的量最小是0
water.set(Math.max(0, waterLeft));
// 重新更新leakTimeStamp
lastOutTime = System.currentTimeMillis();
// 尝试加水,并且水还未满 ,放行
if ((water.get()) < capacity) {
System.out.println("水未满,成功加水");
water.addAndGet(1);
return false;
} else {
System.out.println("水已满,水溢出");
// 水满,拒绝加水, 限流
return true;
}
}
final int threads = 1;
//线程池,用于多线程模拟测试(负责加水)
private ExecutorService pool = Executors.newFixedThreadPool(threads);
private ExecutorService outWaterPool = Executors.newFixedThreadPool(threads);
@Test
public void testLimit() {
// new Thread(() -> {
// for (int i = 0; i < 1000; i++) {
// if (water.get() > 0) {
// System.out.println("出水了");
// water.decrementAndGet();
// } else {
// System.out.println("无水可出了");
// }
// try {
// TimeUnit.MILLISECONDS.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// }).start();
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
// 线程数
// final int threads = 2;
// 每条线程的执行轮数
final int turns = 20;
// 线程同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < turns; j++) {
long taskId = Thread.currentThread().getId();
boolean intercepted = isLimit(taskId, j);
if (intercepted) {
// 被限制的次数累积
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有线程结束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
System.out.println("限制的次数为:" + limited.get() +
",通过的次数为:" + (threads * turns - limited.get()));
System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
System.out.println("运行的时长为:" + time);
}
}
运行:
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
限制的次数为:0,通过的次数为:20
限制的比例为:0.0
运行的时长为:4.136
Process finished with exit code 0
此时因为水流出的速度快于流入的速度,所以,一直可以成功加水,可以修改leakRate=0
,再运行:
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
限制的次数为:15,通过的次数为:5
限制的比例为:0.75
运行的时长为:4.176
Process finished with exit code 0
就可以看到水满溢出的情况了。
2.3:优缺点
- 优点
可应对突发流量,避免服务被冲垮,从而起到保护服务的作用
- 缺点
因为出口速率固定,所以当服务能力提升时,无法自动匹配后端服务的能力提升
2.4:适用场景
3:令牌桶
3.1:描述
有一个固定容量的令牌桶,按照一定的速率(可以调节)
向令牌桶中放入令牌,请求想要被执行,必须能够从令牌桶中获取到令牌,否则将会被抛弃,参考下图:
3.2:程序
package com.dahuyou.algrithm.triffic.shaper.counter;
//import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
// 令牌桶 限速
//@Slf4j
public class TokenBucketLimiter {
// 上一次令牌发放时间
public long lastTime = System.currentTimeMillis();
// 桶的容量
public int capacity = 2;
// 令牌生成速度 /s,如果是调大令牌的生成速度,则服务能力也会得到提高(在服务扛得住的前提下)
public int rate = 2;
// 当前令牌数量
public AtomicInteger tokens = new AtomicInteger(0);
//返回值说明:
// false 没有被限制到
// true 被限流
public synchronized boolean isLimited(long taskId, int applyCount) {
long now = System.currentTimeMillis();
//时间间隔,单位为 ms
long gap = now - lastTime;
//计算时间段内的令牌数
int reverse_permits = (int) (gap * rate / 1000);
int all_permits = tokens.get() + reverse_permits;
// 当前令牌数(固有的令牌加上时间段内新产生的令牌就是当前真实的令牌数啦),
// 因为令牌桶也有固定的数量所以要取下最小值
tokens.set(Math.min(capacity, all_permits));
// log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);
// System.out.println("tokens " + tokens + " capacity " + capacity + " gap " + gap);
/**
* 如果申请的数量大于可用令牌数,则拒绝,否则发放令牌,执行请求
*/
if (tokens.get() < applyCount) {
System.out.println("没有辣么多令牌啦!!!");
// 若拿不到令牌,则拒绝
// log.info("被限流了.." + taskId + ", applyCount: " + applyCount);
return true;
} else {
System.out.println("令牌拿去撒!!!");
// 还有令牌,领取令牌
tokens.getAndAdd( - applyCount);
lastTime = now;
// log.info("剩余令牌.." + tokens);
return false;
}
}
//线程池,用于多线程模拟测试
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLimit() {
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
// 线程数
final int threads = 2;
// 每条线程的执行轮数
final int turns = 20;
// 同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < turns; j++) {
long taskId = Thread.currentThread().getId();
boolean intercepted = isLimited(taskId, 1);
if (intercepted) {
// 被限制的次数累积
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有线程结束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
System.out.println("限制的次数为:" + limited.get() +
",通过的次数为:" + (threads * turns - limited.get()));
System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
System.out.println("运行的时长为:" + time);
}
}
输出:
展示的是既有申请到令牌也有没有申请到令牌的场景,修改代码public int rate = 2000;
给令牌发放一个非常大的速度,此时就会一直可以拿得到令牌:
修改程序public int rate = 0;
直接不发放令牌,就可以看到令牌全部申请失败的场景:
3.3:优缺点
- 优点
1:因为令牌桶容量有限制,所以可以应对突发流量
2:服务QPS增加或者降低时只需要对应调整令牌的发放速度即可适配
- 缺点
3.4:适用场景
写在后面
参考文章列表
限流:计数器、漏桶、令牌桶 三大算法的原理与实战(史上最全) 。