↑↑↑请在文章开头处下载测试项目源代码↑↑↑
文章目录
- 前言
- 4.4 分布式锁
- 4.4.4 分布式锁的误删问题
- 4.4.4.1 问题说明
- 4.4.4.2 解决方案
- 4.4.4.3 代码实现
- 4.4.5 Redis分布式锁的原子性问题
- 4.4.5.1 问题说明
- 4.4.5.2 解决方案
- 4.4.5.3 代码实现
- 4.4.6 分布式锁小结
- 4.5 分布式锁-Redisson
- 4.5.1 功能介绍
- 4.5.2 快速入门
- 4.5.3 可重入锁原理分析
- 4.5.3.1 获取锁的原理
- 4.5.3.2 释放锁的原理
- 4.5.3.3 测试
前言
Redis实战系列文章:
Redis从入门到精通(四)Redis实战(一)短信登录
Redis从入门到精通(五)Redis实战(二)商户查询缓存
Redis从入门到精通(六)Redis实战(三)优惠券秒杀
Redis从入门到精通(七)Redis实战(四)库存超卖、一人一单与Redis分布式锁
4.4 分布式锁
上一节利用Redis的分布式锁实现了“一人一单”的需求:线程1在拿到互斥锁后,创建订单,最后释放锁;其他线程由于无法获得锁,所以不会进行订单创建。
这一节继续来讨论分布式锁中存在的问题。
4.4.4 分布式锁的误删问题
4.4.4.1 问题说明
假设持有锁的线程1在锁的内部出现了阻塞,导致它的锁自动释放(Redis分布式锁设置了超时时间),这时线程2来尝试获得锁,也能成功拿到了这把锁。
线程2在持有锁执行业务的过程中,线程1反应过来,继续执行并走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除。
这就是误删别人锁的情况,如下图所示:
4.4.4.2 解决方案
如果每个线程在释放锁的时候,能够判断出当前这把锁是否属于自己,如果不属于自己,则不进行锁的删除;如果属于自己,才进行删除。这样就能解决误删问题。
那如何判断当前这把锁是否属于自己呢?
我们在上一节编写的tryLock()
方法用于尝试获取锁,其代码如下:
// com.star.redis.dzdp.utils.SimpleRedisLock
@Override
public boolean tryLock(long timeout) {
// 1.获取线程ID
long threadId = Thread.currentThread().getId();
// 2.获取锁,并设置超时时间
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock:" + key, threadId + "", timeout, TimeUnit.SECONDS);
log.info("set to Redis : Key = {}, Value = {}. set result = {}", "lock:" + key, threadId, flag);
// 3.返回
return BooleanUtil.isTrue(flag);
}
可见,锁对象的Value值是当前线程的ID,这实际上就是判断锁是否属于自己的依据。
在调用unlock()
方法释放锁时,先获取Value值,判断该Value值是否就是当前线程的ID,如果是,则说明是自己的锁,可以释放;否则不是自己的锁,不能释放。
4.4.4.3 代码实现
对unlock()
方法进行改造:
// com.star.redis.dzdp.utils.SimpleRedisLock
@Override
public void unlock() {
// 1.获取当前线程ID
String threadId = Thread.currentThread().getId() + "";
// 2.获取锁对象
String cache = stringRedisTemplate.opsForValue().get("lock:" + key);
log.info("threadId = {}, cache = {}", threadId, cache);
// 3.判断锁对象是否保存了当前线程ID
if(threadId.equals(cache)) {
// 释放锁
Boolean flag = stringRedisTemplate.delete("lock:" + key);
log.info("del from to Redis : Key = {}. del result = {}", "lock:" + key, flag);
}
}
我们可以利用IDEA模拟出这种场景,其日志如下:
// 线程5进入
[http-nio-8081-exec-5] 开始秒杀下单...voucherId = 14, userId = 1012
[http-nio-8081-exec-5] ==> Preparing: SELECT voucher_id,stock,create_time,begin_time,end_time,update_time FROM tb_seckill_voucher WHERE voucher_id=?
[http-nio-8081-exec-5] ==> Parameters: 14(Long)
[http-nio-8081-exec-5] <== Total: 1
[http-nio-8081-exec-5] SeckillVoucher(voucherId=14, stock=993, createTime=Fri Apr 05 19:36:15 CST 2024, beginTime=Fri Apr 05 14:00:00 CST 2024, endTime=Sat Apr 06 18:00:00 CST 2024, updateTime=Fri Apr 05 22:53:22 CST 2024)
// 线程5拿到了锁,Value值是39
// 线程5进入阻塞,期间锁被自动释放了
[http-nio-8081-exec-5] set to Redis : Key = lock:voucher_order:1012, Value = 39. set result = true
[http-nio-8081-exec-5] begin checkAndCreateVoucherOrder... voucherId = 14, userId = 1012
[http-nio-8081-exec-5] ==> Preparing: SELECT COUNT( * ) FROM tb_voucher_order WHERE (voucher_id = ? AND user_id = ?)
// 线程6进入
[http-nio-8081-exec-6] 开始秒杀下单...voucherId = 14, userId = 1012
[http-nio-8081-exec-5] ==> Parameters: 14(Long), 1012(Long)
[http-nio-8081-exec-6] ==> Preparing: SELECT voucher_id,stock,create_time,begin_time,end_time,update_time FROM tb_seckill_voucher WHERE voucher_id=?
[http-nio-8081-exec-6] ==> Parameters: 14(Long)
[http-nio-8081-exec-5] <== Total: 1
[http-nio-8081-exec-6] <== Total: 1
[http-nio-8081-exec-5] old order count = 0
[http-nio-8081-exec-6] SeckillVoucher(voucherId=14, stock=993, createTime=Fri Apr 05 19:36:15 CST 2024, beginTime=Fri Apr 05 14:00:00 CST 2024, endTime=Sat Apr 06 18:00:00 CST 2024, updateTime=Fri Apr 05 22:53:22 CST 2024)
[http-nio-8081-exec-5] ==> Preparing: UPDATE tb_seckill_voucher SET stock = stock - 1 WHERE (voucher_id = ? AND stock > ?)
[http-nio-8081-exec-5] ==> Parameters: 14(Long), 0(Integer)
// 线程6也拿到了锁,Value值是40,说明线程5的锁被自动释放了
[http-nio-8081-exec-6] set to Redis : Key = lock:voucher_order:1012, Value = 40. set result = true
[http-nio-8081-exec-5] <== Updates: 1
[http-nio-8081-exec-6] begin checkAndCreateVoucherOrder... voucherId = 14, userId = 1012
[http-nio-8081-exec-5] update result = true
[http-nio-8081-exec-6] ==> Preparing: SELECT COUNT( * ) FROM tb_voucher_order WHERE (voucher_id = ? AND user_id = ?)
[http-nio-8081-exec-6] ==> Parameters: 14(Long), 1012(Long)
[http-nio-8081-exec-5] get orderId = 7354397016337678337
[http-nio-8081-exec-6] <== Total: 1
[http-nio-8081-exec-6] old order count = 0
[http-nio-8081-exec-6] ==> Preparing: UPDATE tb_seckill_voucher SET stock = stock - 1 WHERE (voucher_id = ? AND stock > ?)
[http-nio-8081-exec-5] ==> Preparing: INSERT INTO tb_voucher_order ( id, user_id, voucher_id, pay_time ) VALUES ( ?, ?, ?, ? )
[http-nio-8081-exec-6] ==> Parameters: 14(Long), 0(Integer)
[http-nio-8081-exec-5] ==> Parameters: 7354397016337678337(Long), 1012(Long), 14(Long), 2024-04-05 22:56:31.779(Timestamp)
[http-nio-8081-exec-5] <== Updates: 1
// 线程5进入释放锁方法,但发现不是自己线程的锁,所以不释放
[http-nio-8081-exec-5] threadId = 39, cache = 40
[http-nio-8081-exec-6] <== Updates: 1
[http-nio-8081-exec-6] update result = true
[http-nio-8081-exec-6] get orderId = 7354397067877285889
[http-nio-8081-exec-6] ==> Preparing: INSERT INTO tb_voucher_order ( id, user_id, voucher_id, pay_time ) VALUES ( ?, ?, ?, ? )
[http-nio-8081-exec-6] ==> Parameters: 7354397067877285889(Long), 1012(Long), 14(Long), 2024-04-05 22:56:43.615(Timestamp)
[http-nio-8081-exec-6] <== Updates: 1
// 线程6进入释放锁方法,发现是自己线程的锁,所以释放
[http-nio-8081-exec-6] threadId = 40, cache = 40
[http-nio-8081-exec-6] del from to Redis : Key = lock:voucher_order:1012. del result = true
4.4.5 Redis分布式锁的原子性问题
4.4.5.1 问题说明
对于Redis分布式锁的误删问题,我们通过在删除前进行判断是否是自己的锁来解决。但还有一种更加极端的情况:在完成判断之后,也就是判断出确实是自己的锁之后,锁到期自动释放了。但由于已经判断过了,还是要继续删,因此还是会把别人的锁给删了。
这种情况相当于判断条件没有起到作用,之所以有这个问题,是因为线程的取锁、比锁、删锁并不是原子性的。
4.4.5.2 解决方案
Redis提供了Lua脚本功能,即在一个Lua脚本中编写多条Redis命令,调用这个脚本可以确保这多条命令执行时的原子性。
Lua脚本中调用Redis命令的语法如下:
redis.call('命令名称', 'key', '其它参数', ...)
例如:
# 执行 set name jack
redis.call('set', 'name', 'jack')
# 执行 get name
local name = redis.call('get', 'name')
# 返回 name
return name
Redis提供了EVAL
方法来调用Lua脚本:
例如:
127.0.0.1:6379> eval 'return redis.call("set","name","Jack")' 0
OK
127.0.0.1:6379> eval 'return redis.call("get","name")' 0
"Jack"
如果脚本中的Key、Value不想写死,可以作为参数传递。Key参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
127.0.0.1:6379> eval 'return redis.call("set",KEYS[1],ARGV[1])' 1 name Rose
OK
127.0.0.1:6379> eval 'return redis.call("get",KEYS[1])' 1 name
"Rose"
4.4.5.3 代码实现
当前释放锁的逻辑是这样的:获取锁中的Value值,并与当前线程ID进行比较,如果一致则释放锁,否则什么都不做。
将以上逻辑转换成Lua脚本。在resources目录下新建一个unlock.lua文件,内容如下:
-- unlock.lua
-- 这里的 KEYS[1] 就是锁的Key,ARGV[1] 就是当前线程ID
-- 获取锁中的线程ID,判断是否与当前线程ID一致
if(redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
接下来使用Java代码调用调用Lua脚本改造分布式锁。在RedisTemplate类中,提供了一个重载的execute()
方法去执行脚本:
// org.springframework.data.redis.core.RedisTemplate
@Override
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
return scriptExecutor.execute(script, keys, args);
}
改造SimpleRedisLock类的unlock()
方法:
// com.star.redis.dzdp.utils.SimpleRedisLock
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
// 1.获取当前线程ID
String threadId = Thread.currentThread().getId() + "";
// 2.获取锁对象
// String cache = stringRedisTemplate.opsForValue().get("lock:" + key);
// log.info("threadId = {}, cache = {}", threadId, cache);
// // 3.判断锁对象是否保存了当前线程ID
// if(threadId.equals(cache)) {
// // 释放锁
// Boolean flag = stringRedisTemplate.delete("lock:" + key);
// log.info("del from to Redis : Key = {}. del result = {}", "lock:" + key, flag);
// }
// 改为调用Lua脚本
Long flag = stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList("lock:" + key),
threadId);
log.info("del from to Redis : Key = {}. del result = {}", "lock:" + key, flag);
}
经过以上代码改造后,能够实现拿锁、比锁、删锁的原子性动作。简单测试一下,调用/voucher/seckill/order
接口进行下单,其日志如下:
[http-nio-8081-exec-7] 开始秒杀下单...voucherId = 14, userId = 1012
[http-nio-8081-exec-7] ==> Preparing: SELECT voucher_id,stock,create_time,begin_time,end_time,update_time FROM tb_seckill_voucher WHERE voucher_id=?
[http-nio-8081-exec-7] ==> Parameters: 14(Long)
[http-nio-8081-exec-7] <== Total: 1
[http-nio-8081-exec-7] SeckillVoucher(voucherId=14, stock=982, createTime=Fri Apr 05 19:36:15 CST 2024, beginTime=Fri Apr 05 14:00:00 CST 2024, endTime=Sat Apr 06 18:00:00 CST 2024, updateTime=Fri Apr 05 23:11:27 CST 2024)
// 成功获取到锁
[http-nio-8081-exec-7] set to Redis : Key = lock:voucher_order:1012, Value = 44. set result = true
[http-nio-8081-exec-7] begin checkAndCreateVoucherOrder... voucherId = 14, userId = 1012
[http-nio-8081-exec-7] ==> Preparing: SELECT COUNT( * ) FROM tb_voucher_order WHERE (voucher_id = ? AND user_id = ?)
[http-nio-8081-exec-7] ==> Parameters: 14(Long), 1012(Long)
[http-nio-8081-exec-7] <== Total: 1
[http-nio-8081-exec-7] old order count = 0
[http-nio-8081-exec-7] ==> Preparing: UPDATE tb_seckill_voucher SET stock = stock - 1 WHERE (voucher_id = ? AND stock > ?)
[http-nio-8081-exec-7] ==> Parameters: 14(Long), 0(Integer)
[http-nio-8081-exec-7] <== Updates: 1
[http-nio-8081-exec-7] update result = true
[http-nio-8081-exec-7] get orderId = 7354571748492181505
[http-nio-8081-exec-7] ==> Preparing: INSERT INTO tb_voucher_order ( id, user_id, voucher_id, pay_time ) VALUES ( ?, ?, ?, ? )
[http-nio-8081-exec-7] ==> Parameters: 7354571748492181505(Long), 1012(Long), 14(Long), 2024-04-06 10:14:34.286(Timestamp)
[http-nio-8081-exec-7] <== Updates: 1
// 成功释放锁,说明调用Lua脚本成功
[http-nio-8081-exec-7] del from to Redis : Key = lock:voucher_order:1012. del result = 1
4.4.6 分布式锁小结
总结一下基于Redis的分布式锁实现思路:
- 利用
SETNX
获取锁,并设置过期时间,保存当前线程ID; - 释放锁时先比较锁中保存的线程ID与当前线程的ID是否一致,一致时才删除锁。
4.5 分布式锁-Redisson
4.5.1 功能介绍
基于SETNX
方法实现的分布式锁存在下面的问题:
- 重入问题:是指获得锁的线程可以再次进入到相同的锁代码块中。可重入锁的意义在于防止死锁。
- 不可重试:是指目前的分布式锁只能尝试获取一次。而更合理的情况是:当线程在获得锁失败后,能再次尝试获得锁。
- 超时释放:在加锁时增加了过期时间,这样可以防止死锁,但是如果卡顿的时间超长,虽然采用了Lua表达式防止误删,但这毕竟是没有锁住,有安全隐患。
- 主从一致性:如果Redis提供了主从集群,在向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
那么什么是Redisson呢?(Redis-son,难道是Redis的儿子…???)
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redisson提供了分布式锁的多种多样的功能,例如可重入锁、公平锁、联锁、红锁、读写锁等等。
4.5.2 快速入门
- 1)引入依赖
<!--pom.xml-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
- 2)配置Redisson客户端
// com.star.redis.dzdp.utils.RedissonConfig
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.146.128:6379")
.setPassword("123321");
return Redisson.create(config);
}
}
- 3)使用Redisson的分布式锁
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DzdpApp.class)
public class TestRedisson {
@Resource
private RedissonClient redissonClient;
@Test
public void testRedisson() throws Exception {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("lock:redisson");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 100, TimeUnit.SECONDS);
// 判断获取锁成功
if (isLock) {
try {
System.out.println("执行业务...");
} finally {
// 释放锁
lock.unlock();
}
}
}
}
运行以上单元测试,获取锁成功后,会在Redis中存储一个Hash数据结构,Key为锁的名称,Field为当前操作的线程ID,Value为锁重入的次数:
4.5.3 可重入锁原理分析
4.5.3.1 获取锁的原理
通过阅读源码,我们可以在org.redisson.RedissonLock类的tryLockInnerAsync()
方法中找到获取锁时执行的Lua脚本:
-- org.redisson.RedissonLock#tryLockInnerAsync()
-- 参数说明:
-- KEYS[1] 锁的名称
-- ARGV[1] 超时时间
-- ARGV[2] 当前操作的线程ID
-- 判断锁是否存在
if (redis.call('exists', KEYS[1]) == 0) then
-- 锁不存在
-- 通过hincrby命令将锁的计数器加1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 刷新锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 锁存在,再判断当前线程是否持有锁
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 当前线程持有锁
-- 通过hincrby命令将锁的计数器加1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 刷新锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 锁存在,但当前线程不持有锁(说明锁不是当前线程的),则返回锁的剩余生存时间
return redis.call('pttl', KEYS[1]);
这就是Redission可重入锁获取锁的实现,它通过判断当前获取锁的线程是否和Redis保存的锁的线程信息一致。若是,则获取锁成功,可以继续往下执行业务;若不是,则直接返回,无法获取锁。
4.5.3.2 释放锁的原理
同样,通过阅读源码,我们可以在org.redisson.RedissonLock类的unlockInnerAsync()
方法中找到释放锁时执行的Lua脚本:
-- org.redisson.RedissonLock#unlockInnerAsync()
-- 参数说明:
-- KEYS[1] 锁的名称
-- ARGV[1] 释放锁消息
-- ARGV[2] 超时时间
-- ARGV[3] 当前操作的线程ID
-- 判断锁是否存在
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
-- 锁不存在,直接返回空
return nil;
end;
-- 锁存在
-- 通过hincrby命令将锁的计数器减1,并获取计数器的值
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 判断计数器是否大于0
if (counter > 0) then
-- 计数器大于0,则表示仍有其他线程持有该锁,通过pexpire命令续约锁的过期时间,并返回0
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 计数器等于0,则表示当前线程是最后一个持有锁的线程,通过del命令删除锁,并通过publish命令发布一个解锁消息,返回1
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
这就是Redission可重入锁释放锁的实现,它通过减少锁的计数器来实现锁的释放,并根据计数器的值判断是否需要续约或者删除锁。如果当前线程是最后一个持有锁的线程,则会发布一个解锁消息。
用流程图表示以上两个脚本的逻辑如下:
4.5.3.3 测试
@Resource
private RedissonClient redissonClient;
private RLock lock;
@Before
public void init() {
//获取锁(可重入),指定锁的名称
lock = redissonClient.getLock("lock:redisson2");
}
@Test
public void test1() throws Exception {
boolean isLock = lock.tryLock(1, 100, TimeUnit.SECONDS);
if (!isLock) {
log.info("test1 获取锁失败!");
return;
}
try {
log.info("test1 获取锁成功!");
// 保证是同一线程
test2();
} finally {
log.info("test1 释放锁!");
//lock.unlock();
}
}
public void test2() throws Exception {
boolean isLock = lock.tryLock(1, 100, TimeUnit.SECONDS);
if (!isLock) {
log.info("test2 获取锁失败!");
return;
}
try {
log.info("test2 获取锁成功!");
} finally {
log.info("test2 释放锁!");
//lock.unlock();
}
}
执行以上单元测试,在同一线程两次获取锁,控制台打印结果如下:
[main] test1 获取锁成功!
[main] test2 获取锁成功!
[main] test2 释放锁!
[main] test1 释放锁!
如果将释放锁的代码注释掉,还可以在Redis查看此时的锁对象,其Value值为2:
最后,我们将VoucherOrderServiceImpl类的seckillVoucher()
方法的锁替换为Redisson可重入锁:
// com.star.redis.dzdp.service.impl.VoucherOrderServiceImpl#seckillVoucher()
@Resource
private RedissonClient redissonClient;
// 创建锁对象(不再使用)
// SimpleRedisLock simpleRedisLock = new SimpleRedisLock("voucher_order:" + userId, stringRedisTemplate);
// 使用Redisson的可重入锁
RLock simpleRedisLock = redissonClient.getLock("voucher_order:" + userId);
// 尝试获取锁
boolean lock = simpleRedisLock.tryLock();
// 加锁失败
if(!lock) {
return BaseResult.setFail("每个帐号只能抢购一张优惠券!");
}
// 加锁成功,则执行业务代码
try {
return checkAndCreateVoucherOrder(voucherId, userId);
} finally {
// 释放锁
simpleRedisLock.unlock();
}
…
本节完,更多内容请查阅分类专栏:Redis从入门到精通
感兴趣的读者还可以查阅我的另外几个专栏:
- SpringBoot源码解读与原理分析(已完结)
- MyBatis3源码深度解析(已完结)
- 再探Java为面试赋能(持续更新中…)