本文从一个经典的库存超卖问题分析说明常见锁的应用,假设库存资源存储在Redis里面。
假设我们的减库存代码如下:
@Autowired
StringRedisTemplate redisTemplate;
public void deduct(){
String stock = redisTemplate.opsForValue().get("stock");
if(StringUtils.hasLength(stock)){
Integer st = Integer.valueOf(stock);
if(st>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--st));
}
}
}
此时方法操作是先读后写
,非原子性操作,是存在并发问题的。如何解决该问题,有三种方案:
- JVM本地锁
- Redis乐观锁
- Redis实现分布式锁
JVM本地锁的实现与优缺点在从库存超卖问题分析锁和分布式锁的应用(一)已经分析过了,这里不再赘述。
【1】Redis乐观锁
也就是watch
、multi
与exec
组合指令的使用。
watch可以监控一个或多个key的值,如果在事务(exec)执行之前,key的值发生变化则取消事务执行。
multi用来开启事务,exec用来提交/执行事务。
watch stock
multi
set stock 5000
exec
代码修改如下:
public void deduct(){
this.redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
operations.watch("stock");
// 1. 查询库存信息
Object stock = operations.opsForValue().get("stock");
// 2. 判断库存是否充足
int st = 0;
if (stock != null && (st = Integer.parseInt(stock.toString())) > 0) {
// 3. 扣减库存
operations.multi();//开启事务
operations.opsForValue().set("stock", String.valueOf(--st));
List exec = operations.exec();//执行事务
if (exec == null || exec.size() == 0) {
try {
// 这里睡眠一下,降低竞争,提高乐观锁的吞吐量
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
//再次递归
deduct();
}
return exec;
}
return null;
}
});
}
这种方式确实可以解决并发问题,但也可能在高并发的情况下由于不断重试(CAS思想)出现性能问题、连接被耗尽的情况。
【2】Redis分布式锁
① 基于setnx思想简单实现
借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。
// 递归思想
public void deduct(){
//获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1111");
//如果获取不到则递归重试
if(!lock){
deduct();
}else{
try{
String stock = redisTemplate.opsForValue().get("stock");
if(StringUtils.hasLength(stock)){
Integer st = Integer.valueOf(stock);
if(st>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--st));
}
}
}finally {
//释放锁
redisTemplate.delete("lock");
}
}
}
或者使用while思想:
public void deduct(){
//当setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法锁无法释放(死锁)
while (Boolean.FALSE.equals(this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx"))){
try {
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
}
try{
String stock = redisTemplate.opsForValue().get("stock");
if(StringUtils.hasLength(stock)){
Integer st = Integer.valueOf(stock);
if(st>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--st));
}
}
}finally {
//释放锁
redisTemplate.delete("lock");
}
}
这种方式存在问题:当setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法锁无法释放(死锁)
解决方案:给锁设置过期时间,自动释放锁。
设置过期时间两种方式:
- 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
- 使用set指令设置过期时间:
set key value ex 3 nx
(既达到setnx的效果,又设置了过期时间)
② 防死锁优化
修改while中获取锁的逻辑如下所示:
while (Boolean.FALSE.equals(this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx",3, TimeUnit.SECONDS)){
try {
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
}
这种方式解决了死锁问题但是可能会释放其他服务器的锁。
场景:如果业务逻辑的执行时间是7s。执行流程如下
- index1业务逻辑没执行完,3秒后锁被自动释放。
- index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
- index3获取到锁,执行业务逻辑
- index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只
执行1s就被别人释放。最终等于没锁的情况。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
③ 防误删优化
如下这里设置锁的密钥为UUID,加锁者持有。
public void deduct(){
String uuid = UUID.randomUUID().toString();
while (Boolean.FALSE.equals(this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){
try {
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
}
try{
String stock = redisTemplate.opsForValue().get("stock");
if(StringUtils.hasLength(stock)){
Integer st = Integer.valueOf(stock);
if(st>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--st));
}
}
}finally {
//释放锁
if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
redisTemplate.delete("lock");
}
}
}
这种方式仍旧存在问题:删除操作缺乏原子性。
场景:
- index1执行删除时,查询到的lock值确实和uuid相等
- index1执行删除前,lock刚好过期时间已到,被redis自动释放
- index2获取了lock
- index1执行删除,此时会把index2的lock删除
解决方案:没有一个命令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本)
④ lua脚本保证删除原子性
redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。
如下AB两个进程示例:
在串行场景下:A和B的值肯定都是3。在并发场景下:A和B的值可能在0-6之间。
极限情况下1:则A的结果是0,B的结果是3
极限情况下2:则A和B的结果都是6
如果redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。 这和使用 MULTI/ EXEC 包围的事务很类似。
但是MULTI/ EXEC方法来使用事务功能,将一组命令打包执行,无法进行业务逻辑的操作。这期间有某一条命令执行报错(例如给字符串自增),其他的命令还是会执行,并不会回滚。
优化代码如下所示:
public void deduct(){
String uuid = UUID.randomUUID().toString();
while (Boolean.FALSE.equals(this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS))){
try {
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
}
try{
String stock = redisTemplate.opsForValue().get("stock");
if(StringUtils.hasLength(stock)){
int st = Integer.parseInt(stock);
if(st>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--st));
}
}
}finally {
// 先判断是否自己的锁,再解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList("lock"), uuid);
// //释放锁
// if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
// redisTemplate.delete("lock");
// }
}
}
到这里似乎完美解决了我们考虑到的几点问题,那么结束了吗?
并没有,目前这种方式不支持可重入性、并且集群环境下也存在失效情况
。更甚者如果由于异常情况,获取锁后服务逻辑未执行完毕,锁就自动释放了呢
?