基于Redis实现的分布式锁
- 什么是分布式锁
- 分布式锁主流的实现方案
- Redis分布式锁
- Redis分布式锁的Java代码体现
- 优化一:使用UUID防止误删除
- 优化二:LUA保证删除原子性
什么是分布式锁
- 单体单机部署中可以为一个操作加上锁,这样其他操作就会等待锁释放才能操作
- 但是随业务的不断发展,单机应用常会被分布式集群系统所取代
在分布式集群中存在多台机器,如果给某台机器上加普通的锁,此锁只针对当前机器有效(因为jvm不能跨系统进行锁的控制),因此一种对所有机器都有效的锁应运而生,此即为分布式锁。
即随业务不断发展,需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁机制要解决的问题!
分布式锁主流的实现方案
分布式锁主流实现方案:
-
- 基于数据库实现分布式锁
-
- 基于缓存(Redis等)
-
- 基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
-
- 性能:redis最高
-
- 可靠性:zookeeper最高
这里,我们就基于redis实现分布式锁进行讲解。
Redis分布式锁
Redis中的setex命令就是针对分布式锁操作的一个命令。
回顾setex命令:(setnx中的“nx”表示“not exist:不存在”)
- setnx key value:只有在key 不存在时,才能设置 key 的值。如下图:
使用setnx命令相当于加了一把锁,只有当锁释放的时候此操作才可以继续进行。
思考此锁如何释放?
①首先我们想到的就是del命令删除数据,删除后锁释放,可以再次setnx。 如下图:
但此方案有缺陷。如果锁一直不释放,其他操作就只能等待。所以这样设计不合理!
②于是我们想到expire设置过期时间自动释放锁。如下图:
setnx上锁之后,设置过期时间(通过ttl命令可以查看key剩余多久过期)。过期之后,锁释放。即可再次进行setnx操作。
但上述方式依旧存在问题。
我们提倡的是原子操作,以上setnx操作和使用expire设置过期时间分了两步进行。如果setnx操作执行之后,还没有设置过期时间服务器就断电挂掉了,就不能设置过期时间。针对上锁之后出现异常的情况,引入第三种情况。
③上锁的同时设置上过期时间即可保证原子性操作
(ex表示expire:过期)
Redis分布式锁的Java代码体现
接下来我们通过编写Java代码用一个简单的例子进行演示:
①首先,创建一个SpringBoot空项目,将Redis整合进此项目
②存入redis一条数据,可以把此步骤看作一些具体业务
③Controller新增接口中写入如下代码
@GetMapping("testLock")
public void testLock(){
//1,获取锁,setnx
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111"); //此处相当于setnx的同时设置过期时间为3s
//2,获取锁成功,则从Redis中查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//判断num为空则直接return
if(StringUtils.isEmpty(value)){
return;
}
//有值就转成成int
int num = Integer.parseInt(value+"");
//把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//释放锁,del
redisTemplate.delete("lock");
}else{
//3获取锁失败,则每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
优化一:使用UUID防止误删除
以上的代码还是存在问题的,可能会释放掉其他服务器的锁(即锁释放错的问题)。
异常场景:
两个操作分别记为a、b,设置锁在10秒内过期。
如果a先上锁,在a执行业务操作过程中,其服务器突然卡顿超过10秒。此时分布式锁就会过期而自动释放(此时a的业务操作还未结束
)。b拿到这把锁,b拿到之后会先上锁并执行业务操作,b在业务操作过程中
,a的服务器卡顿结束,就需要继续完成a的业务操作,并手动释放锁(但a的锁已经过期自动释放了,此时手动释放锁就会释放掉b的锁
),显然这是存在问题的。
解决上述问题的一个很好的方法是使用uuid防止误删除。
- 上锁的时候 set key uuid nx ex 10,上锁时设置value为一个唯一的随机值
- 利用uuid的唯一性,表示不同的操作
- 释放锁的时候补充
判断当前uuid和要释放锁的uuid是否一致,一致则释放,否则不释放
代码优化如下:
@GetMapping("testLock")
public void testLock(){
//1,生成uuid
String uuid = UUID.randomUUID().toString();
//2,获取锁,setnx (设置value为uuid)
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,10,TimeUnit.SECONDS);
//3,获取锁成功,则从Redis中查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//判断num为空则直接return
if(StringUtils.isEmpty(value)){
return;
}
//有值就转成成int
int num = Integer.parseInt(value+"");
//把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//释放锁,del (释放之前判断当前的uuid是否一致,一致则释放)
String lock1 = (String) redisTemplate.opsForValue().get("lock");
if (lock1.equals(uuid)) {
redisTemplate.delete("lock");
}
}else{
//3,获取锁失败,则每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
优化二:LUA保证删除原子性
上一个环节,我们通过uuid解决了误删除问题。但优化后的代码依然存在问题:缺乏原子性。
异常场景:
两个操作分别记为a、b,设置锁在10秒内过期。
如果a先上锁,a执行完成业务操作需要释放锁,假设判断发现uuid一致,此时即将进行释放锁。但服务器此时突然卡顿超过10秒。此时分布式锁就会过期而自动释放(
此时a的锁还未释放
)。b拿到这把锁,b拿到之后会先上锁并执行业务操作,b在业务操作过程中
,a的服务器卡顿结束,就需要继续释放锁(但a的锁已经过期自动释放了,此时手动释放锁就会释放掉b的锁
),显然这是存在原子性问题的。
解决上述问题的一个很好的方法是使用lua脚本(特点:支持原子性操作)。
将复杂的或多步骤的Redis操作,写为一个脚本,一次性提交给Redis执行,减少反复连接Redis,提高性能。
LUA脚本类似Redis事务,有一定的原子性,不会被其他命令插队,可以完成一些类似Redis事务性的操作。
注意:LUA脚本只有Redis 2.6以上版本可用。
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 10, TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来释放锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个是value值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
总结:
为确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能释放掉别人加的锁。
- 加锁和解锁必须具有原子性。