Redis常见问题、各个分布式锁优缺点-05

news2024/11/19 14:53:45

Redis集群为什么至少需要三个master节点,并且推荐节点数为奇数?

因为新master的选举需要大于半数的集群master节点同意才能选举成功,如果只有两个master节点,当其中一个挂了,是达不到选举新master的条件的。

 奇数个master节点可以在满足选举该条件的基础上节省一个节点,比如三个master节点和四个master节点的集群相比,大家如果都挂了一个master节点都能选举新master节点,如果都挂了两个master节点都没法选举新master节点了,所以奇数的master节点更多的是从节省机器资源角度出发说的。

Redis集群对批量操作命令的支持

对于类似mset,mget这样的多个key的原生批量操作命令,redis集群只支持所有key落在同一slot的情况,如果有多个key一定要用mset命令在redis集群上操作,则可以在key的前面加上{XX},这样参数数据分片hash计算的只会是大括号里的值,这样能确保不同的key能落到同一slot里去,示例如下:

1 mset {user1}:1:name zhuge {user1}:1:age 18

假设name和age计算的hash slot值不一样,但是这条命令在集群下执行,redis只会用大括号里的 user1 做hash slot计算,所以算出来的slot值肯定相同,最后都能落在同一slot。

Redis有哪几种数据淘汰策略?

redis.conf中可配置Redis的最大内存量 maxmemory,如果配置为0,在64位系统下则表示无最大内存限制,在32位系统下则表示最大内存限制为 3 GB。当实际使用内存 mem_used 达到设置的阀值 maxmemory 后,Redis将按照预设的淘汰策略进行数据淘汰。

淘汰策略名称 策略含义

noeviction 默认策略,不淘汰数据;大部分写命令都将返回错误(DEL等少数除外)

allkeys-lru 从所有数据中根据 LRU 算法挑选数据淘汰

volatile-lru 从设置了过期时间的数据中根据 LRU 算法挑选数据淘汰

allkeys-random 从所有数据中随机挑选数据淘汰

volatile-random 从设置了过期时间的数据中随机挑选数据淘汰

volatile-ttl 从设置了过期时间的数据中,挑选越早过期的数据进行删除

allkeys-lfu 从所有数据中根据 LFU 算法挑选数据淘汰(4.0及以上版本可用)

volatile-lfu 从设置了过期时间的数据中根据 LFU 算法挑选数据淘汰(4.0及以上版本可用)

Redis Key的过期策略有哪些?

惰性删除:当读/写一个已经过期的key时,会触发惰性删除策略,直接删除掉这个过期key,很明显,这是被动的。(例如:先get 这个key ,如果不存在就再删除)
定期删除:由于惰性删除策略无法保证冷数据被及时删掉,所以 redis 会定期主动淘汰一批已过期的key。
主动删除:当前已用内存超过maxMemory限定时,触发主动清理策略。主动设置的前提是设置了maxMemory的值。

网络抖动

真实世界的机房网络往往并不是风平浪静的,它们经常会发生各种各样的小问题。比如网络抖动就是非常常见的一种现象,突然之间部分连接变得不可访问,然后很快又恢复正常。

为解决这种问题,Redis Cluster 提供了一种选项cluster­node­timeout,表示当某个节点持续 timeout的时间失联时,才可以认定该节点出现故障,需要进行主从切换。如果没有这个选项,网络抖动会导致主从频繁切换 (数据的重新复制)。

缓存与数据库双写不一致

在大并发下,同时操作数据库与缓存会存在数据不一致性问题

1、双写不一致情况

2、读写并发不一致

解决方案:
1、对于并发几率很小的数据(如个人维度的订单数据、用户数据等),这种几乎不用考虑这个问题,很少会发生 缓存不一致,可以给缓存数据加上过期时间,每隔一段时间触发读的主动更新即可。
2、就算并发很高,如果业务上能容忍短时间的缓存数据不一致(如商品名称,商品分类菜单等),缓存加上过期 时间依然可以解决大部分业务对于缓存的要求。
3、如果不能容忍缓存数据不一致,可以通过加 分布式读写锁 保证并发读写或写写的时候按顺序排好队, 读读的 时候相当于无锁
4、也可以用阿里开源的canal通过监听数据库的binlog日志及时的去修改缓存,但是引入了新的中间件,增加 了系统的复杂度

总结:
以上我们针对的都是 读多写少 的情况加入缓存提高性能,如果 写多读多 的情况又不能容忍缓存数据不一致,那
就没必要加缓存了,可以直接操作数据库。当然,如果数据库抗不住压力,还可以把缓存作为数据读写的主存
储,异步将数据同步到数据库,数据库只是作为数据的备份。
放入缓存的数据应该是对实时性、一致性要求不是很高的数据。切记不要为了用缓存,同时又要保证绝对的一
致性做大量的过度设计和控制,增加系统复杂性

缓存失效(击穿)

由于大批量缓存在同一时间失效可能导致大量请求同时穿透缓存直达数据库,可能会造成数据库瞬间压力过大 甚至挂掉,对于这种情况我们在批量增加缓存时最好将这一批数据的缓存过期时间设置为一个时间段内的不同 时间。

缓存雪崩

缓存雪崩指的是缓存层支撑不住或宕掉后, 流量会像奔逃的野牛一样, 打向后端存储层。
由于缓存层承载着大量请求, 有效地保护了存储层, 但是如果缓存层由于某些原因不能提供服务(比如超大并 发过来,缓存层支撑不住,或者由于缓存设计不好,类似大量请求访问bigkey,导致缓存能支撑的并发急剧下 降), 于是大量请求都会打到存储层, 存储层的调用量会暴增, 造成存储层也会级联宕机的情况。预防和解决缓存雪崩问题, 可以从以下三个方面进行着手。
1) 保证缓存层服务高可用性,比如使用Redis Sentinel或Redis Cluster。
2) 依赖隔离组件为后端限流熔断并降级。比如使用Sentinel或Hystrix限流降级组件。
比如服务降级,我们可以针对不同的数据采取不同的处理方式。当业务应用访问的是非核心数据(例如电商商 品属性,用户信息等)时,暂时停止从缓存中查询这些数据,而是直接返回预定义的默认降级信息、空值或是 错误提示信息;当业务应用访问的是核心数据(例如电商商品库存)时,仍然允许查询缓存,如果缓存缺失, 也可以继续通过数据库读取。
3) 提前演练。 在项目上线前, 演练缓存层宕掉后, 应用以及后端的负载情况以及可能出现的问题, 在此基 础上做一些预案设定

缓存穿透

缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时需要从数据库查询,查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到数据库去查询,进而给数据库带来压力。

解决方案:

1 对url中的key id值进行对称加密,不能轻易暴露出真实的key值,防止黑客攻击

2 不管数据实际上存不存在,我们都把这个键存到缓存中(有效期设置的短一些,比如一分钟到三分钟),然后值设置为一个特定值,业务中如果获取到的结果是这个特定值,则报错返回。

布隆过滤器

对于恶意攻击,向服务器请求大量不存在的数据造成的缓存穿透,还可以用布隆过滤器先做一次过滤,对于不 存在的数据布隆过滤器一般都能够过滤掉,不让请求再往后端发送。当布隆过滤器说某个值存在时,这个值可 能不存在;当它说不存在时,那就肯定不存在。

  布隆过滤器就是一个大型的位数组和几个不一样的无偏 hash 函数。所谓无偏就是能够把元素的 hash 值算得 比较均匀。
向布隆过滤器中添加 key 时,会使用多个 hash 函数对 key 进行 hash 算得一个整数索引值然后对位数组长度 进行取模运算得到一个位置,每个 hash 函数都会算得一个不同的位置。再把位数组的这几个位置都置为 1 就 完成了 add 操作。
向布隆过滤器询问 key 是否存在时,跟 add 一样,也会把 hash 的几个位置都算出来,看看位数组中这几个位 置是否都为 1,只要有一个位为 0,那么说明布隆过滤器中这个key 不存在。如果都是 1,这并不能说明这个 key 就一定存在,只是极有可能存在,因为这些位被置为 1 可能是因为其它的 key 存在所致。如果这个位数组 比较稀疏,这个概率就会很大,如果这个位数组比较拥挤,这个概率就会降低。 这种方法适用于数据命中不高、 数据相对固定、 实时性低(通常是数据集较大) 的应用场景, 代码维护较为 复杂, 但是缓存空间占用很少。  
注意:如果缓存数据有更新,布隆过滤器并不会跟着更新,也不能修改,需要把所有的数据全部生成一遍

redis的分布式锁

setnx 优缺点

Setnx(key,value);

场景:请求过来设置key,value,存在返回false,不存在返回true。这样下个请求过来就不会执行

问题: 如果第一个请求过来获取到锁,项目挂了,那这个锁就永远都得不到释放

增加超时时间

Setnex(key,value,超时时间)

场景:针对上面的问题,在set key ,value 时,增加一个超时时间,这样就算项目挂了,时间一到。锁也会得到释放

问题与不足

这么操作有一个很明显的弊端,一旦业务执行超时,锁自动失效的话,会造成删除错锁的线程安全问题!列举一个场景:

业务逻辑 1 首先持有锁,开始执行自己的业务逻辑。
业务逻辑 1 由于网络波动等原因,在锁到期之前未能执行完毕自己的业务逻辑,但是锁到期自动释放了,此时 Redis 中没有锁了。
业务逻辑 2 也来获取锁,顺利持有锁后开始执行自己的业务逻辑。
业务逻辑 1 终于执行完毕了自己的业务,删除了锁。注意,此时业务逻辑 1 删除的其实是业务逻辑 2 的锁,导致了线程安全问题。

确保删除的是自己的锁

Setnex(key,value,超时时间)  value 为唯一标识,如线程id

在释放锁的时候判断是不是自己,如果是自己再删除

问题与不足

这么操作依然还是有线程安全问题,因为并不能保证确认是自己的锁和删除锁的原子性!还是列举一个场景:

业务逻辑 1 执行删除时,查询到的 LikeLock 值确实与自己的 uuid 相等。
业务逻辑 1 执行删除前,LikeLock 刚好过期时间已到,被 redis 自动释放,在 redis 中没有了 LikeLock,没有了锁。
业务逻辑 2 获取了 LikeLock,加锁成功,开始执行自己的业务。
业务逻辑 1 此时执行了删除操作,会把业务逻辑 2 的 LikeLock 删除,导致出现进程安全问题。

使用lua脚本确保删除锁的原子性

我们采取 LUA 脚本使两条命令在 Redis 客户端当做一个脚本整体运行,中间不会插入其他命令。

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

lua 脚本优点:

减少网络开销:原先多次请求的逻辑放在 redis 服务器上完成。使用脚本,减少了网络往返时延

原子操作:Redis 会将整个脚本作为一个整体执行,中间不会被其他命令插入(想象为事务)

复用:客户端发送的脚本会永久存储在 Redis 中,意味着其他客户端可以复用这一脚本而不需要使用代码完成同样的逻辑

setNX 锁在非单机模式下的缺陷只能说,在单机 Redis 模式下,setnx 分布式锁,简直是无敌!

但是 setnx 锁最大的缺点就是它加锁时只作用在一个 Redis 节点上,即使 Redis 通过 Sentinel(哨岗、哨兵) 保证高可用,如果这个 master 节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况,下面是个例子:

在 Redis 的 master 节点上拿到了锁;但是这个加锁的 key 还没有同步到 slave 节点;master 故障,发生故障转移,slave 节点升级为 master 节点;上边 master 节点上的锁丢失。

RedLock

他的核心思路是:搞几个独立的Master,比如5个。然后挨着个的加锁,只要超过一半以上(这里是5 / 2 + 1 = 3个)那就代表加锁成功,然后释放锁的时候也逐台释放。这样的好处在于一台Master挂了的话,还有其他的,所以不耽误,看起来好像完美解决了上面的问题。但是并不是100%安全,后面会说。

具体细节为:

使用相同的key和随机数在N个Master节点上获取锁,这里获取锁的尝试时间要远远小于锁的超时时间,就是为了防止某个Master挂了后我们还在不断的获取锁,导致被阻塞的时间过长。也就是说,假设锁30秒过期,三个节点加锁花了31秒,自然是加锁失败了。

只有在大多数节点(一般是【(2/n)+1】)上获取到了锁,而且总的获取时间小于锁的超时时间的情况下,认为锁获取成功了。

如果锁获取成功了,锁的超时时间就是最初的锁超时时间减获取锁的总耗时时间。

如果锁获取失败了,不管是因为获取成功的节点的数目没有过半,还是因为获取锁的耗时超过了锁的释放时间,都会将已经设置了key的master上的key删除。

需要注意两点:

  1. Redis多个Master所在的机器时间必须同步。
  2. Redis红锁机器挂了的话要延迟启动1min(大于锁超时时间就行),因为:如果三台Master,写入2台成功了,加锁成功,但是挂了一个,还保留了一个Master可用,释放锁的时候自然挂了的那个不会执行del,当他瞬间再次启动的时候会发现锁还在(因为还没到过期时间),可能造成未知的问题。所以让Redis延迟启动。

主要存在的问题:

  1. 实现原理异常复杂,相信大家也看到了。
  2. 依然是不安全的加锁方式。比如:给5个Master都加了锁,失效时间是3s,但是因为加锁的时候可能因为网络抖动或者其他情况导致只给3台机器加完锁就到3s了,失效了。后面2台还没加锁呢,前面3个已经失效了。但是这时候其他线程又进行上锁发现前面3个无锁正常上锁,因为是过半原则,3个认为加锁成功。这就导致了两个线程同时加锁成功,前3个是后面线程的锁,后两个是最开始线程的锁,这不乱套了吗?线程也不安全了!或许你会说开watchDog续期,那好像是没问题了,但是我换个问题,我不是到期了,而是挂了一台,还没同步到Slave呢,Slave升级为Master了,其他线程发现这个Slave上没有锁,依然可以加锁成功3台,半数以上。还是并发了,不安全。那怎么办?不要Slave了嘛?RedLock太麻烦啦!

Redisson

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格框架, 充分利用 Redis 键值数据库提供的一系列优势, 基于 Java 实用工具包中常用接口, 为使用者提供了 一系列具有分布式特性的常用工具类

指定一个 key 作为锁标记,存入 Redis 中,指定一个 唯一的用户标识 作为 value。
当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性。
设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性。
当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 只有加锁的人才能释放锁。
WatchDog 机制 能够很好的解决锁续期的问题,预防死锁。
能够灵活的设置加锁时间,等待锁时间,释放锁失败后锁的存在时间。

获取锁

public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        //异步处理的命令执行器
        this.commandExecutor = commandExecutor;
        //生成唯一id
        this.id = commandExecutor.getConnectionManager().getId();
        //锁存活时间,默认30s
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        //将id和业务key拼接,作为实际的key
        this.entryName = id + ":" + name;
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

加锁过程

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

我们直接调用的lock方法,这时leaseTime为-1,不执行if分支。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

这时leaseTime为默认的30s,这段lua的执行是重点:

首先呢,他先用exists命令判断了待获取锁的key anyLock 存不存在,如果不存在,就使用hset命令将锁key testlock作为key的map结构中存入一对键值对,4afd01d9-48e8-4341-9358-19f0507a9dcc:397 1
同时还使用了pexpire命令给anyLock设置了过期时间30000毫秒,然后返回为空;
如果anyLock已经存在了,会走另一个分支,此时会判断anyLock Map中是否存在37f75873-494a-439c-a0ed-f102bc2f3204:1,如果存在的话,就调用hincrby命令自增这个key的值,并且将anyLock的过期时间设置为30000毫秒,并且返回空。
如果上面俩种情况都不是,那么就返回这个anyLock的剩余存活时间。

 脚本也可以保证执行命令的原子性。然后呢就直接返回了一个RFuture ttlRemainingFuture,并且给他加了一个监听器,如果当前的这个异步加锁的步骤完成的时候调用,如果执行成功,就直接同步获取一个Long类型的ttlRemaining。通过加锁的lua脚本可知,如果加锁或者重入锁成功的话会发现TTLRemaining是为null的,那么就会执行下面的这一行代码,我们可以看到注释 锁已获得。
 

// lock acquired

if (ttlRemaining == null) {
  scheduleExpirationRenewal(threadId);
}

以上我们分析了redisson加锁的过程,总结来说,流程不复杂,代码也很直观,主要是异步通过lua脚本执行了加锁的逻辑。

看门狗机制
其中,我们注意到了一些细节,比如 RedissonLock中的变量internalLockLeaseTime,默认值是30000毫秒,还有调用tryLockInnerAsync()传入的一个从连接管理器获取的getLockWatchdogTimeout(),他的默认值也是30000毫秒,这些都和redisson官方文档所说的watchdog机制有关,看门狗,还是很形象的描述这一机制,那么看门狗到底做了什么,为什么怎么做呢?下面我们就来分析和探讨一下。

加锁成功后的问题

假设在一个分布式环境下,多个服务实例请求获取锁,其中服务实例1成功获取到了锁,在执行业务逻辑的过程中,服务实例突然挂掉了或者hang住了,那么这个锁会不会释放,什么时候释放?
回答这个问题,自然想起来之前我们分析的lua脚本,其中第一次加锁的时候使用pexpire给锁key设置了过期时间,默认30000毫秒,由此来看如果服务实例宕机了,锁最终也会释放,其他服务实例也是可以继续获取到锁执行业务。但是要是30000毫秒之后呢,要是服务实例1没有宕机但是业务执行还没有结束,所释放掉了就会导致线程问题,这个redisson是怎么解决的呢?这个就一定要实现自动延长锁有效期的机制。


之前,我们分析到异步执行完lua脚本执行完成之后,设置了一个监听器,来处理异步执行结束之后的一些工作

private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
}
首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要还是判断在这个服务实例中的加锁客户端的锁key是否存在,如果已经存在了,就直接返回;第一次加锁,肯定是不存在的。
接下来就是搞了一个TimeTask,延迟internalLockLeaseTime/3之后执行,这里就用到了文章一开始就提到奇妙的变量,算下来就是大约10秒钟执行一次,调用了一个异步执行的方法,renewExpirationAsync方法,也是调用异步执行了一段lua脚本

private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
}

首先判断这个锁key的map结构中是否存在对应的4afd01d9-48e8-4341-9358-19f0507a9dcc:397,如果存在,就直接调用pexpire命令设置锁key的过期时间,默认30000毫秒。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
}

在上面任务调度的方法中,也是异步执行并且设置了一个监听器,在操作执行成功之后,会回调这个方法,如果调用失败会打一个错误日志并返回,更新锁过期时间失败;
然后获取异步执行的结果,如果为true,就会调用本身,如此说来又会延迟10秒钟去执行这段逻辑,所以,这段逻辑在你成功获取到锁之后,会每隔十秒钟去执行一次,并且,在锁key还没有失效的情况下,会把锁的过期时间继续延长到30000毫秒,也就是说只要这台服务实例没有挂掉,并且没有主动释放锁,看门狗都会每隔十秒给你续约一下,保证锁一直在你手中。完美的操作。

其他实例没有获得锁的过程

这时如果有别的服务实例来尝试加锁又会发生什么情况呢?或者当前客户端的别的线程来获取锁呢?很显然,肯定会阻塞住,我们来通过代码看看是怎么做到的。还是把眼光放到之前分析的那段加锁lua代码上。

当加锁的锁key存在的时候并且锁key对应的map结构中当前客户端的唯一key也存在时,会去调用hincrby命令,将唯一key的值自增一,并且会pexpire设置key的过期时间为30000毫秒,然后返回nil,可以想象这里也是加锁成功的,也会继续去执行定时调度任务,完成锁key过期时间的续约,这里呢,就实现了锁的可重入性。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

那么当以上这种情况也没有发生呢,这里就会直接返回当前锁的剩余有效期,相应的也不会去执行续约逻辑。此时一直返回到上面的方法:

如果加锁成功就直接返回,否则就会进入一个死循环,去尝试加锁,并且也会在等待一段时间之后一直循环尝试加锁,阻塞住,直到第一个服务实例释放锁。对于不同的服务实例尝试会获取一把锁,也和上面的逻辑类似,都是这样实现了锁的互斥。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

释放锁

public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
}
public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
            cancelExpirationRenewal(threadId);

            if (e != null) {
                result.tryFailure(e);
                return;
            }

            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }

            result.trySuccess(null);
        });

        return result;
}

判断当前客户端对应的唯一key的值是否存在,如果不存在就会返回nil;否则,值自增-1,判断唯一key的值是否大于零,如果大于零,则返回0;否则删除当前锁key,并返回1。

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

返回到上一层方法,也是针对返回值进行了操作,如果返回值是1,则会去取消之前的定时续约任务,如果失败了,则会做一些类似设置状态的操作

void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        
        if (threadId != null) {
            task.removeThreadId(threadId);
        }

        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/633462.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BLIP和BLIP2

文章主要是对BLIP2 &#xff08;使用冻结图像编码器和大型语言模型的Bootstrapping语言图像预训练&#xff09;论文的阅读笔记&#xff0c;也对BLIP&#xff08;用于统一视觉语言理解和生成的Bootstrapping语言图像预训练&#xff09;算法进行了简单的介绍。 一、BLIP&#xf…

走进人工智能| Computer Vision 数字化时代的视觉启示录

前言&#xff1a; 计算机视觉是通过模仿人类视觉系统的工作原理&#xff0c;使计算机能够感知、理解和解释图像和视频的能力。 文章目录 序言背景适用领域技术支持应用领域程序员如何学总结 序言 计算机视觉是人工智能领域的一个重要分支&#xff0c;它涉及使计算机能够“看”…

nginx的开始(一)---nginx的安装

文章目录 1.nginx是什么&#xff1f;2.nginx安装2.1.安装准备&#xff1a;2.2.进行安装&#xff1a;2.2.1.apt安装&#xff08;快速&#xff09;2.2.2.源码安装 2.3.配置文件简解&#xff08;nginx.conf&#xff09; 1.nginx是什么&#xff1f; Nginx&#xff08;发音为"e…

AndroidT(13) init 进程 -- first stage init 的初始化 (二)

1.概览 第一阶段的 init 工作主要用于读取系统启动阶段需要的配置信息(例如 linux的bootconfig&#xff0c;cmdline等配置信息&#xff09;、挂载文件系统、安装 kernel 中的模块驱动&#xff0c;最后就是启动第二阶段的 init 来进行 Android 系统相关的组件。第一阶段的 init …

《微服务实战》 第三十章 分布式事务框架seata TCC模式

前言 本章节介绍分布式事务框架seata TCC模式&#xff0c;上一章节介绍seata以及集成到Springboot、微服务框架里。 1、TCC模式 一个分布式的全局事务&#xff0c;整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的&#xff0c;分支事务要满足 两阶段提交 的模型要…

如何利用ChatGPT写毕业论文

如何利用ChatGPT写毕业论文 ChatGPT是什么&#xff1f;利用ChatGPT写毕业论文的步骤1.准备数据2.训练模型3.生成论文4.检查论文 总结地址 ChatGPT是什么&#xff1f; ChatGPT是一个基于GPT-2模型的开源聊天机器人&#xff0c;它可以回答用户的问题&#xff0c;进行闲聊和提供各…

或许是一个新的算法方向?

动动发财的小手&#xff0c;点个赞吧&#xff01; 今日谷歌 DeepMind 使用深度强化学习发现更快的排序算法&#xff0c;相关论文[1]成果已经发表在Nature上。 据报道&#xff1a;该算法可以提速 70&#xff05;&#xff0c;相比之下&#xff0c;快了3倍之多。 摘要 排序或散列等…

230611-通过Doxygen实现项目代码的文档自动化生成(Mac+Win通用)

背景介绍 目前主流的Python项目的文档管理多通过Sphinx实现&#xff1b;当前Sphinx尚未有针对C#等代码的插件&#xff1b;若想对C#的项目代码进行Sphinx的管理&#xff0c;可通过Doxygen导出为xml文件&#xff0c;进行二次转换&#xff1b;有关Doxygen的介绍及使用&#xff0c…

Java使用Opencv进行大图找小图并使用其找图功能进行bilibili视频下载案例

Java使用Opencv进行大图找小图并使用其找图功能进行bilibili视频下载案例 一、Opencv大图找小图说明二、Opencv的window安装1.下载windows下的安装包2.安装3.Java中Opencv加载测试 三、Java中通过Opencv进行模板匹配大图找小图四、进行多图查找五&#xff1a;案例下载bilibili视…

碳排放预测模型 | Python实现基于机器学习回归分析的碳排放预测模型——随机森林、决策树、KNN 和多层感知器 (MLP) 预测分析

文章目录 效果一览文章概述研究内容环境准备源码设计KNNRandom ForestDecision TreeMLPModel Evaluation学习总结参考资料效果一览

【Android开发基础】随机点名系统(关于读取xml资源文件)

文章目录 一、引言二、设计1、读取xml2、下拉框Spinner3、随机算法 三、实施1、子元素随机&#xff08;单位&#xff1a;班级&#xff09;2、父元素随机&#xff08;单位&#xff1a;专业&#xff09;3、指定人数随机4、指定人数混合排序 四、附件 一、引言 描述&#xff1a;这…

【手撕MyBatis源码】动态SQL全流程解析

文章目录 动态SQL概述ifchoose(when、otherwise)trim&#xff08;where、set&#xff09;foreach OGNL表达式BoundSql动态SQL主流程分析SqlNodeDynamicContext源码解析StaticTextSqlNodeTextSqlNodeIfSqlNodeChooseSqlNodeForEachSqlNode 动态脚本结构动态脚本执行 SqlSourceSt…

Spring Cloud - Eureka原理、注册、搭建、应用(全过程详解)

目录 一、Eureka 注册原理 1.1、为什么要使用 Eureka 1.2、Eureka 的工作流程及原理 1.3、eureka 的作用 二、具体实现 2.1、搭建注册中心 2.2、服务注册和部署 2.2.1、user-service 服务注册 2.2.2、服务部署 2.2.3、order-service 服务注册 2.2.4、验证服务 2.3、…

java SSM 药品集中管理系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM 药品集中管理系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代 码和数据库&#xff0c;系统主要采…

[神经网络]迁移学习-微调

一、概述 一般的有监督迁移学习分为以下三种&#xff1a; ①将训练好的模型作为特征抽取模块&#xff08;例如以resnet进行特征提取&#xff09; ②在一个相关的任务中训练后直接后直接使用(例如gpt) ③在训练好的模型基础上进行微调 此外还有无监督学习的方式 zero-shot&#…

【集群】LVS+Keepalived群集

文章目录 前言一、Keepalived的概念1. Keepalived 概述2. Keepalived 作用3. Keepalived 实现原理剖析3.1 Keepalived 工作原理3.1 VRRP协议&#xff08;虚拟路由冗余协议&#xff09; 4. Keepalived 主要模块及其作用4.1 健康检查方式&#xff08;学名&#xff1a;探针&#x…

【架构基础】正交设计四原则

数学中的正交&#xff0c;是指相互垂直的两个向量&#xff0c;简单来讲就是平面上的两个垂直线段&#xff0c;其中一个线段变长或减短或者转圈圈&#xff0c;另外一根是不变的也不影响它们的垂直度的。表现为空间的独立性&#xff0c;在软件中我们可以理解为两个只有交叉点而互…

springboot0+java+vuie个人家庭财务理财系统

。本文介绍了个人理财系统的开发全过程。通过分析个人理财系统管理的不足&#xff0c;创建了一个计算机管理个人理财系统的方案。文章介绍了个人理财系统的系统分析部分&#xff0c;包括可行性分析等&#xff0c;系统设计部分主要介绍了系统功能设计和数据库设计。 本个人理财系…

【数据湖架构】在 Azure Data Lake Storage (ADLS)二代上构建数据湖

介绍 一开始&#xff0c;规划数据湖似乎是一项艰巨的任务——决定如何最好地构建数据湖、选择哪种文件格式、是拥有多个数据湖还是只有一个数据湖、如何保护和管理数据湖。并非所有这些都需要在第一天回答&#xff0c;有些可能通过反复试验来确定。构建数据湖没有明确的指南&am…

【C++】一文带你吃透C++多态

&#x1f34e; 博客主页&#xff1a;&#x1f319;披星戴月的贾维斯 &#x1f34e; 欢迎关注&#xff1a;&#x1f44d;点赞&#x1f343;收藏&#x1f525;留言 &#x1f347;系列专栏&#xff1a;&#x1f319; C/C专栏 &#x1f319;那些看似波澜不惊的日复一日&#xff0c;…