0. 虽说时运不佳,仍欲提桶跑路
分布式锁的常见实现方案
常用锁的用例
runoob Lua教程
-
对于分布式锁的实现方案,本文如标题所言,简单梳理了redisson的实现方案
-
redisson 也是基于redis的多个命令组合来实现的,为保证执行多个命令时的原子性,redisson借助了lua脚本实现,这个脚本算是其核心科技,也是本章节所关注的。
-
本文大概可以给出常用锁的源码解读、状态图
0.1通过UML简单鸟瞰redisson提供的众多组件
可以看到
- 分布式锁是基于 j.u.c.Lock 来实现的
- 除了分布式锁,还提供了一些分布式缓存组件(数组、队列、映射)等等
并且也基于 j.u.c.Semaphore 实现了分布式的信号量
1. 可重入锁
1.1 获取锁
// org.redisson.RedissonLock#tryLockInnerAsync
// RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params)
evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 当前申请的资源未被占用,
// 于是 创建被申请资源的hash,并put lockName:1(重入次数)
// 被申请的资源的hash,设置租赁时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
// KEYS
Collections.singletonList(getName()), // 被申请的资源key(hash)
// ARGV
internalLockLeaseTime, // 租赁时间
getLockName(threadId)); // 客户端线程的标识
1.2 释放锁
// org.redisson.RedissonLock#unlockInnerAsync
// RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params)
evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 被申请的资源未被占用,直接返回即可
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 被申请的资源存在占用,递减其重入次数
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次数大于0,说明该客户端线程(lockName)后续重入了,那么再续上一波租赁时间,并退出
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// 此时当前客户端线程不再需要锁定该资源了,删除资源的key,通知其他线程
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
// KEYS
Arrays.asList(getName(), getChannelName()),
// ARGV
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
2. 公平锁
应用常用通常是锁定并发量较高的资源
2.1 获取锁
// org.redisson.RedissonFairLock#tryLockInnerAsync
evalWriteAsync(
getName(),
LongCodec.INSTANCE,
command,
// 循环删除过期的线程,直至找到1个未过期的线程
// remove stale threads
"while true do " +
// 从队列中获取1个存在的线程名lockName
// threadsQueueName[0] == false,不存在则直接退出
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
// zSet通过lockName,获取优先级较高的timeout(最快要过期的)
// 这里使用时延作为排序的依据
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
// 这里线程过期
"if timeout <= tonumber(ARGV[3]) then " +
// 从Zset中删除这个过期的线程
"redis.call('zrem', KEYS[3], firstThreadId2);" +
// 将过期的lockName从队列中移出
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
// 两种情况:第一次?还是说重入了?
// 当前资源没有被占用(或已释放)
"if (redis.call('exists', KEYS[1]) == 0) " +
// 队列为空,没有排队的客户端了
"and ((redis.call('exists', KEYS[2]) == 0) " +
// 或 刚好排到了当前客户端的线程
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// 将这个lockName从线程队列中移出
"redis.call('lpop', KEYS[2]);" +
// 将这个lockName从zset中移除
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// 查询所有timeoutSetName线程名
// decrease timeouts for all waiting in the queue
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
// 迭代查询到的线程名全集
"for i = 1, #keys, 1 do " +
// 更新其他排队中的线程的超时时间
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" +
"end;" +
// 当前资源(锁)的hash中,新增1条key:value=当前客户端线程标识:1(自增以支持可重入)
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
// 将锁的租赁时间作为key的超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
// 退出
"return nil;" +
"end;" +
// 此时,当前客户端线程已经持有过该资源了,这次即重入
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 重入,即自增 resource.clientId(hash)
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
// 重新设置租赁时间
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
// 退出
"return nil;" +
"end;" +
"return 1;",
// KEYS
// getName():资源名、锁的key(hash=lockName:重入次数=RedissonObject.name)
// threadsQueueName:客户端线程标识的队列(list),lockName
// timeoutSetName:维护lockName的zSet(优先级依据超时时间),lockName:timeout
Arrays.asList(getName(), threadsQueueName, timeoutSetName),
// ARGV
internalLockLeaseTime, // 锁的租赁时间
getLockName(threadId), // 持有锁的客户端线程标识: 客户端UUID:线程ID
currentTime, // 系统时钟
wait // 排队的等待时间
);
2.2 释放锁
evalWriteAsync(
getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
// 如出一辙:清理超时的lockName的zset、list
// remove stale threads
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
// 锁未被占用
// 但list中存在当前客户端线程
// 依旧发布解锁事件,唤醒其他线程,并退出
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
// hash中不存在该lockName:当前客户端线程并没有占用该资源,即退出
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// hash递减重入次数
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果重入,更新资源的过期时间,并退出
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
// 如果不再占用任务资源了,发布解锁事件,唤醒其他线程
"redis.call('del', KEYS[1]); " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.asList(
getName(), // hash=lockName:重入次数
threadsQueueName, // list=lockName
timeoutSetName, // zset=lockName(排序依据timeout)
getChannelName() // 解锁消息的发布通道(监听这个key,以了解解锁的消息)
),
LockPubSub.UNLOCK_MESSAGE, // 解锁的消息
internalLockLeaseTime, // 锁的租赁时间
getLockName(threadId), // lockName,即持锁的客户端线程标识
System.currentTimeMillis());// 系统时钟
3. 联锁、红锁
3.1 获取锁
// 从 lock.lock() 一路步进
// org.redisson.RedissonMultiLock#lockInterruptibly(long, java.util.concurrent.TimeUnit)
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 这一坨just 通过租赁时间 计算 等待时间
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
waitTime = baseWaitTime;
} else {
leaseTime = unit.toMillis(leaseTime);
waitTime = leaseTime;
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
// 获取锁失败,则反复重试
while (true) {
// step into ...
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return;
}
}
}
// org.redisson.RedissonMultiLock#tryLock(long, long, java.util.concurrent.TimeUnit)
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }
// 租赁时间 做个转换而已
long newLeaseTime = -1;
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime)*2;
}
}
// 计算等待时间
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
// long lockWaitTime = remainTime
long lockWaitTime = calcLockWaitTime(remainTime);
// return 0
// 表示获取锁失败了的redis节点数,0表示必须获取到所有节点的锁才算成功
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 遍历每个锁,tryLock
// lockAcquired=true 表示当前锁获取成功
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
// 当前锁获取成功,即放入集合中
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
// 如果获取失败的节点个数 == 所容忍的个数,即失败,退出
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
// 如果之前有成功获取到的锁,这里不会释放,只是重置迭代器的索引
// 接踵而来的就是,从头重试获取
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
// list.clear()
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
3.2 释放锁
// org.redisson.RedissonMultiLock#unlock
// 简直不要太朴实
@Override
public void unlock() {
List<RFuture<Void>> futures = new ArrayList<>(locks.size());
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
}
3.3 红锁实现
redisson 将联锁视为一种特殊的红锁(最少许获取的锁的数量,为锁的总数)
package org.redisson;
public class RedissonRedLock extends RedissonMultiLock {
/**
* Creates instance with multiple {@link RLock} objects.
* Each RLock object could be created by own Redisson instance.
*
* @param locks - array of locks
*/
public RedissonRedLock(RLock... locks) {
super(locks);
}
@Override
protected int failedLocksLimit() {
// redis节点总数 - 成功获取的最小容忍的节点个数
return locks.size() - minLocksAmount(locks);
}
protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}
@Override
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / locks.size(), 1);
}
@Override
public void unlock() {
unlockInner(locks);
}
}
4. 读写锁
4.1 读锁
4.1.1 获取锁
// org.redisson.RedissonReadLock#tryLockInnerAsync
evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 获取 资源.模式
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
// 资源.模式 还未被占用
"if (mode == false) then " +
// 添加 资源.模式=read
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
// 添加 资源.客户端=1(重入次数)
"redis.call('hset', KEYS[1], ARGV[2], 1); "
// 添加 '{资源}:客户端:rwlock_timeout:重入次数'=1
"redis.call('set', KEYS[2] .. ':1', 1); " +
// 设置 '{资源}:客户端:rwlock_timeout:重入次数' 的租赁时间
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
// 设置 资源的租赁时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 资源.模式=read || (资源.模式=write && 资源.写端=1)
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
// 资源.客户端=重入次数 ++
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// key='{资源}:客户端:rwlock_timeout:ind'
"local key = KEYS[2] .. ':' .. ind;" +
// 添加 '{资源}:客户端:rwlock_timeout:ind'=1
"redis.call('set', key, 1); " +
// 设置 '{资源}:客户端:rwlock_timeout:ind' 的租赁时间
"redis.call('pexpire', key, ARGV[1]); " +
// 获取 资源 的剩余租赁时间
"local remainTime = redis.call('pttl', KEYS[1]); " +
// 更新 资源 的租赁时间
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;" +
// 返回 资源 的剩余租赁时间
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(
// 资源
// 资源.模式=是否被占用
// 资源.客户端=重入次数
// 租赁时间的更新逻辑:当前申请的锁的租赁时间
getName(),
// 写端name '{资源}:客户端:rwlock_timeout:重入次数'=1
// 该key为读、写之前协作使用的
// 租赁时间的更新:max(剩余的租赁时间,当前申请的锁的租赁时间)
// 因为读锁可以同时被多个客户端线程所持有,因此这里区分一下客户端线程
getReadWriteTimeoutNamePrefix(threadId)
),
internalLockLeaseTime,
getLockName(threadId),
// lockName:write
getWriteLockName(threadId));
4.1.2 释放锁
// org.redisson.RedissonReadLock#unlockInnerAsync
evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 获取 资源.模式
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
// 如果 资源.模式 不被占用的话,即发布解锁事件,退出
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
// 如果 资源.客户端 不存在,即退出
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
// 资源.客户端=重入次数 --
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
// 如果 重入次数=0,删除 资源.客户端
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
// 删除 '{资源}:客户端:rwlock_timeout:重入次数',这是用于保存 租赁时间的
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
// 如果 资源 的key(value即客户端) 尚有客户端
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
// 取出其剩余的租赁时间
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
// 若存在大于0的剩余租赁时间,说明还有客户端在等待占用
// 更新该客户端的租赁时间,然后返回
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
// 如果说 资源.模式=write(加锁事件由写触发) 那没事了
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
// 如果 资源.模式=read 释放资源,发布解锁事件
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(
getName(),
getChannelName(),
timeoutPrefix,
keyPrefix
),
LockPubSub.UNLOCK_MESSAGE,
getLockName(threadId)
);
4.2 写锁
4.2.1 获取锁
// org.redisson.RedissonWriteLock#tryLockInnerAsync
evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 资源.模式 若不存在,则:
// 设置 资源.模式=写入模式
// 设置 资源.客户端线程=1
// 设置 资源 对应的租赁时间
// 直接退出
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果 资源.模式=写入,则:
// 如果 资源.'资源.客户端线程'=重入次数,也存在,则:
// 资源.'资源.客户端线程' 对应的 重入次数 ++
// 资源 对应的租赁时间 更新为 剩余租赁时间 + 当前申请的时间
// 退出
"if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local currentExpire = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
// 默认分治,则直接输入其 剩余租赁时间
"return redis.call('pttl', KEYS[1]);",
// 资源
Arrays.<Object>asList(getName()),
// 写入锁的租赁时间
internalLockLeaseTime,
// '资源:客户端线程'
// 注意: 写入客户端线程 的命名规则,资源:客户端线程id:write
getLockName(threadId)
);
4.2.2 释放锁
// org.redisson.RedissonWriteLock#unlockInnerAsync
evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 资源.模式 不存在,则:
// 直接发布解锁事件
// 随即,退出即可
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果 资源.模式=写入,则:
"if (mode == 'write') then " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
// 如果 资源.客户端 却不存在,则:
// 直接退出
"if (lockExists == 0) then " +
"return nil;" +
// 如果 资源.客户端 同时存在,则:
// 如果 资源.'资源.客户端线程' 对应的 重入次数 > 0
// 设置 资源 的租赁时间 为当前申请的时间
// 直接返回0
// 如果 资源.'资源.客户端线程' 对应的 重入次数 <= 0
// 删除 资源.'资源.客户端线程'
// 如果 删除后的 资源 下的key 还有1个(即 资源.模式)
// 删除 这仅存的 资源.模式
// 发布解锁事件
// 如果 删除后的 资源 下还存在 无锁的读端
// 将当前的 资源.模式 改成 读
// 整个函数,返回1
"else " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('hdel', KEYS[1], ARGV[3]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"else " +
// has unlocked read-locks
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"end; " +
"return 1; "+
"end; " +
"end; " +
"end; "
// 保底给个nil
+ "return nil;",
Arrays.<Object>asList(
getName(),
getChannelName()
),
LockPubSub.READ_UNLOCK_MESSAGE,
internalLockLeaseTime,
getLockName(threadId)
);