本博客为个人学习笔记,学习网站与详细见:黑马程序员Redis入门到实战 P56 - P63
目录
分布式锁介绍
基于Redis的分布式锁
Redis锁代码实现
修改业务代码
分布式锁误删问题
分布式锁原子性问题
Lua脚本
编写脚本
代码优化
总结
分布式锁介绍
在上一篇文章 Redis实战—优惠卷秒杀 中,我们通过使用锁、事务和代理对象实现了“一人一单”的优惠券秒杀功能。但我们使用的锁是基于JVM内部的锁,这导致锁的范围只能限制单个JVM的线程操作,因此在集群情况下,依然会出现超卖问题。所以我们需要设置一个锁,使其能够同时限制集群中的多个JVM线程操作,而这个锁就是分布式锁,由此引出本文。
集群情况下JVM锁的使用情况如下图。
集群情况下分布式锁的使用情况如下图。
分布式锁的实现
基于Redis的分布式锁
我们利用Redis的SET lock thread1 NX操作来模拟获取锁,即如果当前不存在lock键,则添加lock键成功,如果当前存在lock键,则添加lock键失败。我们将添加lock键的操作视为获取锁的操作,将lock键是否存在视为当前锁是否已被其他线程获取。执行语句后,通过Redis返回OK或者nil,我们可以判断是否获取锁成功。为防止宕机时无法对锁进行销毁,我们在进行SET操作时还需通过EX为键设置一个合理的时间。
Redis锁代码实现
// 接口类
public interface ILock {
/*
* 尝试获取锁
* timeoutSec 锁持有的超时时间,过期后自动释放
* 返回值 true代表获取锁成功;false代表获取锁失败
* */
boolean tryLock(long timeoutSec);
//释放锁
void unlock();
}
// 接口实现类
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
long threadId = Thread.currentThread().getId();
// 获取锁,并添加时间
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + " ", timeoutSec, TimeUnit.SECONDS);
//避免拆箱导致空指针,使用Boolean.TRUE.equals方法返回结果
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
修改业务代码
public Result seckillVoucher(Long voucherId) {
//判断是否满足抢购条件
...
Long userId = UserHolder.getUser().getId();
// 创建锁对象,根据用户ID加锁
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁
boolean isLock = lock.tryLock(1200);
// 若获取锁失败
if (!isLock)
return Result.fail("不允许重复下单");
// 若获取锁成功
try {
// 获取当前代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}
}
分布式锁误删问题
如上图所示,持有锁的线程1在锁的内部出现了业务阻塞,导致它的锁被超时释放。这时线程2尝试获得锁成功,然而在线程2持有锁执行过程中,线程1的业务反应过来,继续执行,而线程1业务执行完成后,进行了删除锁逻辑,此时就会把本应属于线程2的锁进行删除,这就是误删其它线程锁的情况。
解决方案:当线程创建锁时,同时为该锁添加当前线程标识,该标识由UUID随机数为前缀与线程id组合而成(为避免出现集群下两个线程的id相同的情况,因此添加UUID前缀)。当一个线程删除锁时,需要判断当前线程标识与锁标识是否一致,若一致,说明该锁由当前线程创建,可进行删除;若不一致,说明该锁由其它线程创建,不可进行删除。
对simpleRedisLock类代码优化如下。
package com.hmdp.utils;
import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁,并设置标识、添加时间
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
//避免拆箱导致空指针,使用Boolean.TRUE.equals方法返回结果
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁标识
String lockID = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if(threadId.equals(lockID))
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
分布式锁原子性问题
如上图所示,线程1执行业务结束后,进行释放锁的操作,在对锁的标识进行判断后,开始释放锁。但是,线程1在"判断结束"到"释放锁"的期间,受到了阻塞(遇到JVM垃圾回收机制时会暂停程序,导致阻塞),这时线程2获取锁。当线程1恢复后,继续进行释放锁的操作,将会误删线程2的锁。我们前面设置了锁标识,并且要求在释放锁之前需要做一个判断,但在判断可以释放锁后,如果遇到了阻塞,将可能导致上图所示的误删操作。
解决方法:我们需要实现"判断"和"释放锁"这两条命令的原子性问题。
Lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,能够确保多条命令执行时的原子性。Lua是一种编程语言,其基本语法可以参考网站:Lua 教程 | 菜鸟教程。这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,以保证多条redis命令的原子性,这样就可以实现拿锁、判断、删锁多条命令的原子性动作了,作为一名Java程序员这一块并不需要大家过于精通,只需要知道它有什么作用即可。
编写脚本
我们需要在resources文件中新建.lua文件(如果没有该新建项,需要下载EmmyLua插件),并在其中添加下图中的脚本内容。
代码优化
优化后的代码如下。
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
//初始化UNLOCK_SCRIPT
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
//初始化返回值
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁,并设置锁标识、添加时间
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
//避免拆箱导致空指针,使用Boolean.TRUE.equals方法返回结果
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
//要求传入KEYS集合,使用Collections单元素集合工具
Collections.singletonList(KEY_PREFIX + name),
//线程标识
ID_PREFIX + Thread.currentThread().getId());
}
/* @Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁标识
String lockID = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if(threadId.equals(lockID))
stringRedisTemplate.delete(KEY_PREFIX + name);
}*/
}
总结
基于Redis的分布式锁实现思路
· 利用set nxex获取锁,并设置过期时间,保存线程标识
· 释放锁时先判断线程标识是否与锁标识一致,若一致则删除锁
特性
· 利用set nx满足互斥性
· 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
· 利用redis集群保证高可用和高并发特性(本文未涉及)