分布式锁
synchronized只能保证单个JVM内部的线程互斥,不能保证集群模式下的多个JVM的线程互斥。
分布式锁原理
每个JVM内部都有自己的锁监视器,但是跨JVM,就会有多个锁监视器,就会有多个线程获取到锁,不能实现多JVM进程之间的互斥。
我们不能使用JVM内部的锁监视器,我们必须让多个JVM去使用同一个锁监视器,所以肯定是一个独立于JVM内部的,多个JVM都可以看到的监视器。
过程
特性
多进程可见
多个JVM都可以看到,比如Redis,MySQL等。JVM外部的基本都可以实现。
互斥
只能有一个人拿到锁
高可用
大多数情况下,获取锁都是成功的,而不是频繁失败
高并发/高性能
加锁本身就会影响性能,会变成串行执行,如果加锁本身也很慢,就不行了。
安全性
异常情况下,比如,获取锁完毕之后,锁无法释放,服务宕机了。
死锁问题等等。
功能性特性
比如是否可重入,阻塞还是非阻塞的,公平还是非公平锁
不同的分布式锁区别
MySQL
- 互斥:通过事务的互斥锁来实现,事务提交锁释放,异常事务回滚
- 高可用:依赖MySQL本身的高可用
- 高性能:受限于MySQL的性能
- 安全性:通过事务获取锁,断开链接的时候,锁会自动释放
Redis
- 互斥:通过setnx互斥命令来实现互斥
- 高可用:Redis本身可以实现主从和集群模式,可用性高
- 高性能:较高
- 安全性:服务出现故障,锁无法释放,死锁,可以利用key的过期机制来实现
Zookeeper
- 互斥:利用内部节点的唯一性和有序性来实现,每个节点的id都是自增的,删除节点,另外一个节点就说最小的了
- 高可用:支持集群
- 高性能:保证强一致性,主从之间数据同步会消耗一定时间
- 安全性:创建的是临时节点,服务宕机,锁会自动释放
Redis实现分布式锁
分布式锁需要实现两个最基本的方法
获取锁
互斥
确保只能有一个线程执行成功。通过redis的setnx命令来实现,同时执行时,只有1个能执行成功,实现互斥。
#获取锁
setnx key value
- 添加锁的过期时间,避免服务宕机引起死锁。过期时间需要注意,业务还没处理完但是锁过期的问题
#设置过期时间
expire key 10
为了避免出现,setnx后,expire之前,服务宕机的问题,我们将两条命令合并为一条,保证原子性
#添加锁 nx是互斥,ex是过期时间
set key value ex 10 nx
#或者
set key value nx ex 10
非/阻塞式获取锁
获取锁成功返回ok,失败返回nil,如果失败了,有两种解决方案,jdk中,有两种方案:一直阻塞式等待,另一种,获取锁失败即刻返回。
非阻塞式获取锁,尝试一次,成功返回true,失败返回false!
释放锁
手动释放
手动删除即可
#释放锁
del key
超时释放
获取锁时,添加一个超时时间,避免出现服务宕机,锁无法被释放
流程
分布式锁初级版
执行流程
分布式锁代码
接口
/**
* 分布式锁
*
* @author zhangzengxiu
* @date 2023/10/9
*/
public interface ILock {
/**
* 尝试去获取锁
*
* @param timeoutSc 过期时间,过期锁自动释放
* @return 获取成功返回true,失败返回false
*/
boolean tryLock(long timeoutSc);
/**
* 释放锁
*/
void unlock();
}
实现
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
/**
* @author zhangzengxiu
* @date 2023/10/9
*/
public class SimpleRedisLock implements ILock {
private StringRedisTemplate stringRedisTemplate;
/**
* 锁统一前缀
*/
public static final String KEY_PRE = "lock:";
/**
* 业务名称
*/
private String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSc) {
//获取线程标识
long threadId = Thread.currentThread().getId();
String key = KEY_PRE + name;
String value = String.valueOf(threadId);
Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSc, TimeUnit.SECONDS);
return Boolean.TRUE.equals(res);
}
@Override
public void unlock() {
String key = KEY_PRE + name;
stringRedisTemplate.delete(key);
}
}
业务代码
异常情况
线程1尝试去获取锁,获取到锁之后,
- 正常情况:业务执行完毕后,正常释放锁
- 异常情况:业务执行时间超过了锁的超时时间,锁被超时释放;
- 误删锁
- 线程1的锁由于业务阻塞被超时释放了,此时锁被线程2获取到了,此时线程1醒了,继续执行并释放了锁,此时被释放的锁是线程2的锁。
- 这时线程3也获取到了被释放的锁,此时相当于多个线程在并行执行,线程并发安全问题依然存在。
解决方案
释放锁的时候判断是不是自己的锁,是自己的锁才能释放,否则无法释放锁。
改进分布式锁(解决锁误删问题)
线程id是JVM内部递增的,集群模式下,每个JVM内部都会有自增的线程id,会出现线程id冲突的情况。
如果只是使用线程id作为区分是不行的,还要区分JVM,我们可以使用UUID或者线程id拼接UUID的形式来实现。通过UUID来区分不同的JVM,再通过线程id来区分不同的线程。
业务流程
分布式锁代码实现
import cn.hutool.core.lang.UUID;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
/**
* @author zhangzengxiu
* @date 2023/10/9
*/
public class SimpleRedisLock implements ILock {
private StringRedisTemplate stringRedisTemplate;
/**
* 锁统一前缀
*/
public static final String KEY_PRE = "lock:";
/**
* 锁的值的前缀
*/
public static final String ID_PRE = UUID.randomUUID().toString(true) + "—";
/**
* 业务名称
*/
private String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSc) {
//获取线程标识
String value = ID_PRE + Thread.currentThread().getId();
String key = KEY_PRE + name;
Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSc, TimeUnit.SECONDS);
return Boolean.TRUE.equals(res);
}
@Override
public void unlock() {
//获取线程标识
String value = ID_PRE + Thread.currentThread().getId();
String key = KEY_PRE + name;
//获取锁中的标识
String val = stringRedisTemplate.opsForValue().get(key);
if (StringUtils.equals(value, val)) {
//释放锁
stringRedisTemplate.delete(key);
}
}
}
异常情况
当前代码依然存在异常情况,比如:
- 线程1操作结束,释放锁的时候,先判断是否是自己的锁,然后准备释放的时候,被阻塞了,可能是因为JVM的垃圾回收机制FullGC导致了阻塞,导致了线程1 的锁由于超时自动释放
- 此时线程2获取到了锁,在执行业务代码的过程中,线程1结束了阻塞,此时直接去释放了锁,但是此时释放的锁却是线程2的锁;
- 现在属于无锁状态,此时线程3获取到了锁,线程2和3就属于并行执行,线程安全问题再次出现。
问题:判断锁和释放锁是两个操作,并不具有原子性!!!
Lua脚本解决原子性问题
判断锁+释放锁在特殊情况下依然存在原子性问题,也可以通过Redis的事务+乐观锁机制来实现。
Lua
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
借鉴网站:Lua 基本语法 | 菜鸟教程
我们可以使用Redis提供的函数进行调用,
redis.call('命令名称','key','其他参数',...);
示例代码:
redis.call('set','key','value');
执行脚本
EVAL "return redis.call('set','key','value')" 0
说明:
其中双引号中的内容是脚本内容
0:表示key类型参数的数量,我们可以将value设置为可传入的参数,不写死
示例:
不带参数的Lua脚本:
因为有些redis命令是可以一次性设置多个key value的,比如 mset
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放到KEYS数组中,其他参数会放到ARGV数组中,在脚本中可以从KEYS和ARGV数组中获取参数:
lua语言中,数组的角标是从1开始而不是0
执行脚本:
分布式锁的释放锁的Lua脚本
释放锁业务流程
1、获取锁中的线程标识
2、判断是否与指定的标识(当前线程标识)一致
3、判断如果一致则释放锁(删除)
4、如果不一致啥也不做
Lua脚本
-- 获取锁中线程标识(key传参)
local key = KEYS[1]
-- 获取当前线程的标识(其他参数传参)
local threadId = ARGV[1]
-- 获取锁中的线程标识
local id = redis.call('get',key)
-- 比较线程中标识和锁中的标识是否一致
if (threadId == id) then
-- 释放锁
return redis.call('del',key)
end
return 0
简化写法:
-- 比较线程中标识和锁中的标识是否一致
if (ARGV[1] == redis.call('get',KEYS[1])) then
-- 释放锁
return redis.call('del',KEYS[1])
end
return 0
Java语言调用Lua脚本
修改代码:
修改前
修改后
@Override
public void unlock() {
//传入Lua脚本的KEYS数组
List<String> keys = new ArrayList<>(Arrays.asList(KEY_PRE + name));
//传入Lua的其他参数
String arg = ID_PRE + Thread.currentThread().getId();
//调用Lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT, keys, arg);
}
总结
基于Redis的分布式锁优化
当前的分布式锁仍然存在一些问题
存在的问题
不可重入
同一个线程无法多次获取同一把锁。
当线程1拿到锁,A方法调用B方法时,A方法需要锁,B方法也需要锁,但是,A在调B时,锁还没有释放,还在A手里,B就迟迟拿不到锁,A也无法释放锁,此时就会出现死锁。
不可重试
获取锁,只重试一次,只要没获取到,立即返回false,没有进行重试
超时释放
如果锁超时时间过短,业务还没执行完,锁就被释放了,也会有问题。
如果锁超时时间过长,一但出了问题,需要很长一段时间才能自动释放锁。
主从一致性
主节点和从节点之间存在延迟,极端情况下,如果锁通过set写入到主节点,但是主节点还没来得及同步到从节点,这个时候主节点就宕机了,从节点里是没有这个锁的标识的。
此时,重新选举的主节点,是没有锁的,这个时候其他线程就会获取到锁。
如果你是用的单节点,其实也不用去理会这个问题。
以上这些问题,要实现起来其实很麻烦,我们可以通过现有的工具来进行实现。
Redisson
快速入门
引入依赖
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端
官方有提供来Redisson的SpringBoot的stater,但是会替代Spring官方提供的配置和实现,不建议使用,建议自己去配置。
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author zhangzengxiu
* @date 2023/10/10
*/
@Configuration
public class RedisConfig {
/**
* redis的主机
*/
@Value("${spring.redis.host}")
private String redisHost;
/**
* redis的端口
*/
@Value("${spring.redis.port}")
private String redisPort;
/**
* redis的密码
*/
@Value("${spring.redis.password}")
private String redisPassword;
/**
* redis协议
*/
public static final String REDIS_PRE = "redis://";
@Bean
public RedissonClient getRedissonClient() {
//配置类
Config config = new Config();
//配置单节点的Redis
SingleServerConfig ssc = config.useSingleServer();
//配置集群 需要配置多个Redis地址
//SingleServerConfig ssc = config.useClusterServers();
ssc.setAddress(REDIS_PRE + redisHost + ":" + redisPort);
ssc.setPassword(redisPassword);
//创建客户端
return Redisson.create(config);
}
}
使用分布式锁
@Autowired
private RedissonClient redissonClient;
@Test
public void testRedissonLock() throws InterruptedException {
//获取锁(可重入)
RLock lock = redissonClient.getLock("orderLock");
/**
* 尝试获取锁
* 无参:失败直接返回
* 有参:
* 1:获取锁的最大等待时间,在此期间,获取锁失败了就会等待一段时间再去重试,超过这个最大等待时间才会返回false
* 10:自动释放的时间,服务出现宕机的情况下,自动释放的时间
* TimeUnit.SECONDS:时间单位
*/
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
if (isLock) {
try {
System.out.println("");
} finally {
lock.unlock();
}
}
}
修改之前的业务代码
Redisson可重入锁原理
可重入
流程
不可重入原因
当method1调用method2执行时,需要再次执行setnx
,但是setnx是互斥的,所以无法再次获取这把锁。
我们可以参考JDK提供的ReentrantLock�来实现锁的可重入,在获取锁的同时去判断是否是当前线程,每次获取锁就进行+1操作,释放锁就-1。所以使用redis的string类型就不满足要求了。
我们可以通过hash结构来实现:
string类型可以通过set nx ex
这样的命令来实现,但是hash并没有这样的组合命令,只能将命令拆开来实现。
获取锁Lua脚本
释放锁Lua脚本
查看Redisson获取锁的源码:
Lua脚本是通过字符串的形式来直接写死的。
释放锁
可重试
源码
time就是:设置的超时时间-前面第一次获取锁消耗的时间所得到的剩余时间
重试等待:利用了信号量+消息订阅机制
不是while(true)无休止的等待,是等每次订阅到之后才进行重试。
至此,重试问题已经解决了。
超时释放
获取锁成功了,但是业务还没执行完,锁到期了,锁被释放了???
timeout超时任务进行自动续约,每过一段时间就重置时间,一直执行
新的任务没有更新有效期的任务,所以需要调用renewExpiration方法,旧的任务已经有了这个刷新有效期的任务,就不需要再调用一次了。
锁释放的时间?是在unlock的时候才释放锁
总结
主从一致性问题
获取到锁之后,主节点宕机
重新选举出来的新的主节点,出现数据丢失,锁失效
解决方案
联合节点(最少3个节点)
简单粗暴,那就不要主从节点,每个节点都获取锁成功,才算成功!
如果后期其中一个节点宕机了,他自己的从节点数据丢失,那么此时并不是所有的节点都持有这把锁。
因为只有每一个节点都拿到锁,才算获取锁成功。
只要有1个节点是存活的状态,那么就不会有其他线程拿到锁,就不会有锁失效的问题。
我们可以单独使用几个节点,但是不建立主从关系就可以。
3个独立节点配置方式:
源码: