1、限流算法简介
限流顾名思义,就是对请求或并发数进行限制;通过对一个时间窗口内的请求量进行限制来保障系统的正常运行。如果我们的服务资源有限、处理能力有限,就需要对调用我们服务的上游请求进行限制,以防止自身服务由于资源耗尽而停止服务。
在限流中有两个概念需要了解。
- 阈值:在一个单位时间内允许的请求量。如 QPS 限制为10,说明 1 秒内最多接受 10 次请求。
- 拒绝策略:超过阈值的请求的拒绝策略,常见的拒绝策略有直接拒绝、排队等待等。
(1)固定窗口/滑动窗口:
固定窗口在一段时间间隔内(时间窗/时间区间),处理请求的最大数量固定,超过部分不做处理。滑动窗口解决了固定窗口临界突破的问题,只要窗口足够细分。
(2)漏桶:
漏桶大小固定,处理速度固定,但请求进入速度不固定(在突发情况请求过多时,会丢弃过多的请求)。
(3)令牌桶:
令牌桶的大小固定,令牌的产生速度固定,但是消耗令牌(即请求)速度不固定(可以应对一些某些时间请求过多的情况);每个请求都会从令牌桶中取出令牌,如果没有令牌则丢弃该次请求。
(4)分布式流控
2、固定窗口限流
在一段时间间隔内(时间窗/时间区间),处理请求的最大数量固定,超过部分不做处理。
举个例子,比如我们规定对于接口,我们1s的访问次数不能超过2个。
那么我们可以这么做:
-
在一开 始的时候,我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于2并且该请求与第一个请求的间隔时间还在指定的时间窗口之内,那么说明请求数过多,拒绝访问;
-
如果该请求与第一个请求的间隔时间大于指定的时间窗口,且counter的值还在限流范围内,那么就重置 counter,就是这么简单粗暴。
实现
// 计速器 限速
@Slf4j
public class CounterLimiter
{
// 起始时间
private static long startTime = System.currentTimeMillis();
// 时间区间的时间间隔 ms
private static long interval = 1000;
// 每秒限制数量
private static long maxCount = 2;
//累加器
private static AtomicLong accumulator = new AtomicLong();
// 计数判断, 是否超出限制
public synchronized static boolean tryAcquire() {
if ((System.currentTimeMillis() - startTime) > interval) {
log.inf("窗口重置")
accumulator.set(0);
startTime = System.currentTimeMillis();
}
return accumulator.incrementAndGet() <= maxCount;
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
Thread.sleep(250);
LocalTime now = LocalTime.now();
if (!tryAcquire()) {
System.out.println(now + " 被限流");
} else {
System.out.println(now + " 做点什么");
}
}
}
}
问题
从输出结果中可以看到大概每秒操作 3 次,由于限制 QPS 为 2,所以平均会有一次被限流。看起来可以了,不过我们思考一下就会发现这种简单的限流方式是有问题的,虽然我们限制了 QPS 为 2,但是当遇到时间窗口的临界突变时,如 1s 中的后 500 ms 和第 2s 的前 500ms 时,虽然是加起来是 1s 时间,却可以被请求 4 次
3、滑动窗口算法
滑动窗口算法是对固定窗口算法的改进。既然固定窗口算法在遇到时间窗口的临界突变时会有问题,那么我们在遇到下一个时间窗口前也调整时间窗口不就可以了吗?
上图的示例中,每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生时间窗口的临界突变问题。,如果样本窗口定义的合理够小,基本是不会出现临界突破问题。
实现
import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 滑动窗口限流工具类
*/
public class RateLimiterSlidingWindow {
/**
* 阈值
*/
private int qps = 2;
/**
* 时间窗口总大小(毫秒)
*/
private long windowSize = 1000;
/**
* 多少个子窗口
*/
private Integer windowCount = 10;
/**
* 窗口列表
*/
private WindowInfo[] windowArray = new WindowInfo[windowCount];
public RateLimiterSlidingWindow(int qps) {
this.qps = qps;
long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < windowArray.length; i++) {
windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
}
}
/**
* 1. 计算当前时间窗口
* 2. 更新当前窗口计数 & 重置过期窗口计数
* 3. 当前 QPS 是否超过限制
*
* @return
*/
public synchronized boolean tryAcquire() {
long currentTimeMillis = System.currentTimeMillis();
// 1. 计算当前时间窗口
int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));
// 2. 更新当前窗口计数 & 重置过期窗口计数
int sum = 0;
for (int i = 0; i < windowArray.length; i++) {
WindowInfo windowInfo = windowArray[i];
if ((currentTimeMillis - windowInfo.getTime()) > windowSize) {
windowInfo.getNumber().set(0);
windowInfo.setTime(currentTimeMillis);
}
if (currentIndex == i && windowInfo.getNumber().get() < qps) {
windowInfo.getNumber().incrementAndGet();
}
sum = sum + windowInfo.getNumber().get();
}
// 3. 当前 QPS 是否超过限制
return sum <= qps;
}
private class WindowInfo {
// 窗口开始时间
private Long time;
// 计数器
private AtomicInteger number;
public WindowInfo(long time, AtomicInteger number) {
this.time = time;
this.number = number;
}
// get...set...
}
}
测试用例
public static void main(String[] args) throws InterruptedException {
int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;
System.out.println(String.format("当前QPS限制为:%d,当前测试次数:%d,间隔:%dms,预计成功次数:%d", qps, count, sleep, success));
success = 0;
RateLimiterSlidingWindow myRateLimiter = new RateLimiterSlidingWindow(qps);
for (int i = 0; i < count; i++) {
Thread.sleep(sleep);
if (myRateLimiter.tryAcquire()) {
success++;
if (success % qps == 0) {
System.out.println(LocalTime.now() + ": success, ");
} else {
System.out.print(LocalTime.now() + ": success, ");
}
} else {
System.out.println(LocalTime.now() + ": fail");
}
}
System.out.println();
System.out.println("实际测试成功次数:" + success);
}
这种方式没有了时间窗口突变的问题,限流比较准确,但是因为要记录下每次请求的时间点,所以占用的内存较多。
4、漏桶算法
漏桶算法限流的基本原理为:水(对应请求)从进水口进入到漏桶里,漏桶以一定的速度出水(请求放行),当水流入速度过大,桶内的总水量大于桶容量会直接溢出,请求被拒绝,如图所示。
大致的漏桶限流规则如下:
(1)进水口(对应客户端请求)以任意速率流入进入漏桶。
(2)漏桶的容量是固定的,出水(放行)速率也是固定的。
(3)漏桶容量是不变的,如果处理速度太慢,桶内水量会超出了桶的容量,则后面流入的水滴会溢出,表示请求拒绝。
漏桶算法其实很简单,可以粗略的认为就是注水漏水过程,往桶中以任意速率流入水,以一定速率流出水,当水超过桶容量(capacity)则丢弃,因为桶容量是不变的,保证了整体的速率。
实现
// 漏桶 限流
@Slf4j
public class LeakBucketLimiter {
// 计算的起始时间
private static long lastOutTime = System.currentTimeMillis();
// 流出速率 每秒 2 次
private static int leakRate = 2;
// 桶的容量
private static int capacity = 2;
//剩余的水量
private static AtomicInteger water = new AtomicInteger(0);
//返回值说明:
// false 没有被限制到
// true 被限流
public static synchronized boolean isLimit() {
// 如果是空桶,就当前时间作为漏出的时间
if (water.get() == 0) {
lastOutTime = System.currentTimeMillis();
water.addAndGet(1);
return false;
}
// 执行漏水 2/s (一般情况下 waterLeaked小于桶容量才行)
int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
// 计算剩余水量
int waterLeft = water.get() - waterLeaked;
water.set(Math.max(0, waterLeft));
// 重新更新leakTimeStamp
lastOutTime = System.currentTimeMillis();
// 尝试加水,并且水还未满 ,放行
if ((water.get()) < capacity) {
water.addAndGet(1);
return false;
} else {
// 水满,拒绝加水, 限流
return true;
}
}
}
漏桶出口的速度固定,不能灵活的应对后端能力提升。比如,通过动态扩容,后端流量从1000QPS提升到1WQPS,漏桶没有办法。但是一定时间内,流出速度(处理速度是固定的),流量整形,避免服务被冲垮
5、令牌桶算法
令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。
令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶,如图所示。
令牌桶限流大致的规则如下:
(1)进水口按照某个速度,向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。
(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。
总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。
// 令牌桶 限速
@Slf4j
public class TokenBucketLimiter {
// 上一次令牌发放时间
public long lastTime = System.currentTimeMillis();
// 桶的容量
public int capacity = 2;
// 令牌生成速度 /s
public int rate = 2;
// 当前令牌数量
public AtomicInteger tokens = new AtomicInteger(0);
;
//返回值说明:
// false 没有被限制到
// true 被限流
public synchronized boolean isLimited() {
long now = System.currentTimeMillis();
//时间间隔,单位为 ms
long gap = now - lastTime;
//计算时间段内的令牌数
int reverse_permits = (int) (gap * rate / 1000);
int all_permits = tokens.get() + reverse_permits;
// 当前令牌数
tokens.set(Math.min(capacity, all_permits));
if (tokens.get() < applyCount) {
// 若拿不到令牌,则拒绝
return true;
} else {
// 还有令牌,领取令牌
tokens.getAndAdd( - applyCount);
lastTime = now;
// log.info("剩余令牌.." + tokens);
return false;
}
}
}
6、分布式流控
高性能的分布式限流组件可以使用Redis+Lua来开发,京东的抢购就是使用Redis+Lua完成的限流。并且无论是Nginx外部网关还是gateway内部网关,都可以使用Redis+Lua限流组件。
定义注解
/**
* @description 自定义注解实现分布式限流
*/
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLimit {
/**
* 请求限制,一秒内可以允许好多个进入(默认一秒可以支持100个)
* @return
*/
int reqLimit() default 1000;
/**
* 模块名称
* @return
*/
String reqName() default "";
}
注解实现
/**
* @description MyRedisLimiter注解的切面类
*/
@Aspect
@Component
public class RedisLimiterAspect {
private final Logger logger = LoggerFactory.getLogger(RedisLimitStream.class);
private static String EXPIRE_TIME = "1";
/**
* 当前响应请求
*/
@Autowired
private HttpServletResponse response;
/**
* redis服务
*/
@Autowired
private RedisService redisService;
/**
* 执行redis的脚本文件
*/
@Autowired
private RedisScript<Boolean> rateLimitLua;
/**
* 对所有接口进行拦截
*/
@Pointcut("@annotation(xxx.RedisLimit)")
public void pointcut(){}
/**
* 对切点进行继续处理
*/
@Around("pointcut()")
public Object process(ProceedingJoinPoint proceedingJoinPoint) throws Throwable{
//使用反射获取RedisLimit注解
MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
//没有添加限流注解的方法直接放行
RedisLimit redisLimit = signature.getMethod().getDeclaredAnnotation(RedisLimit.class);
if(ObjectUtils.isEmpty(redisLimit)){
return proceedingJoinPoint.proceed();
}
//List设置Lua的KEYS[1]
List<String> keyList = new ArrayList<>();
// 当前秒数作为 key
keyList.add("ip:" + (System.currentTimeMillis() / 1000));
//获取注解上的参数,获取配置的速率
//List设置Lua的ARGV[1]
int value = redisLimit.reqLimit();
// 调用Redis执行lua脚本,未拿到令牌的,直接返回提示
boolean acquired = redisService.execute(rateLimitLua, keyList, value,EXPIRE_TIME);
logger.info("执行lua结果:" + acquired);
if(!acquired){
this.limitStreamBackMsg();
return null;
}
//获取到令牌,继续向下执行
return proceedingJoinPoint.proceed();
}
/**
* 被拦截的人,提示消息
*/
private void limitStreamBackMsg() {
response.setHeader("Content-Type", "text/html;charset=UTF8");
PrintWriter writer = null;
try {
writer = response.getWriter();
writer.println("{\"code\":503,\"message\":\"当前排队人较多,请稍后再试!\",\"data\":\"null\"}");
writer.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (writer != null) {
writer.close();
}
}
}
}
定义redis配置文件
/**
* @description 实现redis的编码方式
*/
@Configuration
public class RedisConfiguration {
/**
* 初始化将lua脚本加载到redis脚本中
* @return
*/
@Bean
public DefaultRedisScript loadRedisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript();
redisScript.setLocation(new ClassPathResource("limit.lua"));
redisScript.setResultType(Boolean.class);
return redisScript;
}
}
定义固定窗口 lua脚本
local count = redis.call("incr",KEYS[1])
if count == 1 then
redis.call('expire',KEYS[1],ARGV[2])
end
if count > tonumber(ARGV[1]) then
return 0
end
return 1
封装方法调用
/**
* 执行lua脚本
* @param redisScript lua源代码脚本
* @param keyList
* @param value
* @return
*/
public boolean execute(RedisScript<Boolean> redisScript, List<String> keyList, int value,,String expireTime) {
return redisTemplate.execute(redisScript, keyList, String.valueOf(value),expireTime);
}
7、lua脚本
7.1、固定窗口lua脚本
Redis 中的固定窗口限流是使用 incr 命令实现的,incr 命令通常用来自增计数;如果我们使用时间戳信息作为 key,自然就可以统计每秒的请求量了,以此达到限流目的。
这里有两点要注意。
- 对于不存在的 key,第一次新增时,value 始终为 1。
- INCR 和 EXPIRE 命令操作应该在一个原子操作中提交,以保证每个 key 都正确设置了过期时间,不然会有 key 值无法自动删除而导致的内存溢出。
由于 Redis 中实现事务的复杂性,所以这里直接只用 lua 脚本来实现原子操作。下面是 lua 脚本内容。
local count = redis.call("incr",KEYS[1])
if count == 1 then
redis.call('expire',KEYS[1],ARGV[2])
end
if count > tonumber(ARGV[1]) then
return 0
end
return 1
7.1、滑动窗口lua脚本
通过对上面的基于 incr 命令实现的 Redis 限流方式的测试,我们已经发现了固定窗口限流所带来的问题,在这篇文章的第三部分已经介绍了滑动窗口限流的优势,它可以大幅度降低因为窗口临界突变带来的问题,那么如何使用 Redis 来实现滑动窗口限流呢?
这里主要使用 ZSET 有序集合来实现滑动窗口限流,ZSET 集合有下面几个特点:
- ZSET 集合中的 key 值可以自动排序。
- ZSET 集合中的 value 不能有重复值。
- ZSET 集合可以方便的使用 ZCARD 命令获取元素个数。
- ZSET 集合可以方便的使用 ZREMRANGEBYLEX 命令移除指定范围的 key 值。
基于上面的四点特性,可以编写出基于 ZSET 的滑动窗口限流 lua 脚本。
--KEYS[1]: 限流 key
--ARGV[1]: 时间戳 - 时间窗口
--ARGV[2]: 当前时间戳(作为score)
--ARGV[3]: 阈值
--ARGV[4]: score 对应的唯一value
-- 1. 移除时间窗口之前的数据
redis.call('zremrangeByScore', KEYS[1], 0, ARGV[1])
-- 2. 统计当前元素数量
local res = redis.call('zcard', KEYS[1])
-- 3. 是否超过阈值
if (res == nil) or (res < tonumber(ARGV[3])) then
redis.call('zadd', KEYS[1], ARGV[2], ARGV[4])
return 1
else
return 0
end
不想自己实现,可以使用redision,Guava
参考
https://www.cnblogs.com/crazymakercircle/p/15187184.html
https://www.jb51.net/article/256542.htm
https://mp.weixin.qq.com/s/xFtQTueVEd1snfYNRoVlmA