前言
首先Redis执行命令是单线程的,所以可以利用Redis实现分布式锁,而对于Redis单线程的问题,是其线程模型的问题,本篇重点是对目前流行的工具Redisson
怎么去实现的分布式锁进行深入理解;开始之前,我们可以下你思考一个问题,Redisson
的实现方式有何不同?为什么?
使用
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.25.0</version>
</dependency>
添加配置
@Autowired
private RedisProperties redisProperties;
@Bean
public RedissonClient redisClient() {
Config config = new Config();
config.setTransportMode(TransportMode.NIO);
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress(String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort()));
singleServerConfig.setPassword(redisProperties.getPassword());
return Redisson.create(config);
}
加锁
private final RedissonClient redissonClient;
public void lock() throws Exception {
RLock lock = redissonClient.getLock(PRODUCT_LOCK_KEY + id);
if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
// 业务
} else {
// 业务
}
}
如上使用是最简单的方式,Redission它底层封装了很多逻辑,但如果说要redis客户端实现,你要怎么实现?
Redis中本就有支持分布式锁的命令:setnx
,对应RedisTemplate
中,使用如下:
redisTemplate.opsForValue().setIfAbsent("lockkey", "", 1, TimeUnit.SECONDS);
使用Redis实现分布式锁需要注意的是不要产生死锁,所以使用Redis实现分布式锁有两种方式:
- setnx命令
- lua脚本
两种方式都是一个操作完成key,value的设置以及过期时间的设置,你是否有相关,他们的实现是否都一样,或者说,这个实现可以有其他实现方式?
源码
原理
- lua脚本保证多个命令的原子性;
- 采用
hash
数据结构,key为锁的名称,field是线程对应的名称,也因为这个数据结构,也支持可重入锁; - 定时延时的操作避免死锁(看门狗);
lock()
我们先看lock()
方法,tryLock()
和lock()
底层是一样的,所以我们只看lock方法;
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
位置:org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)
传参是:(-1, null, false)
/**
leaseTime: 过期时间
unit:过期时间单位
interruptibly: 信号量,是否打断线程
*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 取线程ID,后面作为锁的一个标志
long threadId = Thread.currentThread().getId();
// 加锁
// 两个步骤:
// 1. 利用lua脚本设置锁与过期时间(原子操作),过期时间lockWatchdogTimeout = 30 * 1000 = 30秒
// 2. 执行过期时间刷新(这里的刷新时利用回调的方式 + 延迟执行实现的定时任务),lockWatchdogTimeout/3 = 10秒
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lua脚本中,加锁成功返回nil,对应redis中是null,加锁失败,则返回的是已存在的锁的过期时间
// 所以这里返回null,就是加锁成功了,就不再往下走了
if (ttl == null) {
return;
}
// 加锁成功的在上面就已经结束了,所以下面的都是加锁失败时走的
// 这里它订阅了一个channel,参数threadId无用,别被误导了,它订阅的名称时固定的
// 为什么这里要订阅?可以思考一下
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
// 检查是否超时
pubSub.timeout(future);
RedissonLockEntry entry;
// 这里的interruptibly应该时程序一次时,是否结束,而不是一直在执行中
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
// 这里就是一个自旋 + 加锁
while (true) {
// 每次循环都进行加锁操作
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 同样,如果这里时null,那么就是加锁成功了
if (ttl == null) {
break;
}
// 加锁失败:返回了锁的过期时间
if (ttl >= 0) {
try {
// 信号量 + unsafe.park
// 信号量:本地更改线程共享变量状态达到加锁的目的
// unsafe.park:利用park方法将加锁失败(信号量更改失败)的线程进行挂起
// 为什么要挂起,这个问题应该不用多说
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
// 这个分支时ttl < 0,也就是过期时间为负数,也就是锁失效了
// 对本地加锁,之后在下一次循环redis加锁
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
上面的步骤就是Redisson大致逻辑:
- lua脚本加锁,记录线程id,防止非本线程解锁
- 成功则退出
- 添加订阅对应的channel(这里的订阅是异步的)
- 唤醒:当收到channel的通知后,也就是上一个锁解锁了
- 从第6步醒来
- 自旋
- lua脚本加锁(与第一步一样),存在订阅后,上一个锁解锁了,就不用再挂起线程
- 失败挂起:unsafe.park
- 加锁成功后,取消订阅
lua脚本加锁
简单说一下这个脚本做了什么(lua脚本类似js):
redis.call
是调用redis命令,第一个参数是redis的命令,第二个参数是参数;
它先是exists
判断了key
,以及hash数据结构了的field
是否存在,
如果存在,对field
递增,为什么要递增?思考一下;并设置过期时间,返回返回nil
;
如果不存在,调用redis命令pttl
,获取key的过期时间,并返回;
那如果我们自己写lua脚本呢?
redis 2.6之后支持lua脚本,一个脚本,执行这个脚本是原子性的,所以脚本里的多个命令是原子性的。
如果说要执行批量的命令,可以使用piple
,但是管道的话,它并不是原子性的,他只是一次性把批量的命令发给了redis。
LUA脚本格式:
eval "脚本 KEYS[1...N] ARGV[1...N]" count key[1...N] argv[1...N]
KEYS[1…N]:key的展位符,多个时,序号递增,从1开始
ARGV[1…N]:value的展位符,多个时,序号递增,从1开始
count:是对应key输入的个数
key[1…N]:对应KEYS[0…N]的key
argv[1…N]:对应ARGV[0…N]的value
这里就大概简单的说明了一下,使得看本篇的朋友能够理解,详细的还请百度;
看门狗
使用过Redisson的朋友应该都听过“看门狗”,为什么Redisson加锁要看门狗呢?
如果我们使用lock()
方法,不设置过期时间,那么应该是永不过期;
好,如果说,加锁成功,在解锁时出了意外,如服务异常退出,或宕机,导致没有解锁,那么这个锁就需要人工干预了,这是有问题的。
所以,在Redisson中,它并不是永不过期,当我们使用lock()
方法时,它的参数时-1
,但在最终执行lua脚本时传入了默认参数:
位置:org.redisson.RedissonLock#tryAcquireAsync
过期时间时:internalLockLeaseTime;
可以看到在初始化时,它便赋予了该值一个默认的30秒;
它并没有将锁设置为无限时间,而是30秒,又怎么保证锁的有效?
所以它还有一个续约的操作,对未使用unlock
的锁进行时间延迟,这一操作是为了保证未来某一时刻如果出现服务或其他问题导致解锁失败,产生死锁这样的一个情况。
来看代码:
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
// leaseTime=-1表示永不过期
if (leaseTime > 0) {
// 对应方法:tryLock(过期时间, 时间单位)
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 对应方法:lock()
// 注意它达到参数是:internalLockLeaseTime,上面说了他是30秒
// internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 这里是上面ttlRemainingFuture回调结果处理,
// 如果出现异常,就unlock解锁
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
// 这里是定时刷新任务
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// 执行成功返回的时null,所以这里以null为加锁成功的标志
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
handleNoSync
它只是针对异常做了处理,正常情况下只是进行了封装,而unlocakInnerAysnc
也是在异常时的一个回调;
return
是结果封装;
再回到刷新的部分:
它是在加锁完后的一个回调方法,ttlRemaining
它就是上面执行的结果,null或者是过期时间,
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
// leaseTime=-1表示永不过期
if (leaseTime > 0) {
// 对应方法:tryLock(过期时间, 时间单位)
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 对应方法:lock()
// 注意它达到参数是:internalLockLeaseTime,上面说了他是30秒
// internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 这里是上面ttlRemainingFuture回调结果处理,
// 如果出现异常,就unlock解锁
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
// 这里是定时刷新任务
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// 执行成功返回的时null,所以这里以null为加锁成功的标志
if (ttlRemaining == null) {
if (leaseTime > 0) {
// leaseTime > 0 表示加锁时设置了过期时间
// 而ternalLockLeaseTime存在默认值,这里就是取消了默认值
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 那,这里执行 过期时间的定时刷新
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
过期时间刷新的步骤:
- 创建ExpirationEntry,可以看作时一个上下文对象,他是延时的标识
- 原子操作,添加上下文ExpirationEntry
- 设置当前线程id到上下文
- 创建Timeout延时任务,延时10秒
- 延时任务执行(异步):
- 根据客户端id和锁获取上下文
- 通过上下文获取到线程id
- 异步执行延时lua脚本
- 执行回调:成功,回调本身,回调第四步,失败,移除上下文,取消延时
protected void scheduleExpirationRenewal(long threadId) {
// 1. 创建ExpirationEntry,可以看作时一个上下文对象,他是延时的标识
ExpirationEntry entry = new ExpirationEntry();
// 2. 原子操作,添加上下文ExpirationEntry
// 要进行过期时间刷新,就要先添加这个对象,可以理解为一个刷新的标志
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
// 3. 设置当前线程id到上下文
if (oldEntry != null) {
// 不为空,说明这个对象已经存在,已经有其他线程执行了过期刷新
oldEntry.addThreadId(threadId);
} else {
// 为空,则是不存在该映射对象(锁不存在)
entry.addThreadId(threadId);
try {
// 执行过期时间刷新
// 4. 创建Timeout延时任务,延时10秒
// 5. 延时任务执行(异步):
// 5.1 根据客户端id和锁获取上下文
// 5.2 通过上下文获取到线程id
// 5.3 异步执行延时lua脚本
// 5.4 执行回调:成功,回调本身,回调第四步,失败,移除上下文,取消延时
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
下面我们进入renewExpiration
方法:
private void renewExpiration() {
// EXPIRATION_RENEWAL_MAP 在上一层方法中添加了刷新的上下文标志
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
// 如果这里为null,就说明已经有其他线程移除了,那么就是不需要刷新,锁不存在了
return;
}
// 4. 创建Timeout延时任务,延时10秒
// 注意这里的时间: internalLockLeaseTime / 3
// 之前说过:internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;
// 所以这里是延迟10秒
Timeout task = getServiceManager().newTimeout(new TimerTask() {
// 5. 延时任务执行(异步)
@Override
public void run(Timeout timeout) throws Exception {
// 5.1 根据客户端id和锁获取上下文
// entryName是客户端id+线程id
// 每次执行都要检查,刷新的标志存在,就说明锁还在,需要执行
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 5.2 通过上下文获取到线程id
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 5.3 异步执行延时lua脚本
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
// 当延时完成后,执行这个方法,也是个回调
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
// 异常时,移除刷新的标志
// 也是避免死锁的一个方式
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 5.4 成功:回调本身,回调第四步,失败:移除上下文,取消延时
if (res) {
// 延时成功,回调当前这个方法,以实现定时任务
renewExpiration();
} else {
// 没有成功,也是移除刷新的标志
// 这里没有成功的情况,是锁已经不存在了,对应的定时任务就应该停止;
// 两个步骤:
// 1. task.cancel():通过上下文获取到任务,然后取消
// 2. EXPIRATION_RENEWAL_MAP.remove(getEntryName());
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// 将定时任务放到entry中,也就是放到了定时任务的上下文中
// 在下一次时,通过上下文获取到这个定时任务
ee.setTimeout(task);
}
其实他延时的逻辑也是一个lua脚本:
漏了一点:
由Redisson实现JDK的Timeout
类,加回调完成的一个定时任务;当当前任务执行完后,又执行本身方法,创建一个延迟任务,也就实现了一个定时任务