Redission实现分布式锁之源码解析
- 1、Redission实现分布式锁之源码解析
- 1.1 分布式锁-redission功能介绍
- 1.2 分布式锁-Redission快速入门
- 1.3 分布式锁-redission可重入锁原理
- 1.4 分布式锁-redission锁重试和WatchDog机制
- 1.5 分布式锁-redission锁的MutiLock原理
1、Redission实现分布式锁之源码解析
1.1 分布式锁-redission功能介绍
接上篇Redis分布式锁原理之实现秒杀抢优惠卷业务
基于setnx实现的分布式锁存在下面的问题:
重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
**超时释放:**我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
那么什么是Redission呢
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redission提供了分布式锁的多种多样的功能
1.2 分布式锁-Redission快速入门
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
如何使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
在 VoucherOrderServiceImpl
注入RedissonClient
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁对象
boolean isLock = lock.tryLock();
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}
1.3 分布式锁-redission可重入锁原理
在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。
在redission中,我们的也支持支持可重入锁
在分布式锁中,他采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有,所以接下来我们一起分析一下当前的这个lua表达式
这个地方一共有3个参数
KEYS[1] : 锁名称
ARGV[1]: 锁失效时间
ARGV[2]: threadId; 锁的小key
exists: 判断数据是否存在 name:是lock是否存在,如果==0,就表示当前这把锁不存在
redis.call(‘hset’, KEYS[1], ARGV[2], 1);此时他就开始往redis里边去写数据 ,写成一个hash结构
Lock{
threadId : 1
}
如果当前这把锁存在,则第一个条件不满足,再判断
redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1
此时需要通过大key+小key判断当前这把锁是否是属于自己的,如果是自己的,则进行
redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)
将当前这个锁的value进行+1 ,redis.call(‘pexpire’, KEYS[1], ARGV[1]); 然后再对其设置过期时间,如果以上两个条件都不满足,则表示当前这把锁抢锁失败,最后返回pttl,即为当前这把锁的失效时间
如果小伙伴们看了前边的源码, 你会发现他会去判断当前这个方法的返回值是否为null,如果是null,则对应则前两个if对应的条件,退出抢锁逻辑,如果返回的不是null,即走了第三个分支,在源码处会进行while(true)的自旋抢锁。
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"
1.4 分布式锁-redission锁重试和WatchDog机制
说明:触发锁重试机制的前提必须指定tryLock方法的等待时间参数,下面进行源码跟踪。
进入内部tryLock方法块,执行流程如下:
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime); // 将等待时间的单位转为毫秒
long current = System.currentTimeMillis(); // 记录当前时间
long threadId = Thread.currentThread().getId();
// 上来直接尝试获取,我们知道获取获取锁的lua脚本返回值有null和pttl;
// null代表获取成功;
// pttl代表锁的过期剩余时间
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) { // 条件成立代表锁获取成功
return true;
}
// 获取锁失败, 验证获取锁的等待时间(time参数)是否消耗殆尽
time -= System.currentTimeMillis() - current; // 剩余等待时间 = 等待时间 - 第一次获取锁失败的消耗时间
if (time <= 0) { // 条件成立,获取锁失败并执行结束,返回false
acquireFailed(waitTime, unit, threadId);
return false;
}
// 还有剩余的等待时间,进行锁重试
current = System.currentTimeMillis(); // 记录当前时间
/**
* 2.订阅锁释放事件,在执行unLock方法的lua脚本后会发布锁释放消息
* 此时通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
* 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.
*
* 当 subscribeFuture.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
* 当 subscribeFuture.await 返回 true,进入循环尝试获取锁.
*/
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
// subscribeFuture.await 返回 true,上面代码块的条件不成立,不走,进入循环重试获取锁
try {
time -= System.currentTimeMillis() - current; // 剩余等待时间 = 等待时间 - 第一次获取锁失败的消耗时间
if (time <= 0) { // 条件成立,获取锁失败并执行结束,返回false
acquireFailed(waitTime, unit, threadId);
return false;
}
// while(True)循环重试获取锁
while (true) {
long currentTime = System.currentTimeMillis(); // 记录当前时间
ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 尝试获取锁,这里的逻辑和开头的代码一致
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 还有剩余等待时间,再次获取当前时间
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
//如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可),等待并获取其它线程释放的信号
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//则就在wait time 时间范围内等待可以接收的信号量
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 剩余等待时间 = 等待时间 - 第一次获取锁失败的消耗时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { // 等待时间耗尽,结束
acquireFailed(waitTime, unit, threadId);
return false;
}
// 走到这里,无代码了,继续循环,继续重试获取锁
}
} finally {
// 无论是否获得锁,都要取消订阅解锁消息
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
现在理解了锁的重试机制,明白未抢到锁的线程,会根据等待时间是否充分,去决定是否重试获取锁。
假设线程1获取锁成功了,但是在执行业务遇到阻塞情况,此时pttl过期了,导致锁超时释放。另一个哥们(线程2)接收到前面线程1的pttl过期并释放锁的信号,随后线程2获取锁后就出现线程安全问题了。这个问题该怎么解决呢?没错,接下来就要讲到我们Redission的看门狗机制了。
首先进入tryLock方法,跟进一下调用链,来到tryAcquireOnceAsync方法:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// //当leaseTime = -1 时,启动 watch dog机制,getLockWatchdogTimeout()默认指定过期时间为30s
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
// 执行完lua脚本后的回调
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// 成功获取锁,更新有效期,内部源码通过定时任务每隔10s,定时重置有效期
if (ttlRemaining) {
// 跟进此方法
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(long threadId) {
// 一个锁就对应自己的一个ExpirationEntry类实例对象
ExpirationEntry entry = new ExpirationEntry()
// EXPIRATION_RENEWAL_MAP是一个线程安全的ConcurrentMap集合;
// putIfAbsent方法添加键值对,如果添加的键在集合中不存在,即添加成功,返回null;
// putIfAbsent方法添加键值对,如果添加的键在集合中存在,即添加失败,不替换value,返回原有的value值;
// 下面利用集合存储<EntryName是指锁的名称:entry对象>键值对
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) { // 满足条件,代表线程进行锁的重入,不进行延期操作
// 将线程ID加入
oldEntry.addThreadId(threadId);
} else { // 第一次获取锁,进行延期操作
// 将线程ID加入
entry.addThreadId(threadId);
// 延期方法,下面跟进这个方法
renewExpiration();
}
}
private void renewExpiration() {
// 根据锁的名称获取entry对象
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
// 如果集合不存在,那不再锁续期
if (ee == null) {
return;
}
//这个是一个延迟任务,每隔10秒执行一次
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
//延迟任务内容
@Override
public void run(Timeout timeout) throws Exception {
// 根据锁的名称获取entry对象
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
// 如果集合不存在,那不再锁续期
if (ent == null) {
return;
}
// 获取在scheduleExpirationRenewal中存入的线程id
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 线程不为空,调用renewExpirationAsync方法刷新过期时间,重置为30s
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> { // res:返回值 e:异常信息
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) { // 续期成功
//renewExpirationAsync方法执行成功之后,进行递归调用,调用自己本身函数
//那么就可以实现这样的效果
//首先第一次进行这个函数,设置了一个延迟任务,在10s后执行
//10s后,执行延迟任务的内容,刷新有效期成功,那么就会再新建一个延迟任务,刷新过期时间
//这样这个过期时间就会一直续费
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // internalLockLeaseTime/3=10秒执行一次
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
// 执行lua脚本,给过期时间进行续期30s
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
现在到了这步,看门狗就会一直进行续期,什么时候结束呢?当然是在释放锁的过程中,下面是释放锁的方法:
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
// unlockInnerAsync方法执行释放锁的lua脚本
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 回调方法
future.onComplete((opStatus, e) -> {
// 无论lua脚本是否执行成功,都会执行cancelExpirationRenewal方法来删除EXPIRATION_RENEWAL_MAP中的缓存
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) { // 删除entry对象中的线程id
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
// 删除集合中此锁对应的entry对象缓存,这样看门狗那边就获取不到此对象,停止续期了
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
如果释放锁操作本身异常了,watch dog 还会不停的续期吗?
不会。因为无论在释放锁的时候,是否出现异常,都会执行释放锁的回调函数,把看门狗停了
有没有设想过一种场景?服务器宕机了?其实这也没关系,首先获取锁和释放锁的逻辑都是在一台服务器上,那看门狗的续约也就没有了,redis中只有一个看门狗上次重置了30秒的key,时间到了key也就自然删除了,那么其他服务器,只需要等待redis自动删除这个key就好了,也就不存在死锁了
总结
(1)watchDog 在当前节点存活时,每10s给分布式锁的key续期 30s
(2)watchDog 最终还是通过 Lua脚本的expire命令来进行重置有效期,更新有效期
(3)watchDog 机制启动,且代码中没有释放锁操作时,watchDog 会不断的给锁续期
(4)要使 watchDog 机制生效 ,就不要设置锁过期时间 leaseTime
(5)锁续期是通过一个定时任务,在 renewExpiration() 中自己调自己实现的!
1.5 分布式锁-redission锁的MutiLock原理
为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
那么MutiLock 加锁原理是什么呢?笔者画了一幅图来说明
当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.