文章目录
- 锁(Lock)
- 公平锁(Fair Lock)
- 联锁(MultiLock)
- 红锁(RedLock) 【已废弃】
- 读写锁(ReadWriteLock)
- 信号量(Semaphore)
- 可过期许可信号量(PermitExpirableSemaphore)
- 倒计时门闩(CountDownLatch)
- 自旋锁(Spin Lock)
- 栅栏锁(Fenced Lock)
- 官方文档
锁(Lock)
基于Redis或Valkey的分布式可重入锁对象(Java实现),实现了Lock
接口。通过发布/订阅(pub/sub)通道通知所有Redisson实例中等待获取锁的其他线程。
如果获取锁的Redisson实例崩溃,该锁可能会永远处于已获取状态。为避免此问题,Redisson维护了一个锁看门狗,当持有锁的Redisson实例存活时,它会延长锁的过期时间。默认锁看门狗超时时间为30秒,可通过Config.lockWatchdogTimeout
配置修改。
在获取锁时可通过leaseTime
参数指定锁的持有时间。超过指定时间后,锁将自动释放。
RLock
对象的行为遵循Java锁规范,即只有锁的持有线程可以解锁,否则会抛出IllegalMonitorStateException
。其他情况可考虑使用RSemaphore
对象。
代码示例:
RLock lock = redisson.getLock("myLock");
// 传统加锁方式
lock.lock();
// 加锁并10秒后自动解锁
lock.lock(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
异步接口使用示例:
RLock lock = redisson.getLock("myLock");
long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = lock.lockAsync(threadId);
// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS, threadId);
lockFuture.whenComplete((res, exception) -> {
// ...
lock.unlockAsync(threadId);
});
响应式接口使用示例:
RedissonReactiveClient redisson = redissonClient.reactive();
RLockReactive lock = redisson.getLock("myLock");
long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = lock.lock(threadId);
// 加锁并10秒后自动解锁
Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockMono.doOnSuccess(res -> {
// ...
})
.doFinally(r -> lock.unlock(threadId).subscribe())
.subscribe();
RxJava3接口使用示例:
RedissonRxClient redisson = redissonClient.rxJava();
RLockRx lock = redisson.getLock("myLock");
long threadId = Thread.currentThread().getId();
Completable lockRes = lock.lock(threadId);
// 加锁并10秒后自动解锁
Completable lockRes = lock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockRes.doOnSuccess(res -> {
// ...
})
.doFinally(() -> lock.unlock(threadId).subscribe())
.subscribe();
公平锁(Fair Lock)
基于Redis或Valkey的分布式可重入公平锁(Java实现),实现了Lock
接口。公平锁保证线程按请求顺序获取锁。所有等待线程会被队列化,若某线程异常终止,Redisson会等待其恢复5秒(例如5个线程异常终止,总延迟为25秒)。
锁看门狗机制与普通锁相同,支持leaseTime
参数。解锁需由持有线程执行,否则抛出IllegalMonitorStateException
。
代码示例:
RLock lock = redisson.getFairLock("myLock");
// 传统加锁方式
lock.lock();
// 加锁并10秒后自动解锁
lock.lock(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
异步接口使用示例:
RLock lock = redisson.getFairLock("myLock");
RFuture<Void> lockFuture = lock.lockAsync();
// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
lockFuture.whenComplete((res, exception) -> {
// ...
lock.unlockAsync();
});
响应式接口使用示例:
RedissonReactiveClient redisson = redissonClient.reactive();
RLockReactive lock = redisson.getFairLock("myLock");
long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = lock.lock(threadId);
// 加锁并10秒后自动解锁
Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockMono.doOnSuccess(res -> {
// ...
})
.doFinally(r -> lock.unlock(threadId).subscribe())
.subscribe();
RxJava3接口使用示例:
RedissonRxClient redisson = redissonClient.rxJava();
RLockRx lock = redisson.getFairLock("myLock");
long threadId = Thread.currentThread().getId();
Completable lockRes = lock.lock(threadId);
// 加锁并10秒后自动解锁
Completable lockRes = lock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockRes.doOnSuccess(res -> {
// ...
})
.doFinally(() -> lock.unlock(threadId).subscribe())
.subscribe();
联锁(MultiLock)
基于Redis或Valkey的分布式联锁对象,允许多个RLock
对象(可属于不同Redisson实例)组合为单个锁。若持有联锁的Redisson实例崩溃,联锁可能永久处于已获取状态。锁看门狗机制与普通锁相同,支持leaseTime
参数。解锁需由持有线程执行。
代码示例:
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);
// 传统加锁方式
multiLock.lock();
// 加锁并10秒后自动解锁
multiLock.lock(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
multiLock.unlock();
}
}
异步接口使用示例:
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);
long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = multiLock.lockAsync(threadId);
// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = multiLock.lockAsync(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = multiLock.tryLockAsync(100, 10, TimeUnit.SECONDS, threadId);
lockFuture.whenComplete((res, exception) -> {
// ...
multiLock.unlockAsync(threadId);
});
响应式接口使用示例:
RedissonReactiveClient anyRedisson = redissonClient.reactive();
RLockReactive lock1 = redisson1.getLock("lock1");
RLockReactive lock2 = redisson2.getLock("lock2");
RLockReactive lock3 = redisson3.getLock("lock3");
RLockReactive multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);
long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = multiLock.lock(threadId);
// 加锁并10秒后自动解锁
Mono<Void> lockMono = multiLock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = multiLock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockMono.doOnSuccess(res -> {
// ...
})
.doFinally(r -> multiLock.unlock(threadId).subscribe())
.subscribe();
RxJava3接口使用示例:
RedissonRxClient anyRedisson = redissonClient.rxJava();
RLockRx lock1 = redisson1.getLock("lock1");
RLockRx lock2 = redisson2.getLock("lock2");
RLockRx lock3 = redisson3.getLock("lock3");
RLockRx multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);
long threadId = Thread.currentThread().getId();
Completable lockRes = multiLock.lock(threadId);
// 加锁并10秒后自动解锁
Completable lockRes = multiLock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = multiLock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockRes.doOnSuccess(res -> {
// ...
})
.doFinally(() -> multiLock.unlock(threadId).subscribe())
.subscribe();
红锁(RedLock) 【已废弃】
此对象已弃用,请改用RLock
或RFencedLock
。
RedLock 的背景
- 问题场景:传统单节点 Redis 锁(如
SETNX
)在主从切换时可能失效(例如主节点崩溃,锁未同步到从节点,导致其他客户端重复获取锁)。 - 解决方案:Redis 作者 Antirez 提出 RedLock 算法,要求客户端在多个独立的 Redis 节点上获取锁,通过多数派机制提高可靠性。
RedLock 实现原理
Redisson 的 RedissonRedLock
类实现了 RedLock 算法,核心步骤如下:
-
获取锁:
- 向 N 个独立 Redis 节点 依次发起加锁请求(使用相同键和随机值)。
- 加锁成功条件:
- 成功获取锁的节点数 ≥
N/2 + 1
(多数派)。 - 总耗时 < 锁的失效时间(避免锁过期后仍被误用)。
- 成功获取锁的节点数 ≥
-
释放锁:
- 向所有节点发起解锁请求(无论是否加锁成功),避免残留锁状态。
Code
// 1. 创建多个独立 Redis 节点的客户端
RedissonClient client1 = Redisson.create(new Config().useSingleServer().setAddress("redis://node1:6379"));
RedissonClient client2 = Redisson.create(new Config().useSingleServer().setAddress("redis://node2:6379"));
RedissonClient client3 = Redisson.create(new Config().useSingleServer().setAddress("redis://node3:6379"));
// 2. 从每个客户端获取 RLock 对象
RLock lock1 = client1.getLock("myLock");
RLock lock2 = client2.getLock("myLock");
RLock lock3 = client3.getLock("myLock");
// 3. 创建 RedLock
RLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
// 4. 尝试加锁(最多等待 10 秒,锁有效期 30 秒)
boolean success = redLock.tryLock(10, 30, TimeUnit.SECONDS);
if (success) {
// 执行业务逻辑
}
} finally {
redLock.unlock(); // 5. 释放锁
}
使用注意事项
- 节点独立性:所有 Redis 节点必须为独立主节点(无主从关系),避免数据同步问题。
- 锁有效期(Lease Time):需合理设置,确保业务逻辑能在锁失效前完成。
- 性能开销:跨节点通信增加延迟,适合对可靠性要求高的场景。
- 网络分区风险:极端情况下仍可能锁失效,需结合业务幂等性设计。
缺陷
-
无法应对NPC异常场景
RedLock 的核心算法无法正确处理网络延迟(Network Delay)、进程暂停(Process Pause)和时钟漂移(Clock Drift)这三种异常情况。例如:当持有锁的客户端因进程暂停(如JVM的STW)导致锁超时释放时,其他客户端可能同时获取同一资源的锁,破坏互斥性。若Redis节点时钟不同步,锁可能提前失效,导致多个客户端同时持有锁。 -
学术界与社区的批评
Redis作者Antirez提出的RedLock算法曾遭到分布式系统专家Martin Kleppmann的质疑。Martin指出,RedLock依赖的“多数派加锁成功”机制无法保证严格的互斥性,尤其在节点故障恢复或主从切换时,可能产生锁状态不一致的问题。
RedLock的替代方案
- 联锁(MultiLock):允许一次性锁定多个资源,但需显式管理所有锁实例。
- 单节点锁+WatchDog:通过定期续期避免锁超时,简化实现并兼顾性能
读写锁(ReadWriteLock)
基于Redis或Valkey的分布式可重入读写锁(Java实现),实现了ReadWriteLock
接口。读锁和写锁均实现RLock
接口,允许多个读锁或单个写锁。
锁看门狗机制与普通锁相同,支持leaseTime
参数。解锁需由持有线程执行,否则抛出IllegalMonitorStateException
。
并发规则矩阵:
请求类型 | 已持读锁 | 已持写锁 |
---|---|---|
请求读锁 | ✅允许 | ❌拒绝 |
请求写锁 | ❌拒绝 | ❌拒绝 |
代码示例:
RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");
RLock readLock = rwlock.readLock();
// 或
RLock writeLock = rwlock.writeLock();
// 传统加锁方式
readLock.lock();
// 加锁并10秒后自动解锁
writeLock.lock(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = writeLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
writeLock.unlock();
}
}
异步接口使用示例:
RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");
long threadId = Thread.currentThread().getId();
RLock readLock = rwlock.readLock();
// 或
RLock writeLock = rwlock.writeLock();
RFuture<Void> lockFuture = readLock.lockAsync(threadId);
// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = writeLock.lockAsync(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = writeLock.tryLockAsync(100, 10, TimeUnit.SECONDS, threadId);
lockFuture.whenComplete((res, exception) -> {
// ...
writeLock.unlockAsync(threadId);
});
响应式接口使用示例:
RedissonReactiveClient redisson = redissonClient.reactive();
RReadWriteLockReactive rwlock = redisson.getReadWriteLock("myLock");
RLockReactive readLock = rwlock.readLock();
// 或
RLockReactive writeLock = rwlock.writeLock();
long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = writeLock.lock(threadId);
// 加锁并10秒后自动解锁
Mono<Void> lockMono = writeLock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = writeLock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockMono.doOnSuccess(res -> {
// ...
})
.doFinally(r -> writeLock.unlock(threadId).subscribe())
.subscribe();
RxJava3接口使用示例:
RedissonRxClient redisson = redissonClient.rxJava();
RReadWriteLockRx rwlock = redisson.getReadWriteLock("myLock");
RLockRx readLock = rwlock.readLock();
// 或
RLockRx writeLock = rwlock.writeLock();
long threadId = Thread.currentThread().getId();
Completable lockRes = writeLock.lock(threadId);
// 加锁并10秒后自动解锁
Completable lockRes = writeLock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = writeLock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockRes.doOnSuccess(res -> {
// ...
})
.doFinally(() -> writeLock.unlock(threadId).subscribe())
.subscribe();
信号量(Semaphore)
基于Redis或Valkey的分布式信号量(Java实现),与Java Semaphore
类似。需通过trySetPermits(permits)
初始化许可数量。
代码示例:
RSemaphore semaphore = redisson.getSemaphore("mySemaphore");
// 获取一个许可
semaphore.acquire();
// 获取10个许可
semaphore.acquire(10);
// 尝试获取许可
boolean res = semaphore.tryAcquire();
// 尝试获取许可(最多等待15秒)
boolean res = semaphore.tryAcquire(15, TimeUnit.SECONDS);
// 尝试获取10个许可
boolean res = semaphore.tryAcquire(10);
// 尝试获取10个许可(最多等待15秒)
boolean res = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
semaphore.release();
}
}
异步接口使用示例:
RSemaphore semaphore = redisson.getSemaphore("mySemaphore");
// 获取一个许可
RFuture<Void> acquireFuture = semaphore.acquireAsync();
// 获取10个许可
RFuture<Void> acquireFuture = semaphore.acquireAsync(10);
// 尝试获取许可
RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync();
// 尝试获取许可(最多等待15秒)
RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(15, TimeUnit.SECONDS);
// 尝试获取10个许可
RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(10);
// 尝试获取10个许可(最多等待15秒)
RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(10, 15, TimeUnit.SECONDS);
acquireFuture.whenComplete((res, exception) -> {
// ...
semaphore.releaseAsync();
});
响应式接口使用示例:
RedissonReactiveClient redisson = redissonClient.reactive();
RSemaphoreReactive semaphore = redisson.getSemaphore("mySemaphore");
// 获取一个许可
Mono<Void> acquireMono = semaphore.acquire();
// 获取10个许可
Mono<Void> acquireMono = semaphore.acquire(10);
// 尝试获取许可
Mono<Boolean> acquireMono = semaphore.tryAcquire();
// 尝试获取许可(最多等待15秒)
Mono<Boolean> acquireMono = semaphore.tryAcquire(15, TimeUnit.SECONDS);
// 尝试获取10个许可
Mono<Boolean> acquireMono = semaphore.tryAcquire(10);
// 尝试获取10个许可(最多等待15秒)
Mono<Boolean> acquireMono = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
acquireMono.doOnSuccess(res -> {
// ...
})
.doFinally(r -> semaphore.release().subscribe())
.subscribe();
RxJava3接口使用示例:
RedissonRxClient redisson = redissonClient.rxJava();
RSemaphoreRx semaphore = redisson.getSemaphore("mySemaphore");
// 获取一个许可
Completable acquireRx = semaphore.acquire();
// 获取10个许可
Completable acquireRx = semaphore.acquire(10);
// 尝试获取许可
Single<Boolean> acquireRx = semaphore.tryAcquire();
// 尝试获取许可(最多等待15秒)
Single<Boolean> acquireRx = semaphore.tryAcquire(15, TimeUnit.SECONDS);
// 尝试获取10个许可
Single<Boolean> acquireRx = semaphore.tryAcquire(10);
// 尝试获取10个许可(最多等待15秒)
Single<Boolean> acquireRx = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
acquireRx.doOnSuccess(res -> {
// ...
})
.doFinally(() -> semaphore.release().subscribe())
.subscribe();
可过期许可信号量(PermitExpirableSemaphore)
基于Redis或Valkey的分布式信号量,支持为每个许可设置租约时间。每个许可通过唯一ID标识,需使用该ID释放。需通过trySetPermits(permits)
初始化许可数量,并通过addPermits(permits)
动态调整。
代码示例:
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
semaphore.trySetPermits(23);
// 获取许可
String id = semaphore.acquire();
// 获取许可(租约10秒)
String id = semaphore.acquire(10, TimeUnit.SECONDS);
// 尝试获取许可
String id = semaphore.tryAcquire();
// 尝试获取许可(最多等待15秒)
String id = semaphore.tryAcquire(15, TimeUnit.SECONDS);
// 尝试获取许可(租约15秒,最多等待10秒)
String id = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
if (id != null) {
try {
...
} finally {
semaphore.release(id);
}
}
异步接口使用示例:
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
RFuture<Boolean> setFuture = semaphore.trySetPermitsAsync(23);
// 获取许可
RFuture<String> acquireFuture = semaphore.acquireAsync();
// 获取许可(租约10秒)
RFuture<String> acquireFuture = semaphore.acquireAsync(10, TimeUnit.SECONDS);
// 尝试获取许可
RFuture<String> acquireFuture = semaphore.tryAcquireAsync();
// 尝试获取许可(最多等待15秒)
RFuture<String> acquireFuture = semaphore.tryAcquireAsync(15, TimeUnit.SECONDS);
// 尝试获取许可(租约15秒,最多等待10秒)
RFuture<String> acquireFuture = semaphore.tryAcquireAsync(10, 15, TimeUnit.SECONDS);
acquireFuture.whenComplete((id, exception) -> {
// ...
semaphore.releaseAsync(id);
});
响应式接口使用示例:
RedissonReactiveClient redisson = redissonClient.reactive();
RPermitExpirableSemaphoreReactive semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
Mono<Boolean> setMono = semaphore.trySetPermits(23);
// 获取许可
Mono<String> acquireMono = semaphore.acquire();
// 获取许可(租约10秒)
Mono<String> acquireMono = semaphore.acquire(10, TimeUnit.SECONDS);
// 尝试获取许可
Mono<String> acquireMono = semaphore.tryAcquire();
// 尝试获取许可(最多等待15秒)
Mono<String> acquireMono = semaphore.tryAcquire(15, TimeUnit.SECONDS);
// 尝试获取许可(租约15秒,最多等待10秒)
Mono<String> acquireMono = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
acquireMono.flatMap(id -> {
// ...
return semaphore.release(id);
}).subscribe();
RxJava3接口使用示例:
RedissonRxClient redisson = redissonClient.rxJava();
RPermitExpirableSemaphoreRx semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
Single<Boolean> setRx = semaphore.trySetPermits(23);
// 获取许可
Single<String> acquireRx = semaphore.acquire();
// 获取许可(租约10秒)
Single<String> acquireRx = semaphore.acquire(10, TimeUnit.SECONDS);
// 尝试获取许可
Maybe<String> acquireRx = semaphore.tryAcquire();
// 尝试获取许可(最多等待15秒)
Maybe<String> acquireRx = semaphore.tryAcquire(15, TimeUnit.SECONDS);
// 尝试获取许可(租约15秒,最多等待10秒)
Maybe<String> acquireRx = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
acquireRx.flatMap(id -> {
// ...
return semaphore.release(id);
}).subscribe();
倒计时门闩(CountDownLatch)
基于Redis或Valkey的分布式倒计时门闩(Java实现),需通过trySetCount(count)
初始化计数器。
代码示例:
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");
// 初始化计数器为1
latch.trySetCount(1);
// 等待计数器归零
latch.await();
// 其他线程或JVM中减少计数
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");
latch.countDown();
异步接口使用示例:
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");
RFuture<Boolean> setFuture = latch.trySetCountAsync(1);
// 等待计数器归零
RFuture<Void> awaitFuture = latch.awaitAsync();
// 其他线程或JVM中减少计数
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");
RFuture<Void> countFuture = latch.countDownAsync();
响应式接口使用示例:
RedissonReactiveClient redisson = redissonClient.reactive();
RCountDownLatchReactive latch = redisson.getCountDownLatch("myCountDownLatch");
Mono<Boolean> setMono = latch.trySetCount(1);
// 等待计数器归零
Mono<Void> awaitMono = latch.await();
// 其他线程或JVM中减少计数
RCountDownLatchReactive latch = redisson.getCountDownLatch("myCountDownLatch");
Mono<Void> countMono = latch.countDown();
RxJava3接口使用示例:
RedissonRxClient redisson = redissonClient.rxJava();
RCountDownLatchRx latch = redisson.getCountDownLatch("myCountDownLatch");
Single<Boolean> setRx = latch.trySetCount(1);
// 等待计数器归零
Completable awaitRx = latch.await();
// 其他线程或JVM中减少计数
RCountDownLatchRx latch = redisson.getCountDownLatch("myCountDownLatch");
Completable countRx = latch.countDown();
自旋锁(Spin Lock)
基于Redis或Valkey的分布式可重入自旋锁(Java实现),采用指数退避策略避免因短时间内大量锁操作导致的网络吞吐量限制和Redis/Valkey CPU过载。锁看门狗机制与普通锁相同,支持leaseTime
参数。解锁需由持有线程执行。
代码示例:
RLock lock = redisson.getSpinLock("myLock");
// 传统加锁方式
lock.lock();
// 加锁并10秒后自动解锁
lock.lock(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
异步接口使用示例:
RLock lock = redisson.getSpinLock("myLock");
long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = lock.lockAsync(threadId);
// 加锁并10秒后自动解锁
RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS, threadId);
lockFuture.whenComplete((res, exception) -> {
// ...
lock.unlockAsync(threadId);
});
响应式接口使用示例:
RedissonReactiveClient redisson = redissonClient.reactive();
RLockReactive lock = redisson.getSpinLock("myLock");
long threadId = Thread.currentThread().getId();
Mono<Void> lockMono = lock.lock(threadId);
// 加锁并10秒后自动解锁
Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockMono.doOnSuccess(res -> {
// ...
})
.doFinally(r -> lock.unlock(threadId).subscribe())
.subscribe();
RxJava3接口使用示例:
RedissonRxClient redisson = redissonClient.rxJava();
RLockRx lock = redisson.getSpinLock("myLock");
long threadId = Thread.currentThread().getId();
Completable lockRes = lock.lock(threadId);
// 加锁并10秒后自动解锁
Completable lockRes = lock.lock(10, TimeUnit.SECONDS, threadId);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁
Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS, threadId);
lockRes.doOnSuccess(res -> {
// ...
})
.doFinally(() -> lock.unlock(threadId).subscribe())
.subscribe();
栅栏锁(Fenced Lock)
基于Redis或Valkey的分布式可重入栅栏锁(Java实现),通过**防护令牌(Fencing Token)**避免因客户端长时间停顿(如GC暂停)导致的锁失效问题。加锁方法返回令牌,服务需校验令牌是否递增。
锁看门狗机制与普通锁相同,支持leaseTime
参数。解锁需由持有线程执行。
代码示例:
RFencedLock lock = redisson.getFencedLock("myLock");
// 加锁并获取令牌
Long token = lock.lockAndGetToken();
// 加锁并10秒后自动解锁,返回令牌
token = lock.lockAndGetToken(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁,返回令牌
Long token = lock.tryLockAndGetToken(100, 10, TimeUnit.SECONDS);
if (token != null) {
try {
// 校验令牌是否 >= 旧令牌
...
} finally {
lock.unlock();
}
}
异步接口使用示例:
RFencedLock lock = redisson.getFencedLock("myLock");
RFuture<Long> lockFuture = lock.lockAndGetTokenAsync();
// 加锁并10秒后自动解锁,返回令牌
RFuture<Long> lockFuture = lock.lockAndGetTokenAsync(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁,返回令牌
RFuture<Long> lockFuture = lock.tryLockAndGetTokenAsync(100, 10, TimeUnit.SECONDS);
long threadId = Thread.currentThread().getId();
lockFuture.whenComplete((token, exception) -> {
if (token != null) {
try {
// 校验令牌是否 >= 旧令牌
...
} finally {
lock.unlockAsync(threadId);
}
}
});
响应式接口使用示例:
RedissonReactiveClient redisson = redissonClient.reactive();
RFencedLockReactive lock = redisson.getFencedLock("myLock");
long threadId = Thread.currentThread().getId();
Mono<Long> lockMono = lock.lockAndGetToken();
// 加锁并10秒后自动解锁,返回令牌
Mono<Long> lockMono = lock.lockAndGetToken(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁,返回令牌
Mono<Long> lockMono = lock.tryLockAndGetToken(100, 10, TimeUnit.SECONDS);
lockMono.doOnSuccess(token -> {
if (token != null) {
try {
// 校验令牌是否 >= 旧令牌
...
} finally {
lock.unlock(threadId).subscribe();
}
}
})
.subscribe();
RxJava3接口使用示例:
RedissonRxClient redisson = redissonClient.rxJava();
RFencedLockRx lock = redisson.getFencedLock("myLock");
Single<Long> lockRes = lock.lockAndGetToken();
// 加锁并10秒后自动解锁,返回令牌
Single<Long> lockRes = lock.lockAndGetToken(10, TimeUnit.SECONDS);
// 等待最多100秒尝试获取锁,并在获取后10秒自动解锁,返回令牌
Single<Long> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS);
long threadId = Thread.currentThread().getId();
lockRes.doOnSuccess(token -> {
if (token != null) {
try {
// 校验令牌是否 >= 旧令牌
...
} finally {
lock.unlock(threadId).subscribe();
}
}
})
.subscribe();
官方文档
https://redisson.pro/docs/data-and-services/locks-and-synchronizers/