看redisson是如何解决锁超时问题
什么是锁超时问题?
比如利用redis实现的分布式锁会设置一定的过期时间,超过该时间,缓存自动删除,锁被释放。这是防止因程序宕机等原因导致锁一直被占用。
但存在一定的问题,如果是该业务执行过程中有点小阻塞,业务还没执行完,结果锁超时释放,下一个线程就又会进来同时执行,产生并发问题。
问题1:怎么保证业务没执行完时,不让锁自己释放呢?
不断给锁续约。
即上锁之后,每隔一段时间增加缓存中的过期时间,一致重复执行,直到业务执行完。业务执行完会发出完成的信号,循环程序监听到信号后跳出不再续约。
问题2:如果此时程序宕机了业务一致不执行完,难道会一直续约么?
不会,续约是java程序控制的,程序挂掉,续约这套操作也就不再执行,锁到过期时间就会被释放。
此处我们以redisson举例
redisson中实现了该机制,叫做watchDog,看门狗
。
进入redisson源码:
进入上锁tryLock(long waitTime, long leaseTime, TimeUnit unit)
方法,该方法在可重试锁文章中有讲到。进入tryAcquire(waitTime, leaseTime, unit, threadId)
方法,最终会进入tryAcquireAsync()
方法,进入源码:
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
/**
【构建上锁任务】
不传过期时间的话,才会进入看门狗逻辑
该方法会传进一个默认时间
*/
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
//【获取锁成功】
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//【定时过期续约方法】
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
看到有个判断,不传过期时间才会实现看门狗机制,传入默认时间internalLockLeaseTime
,这个默认时间是构造方法赋值的,如下可以看到是调用了getLockWatchdogTimeout()
看门狗方法。默认为30s。
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = getServiceManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
//默认为30s
private long lockWatchdogTimeout = 30 * 1000;
如果获取锁成功,会进入核心方法scheduleExpirationRenewal(threadId)
,进入方法:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//将该锁相关的实力存进map中
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
可以看到会将当前所相关的实体存入map中,如果是第一次进入,则会执行renewExpiration()
方法,该方法是续约的关键,进入方法:
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//【1 延迟**时间后执行一次】
Timeout task = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//【2 更新有效期】
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
//【3 执行成功则再次调用】
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
该方法中会执行getServiceManager().newTimeout()
方法,代表延迟一定时间之后执行,其中newTimeout(TimerTask task, long delay, TimeUnit unit)
方法task
代表要执行的任务内容、delay
代表多长时间之后执行。
而在renewExpiration
方法中,延迟时间为internalLockLeaseTime / 3
,internalLockLeaseTime为获得的看门狗默认过期时间,默认为30s,此处就是10s后执行。
看执行内容代码,【2】处方法更新缓存过期时间,方法内部还是使用lua脚本实现
如果执行成功,见【3】处,会再次调用该方法,那就会一直每个十秒执行一次。
那什么时候再能停止续约行为呢?
释放锁时,停止续约。
进入unlock() —> unlockAsync() —> unlockAsync0()
,进入unlockAsync0
方法:
private RFuture<Void> unlockAsync0(long threadId) {
CompletionStage<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
//【取消续约操作】
cancelExpirationRenewal(threadId);
if (e != null) {
if (e instanceof CompletionException) {
throw (CompletionException) e;
}
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
核心方法时cancelExpirationRenewal
,进入:
protected void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
//【1 移除线程任务】
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
//【2 清楚时间任务】
timeout.cancel();
}
//【3 从静态变量map中,移除实体类】
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
可以看到主要取消了三个点:
1 移除线程任务:task.removeThreadId(threadId)
2 清楚时间任务:timeout.cancel()
3 从静态变量map中,移除实体类:EXPIRATION_RENEWAL_MAP.remove(getEntryName())
至此续约机制结束。