一、为什么需要分布式锁
二、分布式锁实现
1.分布式锁演进 - 基本原理
我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。“占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。等待可以自旋的方式。
2.分布式锁演进
(1) 分布式锁演进 — 阶段一
代码如下:
public Map<String,List<Catelog2Vo>> getCatalogJsonFromDbwithRedisLock(){
//1、占分布式锁。去redis占坑
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");
if(lock){
//2、加锁成功... 执行业务
Map<String,List<Catelog2Vo>> dataFromDb = getDataFromDb();
//3、释放锁
redisTemplate.delete( key: "lock");
return dataFromDb
}else {//加锁失败...重试。synchronized ()//休眠100ms重试
//4.自旋的方式
return getCatalogJsonFromDbwithRedisLock();
}
}
问题:如果第二步执行业务代码出异常或者机器断电等会导致第三步不能释放锁
(2)分布式锁演进 — 阶段二
代码如下:
public Map<String,List<Catelog2Vo>> getCatalogJsonFromDbwithRedisLock(){
//1、占分布式锁。去redis占坑
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");
if(lock){
//1.1、设置过期时间,这样写问题:没有和加锁是同步的,不是原子的
redisTemplate.expire( key: "lock", timeout: 30,TimeUnit.SECONDS)
//2、加锁成功... 执行业务
Map<String,List<Catelog2Vo>> dataFromDb = getDataFromDb();
//3、释放锁
redisTemplate.delete( key: "lock");
return dataFromDb
}else {//加锁失败...重试。synchronized ()//休眠100ms重试
//4.自旋的方式
return getCatalogJsonFromDbwithRedisLock();
}
}
问题:如果1.1加过期时间时机器断电等会导致第三步不能释放锁
redisTemplate.opsForValue.setIfAbsent( "lock", "1111",300,TimeUnit.SECONDS):
(3)分布式锁演进 — 阶段三
代码如下:
public Map<String,List<Catelog2Vo>> getCatalogJsonFromDbwithRedisLock(){
//1、占分布式锁。去redis占坑
Boolean lock = redisTemplate.opsForValue.setIfAbsent( "lock", "1111",300,TimeUnit.SECONDS);
if(lock){
//2、加锁成功... 执行业务
Map<String,List<Catelog2Vo>> dataFromDb = getDataFromDb();
//3、释放锁
redisTemplate.delete( key: "lock");
return dataFromDb;
}else {//加锁失败...重试。synchronized ()//休眠100ms重试
//4.自旋的方式
return getCatalogJsonFromDbwithRedisLock();
}
}
问题:释放锁失败会导致死锁
(4)分布式锁演进 — 阶段四 可在具体项目中使封装使用下面代码,但是建议使用专门的框架
使用 RedisTemplate 操作分布式锁
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1、占分布式锁。去redis占坑 设置过期时间必须和加锁是同步的,保证原子性(避免死锁)
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式锁成功...");
Map<String, List<Catelog2Vo>> dataFromDb = null;
try {
//2.加锁成功...执行业务
dataFromDb = getDataFromDb();
} finally {
//3.删除锁 lua脚本解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) " +
"else return 0 end";
stringRedisTemplate.execute(new DefaultRedisScript<>(script,Long.class),
Arrays.asList("lock"), uuid);
//下面不能这样写
// 先去redis查询下保证当前的锁是自己的
//获取值对比,对比成功删除=原子性
// String lockValue = stringRedisTemplate.opsForValue().get("lock");
// if (uuid.equals(lockValue)) {
// //删除我自己的锁
// stringRedisTemplate.delete("lock");
// }
}
return dataFromDb;
} else {
System.out.println("获取分布式锁失败...等待重试...");
//4. 加锁失败...重试机制,要根据具体业务决定下面需不需要写
//休眠一百毫秒
try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
return getCatalogJsonFromDbWithRedisLock(); //自旋的方式
}
}