分布式锁:Redis、Zookeeper

news2025/1/12 1:04:20

1.基于Redis实现分布式锁

Redis分布式锁原理如上图所示,当有多个Set命令发送到Redis时,Redis会串行处理,最终只有一个Set命令执行成功,从而只有一个线程加锁成功

2.SetNx命令加锁

利用Redis的setNx命令在Redis数据库中创建一个<Key,Value>记录,这条命令只有当Redis中没有这个Key的时候才执行成功,当已经有这个Key的时候会返回失败。

可以借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false),流程图如下图所示:

  • 多个客户端同时尝试获取锁(setnx)
  • 获取成功,执行业务逻辑,执行完成释放锁(del)
  • 其他客户端等待重试

利用如上的setNx命令便可以简单的实现加锁功能,当多个线程去执行这个加锁命令时,只有一个线程执行成功,然后执行业务逻辑,其他线程加锁失败返回或者重试

public void testLock() {
    // 1. 从redis中获取锁,setnx
    Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111");
    if (lock) {
        // 查询redis中的num值
        String value = this.redisTemplate.opsForValue().get("num");
        // 没有该值return
        if (StringUtils.isBlank(value)){
            return ;
        }
        // 有值就转成成int
        int num = Integer.parseInt(value);
        // 把redis中的num值+1
        this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
        // 2. 释放锁 del
        this.redisTemplate.delete("lock");
    } else {
        // 3. 每隔1秒钟回调一次,再次尝试获取锁
        try {
            Thread.sleep(1000);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3.优化分布式锁_设置过期时间

设置过期有俩种方式可以选择:

  • 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
  • 在set时指定过期时间(推荐)

代码实现优化就是在设置锁的时候设置过期时间:

public void testLock() {
    // 1. 从redis中获取锁,setnx
    Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.MINUTES);
    if (lock) {
        //与之前相同代码略过
       ...
    }
}

那么还会不会存在问题呢?
场景:如果业务逻辑的执行时间是7s。执行流程如下:

  • index1业务逻辑没执行完,3秒后锁被自动释放。
  • index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
  • index3获取到锁,执行业务逻辑
  • index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。
  • 最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。

4.错误删除锁问题

上面直接删除key来解锁方式会存在一个问题,考虑下面这种情况:

(1)线程1执行业务时间过长导致自己加的锁过期

(2)这时线程2进来加锁成功

(3)然后线程1业务逻辑执行完毕开始执行del key命令

(4)这时就会出现错误删除线程2加的锁

(5)错误删除线程2的锁后,线程3又可以加锁成功,导致有两个线程执行业务代码

5.优化分布式锁_防止误删除

public void testLock() {
    // 1. 从redis中获取锁,setnx
    String uuid = UUID.randomUUID().toString();
    Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.MINUTES);
    if (lock) {
        //与之前相同代码略过
       ...
       // 2. 释放锁 del
       if (StringUtils.equals(redisTemplate.opsForValue().get("lock"),uuid)){
            this.redisTemplate.delete("lock");
        }
    }
}

场景:

  • index1执行删除时,查询到的lock值确实和uuid相等
  • index1执行删除前,lock刚好过期时间已到,被redis自动释放
  • index2获取了lock
  • index1执行删除,此时会把index2的lock删除
  • 问题:缺乏原子性

上面的setNx命令实现了基本的加锁功能,但存在一个致命的问题是,当程序在执行业务代码崩溃时,无法再执行到下面的解锁指令,从而导致出现死锁问题

为了解决死锁问题,这里就需要引入过期时间的概念,过期时间是给当前这个key设置一定的存活时间,当存活时间到期后,Redis就会自动删除这个过期的Key,从而使得程序在崩溃时也能到期自动释放锁

如上图所示,使用Redis的expire命令来为锁设置过期时间,从而实现到期自动解锁的功能,但这里仍然还存在一个问题就是加锁与给锁设置过期时间这两个操作命令并不是原子命令

考虑下面这种情况:

当程序在加锁完成后,在设置过期时间前崩溃,这时仍然会造成锁无法自动释放,从而产生死锁现象。

6.优化分布式锁_LUA脚本保证删除的原子性

  • 首先我们先简单介绍一下lua脚本的基本知识(lua脚本是c语言) 定义变量:
    全局变量:a = 11局部变量:local b = 22redis不允许lua脚本创建全局变量,只能声明局部变量 流程控制:if(exp) then业务逻辑elseif(exp) then业务逻辑else业务逻辑end redis中执行lua脚本: eval script numkeys keys[] args[] : eval指令的输出不是lua脚本的打印而是lua脚本的返回值 script:lua脚本字符串,定义动态变量:KEYS[1] ARGV[1] numkeys:key数组的元素个数 keys:keys数组 args:argv数组 redis集群执行lua脚本可能会报错:如果所有keys不在同一个分片上,lua脚本就会报错:解决方案是: keys只传一个 可以使用CLUSTER KEYSLOT bb{xx}
  • 删除LUA脚本:
if redis.call('get', KEYS[1]) == ARGV[1] 
    then return redis.call('del', KEYS[1]) 
    else return 0 end
public void testLock() {
    // 1. 从redis中获取锁,setnx
    String uuid = UUID.randomUUID().toString();
    Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
    if (lock) {
        //与之前相同代码略过
        ...
        // 2. 释放锁 del
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        this.redisTemplate.execute(new DefaultRedisScript<>(script), Arrays.asList("lock"), uuid);
    } else {
        // 3. 每隔1秒钟回调一次,再次尝试获取锁
        try {
            Thread.sleep(1000);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

7.优化分布式锁_可以重入

上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。
可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。
我们基于Redis Hash 实现方案
Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑。

  • 加锁
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
then
    redis.call('hincrby', KEYS[1], ARGV[1], 1);
    redis.call('expire', KEYS[1], ARGV[2]);
    return 1;
else
	return 0;
end

假设值为:KEYS:[lock], ARGV[uuid, expire]

如果锁不存在或者这是自己的锁,就通过hincrby(不存在新增,存在就加1)获取锁或者锁次数加1。 代码实例如下:

private Boolean tryLock(String lockName, String uuid, Long expire){
    String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) " +
        "then" +
        "    redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
        "    redis.call('expire', KEYS[1], ARGV[2]);" +
        "    return 1;" +
        "else" +
        "   return 0;" +
        "end";
    if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, expire.toString())){
        try {
            // 没有获取到锁,重试
            Thread.sleep(200);
            tryLock(lockName, uuid, expire);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // 获取到锁,返回true
    return true;
}
  • 解锁
lua复制代码-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
    return nil;
end;
-- 小于等于 0 代表可以解锁
if (redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then
    return 0;
else
    redis.call('del', KEYS[1]);
    return 1;
end;

这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:

  • 1 代表解锁成功,锁被释放
  • 0 代表可重入次数被减 1
  • null 代表其他线程尝试解锁,解锁失败
    如果返回值使用 Boolean,Spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断,所以返回类型只好使用 Long。
private void unlock(String lockName, String uuid){
    String script = "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then" +
        "    return nil;" +
        "end;" +
        "if (redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then" +
        "    return 0;" +
        "else" +
        "    redis.call('del', KEYS[1]);" +
        "    return 1;" +
        "end;";
    // 这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:
    // 1 代表解锁成功,锁被释放
    // 0 代表可重入次数被减 1
    // null 代表其他线程尝试解锁,解锁失败
    Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Lists.newArrayList(lockName), uuid);
    // 如果未返回值,代表尝试解其他线程的锁
    if (result == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: "
                                               + lockName + " with request: "  + uuid);
    }
}
  • 使用
public void testLock() {
    // 加锁
    String uuid = UUID.randomUUID().toString();
    Boolean lock = this.tryLock("lock", uuid, 300l);
    if (lock) {
        // 读取redis中的num值
        String numString = this.redisTemplate.opsForValue().get("num");
        if (StringUtils.isBlank(numString)) {
            return;
        }
        // ++操作
        Integer num = Integer.parseInt(numString);
        num++;
        // 放入redis
        this.redisTemplate.opsForValue().set("num", String.valueOf(num));
        // 测试可重入性
        this.testSubLock(uuid);
        // 释放锁
        this.unlock("lock", uuid);
    }
}
// 测试可重入性
private void testSubLock(String uuid){
    // 加锁
    Boolean lock = this.tryLock("lock", uuid, 300l);
    if (lock) {
        System.out.println("分布式可重入锁。。。");
        this.unlock("lock", uuid);
    }
}

8.优化分布式锁_自动续期

A线程超时时间设为10s(为了解决死锁问题),但代码执行时间可能需要30s,然后redis服务端10s后将锁删除。 此时,B线程恰好申请锁,redis服务端不存在该锁,可以申请,也执行了代码。那么问题来了, A、B线程都同时获取到锁并执行业务逻辑,这与分布式锁最基本的性质相违背:在任意一个时刻,只有一个客户端持有锁(即独享排他)。

对于上述的这种情况,原因是由于设置的过期时间太短或者业务执行时间太长导致锁过期,但是为了避免死锁问题又必须设置过期时间,那这就需要引入自动续期的功能,即在加锁成功时,开启一个定时任务,自动刷新Redis加锁key的超时时间,从而避免上诉情况发生,如下图所示:

锁延期方法:开启子线程执行延期。在加锁成功后可以启动一个定时任务来对锁进行自动续期,定时任务的执行逻辑是:

(1)判断Redis中的锁是否是自己的

(2)如果存在的话就使用expire命令重新设置过期时间

这里由于需要两个Redis的命令,所以也需要使用lua脚本来实现原子操作,代码如下所示:

/**
 * 锁延期
 * 线程等待超时时间的2/3时间后,执行锁延时代码,直到业务逻辑执行完毕,因此在此过程中,其他线程无法获取到锁,保证了线程安全性
 * @param lockName
 * @param expire 单位:毫秒
 */
private void renewTime(String lockName, String uuid, Long expire){
    String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1; else return 0; end";
    new Thread(() -> {
        while (this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Lists.newArrayList(lockName), uuid, expire.toString())){
            try {
                // 到达过期时间的2/3时间,自动续期
                Thread.sleep(expire / 3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
}

获取锁成功后,调用延期方法给锁 定时延期:

private Boolean tryLock(String lockName, String uuid, Long expire){
    String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) " +
        "then" +
        "    redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
        "    redis.call('expire', KEYS[1], ARGV[2]);" +
        "    return 1;" +
        "else" +
        "   return 0;" +
        "end";
    if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, expire.toString())){
        try {
            // 没有获取到锁,重试
            Thread.sleep(200);
            tryLock(lockName, uuid, expire);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // 锁续期
    this.renewTime(lockName, uuid, expire * 1000);
    // 获取到锁,返回true
    return true;
}

9.优化分布式锁_Redlock算法

redis集群状态下的问题:

  • 客户端A从master获取到锁
  • 在master将锁同步到slave之前,master宕掉了。
  • slave节点被晋级为master节点
  • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。
    安全失效 解决集群下锁失效,参照redis官方网站针对redlock文档:redis.io/topics/dist…

10.本地锁会出现的问题

我们知道java中有synchronized、lock锁、读写锁ReadWriteLock,众所周知这些锁都是本地锁。
提到锁就不得不提JUC:java.util.concurrent包,又称concurrent包。jdk1.5提供,为多线程高并发编程而提供的包,但此文章的场景是分布式场景,后续会出JUC的文章。

  • 简单的介绍一下synchronized及lock锁 synchronized是一个关键字,lock是一个接口,ReentrantLock是实现了lock接口的一个类 ReentrantLock:悲观的独占的互斥的排他的可公平可不公平的可重入锁 synchronized:悲观的独占的互斥的排他的非公平的可重入锁

11.准备

redis、ab工具(压测)

11.1不使用任何锁的情况下

  • 我们首先创建一个测试方法,testNoLock
@GetMapping("/test")
public void testNoLock(){
    String count = (String) this.redisTemplate.opsForValue().get("count");
    if (count == null){
        //没有值直接返回
        return;
    }
    // 有值就转成成int
    int number = Integer.parseInt(count);
    // 把redis中的num值+1
    this.redisTemplate.opsForValue().set("count", String.valueOf(++number));
}

测试之前的查看值为1

@GetMapping("/getCount")
public String getCount(){
    String count = String.valueOf(this.redisTemplate.opsForValue().get("count"));
    return count; //1
}
  • 接下来使用ab压力测试工具
cmd复制代码// ab  -n(一次发送的请求数)  -c(请求的并发数) 访问路径
ab -n100 -c50 http://127.0.0.1:8080/test/test

再次查询结果为6。

11.2使用本地锁

public synchronized void testNoLock(){
        String count = String.valueOf(this.redisTemplate.opsForValue().get("count"));
        if ("null".equals(count)){
            //没有值直接返回
            return;
        }
        // 有值就转成成int
        int number = Integer.parseInt(count);
        // 把redis中的num值+1
        this.redisTemplate.opsForValue().set("count", String.valueOf(++number));
    }
  • 再次使用ab压力测试工具
ab -n100 -c50 http://127.0.0.1:8080/test/test

此次结果为106,说明结果是正确的,看样子结果是非常完美的,但是真的很完美吗?

 11.3使用集群+本地锁

  • 我们只需要在idea中在启动俩个服务,修改端口号,三个运行实例的名称是相同的,并且网关的配置就是通过服务名在负载均衡,所以我们只需要访问网关,网关就会给我们做负载均衡了。

  • 再次使用ab压力测试工具(将count重置为1)
cmd复制代码ab -n100 -c50 http://127.0.0.1:8080/test/test

此次的结果为58!!!

到此说明了,本地锁是有局限性的。

12.可重入锁

对于一个功能完整的锁来说,可重入功能是必不可少的特性,所谓的锁可重入就是同一个线程,第一次加锁成功后,在第二次加锁时,无需进行排队等待,只需要判断是否是自己的锁就行了,可以直接再次获取锁来执行业务逻辑,如下图所示:

 实现可重入机制的原理就是在加锁的时候记录加锁次数,在释放锁的时候减少加锁次数,这个加锁的次数记录可以存在Redis中,如下图所示:

 如上图所示,加入可重入功能后,加锁的步骤就变为如下步骤:

(1)判断锁是否存在

(2)判断锁是否是自己的

(3)增加加锁的次数

由于增加次数以及减少次数是多个操作,这里需要再次使用lua脚本来实现,同时由于这里需要在Redis中存入加锁的次数,所以需要使用到Redis中的Map数据结构Map(key,uuid,lockCount),加锁lua脚本如下:

//锁不存在
if (redis.call('exists', key) == 0) then
    redis.call('hset', key, uuid, 1); 
    redis.call('expire', key, time); 
    return 1;
end;
//锁存在,判断是否是自己的锁
if (redis.call('hexists', key, uuid) == 1) then
    redis.call('hincrby', key, uuid, 1); 
    redis.call('expire', key, uuid);
    return 1; 
end; 
//锁不是自己的,返回加锁失败
return 0;

加入可重入功能后的解锁逻辑就变为:

(1)判断锁是否是自己的

(2)如果是自己的则减少加锁次数,否则返回解锁失败

//判断锁是否是自己的,不是自己的直接返回错误
if (redis.call('hexists', key,uuid) == 0) then
    return 0;
end;
//锁是自己的,则对加锁次数-1
local counter = redis.call('hincrby', key, uuid, -1);
if (counter > 0) then 
    //剩余加锁次数大于0,则不能释放锁,重新设置过期时间
    redis.call('expire', key, uuid); 
    return 1;
else
//等于0,代表可以释放锁了
    redis.call('del', key); 
    return 1; 
end; 

到此,在实现基本的加锁与解锁的逻辑上,又加入了可重入和自动续期的功能

13.Zookeeper实现分布式锁

Zookeeper是一个分布式协调服务,分布式协调主要是来解决分布式系统中多个应用之间的数据一致性,Zookeeper内部的数据存储方式类似于文件目录形式的存储结构,它的内存结果如下图所示:

14.Zookeeper加锁原理

在Zookeeper中的指定路径下创建节点,然后客户端根据当前路径下的节点状态来判断是否加锁成功,如下图一种情况为例,线程1创建节点成功后,线程2再去创建节点就会创建失败

15.Zookeeper节点类型

持久节点:在Zookeeper中创建后会进行持久储存,直到客户端主动删除

临时节点:以客户端会话Session维度创建节点,一旦客户端会话断开,节点就会自动删除

临时/持久顺序节点:在同一个路径下创建的节点会对每个节点按创建先后顺序编号

zookeeper.exists("/watchpath",new Watcher() {
    @Override
    public void process(WatchedEvent event) {
	System.out.println("进入监听器");
	System.out.println("监听路径Path:"+event.getPath());
	System.out.println("监听事件类型EventType:"+event.getType());				
    }			
});	

16.利用临时顺序节点和监听机制来实现分布式锁

实现分布式锁的方式有多种,我们可以使用临时节点和顺序节点这种方案来实现分布式锁:

1:使用临时节点可以在客户端程序崩溃时自动释放锁,避免死锁问题

2:使用顺序节点的好处是,可以利用锁释放的事件监听机制,来实现阻塞监听式的分布式锁

下面将基于这两个特性来实现分布式锁

17.加锁原理

1:首先在Zookeeper上创建临时顺序节点Node01、Node02等

2:第二步客户端拿到加锁路径下所有创建的节点

3:判断自己的序号是否最小,如果最小的话,代表加锁成功,如果不是最小的话,就对前一个节点创建监听器

4:如果前一个节点删除,监听器就会通知客户端来准备重新获取锁

加锁原理和代码入下图所示:

//加锁路径
String lockPath;
//用来阻塞线程
CountDownLatch cc = new CountDownLatch(1);
//创建锁节点的路径
Sting LOCK_ROOT_PATH = "/locks"

//先创建锁
public void createLock(){
    //lockPath = /locks/lock_01 
    lockPath = zkClient.create(LOCK_ROOT_PATH+"/lock_", CreateMode.EPHEMERAL_SEQUENTIAL);
}

//获取锁
public boolean acquireLock(){
    //获取当前加锁路径下所有的节点
    allLocks = zkClient.getChildren("/locks");
    //按节点顺序大小排序
    Collections.sort(allLocks);
    //判断自己是否是第一个节点
    int index = allLocks.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
    //如果是第一个节点,则加锁成功
    if (index == 0) {
        System.out.println(Thread.currentThread().getName() + "获得锁成功, lockPath: " + lockPath);
        return true;
    } else {
        //不是序号最小的节点,则监听前一个节点
        String preLock = allLocks.get(index - 1);
        //创建监听器
        Stat status = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
        // 前一个节点不存在了,则重新获取锁
        if (status == null) {
            return acquireLock();
        } else { 
            //阻塞当前进程,直到前一个节点释放锁
            System.out.println(" 等待前一个节点锁释放,prelocakPath:"+preLockPath);
            //唤醒当前线程,继续尝试获取锁
            cc.await();
            return acquireLock();
        }
    }
}

private Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
         //监听到前一个节点释放锁,唤醒当前线程
         cc.countDown();
    }
}

18.可重入锁实现

Zookeeper实现可重入分布式锁的机制是在本地维护一个Map记录,因为如果在Zookeeper节点维护数据的话,Zookeeper的写操作是很慢,集群内部需要进行投票同步数据,所以在本地维护一个Map记录来记录当前加锁的次数和加锁状态,在释放锁的时候减少加锁的次数,原理如下图所示:

//利用Map记录线程持有的锁
ConcurrentMap<Thread, LockData> lockMap = Maps.newConcurrentMap();
public Boolean lock(){
    Thread currentThread = Thread.currentThread();
    LockData lockData = lockMap.get(currentThread);
    //LockData不为空则说明已经有锁
    if (lockData != null) {
       //加锁次数加一
       lockData.lockCount.increment();
       return true;
    }
    //没有锁则尝试获取锁
    Boolean lockResult = acquireLock();
    //获取到锁
    if (lockResult) {
        LockData newLockData = new LockData(currentThread,1);
        lockMap.put(currentThread, newLockData);
        return true;
    }
    //获取锁失败
    return false;
}

19.解锁原理

解锁的步骤如下:

(1)判断锁是不是自己的

(2)如果是则减少加锁次数

(3)如果加锁次数等于0,则释放锁,删除掉创建的临时节点,下一个监听这个节点的客户端会感知到节点删除事件,从而重新去获取锁

public Boolean releaseLock(){
    LockData lockData = lockMap.get(currentThread);
    //没有锁
    if(lockData == null){
       return false; 
    }
    //有锁则加锁次数减一
    lockCount = lockData.lockCount.decrement();
    if(lockCount > 0){
        return true;
    } 
    //加锁次数为0
    try{
        //删除节点
        zkClient.delete(lockPath);
        //断开连接
        zkClient.close();
    finally{
        //删除加锁记录
        lockMap.remove(currentThread);
    }
    return true;
}

20.Redis和Zookeeper锁对比

Redis

Zookeeper

读性能

基于内存

基于内存

加锁性能

直接写内存加锁

Master节点创建好后与其他Follower节点进行同步,半数成功后才能返回写入成功

数据一致性

AP架构Redis集群之间的数据同步是存在一定的延迟的,当主节点宕机后,数据如果还没有同步到从节点上,就会导致分布式锁失效,会造成数据的不一致

CP架构当Leader节点宕机后,会进行集群重新选举,如果此时只有一部分节点收到了数据的话,会在集群内进行数据同步,保证集群数据的一致性

21.总结

使用Redis还是Zookeeper来实现分布式锁,最终还是要基于业务来决定,可以参考以下两种情况:

(1)如果业务并发量很大,Redis分布式锁高效的读写性能更能支持高并发。

(2)如果业务要求锁的强一致性,那么使用Zookeeper可能是更好的选择。

(3)在做技术选型的时候,也应该酌情考虑团队成员技能及现有资源情况,如果部署有Redsi集群克优先考虑使用Redis。

  • 性能角度:redis > zk > mysql
  • 安全角度:zk > redis == mysql
  • 难易程度:zk > redis > mysql

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/778407.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

01 openEuler操作系统介绍

文章目录 01 openEuler操作系统介绍1.1 发布件1.2 最小硬件要求1.3 硬件兼容性1.4 关键特性1.4.1 openEuler 22.03-LTS基于 Linux Kernel 5.10 内核构建, 在进程调度、内存管理等方面带来10余处创新1.4.2 新介质文件系统1.4.3 内存分级扩展1.4.4 用户态协议栈1.4.5 云原生调度增…

【人工智能】深度优先搜索、代价一致搜索、深度有限搜索、迭代深度优先搜索、图搜索

【人工智能】无信息搜索—BFS 、代价一致、DFS、深度受限、迭代深入深度优先、图搜索 什么是搜索 搜索问题是指既不能通过数学建模解决,又没有其他算法可以套用或者非遍历所有情况才能得出正确结果。这时就需要采用搜索算法来解决问题。搜索就是一种通过穷举所有解的状态,来…

【车载开发系列】AUTOSAR DemDTCAttributes

【车载开发系列】AUTOSAR DemDTCAttributes 【车载开发系列】AUTOSAR DemDTCAttributes 【车载开发系列】AUTOSAR DemDTCAttributes一. DemDTCAttributes概念二. DemAgingCycleCounterThreshold三. DemAgingAllowed四. DemDTCPriority五. DemImmediateNvStorage六. DemMaxNumbe…

BatchNorm, LayerNorm, InstanceNorm和GroupNorm

1. 介绍 Batch Norm: 对NHW计算归一化参数(均值和方差)&#xff0c;总共得到C组归一化参数, 相当于对每个channel进行归一化。BN主要缺点是对batchsize的大小比较敏感&#xff0c;由于每次计算均值和方差是在一个batch上&#xff0c;所以如果batchsize太小&#xff0c;则计算的…

idea2021.安装pojie教程

1、下载ideaIU-2021.3应用包&#xff0c;点击finish 2、先关闭idea窗口&#xff0c;等会激活了脚本再运行打开。 3、双击运行install-current-user.vbs&#xff0c;等待一会会提示运行成功。 4、运行后&#xff0c;在文件中会多出一条配置 5、打开运行idea,输入激活码&#x…

iPhone 开机停留在苹果logo画面(已解决)

一、问题 如下图&#xff0c;开不了机&#xff1a; 标题 二、根因 存储空间满了。 三、解决方法 方法一 用苹果数据线&#xff08;最好是原装&#xff09;连接Mac电脑&#xff0c;在装有 macOS Catalina 10.15 或更高版本的 Mac 上&#xff0c;打开“访达”。在装有 macOS…

Vue-组件高级(上)

一、目标 能够掌握watch侦听器的基本使用能够知道vue中常用的生命周期函数能够知道如何实现组件之间的数据共享能够知道如何在vue3.x的项目中全局配置axios 二、目录 watch侦听器 1.什么是watch侦听器 watch侦听器允许开发之监视数据的变化&#xff0c;从而针对数据的变化做…

什么小程序需要商家自营相关类目?

1、百货&#xff1a;小程序主体公司综合零售商&#xff0c;在线售卖多种日用品&#xff0c;需补充商家自营-百货类目。预包装食品定义&#xff1a; 预包装食品&#xff0c;指预先定量包装或者制作在包装材料和容器中的食品&#xff1b;包括预先定量包装以及预先定量制作在包装…

微信小程序中如何携带参数跳转到tabBar页面

在小程序中使用了tabBar组件之后就不能用wx.navigateTo跳转到tabBar页面了 , 能跳转到tabBar页面的方法有以下两种 但是使用第一种方法时,会因为这种方法在路径后不能携带参数,所以行不通 那么就只能用第二种方法 , 用wx.reLaunch进行跳转 , 地址后跟上自己想要的参数 , 或者用…

使用Vue+elementUI实现CRUD

文章目录 前言一、简介二、使用Vue-Cli搭建Vue项目1. vue-cli 介绍2.axios.js 介绍3.Element-Ul 介绍4.moment.js 介绍5.搭建项目6.添加main.js配置7.修改App.vue8. 将moment.js 格式 Date 类型引入9. 加入分页10. 实现删除11. 实现添加12. 实现修改 总结 前言 最近了解了一下…

Qt 模态 非模态对话框 半模态 不阻塞对话框

Part1&#xff1a; 什么是模态和非模态对话框 对话框分为模态对话框和非模态对话框。 所谓模态对话框 所谓模态对话框&#xff0c;会阻塞同一应用程序中其它窗口的输入。同时会阻塞当前线程&#xff1b;程序不再下执行&#xff1b; 关闭 窗口&#xff0c;执行下面的代码&a…

从Nginx学习如何获取时间

最近因为工作接触到Nginx的学习&#xff0c;我就把Nginx的源代码下载下来&#xff0c;然后对其进行了分析。发现Nginx的性能强大离不开作者编码的苦心&#xff0c;作者将C的性能发挥到了极致&#xff0c;每个变量都用得非常出神入化。有如此强大的功能&#xff0c;才支撑了全球…

React:从 npx开始

使用 npm 来创建第一个 recat 文件&#xff08; react-demo 是文件名&#xff0c;可以自定义&#xff09; npx create-react-app react-demo npx是 npm v5.2 版本新添加的命令&#xff0c;用来简化 npm 中工具包的使用 原始&#xff1a; 全局安装npm i -g create-react-app 2 …

格式工厂5.10.0版本安装

目前格式工厂有很多&#xff0c;大多都可以进行视频转换 之前遇到一个用ffmpeg拉流保存的MP4在vlc和迅雷都无法正常播放的问题&#xff0c;发现视频长度不对&#xff0c;声音也不对&#xff0c;最后换到了格式工厂的格式播放器是可以正常播放的 格式工厂下载之家的地址 http…

【历史上的今天】7 月 20 日:人类登上月球;数据仓库之父诞生;Mac OS X Lion 发布

整理 | 王启隆 透过「历史上的今天」&#xff0c;从过去看未来&#xff0c;从现在亦可以改变未来。 今天是 2023 年 7 月 20 日&#xff0c;在 2005 年的今天&#xff0c;时任微软全球副总裁的李开复加盟谷歌担任谷歌全球副总裁及中国区总裁。谷歌公司在发布聘请李开复消息的同…

ffplay播放器剖析(5)----视频输出剖析

文章目录 1.视频输出模块1.1 视频输出初始化1.1.1 视频输出初始化主要流程1.1.2 calculate_display_rect初始化显示窗口大小 1.2 视频输出逻辑1.2.1 event_loop开始处理SDL事件1.2.2 video_refresh1.2.2.1 计算上一帧显示时长,判断是否还要继续上一帧1.2.2.2 估算当前帧显示时长…

数据结构——(一)绪论

&#x1f449;数据元素整体思维导图 欢迎补充 一、基本概念❤️ 1.1基本术语⭐️ &#xff08;1&#xff09;数据 客观事务属性的数字、字符。 &#xff08;2&#xff09;数据元素 数据元素是数据的基本单位&#xff0c;一个数据元素可由若干数据项组成&#xff0c;数据项是…

【测试开发】Python+Django实现接口测试工具

PythonDjango接口自动化 引言&#xff1a; 最近被几个公司实习生整自闭了&#xff0c;没有基础&#xff0c;想学自动化又不知道怎么去学&#xff0c;没有方向没有头绪&#xff0c;说白了其实就是学习过程中没有成就感&#xff0c;所以学不下去。出于各种花里胡哨的原因&#xf…

C语言第七课----------函数的定义及使用--------C语言重要一笔

作者前言 个人主页::小小页面 gitee页面:秦大大 一个爱分享的小博主 欢迎小可爱们前来借鉴 __________________________________________________________ 目录 1.函数是什么 2. 库函数 3. 自定义函数 4. 函数参数 5. 函数调用 6. 函数的嵌套调用和链式访问 7. 函数的声…

《向量数据库指南》:使用公共的Pinecone数据集

目录 数据集包含向量和元数据 列出公共数据集 加载数据集 迭代数据集 分批迭代文档并插入到索引中。 将数据集插入为数据帧。 接下来怎么做 本文档介绍如何使用现有的Pinecone数据集。 要了解创建和列出数据集的方法,请参阅创建数据集。 数据集包含向量和元数据 P…