Redis系列-5 Redis分布式锁

news2024/12/27 10:08:27

背景:

本文介绍Redis分布式锁的内容,包括Redis相关命令和Lua脚本的介绍,以及操作分布式锁的流程与消息,最后结合Redission源码介绍分布式锁的实现原理。

1.基本命令

1.1 基本键值对的设置

设值: set key value
取值: get key
删除: del key

>set key1 value1
"OK"
>get key1
"value1"
>del key1
"1"

1.2 setnx用法

setnx key value:
当key不存在时,进行设置,返回1(表示操作成功)
当key存在时,不进行设置,返回0(表示操作失败)

>setnx key1 value1
"1"
>setnx key1 value1
"0"

1.3 setex和psetex

setex key seoconds value
等价于原子性地执行了 set key value和expire key seconds
psetex用法与setex相同,区别是setex单位为秒,而psetex是毫秒;

>setex key1 1000 value1
"OK"
>ttl key1
"997"

1.4 set扩展用法

set key value [EX seconds | PX millSeconds] [NX | XX]
seconds EX 表示设置过期时间以秒为单位,millSeconds PX 表示设置过期时间以毫秒为单位;
NX表示当键不存在时执行,并返回OK;否则返回null
XX表示当键存在时执行,并返回OK;否则返回null

>set key1 value1 EX 1000 NX
"OK"
>set key1 value1 EX 1000 NX
null
>set key1 value1 EX 2000 XX
"OK"
>ttl key1
"1996"

2.lua脚本

由于redis是单线程执行的,因此可以原子性地执行lua脚本。因此可通过lua脚本对基本命令进行组合。
格式如下:

EVAL "lua脚本" n KEY... , ARGV...

(1) 通过EVAL命令执行lua脚本;
(2) 可对脚本进行传参,可以传多个KEY和多个ARGV,KEY和ARGV建议使用逗号(,)隔开;
(3) 需要显示指定KEY个数;
(4) lua脚本通过KEYS[i] 和 ARGV[j] 获取传入的参数,下标从1开始;
以下通过案例的方式介绍一下lua脚本的使用。

2.1 加锁

分布式锁的数据结构可以被定义为如下格式:

{
 "lockKey":  {
        "uuid: threadId": num
    }
}

lockKey表示分布式锁:数据库存中存在lockKey键时,表示已有客户端占据了lockKey锁,否则表示lockKey锁未被获取。
uuid: threadId结构包含了UUID唯一字符串,num为获取锁的次数。UUID用于保证上锁和解锁是同一个客户端,num用于实现锁的可重入。
案例:

// 如果锁不存在,则加锁并设置过期时间
if (redis.call('exists', KEYS[1]) == 0) then
    // 设置锁记录锁的获取次数为1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    // 设置锁的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

// 如果锁存在,且为自己,锁+1,并重新设置过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    // 设置锁记录锁的获取次数+1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    // 重置锁的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

// 锁已存在,不是自己,则返回锁到期时间
return redis.call('pttl', KEYS[1]);

说明:
上述LUA脚本返回空,说明锁获取成功;否则获取失败并得到锁的过期时间(毫秒)。

其中,redis.call('exists', KEYS[1])表示KEY[1]键是否存在,存在返回1,不存在返回0;redis.call('hincrby', KEYS[1], ARGV[2], 1)表示对哈希类型数据KEYS[1]和ARGV[2]键对应的值加1;redis.call('pexpire', KEYS[1], ARGV[1])表示设置KEYS[1]键的有效期为ARGV[1],单位毫秒;

在redis客户端进行如下操作:

>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null

>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"1"

>ttl myLock
"54"

>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null

>ttl myLock
"56"

>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"2"

给上述lua脚本的传参为1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

1表示只有一个Key, 其他为ARGV, 即
KEY[1] = myLock
ARGV[1]=60000
ARGV[2]=80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

得到的结果如下:

{
 "myLock":  {
        "80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12": 2
    }
}

表示"myLock"分布式锁已被占用, 获取锁的次数为2次。

2.2 解锁

案例:

// 解锁成功返回1,失败返回0
if (redis.call('del', KEYS[1]) == 1) then 
    // 向Redis发布消息
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1 
else 
    return 0 
end

说明:
解锁成功后,该lua脚本返回1,解锁失败返回0;
其中: redis.call('del', KEYS[1])表示根据KEYS[1]键删除数据;redis.call('publish', KEYS[2], ARGV[1])表示发布消息 KEYS[2], ARGV[1];

2.3 释放一层锁

// 锁不是被自己占有,直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;

// 锁数量-1,如果还大于0,重新设置过期时间;否则删除锁
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    // 重置过期时间
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
return nil;

上述Lua脚本返回1表示删除锁成功,返回0表示锁释放一层,返回空表示释放失败。

2.4 续期

// 锁被自己占用,重新设置过期时间,返回1;否则返回0if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

上述Lua脚本返回1表示续期成功,返回0表示续期失败(当前未获取锁)。

3.Redission用法

分布式锁可以直接使用开源的Redission
引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.1</version>
</dependency>

编码如下:

public static void lock1() {
    RedissonClient redisson = getRedissonClient();
    RLock lock = redisson.getLock("myLock");
    // 获取锁
    lock.lock();
    try {
        // 业务逻辑
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 释放锁
        lock.unlock();
    }
    redisson.shutdown();
    System.out.println("Begin end");
}

// 获取redis客户端实例
private static RedissonClient getRedissonClient() {
    Config config = new Config();
    config.setLockWatchdogTimeout(600*1000);
    config.useSingleServer().setAddress("redis://127.0.0.1:6001").setPassword("xxx");
    RedissonClient redisson = Redisson.create(config);
    return redisson;
}

说明:redisson.getLock(“myLock”)中的myLock即为分布式锁的键,多个客户端实例需要保证键相同。
lock.lock()用于执行获取锁的逻辑,获取成功后直接返回;获取失败后进入等待队列阻塞;lock.unlock();用于手动解锁。
getRedissonClient方法用于获取redis客户端实例,其中的setLockWatchdogTimeout方法用于设置看门狗的超时时间,单位毫秒,默认为30000(30秒)。
使用lock.lock()方法获取锁时不需要设置锁的过期时间,在获取锁成功后,Redisson通过看门狗机制,进行锁的续期,每经过WatchdogTimeout/3时间执行一次续期操作。
当lock.unlock()释放锁时,会同时关闭看门狗。

4.流程和消息

4.1 流程介绍

屏蔽底层Redis对锁的实现方式,仅用Lock和UnLock表示获取锁和释放锁,分布式锁的竞争流程可表示如下图所示:
在这里插入图片描述
[1] 客户端ClientA向Redis发送获取锁的消息,锁key为myLock(自定义);
[2] Redis响应成功,表示占锁成功;
[3] 客户端ClientB向Redis发送获取锁的消息,key为myLock;
[4] 服务器判断此时myLock锁已被ClientA占有,Redis响应失败;
[5] Client B 向Redis发送订阅消息订阅myChannel频道,等待收到通知;
[6-7] ClientA释放锁同时发布消息至Redis的myChannel频道;
[8] Redis收到publish消息后,向所有订阅了myChannel频道的客户端发送message通知消息;
[9] ClientB收到订阅的消息后,知道锁已被释放,再次获取锁;
其中:消息6和消息7是lua脚本执行的,因此具备原子性;当客户端收到message消息时,表明锁已被释放,可以重新竞争锁。
另外,对于客户端ClientA,在消息2-6之间,Redis的看门狗机制会自动为myLock续期。

4.2 消息介绍

Auth消息:

*2
$4
AUTH
$8
Root@123

+OK

其中:*2 表示由两个输入字符串;
$4表示第一个字符串长度为4,即AUTH;
$8表示第二个字符串长度为8,即Root@123;
+OK为Redis返回的结构,表示鉴权成功;
解析后为:

client: AUTH Root@123
Redis: OK

PING/PONG消息:

*1
$4
PING

+PONG

客户端向Redis发送PING心跳消息,Redis响应PONG消息。

QUITE消息:

*1
$4
QUIT

+OK

客户端向Redis发送QUITE退出消息,Redis响应OK消息。

以下分场景介绍Redis消息,包括成功获取锁—锁的续期—锁的释放和发布通知以及获取锁失败—锁的订阅—收到通知消息—取消订阅等,为简化篇幅,将省略AUTH、PING/PONG、AUITE等重复的内容。

4.2.1 成功获取锁

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "exists"过滤条件,得到:

*6
$4 
EVAL
$339 
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.2 获取锁后,锁的续期

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*6
$4
EVAL
$120
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.3 释放锁

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*8
$4
EVAL
$305
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
$1
2
$6
myLock
$31
redisson_lock__channel:{myLock}
$1
0
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 2 myLock redisson_lock__channel:{myLock} 0 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.4 获取锁失败后订阅

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*2
$9
SUBSCRIBE
$31
redisson_lock__channel:{myLock}

解析后为:

EVAL SUBSCRIBE redisson_lock__channel:{myLock}

4.2.5 订阅后收到通知消息

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*3
$7
message
$31
redisson_lock__channel:{myLock}
$1
0

解析后为:

message redisson_lock__channel:{myLock} 0

4.2.6 取消订阅

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*2
$11
UNSUBSCRIBE
$31
redisson_lock__channel:{myLock}

解析后为:

EVAL UNSUBSCRIBE redisson_lock__channel:{myLock}

5.源码

源码介绍围绕下图展开,如果对下图的逻辑线比较属性,直接跳过本章内容。
在这里插入图片描述
在介绍源码前,有必要了解一下两个概念: Redis的订阅发布机制和Semaphore.
Redis订阅和发布机制:
打开两个Redis客户端,分别执行subscribe myChannel订阅myChannel频道的消息:

>subscribe myChannel
切换到推送/订阅模式,关闭标签页来停止接收信息。
1) "subscribe"
2) "myChannel"
3) "1"

再打开一个客户端,执行publish myChannel key1告诉Redis,向订阅了myChannel的客户端发送消息:

>publish myChannel key1
"2"

返回值2表示有两个订阅客户端。

客户端收到Redis的通知消息:

1) "message"
2) "myChannel"
3) "key1"

Semaphore:

说明:由于在线程专题已经详细介绍过AQS,这里涉及AQS的内容不再展开介绍。

Semaphore是JUC中的一个并发工具类,内部维持了一个state的整数记录状态值,并提供了acquireXXX和release方法用于获取和释放锁(共享锁)。当state的值小于acquireXXX时,线程会进入AQS的等待队列,处于阻塞状态; release方法被时,state属性会增加,如果大于0,则从等待队列中唤醒一个。

如下案例中,Semaphore创建时,state设置为0,当客户端ClientA调用acquire方法获取锁时,进入Semaphore的等待队列处于阻塞状态;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7H3sF8Ns-1717209303379)(C:\Users\0216001379\AppData\Roaming\Typora\typora-user-images\1716255100424.png)]
当客户端ClientB调用release方法释放锁时(本质是对state值进行加法运算),此时Semaphore会自动唤醒处于等待队列中的ClientA.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n602PIUM-1717209303380)(C:\Users\0216001379\AppData\Roaming\Typora\typora-user-images\1716255109685.png)]
客户端A唤醒时,会再次尝试获取锁,此时Semaphore拥有共享锁的数量为1,与acquire方法获取数量相同(默认获取1个),获取锁正常。
在这里插入图片描述接下来,根据如下案例进行源码介绍:

public static void main(String[] args) {
    RedissonClient client = getClient();
    RLock lock = client.getLock("myLock");
    System.out.println("Lock1 Begin exec");
    lock.lock();
    try {
        System.out.println("Lock1 Begin...");
        Thread.sleep(1000 * 60);
        System.out.println("Lock1 End.");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
    client.shutdown();
}

5.1 获取锁

源码入口lock.lock()->lock(-1, null, false)

说明:为减少代码重复度,会提取公共部分代码形成模板方法,模板方法相对于提取前的方法因存在扩展逻辑,导致可读性降低(虽然整体可维护性提升)。如lock(long leaseTime, TimeUnit unit, boolean interruptibly)方法支持响应中断与忽略中断两种情况,支持设置超时时间与不设置超时时间两种情况。

说明:是否响应中断(即被中断时抛出异常或者不抛出异常)不影响解析主线逻辑,在介绍源码时,认为interruptibly为false, 对interruptibly=true分为不进行说明。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 long threadId = Thread.currentThread().getId();
    // part-1.执行lua脚本获取锁
 Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
 if (ttl == null) {
  return;
 }

    // part-2.向Redis发布订阅请求
 RFuture<RedissonLockEntry> future = subscribe(threadId);
 commandExecutor.syncSubscription(future);

 try {
        // part-3.while死循环中获取锁
  while (true) {
   ttl = tryAcquire(-1, leaseTime, unit, threadId);
   if (ttl == null) {
    break;
   }

   if (ttl >= 0) {
    try {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    }
   } else {
    future.getNow().getLatch().acquireUninterruptibly();
   }
  }
 } finally {
        // part-4.获取锁成功后,取消订阅
  unsubscribe(future, threadId);
 }
}

上述lock方法的主体逻辑可以分为4个部分:
[1] part-1:执行Lua脚本尝试获取锁,获取锁成功,则直接返回;
[2] part-2: 获取锁失败后,向Redis发布订阅;
[3] part-3: while死循环,获取锁或者抛出异常后退出循环;
[4] part-4: 退出while循环(获取锁成功或抛出异常),向Redis发送取消订阅消息;
整体流程比较清晰,从逻辑上可以切分为两个部分:获取锁成功场景和获取锁失败场景。

5.1.1 获取锁成功:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 long threadId = Thread.currentThread().getId();
    // part-1.执行lua脚本获取锁
 Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
 if (ttl == null) {
  return;
 }
 // Ignore ...
}

tryAcquire(-1, leaseTime, unit, threadId)返回null时,获取锁成功,退出lock方法。进入tryAcquire方法:

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

其中tryAcquireAsync返回一个Future对象,get方法阻塞等待(通过Future的await方法)该Future执行完成并返回结果或者抛出异常(包装后的RedisException)。进入tryAcquireAsync方法:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 1.尝试从Redis获取锁,返回一个异步的Future对象
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // 2.在Future对象添加回调函数,完成时回调
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 3.通过看门狗对锁进行自动续期
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryAcquireAsync方法整体逻辑较为简单:尝试从Redis获取锁,返回一个异步的Future对象;然后在Future对象添加回调逻辑,在Future完成时回调。

tryAcquireAsync方法仍然是个模板方法,支持设置过期时间和不设置过期时间:

[1]不设置时间: 向Redis申请锁时携带看门狗的超时时间(在3.Redission用法章节中通过Config对象的setLockWatchdogTimeout方法设置),之后通过看门狗续期。

[2]设置过期时间: 向Redis申请锁时携带执行的超时时间,超时后自动释放锁,因此不需要看门狗。

这里有两个重点方法需要关注一下:tryLockInnerAsync向Redis申请锁和scheduleExpirationRenewal开启看门狗。

tryLockInnerAsync方法就是将Lua脚本的执行包装为异步执行,返回一个Future对象:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

这里的lua脚本在2.1章节已进行结合,不再赘述。

scheduleExpirationRenewal功能是开启一个看门狗线程,定期(1/3的看门狗超时时间)向Redis续期,主线逻辑如下所示:

protected void scheduleExpirationRenewal(long threadId) {
   //...
   // scheduleExpirationRenewal的核心逻辑在于调用renewExpiration
   renewExpiration();
   //...
}


private void renewExpiration() {
    //...
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {          
            //...
            // 调用lua脚本-为锁续期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {   
                //...
                // 每internalLockLeaseTime/3时间,回调自身,开始循环
                renewExpiration();   
                //...                    
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    //...
}

// 对lua脚本异步执行的封装,与2.4中介绍的lua脚本相同
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

5.1.2 获取锁失败:

当锁已被其他客户端占有,获取锁失败,进入订阅-阻塞等待锁释放流程:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 //...

    // part-2.向Redis发布订阅请求
 RFuture<RedissonLockEntry> future = subscribe(threadId);
 commandExecutor.syncSubscription(future);

 try {
        // part-3.while死循环中获取锁
  while (true) {
   ttl = tryAcquire(-1, leaseTime, unit, threadId);
   if (ttl == null) {
    break;
   }

   if (ttl >= 0) {
    try {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    }
   } else {
    future.getNow().getLatch().acquireUninterruptibly();
   }
  }
 } finally {
        // part-4.获取锁成功后,取消订阅
  unsubscribe(future, threadId);
 }
}

[1] 先看一下向Redis发布订阅请求部分:

// 向lua发送订阅消息,并注册一个监听器监听订阅频道的通知消息
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 与前文结合的get方法逻辑相似,阻塞等待Future执行完成,即等待订阅消息发送给Redis并说道订阅成功消息
commandExecutor.syncSubscription(future);

重点在于第一个方法subscribe(threadId):向Redis发送订阅消息,并注册一个监听器,监听通知消息。
向redis发送订阅消息, 消息内容如章节4.2.4中的SUBSCRIBE redisson_lock__channel:{myLock}, 该消息表示客户端订阅redisson_lock__channel:{myLock}频道, 当Redis服务器收到该频道的通知消息后,会以Message类型的消息通知给订阅的客户端,消息内容为:message redisson_lock__channel:{myLock} 0.
当Redssion客户端收到message redisson_lock__channel:{myLock} 0后,监听器被调用,监听器的注册和监听器的内容后面介绍。
[2] 接着进入while死循环,只有抛出异常或者获取锁成功才会退出:

while (true) {
    // tryAcquire前文已介绍:获取锁成功返回null, 否则返回锁的超时时间
    ttl = tryAcquire(-1, leaseTime, unit, threadId);
    if (ttl == null) {
        break;
    }

    if (ttl >= 0) {
        try {
            // 根据锁的超时时间,定时阻塞等待锁,超时后,自动苏醒
            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
    } else {
        // 当锁没有设置超时时间时,阻塞等待,直到被唤醒
        future.getNow().getLatch().acquireUninterruptibly();
    }
}

这里的future.getNow().getLatch()返回的是一个Semaphore对象,初始化时state设置为0,因此调用acquire方法会陷入等待队列。唤醒逻辑在5.3 订阅和通知章节中介绍。
[3] unsubscribe(future, threadId)用于取消订阅,能进入finnally说明有异常抛出或者已经获取锁,从而不需要再监听redis的通知,unsubscribe核心是删除注册的监听器。

5.2 释放锁

Redssion提供的分布式锁支持可重入,因此多次获取需要多次释放。根据案例的lock.unlock()进入unlock方法:

public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

get是5.1章中结合过,这里直接进入unlockAsync方法,主线逻辑如下:

public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<>();
    //  向Redis发送解锁消息
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // 关闭看门狗
        cancelExpirationRenewal(threadId);
  //...
    });
    return result;
}

核心逻辑在于unlockInnerAsync:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

这里的lua脚本用于释放一层锁,如果释放完一层锁后,锁的数量为0,则删除对应的key(释放锁), 此时还会发布一条消息通知Redis,可参考2.3 释放一层锁

5.3 订阅和通知

章节5.2中介绍了客户端获取锁失败后Redis订阅,然后进入等待队列阻塞;在章节5.3中介绍了释放锁以及向Redis发布通知消息,本章节的内容是将二者衔接起来。

订阅和通知的核心功能是:阻塞在等待队列中的客户端将会因为通知消息而被唤醒。

Redis通知Redission:

客户端与Redis服务器底层的通讯是基于TCP链,Redssion使用Netty进行了封装,在pipeline中添加了CommandPubSubDecoder解码器,该解码器中存在如下逻辑:

protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,Object result) throws IOException {
 //...
 if (result instanceof Message) {
  //...
  if (result instanceof PubSubMessage) {
   pubSubConnection.onMessage((PubSubMessage) result);
  }
  //...
 }
 // ...
}

当接收到Message且是PubSubMessage类型的消息(即前文介绍的message redisson_lock__channel:{myLock} 0消息)时,调用pubSubConnection.onMessage((PubSubMessage) result),进入该方法:

public void onMessage(PubSubMessage message) {
    for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
        redisPubSubListener.onMessage(message.getChannel(), message.getValue());
    }
}

这里会调用注册的监听器,包括5.1.2 获取锁失败章节中注册的监听器。

Redission注册监听器:
继续看一下5.1.2 获取锁失败章节RFuture<RedissonLockEntry> future = subscribe(threadId)逻辑:

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return pubSub.subscribe(getEntryName(), getChannelName());
}

public RFuture<E> subscribe(String entryName, String channelName) {
    //...
    RedisPubSubListener<Object> listener = createListener(channelName, value);
    service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
    //...
}

通过createListener创建监听器,然后将监听器注册到RedisPubSubConnection对象的listeners属性中:

public class RedisPubSubConnection extends RedisConnection {
    // 监听器容器
    final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
    
    // 添加监听器的方法
    public void addListener(RedisPubSubListener<?> listener) {
        listeners.add((RedisPubSubListener<Object>) listener);
    }
}

至于创建listener后,如何调用RedisPubSubConnection的addListener方法可通过代码追踪和Bebug进行了解,不是重点内容;这里重点关注的是这个监听器的内部逻辑,即当这个监听器被调用时触发的逻辑:

private RedisPubSubListener<Object> createListener(String channelName, E value) {
    RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
        @Override
        public void onMessage(CharSequence channel, Object message) {
            if (!channelName.equals(channel.toString())) {
                return;
            }
            PublishSubscribe.this.onMessage(value, (Long) message);
        }

      //...
    };
    return listener;
}

channelName.equals(channel.toString())用于判断收到的消息是否是订阅的消息,如前文介绍的订阅的频道是redisson_lock__channel:{myLock}, 此时会校验channelName。
核心逻辑在于PublishSubscribe.this.onMessage(value, (Long) message):

protected void onMessage(RedissonLockEntry value, Long message) {
    //...
    value.getLatch().release();
    //...
}

value.getLatch()获取的是前文介绍的Semaphore对象,调用release时会修改state值,并唤醒一个等待的线程。
基于上述介绍:线程获取分布式锁失败后,向Redis订阅消息,并注册监听器,然后陷入Semaphore的等待队列;当其他客户端释放锁时,同时会发布通知消息给Redis服务器;Redis服务器收到消息后,向订阅的客户端发送通知消息;客户端收到通知消息后,触发监听器逻辑,监听器基于Semaphore机制,唤醒阻塞的线程。线程被唤醒后再次尝试获取分布式锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1807150.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度网络及经典网络简介

深度网络及经典网络简介 导语加深网络一个更深的CNN提高识别精度Data Augmentation 层的加深 经典网络VGGGoogLeNetResNet 高速学习迁移学习GPU分布式学习计算位缩减 强化学习总结参考文献 导语 深度学习简单来说&#xff0c;就是加深了层数的神经网络&#xff0c;前面已经提到…

独立游戏《星尘异变》UE5 C++程序开发日志4——实现任务系统

目录 一、任务的数据结构 二、任务栏 三、随机事件奖励 1.随机事件的结构 2.随机事件池的初始化 3.生成随机事件 本游戏作为工厂游戏&#xff0c;任务系统的主要功能就是给玩家生产的目标和动力&#xff0c;也就是给玩家发布一个需要一定数量某星尘的订单&#xff0c;玩家…

5 种技术,可用于系统中的大数据模型

文章目录 一、说明二、第一种&#xff1a;批量大小三、第二种&#xff1a;主动学习四、第三种&#xff1a;增加代币数量五、第四种&#xff1a; 稀疏激活六、第五种&#xff1a;过滤器和更简单的模型后记 一、说明 以下是本文重要观点的摘要。阅读它以获取更多详细信息/获取原…

【CTF MISC】XCTF GFSJ0170 János-the-Ripper Writeup(文件提取+ZIP压缩包+暴力破解)

Jnos-the-Ripper 暂无 解法 用 winhex 打开&#xff0c;提到了 flag.txt。 用 binwalk 扫描&#xff0c;找到一些 zip 压缩包。 binwalk misc100用 foremost 提取文件。 foremost misc100 -o 100flag.txt 在压缩包里。 但是压缩包需要解压密码。 用 Ziperello 暴力破解。 不…

JAVA-LeetCode 热题 100 第56.合并区间

思路&#xff1a; class Solution {public int[][] merge(int[][] intervals) {if(intervals.length < 1) return intervals;List<int[]> res new ArrayList<>();Arrays.sort(intervals, (o1,o2) -> o1[0] - o2[0]);for(int[] interval : intervals){if(res…

vue2中的插槽使用以及Vuex的使用

插槽分为默认插槽&#xff0c;定名插槽还有作用域插槽 一.默认插槽&#xff0c;定名插槽 //app.vue <template> <div class"container"><CategoryTest title"美食" :listData"foods"><img slot"center" src&qu…

前端 移动端 手机调试 (超简单,超有效 !)

背景&#xff1a;webpack工具构建下的vue项目 1. 找出电脑的ipv4地址 2. 替换 host 3. 手机连接电脑热点或者同一个wifi 。浏览器打开链接即可。

【召回第一篇】召回方法综述

各个网站上找的各位大神的优秀回答&#xff0c;记录再此。 首先是石塔西大佬的回答&#xff1a;工业界推荐系统中有哪些召回策略&#xff1f; 万变不离其宗&#xff1a;用统一框架理解向量化召回前言常读我的文章的同学会注意到&#xff0c;我一直强调、推崇&#xff0c;不要…

探索智慧商场的功能架构与应用

在数字化和智能化的浪潮下&#xff0c;智慧商场已经成为零售业的重要发展方向之一。智慧商场系统的功能架构设计与应用&#xff0c;结合了现代信息技术和零售业的实际需求&#xff0c;为商场的管理和运营提供了全新的解决方案。本文将深入探讨智慧商场的功能架构与应用&#xf…

2024高考作文-ChatGPT完成答卷,邀请大家来打分

高考&#xff0c;愿你脑洞大开&#xff0c;知识点全都扎根脑海&#xff1b;考试时手感倍儿棒&#xff0c;答题如行云流水&#xff1b;成绩公布时&#xff0c;笑容如春风拂面&#xff0c;心情如阳光普照&#xff01;高考加油&#xff0c;你一定行&#xff01; 新课标I卷 试题内…

2024年6月9日 (周日) 叶子游戏新闻

万能嗅探: 实测 网页打开 某视频号、某音、某红薯、某站&#xff0c;可以做到无水印的视频和封面下载功能哦&#xff0c;具体玩法大家自行发挥吧。 《Funko Fusion》发布新预告 20款影视作品齐聚一堂第三人称动作游戏新作《Funko Fusion》今日发布最新实机演示。该游戏融合了整…

《python程序语言设计》2018版第5章第47题绘制随机球,在一个宽120高100的矩形里绘制随机的点

这个题其实并不难。 首先我们利用turtle功能绘制一个矩形&#xff0c;圆心点题里要求的是0&#xff0c;0 这个好办 然后我们根据宽120&#xff0c;高100计算一下。肯定是正负两个值参与其中。 坐标点如下 建立矩形代码如下 turtle.penup() turtle.goto(-60, 50) turtle.pend…

程序的基本结构、cout语句(c++语言)

一、如何下载Dev C 登录网站&#xff1a;ht.51goc.com 二、安装Dev C 一、启动Dev C 双击桌面的图标 二、新建一个程序 三、复制一个程序 请你复制以下代码到“程序编辑区” #include<bits/stdc.h> using namespace std; int main() { cout<<"Hell…

Segment Anything CSharp| 在 C# 中通过 OpenVINO™ 部署 SAM 模型实现万物分割

​ OpenVINO™ C# API 是一个 OpenVINO™ 的 .Net wrapper&#xff0c;应用最新的 OpenVINO™ 库开发&#xff0c;通过 OpenVINO™ C API 实现 .Net 对 OpenVINO™ Runtime 调用.Segment Anything Model&#xff08;SAM&#xff09;是一个基于Transformer的深度学习模型&#x…

G盘文件系统损坏的应对与预防全攻略

在日常使用电脑的过程中&#xff0c;我们时常会碰到各种磁盘问题&#xff0c;其中G盘文件系统损坏是一个较为常见且棘手的问题。当G盘文件系统损坏时&#xff0c;不仅可能导致重要数据丢失&#xff0c;还可能影响系统的稳定性和运行效率。本文将详细探讨G盘文件系统损坏的现象、…

RK3568笔记三十一:ekho 6.3 文本转语音移植

若该文为原创文章&#xff0c;转载请注明原文出处。 移植的目的是在在OCR识别基础上增加语音播放&#xff0c;把识别到的文字直接转TTS播报出来&#xff0c;形成类似点读机的功能。 1、下载文件 libsndfile-1.0.28.tar.gz ekho-6.3.tar.xz 2、解压 tar zxvf libsndfile-1.0…

有序二叉树java实现

类实现&#xff1a; package 树;import java.util.LinkedList; import java.util.Queue;public class BinaryTree {public TreeNode root;//插入public void insert(int value){//插入成功之后要return结束方法TreeNode node new TreeNode(value);//如果root为空的话插入if(r…

Nacos的配置中心

1.前言 除了注册中心和负载均衡之外, Nacos还是⼀个配置中心, 具备配置管理的功能. Namespace 的常用场景之一是不同环境的配置区分隔离&#xff0c; 例如开发测试环境和⽣产环境的配置隔离。 1.1 为什么需要配置中心&#xff1f; 当前项目的配置都在代码中&#xff0c;会存…

6.7.12 使用 SWIN Transformer 通过热图像实现乳腺癌检测系统

乳腺癌是重大的公共卫生挑战&#xff0c;需要有效的诊断方法。虽然超声、乳房 X 线照相和 MRI 仍然至关重要&#xff0c;但它们在定期、短间隔大规模筛查中的实用性有限。 热成像作为一种非侵入性且经济有效的选择&#xff0c;具有常规自我筛查的潜力。本研究利用基于自注意力…

java中异常-异常概述+异常体系结构

一、异常概述 1、什么是异常&#xff1f; java程序在运行时出现的不正常情况 2、java中提供的默认的异常处理机制 java中对java程序运行时可能会出现的每种不正常情况都创建了一个唯一对应的类&#xff0c;在java程序运行时如果出现不正常情况&#xff0c;java程序就会创建…