1、Redisson简介
一个基于Redis实现的分布式工具,有基本分布式对象和高级又抽象的分布式服务,为每个试图再造分布式轮子的程序员带来了大部分分布式问题的解决办法。
2、使用方法
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
使用代码:
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock")
public String deductStock() {
String lockKey = "lock:product_101";
//获取锁对象
RLock redissonLock = redisson.getLock(lockKey);
//加分布式锁
redissonLock.lock();
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
//解锁
redissonLock.unlock();
}
return "end";
}
3、原理
流程图
线程1在加锁成功后,会在后台开启一个线程,这个线程每隔10秒检查是否还持有锁,如果持有锁则延长锁的时间,这一步操作称之为锁续命。
然后线程2这时候去加锁,如果失败了则会间隙性自旋加锁,直到成功为止。
源码
加锁核心源码
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
1、if (redis.call('exists', KEYS[1]) == 0)
如果锁不存在
2、redis.call('hset', KEYS[1], ARGV[2], 1);
设置锁,key value, value为uuid+threadId
3、redis.call('pexpire', KEYS[1], ARGV[1]);
设置锁过期时间,默认30秒
锁续命源码
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// 加锁成功后,开启定时任务进行锁续命
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
// 定时任务锁续命
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
1、if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
判断锁是否存在
2、redis.call('pexpire', KEYS[1], ARGV[1]);
把锁的超时时间重新设置为30秒
间歇性加锁源码
try {
while (true) {
// ttl不为NULL时,表示加锁失败,返回锁剩余过期时间
ttl = tryAcquire(leaseTime, unit, threadId);
// ttl为null时代表加锁成功
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
// 当前线程阻塞ttl秒,释放CPU资源
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
若锁未过期,这时候主线程将锁释放,如何实现唤醒阻塞中的线程呢?使用redis的发布订阅模式
// 未抢到锁的线程订阅了一个channel
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
在解锁的时候发布释放锁的消息
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
1、if (redis.call('exists', KEYS[1]) == 0) then
如果锁不存在
2、redis.call('publish', KEYS[2], ARGV[1]);
发布消息告诉阻塞中的线程锁释放
or
1、if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
如果不是我加的锁
2、return nil;
返回
or
1、local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
将锁数量-1
2、if (counter > 0) then
若锁数量>0(注意,这里判断锁数量是因为有可能有重入锁)
3、redis.call('del', KEYS[1]);
删除锁
4、redis.call('publish', KEYS[2], ARGV[1]);
发布锁删除消息,通知阻塞中的线程