Redisson分布式锁整体流程图
Redisson分布式锁源码流程图
Redisson分布式锁源码解析
获取分布式锁lock
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//获取当前线程ID
long threadId = Thread.currentThread().getId();
/*
*
* 尝试获取分布式锁
* a.如果获取到锁:返回null
* b.如果没有获取到锁:返回当前分布式锁的剩余的过期时间
*/
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//ttl不为null说明锁被其他线程占用,没有获取到锁。订阅解锁消息
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
//订阅解锁消息:如果分布式锁未进行解锁(pub解锁消息),当前线程进入阻塞状态。当接收到分布式锁的pub消息,当前线程被唤醒继续执行
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
//当接收到分布式锁的pub消息,当前线程被唤醒继续执行,继续尝试获取锁
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
//获取成功跳出循环
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
//阻塞ttl毫秒后继续执行
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//线程被打断,直接跳出循环
if (interruptibly) {
throw e;
}
//线程被打断,但是没有设置打断跳出循环,则继续阻塞ttl毫秒后继续执行
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
详细步骤如下:
- 获取当前获取锁的线程ID。
- 调用
tryAcquire
方法尝试获取锁。
tryAcquire
方法返回null
,说明获取到了锁。tryAcquire
方法返回不是null
值,说明没有获取到了锁,返回的Long
值指的是其他线程占用该分布式锁的过期时间,单位为毫秒。- 未获取到锁的线程订阅(
sub
)解锁(pub
)消息,如果分布式锁未进行解锁(pub解锁消息),当前线程进入阻塞状态。当接收到分布式锁的pub消息,当前线程被唤醒继续执行,进入while
循环调用tryAcquire
方法继续争抢分布式锁。while
中:
- 抢到了分布式锁,则跳出循环,并执行finally语句块的取消订阅解锁消息
- 如果没有抢到分布式锁,则执行
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
方法阻塞分布式锁剩余时间ttl
毫秒后,继续while
循环争取分布式锁,直到抢到分布式锁。
尝试获取分布式锁tryAcquire
尝试获取分布式锁,获取到分布式锁后,开启一个开门狗,为分布式锁续期。为获取到分布式锁,返回分布式锁剩余的过期时间,毫秒为单位。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
//获取分布式锁
//返回null则说明获取到了分布式锁
//返回不为null说明没有获取到分布式锁,返回的是分布式锁的剩余失效时间
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//解锁的
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
//如果获取到分布式锁,则使用看门狗进行锁的续期操作。默认过期时间是30秒,看门狗续期的时间间隔是过期时间的三分之一。
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
详细步骤如下:
- 调用
tryLockInnerAsync
方法执行Lua
脚本获取锁,获取失败返回锁的过期时间。获取成功返回null
。- 获取分布式锁成功开启一个开门狗,进行锁的续期操作。默认过期时间是30秒,看门狗续期的时间间隔是过期时间的三分之一。
- 未获取到锁则返回锁的过期时间,单位毫秒。
获取分布式锁的lua
脚本
/**
* 执行Lua脚本获取锁
* 1.key是否存在
* 2.重入锁+1
* 3.重置锁的过期时间(默认30秒)
*/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
详细步骤如下
- key是否存在
- 重入锁+1
- 重置锁的过期时间(默认30秒)
分布式锁续期
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//执行分布式锁续期的lua脚本
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
//自动续期
renewExpiration();
} else {
//取消分布式锁续期
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
分布式锁续期lua
脚本
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}