在很多情况下,你的数据库不支持事务,分布式部署也使得你无法去使用JVM锁,那么这种时候,你可以考虑用分布式锁
文章目录
- 分布式锁
- 1. 实现方式
- 2. 特征
- 3. 操作
- 4. 代码改造
- 5. 测试
- 优化
- 1. 递归改成循环
- 2. 防止死锁
- 3. 防误删
- 4. LUA脚本 保证原子性
- 1) EVAL基本使用
- 2)代码实现
- 3) 测试
- 5 分布式可重入锁: lua脚本 + hash
- redis的hash数据结构
- 1) hset
- 2) hexists
- 3) hincrby
- 加锁优化为可重入
- 1)写成LUA脚本
- 2) 测试
- 解锁优化为可重入
- 1) 编写LUA脚本
- 2)测试
- 代码改造
- 1) 代码较复杂,将lock类单独拿出来定义使用
- 2) 工厂模式 DistributedLockClent
- 3)DistributedRedisLock
- 4) StockService
- 测试
- uuid优化
- 1) DistributedLockClient
- 2) DistributedRedisLock
- 6. 自动续期
- 1) Timer定时器
- 2) LUA脚本
- 3) DistributedRedisLock
- 4) 测试
- 总结
- 加锁
- 解锁
- 重试 循环/迭代
分布式锁
1. 实现方式
- 基于redis实现
- 基于zookeeper/etcd实现
- 基于mysql实现
2. 特征
(1) 独占排他使用 setnx
(2) 防死锁发生:
redis客户端获取到锁后,立马宕机(设置失效时间)
不可重入:可重入
(3) 原子性:
- 获取锁和设置过期 set key value ex 3 nx
- 判断和释放锁之间:使用 lua脚本
(4) 防误删:解铃还须系铃人,先判断再删除
(5) 可重入性:hash(key field value) + LUA脚本
(6) 自动续期:实际执行时间大于设置的失效时间
定时任务(时间驱动Timer定时器)+ lua脚本
3. 操作
- 加锁
- 解锁
- 重试:递归、循环
简单来说,其实就类似于操作系统中的临界区-临界资源
一群线程去争抢该资源
- 经过一道阀口,只有一个线程可以抢到钥匙
- 持有钥匙的线程去执行,其他线程循环等待
- 该线程执行完成,归还钥匙,下一个抢到钥匙的线程执行
循环如此
那么代码上具体是如何操作呢
4. 代码改造
@Service
public class StockService {
@Autowired
private StringRedisTemplate redisTemplate;
public void deduct() {
// 用排他锁
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "111");
if(Boolean.FALSE.equals(flag)){
// 如果没抢到,等50ms后,接着抢
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
deduct();
return;
}
// 如果抢到了,直接执行
try{
// 1。 查询库存
String stockStr = redisTemplate.opsForValue().get("stock");
if (!StringUtil.isNullOrEmpty(stockStr)) {
int stock = Integer.parseInt(stockStr);
// 2。 判断条件是否满足
if (stock > 0) {
// 3 更新redis
redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
}
}
}finally {
redisTemplate.delete("lock");
}
}
其实就是用一个新的redis key作为这把钥匙,
- 某个线程成功setIfAbsent,那它就拿到了这把钥匙,其他线程只能等待
- 该线程执行完成后,delete该key
- 另一个成功setIfAbsent的线程拿到钥匙,开始执行
5. 测试
这里我直接使用了本地redis,之前连接的都是远程redis和mysql,发行并发量很低,后续都直接用本地了
可以看出并发量很大,达到了455
redis中库存也成功清0
优化
1. 递归改成循环
@Service
public class StockService {
@Autowired
private StringRedisTemplate redisTemplate;
public void deduct() {
while(!redisTemplate.opsForValue().setIfAbsent("lock", "111")){
// 如果没抢到,等50ms后,接着抢
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 如果抢到了,直接执行
try{
// 1。 查询库存
String stockStr = redisTemplate.opsForValue().get("stock");
if (!StringUtil.isNullOrEmpty(stockStr)) {
int stock = Integer.parseInt(stockStr);
// 2。 判断条件是否满足
if (stock > 0) {
// 3 更新redis
redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
}
}
}finally {
redisTemplate.delete("lock");
}
}
2. 防止死锁
若抢到钥匙的线程执行过程中,redis客户端程序突然宕机,那即使重启,该钥匙也永远无法自动归还了,所以设置个过期时间
redisTemplate.opsForValue()
.setIfAbsent("lock", "111",3,TimeUnit.SECONDS)
3. 防误删
利用一个唯一标识,比如线程号/uuid,先判断是否是自己的再删除
String uuid = UUID.randomUUID().toString();
redisTemplate.opsForValue()
.setIfAbsent("lock", uuid,3,TimeUnit.SECONDS)
然后再删除的时候进行判断
// 先判断是否自己的锁,再解锁(防误删)
if(StrUtil.equals(redisTemplate.opsForValue().get("lock"),uuid)){
redisTemplate.delete("lock");
}
但该删除操作并不能保证判断再删除是原子性的,在高并发情况下,
A判断成功后,属于A的key刚好失效,
然后B拿到了锁,最后A 删掉的是B的锁。
4. LUA脚本 保证原子性
redis单线程,执行指令遵循one-by-one。
而LUA脚本一次性发送多个指令给redis,保证原子性。
1) EVAL基本使用
EVAL script numkeys key [key …] arg [arg …]
script: lua脚本字符串
numkeys: key列表的元素数量
key列表:以空格分割 KEYS(index从1开始)
arg列表:以空格分割 ARGV(index从1开始)
- 输出的不是print,而是return返回值
-
全局变量 a = 5 (redis不允许定义全局变量)
-
局部变量 local a = 5
-
分支控制
if 条件 then 代码块 elseif 条件 then 代码块 end
-
传递参数 KEYS[] ARGV[]
-
LUA来调用redis的常见命令 get
与redis的执行命令顺序一致,如果是set
非常简单,弄懂了后就可以开始使用lua脚本
// 判断是否是自己的锁,如果是自己的锁,则执行删除操作
if redis.call(‘get’, ‘lock’) == uuid
then return redis.call(‘del’,‘lock’)
else return 0
end
key: lock
argv: uuid
整理成一句lua脚本语句
EVAL " if redis.call(‘get’, KEYS[1]) == ARGV[1] then return redis.call(‘del’,KEYS[1]) else return 0 end" 1 ‘lock’ ‘1234’
简单测试一下
修改下,看不是自己的lock能不能删除
不是自己的lock,则不删除,ok
2)代码实现
String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('del',KEYS[1]) " +
"else return 0 " +
"end\"";
// 先判断是否自己的锁,再解锁(防误删)
redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid);
注意execute参数,
new DefaultRedisScript<>(script, Boolean.class) 脚本和运行返回类型
Arrays.asList("lock") keys列表,所有keys放在列表中
uuid argv参数,后续参数直接,叠加
3) 测试
并发可达475
redis也正常清零了,成功
5 分布式可重入锁: lua脚本 + hash
可参考ReentrantLock可重入锁解析锁实现分布式可重入锁
这个时候就考虑用redis的hash结构,因为你不仅只有key-value,还有个state属性,即加锁次数
redis的hash数据结构
1) hset
hset key field value
hash可以简单看成一个大Map中套着个小Map <key , <key1,value> >
2) hexists
作用:测试给定key下的field是否存在。
格式:hexists key field
3) hincrby
作用:增减数值
格式:hincrby key field increment
把age+1 变成了12
加锁优化为可重入
-
- 判断锁是否存在(exists),若不存在则直接获取锁 hset key field value
-
- 若锁存在则判断是否是自己的锁(hexists),若是则重入:hincrby key field increment
-
- 否则重试,递归/循环
1)写成LUA脚本
if redis.call('exists','lock')==0
then redis.call('hset','lock',uuid,1)
redis.call('expire','lock',30)
return 1
elseif redis.call('hexists','lock',uuid)==0
then redis.call('hincrby','lock',uuid,1)
redis.call('expire','lock',30)
return 1
else return 0
end
这里可以再优化下
hincrby若没有key field会自动生成,此处可替换hset
if redis.call('exists','lock')==0 or redis.call('hexists','lock',uuid)==0
then redis.call('hincrby','lock',uuid,1)
redis.call('expire','lock',30)
return 1
else return 0
end
再将KEYS,ARGV加入后拼凑成一句语句
if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end
2) 测试
- 先简单加锁 参数 1 “lock” “1234567” 30
加锁成功
- 重入加锁 参数 1 “lock” “1234567” 30
- value变成了2, 而且失效时间成功更新了
- 先参数 1 “lock” “123” 30 再执行参数 1 “lock” “1234567” 30
后一条加锁失败,直至行了123那条。测试成功。
解锁优化为可重入
1) 编写LUA脚本
- 判断自己的锁是否存在(hexists),不存在则返回nil
- 若自己的锁存在,则减1(hincrby -1),判断减1后的值是否为0,为0则释放锁(del)并返回1
- 不为0,返回0
LUA脚本
if redis.call('hexists','lock',uuid) == 0
then return nil
elseif redis.call('hincrby', 'lock',uuid,-1)==0
then return redis.call('del','lock')
else return 0
end
把参数和key填充进去后,优化成一句语句
if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1],ARGV[1],-1)==0 then return redis.call('del',KEYS[1]) else return 0 end
2)测试
-
先参数 1 “lock” “1234567”
当前没有该锁,返回nil -
用之前脚本加锁
加锁一次,解锁一次,返回1,解锁第二次,返回nil -
多次加锁,多次解锁
两次加锁成功,返回1,解锁第一次返回0,第二次返回1。
代码改造
1) 代码较复杂,将lock类单独拿出来定义使用
DistributedRedisLock 参照ReentrantLock实现Lock接口
public class DistributedRedisLock implements Lock {
@Override
public void lock() {
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
}
@Override
public Condition newCondition() {
return null;
}
}
2) 工厂模式 DistributedLockClent
由于后续可能还有其他分布式锁类,直接用工厂模式实现。
将StringRedisTemplate传递,因为只有DistributedLockClent添加了@Component供其他类注入
@Component
public class DistributedLockClient {
@Autowired
private StringRedisTemplate redisTemplate;
public DistributedRedisLock getRedisLock(String key){
return new DistributedRedisLock(redisTemplate,key);
}
}
3)DistributedRedisLock
-加锁
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
try {
return tryLock(-1,TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
层层调用后修改tryLock即可。
/**
* 加锁方法
* @param time the maximum time to wait for the lock
* @param unit the time unit of the {@code time} argument
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 如果不是默认调用的,那么自动计算出秒数
if(time != -1){
expire = unit.toSeconds(time);
}
// 如果没有锁加锁,如果是我的锁,重入 state+1
String script = "if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 " +
"then" +
" redis.call('hincrby',KEYS[1],ARGV[1],1) " +
" redis.call('expire',KEYS[1],ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";
// 如果return 0 即抢锁失败,则循环抢
while (Boolean.FALSE.equals(redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(key), uuid, String.valueOf(expire)))){
Thread.sleep(50);
}
return true;
}
- 解锁
/**
* 解锁方法
*/
@Override
public void unlock() {
// 如果不是我的锁,return nil 是我的锁减1,建减后state为0,则全部解锁完成return 1 ,不为0, 则return 0
String script = " if redis.call('hexists',KEYS[1],ARGV[1]) == 0 " +
"then " +
" return nil " +
"elseif redis.call('hincrby', KEYS[1],ARGV[1],-1)==0 " +
"then " +
" return redis.call('del',KEYS[1]) " +
"else return 0 " +
"end";
Long flag = redisTemplate.execute(new DefaultRedisScript<>(script,Long.class),Arrays.asList(key),uuid);
if(flag == null){
throw new IllegalMonitorStateException("this lock does not belong to you");
}
}
由于返回值有nil , 1, 0 所有返回类型为 Long.class,分别对应null,1,0
4) StockService
@Service
public class StockService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private DistributedLockClient distributedLockClient;
public void deduct() {
DistributedRedisLock redisLock = distributedLockClient.getRedisLock("lock");
redisLock.lock();
try{
// 1。 查询库存
String stockStr = redisTemplate.opsForValue().get("stock");
if (!StringUtil.isNullOrEmpty(stockStr)) {
int stock = Integer.parseInt(stockStr);
// 2。 判断条件是否满足
if (stock > 0) {
// 3 更新redis
redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
}
}
}finally {
redisLock.unlock();
}
}
}
测试
并发达到了440
库存也正常清空
uuid优化
目前是每个线程都生成一个uuid,在可重入可能会出现问题,
public void test1(){
DistributedRedisLock redisLock = distributedLockClient.getRedisLock("lock");
redisLock.lock();
test2();
redisLock.unlock();
}
public void test2(){
DistributedRedisLock redisLock = distributedLockClient.getRedisLock("lock");
redisLock.lock();
redisLock.unlock();
}
优化为线程id+uuid,且一个工厂类生成一个uuid的方式
1) DistributedLockClient
@Component
public class DistributedLockClient {
@Autowired
private StringRedisTemplate redisTemplate;
public DistributedLockClient() {
this.uuid = UUID.randomUUID().toString();
}
private String uuid;
public DistributedRedisLock getRedisLock(String key){
return new DistributedRedisLock(redisTemplate,key,uuid);
}
}
2) DistributedRedisLock
public String getId(){
return uuid + ":" + Thread.currentThread().getId();
}
把之前的uuid使用地方,替换成getId()即可。
测试后成功。
6. 自动续期
定时任务(时间驱动 Timer定时器) + LUA脚本
1) Timer定时器
public static void main(String[] args) {
System.out.println("定时任务开始时间" + System.currentTimeMillis());
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.out.println("定时任务执行时间" + System.currentTimeMillis());
}
},2000,5000);
}
由于它可以手动取消,所以选择了它来使用
2) LUA脚本
- 判断自己的锁是否存在(hexists),若存在则重置过期时间
if redis.call('hexists','lock',uuid) == 1
then return redis.call('expire','lock',30)
else return 0
end
整理后
if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end
3) DistributedRedisLock
private void renewExpire(){
// 如果锁存在则续期,不存在则不续期
String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 1 " +
"then " +
" return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
" return 0 end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
// 如果锁还在,则自动续期
if(Boolean.TRUE.equals(redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(key), uuid, String.valueOf(expire)))){
renewExpire();
}
}
},expire*1000/3);
}
同时,修改初始化的uuid,防止定时器内部的线程id不同。
public DistributedRedisLock(StringRedisTemplate redisTemplate, String key, String uuid) {
this.redisTemplate = redisTemplate;
this.key = key;
this.uuid = uuid + ":" + Thread.currentThread().getId();
}
4) 测试
成功,但并发较低100,很耗线程。
总结
加锁
- setnx: 独占排他 死锁、不可重入、原子性
- set k v ex 30 nx: 独占排他 死锁 不可重入
- hash + lua脚本: 可重入锁
- 判断锁是否占用,若没有占用则直接获取锁,并设置过期时间
- 若锁被占用,则判断是否当前线程占用,如果是则重入并重置过期时间
- 否则获取锁失败,在代码中重试
- Timer定时器+lua脚本:实现锁的自动续期
- 判断是否是自己的锁,若是则续期。
解锁
- del: 导致误删
- 先判断再删除,同时保持原子性: lua脚本
- hash+lua脚本:可重入
- 判断当前线程的锁是否存在,不存在则返回nil,抛出异常
- 存在则直接减1,判断减1后的值是否为0,为0则释放锁(del),并返回1
- 不为0,则返回0