0、引言:分布式锁的引出
锁常常用于多线程并发的场景下保证数据的一致性,例如防止超卖、一人一单等场景需求 。通过加锁可以解决在单机情况下安全问题,但是在集群模式下就不行了。集群模式,即部署了多个服务器、并配置了负载均衡后,原来加的锁会失效,具体原因如下:
由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的;
但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥。
这就是集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。
1、分布式锁的基本原理
Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法(具体使用可看我的这篇文章http://t.csdn.cn/U7Z6y),如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁。同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性
Redis锁的代码实现
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId()
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
public void unlock() {
//通过del删除锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
2. 分布式锁将遇到的问题与解决方案
2.1 如何避免死锁
当用户1拿到锁以后,若进程挂了、或因为别的原因,没有机会主动释放锁,会导致已经获得锁的客户端一直占用锁,其他客户端永远无法获取到锁。
解决方案:
为了解决以上死锁问题,最容易想到的方案是:在申请锁时,在Redis中实现时,给锁设置一个过期时间,假设操作共享资源的时间不会超过10s,那么加锁时,给这个key设置10s过期即可。并且Redis 2.6.12之后,Redis扩展了SET命令的参数,可以在SET的同时指定EXPIRE时间,这条操作是原子的,例如以下命令是设置锁的过期时间为10秒。
SET lock_key 1 EX 10 NX
2.2 Redis分布式锁误删
对于2.1情况下,并不是完美的解决方案,例如在遇到这种情况时:
1. 持有锁的线程1在锁的内部出现了阻塞,而他的锁超时自动释放(del了),这时其他线程,线程2来尝试获得锁,就拿到了这把锁(setnx了);
2.然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除(把线程2的setnx的值del了)。
3. 因为线程2还没有执行完,其锁就被释放,如果此时线程3进入获取到了锁,则两个线程会同时操作数据,造成不安全的情况。
解决方案:
在每一次释放锁之前,判断当前的锁是否属于自己这个线程,这样就避免了释放别人锁的情况。
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。
改进后的代码:
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
2.3 Redis分布式锁原子性问题
在2.1、2.2两次改进的基础上,感觉没什么问题了,但又有新的神秘bug出现:对于上述的操作流程如下图所示,我们说假如啊,假如,在程序进入最后的“判断锁标识是否是自己”这个判断句,已经进入了(已经判断完以后),就要执行释放锁操作了:
但是由于判断id、删除id,这两个步骤并不是原子性的:假如在del的时候发生了阻塞,而导致超时释放锁,将造成以下后果:
此时线程2获取到了锁,正在嘎嘎执行业务的时候,线程1的del阻塞结束了,但由于在判断句内部,这个锁仍然会被释放(即线程2的锁仍然被认为是线程1的,被释放了)
这时候线程3进来,又会发生一样的安全问题。
解决方案:Lua脚本解决多命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。 lua脚本如下:
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
对应的,用java代码调用此脚本的方法如下:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
3、redission替代setnx分布式锁
实际上,setnx这种分布式锁的实现方式存在以下问题:
实现Redis的分布式锁,除了自己基于redis client原生api来实现之外,其实还可以使用开源框架:Redission
Redisson是一个企业级的开源Redis Client,也提供了分布式锁的支持,并且拥有上述Redis没有的优点。
3.1 Redisson实现可重入锁(01)与锁重试(02)
在redission中也支持支持可重入锁。在分布式锁中,他采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有。value标识这把锁有多少个方法正在使用:
源码解析:tryAcquire、tryAcquireAsync实现
watiTime是传入的等待时间:超过这个时间,线程还拿不到锁,那就不等了,获取锁失败。
要注意区分,这个时间并不是锁的有效时间、超时释放的最大存活时间。
如上图源码,实现可重入的方式是通过String字符串的方式替代lua脚本,这个地方一共有3个参数:
KEYS[1] : 锁名称
ARGV[1]: 锁失效时间
ARGV[2]: id + ":" + threadId; 锁的小key
exists: 判断数据是否存在 name:
是lock是否存在,如果==0,就表示当前这把锁不存在;
redis.call('hset', KEYS[1], ARGV[2], 1);此时他就开始往redis里边去写数据 ,写成一个hash结构:
Lock{
id+":"+threadld:1
}
如果当前这把锁存在,则第一个条件不满足,再判断:
redis.call('hexists', KEYS[1], ARGV[2]) == 1
此时需要通过大key+小key判断当前这把锁是否是属于自己的,如果是自己的,则进行:
redis.call('hincrby', KEYS[1], ARGV[2], 1)
将当前这个锁的value进行+1 ,即重入
成功拿到锁,则返回null
没成功拿到锁:
redis.call('pexpire', KEYS[1], ARGV[1]);
然后再对其设置过期时间,如果以上两个条件都不满足,则表示当前这把锁抢锁失败,最后返回pttl,即为当前这把锁的剩余有效期。
返回ttl后,会在源码处进行while(true)的自旋重复获取锁:
//能运行到这里,说明time剩余等待时间仍然>0
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
}
3.2 WatchDog防止锁超时释放机制(03)
在3.1节中,不论是直接一次性获取到锁(ttl为null),还是说在while循环反复尝试后获得到锁,都是可以拿到锁,然后去执行业务的。但锁有自己的寿命,运行一定时间会自己超时释放:
我们希望锁的释放是因为业务执行完释放,而不是因为阻塞超时导致的释放。
因此,我们把锁的寿命无限延长:
原理:
Redisson提供的分布式锁是支持锁自动续期的,也就是说,如果线程仍旧没有执行完,那么redisson会自动给redis中的目标key延长超时时间,锁不会因为超时而被释放。这在Redisson中称之为 Watch Dog 机制。
默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。另外Redisson 还提供了可以指定leaseTime参数的加锁方法来指定加锁的时间。超过这个时间后锁便自动解开了,不会延长锁的有效期。
注意:这个30秒不是你传的leaseTime参数为30,而是你不传leaseTime或者传-1时,Redisson配置中默认给你的30秒。所以,如果你想解决由于线程执行慢或者阻塞,造成锁超时释放的问题,就不要在两个方法中传release。
源码解析:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//如果自己设置了leaseTime(且不是-1)
//那么直接return,也就不会执行看门狗的延时函数scheduleExpirationRenewal了
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
//如果获取锁成功,ttlRemainingFuture 就是null,失败则为剩余有效时间
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired 如果已经获取锁成功了,解决有效期的问题:scheduleExpirationRenewal
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
//这是针对当前线程,无限延长对应的锁的寿命。
//但也不能一直无限长寿命,在业务结束后,线程主动释放锁以后,
//将关闭看门狗。
}
});
//不管怎样,都会return回去,返回获取锁的结果如何。
return ttlRemainingFuture;
}
3.3 总结