文章目录
- 一、简介
- 二、集成redission
- pom文件
- redission 配置文件
- application.yml文件
- 启动类
- 三、JAVA 操作案例
- 字符串操作
- 哈希操作
- 列表操作
- 集合操作
- 有序集合操作
- 布隆过滤器操作
- 分布式锁操作
- 四、源码解析
一、简介
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格客户端(In-Memory Data Grid)。它不仅提供了一系列的 redis 常用数据结构命令服务,还提供了许多分布式服务,例如分布式锁、分布式对象、分布式集合、分布式远程服务、分布式调度任务服务等等。
本文会介绍如何将redission集成到springboot和如何使用 Redisson 操作 Redis 中的字符串、哈希、列表、集合、有序集合,以及布隆过滤器和分布式锁等功能,并解析源码中其最大亮点的看门狗
机制。
二、集成redission
本章内容使用的springboot版本为2.6.3
,redission-starter版本为3.17.7
pom文件
引入redission相关依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.7</version>
</dependency>
redission 配置文件
将redission相关的配置抽到单独的一个redission文件,放在application.yml的同级目录
# 单节点配置
singleServerConfig:
# 连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
# 连接超时,单位:毫秒
connectTimeout: 10000
# 命令等待超时,单位:毫秒
timeout: 3000
# 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
# 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
# 密码,没有设置密码时,需要注释掉,否则会报错
# password: redis.shbeta
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
# 客户端名称
clientName: "axin"
# 节点地址
address: "redis://192.168.102.111:6379"
password: Cidneueopx
# 发布和订阅连接的最小空闲连接数
subscriptionConnectionMinimumIdleSize: 1
# 发布和订阅连接池大小
subscriptionConnectionPoolSize: 50
# 最小空闲连接数
connectionMinimumIdleSize: 32
# 连接池大小
connectionPoolSize: 64
# 数据库编号
database: 1
# DNS监测时间间隔,单位:毫秒
dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
threads: 0
# Netty线程池数量,默认值: 当前处理核数量 * 2
nettyThreads: 0
# 编码
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode : "NIO"
# 配置看门狗的默认超时时间为30s,这里改为 10s
lockWatchdogTimeout: 10000
application.yml文件
在该配置文件将redission配置文件引入
spring:
redis:
redisson:
file: classpath:redisson.yml
启动类
启动类需要加上注解@ImportAutoConfiguration,如下所示
@SpringBootApplication
@ImportAutoConfiguration(RedissonAutoConfiguration.class)
public class EasyUserApplication {
public static void main(String[] args) {
SpringApplication.run(EasyUserApplication.class, args);
}
}
三、JAVA 操作案例
字符串操作
Redisson 支持通过RBucket对象来操作字符串或对象(对象需要实现序列化Serializable)数据结构,同时支持设置数据和有效期,例子如下
@Autowired
RedissonClient redissonClient;
@GetMapping("/opString")
public void opString() {
RBucket<String> strKey = redissonClient.getBucket("strKey");
strKey.set("china");
//表示10分钟后删除该键
strKey.expire(Duration.ofMinutes(10));
System.out.println(strKey.get());
}
哈希操作
通过获取一个RMap 对象来进行哈希数据结构的操作,例子如下
@Autowired
RedissonClient redissonClient;
@GetMapping("/opMap")
public void opMap() {
//获取一个map对象
RMap<String, String> rMap = redissonClient.getMap("mapKey");
rMap.put("wawa", "1212");
//表示10分钟后删除该键
rMap.expire(Duration.ofMinutes(10));
System.out.println(rMap.get("wawa"));
}
列表操作
redission支持通过RList对象来操作列表数据结构,例子如下
@Autowired
RedissonClient redissonClient;
@GetMapping("/opList")
public void opList() {
RList<Integer> rList = redissonClient.getList("listKey");
rList.add(100);
rList.add(200);
rList.add(300);
//表示10分钟后删除该键
rList.expire(Duration.ofMinutes(10));
//读取列表全部数据,不删除
System.out.println(rList.readAll());
}
集合操作
Redisson支持通过RSet对象来操作集合,例子如下
@Autowired
RedissonClient redissonClient;
@GetMapping("/opSet")
public void opSet() {
RSet<Integer> rSet = redissonClient.getSet("setKey");
rSet.add(100);
rSet.add(200);
rSet.add(300);
//表示10分钟后删除该键
rSet.expire(Duration.ofMinutes(10));
System.out.println(rSet.readAll());
log.info("========testSkyworking");
}
有序集合操作
Redisson 支持通过RSortedSet对象来操作有序集合数据结构,如果使用存储对象,则实体对象必须先实现Comparable接口,并重写比较逻辑,否则会报错,例子如下
@Autowired
RedissonClient redissonClient;
@GetMapping("/opSortSet")
public void opSortSet() {
RSortedSet<Integer> rSortSet = redissonClient.getSortedSet("sortsetKey");
rSortSet.add(300);
rSortSet.add(200);
rSortSet.add(100);
System.out.println(rSortSet.readAll());
}
布隆过滤器操作
Redisson支持通过RBloomFilter对象来操作布隆过滤器,布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难,例子如下
@Autowired
RedissonClient redissonClient;
@GetMapping("/opBloomFilter")
public void opBloomFilter() {
RBloomFilter rBloomFilter = redissonClient.getBloomFilter("BloomFilterKey");
// 初始化预期插入的数据量为50000和期望误差率为0.01
rBloomFilter.tryInit(50000, 0.01);
rBloomFilter.add(300);
rBloomFilter.add(200);
rBloomFilter.add(100);
//表示100分钟后删除该键
rBloomFilter.expire(Duration.ofMinutes(100));
// 判断是否存在
System.out.println(rBloomFilter.contains(300));
System.out.println(rBloomFilter.contains(700));
}
分布式锁操作
其实很多时候,我们引入redission,最想使用的功能场景就是其分布式锁,因为我们不需要去设置最大释放锁的时间,redission内部有一个看门狗机制会主动去续期。
Redisson通过RLock对象来操作分布式锁,例子如下
@Autowired
RedissonClient redissonClient;
@GetMapping("/opLock")
public void opLock() {
//获取锁对象实例
final String lockKey = "mylock";
RLock rLock = redissonClient.getLock(lockKey);
try {
//尝试5秒内获取锁 不设置释放锁的时间
boolean res = rLock.tryLock(5L, TimeUnit.SECONDS);
System.out.println("获取锁成功");
if (res) {
for (int i = 0; i < 10; i++) {
System.out.println(i);
Thread.sleep(1000L);
}
//成功获得锁,在这里处理业务
System.out.println("处理业务");
}
} catch (Exception e) {
System.out.println("获取锁失败,失败原因:" + e.getMessage());
} finally {
//无论如何, 最后都要解锁
rLock.unlock();
}
}
四、源码解析
分布式锁的看门狗机制大致如下列流程图所示
调用链路
org.redisson.RedissonLock#tryLock
=》org.redisson.RedissonLock#tryLockAsync
=》org.redisson.RedissonLock#tryAcquireAsync
我们来仔细看看org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
//判断锁的持有时间是否由用户自定义
if (leaseTime > 0L) {
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//当用户没有自定义锁占有时间时,默认传入 internalLockLeaseTime
//private long lockWatchdogTimeout = 30 * 1000; 默认30秒
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
if (ttlRemaining == null) {
if (leaseTime > 0L) {
//如果用户传入占用时间直接转换,把默认值internalLockLeaseTime 更新为用户自定义的占有时间
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//这里就是触发看门狗机制的方法,只有当 leaseTime == -1时才会触发看门狗机制
this.scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper(f);
}
接着进行scheduleExpirationRenewal
protected void scheduleExpirationRenewal(long threadId) {
RedissonBaseLock.ExpirationEntry entry = new RedissonBaseLock.ExpirationEntry();
//EXPIRATION_RENEWAL_MAP 是一个全局的静态常量Map
RedissonBaseLock.ExpirationEntry oldEntry = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
//oldEntry != null 表示该线程不是第一次触发
oldEntry.addThreadId(threadId);
} else {
//oldEntry == null 表示该线程是第一次触发
entry.addThreadId(threadId);
try {
//更新过期时间
this.renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
this.cancelExpirationRenewal(threadId);
}
}
}
}
org.redisson.RedissonBaseLock#renewExpiration
中主要是更新时间的详细逻辑
private void renewExpiration() {
//获取当前线程的更新对象
RedissonBaseLock.ExpirationEntry ee = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
//创建了一个定时任务
Timeout task = this.getServiceManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonBaseLock.ExpirationEntry ent = (RedissonBaseLock.ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
//异步更新过期时间
CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
//如果出现异常,从map中删除,直接返回
RedissonBaseLock.log.error("Can't update lock {} expiration", RedissonBaseLock.this.getRawName(), e);
RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
} else {
if (res) {
//如果没有报错,就再次定时延期
RedissonBaseLock.this.renewExpiration();
} else {
//否则取消定时
RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}