Gateway网关限流

news2025/1/4 19:51:09

在高并发的系统中,往往需要在系统中做限流,一方面是为了防止大量的请求使服务器过载,导致服务不可用,另一方面是为了防止恶意网络攻击

文章目录

  • 一、常见限流场景
    • 1.1 限流的对象
    • 1.2 限流的处理
    • 1.3 限流的架构
  • 二、常见的限流算法
    • 2.1 计数器算法
    • 2.2 漏桶算法(Leaky Bucket)
    • 2.3 令牌桶算法(Token Bucket)
  • 三、Gateway令牌桶限流
    • 3.1 添加依赖jar包
    • 3.2 yml添加配置
    • 3.3 添加redis配置类
    • 3.4 定义KeyResolver的bean对象
    • 3.5自定义返回信息配置类
    • 3.6 测试
  • 四、Gateway令牌桶限流源码解析

一、常见限流场景

缓存、降级 和 限流 被称为高并发、分布式系统的三驾马车,网关作为整个分布式系统中的第一道关卡,限流功能自然必不可少。通过限流,可以控制服务请求的速率,从而提高系统应对突发大流量的能力,让系统更具弹性。限流有着很多实际的应用场景,比如双十一的秒杀活动, 12306 的抢票等。这里讨论网关限流。

1.1 限流的对象

通过上面的介绍,我们对限流的概念可能感觉还是比较模糊,到底限流限的是什么?顾名思义,限流就是限制流量,但这里的流量是一个比较笼统的概念。如果考虑各种不同的场景,限流是非常复杂的,而且和具体的业务规则密切相关,可以考虑如下几种常见的场景:

a):请求频率限流(Request rate limiting):限制某个接口的一分钟内的请求数
b):并发量限流(Concurrent requests limiting):限制某个服务的处理请求数
c):传输速率限流(Transmission rate limiting):限制下载文件速率
d):限制黑名单用户访问
e):限制某个IP请求

1.2 限流的处理

a):拒绝服务:请求直接抛出异常,如:gateway返回429状态码
b):排队等待:请求放入队列中,等待处理
c):服务降级:请求返回兜底数据等

最简单的做法是拒绝服务,直接抛出异常,返回错误信息(比如返回 HTTP 状态码 429 Too Many Requests),或者给前端返回 302 重定向到一个错误页面,提示用户资源没有了或稍后再试。但是对于一些比较重要的接口不能直接拒绝,比如秒杀、下单等接口,我们既不希望用户请求太快,也不希望请求失败,这种情况一般会将请求放到一个消息队列中排队等待,消息队列可以起到削峰和限流的作用。第三种处理方式是服务降级,当触发限流条件时,直接返回兜底数据,比如查询商品库存的接口,可以默认返回有货。

1.3 限流的架构

针对不同的系统架构,需要使用不同的限流方案。如下图所示,服务部署的方式一般可以分为单机模式和集群模式:

在这里插入图片描述单机模式的限流非常简单,可以直接基于内存就可以实现,而集群模式的限流必须依赖于某个“中心化”的组件,比如网关或 Redis,从而引出两种不同的限流架构:网关层限流 和 中间件限流。

在这里插入图片描述网关作为整个分布式系统的入口,承担了所有的用户请求,所以在网关中进行限流是最合适不过的。网关层限流有时也被称为 接入层限流。除了我们使用的 Spring Cloud Gateway,最常用的网关层组件还有 Nginx,可以通过它的 ngx_http_limit_req_module 模块,使用 limit_conn_zone、limit_req_zone、limit_rate 等指令很容易的实现并发量限流、请求频率限流和传输速率限流。这里不对 Nginx 作过多的说明,关于这几个指令的详细信息可以 参考 Nginx 的官方文档。
另一种限流架构是中间件限流,可以将限流的逻辑下沉到服务层。但是集群中的每个服务必须将自己的流量信息统一汇总到某个地方供其他服务读取,一般来说用 Redis 的比较多,Redis 提供的过期特性和 lua 脚本执行非常适合做限流。除了 Redis 这种中间件,还有很多类似的分布式缓存系统都可以使用,如 Hazelcast、Apache Ignite、Infinispan 等。
我们可以更进一步扩展上面的架构,将网关改为集群模式,虽然这还是网关层限流架构,但是由于网关变成了集群模式,所以网关必须依赖于中间件进行限流,这和上面讨论的中间件限流没有区别。

在这里插入图片描述

二、常见的限流算法

2.1 计数器算法

计数器算法采用计数器实现限流有点简单粗暴,一般我们会限制一秒钟的能够通过的请求数,比如限流qps为100,算法的实现思路就是从第一个请求进来开始计时,在接下去的1s内,每来一个请求,就把计数加1,如果累加的数字达到了100,那么后续的请求就会被全部拒绝。等到1s结束后,把计数恢复成0,重新开始计数。

在这里插入图片描述
如上图所示,假定1min内限制请求数量为100,一个请求则计数器counter+1。当counter大于100且还在当前的1min内,触发限流;1min过后,counter重新计数。计数器算法存在临界问题,如下图所示:假设有一个恶意用户,他在 0:59 时,瞬间发送了 100 个请求,并且 1:00 又瞬间发送了 100 个请求,那么其实这个用户在 1 秒里面,瞬间发送了 200 个请求。这个临界点上,可以压垮服务。

在这里插入图片描述

2.2 漏桶算法(Leaky Bucket)

除了计数器算法,另一个很自然的限流思路是将所有的请求缓存到一个队列中,然后按某个固定的速度慢慢处理,这其实就是漏桶算法(Leaky Bucket)。漏桶算法假设将请求装到一个桶中,桶的容量为 M,当桶满时,请求被丢弃。在桶的底部有一个洞,桶中的请求像水一样按固定的速度(每秒 r 个)漏出来。我们用下面这个形象的图来表示漏桶算法:

在这里插入图片描述
桶的上面是个水龙头,我们的请求从水龙头流到桶中,水龙头流出的水速不定,有时快有时慢,这种忽快忽慢的流量叫做 Bursty flow。如果桶中的水满了,多余的水就会溢出去,相当于请求被丢弃。从桶底部漏出的水速是固定不变的,可以看出漏桶算法可以平滑请求的速率。
漏桶算法可以通过一个队列来实现,如下图所示:

在这里插入图片描述
当请求到达时,不直接处理请求,而是将其放入一个队列,然后另一个线程以固定的速率从队列中读取请求并处理,从而达到限流的目的。注意的是这个队列可以有不同的实现方式,比如设置请求的存活时间,或将队列改造成 PriorityQueue,根据请求的优先级排序而不是先进先出。当然队列也有满的时候,如果队列已经满了,那么请求只能被丢弃了。漏桶算法有一个缺陷,在处理突发流量时效率很低。比如双十一抢购、秒杀活动

2.3 令牌桶算法(Token Bucket)

令牌桶算法(Token Bucket)是目前应用最广泛的一种限流算法,它的基本思想由两部分组成:生成令牌 和 消费令牌。

生产令牌:固定容量的令牌桶,按固定的速率(N/s)往桶中放入令牌,桶满时不再放入;
消费令牌:每个请求需要从桶中拿取令牌,当消费速率低于生产速率时,直至桶中令牌满而触发限流,此时请求可以放入缓冲队列或直接拒绝。

令牌桶算法的图示如下:

在这里插入图片描述在上面的图中,我们将请求放在一个缓冲队列中,可以看出这一部分的逻辑和漏桶算法几乎一模一样,只不过在处理请求上,一个是以固定速率处理,一个是从桶中获取令牌后才处理。
仔细思考就会发现,令牌桶算法有一个很关键的问题,就是桶大小的设置,正是这个参数可以让令牌桶算法具备处理突发流量的能力。譬如将桶大小设置为 100,生成令牌的速度设置为每秒 10 个,那么在系统空闲一段时间的之后(桶中令牌一直没有消费,慢慢的会被装满),突然来了 50 个请求,这时系统可以直接按每秒 50 个的速度处理,随着桶中的令牌很快用完,处理速度又会慢慢降下来,和生成令牌速度趋于一致。这是令牌桶算法和漏桶算法最大的区别,漏桶算法无论来了多少请求,只会一直以每秒 10 个的速度进行处理。当然,处理突发流量虽然提高了系统性能,但也给系统带来了一定的压力,如果桶大小设置不合理,突发的大流量可能会直接压垮系统。

总结三种算法特点
类别特点缺点
计数器算法

1.结构简单-计数器;

2.临界问题;

出现临界问题
漏斗算法

1.固定速率处理请求;

2.保护服务;

无法处理突发流量
令牌桶算法

1.固定速率生产令牌;

2.设置容量大小;

3.处理突发流量;

容量设置不合理,可能压垮服务

三、Gateway令牌桶限流

Spring Cloud Gateway官方就提供了RequestRateLimiterGatewayFilterFactory这个类,适用Redis和lua脚本实现了令牌桶的方式。具体实现逻辑在RequestRateLimiterGatewayFilterFactory类中,lua脚本在如下图所示的文件夹中:
在这里插入图片描述接下来我们使用redis作为令牌桶来实现限流:

步骤如下:
引入依赖
创建限流标识
配置限流速率

3.1 添加依赖jar包

SpringBoot和cloud版本

<!-- SpringBoot 依赖配置 -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-dependencies</artifactId>
	<version>2.3.7.RELEASE</version>
	<type>pom</type>
	<scope>import</scope>
</dependency>
<!-- Springcloud 依赖配置 -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-dependencies</artifactId>
	<version>Hoxton.SR9</version>
	<type>pom</type>
	<scope>import</scope>
</dependency>

限流依赖

<!--网关-->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--基于Redis实现限流-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
	<version>2.2.13.RELEASE</version>
</dependency>

3.2 yml添加配置

server:
  # 服务器的HTTP端口,默认为80
  port: 9000

# Spring配置
spring:
  application:
    #微服务名称
    name: microservice-gateway

  cloud:
    gateway:
      discovery: #配置网关发现机制
        locator: #配置网关处理机制
          enabled: false #开启网关自动映射
          lower-case-service-id: false #服务名称大小写转换:true开启,false 关闭
      routes:
        - id: routed2
          uri: lb://CONSUMER80
          predicates:
            - Path=/api/*/*
          filters:
            # 截断一位url请求前缀
            #- StripPrefix=1
            - name: GatewayRequestRateLimiter
              args:
                # 指定限流标识
                key-resolver: '#{@ipKeyResolver}' #SpringEL表达式,从spring容器中找对象并赋值 '#{@beanName}'
                # 生产令牌速度,每秒多少个令牌
                redis-rate-limiter.replenishRate: 1
                # 令牌桶的总容量
                redis-rate-limiter.burstCapacity: 5

# eureka客户端配置
eureka:
  instance:
    #向注册中心注册服务ID
    instance-id: ${spring.cloud.client.ip-address}:${server.port}
    prefer-ip-address: true     #显示IP地址
    # Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
    lease-renewal-interval-in-seconds: 30
    #Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
    lease-expiration-duration-in-seconds: 30
  server:
    # 设置eureka是否启动自我保护
    enable-self-preservation: true
    # 剔除服务的时间间隔毫秒数(单位:毫秒,默认60秒)
    eviction-interval-timer-in-ms: 5000
  client:
    #表示是否向Eureka注册中心注册自己
    register-with-eureka: true
    fetch-registry: true # false表示自己就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
    service-url:
      #defaultZone:  http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
      defaultZone: http://192.168.10.130:7001/eureka/,http://192.168.10.130:7002/eureka/


redis:
  host: 127.0.0.1
  port: 6379
  password: 123456
  database: 0

在上面的配置文件,配置了RequestRateLimiter的限流过滤器,该过滤器需要配置三个参数:

burstCapacity,令牌桶总容量。
replenishRate,令牌桶每秒填充平均速率。
key-resolver,用于限流的键的解析器的 Bean 对象的名字。它使用 SpEL 表达式根据#{@beanName}从 Spring 容器中获取 Bean 对象。

3.3 添加redis配置类

为什么要配置这个,是因为 reactive 使用了 lettuce连接池


import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@Order(-1)
//@AutoConfigureBefore({RedisAutoConfiguration.class, RedisReactiveAutoConfiguration.class})
public class RedisConfig {


    @Value("${redis.host}")
    private String redisHost;

    @Value("${redis.port}")
    private int redisPort;

    @Value("${redis.password}")
    private String redisPassword;


    @Bean
    public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        RedisSerializationContext<Object, Object> serializationContext = RedisSerializationContext
                .newSerializationContext(RedisSerializer.string())
                .value(RedisSerializer.json())
                .build();
        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, serializationContext);
    }



    @Primary
    @Bean
    public ReactiveRedisConnectionFactory lettuceConnectionFactory() {
        RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration(redisHost, redisPort);
        redisConfig.setPassword(RedisPassword.of(redisPassword));
        return new LettuceConnectionFactory(redisConfig);
    }


}

3.4 定义KeyResolver的bean对象

配置中key-resolver: “#{@ipKeyResolver}”,其中ipKeyResolver对应的是下面方法的名称

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;


/**
 * 定义KeyResolver的bean对象
 */

@Slf4j
@Configuration
public class KeyResolverConfig {

    /**
     * 基于url
     */
    @Bean
    public KeyResolver pathKeyResolver() {
        System.out.println("基于url限流");
        return exchange -> Mono.just(
                exchange.getRequest().getPath().toString()
        );
    }

    /**
     * 基于用户限流
     */
    @Bean
    KeyResolver userKeyResolver() {
        System.out.println("基于用户限流");
        //按用户限流
        return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
    }
   
//    @Bean
//    @Primary
//    KeyResolver ipKeyResolver() {
//        System.out.println("基于IP来限流");
//        //按IP来限流
//        return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
//    }

    /**
     * 基于IP来限流
     */
    @Primary
    @Bean
    public KeyResolver ipKeyResolver() {
        return new KeyResolver() {
            @Override
            public Mono<String> resolve(ServerWebExchange exchange) {
                ServerHttpRequest request = exchange.getRequest();
                String remoteAddr = request.getRemoteAddress().getAddress().getHostAddress();
                // 这里根据请求【URI】进行限流
                log.info("这里根据url请求 {}", remoteAddr);
                return Mono.just(remoteAddr);
            }
        };
    }
}

3.5自定义返回信息配置类

因为源码的过滤器RequestRateLimiterGatewayFilterFactory中,会将限流拦截的请求的http status code设置为429,但是具体的内容格式却不是JSON格式,导致我们看到的响应结果如上图所示。

at org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory.lambda$null$0(RequestRateLimiterGatewayFilterFactory.java:120)

源码如下:

在这里插入图片描述这里有往header里加参数,但是提示显示,是 ReadOnlyHttpHeaders.add,只读的headers,是不可以添加操作的,所以抛出了UnsupportedOperationException的异常:

在这里插入图片描述
所以,这个问题基本都是由于源代码的过滤器所导致,这里要解决问题,我们可以自定义一个过滤器替代一下,代码如下:

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

/***
 * 自定义限流处理异常信息
 *
 */
@Slf4j
@Component
public class GatewayRequestRateLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory {

    private final RateLimiter defaultRateLimiter;

    private final KeyResolver defaultKeyResolver;

    public GatewayRequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
        super(defaultRateLimiter, defaultKeyResolver);
        this.defaultRateLimiter = defaultRateLimiter;
        this.defaultKeyResolver = defaultKeyResolver;
        log.info("限流自定义返回加载");
    }

    @Override
    public GatewayFilter apply(Config config) {
        KeyResolver resolver = getOrDefault(config.getKeyResolver(), defaultKeyResolver);
        RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), defaultRateLimiter);
        return (exchange, chain) -> resolver.resolve(exchange).flatMap(key -> {
            String routeId = config.getRouteId();
            if (routeId == null) {
                Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                routeId = route.getId();
            }
            String finalRouteId = routeId;
            return limiter.isAllowed(routeId, key).flatMap(response -> {
                for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
                    exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
                }
                if (response.isAllowed()) {
                    return chain.filter(exchange);
                }
                log.warn("已限流: {}", finalRouteId);
                ServerHttpResponse httpResponse = exchange.getResponse();
                //修改code为500
                httpResponse.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                if (!httpResponse.getHeaders().containsKey("Content-Type")) {
                    httpResponse.getHeaders().add("Content-Type", "application/json");
                }
                Instant end = Instant.now();
                String dateTimeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                //此处无法触发全局异常处理,手动返回
                DataBuffer buffer = httpResponse.bufferFactory().wrap(("{"
                        + "  \"code\": \"429\","
                        + "  \"message\": \"服务器限流\","
                        + "  \"data\": \"Server throttling\","
                        + "  \"time\": " + dateTimeStr + ","
                        + "  \"success\": false"
                        + "}").getBytes(StandardCharsets.UTF_8));
                return httpResponse.writeWith(Mono.just(buffer));
            });
        });
    }

    private <T> T getOrDefault(T configValue, T defaultValue) {
        return (configValue != null) ? configValue : defaultValue;
    }
}

3.6 测试

jmter开启十个线程,每秒钟请求1次,
限制请求后,网关直接返回429状态码

在这里插入图片描述
查看redis
在这里插入图片描述

四、Gateway令牌桶限流源码解析

Spring Cloud Gateway 中定义了关于限流的一个接口 RateLimiter,如下

package org.springframework.cloud.gateway.filter.ratelimit;

public interface RateLimiter<C> extends StatefulConfigurable<C> {
    Mono<RateLimiter.Response> isAllowed(String routeId, String id);
    }

这个接口就一个方法 isAllowed,第一个参数 routeId 表示请求路由的 ID,根据 routeId 可以获取限流相关的配置,第二个参数 id 表示要限流的对象的唯一标识,可以是用户名,也可以是 IP,或者其他的可以从 ServerWebExchange 中得到的信息。我们看下 RequestRateLimiterGatewayFilterFactory 中对 isAllowed 的调用逻辑:

 public GatewayFilter apply(RequestRateLimiterGatewayFilterFactory.Config config) {
 		// 从配置中得到 KeyResolver
        KeyResolver resolver = (KeyResolver)this.getOrDefault(config.keyResolver, this.defaultKeyResolver);
        // 从配置中得到 RateLimiter
        RateLimiter<Object> limiter = (RateLimiter)this.getOrDefault(config.rateLimiter, this.defaultRateLimiter);
        boolean denyEmpty = (Boolean)this.getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
        HttpStatusHolder emptyKeyStatus = HttpStatusHolder.parse((String)this.getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
        return (exchange, chain) -> {
            return resolver.resolve(exchange).defaultIfEmpty("____EMPTY_KEY__").flatMap((key) -> {
    // 通过KeyResolver得到key,作为唯一标识id传入isAllowed()方法
                if ("____EMPTY_KEY__".equals(key)) {
                    if (denyEmpty) {
                        ServerWebExchangeUtils.setResponseStatus(exchange, emptyKeyStatus);
                        return exchange.getResponse().setComplete();
                    } else {
                        return chain.filter(exchange);
                    }
                } else {
            // 获取当前路由ID,作为routeId参数传入isAllowed()方法
                    String routeId = config.getRouteId();
                    if (routeId == null) {
                        Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                        routeId = route.getId();
                    }

                    return limiter.isAllowed(routeId, key).flatMap((response) -> {
                        Iterator var4 = response.getHeaders().entrySet().iterator();

                        while(var4.hasNext()) {
                            Entry<String, String> header = (Entry)var4.next();
                            exchange.getResponse().getHeaders().add((String)header.getKey(), (String)header.getValue());
                        }

                        if (response.isAllowed()) {
                        // 请求允许,直接走到下一个 filter
                            return chain.filter(exchange);
                        } else {
// 请求被限流,返回设置的 HTTP 状态码(默认是 429)                     ServerWebExchangeUtils.setResponseStatus(exchange, config.getStatusCode());
                            return exchange.getResponse().setComplete();
                        }
                    });
                }
            });
        };
    }

从上面的的逻辑可以看出,通过实现 KeyResolver 接口的 resolve 方法就可以自定义要限流的对象了。

public interface KeyResolver {
    Mono<String> resolve(ServerWebExchange exchange);
}

比如下面的 HostAddrKeyResolver 可以根据 IP 来限流:


 public class HostAddrKeyResolver implements KeyResolver {
     @Override
     public Mono<String> resolve(ServerWebExchange exchange) {
         return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
     }
}

我们继续看 Spring Cloud Gateway 的代码发现,RateLimiter 接口只提供了一个实现类 RedisRateLimiter:

在这里插入图片描述
很显然是基于 Redis 实现的限流,虽说通过 Redis 也可以实现单机限流,但是总感觉有些大材小用,而且对于那些没有 Redis 的环境很不友好。所以,我们要实现真正的本地限流。
我们从 Spring Cloud Gateway 的 pull request 中发现了一个新特性 Feature/local-rate-limiter,而且看提交记录,这个新特性很有可能会合并到 3.0.0 版本中。我们不妨来看下这个 local-rate-limiter 的实现:LocalRateLimiter.java,可以看出它是基于 Resilience4有意思的是,这个类 还有一个早期版本,是基于 Bucket4j 实现的:

public Mono<Response> isAllowed(String routeId, String id) {
    Config routeConfig = loadConfiguration(routeId);

    // How many requests per second do you want a user to be allowed to do?
    int replenishRate = routeConfig.getReplenishRate();

    // How many seconds for a token refresh?
    int refreshPeriod = routeConfig.getRefreshPeriod();

   // How many tokens are requested per request?
    int requestedTokens = routeConfig.getRequestedTokens();

    final io.github.resilience4j.ratelimiter.RateLimiter rateLimiter = RateLimiterRegistry
            .ofDefaults()
            .rateLimiter(id, createRateLimiterConfig(refreshPeriod, replenishRate));

   final boolean allowed = rateLimiter.acquirePermission(requestedTokens);
    final Long tokensLeft = (long) rateLimiter.getMetrics().getAvailablePermissions();

    Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
   return Mono.just(response);

}

实现分布式请求频率限流
上面介绍了如何实现单机请求频率限流,接下来再看下分布式请求频率限流。这个就比较简单了,因为上面说了,Spring Cloud Gateway 自带了一个限流实现,就是 RedisRateLimiter,可以用于分布式限流。它的实现原理依然是基于令牌桶算法的,不过实现逻辑是放在一段 lua 脚本中的,我们可以在 src/main/resources/META-INF/scripts 目录下找到该脚本文件 request_rate_limiter.lua:

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end

-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }

这段代码和上面介绍令牌桶算法时用 Java 实现的那段经典代码几乎是一样的。这里使用 lua 脚本,主要是利用了 Redis 的单线程特性,以及执行 lua 脚本的原子性,避免了并发访问时可能出现请求量超出上限的现象。想象目前令牌桶中还剩 1 个令牌,此时有两个请求同时到来,判断令牌是否足够也是同时的,两个请求都认为还剩 1 个令牌,于是两个请求都被允许了。

有两种方式来配置 Spring Cloud Gateway 自带的限流。第一种方式是通过配置文件,比如下面所示的代码,可以对某个 route 进行限流:

spring:
  cloud:
    gateway:
      routes:
      - id: test
        uri: lb://Provider  # 路由定义对应的微服务的转发地址:lb;负载均衡 + 服务名称
        filters:
          - name: RequestRateLimiter
            args:
              key-resolver: '#{@hostAddrKeyResolver}'
              redis-rate-limiter.replenishRate: 1
              redis-rate-limiter.burstCapacity: 3

其中,key-resolver 使用 SpEL 表达式 #{@beanName} 从 Spring 容器中获取 hostAddrKeyResolver 对象,burstCapacity 表示令牌桶的大小,replenishRate 表示每秒往桶中填充多少个令牌,也就是填充速度。

第二种方式是通过下面的代码来配置:

 @Bean
 public RouteLocator myRoutes(RouteLocatorBuilder builder) {
  return builder.routes()
    .route(p -> p
      .path("/get")
      .filters(filter -> filter.requestRateLimiter()
        .rateLimiter(RedisRateLimiter.class, rl -> rl.setBurstCapacity(3).setReplenishRate(1)).and())
      .uri("http://httpbin.org:80"))
   .build();
}

这样就可以对某个 route 进行限流了。但是这里有一点要注意,Spring Cloud Gateway 自带的限流器有一个很大的坑,replenishRate 不支持设置小数,也就是说往桶中填充的 token 的速度最少为每秒 1 个,所以,如果我的限流规则是每分钟 10 个请求(按理说应该每 6 秒填充一次,或每秒填充 1/6 个 token),这种情况 Spring Cloud Gateway 就没法正确的限流。网上也有人提了 issue,support greater than a second resolution for the rate limiter,但还没有得到解决。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1394675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTML5+CSS3+JS小实例:实时给中文添加拼音

实例:实时给中文添加拼音 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"> <head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"&…

GoZero微服务个人探究之路(零)个人对微服务产生原因的思考,对前三篇的补充

为什么产生了微服务架构--必要性 这里我觉得看GoZero作者写的博文就可以有很好的体会 具体的&#xff0c;他画了这一张图&#xff08;以电商后台系统为例子&#xff09; 所以&#xff0c;我个人产生了如下思考 1.业务逻辑越来越复杂&#xff0c;层层嵌套&#xff0c;分解成微…

力扣刷MySQL-第三弹(详细讲解)

&#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克&#x1f379; ✨博客主页&#xff1a;小小恶斯法克的博客 &#x1f388;该系列文章专栏&#xff1a;力扣刷题讲解-MySQL &#x1f379;文章作者技术和水平很有限&#xff0c;如果文中出…

CSS||Emmet语法

1、简介 ​ Emmet语法的前身是Zen coding,它使用缩写,来提高html/css的编写速度, Vscode内部已经集成该语法。 ​ 快速生成HTML结构语法 ​ 快速生成CSS样式语法 2、快速生成HTML结构语法 生成标签 直接输入标签名 按tab键即可 比如 div 然后tab 键&#xff0c; 就可以生成 <…

如何使用VNC实现Win系统远程桌面Ubuntu图形化界面【内网穿透】

文章目录 推荐前言1. ubuntu安装VNC2. 设置vnc开机启动3. windows 安装VNC viewer连接工具4. 内网穿透4.1 安装cpolar【支持使用一键脚本命令安装】4.2 创建隧道映射4.3 测试公网远程访问 5. 配置固定TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址5.3 …

Go 中 slice 的 In 功能实现探索

文章目录 遍历二分查找map key性能总结 之前在知乎看到一个问题&#xff1a;为什么 Golang 没有像 Python 中 in 一样的功能&#xff1f;于是&#xff0c;搜了下这个问题&#xff0c;发现还是有不少人有这样的疑问。 补充&#xff1a;本文写于 2019 年。GO 现在已经支持泛型&am…

[Linux 进程(五)] 程序地址空间深度剖析

文章目录 1、前言2、什么是进程地址空间&#xff1f;3、进程地址空间的划分4、虚拟地址与物理地址的关系5、页表的作用扩展 6、为什么要有地址空间&#xff1f; 1、前言 Linux学习路线比较线性&#xff0c;也比较长&#xff0c;因此一个完整的知识点学习就会分布在两篇文章中&…

龙腾荆楚 | 软件供应链安全检测中心落地襄阳

1月16日&#xff0c;襄阳市东津新区“园区提质、企业满园”行动暨2024年东津云谷首月重大项目集中签约活动圆满完成&#xff0c;开源网安城市级项目再下一城&#xff0c;分别与襄阳市政府、高校、国投签订战略合作协议&#xff0c;推动荆楚地区数字政府、数字经济、数字社会、数…

首次公开发声,OpenAI CEO 奥特曼回忆“宫斗门”丨 RTE 开发者日报 Vol.129

开发者朋友们大家好&#xff1a; 这里是「RTE 开发者日报」&#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE &#xff08;Real Time Engagement&#xff09; 领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、「…

风丘科技为您提供完整的ADAS测试方案

一 方案概述 随着5G通讯与互联网的快速发展&#xff0c;智能汽车和ADAS辅助系统的研究与发展在世界范围内也在如火如荼地进行。风丘科技紧跟时代脚步&#xff0c;经多年积累沉淀&#xff0c;携手整车厂与高校共同研发打造出了一套完整且适用于国内ADAS测试的系统方案。 | ADAS…

Python-基础篇-类与对象/面向对象程序设计-py脚本

面向对象基础 第一个面向对象 class Cat:def eat(self):print("小猫爱吃鱼")def drink(self):print("小猫要喝水")# 创建猫对象 tom Cat()tom.eat() tom.drink()print(tom)addr id(tom) print("%x" % addr)新建两个猫对象 class Cat:def ea…

【现代控制系统】LTI系统的反馈结构和状态估计器

LTI系统的反馈结构和状态估计器 2023年12月13日 #controlsys 文章目录 LTI系统的反馈结构和状态估计器1. 线性系统的反馈结构1.1 状态反馈/线性直接状态反馈1.2 反馈至状态微分的输出反馈1.3 反馈至参考输入的输出反馈 2. 状态反馈的极点配置算法2.1 状态反馈渐进跟踪问题——…

【Alibaba工具型技术系列】「EasyExcel技术专题」实战技术针对于项目中常用的Excel操作指南

这里写目录标题 EasyExcel教程Maven依赖 EasyExcel API分析介绍EasyExcel 注解通用参数ReadWorkbook&#xff08;理解成excel对象&#xff09;参数ReadSheet&#xff08;就是excel的一个Sheet&#xff09;参数注解参数通用参数 WriteWorkbook&#xff08;理解成excel对象&#…

60天干翻C++———— C++ 类和对象

C类和对象 类和对象的引入类的限定符类的特性类的作用域this 指针 默认成员函数构造函数析构函数拷贝构造函数运算符重载const成员 类和对象的引入 在c语言中&#xff0c;“数据”和“处理数据的函数“是分开声明的&#xff0c;也就是说c语言本身不支持”数据和函数“之间的关…

实战之-Redis商户查询缓存

一、什么是缓存? 前言:什么是缓存? 就像自行车,越野车的避震器 举个例子:越野车,山地自行车,都拥有"避震器",防止车体加速后因惯性,在酷似"U"字母的地形上飞跃,硬着陆导致的损害,像个弹簧一样; 同样,实际开发中,系统也需要"避震器",防止过高…

西门子1200和西门子200smart S7通讯

S7通讯是西门子以太网络通讯中最简单最常用的通讯。 下面来介绍200smart和1200之间如何进行S7通讯: 由于200smart和1200使用不同的编程软件&#xff0c;所以只能使用单端组态&#xff0c;我们这里以1200为客服端组态。 1.首先打开博图软件添加1200设备&#xff0c;这里选择1…

代码随想录算法训练营第23天 | 669. 修剪二叉搜索树 + 108.将有序数组转换为二叉搜索树 + 538.把二叉搜索树转换为累加树

今日任务 669. 修剪二叉搜索树 108.将有序数组转换为二叉搜索树 538.把二叉搜索树转换为累加树 总结篇 669. 修剪二叉搜索树 - Medium 题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 给你二叉搜索树的根节点 root &#xf…

【多线程】认识Thread类及其常用方法

&#x1f4c4;前言&#xff1a; 本文是对以往多线程学习中 Thread类 的介绍&#xff0c;以及对其中的部分细节问题进行总结。 文章目录 一. 线程的 创建和启动&#x1f346;1. 通过继承 Thread 类创建线程&#x1f345;2. 通过实现 Runnable 接口创建线程&#x1f966;3. 其他方…

九、K8S-label和label Selector

label和label selector 标签和标签选择器 1、label 标签&#xff1a; 一个label就是一个key/value对 label 特性&#xff1a; label可以被附加到各种资源对象上一个资源对象可以定义任意数量的label同一个label可以被添加到任意数量的资源上 2、label selector 标签选择器 L…

Cellinx NVT 摄像机 UAC.cgi 任意用户创建漏洞复现

0x01 产品简介 Cellinx NVT IP PTZ是韩国Cellinx公司的一个摄像机设备。 0x02 漏洞概述 Cellinx NVT 摄像机 UAC.cgi接口处存在任意用户创建漏洞,未经身份认证的攻击者可利用此接口创建管理员账户,登录后台可查看敏感信息,使系统处于极不安全的状态。 0x03 复现环境 FO…