Redis高并发分布式锁实战
1.分布式场景下的synchronized失效的问题–用redis实现分布式锁
synchronized是通过monitor实现的jvm级别的锁,如果是分布式系统,跑在不同的虚拟机上的tomcat上,会导致synchronized无法锁住对象 ----------- 需要分布式锁 redis
SET、SETEX、SETNX
SET key value
含义:
SET KEY value V-K
相同的K 后写的覆盖先写的
SETEX key seconds value
该命令相当于将下面两行操作合并为一个原子操作
SET key value
EXPIRE key seconds # 设置生存时间
含义(setex = set expire):
SET KEY value V-K 设置生命周期
相同的K 后写的覆盖先写的
SETNX key value
含义(setnx = SET if Not eXists):
SET KEY value V-K ,key 不存在返回1 表示成功,key存在返回0
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
返回值:
设置成功,返回 1 。
设置失败,返回 0 。
2.redis实现分布式锁
@RequestMapping("/redis-001")
public String redis001() {
String key = "redis-001";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
stringRedisTemplate.delete(key);
return "end";
}
在 stringRedisTemplate.delete(key) 释放锁之前会有业务代码块,若出现异常抛出,则不能执行关锁的代码块
@RequestMapping("/redis-001")
public String redis001() {
String key = "redis-001";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
if (!result) {
return "error_code";
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
stringRedisTemplate.delete(key);
}
return "end";
}
在程序执行的任意时刻都有可能应为不可抗力因素突然终止,重启、宕机导致不能执行到finally代码块,所以必须要设置超时时间
@RequestMapping("/redis-001")
public String redis001() {
String key = "redis-001";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
stringRedisTemplate.expire(key,2000 ,TimeUnit.MILLISECONDS);
if (!result) {
return "error_code";
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
stringRedisTemplate.delete(key);
}
return "end";
}
redis设置的时候需要保证原子性
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
stringRedisTemplate.expire(key,2000 ,TimeUnit.MILLISECONDS);
解决方案
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, clintId,2000 ,TimeUnit.MILLISECONDS);
若T1的锁设置失效时间20s,但是T1执行20s没有完成,此时T2可以获得锁,T1执行会在finally代码块中释放T2加的锁
@RequestMapping("/redis-001")
public String redis001() {
String key = "redis-001";
//不满足原子性
// Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
// stringRedisTemplate.expire(key,2000 ,TimeUnit.MILLISECONDS);
String clintId = UUID.randomUUID().toString();
//Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001",2000 ,TimeUnit.MILLISECONDS);
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, clintId,2000 ,TimeUnit.MILLISECONDS);
if (!result) {
return "error_code";
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
if (clintId.equals(stringRedisTemplate.opsForValue().get(key))){
stringRedisTemplate.delete(key);
}
}
return "end";
}
finally {
if (clintId.equals(stringRedisTemplate.opsForValue().get(key))){
stringRedisTemplate.delete(key);
}
}
上述代码来确定是否是自己的锁,是没有原子性的,使用redisson(lua)来解决这个问题
Redisson
Redis 自 2.6 版本开始支持 Lua 脚本,是将所有操作打包成原子操作的一种机制。使用 Lua 脚本可以对 Redis 数据库进行复杂的操作,比如多个命令组合执行、避免分布式事务中的竞态条件等。
Lua 脚本在 Redis 中的原理是:将脚本发送到 Redis 服务器时,Redis 会先对脚本进行语法检查和编译,然后将编译后的字节码缓存起来并返回一个 SHA1 校验和。之后客户端每次需要执行这个脚本时,只需要将 SHA1 校验和发送给 Redis 服务器,Redis 通过校验和即可直接获取缓存中的字节码,避免了每次解析和编译 Lua 脚本的开销。
Lua 脚本的好处有以下几点:
-
原子性:Lua 脚本是 Redis 支持的最完整的事务形式,因为它们在 Redis 服务器上作为一个单独的脚本条目执行,因此能够保证所有操作的原子性。
-
灵活性:Lua 脚本方便对于 Redis 数据库进行复杂的操作,比如批量操作等。
-
性能:由于 Redis 会对 Lua 脚本进行预编译并缓存字节码,因此当相同的脚本被多次执行时,可以避免每次解析和编译脚本带来的开销。而且,Lua 脚本在 Redis 服务器中以单线程运行,相比于多线程,这样可以减少线程切换、锁等开销。
总之,Redis Lua 脚本具有良好的性能、灵活性和原子性,使得 Redis 支持更复杂、更安全地操作数据,提高了 Redis 在实际应用中的可靠性和稳定性。
Lua脚本语法
Redisson.lock()
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
getName() //RLock redissonLock = redisson.getLock(lockKey);
internalLockLeaseTime //internalLockLeaseTime = unit.toMillis(leaseTime);->
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); //getLockWatchdogTimeout() 默认30s
getLockName(threadId) //final UUID id + threadId
设置锁成功如何执行看门狗机制实现续命
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow(); //成功是nil 即null
// lock acquired
if (ttlRemaining == null) {
//成功一定进入的代码块
scheduleExpirationRenewal(threadId);
}
}
});
没有获得锁的线程自旋等待,间歇性尝试加锁
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); //是阻塞等待不会占用资源
ttl时间内如果当前获得锁的线程执行完成了怎么办,订阅模式
订阅的内容将在删除的时候更新
unlock使用了lua代码保证了原子操作