【源码解析】Spring Cloud Gateway使用RedisRateLimiter实现限流

news2025/1/16 20:12:19

实现方案

在gateway项目中引入依赖

  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
 
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

实现KeyResolver接口

	@Bean
	public KeyResolver userKeyResolver() {
		return exchange -> Mono.just(
				exchange.getRequest().getQueryParams().getFirst("userId")
				//exchange.getRequest().getHeaders().getFirst("X-Forwarded-For") 基于请求ip的限流
		);
	}

配置文件配置

spring:
  cloud:
    gateway:
      routes:
        - id: order_route
          uri: lb://app-cls
#          uri: http://localhost:28080
          order: 1
          predicates:
            - Path=/cls/**
          filters:
            - StripPrefix=1
            - name: RequestRateLimiter
              args:
                key-resolver: '#{@userKeyResolver}'
                redis-rate-limiter.replenishRate: 1
                redis-rate-limiter.burstCapacity: 2

源码分析

GatewayRedisAutoConfiguration会判断是否存在ReactiveRedisTemplate,如果存在,加载META-INF/scripts/request_rate_limiter.lua的redis限流脚本,往容器中注入RedisRateLimiter

@Configuration
@AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
@ConditionalOnBean(ReactiveRedisTemplate.class)
@ConditionalOnClass({RedisTemplate.class, DispatcherHandler.class})
class GatewayRedisAutoConfiguration {

	@Bean
	@SuppressWarnings("unchecked")
	public RedisScript redisRequestRateLimiterScript() {
		DefaultRedisScript redisScript = new DefaultRedisScript<>();
		redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
		redisScript.setResultType(List.class);
		return redisScript;
	}

	@Bean
	//TODO: replace with ReactiveStringRedisTemplate in future
	public ReactiveRedisTemplate<String, String> stringReactiveRedisTemplate(
			ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
		RedisSerializer<String> serializer = new StringRedisSerializer();
		RedisSerializationContext<String , String> serializationContext = RedisSerializationContext
				.<String, String>newSerializationContext()
				.key(serializer)
				.value(serializer)
				.hashKey(serializer)
				.hashValue(serializer)
				.build();
		return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory,
				serializationContext);
	}

	@Bean
	@ConditionalOnMissingBean
	public RedisRateLimiter redisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
											 @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
											 Validator validator) {
		return new RedisRateLimiter(redisTemplate, redisScript, validator);
	}
}

RouteDefinitionRouteLocator#convertToRoute。当定位到具体的路由,会加载GatewayFilter。而filterDefinitions会获取对应的工厂生成对应的GatewayFilter

	private Route convertToRoute(RouteDefinition routeDefinition) {
		AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
		List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);

		return Route.async(routeDefinition)
				.asyncPredicate(predicate)
				.replaceFilters(gatewayFilters)
				.build();
	}

	private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
		List<GatewayFilter> filters = new ArrayList<>();

		//TODO: support option to apply defaults after route specific filters?
		if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
			filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
					this.gatewayProperties.getDefaultFilters()));
		}

		if (!routeDefinition.getFilters().isEmpty()) {
			filters.addAll(loadGatewayFilters(routeDefinition.getId(), routeDefinition.getFilters()));
		}

		AnnotationAwareOrderComparator.sort(filters);
		return filters;
	}

	@SuppressWarnings("unchecked")
	private List<GatewayFilter> loadGatewayFilters(String id, List<FilterDefinition> filterDefinitions) {
		List<GatewayFilter> filters = filterDefinitions.stream()
				.map(definition -> {
					GatewayFilterFactory factory = this.gatewayFilterFactories.get(definition.getName());
					if (factory == null) {
                        throw new IllegalArgumentException("Unable to find GatewayFilterFactory with name " + definition.getName());
					}
					Map<String, String> args = definition.getArgs();
					if (logger.isDebugEnabled()) {
						logger.debug("RouteDefinition " + id + " applying filter " + args + " to " + definition.getName());
					}

                    Map<String, Object> properties = factory.shortcutType().normalize(args, factory, this.parser, this.beanFactory);

                    Object configuration = factory.newConfig();

                    ConfigurationUtils.bind(configuration, properties, factory.shortcutFieldPrefix(),
							definition.getName(), validator, conversionService);

                    GatewayFilter gatewayFilter = factory.apply(configuration);
                    if (this.publisher != null) {
                        this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
                    }
                    return gatewayFilter;
				})
				.collect(Collectors.toList());

		ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
		for (int i = 0; i < filters.size(); i++) {
			GatewayFilter gatewayFilter = filters.get(i);
			if (gatewayFilter instanceof Ordered) {
				ordered.add(gatewayFilter);
			}
			else {
				ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
			}
		}

		return ordered;
	}

RequestRateLimiterGatewayFilterFactory#applyRequestRateLimiterGatewayFilterFactory生成RateLimiter。根据KeyResolver的结果来执行limiter.isAllowed(route.getId(), key)

	@Override
	public GatewayFilter apply(Config config) {
		KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
		RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
		boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
		HttpStatusHolder emptyKeyStatus = HttpStatusHolder.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

		return (exchange, chain) -> {
			Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);

			return resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
				if (EMPTY_KEY.equals(key)) {
					if (denyEmpty) {
						setResponseStatus(exchange, emptyKeyStatus);
						return exchange.getResponse().setComplete();
					}
					return chain.filter(exchange);
				}
				return limiter.isAllowed(route.getId(), key).flatMap(response -> {

					for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
						exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
					}

					if (response.isAllowed()) {
						return chain.filter(exchange);
					}

					setResponseStatus(exchange, config.getStatusCode());
					return exchange.getResponse().setComplete();
				});
			});
		};
	}

RedisRateLimiter#isAllowed,根据配置中replenishRateburstCapacity执行脚本。redis 中会操作两个 key,request_rate_limiter.{xxx}.tokensrequest_rate_limiter.{xxx}.timestamp

	public Mono<Response> isAllowed(String routeId, String id) {
		if (!this.initialized.get()) {
			throw new IllegalStateException("RedisRateLimiter is not initialized");
		}

		Config routeConfig = loadConfiguration(routeId);

		// How many requests per second do you want a user to be allowed to do?
		int replenishRate = routeConfig.getReplenishRate();

		// How much bursting do you want to allow?
		int burstCapacity = routeConfig.getBurstCapacity();

		try {
			List<String> keys = getKeys(id);


			// The arguments to the LUA script. time() returns unixtime in seconds.
			List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
					Instant.now().getEpochSecond() + "", "1");
			// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
					// .log("redisratelimiter", Level.FINER);
			return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
					.reduce(new ArrayList<Long>(), (longs, l) -> {
						longs.addAll(l);
						return longs;
					}) .map(results -> {
						boolean allowed = results.get(0) == 1L;
						Long tokensLeft = results.get(1);

						Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));

						if (log.isDebugEnabled()) {
							log.debug("response: " + response);
						}
						return response;
					});
		}
		catch (Exception e) {
			/*
			 * We don't want a hard dependency on Redis to allow traffic. Make sure to set
			 * an alert so you know if this is happening too much. Stripe's observed
			 * failure rate is 0.01%.
			 */
			log.error("Error determining if user allowed from redis", e);
		}
		return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
	}

限流脚本解读

  1. 获取上次获取令牌的时间,获取这段时间可以产生的令牌数。比较容量和令牌数的最小值,为当前可以获取的令牌数。
  2. 更新令牌数,判断当前可获取的令牌数和需要的令牌数的大小,如果可以获取,减去获取到的令牌数。
redis.replicate_commands()

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = redis.call('TIME')[1]
local requested = tonumber(ARGV[4])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. now)
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end

-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }
标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符 标题复制10行,并且每行大于10个字符

在这里插入图片描述

Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流
Spring Cloud Gateway使用RedisRateLimiter实现限流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/501161.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3BHB003154R0101确定每个控制器将如何知道设备地址、识别发给它的消息

3BHB003154R0101确定每个控制器将如何知道设备地址、识别发给它的消息 DNP3 协议用于各种 SCADA 系统组件之间的通信。这些系统组件包括 SCADA 主站或HMI、远程终端单元和智能电子设备。SCADA 系统的操作员可以在其操作中监控 DNP3 协议&#xff0c;以提高系统可靠性。这将通过…

java版深圳 工程管理系统软件 自主研发,工程行业适用 软件源码

Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目显示…

Redis 常见缓存问题与解决方案

文章目录 1. 缓存穿透解决方法 2. 缓存击穿解决方法 3. 缓存雪崩解决方法 在 redis 的应用场景中&#xff0c;需要考虑缓存在某些场景下可能出现的问题&#xff1a; 缓存穿透 缓存击穿 缓存雪崩 以下缓存问题的讨论都是基于以下应用架构讨论的&#xff1a; 1. 缓存穿透 对应…

Python 中的字典顺序

文章目录 Python 中的字典顺序在 Python 中将数字列表按词典顺序排序 我们将介绍 Python 中的字典顺序。 我们还将通过示例讨论实现词典顺序的不同方法。 Python 中的字典顺序 在数学中&#xff0c;词典顺序或词典顺序是对按字母顺序排列的元素列表或元素数组进行排序的过程。…

Java学习之Swing图形界面

Java提供的Swing组件众多&#xff0c;下面列举其中的几种&#xff0c;本章主要讲解顶层容器&#xff0c;其余容器在下面几章会做讲解。 1、顶层容器 1&#xff09;顶层容器就是不包含在其他容器中的容器&#xff0c;Swing中常见的顶层容器有JFrame&#xff0c;JFrame被称为窗口…

分享78个C 源码,总有一款适合您

C 源码 分享78个C 源码&#xff0c;总有一款适合您 源码下载链接&#xff1a;https://pan.baidu.com/s/1_vslGj8XQUGbUhQFnKZg4g?pwdoe87 提取码&#xff1a;oe87 OpenCV计算机视觉库 v4.7.0 OpenCV计算机视觉库 v3.4.19 Photoflare图像编辑器v1.6.12 开源向量数据库mil…

Lecture 13(Extra Material):PPO

On-policy v.s.Off-policy On-policy: The agent learned and the agent interacting with the environment is the same.Off-policy: The agent learned and the agent interacting with the environment is different. Issue of Importance Sampling: 尽管q可以是任意的&am…

day43—编程题

文章目录 1.第一题1.1题目1.2思路1.3解题 2.第二题2.1题目2.2思路2.3解题 1.第一题 1.1题目 描述&#xff1a; 输入两个整数 n 和 m&#xff0c;从数列1&#xff0c;2&#xff0c;3…n 中随意取几个数,使其和等于 m ,要求将其中所有的可能组合列出来 输入描述: 每个测试输入包…

Java 基础进阶篇(十二)—— Stream 流常用方法总结

文章目录 一、Stream流概述二、获取Stream流2.1 集合获取 Stream 流2.2 数组获取 Stream 流 三、中间方法四、终结方法五、Stream流的综合应用六、收集Stream流 一、Stream流概述 Stream 流是在 Java8 中&#xff0c;得益于 Lambda 所带来的函数式编程&#xff0c; 引入了一个…

前端技术——css

1.CSS的引入 【1】为什么要学习CSS? 如果只用HEML画页面的话--->这个页面就是页面上需要的元素罗列起来&#xff0c;但是页面效果很差&#xff0c;不好看&#xff0c;为了让页面好看&#xff0c;为了修饰页面。所以我们需要用到CSS。 CSS的作用&#xff1a;修饰HTML页面…

总结844

学习目标&#xff1a; 月目标&#xff1a;5月&#xff08;张宇强化前10讲&#xff0c;背诵15篇短文&#xff0c;熟词僻义300词基础词&#xff09; 周目标&#xff1a;张宇强化前3讲并完成相应的习题并记录&#xff0c;英语背3篇文章并回诵 每日必复习&#xff08;5分钟&#…

密码学:流密码.(对称密码)

密码学&#xff1a;流密码. 流密码(Stream Cipher)属于对称密码算法中的一种&#xff0c;其基本特征是加解密双方使用一串与明文长度相同的密钥流&#xff0c;与明文流组合来进行加解密密钥流通常是由某一确定状态的伪随机数发生器所产生的比特流&#xff0c;双方将伪随机数生…

数据结构-二叉树遍历线索二叉树

目录 一、二叉树的定义 *几种特殊的二叉树 *二、二叉树的性质 三、二叉树的存储结构 *四、二叉树的遍历 *4.1先序遍历 * 4.2中序遍历 * 4.3后序遍历 非递归算法遍历 *4.4层序遍历 *五、遍历序列构造二叉树 六、线索二叉树 6.1逻辑结构: * 6.2构造线索二叉树 一、二…

Mybatis Plus | 快速入门

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Mybatis Plus MyBatis-Plus&#xff08;简称 MP&#xff09;是一个基于 MyBatis 的增强工具&#xff0c;它对 Mybatis 的基础功能进行了增强&#xff0c;但未做任何改…

Qt 多语言界面设计概述

1、多语言界面设计概述 有些软件需要开发多语言界面版本&#xff0c;如中文版和英文版&#xff0c;并且在软件里可以方便地切换界面语言。Qt 为多语言界面提供了很好的支持&#xff0c;使用 Qt 的一些规则和工具&#xff0c;可以很方便地为应用程序开发提供多语言界面支持。 …

Shell脚本函数简介及运用(喜欢,适合,能在一起,是三码事)

一、函数的作用 语句块定义成函数约等于别名&#xff0c;定义函数&#xff0c;再引用函数 封装的可重复利用的具有特定功能的代码 二、定义函数 定义函数就是只将一段实现某个任务的命令序列封装进一个函数&#xff0c;便于使用和后期维护。 function 函数名() { 命令序列 }…

快速原型设计工具(Axure)的安装、汉化

〇、一些名词解释&#xff1a; 1. 草图 一般主要用于产品整理思路&#xff0c;寻找灵感&#xff0c;或者在产品团队内部互相讨论碰撞火花时使用。画 给自己看的&#xff0c;想怎么画就怎么画。 2. 低保真 打个比喻来说就像&#xff0c;用于“生产的图纸”&#xff0c;要简单易读…

Postgres:Win/Linux环境安装及一键部署脚本

1.Win安装Postgres &#xff08;1&#xff09;下载安装包 &#xff08;2&#xff09;开始安装 修改安装目录 选择要安装的组件 data也就是库表及数据的.dba文件存放目录 密码设置 端口设置 next next 开始安装 安装完成&#xff0c;Stack Builder 根据需要选择是否安装。仅仅是…

03-Docker容器命令

新建启动容器 docker run [OPTIONS] IMAGE [COMMAND] [ARG...]常用的参数&#xff1a; --namenew_name&#xff1a;为容器指定一个名称-d&#xff1a;后台运行容器并返回容器ID&#xff0c;即启动守护式容器-i&#xff1a;以交互模式&#xff08;interactive&#xff09;运行…

【三十天精通Vue 3】第二十九天 Vue 3中的Mock数据模拟详解

✅创作者:陈书予 🎉个人主页:陈书予的个人主页 🍁陈书予的个人社区,欢迎你的加入: 陈书予的社区 🌟专栏地址: 三十天精通 Vue 3 文章目录 引言一、Mock数据模拟的概述1.1 为什么需要Mock数据模拟?1.2 Mock数据模拟的优点和缺点二、安装和配置Mock.js库2.1 使用NPM安…