这里作者将大家走进redisson,读完这篇相信加深你对redisson的获取锁,重入,超时,看门狗,发布订阅等原理和功能的理解。
本文将深入原理代码,给出每行代码的意义以及最后的效果,过程有些枯燥,但探索的过程是快乐的,同时也希望大家看的过程当中去一起去查看源码。
redisson
lock
参数介绍:
long leaseTime
:
这个参数指定锁的持有时长。在这段时间结束后,锁会自动释放,除非它被续期或手动解锁。这是一种安全措施,确保在出现问题(如应用崩溃或网络问题)时锁最终会被释放,避免死锁情况的发生。
指定的时间长度通常取决于预期的操作时间和网络延迟。设置适当的租约时间可以防止资源长时间被锁定,并允许其他线程或进程在合理的时间内访问资源。
TimeUnit unit
:
这个参数定义了
leaseTime
的时间单位。TimeUnit
是一个枚举类型,提供了时间单位的常量,如TimeUnit.SECONDS
,TimeUnit.MILLISECONDS
等,使你能以不同的时间单位指定锁的租约时间。使用
TimeUnit
参数可以提供额外的灵活性和清晰度,允许调用者根据需要轻松地指定租约时间,而不必担心时间单位的转换。
boolean interruptibly
:
这个布尔值指定了锁获取的行为应该如何响应中断。如果设置为
true
,那么当线程在等待获取锁的过程中被中断时,它会响应这个中断,并且可能会抛出一个InterruptedException
。这允许线程在等待获取锁时可以被取消或中断。如果设置为
false
,即使线程被中断,也会继续等待锁,不响应中断。这通常用于那些必须等到获取锁后才能继续的情况,确保操作的完成。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//获取当前线程的id
long threadId = Thread.currentThread().getId();
//ttl(Time To Live)代表锁的剩余存活时间。它是尝试获取锁时返回的值,表示锁在自动释放之前还能保持多久
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//如果为为null,代表锁获取成功。
if (ttl == null) {
return;
}
//redis的发布订阅机制,用来订阅通知。
RFuture<RedissonLockEntry> future = subscribe(threadId);
//负责发送命令和处理响应
commandExecutor.syncSubscription(future);
try {
//这段代码实现了一个循环,不断尝试获取锁,直到成功。在尝试获取锁的过程中,根据不同的情况采取不同的行动,比如等待一定时间后重试,或者在被中断时根据interruptibly变量的值决定是否继续等待
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired,锁获取成功,直接退出循环
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
//这里尝试等到锁知道锁可用或者超时。
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//如果等待过程当中被中断了,参数interruptibly为true,抛出异常,否则继续等待。
if (interruptibly) {
throw e;
}
//释放锁的人释放之后会发出一个信号量,
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
//小于0,代表没有超过指定时间
} else {
if (interruptibly) {
//选择阻塞调用acquire()(响应中断)或acquireUninterruptibly()(不响应中断)。
getEntry(threadId).getLatch().acquire();
} else {
//getLatch()提供的信号量成为了一个同步点,线程可以在这里等待,直到它们可以安全地继续执行
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
tryAcquire
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
//返回一个获取锁的方法
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
//如果leaseTime不等于-1,意味着用户指定了一个有效的租约时间,就不会走自带的逻辑。
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//如果没有设置leaseTime的值,方法获取默认的锁看门狗超时时间,再次调用tryLockInnerAsync。这意味着将使用默认的超时设置来尝试获取锁。
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//e表示回调的异常,如果为null则说明操作完成,否则直接返回。
if (e != null) {
return;
}
// lock acquired
//ttlRemaining是回调的结果,表示锁的剩余存活时间。
if (ttlRemaining == null) {
//在锁已经成功获取的情况下,看当前线程是否需要续订锁。
scheduleExpirationRenewal(threadId);
}
});
//如果锁当前被其他线程持有,这个值可能表示要等待多长时间才有可能再次尝试获取锁。
//null:通常表示锁已成功被当前线程获取,没有剩余等待时间。
return ttlRemainingFuture;
}
订阅发布机制目的是为了在当前线程不能立即获得锁时,订阅一个通知,当锁被释放或变为可用时,当前线程可以得到通知。通过这种方式,Redisson 实现了一个高效且响应迅速的分布式锁机制,允许多个线程或进程协调地访问共享资源。
ExprirationEntry
用于管理锁续约机制中线程和超时信息的数据结构
public static class ExpirationEntry {
//一个映射,键是线程ID(Long),值是计数器(Integer),用于跟踪每个线程对应的锁请求次数。LinkedHashMap保持了插入顺序,这可能对确定哪个线程首先请求锁有用。
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
//一个Timeout类型的变量,用来存储与该锁条目关联的超时任务。标记为volatile以确保在多线程环境中线程安全地访问。
private volatile Timeout timeout;
public ExpirationEntry() {
super();
}
//添加或增加一个线程ID的计数。如果该线程ID已存在,则增加其计数;如果不存在,则将其计数设置为1。
public void addThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
counter = 1;
} else {
counter++;
}
threadIds.put(threadId, counter);
}
//检查threadIds映射是否为空,即没有任何线程ID与此锁条目关联。
public boolean hasNoThreads() {
return threadIds.isEmpty();
}
public Long getFirstThreadId() {
if (threadIds.isEmpty()) {
return null;
}
return threadIds.keySet().iterator().next();
}
public void removeThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
return;
}
counter--;
if (counter == 0) {
threadIds.remove(threadId);
} else {
threadIds.put(threadId, counter);
}
}
public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}
public Timeout getTimeout() {
return timeout;
}
}
syncSubscription
同步等待一个异步订阅操作的完成,并处理超时以及中断情况,确保资源正确管理和异常处理逻辑得到妥善执行
public void syncSubscription(RFuture<?> future) {
//获取redis的配置信息,该配置信息存储在MasterSlaveServersConfig对象中,MasterSlaveServersConfig可能包含了与主从服务器相关的配置设置
MasterSlaveServersConfig config = connectionManager.getConfig();
try {
//计算超时时间,基于配置中的单次操作超时时间(getTimeout())、重试间隔(getRetryInterval())以及重试次数(getRetryAttempts())计算得出
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
//等待future的完成
if (!future.await(timeout)) {
((RPromise<?>) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
// 如果在指定的超时时间内future没有完成(即await返回false),则尝试将future标记为失败,并抛出一个带有超时信息的
} catch (InterruptedException e) {
//处理线程中断
Thread.currentThread().interrupt();
}
//同步完成,忽略中断,最后,调用syncUninterruptibly()确保当前线程在future完成之前不会继续执行,syncUninterruptibly()会忽略线程的中断状态,确保等待直到操作完全完成
future.syncUninterruptibly();
}
unsubscribe
在完成所有操作后取消订阅,避免内存泄露或不必要的资源占用
public void unsubscribe(E entry, String entryName, String channelName) {
//这个信号量用于控制对特定操作的并发访问,确保在对订阅进行更改时的线程安全。
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
semaphore.acquire(new Runnable() {
@Override
public void run() {
//如果entry.release() == 0,则表示相关资源可以被释放
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (!removed) {
throw new IllegalStateException();
}
//如果成功释放资源,调用service的unsubscribe方法取消订阅特定的频道,同时传入信号量以维护操作的同步性。
service.unsubscribe(new ChannelName(channelName), semaphore);
} else {
//如果entry没有被完全释放(即release()方法返回非零值),则调用semaphore.release()来释放信号量,允许其他操作继续进行
semaphore.release();
}
}
});
}
tryLockInnerAsync
T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
//key=Keys[1] --锁的key
//threadId--线程唯一id
//releaseTime-锁自动释放时间
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//首先判断锁是否存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
//不存在,获取锁
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//调用expire设置有效期
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//返回结果
"return nil; " +
"end; " +
//锁已经存在了,判断threadId是不是自己的线程
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//是自己的,重入次数+1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//设置有效期
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果不是自己的就返回失败。
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
scheduleExpirationRenewal
确保锁的过期时间被适时续约,特别是在锁被长时间持有时。它通过维护一个包含所有需要续约的锁条目的映射(EXPIRATION_RENEWAL_MAP
)来实现。每个条目代表一个锁,包含一个或多个希望续约该锁的线程ID。如果条目已存在,说明已经有其他线程计划续约这个锁,当前线程ID将被添加到这个条目中;如果条目不存在,将创建一个新的条目,并触发续约操作。这种机制允许多个线程协调地续约同一个锁,确保锁在需要时保持活动状态,避免因超时而导致的锁自动释放。
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
RedissonLock的所有实例都可以看到EXPIRATION_RENEWAL_MAP,因为他是静态的。一个RedissonLock类会创建出很多实例,每个锁都有自己的名字,有自己唯一entry。
ExpirationEntry oldEntry =
EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
//oldEntry不为null,说明映射中已经有一个条目与当前锁相关联,那么就将当前线程ID添加到这个旧条目中,表明当前线程也希望续约这个锁的过期时间。
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
//如果没有旧条目(即oldEntry为null),表示这是第一次尝试续约当前锁的过期时间,那么就将当前线程ID添加到新条目中,并调用renewExpiration方法来实际执行续约操作。
entry.addThreadId(threadId);
renewExpiration();
}
}
renewExpiration
private void renewExpiration() {
//获取与当前锁相关联的ExpirationEntry对象。这个对象可能包含了关于锁续约所需的信息,如相关的线程ID
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
//没有找到过期条目,方法直接返回,锁可能被释放或者没有续约必要
if (ee == null) {
return;
}
//创建一个新的Timeout任务,该任务将在指定的延迟后执行
Timeout task =
//调度异步任务的方法,用于在指定的时间后执行代码。这里使用了internalLockLeaseTime / 3作为延时,表示在锁租约时间的三分之一后执行。
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//再次从映射中获取锁的过期条目。
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
//获取第一个等待续约的线程ID。
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//如果线程ID存在,调用renewExpirationAsync(threadId)异步续约锁。
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
//如果出现异常,记录错误日志。
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
//如果续约成功(res为真),则递归调用renewExpiration以再次调度续约,确保锁在持有期间保持活动。
if (res) {
// reschedule itself
//重置有效期
renewExpiration();
}
});
}
//意味着在锁的租约时间过去三分之一之前,尝试对其进行续约。
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
//更新过期条目的定时器
ee.setTimeout(task);
}
只有在释放锁的时候才会停止续期。
unlockAsync
void cancelExpirationRenewal(Long threadId) {
//根据锁的名字来取
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
task.getTimeout().cancel();
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
tryLock
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//获取成功
if (ttl == null) {
return true;
}
//等待的时间为减掉获取锁的时间
time -= System.currentTimeMillis() - current;
//等待时间小于0就不等了
if (time <= 0) {
acquireFailed(threadId);
return false;
}
//严格控制时间,但是刚获取一次,立刻获取成功率不大,这时候我们可以用到了之前的发布订阅,获取到该县城订阅的别的线程是否释放锁的信息
current = System.currentTimeMillis();
//订阅与锁相关的信息
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
//等待订阅完成,或者给定的等待时间time耗尽,
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
//订阅未在指定时间完成
if (!subscribeFuture.cancel(false)) {
//尝试取消订阅操作。如果取消失败(即订阅操作已完成或无法取消),则设置一个完成时的回调来处理结果。
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
//调用acquireFailed方法处理获取锁失败的逻辑
acquireFailed(threadId);
//返回false,表示未能成功获取锁。
return false;
}
try {
//精确控制之前耗尽的时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
//不断地尝试
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
//如果ttl小于剩余时间且大于0,通过getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);等待直到锁可能变为可用或直至ttl时间结束。
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//等待剩余地总时间
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
//无论结果如何,都取消对锁释放消息的订阅,以避免内存泄漏或其他潜在问题
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
unlockInnerAsync
解锁逻辑
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判断锁是不是自己持有
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
//如果不是自己的就返回
"return nil;" +
"end; " +
//如果是自己的锁,冲入次数-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//判断是不是0,大于0说明不能释放,等于0就可以释放
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//直接删除了
"redis.call('del', KEYS[1]); " +
//将信息发送到指定的频道,任何订阅了该频道的客户端都可以接收到这个信息,在这个上下文中,它用于在锁被释放时通知其他可能正在等待这个锁的客户端
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
图示流程
黑马的图,不重复造轮子
不过大家要记住如果设置了超时时间看门狗线程就不起作用了。也就是leasetime