分布式锁
分布式锁是一种在分布式计算环境中用于控制多个节点(或多个进程)对共享资源的访问的机制。在分布式系统中,多个节点可能需要协调对共享资源的访问,以防止数据的不一致性或冲突。分布式锁允许多个节点在竞争访问共享资源时进行同步,以确保只有一个节点能够获得锁,从而避免冲突和数据损坏。
设计一个分布式锁需要保证以下四大特性:
- 互斥性:在任意时刻,只能有一个进程持有锁。
- 进程一致:加锁和解锁的操作必须由同一个进程执行。
- 防死锁:即使有一个进程在持有锁期间崩溃而未能主动释放锁,必须有其他方式去释放锁,以保证其他进程能够获取到锁。
- 锁续期:持锁线程执行的操作超出预期时间,只要持锁线程仍然在执行,锁就不应该被释放。
MySQL实现
结构设计
- 设计表结构:设计一个锁的唯一标识
lock_name
作为表的主键,thread_id
字段存储持有锁的线程ID、设置counter
字段用于记录重入次数、expires_at
设置锁的过期时间,以防止死锁。 - 设计索引:还可以在
CREATE
语句中建立联合索引,减少回表次数,优化查询速度,但在高并发场景下执行增删改操作效率会下降。
CREATE TABLE distributed_locks (
lock_name VARCHAR(255) PRIMARY KEY, -- 锁的唯一标识
thread_id VARCHAR(255), -- 当前持有锁的线程ID
counter INT DEFAULT 0, -- 计数器,记录重入次数
expires_at TIMESTAMP NULL -- 锁的过期时间
# INDEX idx_lock_thread_expires (lock_name, thread_id, expires_at)
);
加锁过程
-
首次获取锁:通过
SELECT
语句,以lock_name
和expires_at
为查询条件,查询存在且未过期的锁。如果锁不存在,则使用INSERT
语句插入锁标识、线程ID、计数器初始值一和过期时间。如果锁存在,执行下一步骤。(设置过期时间实现**「防死锁」;由于INSERT
语句默认使用行级锁,同一时刻只能有一个线程插入成功,因此保证了「互斥性」**) -
重复获取锁:判断查询结果中的
thread_id
字段是否与当前线程ID相同。如果相同,说明当前线程需要重复获取锁,执行UPDATE
语句将counter
字段加一,并重置过期时间。如果不相同,执行下一步骤。(设置计数器实现可重入锁) -
获取锁失败:直接从查询结果返回锁的过期时间,帮助申请锁的线程得知等待锁释放的时间。
-- 开始事务
START TRANSACTION;
-- 查询锁是否存在且未过期
SELECT * FROM distributed_locks
WHERE lock_name = ? AND expires_at > NOW();
IF 结果为空 THEN
-- 锁不存在,插入新锁记录
INSERT INTO distributed_locks (lock_name, thread_id, counter, expires_at)
VALUES (?, ?, 1, DATE_ADD(NOW(), INTERVAL ? SECOND));
ELSEIF thread_id 等于当前线程ID THEN
-- 锁已被当前线程持有,重入锁
UPDATE distributed_locks
SET counter = counter + 1, expires_at = DATE_ADD(NOW(), INTERVAL ? SECOND)
WHERE lock_name = ?;
ELSE
-- 锁已被其他线程持有,加锁失败
返回锁的剩余有效期
END IF;
-- 提交事务
COMMIT;
解锁过程
- 检查锁持有者:通过
SELECT
语句,以lock_name
和thread_id
为查询条件,查询锁是否由当前线程持有。如果结果为空,则返回NULL
表示解锁失败。如果结果不为空,执行下一步骤。(通过条件判断保证**「进程一致」**,即加解锁为同一线程) - 减少锁计数器:执行
UPDATE
语句给持有锁的线程的计数器减一,并判断计数器是否大于零。如果大于零,说明锁还没有完全释放,执行UPDATE
语句重置锁的过期时间,返回 0 表示锁未完全释放;如果等于零,说明当前线程已完全释放锁,则执行DELETE
语句删除整个锁,返回 1 表示锁完全释放。
-- 开始事务
START TRANSACTION;
-- 检查锁是否由当前线程持有
SELECT * FROM distributed_locks
WHERE lock_name = ? AND thread_id = ?;
IF 结果为空 THEN
-- 锁不属于当前线程,解锁失败
返回 NULL;
END IF;
-- 减少锁计数器
UPDATE distributed_locks
SET counter = counter - 1
WHERE lock_name = ? AND thread_id = ?;
-- 检查计数器是否大于0
IF counter > 0 THEN
-- 锁仍然被当前线程持有(重置过期时间)
UPDATE distributed_locks
SET expires_at = DATE_ADD(NOW(), INTERVAL ? SECOND)
WHERE lock_name = ?;
返回 0;
ELSE
-- 计数器为0,完全释放锁
DELETE FROM distributed_locks WHERE lock_name = ?;
返回 1;
END IF;
-- 提交事务
COMMIT;
Redis实现
结构设计
- 选用数据结构:采用
String
结构。设置锁的唯一标识作为 KEY,并指定一个唯一的线程标识作为值 VALUE。
加锁过程
- 设置锁:使用 SET 命令 NX(只在键不存在时设置)和 PX(设置过期时间)选项来实现一个原子操作,确保了即使持锁进程崩溃,其他进程仍然能够获取到锁,从而满足**「互斥性」和「防死锁」** 。
-- 1.尝试获取锁,值为唯一的线程标识
redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2])
解锁过程
- 释放锁:通过 DEL 命令清除锁的键来释放锁。在执行 DEL 操作之前,先使用 GET 命令检查锁的值是否与持锁者的唯一标识匹配,从而满足**「进程一致」** 。
-- 2.比较线程标识与锁中的标识是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 执行del释放锁
return redis.call('del', KEYS[1])
end
return 0
无论是MySQL还是Redis实现的分布式,虽然都考虑到了互斥性、防死锁和进程一致问题,但是却无法解决锁续期问题。所以,Redis 官方推荐采用 Redisson 实现 Redis 的分布式锁,借助 Redisson 的
WatchDog
机制能够很好的解决锁续期的问题。
Redisson实现
结构设计
- 选用数据结构:采用
Hash
结构,设置锁的唯一标识为键,值采用field-value
格式,以线程ID为field
,计数器为value
实现可重入锁。
加锁过程
- 执行Lua脚本:整个 Lua 脚本是以事务方式在 Redis 中运行的,由于 Redis 是单线程模型,因此脚本内的所有命令是按顺序一次性执行的,不会在中途被打断或交叉执行,从而保证**「互斥性」**。
- 首次获取锁:通过
exists
命令判断锁是否不存在。如果不存在,则执行hincrby
命令设置Hash
结构的field
为线程ID,value
为计数器的初始值一,同时执行pexpire
命令设置锁的过期时间;如果存在,执行下一步操作。 - 重复获取锁:通过
hexists
命令判断锁中的field
是否与当前线程相同。如果相同,则执行hincrby
命令给field
对应的计数器加一,同时执行pexpire
命令重置锁的过期时间,防止锁在持有者持有期间过期;如果不相同,说明当前锁被其他线程持有。 - 返回结果:如果返回
nil
表明获取锁成功;如果返回的数据不为null
而是Long
,表明申请锁的线程需要等待的时间。
完整代码如下:
-- 判断锁是否存在
if (redis.call('exists', KEYS[1]) == 0) then
-- 如果锁不存在,设置当前持有者,并将计数器设置为 1
redis.call('hincrby', KEYS[1], ARGV[2], 1)
-- 设置锁的过期时间,单位为毫秒
redis.call('pexpire', KEYS[1], ARGV[1])
-- 返回 nil 表示锁成功创建
return nil
end
-- 判断锁是否已被当前持有者持有
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 如果锁已被当前持有者持有,将持有者的计数加 1
redis.call('hincrby', KEYS[1], ARGV[2], 1)
-- 重置锁的过期时间,防止锁在持有者持有期间过期
redis.call('pexpire', KEYS[1], ARGV[1])
-- 返回 nil 表示锁成功重入
return nil
end
-- 如果锁已存在,但被其他持有者持有
-- 返回锁的剩余有效期,单位为毫秒
return redis.call('pttl', KEYS[1])
解锁过程
- 检查锁持有者:通过
hexists
命令查询锁中的field
是否与当前线程相同。如果不相同,表明锁的持有者不是当前线程,返回nil
,如果相同,执行下一步操作。(通过条件判断保证**「进程一致」**,即加解锁为同一线程) - 减少锁计数器:执行
hincrby
命令给持有锁的线程的计数器减一,并判断计数器是否大于零。如果大于零,说明锁还没有完全释放,执行pexpire
命令重置锁的过期时间,返回 0 表示锁未完全释放;如果等于零,说明当前线程已完全释放锁,则执行del
删除整个锁,同时执行publish
命令通知所有等待锁的其他线程,返回 1 表示锁完全释放。(这里执行消息发布是服务于锁等待机制,防止无意义的申请锁而浪费资源)
完整代码如下:
-- 检查锁是否由当前线程持有
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 减少当前线程持有的锁计数器
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 判断计数器值是否大于 0
if (counter > 0) then
-- 如果计数器大于 0,说明锁仍然被当前线程持有(多次重入)
-- 重置锁的过期时间,防止锁在当前线程还未完全释放时过期
redis.call('pexpire', KEYS[1], ARGV[2]);
-- 返回 0 表示锁还未完全释放(计数器还未清零)
return 0;
else
-- 如果计数器等于 0,说明当前线程已完全释放锁
-- 删除整个锁键
redis.call('del', KEYS[1]);
-- 通过发布频道通知锁已释放(适用于等待锁的其他线程)
redis.call('publish', KEYS[2], ARGV[1]);
-- 返回 1 表示锁成功释放
return 1;
end;
-- 若发生意外情况,返回 nil 表示操作失败
return nil;
看门狗机制
当线程尝试执行 tryLock()
方法获取锁时,在内部调用了 tryAcquireAsync()
方法获取锁的等待时间,返回值为 Long
型 。如果返回结果为 null
,表明加锁成功;返回结果不为 null
,返回值就是需要等待锁的释放时间。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// ...
// 获取加锁的返回值,如果为null则加锁成功,不为null表明加锁失败,还需等待ttl的时间
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 获取锁成功,返回true
if (ttl == null) {
return true;
}
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 调用tryAcquireAsync获取锁的等待时间的Long值
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
在 tryAcquireAsync()
方法中,首先判断锁是否设置了释放时间。
- 如果设置了锁的释放时间,直接进行上述 lua脚本 的加锁操作,并返回结果;
- 如果没有设置锁的释放时间,将锁的过期时间设置为默认值30s并进行 lua脚本 的加锁操作,同时启用看门狗机制,不断的进行自动续约,实现**「锁续期」**;
- 可以看到,两种操作都最终使锁被设置了过期时间,防止持有锁的客户端异常退出后锁无法释放的问题(即**「防死锁」**)。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 如果设置锁的过期时间,直接进行加锁操作返回结果
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 如果没有设置锁的过期时间,同样调用tryLockInnerAsync方法进行加锁,但是将过期时间默认设置为30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 出现异常,返回
if (e != null) {
return;
}
// 锁获取成功,进行自动续约
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
自动续约的操作由 scheduleExpirationRenewal
方法实现。该方法内部首先会从成员变量的 ConcurrentHashMap
集合中根据当前锁的名称获取值,如果获取不到,说明当前线程任务执行完毕,无需再进行锁的自动续期;如果可以获取到值,则启动一个定时任务,通过递归调用实现每 10s 触发一次任务,在任务内部执行了如下的 lua脚本,从而重置锁的过期时间。
-- 检查锁的持有者是否与当前线程相同
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 如果相同,重置过期时间
redis.call('pexpire', KEYS[1], ARGV[1])
-- 返回 1 表示操作成功
return 1
end
-- 如果 field 不存在,返回 0 表示操作失败
return 0
取消自动续约:当持有锁的线程的任务执行完毕后,会执行
remove()
方法删除ConcurrentHashMap
集合中的键值,而看门狗在获取ConcurrentHashMap
集合中的键值失败后,就会返回结果,结束自动续约。
锁等待机制
- 尝试获取锁:
- 首先调用
tryAcquire()
方法获取锁剩余的存活时间 ttl,如果结果为 null,返回 true 表明加锁成功。 - 接着计算当前时间与获取锁之前的时间的差值,如果申请锁的耗时大于等待时间,表明申请锁失败,返回 false。
- 首先调用
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1.尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 1.1.锁获取成功
if (ttl == null) {
return true;
}
// 1.1.申请锁的耗时如果大于等于最大等待时间,则申请锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// x...
}
- 订阅锁释放通知:通过
subscribe
方法,基于当前线程的threadId
发起一个异步订阅请求,等待锁释放的通知。这一步骤的主要作用是通过订阅锁的释放事件来实现对锁的高效管理,防止无效的锁申请对系统资源造成浪费。- 等待锁释放超时:通过
await()
方法(内部使用CountDownLatch
实现阻塞)在指定时间内等待失败,说明当前线程的等待时间超时,无需再获取锁,需要执行取消订阅和失败处理的逻辑。 - 取消订阅:通过
cancel()
方法取消订阅。如果取消失败,说明订阅任务正在执行,此时无法直接取消任务。需要执行回调函数等待任务执行完毕;如果取消成功,则执行acquireFailed()
方法并返回 false。 - 回调函数取消订阅:通过
onComplete
回调,可以在任务完成后自动触发unsubscribe
操作,以确保订阅状态被正确清理。
- 等待锁释放超时:通过
// 2.订阅锁释放通知,通过await方法阻塞等待锁释放,防止无效的锁申请浪费资源
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 2.1.如果await在规定的时间内未完成,表示订阅超时,进入if代码块,执行取消订阅和失败处理的逻辑
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
// 2.2.计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 3.x
} finally {
// 4.x
}
- 轮询获取锁:
- 再次获取锁:返回锁的剩余存活时间
ttl
;如果ttl
为空说明获取锁成功,直接返回 true,否则继续下一步。 - 阻塞获取锁:取锁剩余的存活时间和线程剩余的等待时间的最小值,利用信号量
Semaphore
阻塞获取锁。
- 再次获取锁:返回锁的剩余存活时间
// 3.while(true)死循环,不断尝试获取锁
while (true) {
long currentTime = System.currentTimeMillis();
// 3.1.再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return true;
}
// 更新剩余的等待时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
currentTime = System.currentTimeMillis();
// 3.2.取锁剩余的存活时间和线程剩余的等待时间的最小值,尝试获取锁
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新剩余的等待时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
- 取消订阅:无论最终是否成功获取锁,在
finally
中都会调用unsubscribe()
方法取消订阅,以确保资源释放和避免不必要的等待事件。
finally {
// 4.无论是否获取到了锁,都要取消订阅解锁消息
unsubscribe(subscribeFuture, threadId);
}