Redisson的看门狗机制究竟有什么用?
一、普通的Redis分布式锁的缺陷
基于Redis的分布式锁
Redis + Lua 脚本实现分布式锁
二、watchDog的自动延期机制
调用链关系
源码解析
tryLock()
tryAcquire()
tryAcquireAsync()
scheduleExpirationRenewal() 锁续约
renewExpiration() 更新有效期
Lua脚本 重置有效期
更新有效期 - renewExpiration() 执行流程
总结
Redisson的看门狗机制究竟有什么用?
想要讨论这个问题,就要先搞清楚Redisson是干什么的?解决了什么问题?
8. 分布式锁和同步器 - 《Redisson 使用手册》 - 书栈网 · BookStackhttps://www.bookstack.cn/read/redisson-wiki-zh/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8.mdRedisson是一种分布式锁的解决方案!它为我们封装了很多API,使程序员在开发的时候只要关注业务逻辑,不需要考虑锁可靠性的问题。
分布式锁要是设计不当很可能发生死锁的问题,而Redisson中的看门狗机制就是为了解决这个问题!
一、普通的Redis分布式锁的缺陷
我之前写过两篇文章,就是关于自己手写一个redis分布式锁解决高并发下超卖问题的
基于Redis的分布式锁
Redis的分布式锁问题(八)基于Redis的分布式锁_面向鸿蒙编程的博客-CSDN博客_redis分布式锁https://blog.csdn.net/weixin_43715214/article/details/127967364
Redis + Lua 脚本实现分布式锁
Redis的分布式锁问题(九)Redis + Lua 脚本实现分布式锁_面向鸿蒙编程的博客-CSDN博客https://blog.csdn.net/weixin_43715214/article/details/127982757虽然,最后这种基于Lua脚本实现的分布式锁基本上已经可以应对市面上大多数场景,但是,它还存在很多问题——可重入问题、可重试问题、超时释放问题、主从一致性问题......
还有一个很重要的问题就是分布式锁过期的问题!!!
试想一下,一个锁设置了1分钟超时释放,如果拿到这个锁的线程在一分钟内没有执行完毕,那么这个锁就会被其他线程拿到,可能会导致严重的线上问题!
所以Redisson中就提供了这种看门狗机制来解决这个问题!
二、watchDog的自动延期机制
Redisson提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。
默认情况下,看门狗的续期时间是30s,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。另外Redisson 还提供了可以指定leaseTime参数的加锁方法来指定加锁的时间,但是一旦使用了这个参数看门狗机制就失效了(不会自动续期)
调用链关系
tryLock()->tryAcquire()->tryAcquireAsync() -> scheduleExpirationRenewal()
源码解析
tryLock()
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 转成毫秒,后面都是以毫秒为单位
long time = unit.toMillis(waitTime);
// 当前时间
long current = System.currentTimeMillis();
// 线程ID-线程标识
long threadId = Thread.currentThread().getId();
// 尝试获取锁 tryAcquire() !!!
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 如果上面尝试获取锁返回的是null,表示成功;如果返回的是时间则表示失败。
if (ttl == null) {
return true;
}
// 剩余等待时间 = 最大等待时间 -(用现在时间 - 获取锁前的时间)
time -= System.currentTimeMillis() - current;
// 剩余等待时间 < 0 失败
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 再次获取当前时间
current = System.currentTimeMillis();
// 重试逻辑,但不是简单的直接重试!
// subscribe是订阅的意思
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 如果在剩余等待时间内,收到了释放锁那边发过来的publish,则才会再次尝试获取锁
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
// 取消订阅
unsubscribe(subscribeFuture, threadId);
}
});
}
// 获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 又重新计算了一下,上述的等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 重试!
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 成功
if (ttl == null) {
return true;
}
// 又获取锁失败,再次计算上面的耗时
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
currentTime = System.currentTimeMillis();
// 采用信号量的方式重试!
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 重新计算时间(充足就继续循环)
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
}
tryAcquire()
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
tryAcquireAsync()
这个就是看门狗机制的入口,当没有传leaseTime时,会触发看门狗机制!
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// leaseTime我们没有传,这里设定默认值(看门狗)30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 回调函数 ttlRemaining:剩余有效期,e:异常
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// 剩余有效期为null,表示获取锁成功!
if (ttlRemaining == null) {
// 锁续约
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
scheduleExpirationRenewal() 锁续约
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
/* putIfAbsent() 是ConcurrentHashMap的API
* (1)如果是新的记录,那么会向map中添加该键值对,并返回null
* (2)如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值
* EXPIRATION_RENEWAL_MAP 是静态的,key为锁的名称
*/
RedissonLock.ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
// 新的、旧的都会加
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 新的,还会多一步操作(更新有效期)
renewExpiration();
}
}
renewExpiration() 更新有效期
这个方法通过自己调用自己的方式去实现锁续期! 每10s执行一次
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 刷新有效期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// 自己调自己
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
Lua脚本 重置有效期
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
更新有效期 - renewExpiration() 执行流程
renewExpiration() 函数在执行时,会开启一个任务
这个任务会在10s后执行 (internalLockLeaseTime / 3)
10s后执行的这个任务会更新有效期,并“调自己”!
“调自己”说明又会建立这个任务,而这个任务又在10s后执行.......
这也是为什么在oldEntry中不会调这个函数,而新的entry需要调的原因!!!
因为在oldEntry中本身就有这个任务(之前调过,当它刚刚成为entry的时候) ,最后,在释放锁的时候将这个定时任务清除(通过cancelExpirationRenewal()清除任务)
总结
(1)watchDog 在当前节点存活时,每10s给分布式锁的key续期 30s
(2)watchDog 最终还是通过 Lua脚本的expire命令来进行重置有效期,更新有效期
(3)watchDog 机制启动,且代码中没有释放锁操作时,watchDog 会不断的给锁续期
(4)要使 watchDog 机制生效 ,就不要设置锁过期时间 leaseTime
(5)锁续期是通过一个定时任务,在 renewExpiration() 中自己调自己实现的!