目录
一、前言
二、安装Redis
1、Windows安装Redis
2、启动方式
3、设置密码
三、项目集成Redission客户端
1、引入依赖
四、实用场景
1、操作缓存
2、分布式锁
3、限流
3.1 创建限流器
3.2 设置限流参数
3.3 获取令牌
3.4 带超时时间获取令牌
3.5 总结
一、前言
Redis是一个开源的高性能键值存储数据库,它提供了多种数据结构来存储数据,如字符串、哈希、列表、集合、有序集合等。Redis将数据存储在内存中,以提供快速的读写访问速度,并且能够通过异步的方式将数据持久化到磁盘上。它支持复制、Lua脚本、事务处理、不同级别的持久化选项以及多种客户端语言的接口。Redis广泛用于缓存、消息队列、短时数据存储和高性能的应用场景中。
通常在SpringBoot项目中集成redis有两种方式:spring-boot-starter-data-redis和redisson-spring-boot-starter。但它们在功能、使用方式、性能以及集成方面存在一些差异。下面是对这两者的详细对比:
1. 集成方式 Spring Data Redis: 是Spring框架的一部分,提供了对Redis的高级抽象,使得Redis操作更加面向对象和易于使用。 通常与Spring Boot一起使用,通过spring-boot-starter-data-redis依赖自动配置。 使用Jedis或Lettuce作为底层客户端。 Redisson: 是一个独立的Redis客户端,提供了比Spring Data Redis更丰富的功能,如分布式数据结构(如RMap, RSet, RQueue等),分布式锁和各种原子操作。 需要手动配置和使用,不直接集成到Spring框架中,但可以通过Redisson Spring Boot Starter简化集成。 主要使用Netty进行网络通信,支持多种序列化机制。 2. 功能特性 Spring Data Redis: 支持基本的Redis操作,如键值对存储、列表、集合、有序集合等。 提供模板类(如RedisTemplate),简化Redis操作。 支持发布/订阅、地理空间等高级功能。 Redisson: 提供了比Spring Data Redis更广泛的分布式数据结构支持。 支持分布式锁、信号量、原子长整型等分布式数据结构。 内置了多种分布式服务(如分布式锁、原子操作),使得在分布式环境中使用Redis更加方便和高效。 3. 性能和易用性 Spring Data Redis: 使用Jedis或Lettuce作为客户端,Jedis是基于阻塞IO的,而Lettuce基于Netty是非阻塞的,因此在某些场景下性能更好。 易于集成和使用,特别是在Spring生态系统中。 Redisson: 基于Netty,通常在性能上优于Jedis和Lettuce(特别是在高并发场景下)。 提供了更丰富的分布式数据结构和工具,但在某些简单的使用场景下可能会显得过于复杂。 4. 社区和支持 Spring Data Redis: 作为Spring项目的一部分,拥有庞大的社区支持和良好的文档。 持续更新和维护,与Spring Boot紧密集成。 Redisson: 也是一个活跃的开源项目,拥有自己的社区和文档。 由于其专注于分布式数据结构和工具,因此在这些领域有很好的支持和应用案例。 结论 选择spring-data-redis还是Redisson取决于你的具体需求: 如果你的项目已经在使用Spring框架,并且需要简单的Redis操作,那么spring-data-redis可能是更好的选择。 如果你的项目需要更复杂的分布式数据结构和工具,特别是在分布式锁和原子操作方面,那么Redisson可能更适合你的需求。在这种情况下,虽然需要更多的手动配置,但它的功能和性能优势可能会让你觉得这是一个值得的投资。
二、安装Redis
1、Windows安装Redis
打开Redis官网,下载压缩包
解压到本地目录后,目录结构如下
2、启动方式
1、双击redis-server.exe启动
2、命令行窗口启动,在当前目录打开cmd窗口,输入:redis-server.exe redis.windows.conf
3、设置密码
Redis服务默认没有密码,如果要设置,编辑redis.windows.conf文件,找到requirepass关键字,后边是密码,修改成自定义密码后,把这行注释打开。并重启Redis服务,需要指定配置文件路径。
三、项目集成Redission客户端
本文主要介绍Redission客户端使用方式及部分高级特性原理。
1、引入依赖
<!-- https://mvnrepository.com/artifact/org.redisson/redisson-spring-boot-starter 最新版本3.45.0-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.6</version>
</dependency>
注意不用再引入spring-boot-starter-data-redis,在redisson-spring-boot-starter内部已经添加了依赖。
四、实用场景
1、操作缓存
直接看一个demo。
public static void main(String[] args) {
Config config = new Config()
.setTransportMode(TransportMode.NIO)
.setCodec(new JsonJacksonCodec());
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create(config);
RBucket<Object> bucket = redissonClient.getBucket("");
// 直接设置value
bucket.set("123");
// 设置value并设置过期时间
bucket.set("123", 3, TimeUnit.SECONDS);
redissonClient.shutdown();
}
显然Redission虽然也能支持Redis常见操作,但是api入门门槛较高,相比于RedisTemplate大量简单且直观的方法确实不易使用。
2、分布式锁
不过Redission也有优势,比如在分布式锁,提供了几个简单方法即可实现。比如:
public static void main(String[] args) {
Config config = new Config()
.setTransportMode(TransportMode.NIO)
.setCodec(new JsonJacksonCodec());
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("myLock");
// 用法1:直接上锁,需要在最后手动释放锁
lock.tryLock();
// 用法2:上锁并设置等待时间
lock.tryLock(10, TimeUnit.SECONDS);
// 用法3:上锁并设置等待时间、自动释放时间
lock.tryLock(10, 30, TimeUnit.SECONDS);
}
上述3个tryLock方法最终都会执行
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)
1. waitTime:等待时间,即在尝试获取锁时最多的等待时间。如果超过这个时间仍未获取到锁,则会放弃获取锁。
2. leaseTime:租约时间,即获取到锁后持有的时间。如果在这段时间内没有手动释放锁,则系统会自动释放锁。默认为-1,即如果不手动释放,则锁永久有效。
3. unit:时间单位,用于指定等待时间和租约时间的单位。
4. threadId:当前线程id
区别是,第一个方法传入的waitTime和leaseTime都是-1,第二个方法传入的leaseTime是-1
需要特别注意:如果调用了第3个方法获取锁,并且leaseTime不是-1,则会在leaseTime过期后,释放锁。
这里就该提到大家都知道的看门狗机制。即获取分布式锁后,执行业务方法,如果在业务方法执行耗时比较久,则后台有个线程会一直给锁续约,前提是leaseTime=-1
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// leaseTime=-1,调用scheduleExpirationRenewal方法为当前线程续约
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
通常使用分布式锁处理逻辑如下:
RLock lock = redissonClient.getLock(cacheKey);
boolean isLocked = lock.tryLock(waitTime, timeUnit);
if (isLocked) {
try {
// 执行业务方法
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
} else {
throw new RuntimeException("尝试加锁失败");
}
3、限流
当然Redission还有另外一个比较实用的功能,限流。提到限流,大家可能会想到很多实现方式,比如使用Semaphore控制并发限流,或者使用guava框架提供的限流功能。但是这些大多只适用于单机系统,或者只对单机需要限流。如果遇到分布式服务,需要全局限流,虽然也能通过一定方式实现,但是显然没有那么优雅和高效。
接下来介绍下Redission分布式限流方案:
public static void main(String[] args) {
Config config = new Config()
.setTransportMode(TransportMode.NIO)
.setCodec(new JsonJacksonCodec());
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient redissonClient = Redisson.create(config);
RRateLimiter rateLimiter = redissonClient.getRateLimiter("myRateLimiter");
boolean setRate = rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
if (!setRate) {
System.out.println("分布式限流器创建失败,已经存在。");
rateLimiter.delete();
setRate = rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
System.out.println("分布式限流器创建" + (setRate ? "成功" : "失败"));
}
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
Thread.sleep((long) (800 * Math.random()));
} catch (Exception e) {
e.printStackTrace();
}
if (rateLimiter.tryAcquire()) {
System.out.println("获取令牌成功");
} else {
System.out.println("Request" + finalI + "获取令牌失败");
}
latch.countDown();
}).start();
}
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 删除限流器
rateLimiter.delete();
}
注意看核心代码只有三行:
// 创建限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter("myRateLimiter");
// 设置限流参数
boolean setRate = rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
// 获取令牌
rateLimiter.tryAcquire()
至于为什么这么简单神奇,接下来分析下源码。看下上边3个方法都干了啥。
3.1 创建限流器
// 方法签名
RRateLimiter getRateLimiter(String name);
@Override
public RRateLimiter getRateLimiter(String name) {
// 只是创建了一个RedissonRateLimiter对象并返回,并且设置了name属性为限流器名称
return new RedissonRateLimiter(commandExecutor, name);
}
3.2 设置限流参数
// 方法签名
boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);
@Override
public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
return get(trySetRateAsync(type, rate, rateInterval, unit));
}
@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",
Collections.singletonList(getRawName()), rate, unit.toMillis(rateInterval), type.ordinal());
}
其实也就是设置了几个Redis缓存,key分别是rate、interval、type。
3.3 获取令牌
// 常用几个方法签名
// 方法1:获取1个令牌
boolean tryAcquire();
// 方法2:获取多个令牌
boolean tryAcquire(long permits);
// 方法3:是方法1的变体,多了2个获取令牌超时时间参数
boolean tryAcquire(long timeout, TimeUnit unit);
// 方法4:是方法2的变体,多了2个获取令牌超时时间参数
boolean tryAcquire(long permits, long timeout, TimeUnit unit);
我们先看tryAcquire()方法调用栈:
@Override
public boolean tryAcquire() {
return tryAcquire(1);
}
@Override
public boolean tryAcquire(long permits) {
return get(tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits));
}
最终执行tryAcquireAsync方法,内部执行Lua脚本,保证操作的原子性。每一行脚本都加了说明,其中参数KEYS和ARGS值如下:
KEYS[1]=getRawName(),即限流器名称
KEYS[2]=getValueName(),值是{限流器名称}:value,存放的是数字,当前可用许可
KEYS[3]=getClientValueName()
KEYS[4]=getPermitsName(),值是{限流器名称}:permits,数据结构zset,score是当前获取许可的时间戳
KEYS[5]=getClientPermitsName()
ARGV[1]=value,获取的许可数量
ARGV[2]=System.currentTimeMillis(),当前时间戳
ARGV[3]=ThreadLocalRandom.current().nextLong(),一个随机数
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
# rate 限流速率
"local rate = redis.call('hget', KEYS[1], 'rate');"
# interval 限流间隔
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
# type 限流类型,RateType枚举下标,所以OVERALL=0,PER_CLIENT=1
+ "local type = redis.call('hget', KEYS[1], 'type');"
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
# valueName 值是{name}:value,存放的是数字,当前可用许可
+ "local valueName = KEYS[2];"
# permitsName 值是{name}:permits,数据结构zset,score是当前获取许可的时间戳
+ "local permitsName = KEYS[4];"
# type=PER_CLIENT时
+ "if type == '1' then "
# valueName 值是{name}:value:managerId
+ "valueName = KEYS[3];"
# permitsName 值是{name}:permits:managerId
+ "permitsName = KEYS[5];"
+ "end;"
# 参数校验:限流速率rate >= 当前请求许可(不传默认是1)
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
# currentValue 获取当前还有多少许可
+ "local currentValue = redis.call('get', valueName); "
+ "if currentValue ~= false then " # 不是第一次获取许可
# expiredValues 已过期的许可
# zrangebyscore返回有序集合中指定分数区间(0,当前时间戳-限流区间]的成员列表,有序集成员按分数值递增(从小到大)次序排列。
+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
# 获取过期许可总数
+ "local released = 0; "
+ "for i, v in ipairs(expiredValues) do "
# 函数struct.unpack从一个类结构字符串中解包出多个Lua值
+ "local random, permits = struct.unpack('fI', v);"
+ "released = released + permits;"
+ "end; "
# 释放过期许可
+ "if released > 0 then "
# zremrangebyscore移除有序集合中给定的分数区间的所有成员
+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
# 当前可用许可+释放的许可数
+ "currentValue = tonumber(currentValue) + released; "
# 重新设置当前可用许可
+ "redis.call('set', valueName, currentValue);"
+ "end;"
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then " # 剩余许可不够
#
+ "local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1); "
# 返回下一个许可需要等待多少时间
+ "return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);"
+ "else " # 剩余许可足够
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
# 将当前可用许可数-获取许可数
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end; "
+ "else " # 第一次获取许可
# 设置可用许可数,首次等于限流速率rate
+ "redis.call('set', valueName, rate); "
# 函数struct.pack将多个Lua值打包成一个类结构(struct-like)字符串[fI,nextLong(),获取许可数]
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
# 将当前可用许可数-获取许可数
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end;",
Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong());
}
按照上述Lua脚本逻辑,我们来模拟下获取令牌的过程
假设初始列表:860 1200 1300 1800 第一次,当前是1850,则1850-1000=850,回收(0,850],没有可回收。直接放入1850,此时列表变成:860 1200 1300 1800 1850。剩余许可0 第二次,当前是1900,获取可释放许可,1900-1000=900,即(0,900],有860,回收后列表变成:1200 1300 1800 1850。剩余许可1 如果获取1个,则足够,并放入1900,此时列表变成:1200 1300 1800 1850 1900。剩余许可0 如果获取3个,则不够,此时最近一个(1900-1000,正无穷大],所以是1200,需要等待1200-(1900-1000)=300ms。获取失败,当前列表:1200 1300 1800 1850。剩余许可1 第三次,只等待了100ms,即当前是2000,获取可释放许可,2000-1000=1000,即(0,1000],没有可回收,当前列表:1200 1300 1800 1850。剩余许可1 第四次,又等待了200ms,即当前是2200,获取可释放许可,2200-1000=1200,即(0,1200],有1200,回收后列表变成:1300 1800 1850。剩余许可2 如果获取3个,则不够,此时最近一个(2200-1000,正无穷大],所以是1300,需要等待1300-(2200-1000)=100ms。获取失败,当前列表:1300 1800 1850。剩余许可2 第五次,又等待了100ms,即当前是2300,获取可释放许可,2300-1000=1300,即(0,1300],有1300,回收后列表变成:1800 1850。剩余许可3 如果获取3个,则足够。
3.4 带超时时间获取令牌
@Override
public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {
RPromise<Boolean> promise = new RedissonPromise<Boolean>();
long timeoutInMillis = -1;
if (timeout >= 0) {
timeoutInMillis = unit.toMillis(timeout);
}
tryAcquireAsync(permits, promise, timeoutInMillis);
return promise;
}
private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
long s = System.currentTimeMillis();
RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
future.onComplete((delay, e) -> {
if (e != null) {
// 发生异常,获取令牌失败
promise.tryFailure(e);
return;
}
// delay是获取下一个令牌需要等待的时间。如果不需要等待,表示获取令牌成功
if (delay == null) {
promise.trySuccess(true);
return;
}
// 获取令牌超时时间。如果设置为-1,则退化成tryAcquire(long permits)。即一直到获取成功后返回
if (timeoutInMillis == -1) {
// 延迟delay之后再获取令牌
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
tryAcquireAsync(permits, promise, timeoutInMillis);
}, delay, TimeUnit.MILLISECONDS);
return;
}
// 上一次获取令牌已消耗时间
long el = System.currentTimeMillis() - s;
// 剩余超时时间,如果<=0,表示超时时间已到,获取令牌失败
long remains = timeoutInMillis - el;
if (remains <= 0) {
promise.trySuccess(false);
return;
}
// 如果剩余超时时间<下一个令牌等待时间,即等不到获取下一个令牌已经超时了,则延迟remains之后,获取令牌失败
if (remains < delay) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
promise.trySuccess(false);
}, remains, TimeUnit.MILLISECONDS);
} else {
long start = System.currentTimeMillis();
// 延迟delay之后,开始获取令牌
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
// 从创建线程到开始执行消耗时间
long elapsed = System.currentTimeMillis() - start;
if (remains <= elapsed) {
// 如果剩余超时时间<从创建线程到开始执行消耗时间,即线程开始时已经超时了,获取令牌失败
promise.trySuccess(false);
return;
}
// 重新计算剩余超时时间并获取令牌
tryAcquireAsync(permits, promise, remains - elapsed);
}, delay, TimeUnit.MILLISECONDS);
}
});
}
清除限流器
根据如上代码分析,限流器一旦创建并设置参数后,会在Redis中长期缓存几个key,分别是rate、interval、type。如果不处理,会一直存在。假如服务异常宕机,重启时,再次创建限流器可能会创建失败。遇到这种情况,可以先手动删除限流器。
// 删除限流器
rateLimiter.delete();
3.5 小结
- Redission分布式限流使用脚本巧妙的运用了Lua脚本,以及Redis中zset数据结构及操作方法。实现了获取令牌的逻辑。
- Redission实现的限流器,在当前db上只能创建一个。因为rate、interval、type都是全局的。如果需要,可以指定db。在其他db上创建别的限流器。因此最多可以创建16个分布式限流器。理论上可以把这几个key也添加分组器前缀,不知道后边版本会不会支持。或者也可以自己重写方法实现。
- 如果剩余令牌数不足,会返回下一个令牌需要等待多久。但是如果要一次获取多个令牌,可能还需要等待N轮才能成功。
- 如果在所有接口入口都添加获取令牌代码,侵入性太强。可以通过Spring AOP方式对controller接口拦截并限流。