文章目录
- 分布式锁
- 什么是分布式锁
- 分布式锁的实现
- 基于redis实现分布式锁
- 分布式锁初级版本
- redis分布式锁的误删问题
- 分布式锁的原子性问题
- lua脚本解决多条命令原子性问题
- Java调用lua脚本改进redis的分布式锁
- 分布式锁-Redisson功能介绍
- 分布式锁-Redisson快速入门
- Redisson可重入原理
- Redisson的锁重试和WatchDog机制
- Redisson的multiLock原理
接上篇,在集群模式下,synchronized锁失效了
得让多个JVM监视器都能使用这个外部的锁才行——这就是分布式锁。
分布式锁
关键是让多个JVM进程都能看见这个锁
什么是分布式锁
分布式锁的实现
基于redis实现分布式锁
但这种方案在获取锁以后,要是服务宕机了,那就无法释放锁了,所以我们采用下面的方法:
分布式锁初级版本
先写一个分布式锁的接口
package com.hmdp.utils;
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec
* @return
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
在写一个实现类
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
long threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
然后用我们实现的这个简单的分布式锁来改造我们的一人一单的加锁逻辑
@Override
public Result seckillVoucher(Long voucherId) {
// 1. 查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 2/ 判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 已经结束
return Result.fail("秒杀已经结束!");
}
// 4. 判断库存是否充足
if (voucher.getStock()<1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
/**
* 每一个请求过来,这个id对象都是一个全新的id对象,因为要是对userId加锁的话,对象变了锁就变了,那不行
* 我们希望id的值一样,所以用了toString(),但是toString()依旧不能保证是对对象的值加锁的
* toString底层是new 一个String数组,还是new了一个新对象,同一个用户id在不同的请求中过来,每次都new一个,还是不能把锁加载同一个用户上
* 于是用intern() ,intern()方法可以去字符串常量池中找字符串值一样的引用返回
* 这样一来,如果你的userId是5,不管你new了多少个字符串,只要值是一样的,返回的结果也一样。这样就可以锁住同一个用户
* 不同的用户不会被锁住
*/
/*synchronized (userId.toString().intern()) {
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); // 拿到当前对象的代理对象,其实就是IVoucherOrderService这个接口的代理对象,返回的是Object,做个强转
return proxy.createVoucherOrder(voucherId); // 报错了是因为我们的接口中没有这个方法,那我们就在接口中创建一下这个方法
}*/
// 创建锁对象-初版分布式锁
SimpleRedisLock lock = new SimpleRedisLock("oder:" + userId, stringRedisTemplate);
// 获取锁
boolean isLock = lock.tryLock(1200L);
// 判断是否获取锁成功
if(!isLock){
// 获取锁失败,返回错误信息或重试
return Result.fail("不允许重复下单!");
}
try{
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); // 拿到当前对象的代理对象,其实就是IVoucherOrderService这个接口的代理对象,返回的是Object,做个强转
return proxy.createVoucherOrder(voucherId); // 报错了是因为我们的接口中没有这个方法,那我们就在接口中创建一下这个方法
}finally{
// 释放锁
lock.unlock();
}
}
现在测试,我们就可以看见一个用户获取了锁之后,另一个用户就无法再次获取锁了。
redis分布式锁的误删问题
这种方案一般情况下没问题,但是有些情况下会有问题:
线程1如果因为某种原因业务产生阻塞,锁的持有周期就变长,如果阻塞的太长时间,超过了我们设置锁的超时时间,
业务还没结束,锁提前释放,其他的线程就可以获得锁,这就出现问题
当线程二拿到锁了之后,线程1的任务结束了,开始去释放锁,这时候线程1把线程2的锁给释放了。
线程3又来了,又再次拿到锁
这种极端情况产生的原因,其一:业务阻塞导致锁提前释放,其二:线程1把别人的线程的锁释放了
那怎么避免呢,我们往redis中存锁的时候,存了线程标识进去了,那我们释放锁的时候,把这个标识利用起来,就可以避免问题的发生。
因此业务流程就要变化,获取锁的时候, 要把线程标识存进去,释放锁的时候,要判断这个标识是不是自己。
我们只需要需改我们的锁的实现类
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if(threadId.equals(id)){
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
这次测试就发现锁的误删就解决了
但是还有问题:
分布式锁的原子性问题
问题简单描述,线程1在释放锁的时候判断锁是不是自己的,判断是自己的,然后准备删除锁的时候,这时候由于JVM垃圾回收等原因,导致了我们的线程1被阻塞了,好巧不巧,这时候锁超时了,自动释放了,然后线程2来了,获取了锁开始执行线程2的任务,然后线程1这时候反应过来了,他不知道锁已经释放了,他还记得自己就是锁的主人,然后,反手就把线程2的锁释放了,然后这时候线程3来了,又可以获取锁了,这不就出问题了。
那这次又因为什么出问题了呢?就因为,我们的判断和释放是两个动作,这两个动作之间产生了阻塞,产生了问题,那我们想避免这样的问题,就得让判断和释放的动作都是原子性的动作。
lua脚本解决多条命令原子性问题
Redis脚本提供了脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行的原子性。lua脚本是一种编程语言。
lua教程
先把脚本写出来
-- 获取锁中的线程标识 get key
local id = redis.call('get', KEYS[1])
-- 比较线程标识和锁中的标识是否一致
if(id == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
-- 不一致,则直接返回
return 0
Java调用lua脚本改进redis的分布式锁
则我们的分布式锁的实现代码可以改成这样:
package com.hmdp.utils;
import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
// 我们提前把脚本准备好,这样每次使用的时候不用在去读脚本文件,减少IO
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock(){
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
/* @Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if(threadId.equals(id)){
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}*/
}
把lua脚本放在文件读取
-- 获取锁中的线程标识 get key
local id = redis.call('get', KEYS[1])
-- 比较线程标识和锁中的标识是否一致
if(id == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
-- 不一致,则直接返回
return 0
通过lua脚本的执行,实现判断和执行的原子性。
总结
这样就保障了我们的分布式锁的相对可靠,但还有问题
分布式锁-Redisson功能介绍
目前我们实现的分布式锁的问题如下:
官网地址
github地址
分布式锁-Redisson快速入门
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.10.88:6379").setPassword("123456");
// 创建Redisson对象
return Redisson.create(config);
}
}
不推荐使用springBoot整合Redisson的starter使用,因为他会替代我们Spring官方提供的对于Redis的配置和实现。所以使用分布式锁的时候,自己来配置Redisson更值得推荐。
来改造我们的获取锁的业务代码
@Autowired
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1. 查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 2/ 判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 已经结束
return Result.fail("秒杀已经结束!");
}
// 4. 判断库存是否充足
if (voucher.getStock()<1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
/**
* 每一个请求过来,这个id对象都是一个全新的id对象,因为要是对userId加锁的话,对象变了锁就变了,那不行
* 我们希望id的值一样,所以用了toString(),但是toString()依旧不能保证是对对象的值加锁的
* toString底层是new 一个String数组,还是new了一个新对象,同一个用户id在不同的请求中过来,每次都new一个,还是不能把锁加载同一个用户上
* 于是用intern() ,intern()方法可以去字符串常量池中找字符串值一样的引用返回
* 这样一来,如果你的userId是5,不管你new了多少个字符串,只要值是一样的,返回的结果也一样。这样就可以锁住同一个用户
* 不同的用户不会被锁住
*/
/*synchronized (userId.toString().intern()) {
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); // 拿到当前对象的代理对象,其实就是IVoucherOrderService这个接口的代理对象,返回的是Object,做个强转
return proxy.createVoucherOrder(voucherId); // 报错了是因为我们的接口中没有这个方法,那我们就在接口中创建一下这个方法
}*/
// 创建锁对象-初版分布式锁
// SimpleRedisLock lock = new SimpleRedisLock("oder:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:oder:" + userId); // 使用Redisson获取锁
// 获取锁
// boolean isLock = lock.tryLock(1200L);
boolean isLock = lock.tryLock();
// 判断是否获取锁成功
if(!isLock){
// 获取锁失败,返回错误信息或重试
return Result.fail("不允许重复下单!");
}
try{
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); // 拿到当前对象的代理对象,其实就是IVoucherOrderService这个接口的代理对象,返回的是Object,做个强转
return proxy.createVoucherOrder(voucherId); // 报错了是因为我们的接口中没有这个方法,那我们就在接口中创建一下这个方法
}finally{
// 释放锁
lock.unlock();
}
}
Redisson可重入原理
可重入其实就是在尝试获取锁的时候判断一下是不是自己本身拿着锁,要是自己拿着锁,就可以获取,给重入标识+1
因此不仅记录线程标识,还要记录一个重入次数,那使用这个string类型的数据结构就不行了,那就得使用hash结构,因为hash结构可以保存多个键值对。
但用了hash之后就没了nx ex这样的指令了,所以只能分开做,先判断锁存在不存在,如果锁不存在那就获取锁并且添加线程标识,在设置有效期。
但要是锁已经存在的话,判断一下是不是自己的锁,要是判断发现是自己的锁,那就是重入。
现在为了实现锁的可重入,逻辑明显变得复杂了,就不能使用Java代码来实现了,那就必须要通过lua脚本来实现了。
获取lua脚本如下:
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程的唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists',key) == 0) then
-- 不存在, 获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
-- 不存在, 获取锁,重入次数+1;
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire',key, releaseTime);
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁的lua脚本:
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程的唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if(redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1;
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if(count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else -- 等于0 说明可以释放锁,直接删除
redis.call('DEL', key);
return nil;
end;
可以追踪redisson的源码看到
Redisson的锁重试和WatchDog机制
追踪源码
Redisson的multiLock原理
这主要是针对redis的主从一致性导致锁失效的问题
那redis说,既然是主从的关系导致的锁失效,那我们就不要主从结构了,大家都当主,只有在每一个节点都拿到锁,才算获取锁成功。
这下不仅解决了主从一致性,而且保障了可用性,那想让可用性更强一点,那我们可以给节点建立主从关系
这样依旧可以保证我们的可用性,假设有一台主节点宕机了,在同步的过程中,有线程进来拿锁,但是因为还没有同步完成,这个线程拿不到全部主节点的锁,也就无法获取锁,也就不会出现锁失效的问题。
RedissonMultiLock源码追踪看一下。