前言
本文将讨论的做一个高并发场景下避不开的话题,即redis分布式锁。比如在淘宝 的秒杀场景、热点新闻和热搜排行榜等。可见分布式锁是一个程序员面向高级的一门必修课,下面请跟着本篇文章好好学习。
redis分布式锁有哪些面试题
1.Redis做分布式的时候需要注意什么问题?
2.你们公司自己实现的分布式锁是否用的setnx命令实现?这个是最合适的吗?你如何考虑分布式锁的可重入问题?
3.如果Redis是单点部署的,会带来什么问题?准备怎么解决单点问题呢?
Redis集群模式下,比如主从模式下,CAP方面有没有什么问题?
1.分布式锁是什么?
1.1 锁的种类介绍
锁的种类 | 锁的概念 |
---|---|
单机 | 单机版同一个JVM虚拟机内,synchronized或者lock接口 |
分布式 | 分布式多个不同的java虚拟机,单机的线程锁机制不再起作用了,资源类在不同的服务器之间共享了。 |
1.2 一个正经的分布式锁具有哪些刚需
独占性:任何时刻只能有且仅有一个线程持有
高可用:若redis集群环境下,不能因为某个节点挂了而出现获取锁或者释放锁失败。高并发请求下依旧能够保证良好使用。
防止死锁:杜绝死锁,必须有超时控制或者撤销操作,有个兜底终止跳出方案
不乱抢:防止张冠李戴,不能私下uolock别人的锁,只能自己加锁自己释放,自己约的锁自己要释放,可以设置过期时间,或者业务代码执行完毕以后删除对一个的锁。
可重入:同一个节点的同一个线程如果获得锁之后,他也可以再次获得这个锁。
1.3 redis分布式锁
setnx key values
1.4 java实现分布式锁的案例
先来个乞丐版的分布式锁,并没有遵循上面五大原则。然后慢慢进行优化,乞丐版分布锁案例如下代码所示:
public String sale() {
String resMessgae = "";
String key = "luojiaRedisLocak";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
// 抢不到的线程继续重试
if (!flag) {
// 线程休眠20毫秒,进行递归重试
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
sale();
} else {
try {
// 1 抢锁成功,查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory01");
// 2 判断库存书否足够
Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
// 3 扣减库存,每次减少一个库存
if (inventoryNum > 0) {
stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
} else {
resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
}
} finally {
stringRedisTemplate.delete(key);
}
}
return resMessgae;
}
请看看以上代码有哪些问题?既没有删除过期时间 ,也没有判断redis获取的redis值进行删除,有可能删除错锁。如果进一步优化可以redis可以存一个流水号,业务代码执行完了以后,判断流水号是否相等,然后进行删除。可重入问题可以通过递归实现重试,但是依旧有问题:手工设置5000个线程来抢占锁,压测OK,但是容易导致StackOverflowError,在高并发不推荐使用,需要进一步完善。改进获取重试方法代码如下所示:
public String sale() {
String resMessgae = "";
String key = "luojiaRedisLocak";
// 标记线程id,知道使哪个线程在执行
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
// 不用递归了,高并发容易出错,我们用自旋代替递归方法重试调用;也不用if,用while代替
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {
// 线程休眠20毫秒,进行递归重试
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
}
try {
// 1 抢锁成功,查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory01");
// 2 判断库存书否足够
Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
// 3 扣减库存,每次减少一个库存
if (inventoryNum > 0) {
stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
} else {
resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
}
} finally {
stringRedisTemplate.delete(key);
}
return resMessgae;
}
为了防止出现死锁,需要给锁设置过期时,关键点在于过期时间设置,以避免代码异常出现,而该线程持续占有该锁。其java代码如下所示:
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {
// 线程休眠20毫秒,进行递归重试
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
}
为了防止误删key,在执行完了业务代码以后需要删掉锁,在try-catch-finally 中添加如下删除锁的代码 :
try {
//和上一个代码块重复,省略掉了
} finally {
// v5.0 改进点,判断加锁与解锁是不同客户端,自己只能删除自己的锁,不误删别人的锁
if (stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue)) {
stringRedisTemplate.delete(key);
}
}
在finally中删除key并不能保持它的原子性,当业务执行时间大于锁的过期时间是,其他线程可以抢占该锁,没法保证业务执行的完整性,所以以下代码借助Lua脚本进行优化,java代码如下所示:
public String sale() {
String resMessgae = "";
String key = "luojiaRedisLocak";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
// 不用递归了,高并发容易出错,我们用自旋代替递归方法重试调用;也不用if,用while代替
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {
// 线程休眠20毫秒,进行递归重试
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
}
try {
// 1 抢锁成功,查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory01");
// 2 判断库存书否足够
Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
// 3 扣减库存,每次减少一个库存
if (inventoryNum > 0) {
stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
} else {
resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
}
} finally {
// 改进点,修改为Lua脚本的Redis分布式锁调用,必须保证原子性,参考官网脚本案例
String luaScript =
"if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
stringRedisTemplate.execute(new DefaultRedisScript(luaScript, Boolean.class), Arrays.asList(key), uuidValue);
}
return resMessgae;
}
上述代码,既然所已经被删除了,如何兼顾锁的可重入问题?这个问题下文会做出解释,希望读者耐心看完。可以在业务代码(同步代码块)前后添加lock和unlock实现加锁。而今重新进入时没必要重新获得一把锁。
可重入锁的概念:
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
如果是1个有synchronized修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可以一定程度避免死锁。下文1.5将展示详细的可重入分布式锁。
1.5 优化分布式锁
本次优化主要解决的问题有:宕机防止死锁、防止误删key、Lua保证原子性。设置 过期时间的同时,当业务执行时间大于过期时间,自动续锁功能等,分布式锁实现可重入需要使用的hset,记录进入次数。java代码如下所示:
新增续锁功能,java代码如下所示,其中实现步骤为:
步骤一:复原程序为初识无锁版本(即上述乞丐版)
步骤二:新建RedisDistributedLock类实现JUC里面的Lock接口
步骤三:满足JUC里面AQS对Lock锁的接口规范定义来进行实现落地代码
步骤四:结合设计模式开发属于自己的Redis分布式锁工具类
```java
package com.luojia.redislock.mylock;
import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自研的分布式锁,实现了Lock接口
*/
public class RedisDistributedLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName; // KEYS[1]
private String uuidValule; // ARGV[1]
private long expireTime; // ARGV[2]
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValule = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
this.expireTime = 50L;
}
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
try {
tryLock(-1L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (-1 == time) {
//lua脚本加锁
String script =
"if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
"redis.call('expire', KEYS[1], ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
System.out.println("lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);
// 加锁失败需要自旋一直获取锁
while (!stringRedisTemplate.execute(
new DefaultRedisScript<>(script, Boolean.class),
Arrays.asList(lockName),
uuidValule,
String.valueOf(expireTime))) {
// 休眠60毫秒再来重试
try {TimeUnit.MILLISECONDS.sleep(60);} catch (InterruptedException e) {e.printStackTrace();}
}
return true;
}
return false;
}
@Override
public void unlock() {
//lua脚本解锁
String script = "" +
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +
"return nil " +
"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then " +
"return redis.call('del', KEYS[1]) " +
"else " +
"return 0 " +
"end";
System.out.println("lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);
// LUA脚本由C语言编写,nil -> false; 0 -> false; 1 -> true;
// 所以此处DefaultRedisScript构造函数返回值不能是Boolean,Boolean没有nil
Long flag = stringRedisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Arrays.asList(lockName),
uuidValule);
if (null == flag) {
throw new RuntimeException("this lock does not exists.");
}
}
// 下面两个暂时用不到,不用重写
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}
其中lua脚本加锁说明,如下图所示:
返回为1情况说明。
lua脚本解锁说明
完整的分布式锁java代码如下所示:
// v7.0 使用自研的lock/unlock+LUA脚本自研的Redis分布式锁
Lock redisDistributedLock = new RedisDistributedLock(stringRedisTemplate, "luojiaRedisLock");
public String sale() {
String resMessgae = "";
redisDistributedLock.lock();
try {
// 1 抢锁成功,查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory01");
// 2 判断库存书否足够
Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
// 3 扣减库存,每次减少一个库存
if (inventoryNum > 0) {
stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
} else {
resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
}
} finally {
redisDistributedLock.unlock();
}
return resMessgae;
}
小总结
引入工厂模式
可重入测试,在InventoryService类新增可重入测试方法。
// v7.1 使用工厂类创建锁
@Autowired
private DistributedLockFactory distributedLockFactory;
public String sale() {
String resMessgae = "";
Lock redisLock = distributedLockFactory.getDistributedLock("REDIS", "luojiaRedisLock");
redisLock.lock();
try {
// 1 抢锁成功,查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory01");
// 2 判断库存书否足够
Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
// 3 扣减库存,每次减少一个库存
if (inventoryNum > 0) {
stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
testReEntry();
} else {
resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
}
} finally {
redisLock.unlock();
}
return resMessgae;
}
private void testReEntry() {
Lock redisLock = distributedLockFactory.getDistributedLock("REDIS", "luojiaRedisLock");
redisLock.lock();
try {
log.info("=================测试可重入锁=================");
} finally {
redisLock.unlock();
}
}
引入工厂模式
package com.luojia.redislock.mylock;
import cn.hutool.core.util.IdUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.locks.Lock;
@Component
public class DistributedLockFactory {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String uuid;
public DistributedLockFactory() {
this.uuid = IdUtil.simpleUUID();
}
public Lock getDistributedLock(String lockType, String lockName) {
if (lockType == null) {
return null;
}
if ("REDIS".equalsIgnoreCase(lockType)) {
return new RedisDistributedLock(stringRedisTemplate, lockName, uuid);
} else if ("ZOOKEEPER".equalsIgnoreCase(lockType)) {
// 后面存在就返回对应的分布式锁
} else if ("MYSQL".equalsIgnoreCase(lockType)) {
// 后面存在就返回对应的分布式锁
}
return null;
}
}
在RedisDistributedLock中,修改构造方法:
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValule = uuid + ":" + Thread.currentThread().getId();
this.expireTime = 50L;
}
锁的自动续费功能
确保RedisLock过期时间大于业务执行时间的问题,以便确保时间到了,业务没有执行完需要自动续期,对tryLock进行改正。
自动续锁的Lua脚本:
// 自动续期的LUA脚本
if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
```java
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (-1 == time) {
String script =
"if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
"redis.call('expire', KEYS[1], ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
System.out.println("lock() lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);
// 加锁失败需要自旋一直获取锁
while (!stringRedisTemplate.execute(
new DefaultRedisScript<>(script, Boolean.class),
Arrays.asList(lockName),
uuidValule,
String.valueOf(expireTime))) {
// 休眠60毫秒再来重试
try {TimeUnit.MILLISECONDS.sleep(60);} catch (InterruptedException e) {e.printStackTrace();}
}
// 新建一个后台扫描程序,来检查Key目前的ttl,是否到我们规定的剩余时间来实现锁续期
resetExpire();
return true;
}
return false;
}
// 自动续期
private void resetExpire() {
String script =
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +
"return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
"return 0 " +
"end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (stringRedisTemplate.execute(
new DefaultRedisScript<>(script, Boolean.class),
Arrays.asList(lockName),
uuidValule,
String.valueOf(expireTime))) {
// 续期成功,继续监听
System.out.println("resetExpire() lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);
resetExpire();
}
}
}, (this.expireTime * 1000 / 3));
}
总结
1.synchronized单机版OK;
2.Nginx分布式微服务,轮询多台服务器,单机锁不行;
3.取消单机锁,上redis分布式锁setnx,中小企业使用没问题;
4. 只是加锁了,没有释放锁,出异常的话,可能无法释放锁,必须要在代码层面finally释放锁
5. 如果服务宕机,部署了微服务代码层面根本就没有走到finally这块,没办法保证解锁,这个Key没有被删除,需要对锁设置过期时间 -
6. 为redis的分布式锁key增加过期时间,还必须要保证setnx+过期时间在同一行,保证原子性
7. 程序由于执行超过锁的过期时间,所以在finally中必须规定只能自己删除自己的锁,不能把别人的锁删除了,防止张冠李戴
8.将Lock、unlock变成LUA脚本保证原子性;
9.保证锁的可重入性,hset替代setnx+Lock变成LUA脚本,保障可重入性;
10.锁的自动续期 。