一、分布式锁
1.1 什么是分布式锁
分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程。
1.2 分布式锁应该具备哪些条件
-
在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
-
高可用的获取锁与释放锁
-
高性能的获取锁与释放锁
-
具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)
-
具备锁失效机制,即自动解锁,防止死锁
-
具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
1.3 分布式锁的实现方式
-
基于数据库实现分布式锁
-
基于
Zookeeper
实现分布式锁 -
基于
Reids
实现分布式锁
本章重点讲解的是基于
Reids
的分布式锁
二、利用RedisTemplate实现分布式锁
在真实的项目里,我们经常会使用Spring Boot/Spring Cloud
框架,该框架已经集成了 RedisTemplate
类,开放了很多对Redis
的API
。
本章节列举基于
RedisTemplate
实现方式分布式锁的各种方式及存在的问题。
2.1 利用setIfAbsent先设置key value再设置expire
redisTemplate.opsForValue().setIfAbsent(key,value);
redisTemplate.expire(key, time, TimeUnit.SECONDS);
问题:「不是原子操作」。以上两条语句不是原子性的。假如执行完第一条语句后,服务挂掉,导致
key
永久存在,锁无法释放。
setIfAbsent(key, value)
方法简介:如果key
不存在则新增,返回true
;存在则不改变已经有的值返回false
。
2.2 利用setIfAbsent同时设置 key value expire
redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS);
问题一:「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行。
问题二:「锁被别的线程误删」。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完。
2.3 利用setIfAbsent同时设置 key value expire,value为客户端标识
可以在获取锁,解锁时。首先获取客户端标识,如果和加锁时不一致,则获解锁操作失败,解决了2.2
中提到了「锁被别的线程误删」
问题。
Callable<String> callable = () -> {
String result = "";
String threadId = Thread.currentThread().getId() + "";
if (Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, threadId, 10, TimeUnit.SECONDS))) {
result = threadId;
try {
// 模拟业务处理
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (redisTemplate.opsForValue().get(key).equals(threadId)) {
redisTemplate.delete(key);
}
}
}
return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<String> future = executorService.submit(callable);
list.add(future);
}
for (Future<String> future : list) {
log.info(future.get());
}
问题:「不是原子操作」。通过
key
获取加锁时的客户端标识和释放锁两条语句不是原子操作。
2.4 利用lua脚本进行加锁及释放锁原子操作
本章重点,通过
lua
脚本对2.3
提出的原子性问题进行解决。
RedisTemplate
执行lua
脚本加锁释放锁工具类:
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
@Component
public class ConcurrentLockUtil {
private static final String LOCK_LUA = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('expire', KEYS[1], ARGV[2]) return 'true' else return 'false' end";
private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]) end return 'true' ";
private final RedisScript lockRedisScript;
private final RedisScript unLockRedisScript;
private final RedisSerializer<String> argsSerializer;
private final RedisSerializer<String> resultSerializer;
private RedisTemplate redisTemplate;
public ConcurrentLockUtil(RedisTemplate redisTemplate) {
this.argsSerializer = new StringRedisSerializer();
this.resultSerializer = new StringRedisSerializer();
this.lockRedisScript = RedisScript.of(LOCK_LUA, String.class);
this.unLockRedisScript = RedisScript.of(UNLOCK_LUA, String.class);
this.redisTemplate = redisTemplate;
}
/**
* 分布式锁
*
* @param lockKey
* @param value
* @param time
* @return
*/
public boolean lock(String lockKey, String value, long time) {
List<String> keys = Collections.singletonList(lockKey);
String flag = (String) redisTemplate.execute(lockRedisScript, argsSerializer, resultSerializer, keys, value, String.valueOf(time));
return Boolean.valueOf(flag);
}
/**
* 删除锁
*
* @param lock
* @param val
*/
public void unlock(String lock, String val) {
List<String> keys = Collections.singletonList(lock);
redisTemplate.execute(unLockRedisScript, argsSerializer, resultSerializer, keys, val);
}
}
业务调用:
@Autowired
ConcurrentLockUtil concurrentLockUtil;
ExecutorService executorService = Executors.newFixedThreadPool(10);
String key = "test2-lock";
Callable<String> callable = () -> {
String result = "";
String threadId = Thread.currentThread().getId() + "";
if (Boolean.TRUE.equals(concurrentLockUtil.lock(key, threadId, 10))) {
result = threadId;
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
} finally {
concurrentLockUtil.unlock(key, threadId);
}
}
return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<String> future = executorService.submit(callable);
list.add(future);
}
for (Future<String> future : list) {
log.info(future.get());
}
问题:针对
2.2
章节提到的「锁过期释放了,业务还没执行完」问题仍然存在。
有些小伙伴认为,稍微把锁过期时间设置长一些就可以。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放,请见下面章节。
三、利用Redisson框架RLock实现分布式锁
开源框架Redisson
解决了2.1.4
提出的问题。官网地址:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0
我们一起来看下Redisson
底层原理图:
只要线程一加锁成功,就会启动一个watch dog
看门狗,它是一个后台线程,会每隔10
秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key
的生存时间(Redisson
中使用的 Lua
脚本做的检查及设置过期时间操作,本身是原子性的)。因此,解决了「锁过期释放,业务没执行完」
问题。
3.1 加锁逻辑
Redisson
中 Lua
加锁脚本定义及流程如下:
3.2 解锁逻辑
Redisson
中 Lua
解锁脚本定义及流程如下:
3.3 续锁逻辑
可以看到续时方法将 threadId
当作标识符进行续时
知道核心理念就好了, 没必要研究每一行代码
四、利用RedissonRedLock多机实现分布式锁
4.1 单机版锁存在的问题
前面章节的几种方案都只是基于单机版的讨论,还不是很完美。其实Redis
一般都是集群部署的:
如果线程一在Redis
的master
节点上拿到了锁,但是加锁的key
还没同步到slave
节点。恰好这时,master
节点发生故障,一个slave
节点就会升级为master
节点。线程二就可能获取同个key
的锁,但线程一也已经拿到锁了,锁的安全性就没了。
4.2 RedissonRedLock核心思想
为了解决上述问题,Redis
作者 antirez
提出一种高级的分布式锁算法:Redlock
。Redlock
核心思想是这样的:
搞多个
Redis
master
部署,以保证它们不会同时宕掉。并且这些master
节点是完全相互独立的,相互之间不存在数据同步。同时,需要确保在这多个master
实例上,是与在Redis
单实例,使用相同方法来获取和释放锁。
RedissonRedLock
算法:
RedissonRedLock
业务逻辑实现:
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
String lockKey = "myLock";
int waitTimeout = 5;
int leaseTime = 30;
/**
* 获取多个 RLock 对象
*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);
/**
* 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
/**
* 4.尝试获取锁
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
boolean res = redLock.tryLock((long) waitTimeout, (long) leaseTime, TimeUnit.SECONDS);
if (res) {
//成功获得锁,在这里处理业务
}
} catch (Exception e) {
throw new RuntimeException("aquire lock fail");
} finally {
//无论如何, 最后都要解锁
redLock.unlock();
}
RedissonRedLock
核心源码:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
/**
* 1. 允许加锁失败节点个数限制(N-(N/2+1))
*/
int failedLocksLimit = failedLocksLimit();
/**
* 2. 遍历所有节点通过EVAL命令执行lua加锁
*/
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
/**
* 3.对节点尝试加锁
*/
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}
if (lockAcquired) {
/**
*4. 如果获取到锁则添加到已获取锁集合中
*/
acquiredLocks.add(lock);
} else {
/**
* 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
*/
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
/**
* 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
*/
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
/**
* 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
*/
return true;
}
当然,对于 RedissonRedLock
算法不是没有质疑声, 大家可以去 Redis
官网查看Martin Kleppmann
与 Redis
作者Antirez
的辩论。