为什么需要分布式锁?
本地锁synchronized只能锁住当前服务进程,一个本地锁只能锁一个服务,如果是分布式服务情况下使用本地锁,那么多少服务就会有多少进程同时执行,就是去了锁的效果,为了到达分布式情况下一把锁能同时锁住全部服务,一次只放行一个进程那么就必须使用分布式锁
本地锁逻辑
/**
* 本地锁
*/
public Map<String, List<CategoryLevel2Vo>> getCatelogJsonFromDataWithLocalLock() {
//Map<一级分类id,二级分类集合>
//本地锁
synchronized (this) {
/**
* 这种加锁,此项目只部署在一台服务器的情况下可以,本地锁锁住一个实例
* 分布式情况下就不行,此项目部署在多台服务器上吗,每个锁锁住自己的实例只放一个线程进来
* 如果8太服务器就会有8个线程同时访问,失去了锁的作用
* 本地锁只能锁住当前进程,不能锁住其他服务
*
*/
//TODO 本地锁(当前进程锁)synchronized 、 (JUC)Lock ,分布式情况下必须使用分布式锁
//下一个线程拿到锁之后先查缓存,缓存中没有再查数据库,避免频繁查库
String categoryJson = stringRedisTemplate.opsForValue().get("categoryJson");
if (StringUtils.isNotBlank(categoryJson)){
Map<String, List<CategoryLevel2Vo>> object = JSON.parseObject(categoryJson, new TypeReference<Map<String, List<CategoryLevel2Vo>>>() {});
return object;
}
System.out.println("当前进程锁查询了数据库");
List<CategoryEntity> categoryGetAll = baseMapper.selectList(null);
//一级分类
List<CategoryEntity> category1Level = getParentCid(categoryGetAll,0L);
Map<String, List<CategoryLevel2Vo>> collect = category1Level.stream().collect(Collectors.toMap(item -> item.getCatId().toString()
, l1 -> {
//二级分类
List<CategoryEntity> category2Level = getParentCid(categoryGetAll,l1.getCatId());
List<CategoryLevel2Vo> collect2 = null;
if (category2Level != null){
collect2 = category2Level.stream().map(l2 -> {
//三级分类
List<CategoryEntity> category3Level = getParentCid(categoryGetAll,l2.getCatId());
List<CategoryLevel2Vo.CategoryLevel3Vo> collect3 = null;
if (category3Level != null){
collect3 = category3Level.stream().map(level3 -> {
return new CategoryLevel2Vo.CategoryLevel3Vo(l2.getCatId().toString(),level3.getCatId().toString(),level3.getName());
}).collect(Collectors.toList());
}
return new CategoryLevel2Vo(l1.getCatId().toString(),collect3,l2.getCatId().toString(),l2.getName());
}).collect(Collectors.toList());
}
return collect2;
}));
/**
* 为什么将存入redis放在释放锁之前?
* 线程拿到锁会去查缓存是否有数据,又因为我们向redis存入缓存数据是在释放锁之后
* 那么释放锁之后,下一个线程查缓存,上一个线程并未存入完成。此时就会出现查询多次数据库的情况,锁失效
* 故,存缓存数据应在锁释放之前完成
*/
//存入缓存redis
String s = JSON.toJSONString(collect);
stringRedisTemplate.opsForValue().set("categoryJson",s,1, TimeUnit.DAYS);
return collect;
}
}
分布式锁实现过程
基本原理:在redis中设置锁,执行完业务,释放锁操作,如果没有抢到锁就等待一段时间继续判断,自旋操作一定要返回
-
抢占锁
向redis 存入一个锁 (“lock”,“xxx”),使用setNX命令,NX – 只有键key不存在的时候才会设置key的值
stringRedisTemplate.opsForValue().setIfAbsent(“lock”, “1”);
setIfAbsent()方法set成功返回true,所以我们只需要判断返回值为true就代表占锁成功 -
释放锁
占锁成功后,执行完业务,然后删除这个key即可
public Map<String, List<CategoryLevel2Vo>> getCatelogJsonFromDataWithRedisLock() {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
if(lock){
//抢占锁,set key 成功
//执行业务
Map<String, List<CategoryLevel2Vo>> data = getDataFromDb();
//业务....
//释放锁,删除key
stringRedisTemplate.delete("lock");
return data ;
}else{
//自旋
return getCatelogJsonFromDataWithRedisLock();
}
}
问题1:如果执行业务时出现异常,那么将不会执行下面的释放锁操作,那么将会死锁
解决办法:set锁时指定过期时间,如果业务出现异常,一定时间后锁会自动释放锁
问题2:如果业务未执行完成,锁到了过期时间自动释放导致其他进程抢占锁,此时业务执行完成然后释放锁,此时就会释放其他线程的锁
解决办法:set锁时,value用使用uuid,这样每个进程就只能释放自己的锁,那么删除锁时就需要根据uuid判断是否是自己的锁再进行删除
优化逻辑
public Map<String, List<CategoryLevel2Vo>> getCatelogJsonFromDataWithRedisLock() {
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if(lock){
//抢占锁,set key 成功
//执行业务
Map<String, List<CategoryLevel2Vo>> data = getDataFromDb();
//业务....
//释放锁,判断uuid
String redisValue = stringRedisTemplate.opsForValue().get("lock");
if(uuid.equals(redisValue )){
stringRedisTemplate.delete("lock");
}
stringRedisTemplate.delete("lock");
return data ;
}else{
//自旋
return getCatelogJsonFromDataWithRedisLock();
}
}
问题3:如果释放锁操作时出现问题导致释放锁失败死锁怎么办?现在释放锁需要取值、判断、删除,如果操作出现问题就会死锁
解决办法:使用 Lua 脚本 执行释放锁的操作,因为脚本能保证原子性
Lua解锁脚本
if redis.call(“get”,KEYS[1]) == ARGV[1]
then
return redis.call(“del”,KEYS[1])
else
return 0
end
分布式锁最终优化逻辑
原子加锁(uuid Value+过期时间)+ 原子解锁(lua脚本) + 加长过期时间,且释放锁操作用finally块包起来以确保释放锁,如果没有拿到锁则sleep一段时间,防止频繁调用栈空间溢出
/**
* 分布式锁 原子加锁(uuid Value+过期时间)+原子解锁(lua脚本)+加长过期时间
*/
public Map<String, List<CategoryLevel2Vo>> getCatelogJsonFromDataWithRedisLock() {
//set NX key不存在才能set
//设置set lock hah EX 300 NX,设置过期时间EX,防止死锁,加锁跟设置过期时间必须是原子性操作
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
//lock为true set成功
if (lock){
System.out.println("获取分布式锁成功");
//加锁成功
Map<String, List<CategoryLevel2Vo>> dataFromDb;
try {
dataFromDb = getDataFromDb();
} finally {
//释放锁(删除lock)
//使用lua删除锁脚本,或者值+比较值+删除锁(脚本可以保证原子性)
String redisScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end";
Long execute = stringRedisTemplate.execute(new DefaultRedisScript<Long>(redisScript, Long.class), Arrays.asList("lock"), uuid);
}
return dataFromDb;
}else {
//加锁失败
System.out.println("获取分布式锁失败,等待重试");
//休眠
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
//自旋
return getCatelogJsonFromDataWithRedisLock();
}
}