文章目录
- 前言
- 1. 计数器算法(固定窗口限流器)
- 2. 滑动窗口日志限流器
- 3. 漏桶算法(Leaky Bucket)
- 4. 令牌桶算法(Token Bucket)
- 5. 限流队列
- 应用场景
- 实现工具
- 一、Redisson简介
- 二、Redisson限流器的原理
- 三、Redisson限流器技术的应用
- 四、gateway自定义接口限流实现
- 1. maven
- 2. 接口限流数据结构定义RateLimitProperties
- 3. RedissonClient 注入
- 4. redis 初始化写入限流配置
- 5. 接口过滤限流规则
- 6. nacos配置热更新redis配置
- 7. 接口匹配工具类
- 总结
前言
在当今互联网时代,面对快速增长的用户流量和不断扩大的系统规模,合理的限流策略变得愈发重要。针对这一问题,Redisson作为一款高性能的分布式应用开发框架,在限流方面拥有出色的技术。本文将深入探讨Redisson的限流器技术,为读者详细介绍其原理和应用。
在软件架构中,限流器(Rate Limiter)是一种用于控制资源利用、维持服务质量和防止系统过载的重要组件。限流器可以按照不同的策略和算法来实现,主要包括以下几种类型:
1. 计数器算法(固定窗口限流器)
原理:在一个固定的时间窗口内计算请求的数量,如果请求超过了预设的最大阈值,则拒绝后续的请求,直到下一个时间窗口开始。
优点:简单易理解和实现。
缺点:在窗口切换的时刻可能会发生两倍于阈值的请求量,即“窗口边缘效应”。
2. 滑动窗口日志限流器
原理:改进了固定窗口算法的边缘效应。它记录了每个请求的时间戳,将时间窗口分成多个小窗口,滑动时只统计当前时间窗口内的请求量。
优点:比固定窗口算法更平滑,减少了窗口切换时的峰值。
缺点:需要记录更多的请求信息,可能会增加系统开销。
3. 漏桶算法(Leaky Bucket)
原理:所有的请求都被放入到一个固定容量的桶里,请求按照固定的速率从桶中“漏出”(被处理)。如果桶满了,则会丢弃进来的请求。
优点:输出流量比较平滑,即使短时间内有大量请求到达,输出速率也是恒定的。
缺点:不具备突发流量的处理能力,对于突发请求仍然可能导致请求被丢弃。
4. 令牌桶算法(Token Bucket)
原理:令牌以固定速率被添加到桶中,每个请求必须消耗一个令牌才能被处理。如果桶中没有令牌,则请求被排队或丢弃,直到有令牌可用。
优点:允许一定程度的突发流量,因为桶中可以累积一些令牌。
缺点:如果突发流量持续时间较长,超过了桶的容量,那么超出的请求仍然会被丢弃或延迟处理。
5. 限流队列
原理:请求首先进入一个队列,队列根据系统的处理能力和预设的规则控制请求的进入和退出。
优点:可以根据系统负载动态调整请求的进出,对系统影响较小。
缺点:实现相对复杂,需要精确控制队列的大小和请求的处理速率。
应用场景
- API网关:在API网关层面进行限流,可以防止过多的请求直接压垮后端服务。
- 分布式系统:在分布式服务之间进行限流,可以协调不同服务的负载,防止某些服务成为瓶颈。
- 微服务架构:对单个微服务的接口进行限流,保证服务的稳定性。
实现工具
- Guava RateLimiter:基于令牌桶算法,是Java中一个广泛使用的限流实现。
- Nginx:使用漏桶算法进行HTTP请求的限流。
- Redis:可以使用Redis的原子性操作和脚本来实现复杂的限流策略。
选择合适的限流策略应当基于系统的具体需求,比如是否需要应对突发流量、系统的可用性要求、实现的复杂度等因素。在实际应用中,很可能会结合多种策略来实现更加精细化的流量控制。
一、Redisson简介
Redisson是一个基于Redis的Java驻留内存(In-Memory)数据网格中间件,提供了丰富的分布式对象和服务,可用于开发高性能、可扩展的分布式应用。其中,Redisson的限流器技术是其重要的功能之一。
二、Redisson限流器的原理
Redisson限流器采用了一种基于令牌桶算法的策略。令牌桶算法通过维护一个固定容量的令牌桶,在每个时间段内产生若干个令牌,请求在获取令牌后才能进行处理。这种算法可以有效控制流量,并在一定程度上保护系统免受突然的高并发压力。
三、Redisson限流器技术的应用
-
系统流控
Redisson的限流器技术可以用于对系统进行流量控制,确保系统在高负载情况下依然能够提供稳定可靠的服务。通过限制请求的速率和并发数,可以避免系统因为过载而崩溃或变得不可用。 -
接口保护
对于一些重要的接口或某些敏感资源,我们可能希望限制用户的访问频率,以避免恶意的使用或攻击。Redisson的限流器技术可以很好地实现这一目的,通过限制请求的速率和次数,有效保护接口的安全性和可用性。 -
基于用户的限流
对于一些特定用户或特定用户组,我们可能需要设置不同的限流策略。Redisson提供了基于用户的限流管理能力,可以根据用户身份信息或其他特征进行流量控制,满足个性化的限流需求。
四、gateway自定义接口限流实现
1. maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.1</version>
</dependency>
2. 接口限流数据结构定义RateLimitProperties
gateway.yaml
gateway:
rate-limit:
configs:
- name: 服务1
option:
- uri: 接口1/*
matchType: 1
size: 200
microSecond: 1000
- name: 服务2
option:
- uri: 接口1/*
matchType: 1
size: 1
second: 1
- uri: 接口2
matchType: 0
size: 1
microSecond: 1000
- name: 服务3
option:
- uri: 接口1
matchType: 0
size: 10
microSecond: 1000
consumer: 1
waitTimer: 1000
限流配置类 RateLimitProperties
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* 限流配置获取,支持热更新
*/
@Data
@Component
@JsonIgnoreProperties(ignoreUnknown = true)
@ConfigurationProperties(prefix = "gateway.rate-limit")
public class RateLimitProperties {
private List<Configs> configs;
@Data
@Component
@ConfigurationProperties(prefix = "gateway.rate-limit.configs")
public static class Configs {
private String name;
private List<Option> option = new ArrayList<>();
@Data
@Component
@JsonIgnoreProperties(ignoreUnknown = true)
@ConfigurationProperties(prefix = "gateway.rate-limit.configs.option")
public static class Option {
private String uri; //api uri
private int matchType; // 匹配类型 0 完整匹配 1 模糊匹配
private int size; //令牌桶 大小
private int microSecond = 1000; //生成令牌的周期
private int consumer = 1; //一个请求消耗令牌数量
private int waitTimer = 0; // 令牌没有时,等待最多xx ms后拿令牌
}
}
}
3. RedissonClient 注入
RedisConfig
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.TransportMode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String redisAddress;
@Value("${spring.redis.port}")
private String redisPort;
@Value("${spring.redis.password}")
private String redisPas;
@Value("${spring.redis.database:1}")
private Integer redisDb;
@Bean
public RedissonClient redissonClient(RedisProperties redisProperties) {
Config config = new Config();
config.setTransportMode(TransportMode.NIO);
config.useSingleServer().setAddress("redis://" + redisAddress + ":" + redisPort)
.setPassword(redisPas)
.setDatabase(redisDb);
return Redisson.create(config);
}
public static void redisSerialize(RedisTemplate redisTemplate) {
// 2.设置统一序列化规则
RedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
//Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = serializer();
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setEnableTransactionSupport(false);
}
private static Jackson2JsonRedisSerializer serializer() {
//Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL,
JsonTypeInfo.As.WRAPPER_ARRAY);
jackson2JsonRedisSerializer.setObjectMapper(om);
return jackson2JsonRedisSerializer;
}
}
4. redis 初始化写入限流配置
ApiRateLimitInit
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.util.List;
@Configuration
@Slf4j
public class ApiRateLimitInit {
@Resource
private RateLimitProperties properties;
@Resource
private RedisTemplate<String, RateLimitProperties.Configs> redisTemplate;
@Resource
private RedissonClient redissonClient;
@Bean
public void initApis() {
List<RateLimitProperties.Configs> configs = properties.getConfigs();
//将最新的配置放到redis中,在做限流判断的代码中,直接从redis中读取最新的配置
redisLimitConfig(configs);
}
public void redisLimitConfig(List<RateLimitProperties.Configs> configs) {
RedisConfig.redisSerialize(redisTemplate);
redisTemplate.delete("ApiRateLimitFilter");
RKeys keys = redissonClient.getKeys();
keys.deleteByPattern("*buslimiters*");
if (ObjectUtil.isNotNull(configs)) {
redisTemplate.opsForList().leftPushAll("ApiRateLimitFilter", configs);
//重新初始化所有limiter
for (RateLimitProperties.Configs temp : configs) {
temp.getOption().forEach(o -> {
RRateLimiter limiter = redissonClient.getRateLimiter(StrUtil.format("buslimiters:{}:{}", temp.getName(), o.getUri()));
limiter.trySetRate(RateType.OVERALL, o.getSize(), o.getMicroSecond(), RateIntervalUnit.MILLISECONDS);
});
}
}
}
}
5. 接口过滤限流规则
ApiRateLimitFilter
import cn.hutool.core.util.StrUtil;
import com.gsafety.bg.gsdss.gateway.contants.CommonConstant;
import com.gsafety.bg.gsdss.gateway.utils.ResponseUtils;
import com.gsafety.bg.gsdss.gateway.utils.WebUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RedissonClient;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class ApiRateLimitFilter implements GlobalFilter, Ordered {
@Resource
private RedissonClient redissonClient;
@Resource
private RedisTemplate<String, RateLimitProperties.Configs> redisTemplate;
@Override
public int getOrder() {
return 0;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String uri = exchange.getRequest().getURI().getPath().substring(1);
int index = uri.indexOf("/");
String microServiceName = StringUtils.substringBefore(uri, "/");
String apiPath = uri.substring(index);
List<RateLimitProperties.Configs> configs = redisTemplate.opsForList().range("ApiRateLimitFilter", 0, -1);
List<RateLimitProperties.Configs.Option> option = configs.stream()
.filter(s -> microServiceName.equals(s.getName()))
.findFirst().orElse(new RateLimitProperties.Configs()).getOption();
RateLimitProperties.Configs.Option thisOption = WebUtils.matchesRateLimit(apiPath, option);
//不在限流配置中
if (thisOption == null) {
return chain.filter(exchange);
}
RRateLimiter limiter = redissonClient.getRateLimiter(StrUtil.format("buslimiters:{}:{}", microServiceName, thisOption.getUri()));
boolean acquire = limiter.tryAcquire(thisOption.getConsumer(), thisOption.getWaitTimer(), TimeUnit.MICROSECONDS);
if (acquire) {
return chain.filter(exchange);
}
log.info(uri + "限流");
//exchange 信息放入kafka中,等待消费
return ResponseUtils.success(exchange.getResponse(), CommonConstant.ErrorResp.BUSY_ERROR);
}
}
6. nacos配置热更新redis配置
RateLimitNacosListener
import cn.hutool.json.JSONUtil;
import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.nacos.api.config.listener.Listener;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.Executor;
@Slf4j
@Configuration
public class RateLimitNacosListener implements InitializingBean {
@Value("${spring.application.name}")
private String appName;
@Resource
private NacosConfigManager nacosConfigManager;
@Resource
private NacosConfigProperties configProperties;
@Resource
private ApiRateLimitInit rateLimitInit;
@Override
public void afterPropertiesSet() throws Exception {
nacosConfigManager.getConfigService().addListener(appName + ".yaml", configProperties.getGroup(),
new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
YAMLMapper yamlMapper = new YAMLMapper();
try {
JsonNode jsonNode = yamlMapper.readTree(configInfo);
String configStr = jsonNode.get("gateway").get("rate-limit").get("configs").toPrettyString();
List<RateLimitProperties.Configs> configs = JSONUtil.toList(configStr, RateLimitProperties.Configs.class);
//将最新的配置放到redis中,在做限流判断的代码中,直接从redis中读取最新的配置
rateLimitInit.redisLimitConfig(configs);
log.info("config更新:" + configStr);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
});
}
}
7. 接口匹配工具类
WebUtils
/**
* WebUtils 工具类
*
*/
public class WebUtils {
/**
* 获取限流接口对象
*/
public static RateLimitProperties.Configs.Option matchesRateLimit(String uri, List<RateLimitProperties.Configs.Option> all) {
PathMatcher matcher = new AntPathMatcher();
List<RateLimitProperties.Configs.Option> allMatch = all.stream().filter(o -> !o.getUri().endsWith("*")).collect(Collectors.toList());
for (RateLimitProperties.Configs.Option option : allMatch) {
if (StringUtils.equals(option.getUri(), uri)) {
return option;
}
}
List<RateLimitProperties.Configs.Option> fuzzyMatch = all.stream().filter(o -> o.getUri().endsWith("*")).collect(Collectors.toList());
for (RateLimitProperties.Configs.Option option : fuzzyMatch) {
if (matcher.match(option.getUri(), uri)) {
return option;
}
}
return null;
}
}
ResponseUtils
public class ResponseUtils {
public static Mono<Void> error(ServerHttpResponse response, Result result) {
//返回错误
response.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE);
response.setStatusCode(HttpStatus.UNAUTHORIZED);
DataBuffer buffer = response.bufferFactory().wrap(JSONUtil.toJsonStr(result).getBytes(StandardCharsets.UTF_8));
return response.writeWith(Mono.just(buffer));
}
public static Mono<Void> success(ServerHttpResponse response, Result result) {
//返回200
response.getHeaders().add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE);
response.setStatusCode(HttpStatus.OK);
DataBuffer buffer = response.bufferFactory().wrap(JSONUtil.toJsonStr(result).getBytes(StandardCharsets.UTF_8));
return response.writeWith(Mono.just(buffer));
}
}
总结
其他方案尝试:
gateway自带限流器:令牌桶算法实现,支持自定义key-resolver,即限流规则(用户、ip、请求地址等),但是只支持服务级别限流,无法限制到具体接口
Redisson的限流器技术是一项用于控制系统流量并保护重要接口的强大工具。它通过令牌桶算法,提供了灵活的流控策略和个性化的限流管理,帮助开发者有效应对高负载和突发流量的挑战。在实际应用中,合理利用Redisson的限流器技术,可以提高系统的稳定性、安全性和可用性,为用户提供更好的体验。