Redisson的引入
我们先来看看之前的基于setnx实现的分布式锁存在的问题:
我们之前实现的分布式锁是基于redis的setnx命令的特性的!
但是,这样子实现起来会有很多弊端!
不可重入
简单的来说就是一旦setnx [key] [value]后,就不能再对这个key做任何操作了(除了删除)
假设我们在开发中有A和B两个业务,在业务A中,执行了setnx操作,然后在业务A中调用业务B。
然后在业务B中也有setnx的操作(同一个KEY)
此时,业务B就会阻塞在这里,等待业务A释放锁
但是,业务A肯定不会释放锁,因为业务A还没有执行完(调B)。故就会发生死锁。
不可重试
在我们之前业务逻辑中,尝试获取锁,如果获取不到就直接return了,没有“重来”的机会!也无法提供重试的机制!
超时释放
我们之前,分析过分布式锁被误删的问题。这个问题是已经解决了。
但是,仍然会存在隐患!我们这里是用TTL来控制它。业务执行,时间多少,这是一个未知数,TTL要怎么设置?如何处理业务阻塞?
主从一致性
在主节点上获取到了锁,但是主节点突然宕机了,就会从从结点中选出一个节点,作为主节点。
但由于,因为之前的那个主节点宕机了。在新选举出来的这个主节点中是无法获取到之前的锁。
所以之前的那个锁相当于失效了!
Redisson
要解决上述问题并不是那么容易的,如果我们自己实现很有可能会出一些问题!所以最好的办法就是使用市面上的一些框架来解决!
什么是Redisson?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redisson使用手册
0. 项目介绍 - 《Redisson 使用手册》 - 书栈网 · BookStackhttps://www.bookstack.cn/read/redisson-wiki-zh/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D.md里面提到了Redisson可以实现大致如下的分布式锁
Redisson快速入门(Demo)
(1)导依赖
<!-- redis-redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
(2)配置Redisson客户端
/**
* 配置 Redisson
*/
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient() {
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.89.128:6379").setPassword("888888");
// 创建 RedissonClient 对象
return Redisson.create(config);
}
}
(3)使用Redisson的分布式锁
@Test
void testRedisson() throws Exception {
RLock anyLock = redissonClient.getLock("anyLock");
boolean isLock = anyLock.tryLock(1, 10, TimeUnit.SECONDS);
if(isLock) {
try {
System.out.println("执行业务");
} finally {
anyLock.unlock();
}
}
}
测试结果
Redisson实现可重入锁
这里可重入锁的实现 和 Java的 ReentrantLock 类似!
获取锁的时候,先判断是不是同一个对象,是就将 value+1,释放锁的时候就 value-1,当其小于0时就将该key删除!
在Redis中使用 Hash结构 去存储!
Redisson 关于获取锁、释放锁的源码分析
获取锁
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 在Lua脚本中起始位是1
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上述代码中字符串部分就是Lua脚本,Redisson用其实现可重入锁!
Redisson 获取锁中的Lua脚本源码解析
-- 判断锁是否存在
if (redis.call('exists', KEYS[1]) == 0) then
-- 不存在,获取锁
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置有效期
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 锁已经存在,判断是否是自己?!
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 自增+1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 重置有效期
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
释放锁
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Redisson 释放锁中的Lua脚本源码解析
-- 判断当前锁是否还是被自己持有
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
-- 不是就就直接返回
return nil;
end;
-- 是自己,则重入次数 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 判断重入次数是否已经为0
if (counter > 0) then
-- 大于0,说明不能释放,重置有效期即可
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 等于0,说明可以直接删除
redis.call('del', KEYS[1]);
-- 发消息
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
测试代码
我们这边模拟一下锁重入的场景。方法A上锁后调方法B,方法B也获取锁(如果是不可重入,这里就会阻塞!)
/**
* Redisson的单元测试
*/
@SpringBootTest
@Slf4j
public class RedissonTest {
@Resource
private RedissonClient redissonClient;
private RLock lock;
@BeforeEach
void setUp() {
lock = redissonClient.getLock("order");
}
@Test
void method1() {
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 ... 1");
return;
}
try {
log.info("获取锁成功 ... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 ... 1");
lock.unlock();
}
}
@Test
void method2() {
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 ... 2");
return;
}
try {
log.info("获取锁成功 ... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 ... 2");
lock.unlock();
}
}
}
运行结果
Redis 中值的情况