背景:
本文介绍Redis分布式锁的内容,包括Redis相关命令和Lua脚本的介绍,以及操作分布式锁的流程与消息,最后结合Redission源码介绍分布式锁的实现原理。
1.基本命令
1.1 基本键值对的设置
设值: set key value
取值: get key
删除: del key
>set key1 value1
"OK"
>get key1
"value1"
>del key1
"1"
1.2 setnx用法
setnx key value:
当key不存在时,进行设置,返回1(表示操作成功)
当key存在时,不进行设置,返回0(表示操作失败)
>setnx key1 value1
"1"
>setnx key1 value1
"0"
1.3 setex和psetex
setex key seoconds value
等价于原子性地执行了 set key value和expire key seconds
psetex用法与setex相同,区别是setex单位为秒,而psetex是毫秒;
>setex key1 1000 value1
"OK"
>ttl key1
"997"
1.4 set扩展用法
set key value [EX seconds | PX millSeconds] [NX | XX]
seconds EX 表示设置过期时间以秒为单位,millSeconds PX 表示设置过期时间以毫秒为单位;
NX表示当键不存在时执行,并返回OK;否则返回null
XX表示当键存在时执行,并返回OK;否则返回null
>set key1 value1 EX 1000 NX
"OK"
>set key1 value1 EX 1000 NX
null
>set key1 value1 EX 2000 XX
"OK"
>ttl key1
"1996"
2.lua脚本
由于redis是单线程执行的,因此可以原子性地执行lua脚本。因此可通过lua脚本对基本命令进行组合。
格式如下:
EVAL "lua脚本" n KEY... , ARGV...
(1) 通过EVAL命令执行lua脚本;
(2) 可对脚本进行传参,可以传多个KEY和多个ARGV,KEY和ARGV建议使用逗号(,)隔开;
(3) 需要显示指定KEY个数;
(4) lua脚本通过KEYS[i] 和 ARGV[j] 获取传入的参数,下标从1开始;
以下通过案例的方式介绍一下lua脚本的使用。
2.1 加锁
分布式锁的数据结构可以被定义为如下格式:
{
"lockKey": {
"uuid: threadId": num
}
}
lockKey表示分布式锁:数据库存中存在lockKey键时,表示已有客户端占据了lockKey锁,否则表示lockKey锁未被获取。
uuid: threadId结构包含了UUID唯一字符串,num为获取锁的次数。UUID用于保证上锁和解锁是同一个客户端,num用于实现锁的可重入。
案例:
// 如果锁不存在,则加锁并设置过期时间
if (redis.call('exists', KEYS[1]) == 0) then
// 设置锁记录锁的获取次数为1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
// 设置锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果锁存在,且为自己,锁+1,并重新设置过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
// 设置锁记录锁的获取次数+1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
// 重置锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 锁已存在,不是自己,则返回锁到期时间
return redis.call('pttl', KEYS[1]);
说明:
上述LUA脚本返回空,说明锁获取成功;否则获取失败并得到锁的过期时间(毫秒)。
其中,redis.call('exists', KEYS[1])
表示KEY[1]键是否存在,存在返回1,不存在返回0;redis.call('hincrby', KEYS[1], ARGV[2], 1)
表示对哈希类型数据KEYS[1]和ARGV[2]键对应的值加1;redis.call('pexpire', KEYS[1], ARGV[1])
表示设置KEYS[1]键的有效期为ARGV[1],单位毫秒;
在redis客户端进行如下操作:
>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null
>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"1"
>ttl myLock
"54"
>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null
>ttl myLock
"56"
>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"2"
给上述lua脚本的传参为1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
1表示只有一个Key, 其他为ARGV, 即
KEY[1] = myLock
ARGV[1]=60000
ARGV[2]=80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
得到的结果如下:
{
"myLock": {
"80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12": 2
}
}
表示"myLock"分布式锁已被占用, 获取锁的次数为2次。
2.2 解锁
案例:
// 解锁成功返回1,失败返回0
if (redis.call('del', KEYS[1]) == 1) then
// 向Redis发布消息
redis.call('publish', KEYS[2], ARGV[1]);
return 1
else
return 0
end
说明:
解锁成功后,该lua脚本返回1,解锁失败返回0;
其中: redis.call('del', KEYS[1])
表示根据KEYS[1]键删除数据;redis.call('publish', KEYS[2], ARGV[1])
表示发布消息 KEYS[2], ARGV[1];
2.3 释放一层锁
// 锁不是被自己占有,直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
// 锁数量-1,如果还大于0,重新设置过期时间;否则删除锁
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
// 重置过期时间
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
上述Lua脚本返回1表示删除锁成功,返回0表示锁释放一层,返回空表示释放失败。
2.4 续期
// 锁被自己占用,重新设置过期时间,返回1;否则返回0;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
上述Lua脚本返回1表示续期成功,返回0表示续期失败(当前未获取锁)。
3.Redission用法
分布式锁可以直接使用开源的Redission
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.1</version>
</dependency>
编码如下:
public static void lock1() {
RedissonClient redisson = getRedissonClient();
RLock lock = redisson.getLock("myLock");
// 获取锁
lock.lock();
try {
// 业务逻辑
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
redisson.shutdown();
System.out.println("Begin end");
}
// 获取redis客户端实例
private static RedissonClient getRedissonClient() {
Config config = new Config();
config.setLockWatchdogTimeout(600*1000);
config.useSingleServer().setAddress("redis://127.0.0.1:6001").setPassword("xxx");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
说明:redisson.getLock(“myLock”)中的myLock即为分布式锁的键,多个客户端实例需要保证键相同。
lock.lock()用于执行获取锁的逻辑,获取成功后直接返回;获取失败后进入等待队列阻塞;lock.unlock();用于手动解锁。
getRedissonClient方法用于获取redis客户端实例,其中的setLockWatchdogTimeout方法用于设置看门狗的超时时间,单位毫秒,默认为30000(30秒)。
使用lock.lock()方法获取锁时不需要设置锁的过期时间,在获取锁成功后,Redisson通过看门狗机制,进行锁的续期,每经过WatchdogTimeout/3时间执行一次续期操作。
当lock.unlock()释放锁时,会同时关闭看门狗。
4.流程和消息
4.1 流程介绍
屏蔽底层Redis对锁的实现方式,仅用Lock和UnLock表示获取锁和释放锁,分布式锁的竞争流程可表示如下图所示:
[1] 客户端ClientA向Redis发送获取锁的消息,锁key为myLock(自定义);
[2] Redis响应成功,表示占锁成功;
[3] 客户端ClientB向Redis发送获取锁的消息,key为myLock;
[4] 服务器判断此时myLock锁已被ClientA占有,Redis响应失败;
[5] Client B 向Redis发送订阅消息订阅myChannel频道,等待收到通知;
[6-7] ClientA释放锁同时发布消息至Redis的myChannel频道;
[8] Redis收到publish消息后,向所有订阅了myChannel频道的客户端发送message通知消息;
[9] ClientB收到订阅的消息后,知道锁已被释放,再次获取锁;
其中:消息6和消息7是lua脚本执行的,因此具备原子性;当客户端收到message消息时,表明锁已被释放,可以重新竞争锁。
另外,对于客户端ClientA,在消息2-6之间,Redis的看门狗机制会自动为myLock续期。
4.2 消息介绍
Auth消息:
*2
$4
AUTH
$8
Root@123
+OK
其中:*2 表示由两个输入字符串;
$4表示第一个字符串长度为4,即AUTH;
$8表示第二个字符串长度为8,即Root@123;
+OK为Redis返回的结构,表示鉴权成功;
解析后为:
client: AUTH Root@123
Redis: OK
PING/PONG消息:
*1
$4
PING
+PONG
客户端向Redis发送PING心跳消息,Redis响应PONG消息。
QUITE消息:
*1
$4
QUIT
+OK
客户端向Redis发送QUITE退出消息,Redis响应OK消息。
以下分场景介绍Redis消息,包括成功获取锁—锁的续期—锁的释放和发布通知以及获取锁失败—锁的订阅—收到通知消息—取消订阅等,为简化篇幅,将省略AUTH、PING/PONG、AUITE等重复的内容。
4.2.1 成功获取锁
通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "exists"
过滤条件,得到:
*6
$4
EVAL
$339
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
解析后为:
EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
4.2.2 获取锁后,锁的续期
通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""
过滤条件,得到:
*6
$4
EVAL
$120
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
解析后为:
EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
4.2.3 释放锁
通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""
过滤条件,得到:
*8
$4
EVAL
$305
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
$1
2
$6
myLock
$31
redisson_lock__channel:{myLock}
$1
0
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
解析后为:
EVAL "lua脚本" 2 myLock redisson_lock__channel:{myLock} 0 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
4.2.4 获取锁失败后订阅
通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""
过滤条件,得到:
*2
$9
SUBSCRIBE
$31
redisson_lock__channel:{myLock}
解析后为:
EVAL SUBSCRIBE redisson_lock__channel:{myLock}
4.2.5 订阅后收到通知消息
通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""
过滤条件,得到:
*3
$7
message
$31
redisson_lock__channel:{myLock}
$1
0
解析后为:
message redisson_lock__channel:{myLock} 0
4.2.6 取消订阅
通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""
过滤条件,得到:
*2
$11
UNSUBSCRIBE
$31
redisson_lock__channel:{myLock}
解析后为:
EVAL UNSUBSCRIBE redisson_lock__channel:{myLock}
5.源码
源码介绍围绕下图展开,如果对下图的逻辑线比较属性,直接跳过本章内容。
在介绍源码前,有必要了解一下两个概念: Redis的订阅发布机制和Semaphore.
Redis订阅和发布机制:
打开两个Redis客户端,分别执行subscribe myChannel
订阅myChannel频道的消息:
>subscribe myChannel
切换到推送/订阅模式,关闭标签页来停止接收信息。
1) "subscribe"
2) "myChannel"
3) "1"
再打开一个客户端,执行publish myChannel key1
告诉Redis,向订阅了myChannel的客户端发送消息:
>publish myChannel key1
"2"
返回值2表示有两个订阅客户端。
客户端收到Redis的通知消息:
1) "message"
2) "myChannel"
3) "key1"
Semaphore:
说明:由于在线程专题已经详细介绍过AQS,这里涉及AQS的内容不再展开介绍。
Semaphore是JUC中的一个并发工具类,内部维持了一个state的整数记录状态值,并提供了acquireXXX和release方法用于获取和释放锁(共享锁)。当state的值小于acquireXXX时,线程会进入AQS的等待队列,处于阻塞状态; release方法被时,state属性会增加,如果大于0,则从等待队列中唤醒一个。
如下案例中,Semaphore创建时,state设置为0,当客户端ClientA调用acquire方法获取锁时,进入Semaphore的等待队列处于阻塞状态;
当客户端ClientB调用release方法释放锁时(本质是对state值进行加法运算),此时Semaphore会自动唤醒处于等待队列中的ClientA.
客户端A唤醒时,会再次尝试获取锁,此时Semaphore拥有共享锁的数量为1,与acquire方法获取数量相同(默认获取1个),获取锁正常。
接下来,根据如下案例进行源码介绍:
public static void main(String[] args) {
RedissonClient client = getClient();
RLock lock = client.getLock("myLock");
System.out.println("Lock1 Begin exec");
lock.lock();
try {
System.out.println("Lock1 Begin...");
Thread.sleep(1000 * 60);
System.out.println("Lock1 End.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
client.shutdown();
}
5.1 获取锁
源码入口lock.lock()->lock(-1, null, false)
:
说明:为减少代码重复度,会提取公共部分代码形成模板方法,模板方法相对于提取前的方法因存在扩展逻辑,导致可读性降低(虽然整体可维护性提升)。如lock(long leaseTime, TimeUnit unit, boolean interruptibly)方法支持响应中断与忽略中断两种情况,支持设置超时时间与不设置超时时间两种情况。
说明:是否响应中断(即被中断时抛出异常或者不抛出异常)不影响解析主线逻辑,在介绍源码时,认为interruptibly为false, 对interruptibly=true分为不进行说明。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// part-1.执行lua脚本获取锁
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
return;
}
// part-2.向Redis发布订阅请求
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// part-3.while死循环中获取锁
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
break;
}
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
} finally {
// part-4.获取锁成功后,取消订阅
unsubscribe(future, threadId);
}
}
上述lock方法的主体逻辑可以分为4个部分:
[1] part-1:执行Lua脚本尝试获取锁,获取锁成功,则直接返回;
[2] part-2: 获取锁失败后,向Redis发布订阅;
[3] part-3: while死循环,获取锁或者抛出异常后退出循环;
[4] part-4: 退出while循环(获取锁成功或抛出异常),向Redis发送取消订阅消息;
整体流程比较清晰,从逻辑上可以切分为两个部分:获取锁成功场景和获取锁失败场景。
5.1.1 获取锁成功:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// part-1.执行lua脚本获取锁
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
return;
}
// Ignore ...
}
当tryAcquire(-1, leaseTime, unit, threadId)
返回null时,获取锁成功,退出lock方法。进入tryAcquire方法:
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
其中tryAcquireAsync返回一个Future对象,get方法阻塞等待(通过Future的await方法)该Future执行完成并返回结果或者抛出异常(包装后的RedisException)。进入tryAcquireAsync方法:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 1.尝试从Redis获取锁,返回一个异步的Future对象
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 2.在Future对象添加回调函数,完成时回调
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 3.通过看门狗对锁进行自动续期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
tryAcquireAsync方法整体逻辑较为简单:尝试从Redis获取锁,返回一个异步的Future对象;然后在Future对象添加回调逻辑,在Future完成时回调。
tryAcquireAsync方法仍然是个模板方法,支持设置过期时间和不设置过期时间:
[1]不设置时间: 向Redis申请锁时携带看门狗的超时时间(在3.Redission用法章节中通过Config对象的setLockWatchdogTimeout方法设置),之后通过看门狗续期。
[2]设置过期时间: 向Redis申请锁时携带执行的超时时间,超时后自动释放锁,因此不需要看门狗。
这里有两个重点方法需要关注一下:tryLockInnerAsync向Redis申请锁和scheduleExpirationRenewal开启看门狗。
tryLockInnerAsync方法就是将Lua脚本的执行包装为异步执行,返回一个Future对象:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
这里的lua脚本在2.1章节已进行结合,不再赘述。
scheduleExpirationRenewal功能是开启一个看门狗线程,定期(1/3的看门狗超时时间)向Redis续期,主线逻辑如下所示:
protected void scheduleExpirationRenewal(long threadId) {
//...
// scheduleExpirationRenewal的核心逻辑在于调用renewExpiration
renewExpiration();
//...
}
private void renewExpiration() {
//...
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//...
// 调用lua脚本-为锁续期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
//...
// 每internalLockLeaseTime/3时间,回调自身,开始循环
renewExpiration();
//...
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
//...
}
// 对lua脚本异步执行的封装,与2.4中介绍的lua脚本相同
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
5.1.2 获取锁失败:
当锁已被其他客户端占有,获取锁失败,进入订阅-阻塞等待锁释放流程:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//...
// part-2.向Redis发布订阅请求
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// part-3.while死循环中获取锁
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
break;
}
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
} finally {
// part-4.获取锁成功后,取消订阅
unsubscribe(future, threadId);
}
}
[1] 先看一下向Redis发布订阅请求部分:
// 向lua发送订阅消息,并注册一个监听器监听订阅频道的通知消息
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 与前文结合的get方法逻辑相似,阻塞等待Future执行完成,即等待订阅消息发送给Redis并说道订阅成功消息
commandExecutor.syncSubscription(future);
重点在于第一个方法subscribe(threadId)
:向Redis发送订阅消息,并注册一个监听器,监听通知消息。
向redis发送订阅消息, 消息内容如章节4.2.4中的SUBSCRIBE redisson_lock__channel:{myLock}
, 该消息表示客户端订阅redisson_lock__channel:{myLock}
频道, 当Redis服务器收到该频道的通知消息后,会以Message类型的消息通知给订阅的客户端,消息内容为:message redisson_lock__channel:{myLock} 0
.
当Redssion客户端收到message redisson_lock__channel:{myLock} 0
后,监听器被调用,监听器的注册和监听器的内容后面介绍。
[2] 接着进入while死循环,只有抛出异常或者获取锁成功才会退出:
while (true) {
// tryAcquire前文已介绍:获取锁成功返回null, 否则返回锁的超时时间
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
break;
}
if (ttl >= 0) {
try {
// 根据锁的超时时间,定时阻塞等待锁,超时后,自动苏醒
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
// 当锁没有设置超时时间时,阻塞等待,直到被唤醒
future.getNow().getLatch().acquireUninterruptibly();
}
}
这里的future.getNow().getLatch()返回的是一个Semaphore对象,初始化时state设置为0,因此调用acquire方法会陷入等待队列。唤醒逻辑在5.3 订阅和通知章节中介绍。
[3] unsubscribe(future, threadId)用于取消订阅,能进入finnally说明有异常抛出或者已经获取锁,从而不需要再监听redis的通知,unsubscribe核心是删除注册的监听器。
5.2 释放锁
Redssion提供的分布式锁支持可重入,因此多次获取需要多次释放。根据案例的lock.unlock()
进入unlock方法:
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
get是5.1章中结合过,这里直接进入unlockAsync方法,主线逻辑如下:
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
// 向Redis发送解锁消息
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 关闭看门狗
cancelExpirationRenewal(threadId);
//...
});
return result;
}
核心逻辑在于unlockInnerAsync:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
这里的lua脚本用于释放一层锁,如果释放完一层锁后,锁的数量为0,则删除对应的key(释放锁), 此时还会发布一条消息通知Redis,可参考2.3 释放一层锁。
5.3 订阅和通知
在章节5.2中介绍了客户端获取锁失败后Redis订阅,然后进入等待队列阻塞;在章节5.3中介绍了释放锁以及向Redis发布通知消息,本章节的内容是将二者衔接起来。
订阅和通知的核心功能是:阻塞在等待队列中的客户端将会因为通知消息而被唤醒。
Redis通知Redission:
客户端与Redis服务器底层的通讯是基于TCP链,Redssion使用Netty进行了封装,在pipeline中添加了CommandPubSubDecoder解码器,该解码器中存在如下逻辑:
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,Object result) throws IOException {
//...
if (result instanceof Message) {
//...
if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
}
//...
}
// ...
}
当接收到Message且是PubSubMessage类型的消息(即前文介绍的message redisson_lock__channel:{myLock} 0
消息)时,调用pubSubConnection.onMessage((PubSubMessage) result)
,进入该方法:
public void onMessage(PubSubMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onMessage(message.getChannel(), message.getValue());
}
}
这里会调用注册的监听器,包括5.1.2 获取锁失败章节中注册的监听器。
Redission注册监听器:
继续看一下5.1.2 获取锁失败章节中RFuture<RedissonLockEntry> future = subscribe(threadId)
逻辑:
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
public RFuture<E> subscribe(String entryName, String channelName) {
//...
RedisPubSubListener<Object> listener = createListener(channelName, value);
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
//...
}
通过createListener创建监听器,然后将监听器注册到RedisPubSubConnection对象的listeners属性中:
public class RedisPubSubConnection extends RedisConnection {
// 监听器容器
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
// 添加监听器的方法
public void addListener(RedisPubSubListener<?> listener) {
listeners.add((RedisPubSubListener<Object>) listener);
}
}
至于创建listener后,如何调用RedisPubSubConnection的addListener方法可通过代码追踪和Bebug进行了解,不是重点内容;这里重点关注的是这个监听器的内部逻辑,即当这个监听器被调用时触发的逻辑:
private RedisPubSubListener<Object> createListener(String channelName, E value) {
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence channel, Object message) {
if (!channelName.equals(channel.toString())) {
return;
}
PublishSubscribe.this.onMessage(value, (Long) message);
}
//...
};
return listener;
}
channelName.equals(channel.toString())
用于判断收到的消息是否是订阅的消息,如前文介绍的订阅的频道是redisson_lock__channel:{myLock}
, 此时会校验channelName。
核心逻辑在于PublishSubscribe.this.onMessage(value, (Long) message)
:
protected void onMessage(RedissonLockEntry value, Long message) {
//...
value.getLatch().release();
//...
}
value.getLatch()
获取的是前文介绍的Semaphore对象,调用release时会修改state值,并唤醒一个等待的线程。
基于上述介绍:线程获取分布式锁失败后,向Redis订阅消息,并注册监听器,然后陷入Semaphore的等待队列;当其他客户端释放锁时,同时会发布通知消息给Redis服务器;Redis服务器收到消息后,向订阅的客户端发送通知消息;客户端收到通知消息后,触发监听器逻辑,监听器基于Semaphore机制,唤醒阻塞的线程。线程被唤醒后再次尝试获取分布式锁。