前期回顾:
【Java编程系列】Springcloud-gateway自带限流方案实践篇
1、实践中发生的问题
主要有以下几个问题:
1、限流返回的响应数据无法自定义
(LogFormatUtils.java:91) - [7b93af46-20] Completed 429 TOO_MANY_REQUESTS
返回后显示的情况如下:
2、默认的限流器(RequestRateLimiterGatewayFilterFactory)会发生异常
有一部分情况会出现:Error [java.lang.UnsupportedOperationException]
详细信息如下:
2023-04-25 12:27:10.628 [NGIEGzOnuguYdozn] ERROR (HttpWebHandlerAdapter.java:295) - [7533b14d-23] Error [java.lang.UnsupportedOperationException] for HTTP POST "***/activity/interaction", but ServerHttpResponse already committed (429 TOO_MANY_REQUESTS)
2023-04-25 12:27:10.641 [NGIEGzOnuguYdozn] ERROR (Loggers.java:319) - [id: 0x7533b14d, L:/0:0:0:0:0:0:0:1:18085 - R:/0:0:0:0:0:0:0:1:48745] Error starting response. Replying error status
java.lang.UnsupportedOperationException: null
at org.springframework.http.ReadOnlyHttpHeaders.add(ReadOnlyHttpHeaders.java:91)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP POST "/***/***/interact" [Exception***Handler]
Stack trace:
at org.springframework.http.ReadOnlyHttpHeaders.add(ReadOnlyHttpHeaders.java:91)
at org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory.lambda$null$0(RequestRateLimiterGatewayFilterFactory.java:120)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705)
at reactor.core.publisher.MonoReduceSeed$ReduceSeedSubscriber.onComplete(MonoReduceSeed.java:156)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1939)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:402)
at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:536)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:81)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:803)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:589)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:569)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:455)
at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:137)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
at reactor.core.publisher.FluxMerge.subscribe(FluxMerge.java:69)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:394)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1939)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:359)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:268)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1939)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:252)
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onComplete(RedisPublisher.java:895)
at io.lettuce.core.RedisPublisher$State.onAllDataRead(RedisPublisher.java:673)
at io.lettuce.core.RedisPublisher$State$3.read(RedisPublisher.java:587)
at io.lettuce.core.RedisPublisher$State$3.onDataAvailable(RedisPublisher.java:544)
at io.lettuce.core.RedisPublisher$RedisSubscription.onDataAvailable(RedisPublisher.java:313)
at io.lettuce.core.RedisPublisher$RedisSubscription.onAllDataRead(RedisPublisher.java:328)
at io.lettuce.core.RedisPublisher$SubscriptionCommand.complete(RedisPublisher.java:758)
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
3、在nacos配置gateway的限流配置不生效问题
在nacos的配置中,配置gateway的限流配置,如下:
- id: ***service
uri: 'lb://***service'
order: 0
filters: []
predicates:
- args:
pattern: /xxx/**
name: Path
- id: ***service-limiter
uri: lb://***service
order: 0
predicates:
- Path= /xxx/activity/**
filters:
- name: GatewayRequestRateLimiter
args:
key-resolver: "#{@remoteAddrKeyResolver}"
redis-rate-limiter.replenishRate: 20 # 令牌桶填充的速率 秒为单位
redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
redis-rate-limiter.requestedTokens: 1 # 每次请求获取的令牌数
你会发现这样配置是不会生效的~~~,测试时都有点不理解。。。
2、解决方案
首先,我们分析一下2种情况导致的原因,
第一个问题,因为源码的过滤器RequestRateLimiterGatewayFilterFactory中,会将限流拦截的请求的http status code设置为429,但是具体的内容格式却不是JSON格式,导致我们看到的响应结果如上图所示。
第二个问题,通过报错内容提示,我们可以找到源码的120行:
at org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory.
lambda$null$0(RequestRateLimiterGatewayFilterFactory.java:120)
源码如下:
这里有往header里加参数,但是提示显示,是 ReadOnlyHttpHeaders.add,只读的headers,是不可以添加操作的,所以抛出了UnsupportedOperationException的异常:
所以,
这2个问题基本都是由于源代码的过滤器所导致,这里要解决问题,我们可以自定义一个过滤器替代一下,代码如下:
package ***.filters;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* @Date: 2023/4/25 10:44
* @Description gateway限流limiter,重写限流原过滤器RequestRateLimiterGatewayFilterFactory
*/
@Component
@Slf4j
public class GatewayRequestRateLimiter extends RequestRateLimiterGatewayFilterFactory {
private final RateLimiter defaultRateLimiter;
private final KeyResolver defaultKeyResolver;
public GatewayRequestRateLimiter(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
super(defaultRateLimiter, defaultKeyResolver);
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
}
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.getKeyResolver(), defaultKeyResolver);
RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), defaultRateLimiter);
return (exchange, chain) -> resolver.resolve(exchange).flatMap(key -> {
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
String finalRouteId = routeId;
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
ServerHttpResponse httpResponse = exchange.getResponse();
//修改code为500
httpResponse.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
if (!httpResponse.getHeaders().containsKey("Content-Type")) {
httpResponse.getHeaders().add("Content-Type", "application/json");
}
//此处无法触发全局异常处理,手动返回
JSONObject object = new JSONObject();
object.put("status","429");
object.put("message","请求已被限流");
DataBuffer buffer = httpResponse.bufferFactory().wrap(object.toJSONString().getBytes(StandardCharsets.UTF_8));
return httpResponse.writeWith(Mono.just(buffer));
});
});
}
private <T> T getOrDefault(T configValue, T defaultValue) {
return (configValue != null) ? configValue : defaultValue;
}
}
然后,替换掉原来的gateway中的配置项:
filters:
- name: GatewayRequestRateLimiter #替换原默认过滤器RequestRateLimiter
args:
key-resolver: "#{@remoteAddrKeyResolver}"
redis-rate-limiter.replenishRate: 20 # 令牌桶填充的速率 秒为单位
redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
redis-rate-limiter.requestedTokens: 1 # 每次请求获取的令牌数
然后再来看看效果,返回结果:
第三个问题,是因为该服务的限流配置的过滤器优先级,和要限流的服务本身的路由配置,两者之间的执行前后优先级一样或是限流的在服务本身配置之后了,简单说就是order的值如果更大,就会越迟执行,所以才导致限流配置没生效。。改成如下即可:
- id: ***service-limiter
uri: lb://***service
order: -1 ##将优先级改小,即提高优先级
predicates:
- Path= /xxx/activity/**
filters:
- name: GatewayRequestRateLimiter
args:
key-resolver: "#{@remoteAddrKeyResolver}"
redis-rate-limiter.replenishRate: 20 # 令牌桶填充的速率 秒为单位
redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
redis-rate-limiter.requestedTokens: 1 # 每次请求获取的令牌数
好啦,至此限流的优化处理,基本完成啦。。。
最后,对于本文有疑问的地方,欢迎下方留言讨论,喜欢的朋友,请帮忙一键三连~~~