Redis分布式锁
一。什么是分布式锁
在讨论分布式锁之前我们回顾一下一些单机锁,比如synchronized、Lock 等
锁的基本特性:
1.互斥性:同一时刻只能有一个节点访问共享资源,比如一个代码块,或者同一个订单同一时刻只能有一个线程去支付等。
2.可重入性: 允许一个已经获得锁的线程,在没有释放锁之前重新去获得锁
3.锁的获取和释放,锁的失效机制是避免死锁 的一个问题
分布式锁也是基于这些特性来实现的,只不过是在分布式环境中来实现的。
那什么是分布式锁呢:
分布式锁独立于业务服务的,是一种跨进程的,跨机器节点的一种互斥锁。保证多个机器节点对共享资源访问的一个排他性。
二。自己设计一个锁,如何实现
先分析一下需求:
1.只能有一个线程能同时执行互斥的资源
2.其他的线程执行的时候,如果有线程要执行的话,要么等待,要么报错
实现方案:
1.要有一个标记来标记一个线程是不是在执行
比如说小黑屋,如何保证一个小黑屋只有一个人,就是门,如果一个人进去之后把门锁住,那就能保证一个小黑屋只能有一个人了
单机锁中的synchronized 是把这个标记放到对象头的,JUC中的Lock实现的时候也是有一个state来标记的
2.这个标记是可见的。
也就是说小黑屋里一个人进去之后 得让所有人能获取到这个小黑屋里有没有人的最新结果
如何解决呢 可以用 volatile
3.获取 这个标记不能同时抢占成功,这一步必须是安全的。
也就是说一个人进小黑屋,至少分三步,进门,关门,锁门
如果说这三个操作不是原子性的话,一个人虽然进去了,但是还没有锁门,另外一个人也是可以进的,这样就不能保证互斥性
比如说JUC的Lock是怎么实现的 是通过cas(Compare And Swap 比较交换)实现的
三。Redis是怎么实现分布式锁的
1.标记
Redis key-value 结构 这个key就作为一个标记,这个key存在就说明有人在做,如果不存在就说明没人在做
2.可见性
也就是说我再get的时候,其他线程是不能set的
Redis中是如何保证的呢?单线程来实现的 因为必须要set完之后才能get set和get是顺序执行的
3.保证原子性
setnx 是单线程执行的 setnx 是一个原子性的指令
Redis获取锁与占有锁不是原子的,导致锁失效
为什么要原子性可以看下图:
4.保证原子性Redis还有两种方式
1.事务
multi 开启一个事务
exec 提交
discard 回滚
比如:
提交之后才能看到数据
回滚
事务中命令可以是原子的,但是不能根据中间的指令的结果来决定后续的逻辑,所以实现不了 if(exists(‘pay:111’) == 0) == true {set(‘pay:111’, 1)}
问题:两个开启的事务,同时修改一个值怎么办
用到了redis中的watch(监控)
如下:
watch id //开启监控
multi //开启一个事务
incr id //将id加1 但是没有提交事务
如果此时有另外一个客户端 再将 id加1 如下:
此时回到第一个客户端 进行提交时 会报nil 也就是说不能执行
2.Lua脚本
四。redis的发布订阅机制
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息,发布者可以向指定的渠道 (channel) 发送消息,订阅者如果订阅了该频道的话就能收到消息,从而实现多个客户端的通信效果。
订阅的命令是SUBSCRIBE channel[channel …],可以订阅一个或多个频道,当有新消息通过PUBLISH命令发送给频道时,订阅者就能收到消息,就好像这样:
开启两个客户端,一个订阅了频道channel1,另一个通过PUBLISH发送消息后,订阅的那个就能收到了,靠这种模式就能实现不同客户端之间的通信。
五。问题
我们知道了redis的这些特性,那如果redis想要实现分布式锁,就得考虑几个问题:
1.重入锁怎么做,同一个线程能加锁多次
重入锁为了防止死锁,因为自己等待自己很容易死锁
重入锁需要做到以下三点:
1.互斥key 大Key
2.知道线程信息 field
3.保存重入次数 value hincrby(线程安全的)
Redis中的Hash完美的解决
2.假如30s锁会过期,但是业务还没有执行完,锁失效了怎么办
redission中的看门狗机制
redission 实现分布式锁
分布式锁的核心功能其实就三个:加锁、解锁、设置锁超时。这三个功能也是我们研究Redisson分布式锁原理的方向。
首先看一下redission 实现分布式的原理图
一。Redisson的源码
在使用Redisson加锁之前,需要先获取一个RLock实例对象,有了这个对象就可以调用lock、tryLock方法来完成加锁的功能
@Bean
@Primary
public Config redissonConfig() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort())
.setPassword(redisConfig.getPassword())
.setDatabase(redisConfig.getDatabase());
return config;
}
@Bean
@Primary
public RedissonClient redissonClient(Config config) {
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
String lockKey = String.format(RedisKeys.APPROVAL_ACTION_KEY, approveParamDto.getProcessInstanceId());
RLock lock = redissonClient.getLock(lockKey);
RLock是一个接口,具体的同步器需要实现该接口,当我们调用redisson.getLock()时,程序会初始化一个默认的同步执行器RedissonLock