一、基本分布式锁实现
1、案例(扣减库存)
@RequestMapping("reduceStock")
public String reduceStock() {
String lockKey = "lock:product_101";
String clientId = UUID.randomUUID().toString();
// 过期时间要和设置key成为一条命令 否则分布式环境下可能机器宕机导致其它节点无法获取锁 设置clientId是为了防止高并发场景下释放其它线程的锁
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS); //jedis.setnx(k,v)
if (!result) {
return "error_code";
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete(lockKey);
}
}
return "end";
}
2、存在问题
在高并发场景下,各节点执行时间会变慢,比如A线程执行过程中,key超时了,其它线程又可以加锁,比如B线程就可以加锁成功,B线程执行过程中A线程执行完了就会把key锁释放,此时B线程如果正在执行中,结果B的锁又被释放了
二、使用redission实现分布式锁
springboot下引入redisson相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
使用
@RequestMapping("reduceStock")
public String reduceStock() {
String lockKey = "lock:product_001";
//获取锁对象
RLock redissonLock = redisson.getLock(lockKey);
//加分布式锁
redissonLock.lock(); // .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
//解锁
redissonLock.unlock();
}
return "end";
}
三、redisson源码分析
1、加锁
public void lockInterruptibly() throws InterruptedException {
this.lockInterruptibly(-1L, (TimeUnit)null);
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 线程id
long threadId = Thread.currentThread().getId();
// 尝试获取锁,这部分是获取锁的关键方法
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
// 不等于null说明获取锁失败
if (ttl != null) {
// redis订阅redisson_lock__channel + ":" + id + ":" + threadId
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
// 再尝试获取一次锁
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
// 获取成功直接返回
return;
}
// 大于等于0说明还没释放锁,通过semaphore阻塞时长为key的剩余有效时间
if (ttl >= 0L) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 说明锁时间已到期可以尝试获取锁
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1L) {
// 持有锁的时间不等于-1说明是重入
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
// 执行Lua脚本返回结果,如果加锁成功返回null不成功返回key剩余有效时间
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
Boolean ttlRemaining = (Boolean)future.getNow();
if (ttlRemaining) {
// 递归调用scheduleExpirationRenewal()方法延时执行
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE, command,
// 如果key不存在,证明锁被释放,加锁成功,设置过期时间,直接返回null
"if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
// key和field确定的加锁的就是当前线程,加锁成功,重新设置过期时间,重入次数加1,返回null
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
// pttl是返回剩余key的有效时间
return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
2、加锁成功线程的子线程加锁延长持有锁时间
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果key对应的field存在,也就是当前线程,会重新设置过期时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1; end;
return 0;",
Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
task.cancel();
}
}
}
3、加锁失败的线程订阅channel
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
// 订阅
return PUBSUB.subscribe(this.getEntryName(), this.getChannelName(), this.commandExecutor.getConnectionManager().getSubscribeService());
}
// 如果发布了订阅回来执行这个方法
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) {
// semaphore释放资源 之前的tryAcquire()方法就会被唤醒
value.getLatch().release();
while(true) {
Runnable runnableToExecute = null;
synchronized(value) {
Runnable runnable = (Runnable)value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnableToExecute = runnable;
} else {
value.addListener(runnable);
}
}
}
if (runnableToExecute == null) {
return;
}
runnableToExecute.run();
}
}
}
4、释放锁发布channel
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果key已经失效,证明锁已经被释放,直接发布订阅消息
"if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
// 如果发现调用释放锁的线程不是当前线程返回null
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;end;
// key对应field的value减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
// 如果还是大于0 说明是重入锁,不用管
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
else
// 真正的释放锁 删除key 发布订阅消息,就会触发之前的onMessage()方法,释放semaphore资源
redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}