redis系列整体栏目
内容 | 链接地址 |
---|---|
【一】redis基本数据类型和使用场景 | https://zhenghuisheng.blog.csdn.net/article/details/142406325 |
【二】redis的持久化机制和原理 | https://zhenghuisheng.blog.csdn.net/article/details/142441756 |
【三】redis缓存穿透、缓存击穿、缓存雪崩 | https://zhenghuisheng.blog.csdn.net/article/details/142577507 |
【四】redisson实现分布式锁实战和源码剖析 | https://zhenghuisheng.blog.csdn.net/article/details/142646301 |
如需转载,请输入:https://blog.csdn.net/zhenghuishengq/article/details/142577507
redisson实现分布式锁实战和源码剖析
- 一,redisson实现分布式锁实战和源码剖析
- 1,redis原生方式实现分布式锁
- 2,Redisson实现分布式锁
- 2.1,ReentrantLock 实现锁
- 2.2,Redission实现分布式锁案例
- 2.3,Redission底层实现原理和源码剖析
- 2.3.1,lock加锁逻辑
- 2.3.2,lock锁续命逻辑
- 2.3.3,加锁失败阻塞逻辑
- 2.3.4,unlock锁释放
- 3,Redisson总结
一,redisson实现分布式锁实战和源码剖析
前面几篇讲解了redis的基本数据类型,接下来在本文中,讲解一下如何通过redis实现一把分布式锁。在分布式环境中,所有的jvm层面的锁将会失去该有的作用,因此在分布式环境中,可以通过redis来实现这种分布式锁,说白了就是在分布式和高并发的环境下,将并行的线程改成串行。
1,redis原生方式实现分布式锁
在redis内部,提供了实现分布式锁的方式,可以直接通过 setnx 命令的方式来,加下来直接通过代码的方式来演示一段扣减库存的代码,比如以下这段代码,对id为1001的手机进行扣减库存,此时redis中设置了库存为100
set phoneCount 100
随后自定义一把分布式锁,在设置 key/value 的同时,并设置过期时间,可以保证整条命令的原子性
@RestController
@RequestMapping("/test")
@Slf4j
public class StockController {
@Resource
private RedisTemplate redisTemplate;
//手机id
public static final String PHONE_ID = "phone:1001";
//手机数量
public static final String PHONE_COUNT = "phoneCount";
@GetMapping("/disStock")
public AjaxResult disStock(){
//每个线程分配一个唯一标识
String flag = UUID.randomUUID().toString();
//定义一把分布式锁,设置有效期为30s
redisTemplate.opsForValue().setIfAbsent(PHONE_ID, flag, 30, TimeUnit.SECONDS);
try {
//当前库存
Integer stock = Integer.parseInt(redisTemplate.opsForValue().get(PHONE_COUNT) + "");
if (stock > 0){
stock = stock - 1;
redisTemplate.opsForValue().set(PHONE_COUNT,stock);
log.info("当前库存值为:" + stock);
}else{
log.info("当前库存为空,扣减失败");
}
}finally {
redisTemplate.delete(PHONE_ID);
}
return AjaxResult.success();
}
}
如果是在并发量不大,或者说能接受超卖的情况下,上面这种方式实现分布式锁是够用的。
当然上面这种方式实现也有问题,就是有可能锁被误删的问题。假设此时线程1先拿到锁,线程2来时发现线程1已经拿到锁,那么线程2就会等待线程1执行完。但是现在还有问题,锁设置了一个过期时间30s,假设说线程1在执行下面这段代码的时候,可能逻辑特别复杂执行时间超过了30s,假设需要花费40s才能完成,那么在30s的时候,锁就过期了,那么线程2就能去抢锁
if (stock > 0){
stock = stock - 1;
xxxxx //业务需要执行40s
redisTemplate.opsForValue().set(PHONE_COUNT,stock);
log.info("当前库存值为:" + stock);
}
但是线程1还是在执行的,假设此时线程2正拿到锁在执行任务,在40s后线程1执行完的时候,直接把这把锁给删了,导致线程2锁又失效了,线程2又没执行完,后面又会执行扣库存,删锁的命令,这样就会导致后面的线程的锁都会被莫名其妙的删除,库存方面最终也会出现超卖的问题。
redisTemplate.delete(PHONE_ID);
而且还会导致超卖问题,如线程1还没有set减1的操作到redis中,线程2拿到的还是100,按理来说是线程1减掉的值99,然后还是对100进行操作,如果是在高并发环境下,就会严重的出现超卖的问题。
因此需要在删锁时做一个进一步的优化,判断一下加锁的唯一标识是不是当前线程的唯一标识,是的话才能删
if(flag.equals(redisTemplate.opsForValue().get(PHONE_ID))){
redisTemplate.delete(PHONE_ID);
}
上面这种情况在系统稳定的时候进行释放锁时没有问题,但是也可能遇到极端的情况,比如在释放锁之前遇到系统卡顿的情况,导致还没执行释放锁的命令锁又过期了,这样别的线程又能抢锁,然后当前线程在执行到删除锁命令的时候,有把别的抢到锁的线程的锁给删除了,又出现了上面的这个 超卖 的问题
if(flag.equals(redisTemplate.opsForValue().get(PHONE_ID))){
xxxx //系统卡顿
redisTemplate.delete(PHONE_ID);
}
总而言之如果用redis的自定义分布式锁时,会由于锁的超时时间、锁过期和锁删除机制,会导致出现 超卖问题。其主要原因是锁会过期,这样导致未执行完的线程还会对锁进行删除的操作,导致其他线程锁失效。
2,Redisson实现分布式锁
虽然说redis确实可以通过自定义的方式实现一把分布式锁,但是其内部确实还存在一些问题,如经典的超卖问题,其主要原因还是,不能控制每个线程的过期时间,导致如果某个线程超时的话,就会出现锁提前释放,后续也可能出现将其他线程的锁删除的行为
因此出现了redisson分布式锁的实现,其官网链接地址如下:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
2.1,ReentrantLock 实现锁
看到这些可重入锁,公平锁等等,可以联想到JDK内部的JUC的实现,如可以查看本人写的JUC系列的 ReentrantLock的实现: https://blog.csdn.net/zhenghuishengq/article/details/132857564
实现一把单JVM进程锁的方式如下,在定义完一把 ReentrantLock 锁之后呢,直接调用内部两个简单的api就能实现加锁和解锁,底层通过aqs实现
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock()
底层通过clh同步等待队列实现,同时还支持公平锁和非公平锁,由于本文主角是redission,因此详细可以去看上面给的文章的链接
2.2,Redission实现分布式锁案例
接下来针对上面这段自定义实现的分布式锁,通过redisson进行优化
public class StockController {
@Resource
private RedisTemplate redisTemplate;
@Resource
private Redisson redisson;
//手机id
public static final String PHONE_ID = "phone:1001";
//手机数量
public static final String PHONE_COUNT = "phoneCount";
@GetMapping("/disStock")
public AjaxResult disStock(){
String flag = UUID.randomUUID().toString();
//定义一把分布式锁,设置有效期为30s
RLock lock = redisson.getLock(PHONE_ID);
lock.lock();
try {
//当前库存
Integer stock = Integer.parseInt(redisTemplate.opsForValue().get(PHONE_COUNT) + "");
if (stock > 0){
stock = stock - 1;
redisTemplate.opsForValue().set(PHONE_COUNT,stock);
log.info("当前库存值为:" + stock);
}else{
log.info("当前库存为空,扣减失败");
}
}finally {
lock.unlock();
}
return AjaxResult.success();
}
}
通过上面这段代码可以发现,其内部的实现方式和ReentrantLock的实现是很像的,都是在获取到相对于的实例对象之后,通过lock方式加锁和通过unlock的方式进行解锁
RLock lock = redisson.getLock(PHONE_ID);
lock.lock();
lock.unlock();
2.3,Redission底层实现原理和源码剖析
2.3.1,lock加锁逻辑
在看源码之前,来先对加锁内部做一个预想,无非就是抢锁、没抢到的阻塞,阻塞的线程轮询抢锁。
接下里进入内部源码查看,先进入这个lock方法,然后进入这个 lockInterruptibly 方法,首先会获取到当前线程id,需要给后面使用
接下来就是进入重要的 tryAcquire 方法,这个就是主要的获取锁的逻辑代码
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
最后进入这个最重要的 tryLockInnerAsync 方法,内部其实是通过一个lua脚本来实现原子性,由于redis中执行任务的线程还是单线程,因此下面这一大段都可以保证操作的原子性
感兴趣的可以了解一下lua脚本,上面也通过箭头表明了lua脚本的参数,正常的对象都是通过 key/value 的方式表示,在lua脚本中,前面的这个集合标识key,后面的几个参数表示value,就是前面的 getName 是key,后面的 internalLockLeaseTime, getLockName(threadId) 是value,通过ARGV表示。lua脚本官方更加推荐使用一个key对应一个value,当然也允许key有1个或者多个,value有1个或者多个
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
接下来分析这段代码,其本质就是一个通过hset设置值,
- key是KEYS[1],代表的是getName(),这个name由外部提供,在实例化的时候传了一个key进来redisson.getLock(PHONE_ID),那么这个name此时表示的就是外部设置的手机id。
- ARGV[2]表示第二个参数,对应的是 getLockName(threadId) ,就是线程id
- 最后通过pexpire设置一个过期时间,此时的 ARGV[1]表示的是 internalLockLeaseTime,在内部定义了这个时间 private long lockWatchdogTimeout = 30 * 1000; 默认就是30s,看名字就知道是一个看门狗的超时机制
- 如果设置成功,那么就回返回一个nil值,对应java里面的null值
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; "
redis通过这种管道的方式实现lua脚本,减少网络开销,同时保证操作执行的原子性,在redis官方也有介绍,可以直接通过lua脚本代替redis的事务。
2.3.2,lock锁续命逻辑
上面的讲解的就是 tryLockInnerAsync 方法,通过异步的方式去拿到锁,通过Future阻塞拿到执行任务的结果,拿到执行结果之后,再回调一下这个 addListener 方法
接下来主要是看这个 addListener 中的核心方法 scheduleExpirationRenewal ,第一眼可以看到里面就是一个定时任务的线程类,看默认就是会在 internalLockLeaseTime / 3 时间内执行一次,也就是10s后执行一次
这一块的内部实现延时通过lua脚本,实现锁续命机制。其续命逻辑也很简单,如果10s后线程还没执行完成,内部会通过递归的方式循环调用,继续调用这个 scheduleExpirationRenewal 方法,很多中间件实现这种续命的方式都是采用内部递归调用的方式
//判断上一次续命是否续命成功
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
2.3.3,加锁失败阻塞逻辑
当某个线程加锁失败时,那么该线程就会设置成阻塞状态,从而让出cpu的使用权。依旧得看这个抢锁逻辑的lua脚本,看到最后一句,如果抢锁失败的话,那么就会返回一个pttl的状态,其实就是一个拿到锁的过期时间。比如拿到锁的线程1已经执行了10s,那么来拿锁的线程2就会获取到剩余20s的过期时间
再回到进入这个最初的抢锁方法中,可以发现每一个线程都会返回一个ttl的过期时间,首先会对这个ttl超时时间进行判断,上面抢锁的逻辑可以看到,如果拿到锁的线程会返回一个nil,就是对应java中的null,如果不为null,就会继续往下执行
在进行阻塞的时候,作者使用了发布订阅的模式进行了优化 ,在线程进行阻塞时,把这些队列都订阅一个主题,当有锁释放的时候,那么就唤醒订阅了这个主题的线程。
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
随后进入一个自旋获取锁的阶段,在JDK的 ReentrantLock 中,就是采用的自旋的方式获取锁。但是在redis分布式情况下,一般都是适用于高流量高并发的场景,因此是不能完全一直空转自旋的,而且想想默认设置30s一个线程,刚运行就让大量的线程在那空转,肯定是不合适的
接下来看内部的这段自旋的代码,接下来主要分析这段ttl大于0的情况,getLatch表示一个JDK中的Semaphore信号量,这里用来做阻塞操作,比如说获取到的ttl为15s,那么这个线程就阻塞15s在这里,再进行一次自旋抢锁,而不是像 ReentrantLock 一样一直空转自旋在那抢锁,从而降低cpu的使用率,同时通过阻塞让出cpu的使用权
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
if (ttl >= 0) { //如果ttl大于0
//
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId); //唤醒订阅的线程
}
通过这种信号量阻塞的方式,达到间歇性加锁的目的。并且在抢锁时,内部没有公平锁的概念,默认就是非公平锁。
2.3.4,unlock锁释放
在上面的加锁失败阻塞中,讲了有这段加锁失败阻塞的方法,内部提供了一个同步订阅的方法,就是每个加锁失败的线程会订阅一个topic主题,当锁被释放或者过期之后就能通知订阅的线程来抢锁,不然每次自旋抢锁就太低效了,比如5s中执行完了,设置的30s,剩余的ttl还有25s,还要等25s去抢锁,显然不合适
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
接下来查看这段释放锁的逻辑,通过 unlockInnerAsync 进行释放锁
详细查看这个 unlockInnerAsync 方法之后,内部又是一个lua脚本,主要判断锁是否存在,如果存在则进行解锁的操作,然后通过 publish 发送一条消息给订阅了这个主题的所有线程可以来抢锁,内部还包含一些可重入锁等
3,Redisson总结
redission主要通过lua脚本来实现加锁和解锁的操作,从而保证相关操作的原子性,其主要操作有以下步骤。
- 线程进来先执行抢锁的操作,抢锁成功则继续往下执行业务,并且通过内部递归的方式给当前线程一个watch dog 看门狗的一个续命方式。
- 抢锁失败线程则阻塞,并将线程关注一个发布订阅的模型,供锁释放时被唤醒。并且内部通过间接性的方式轮旋抢锁,时间间隔为当前线程结束时间的ttl
- unlock释放锁时会往一个发布订阅的模型里面发送消息,关注了这个模型的线程接收到消息之后,则会被唤醒,从而的去进行加锁的操作