1. 锁重试
首先要理解为什么要进行锁重试,之前我们在获取锁时,只要一次获取失败就直接返回false,这样的机制需要进行修改;
尝试获取锁的底层逻辑是
返回锁的有效期(null或者其他值);
为null然后判断是否给leaseTime赋值,若未赋值则自动为-1(赋值为-1同样不开启看门狗),开启看门狗机制;
不为null,则判断剩余等待时间(time)是否大于0,如果大于0则开启订阅,一直dowhile循环获取锁,并且期间不断更新等待时间,如果最后time<0,超时则取消订阅并且返回false无法获取锁;
如果小于0,则直接返回false;
释放锁的顶层逻辑,判断是否成功,失败记录日志即可,成功则要发送释放锁的信号(订阅信号),并且取消看门狗机制,避免一直更新;
leaseTime可给可不给,不给会直接赋默认值;
一旦给waitTime值,就不会获取失败就返回了,而是在等待时间内不断去尝试获取锁,具体获取的原理如下;
进入获取锁的业务逻辑
尝试获取锁
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
如果 leaseTime 等于 -1,表示没有指定具体的持有时间。此时,方法会继续调用 tryLockInnerAsync() 方法,指定一个默认的锁监视超时时间(lockWatchdogTimeout)作为 leaseTime 的值,并将时间单位调整为毫秒。然后,得到返回的 RFuture 对象 ttlRemainingFuture。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
leaseTime未设置默认-1,走else逻辑;
具体给的超时时间为this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()
也即是30秒;
然后进入tryLockInnerAsync()方法获取锁的信息,两种结果:一是返回null,二是返回锁的有效期
执行Luna脚本;
判断锁是否存在;
不存在记录锁的标识并且次数加1;
设置锁的有效期;
如果存在判断锁标识是否相同;
是则锁次数+1,并且设置锁的有效期;
获取锁的操作中,获取成功则返回nil(null),失败则返回锁的有效期;
这里的get方法就是阻塞等待
tryAcquireAsync()
的剩余有效期(null或者有效期)
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
最后返回至最初尝试获取锁位置
两种情况,第一种null,获取成功;
第二种剩余有效期,获取锁失败,再次开始获取;
获取time(获取锁消耗的时间),当waitTime时间大于消耗时间时继续逻辑;
但是此时并不会立马去再去获取锁,因为此时大概率再次获取还是失败,可能使用锁的业务还没有释放锁;
这里给出了一个订阅的概念;
代码重新获取当前时间 current,然后创建一个用于订阅锁释放通知的 subscribeFuture,然后调用 subscribe() 方法进行订阅;
接下来,代码调用 subscribeFuture.await(time, TimeUnit.MILLISECONDS) 方法,等待 time 毫秒时间,等待订阅结果。
如果在等待期间未收到订阅结果,表示等待超时。在等待超时后,代码会尝试取消订阅任务。如果取消失败,会在 subscribeFuture.onComplete() 方法中进行处理,判断是否需要取消订阅,并调用 unsubscribe() 方法进行处理。
如果取消成功,则代码调用 acquireFailed() 方法进行处理,表示当前线程获取锁失败,最终返回 false。
订阅的通知就是释放锁Luna脚本当中发布的通知,然后等待订阅结果,等待的时间就是time(锁的最大剩余时间);
如果是在time时间之内获得释放锁的通知,则会走以下代码;
这段代码是 Redisson 中分布式锁获取的核心逻辑,实现了在等待获取锁的过程中对剩余时间的动态调整。下面是代码的解析:
time是最大剩余时间
-
首先,代码获取当前时间
current
并计算剩余等待时间time
。 -
如果
time
小于等于 0,表示等待锁的时间已经超过了剩余过期时间,或者锁的剩余过期时间非法。在这种情况下,会调用acquireFailed()
方法进行处理,并返回false
。 -
如果
time
大于 0,表示还有剩余的时间需要进行等待,也即就是还能去获取锁。代码会进入一个 do-while 循环,并不断尝试获取锁。 -
在循环中,代码会再次获取当前时间
currentTime
,然后调用tryAcquire()
方法尝试获得锁,同时再次获取返回的锁的剩余过期时间ttl
。 -
如果锁的剩余过期时间
ttl
为空,表示成功获取到锁,会返回true
。 -
如果剩余过期时间
ttl
大于等于 0 且小于等于剩余等待时间time
,表示还有足够的时间可以等待,代码会使用tryAcquire()
方法返回的ttl
来等待锁的释放。 -
如果剩余过期时间
ttl
大于剩余等待时间time
,表示还需要等待更长的时间,代码会使用time
来等待锁的释放。 -
在循环中,每次等待之后,会重新计算剩余等待时间
time
。 -
当剩余等待时间
time
小于或等于 0,表示等待超时,会调用acquireFailed()
方法进行处理,并返回false
。 -
最后,无论等待成功还是失败,都会调用
unsubscribe()
方法取消订阅并释放资源。
这段代码实现了在等待获取锁的过程中对剩余时间的动态调整,确保在等待过程中可以根据实际情况调整等待时间,提高获取锁的效率(不是盲等,而是在不断地尝试)。同时,使用 try-finally 语句块确保在获取锁过程中发生异常时可以正确地取消订阅并释放资源。
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
所以只要给waitTime时间,就可以做到锁重试的机制;
2. watchdog机制
目的就是解决业务因设置了超时时间但是在时间内业务并没有执行完的问题,采用看门狗机制进行自动延期机制。
这里有一个大前提,为什么需要设置过期时间?原因在之前的笔记提到过,避免线程1阻塞不释放锁,导致线程2无法获取锁。
如果业务是因为阻塞导致锁释放,也就是ttl为null,然后我们获取到锁,这就出现了安全问题,因为我们必须确定业务是正常执行释放而不是阻塞释放;
在获取分布式锁时,会指定一个锁的看门狗超时时间,即 lockWatchdogTimeout。该超时时间一般会设置为一个比较长的值,通常是锁的最大超时时间的一小部分。
当某个线程成功获取了分布式锁时,Redisson 会在服务端设置一个后台线程,该线程会定期发送续约请求给 Redis,以更新锁的过期时间。同时,该线程会监控当前线程是否仍然持有锁。
如果看门狗超时时间内,持有锁的线程没有发送续约请求,或者没有收到持有锁的线程的心跳信号,则看门狗线程会认为持有锁的线程已经不再活跃或发生了异常,并自动终止该锁,并将锁资源释放。
-
异步获取锁后,代码会在 ttlRemainingFuture.onComplete() 方法中定义一个回调函数(tryLockInnerAsync()方法),在获取结果后进行处理。
-
如果回调函数中异常参数 e 为 null,表示获取结果成功。在此情况下,代码判断返回的 ttlRemaining 是否为 null,如果为 null,表示锁的状态为有效,会调用 scheduleExpirationRenewal() 方法进行锁的过期续约。
而锁的自动续约的实现scheduleExpirationRenewal()
EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry)方法将这个新的Entry对象放入一个名为EXPIRATION_RENEWAL_MAP的映射中,使用this.getEntryName()方法返回一个唯一的名称(可以理解为锁的名称)作为键。
putIfAbesent()如果不存在再往里面写,返回null;如果是重入,也就是已经存在与当前键关联的Entry对象,在这种情况下,将当前线程ID添加到旧的Entry对象中。这样就保证了线程的唯一,实现可重入唯一。
如果是第一次进入的线程,则把当前线程ID添加到新的Entry对象中,并且调用this.renewExpiration()方法。这个方法用于更新过期时间。
结论:
watchdog每10秒检查一次,续期时间为30秒。当程序没有显式释放锁的操作时,watchdog会不断地执行续期操作,确保分布式锁的key不会过期。这样可以避免其他节点认为锁已经过期而尝试获取锁。
要想看门狗机制启动,不能传leaseTime参数。
锁重试和waitTime参数挂钩,看门狗机制和leaseTime参数挂钩。
假设释放锁的操作异常了,是否一直续约?
并不会,EXPIRATION_RENEWAL_MAP中的目标 ExpirationEntry 对象已经被移除了,watch dog 通过判断后就不会继续给锁续期了;
所以释放锁的逻辑一定要放在finally体中;
// 获取当前这把锁的任务
EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
总结
可重入内容