实现方案
在gateway项目中引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
实现KeyResolver
接口
@Bean
public KeyResolver userKeyResolver() {
return exchange -> Mono.just(
exchange.getRequest().getQueryParams().getFirst("userId")
//exchange.getRequest().getHeaders().getFirst("X-Forwarded-For") 基于请求ip的限流
);
}
配置文件配置
spring:
cloud:
gateway:
routes:
- id: order_route
uri: lb://app-cls
# uri: http://localhost:28080
order: 1
predicates:
- Path=/cls/**
filters:
- StripPrefix=1
- name: RequestRateLimiter
args:
key-resolver: '#{@userKeyResolver}'
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 2
源码分析
GatewayRedisAutoConfiguration
会判断是否存在ReactiveRedisTemplate
,如果存在,加载META-INF/scripts/request_rate_limiter.lua
的redis限流脚本,往容器中注入RedisRateLimiter
@Configuration
@AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
@ConditionalOnBean(ReactiveRedisTemplate.class)
@ConditionalOnClass({RedisTemplate.class, DispatcherHandler.class})
class GatewayRedisAutoConfiguration {
@Bean
@SuppressWarnings("unchecked")
public RedisScript redisRequestRateLimiterScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
redisScript.setResultType(List.class);
return redisScript;
}
@Bean
//TODO: replace with ReactiveStringRedisTemplate in future
public ReactiveRedisTemplate<String, String> stringReactiveRedisTemplate(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
RedisSerializer<String> serializer = new StringRedisSerializer();
RedisSerializationContext<String , String> serializationContext = RedisSerializationContext
.<String, String>newSerializationContext()
.key(serializer)
.value(serializer)
.hashKey(serializer)
.hashValue(serializer)
.build();
return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory,
serializationContext);
}
@Bean
@ConditionalOnMissingBean
public RedisRateLimiter redisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
@Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
Validator validator) {
return new RedisRateLimiter(redisTemplate, redisScript, validator);
}
}
RouteDefinitionRouteLocator#convertToRoute
。当定位到具体的路由,会加载GatewayFilter
。而filterDefinitions
会获取对应的工厂生成对应的GatewayFilter
。
private Route convertToRoute(RouteDefinition routeDefinition) {
AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
return Route.async(routeDefinition)
.asyncPredicate(predicate)
.replaceFilters(gatewayFilters)
.build();
}
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
List<GatewayFilter> filters = new ArrayList<>();
//TODO: support option to apply defaults after route specific filters?
if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
this.gatewayProperties.getDefaultFilters()));
}
if (!routeDefinition.getFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(routeDefinition.getId(), routeDefinition.getFilters()));
}
AnnotationAwareOrderComparator.sort(filters);
return filters;
}
@SuppressWarnings("unchecked")
private List<GatewayFilter> loadGatewayFilters(String id, List<FilterDefinition> filterDefinitions) {
List<GatewayFilter> filters = filterDefinitions.stream()
.map(definition -> {
GatewayFilterFactory factory = this.gatewayFilterFactories.get(definition.getName());
if (factory == null) {
throw new IllegalArgumentException("Unable to find GatewayFilterFactory with name " + definition.getName());
}
Map<String, String> args = definition.getArgs();
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition " + id + " applying filter " + args + " to " + definition.getName());
}
Map<String, Object> properties = factory.shortcutType().normalize(args, factory, this.parser, this.beanFactory);
Object configuration = factory.newConfig();
ConfigurationUtils.bind(configuration, properties, factory.shortcutFieldPrefix(),
definition.getName(), validator, conversionService);
GatewayFilter gatewayFilter = factory.apply(configuration);
if (this.publisher != null) {
this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
}
return gatewayFilter;
})
.collect(Collectors.toList());
ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
for (int i = 0; i < filters.size(); i++) {
GatewayFilter gatewayFilter = filters.get(i);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
}
else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
}
RequestRateLimiterGatewayFilterFactory#apply
,RequestRateLimiterGatewayFilterFactory
生成RateLimiter
。根据KeyResolver
的结果来执行limiter.isAllowed(route.getId(), key)
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
HttpStatusHolder emptyKeyStatus = HttpStatusHolder.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
return (exchange, chain) -> {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
return resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
if (EMPTY_KEY.equals(key)) {
if (denyEmpty) {
setResponseStatus(exchange, emptyKeyStatus);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
return limiter.isAllowed(route.getId(), key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
setResponseStatus(exchange, config.getStatusCode());
return exchange.getResponse().setComplete();
});
});
};
}
RedisRateLimiter#isAllowed
,根据配置中replenishRate
和burstCapacity
执行脚本。redis 中会操作两个 key,request_rate_limiter.{xxx}.tokens
,request_rate_limiter.{xxx}.timestamp
。
public Mono<Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
Config routeConfig = loadConfiguration(routeId);
// How many requests per second do you want a user to be allowed to do?
int replenishRate = routeConfig.getReplenishRate();
// How much bursting do you want to allow?
int burstCapacity = routeConfig.getBurstCapacity();
try {
List<String> keys = getKeys(id);
// The arguments to the LUA script. time() returns unixtime in seconds.
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
// .log("redisratelimiter", Level.FINER);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}) .map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
}
catch (Exception e) {
/*
* We don't want a hard dependency on Redis to allow traffic. Make sure to set
* an alert so you know if this is happening too much. Stripe's observed
* failure rate is 0.01%.
*/
log.error("Error determining if user allowed from redis", e);
}
return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}
限流脚本解读
- 获取上次获取令牌的时间,获取这段时间可以产生的令牌数。比较容量和令牌数的最小值,为当前可以获取的令牌数。
- 更新令牌数,判断当前可获取的令牌数和需要的令牌数的大小,如果可以获取,减去获取到的令牌数。
redis.replicate_commands()
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = redis.call('TIME')[1]
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. now)
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }
标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流