目录
1、Redisson lock 方法原理解析
1. 如果指定了过期时间
2. 如果没有指定过期时间
3. lock 方法的主要步骤
Redisson lock 方法完整代码
分步骤解释
步骤 1:尝试获取锁
步骤 2:获取锁失败,发起订阅
步骤 3:循环等待锁释放和尝试获取锁
小结
2、Redisson tryLock 方法原理解析
1. 如果指定了过期时间
2. 如果没有指定过期时间
3. tryLock 方法的主要步骤
Redisson tryLock 方法完整代码
分步骤解释
步骤 1:尝试获取锁
步骤 2:获取锁失败,计算剩余时间并发起订阅
步骤 3:循环等待锁释放和尝试获取锁
小结
3、Redisson unlock 方法原理解析
unlock 方法的主要步骤
Redisson unlock 方法完整代码
分步骤解释
步骤 1:调用 unlock 方法
步骤 2:异步解锁操作
步骤 3:等待异步操作完成
小结
1、Redisson lock
方法原理解析
1. 如果指定了过期时间
- 异步续命机制(Watchdog 机制)不再生效,锁会在指定的时间过期并自动释放。
2. 如果没有指定过期时间
- 启动 Watchdog 机制,自动续命锁,直到显式调用
unlock()
方法释放锁为止。
3. lock
方法的主要步骤
以下是 Redisson lock
方法的完整代码及其详细分步骤解释。
Redisson lock
方法完整代码
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId(); // 获取当前线程ID
Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // 尝试获取锁,等待时间为-1,表示无限等待
if (ttl == null) { // 如果成功获取到锁,直接返回
return;
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId); // 订阅锁释放通知
pubSub.timeout(future); // 设置超时回调
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future); // 获取可中断的锁条目
} else {
entry = commandExecutor.get(future); // 获取锁条目
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId); // 尝试重新获取锁
if (ttl == null) { // 如果成功获取到锁,退出循环
break;
}
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); // 等待锁释放通知
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); // 再次等待锁释放通知
}
} else {
if (interruptibly) {
entry.getLatch().acquire(); // 等待锁释放通知
} else {
entry.getLatch().acquireUninterruptibly(); // 等待锁释放通知,不可中断
}
}
}
} finally {
unsubscribe(entry, threadId); // 取消订阅
}
}
分步骤解释
步骤 1:尝试获取锁
- 方法调用:
tryAcquire(-1, leaseTime, unit, threadId)
- 解释:
- 使用 Lua 脚本尝试原子性地获取锁。
- 如果锁不存在,创建新锁并设置过期时间。
- 如果锁存在并且由当前线程持有,增加锁的重入计数并重新设置过期时间。
- 如果成功获取到锁,返回
null
,否则返回锁的剩余存活时间。
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return evalWrite(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
-
Lua 脚本原理
-
检查锁是否存在:
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
- 如果锁不存在(
exists
返回 0),则创建一个新的锁,并将其设置为当前线程持有,同时设置过期时间。
- 如果锁不存在(
-
检查锁是否由当前线程持有:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
- 如果锁已经存在并且由当前线程持有(
hexists
返回 1),则增加锁的重入计数,并重新设置过期时间。
- 如果锁已经存在并且由当前线程持有(
-
返回锁的剩余存活时间:
return redis.call('pttl', KEYS[1]);
- 如果锁存在且不由当前线程持有,则返回锁的剩余存活时间。
-
步骤 2:获取锁失败,发起订阅
如果初次尝试获取锁失败,Redisson 会订阅锁的释放通知。
- 方法调用:
subscribe(threadId)
- 解释:
- 如果初次尝试获取锁失败,Redisson 会订阅锁的释放通知。
- 通过
subscribe
方法订阅锁的释放通知,以便在锁被释放时能够及时收到通知。 pubSub.timeout(future)
设置超时回调,以防订阅过程中出现问题。- 使用
commandExecutor.get
或commandExecutor.getInterrupted
获取订阅结果,根据是否可中断进行选择。
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
步骤 3:循环等待锁释放和尝试获取锁
在等待锁释放期间,Redisson 会进入一个循环,不断尝试重新获取锁。
- 代码块:
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
break;
}
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
- 解释:
-
尝试获取锁:
- 在循环中,Redisson 不断调用
tryAcquire
方法尝试获取锁。 - 如果成功获取到锁,退出循环。
- 在循环中,Redisson 不断调用
-
等待锁释放通知:
- 如果获取锁失败且锁的剩余存活时间大于 0,Redisson 会等待锁释放通知。
- 使用
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS)
方法在指定时间内等待锁的释放。
-
重复尝试获取锁:
- 在锁释放或等待超时后,Redisson 会继续尝试获取锁,直到成功或显式中断。
-
取消订阅:
- 在获取锁成功或最终失败后,调用
unsubscribe(entry, threadId)
取消订阅锁的释放通知。
- 在获取锁成功或最终失败后,调用
-
小结
-
指定过期时间:
- 锁会在指定的时间过期并自动释放,异步续命机制不再生效。
-
未指定过期时间:
- 启动 Watchdog 机制,自动续命锁,确保锁在持有期间不会被自动释放,直到显式调用
unlock()
方法释放锁为止。
- 启动 Watchdog 机制,自动续命锁,确保锁在持有期间不会被自动释放,直到显式调用
2、Redisson tryLock
方法原理解析
tryLock
方法与 lock
方法不同的是,tryLock
方法在获取锁失败时不会一直阻塞,而是根据指定的等待时间和租约时间进行尝试,并返回是否成功获取锁。
1. 如果指定了过期时间
- 异步续命机制(Watchdog 机制)不再生效,锁会在指定的时间过期并自动释放。
2. 如果没有指定过期时间
- 启动 Watchdog 机制,自动续命锁,直到显式调用
unlock()
方法释放锁为止。
3. tryLock
方法的主要步骤
以下是 Redisson tryLock
方法的完整代码及其详细分步骤解释。
Redisson tryLock
方法完整代码
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true; // 成功获取到锁
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true; // 成功获取到锁
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}
分步骤解释
步骤 1:尝试获取锁
- 方法调用:
tryAcquire(waitTime, leaseTime, unit, threadId)
- 解释:
- 使用 Lua 脚本尝试原子性地获取锁。
- 如果锁不存在,创建新锁并设置过期时间。
- 如果锁存在并且由当前线程持有,增加锁的重入计数并重新设置过期时间。
- 如果成功获取到锁,返回
null
,否则返回锁的剩余存活时间。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
-
Lua 脚本原理
-
检查锁是否存在:
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
- 如果锁不存在(
exists
返回 0),则创建一个新的锁,并将其设置为当前线程持有,同时设置过期时间。
- 如果锁不存在(
-
检查锁是否由当前线程持有:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
- 如果锁已经存在并且由当前线程持有(
hexists
返回 1),则增加锁的重入计数,并重新设置过期时间。
- 如果锁已经存在并且由当前线程持有(
-
返回锁的剩余存活时间:
return redis.call('pttl', KEYS[1]);
- 如果锁存在且不由当前线程持有,则返回锁的剩余存活时间。
-
步骤 2:获取锁失败,计算剩余时间并发起订阅
如果初次尝试获取锁失败,Redisson 会订阅锁的释放通知。
- 方法调用:
subscribe(threadId)
- 解释:
- 如果初次尝试获取锁失败,Redisson 会订阅锁的释放通知。
- 通过
subscribe
方法订阅锁的释放通知,以便在锁被释放时能够及时收到通知。 pubSub.timeout(future)
设置超时回调,以防订阅过程中出现问题。- 使用
commandExecutor.get
或commandExecutor.getInterrupted
获取订阅结果,根据是否可中断进行选择。
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
步骤 3:循环等待锁释放和尝试获取锁
在等待锁释放期间,Redisson 会进入一个循环,不断尝试重新获取锁。
- 代码块:
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true; // 成功获取到锁
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
- 解释:
- 尝试获取锁:
- 在循环中,Redisson 不断调用
tryAcquire
方法尝试获取锁。 - 如果成功获取到锁,退出循环
- 在循环中,Redisson 不断调用
- 尝试获取锁:
并返回 true
。
-
等待锁释放通知:
- 如果获取锁失败且锁的剩余存活时间大于 0,Redisson 会等待锁释放通知。
- 使用
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS)
方法在指定时间内等待锁的释放。
-
重复尝试获取锁:
- 在锁释放或等待超时后,Redisson 会继续尝试获取锁,直到成功或显式中断。
-
取消订阅:
- 在获取锁成功或最终失败后,调用
unsubscribe(entry, threadId)
取消订阅锁的释放通知。
- 在获取锁成功或最终失败后,调用
小结
Redisson 的 tryLock
方法提供了一种非阻塞的分布式锁机制,通过以下几个步骤实现:
-
尝试获取锁:
- 通过 Lua 脚本进行原子性操作,确保获取锁的过程是线程安全的。
- 如果成功获取到锁,返回
true
。
-
获取锁失败,计算剩余时间并发起订阅:
- 如果初次获取锁失败,Redisson 会订阅锁的释放通知,并等待一定时间。
-
循环等待锁释放和尝试获取锁:
- 在等待锁释放期间,Redisson 进入循环,不断尝试重新获取锁。
- 使用
entry.getLatch().tryAcquire
方法在指定时间内等待锁的释放。 - 如果成功获取到锁,退出循环并返回
true
,否则在时间用尽后返回false
。
3、Redisson unlock
方法原理解析
unlock
方法用于释放已经持有的锁,确保其他线程可以获取锁。Redisson 通过 Lua 脚本原子性地执行解锁操作,以保证解锁过程的安全性和一致性。
unlock
方法的主要步骤
以下是 Redisson unlock
方法的完整代码及其详细分步骤解释。
Redisson unlock
方法完整代码
@Override
public void unlock() {
long threadId = Thread.currentThread().getId();
RFuture<Boolean> future = unlockAsync(threadId);
commandExecutor.get(future);
}
private <T> RFuture<T> unlockAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getRawName(), getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
分步骤解释
步骤 1:调用 unlock
方法
- 方法调用:
unlock()
- 解释:
- 获取当前线程的 ID。
- 调用
unlockAsync
方法进行异步解锁操作。 - 使用
commandExecutor.get(future)
等待异步操作完成。
@Override
public void unlock() {
long threadId = Thread.currentThread().getId();
RFuture<Boolean> future = unlockAsync(threadId);
commandExecutor.get(future);
}
步骤 2:异步解锁操作
- 方法调用:
unlockAsync(threadId)
- 解释:
- 使用 Lua 脚本原子性地执行解锁操作。
- 如果锁由当前线程持有,减少锁的重入计数。
- 如果重入计数减到 0,删除锁并发布解锁消息。
private <T> RFuture<T> unlockAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getRawName(), getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
-
Lua 脚本原理
-
检查锁是否由当前线程持有:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end;
- 如果锁不由当前线程持有(
hexists
返回 0),返回nil
,表示解锁失败。
- 如果锁不由当前线程持有(
-
减少锁的重入计数:
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
- 如果锁由当前线程持有,减少锁的重入计数(
hincrby
)。 - 如果重入计数大于 0,重新设置锁的过期时间,并返回
0
,表示锁仍然被持有。 - 如果重入计数减到 0,删除锁(
del
),并发布解锁消息(publish
),返回1
,表示锁已释放。
- 如果锁由当前线程持有,减少锁的重入计数(
-
步骤 3:等待异步操作完成
- 方法调用:
commandExecutor.get(future)
- 解释:
- 等待异步解锁操作完成。
- 如果解锁操作失败,抛出异常。
commandExecutor.get(future);
小结
Redisson 的 unlock
方法通过以下几个步骤实现安全可靠的解锁操作:
-
调用
unlock
方法:- 获取当前线程的 ID。
- 调用
unlockAsync
方法进行异步解锁操作。 - 使用
commandExecutor.get(future)
等待异步操作完成。
-
异步解锁操作:
- 使用 Lua 脚本原子性地执行解锁操作,确保操作的安全性和一致性。
- 如果锁由当前线程持有,减少锁的重入计数。
- 如果重入计数减到 0,删除锁并发布解锁消息。
-
等待异步操作完成:
- 等待异步解锁操作完成,如果解锁操作失败,抛出异常。