在高并发的应用中,限流是一个绕不开的话题。限流可以保障我们的 API 服务对所有用户的可用性,也可以防止网络攻击。
一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如 nginx 的 limit_conn 模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如 Guava 的 RateLimiter、nginx 的 limit_req 模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制 MQ 的消费速率。另外还可以根据网络连接数、网络流量、CPU 或内存负载等来限流。
本文详细探讨在 Spring Cloud Gateway 中如何实现限流。
# 限流算法
做限流 (Rate Limiting/Throttling) 的时候,除了简单的控制并发,如果要准确的控制 TPS,简单的做法是维护一个单位时间内的 Counter,如判断单位时间已经过去,则将 Counter 重置零。此做法被认为没有很好的处理单位时间的边界,比如在前一秒的最后一毫秒里和下一秒的第一毫秒都触发了最大的请求数,也就是在两毫秒内发生了两倍的 TPS。
常用的更平滑的限流算法有两种:漏桶算法和令牌桶算法。很多传统的服务提供商如华为中兴都有类似的专利,参考采用令牌漏桶进行报文限流的方法。
# 漏桶算法
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。
可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。
# 令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解。随着时间流逝,系统会按恒定 1/QPS 时间间隔(如果 QPS=100,则间隔是 10ms)往桶里加入 Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个 Token,如果没有 Token 可拿了就阻塞或者拒绝服务。
令牌桶的另外一个好处是可以方便的改变速度。一旦需要提高速率,则按需提高放入桶中的令牌的速率。一般会定时(比如 100 毫秒)往桶中增加一定数量的令牌,有些变种算法则实时的计算应该增加的令牌的数量。
Leakly Bucket vs Token Bucket
# 限流实现(Redis)
1.引入jar包
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.编写配置文件
spring:
application:
name: gateway-9205
cloud:
nacos:
discovery:
server-addr: localhost:8847
gateway:
routes:
- id: user-provider-9206
uri: lb://user-provider-9206
predicates:
- Path=/user/**
filters:
- RewritePath=/user(?<segment>/?.*), $\{segment}
- name: RequestRateLimiter
args:
# 如果返回的key是空的话,则不进行限流
deny-empty-key: false
# 每秒产生多少个令牌
redis-rate-limiter.replenishRate: 1
# 1秒内最大的令牌,即在1s内可以允许的突发流程,设置为0,表示阻止所有的请求
redis-rate-limiter.burstCapacity: 1
# 每次请求申请几个令牌
redis-rate-limiter.requestedTokens: 1
redis:
host: 192.168.7.1
database: 12
port: 6379
password: 123456
server:
port: 9205
debug: true
3.网关正常响应
4.网关限流响应
自定义限流算法和限流key
1.自定义限流key
编写一个类实现 KeyResolver 接口即可。
package com.huan.study.gateway;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Optional;
/**
* 限流的key获取
*
* @author huan.fu 2021/9/7 - 上午10:25
*/
@Slf4j
@Component
public class DefaultGatewayKeyResolver implements KeyResolver {
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
// 获取当前路由
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
ServerHttpRequest request = exchange.getRequest();
String uri = request.getURI().getPath();
log.info("当前返回的uri:[{}]", uri);
return Mono.just(Optional.ofNullable(route).map(Route::getId).orElse("") + "/" + uri);
}
}
配置文件中的写法(部分)
spring:
cloud:
gateway:
routes:
- id: user-provider-9206
filters:
- name: RequestRateLimiter
args:
# 返回限流的key
key-resolver: "#{@defaultGatewayKeyResolver}"
2.自定义限流算法
编写一个类实现 RateLimiter ,此处使用内存限流
package com.huan.study.gateway;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.ratelimit.AbstractRateLimiter;
import org.springframework.cloud.gateway.support.ConfigurationService;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
/**
* @author huan.fu 2021/9/7 - 上午10:36
*/
@Component
@Slf4j
@Primary
public class DefaultGatewayRateLimiter extends AbstractRateLimiter<DefaultGatewayRateLimiter.Config> {
/**
* 和配置文件中的配置属性相对应
*/
private static final String CONFIGURATION_PROPERTY_NAME = "default-gateway-rate-limiter";
private RateLimiter rateLimiter = RateLimiter.create(1);
protected DefaultGatewayRateLimiter(ConfigurationService configurationService) {
super(DefaultGatewayRateLimiter.Config.class, CONFIGURATION_PROPERTY_NAME, configurationService);
}
@Override
public Mono<Response> isAllowed(String routeId, String id) {
log.info("网关默认的限流 routeId:[{}],id:[{}]", routeId, id);
Config config = getConfig().get(routeId);
return Mono.fromSupplier(() -> {
boolean acquire = rateLimiter.tryAcquire(config.requestedTokens);
if (acquire) {
return new Response(true, Maps.newHashMap());
} else {
return new Response(false, Maps.newHashMap());
}
});
}
@Getter
@Setter
@ToString
public static class Config {
/**
* 每次请求多少个 token
*/
private Integer requestedTokens;
}
}
配置文件中的写法(部分)
spring:
cloud:
gateway:
routes:
- id: user-provider-9206
filters:
- name: RequestRateLimiter
args:
# 自定义限流规则
rate-limiter: "#{@defaultGatewayRateLimiter}"
注意⚠️: 这个类需要加上 @Primary 注解。
3.配置文件中的写法
spring:
application:
name: gateway-9205
cloud:
nacos:
discovery:
server-addr: localhost:8847
gateway:
routes:
- id: user-provider-9206
uri: lb://user-provider-9206
predicates:
- Path=/user/**
filters:
- RewritePath=/user(?<segment>/?.*), $\{segment}
- name: RequestRateLimiter
args:
# 自定义限流规则
rate-limiter: "#{@defaultGatewayRateLimiter}"
# 返回限流的key
key-resolver: "#{@defaultGatewayKeyResolver}"
# 如果返回的key是空的话,则不进行限流
deny-empty-key: false
# 限流后向客户端返回的响应码429,请求太多
status-code: TOO_MANY_REQUESTS
# 每次请求申请几个令牌 default-gateway-rate-limiter 的值是在 defaultGatewayRateLimiter 中定义的。
default-gateway-rate-limiter.requestedTokens: 1
server:
port: 9205
debug: true