Redisson源码-多线程之首个获取锁的线程加解锁流程
简介
当有多个线程同时去获取同一把锁时,第一个获取到锁的线程会进行加解锁,其他线程需订阅消息并等待锁释放。
以下源码分析基于redisson-3.17.6版本,不同版本源码会有些许不同需注意。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.6</version>
</dependency>
当我们调用Redisson.lock()并且不设置锁时间时,我们进入RedissonLock的lock方法。
public void lock() {
try {
// -1L为锁时间,表示不限时
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程id
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
// 获取到锁直接返回
if (ttl == null) {
return;
}
// 订阅锁消息:当锁被释放的时候,会通过publish发布一条消息,通知其它等待这个锁的线程,锁已经释放。
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
// 不停尝试获取锁
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
// 如果锁的过期时间>=0
if (ttl >= 0) {
try {
// 等待超时时间过去
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
// 锁过期时间<0,代表锁的超时时间未设置
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
// 无限等待直至获取锁
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
我们按照代码逻辑先看一下尝试获取锁的代码:
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
// 指定了超时时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 未指定超时时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
// ttlRemaining为null, 本质就是加锁的LUA脚本中返回nil,表示获取锁成功
if (ttlRemaining == null) {
if (leaseTime > 0) {
// 如果设置了超时时间,则更新internalLockLeaseTime为指定的超时时间,并且不会启动看门狗
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 自动续期实现,开启看门狗机制
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
tryLockInnerAsync方法里的代码是加锁的核心代码之一:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
先介绍一下lua脚本的中的参数:
KEYS[1]:锁住的对象(锁住的key),相当于下图的HASH
ARGV[1]:锁的过期时间
ARGV[2]:UUID+当前线程id,相当于下图的key
nil:相当于null
初次获取锁:
锁重入:
接下来我们详细看一下lua脚本的逻辑:
// 通过exists指令判断需要加锁的key是否存在,如果不存在,说明还没被加锁,可以直接进行加锁
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 通过hincrby指令往redis中插入一个哈希结构的数据,key[1]=加锁key ARGV[2]=uuid+当前线程ID 1=锁的重入次数
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 通过pexpire指令设置锁的过期时间:ARGV[1]=锁的过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
// 返回nil, 表示加锁成功:nil=null
"return nil; " +
"end; " +
// 如下是可重入锁的逻辑
// 通过hexists指令判断当前的锁是不是自己的,只有是自己的锁时,才支持可重入
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 通过hincrby指令更新hash结构的数据(锁结构数据),将value对应的可重入次数加一
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 通过pexpire指令设置锁的过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
// 返回nil, 表示加锁成功
"return nil; " +
"end; " +
// 如果当前已经有人获取了锁,并且这个锁不是自己的,那么将会执行pttl指令,返回当前锁剩余的过期时间
"return redis.call('pttl', KEYS[1]);"
从上面我们可以看出,redisson加锁的本质是通过执行lua脚本,返回nil(相当于null)或锁的剩余过期时间。如果返回并且未设置过期时间则开启看门狗机制。
接下来我们看下开启看门狗机制的代码:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// 将当前锁的名称和 ExpirationEntry 对象放入 EXPIRATION_RENEWAL_MAP中,并返回之前关联的对象(如果存在)。
// 本质就是检查当前锁是否已开启自动续期。
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
// 已开启
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
// 未开启
} else {
entry.addThreadId(threadId);
try {
// 执行自动续期机制
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
// 如果线程中断则解除自动续期机制避免死锁
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
// 获取当前锁名称的 ExpirationEntry 对象。
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 构建Timeout任务去执行锁续期,本质是调用了netty框架中的newTimeout方法,相当于一个延迟定时任务。
// 相当于每隔 过期时间/3 (默认10秒)毫秒,递归调用renewExpiration方法去执行锁续期直至锁被释放。
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 二次获取当前锁名称的 ExpirationEntry 对象。
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 获取 ExpirationEntry 对象中的第一个线程 ID。
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 锁续期
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 续期成功
if (res) {
// reschedule itself
// 递归调用自身,不断续期
renewExpiration();
// 续期失败,表示锁被释放
} else {
// 取消定时任务等操作
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
下面我们看一下锁续期的核心代码:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
该lua脚本的中的参数与上面相同:
KEYS[1]:锁住的对象(锁住的key)
ARGV[1]:锁的过期时间
ARGV[2]:UUID+当前线程id
下面我们看一下这段lua脚本的逻辑:
// 通过hexists指令判断当前的锁是不是自己的
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 如果是的话通过pexpire指令设置锁的过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
// 否则返回0
"return 0;",
整理上文可以看出看门狗机制简单来说就是每隔 过期时间/3 毫秒去执行lua脚本,若锁未被释放则刷新其过期时间,直至锁被释放为止。
接下来我们再看下解锁的逻辑:
@Override
public void unlock() {
try {
// 释放锁
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
public RFuture<Void> unlockAsync(long threadId) {
// 释放锁的核心代码
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 取消看门狗机制(就是取消上文的定时任务等操作)
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
下面我们看一下解锁的核心代码:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
先说明一下该lua脚本的中的参数:
KEYS[1]:锁住的对象(锁住的key)
KEYS[2]:监听该锁的频道
ARGV[1]:解锁消息
ARGV[2]:锁的过期时间
ARGV[3]:UUID+当前线程id
下面我们看下这段lua脚本的逻辑:
// 通过hexists指令判断当前的锁是不是自己的
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
// 不是自己的则返回nil(相当于null)
"return nil;" +
"end; " +
// 则使用 hincrby 指令将字段锁的可重入次数减去 1,即减少持有锁的线程数。
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果结果 > 0,代表解锁成功,但是锁仍然存在
"if (counter > 0) then " +
// 通过pexpire指令设置锁的过期时间
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
// 返回0
"return 0; " +
"else " +
// 如果结果 < 0 ,代表持有锁的线程数为0,这时需要完全释放锁,通过del指令删除指定key的锁
"redis.call('del', KEYS[1]); " +
// 通过publish指令向订阅该锁的频道发送解锁消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
// 返回1
"return 1; " +
"end; " +
"return nil;",
以上我们解释说明了单个线程在不等待锁的情况下,直接获取锁,对锁进行续期和解锁的代码逻辑,可以看出加解锁本质上都是通过lua脚本去执行,当有多个线程同时去获取锁时,第一个获取到锁的线程会按照此逻辑执行,其他线程需订阅消息并等待锁释放。