一,出现问题的原因
因此每一个锁对象,都会指向一个锁监视器,而每一个锁监视器,同一时刻只能被一个线程持有,这样就实现了互斥效果。但前提是,多个线程使用的是同一把锁。
但问题来了,我们的服务将来肯定会多实例不是,形成集群。每一个实例都会有一个自己的JVM运行环境,因此即便是同一个用户,如果并发的发起了多个请求,由于请求进入了多个JVM,就会出现多个锁对象(用户id对象),自然就有多个锁监视器。此时就会出现每个JVM内部都有一个线程获取锁成功的情况,没有达到互斥的效果,并发安全问题就可能再次发生了。
二,解决方法
我们不能让每个实例去使用各自的JVM内部锁监视器,而是应该在多个实例外部寻找一个锁监视器,多个实例争抢同一把锁。
分布式锁必须要满足的特征:
多JVM实例都可以访问
互斥
使用Redis中的setnx命令
Redis的setnx命令是对string类型数据的操作,语法如下:
# 删除指定key,用来释放锁 DEL key
# 给key赋值为value SETNX key value
当前仅当key不存在的时候,setnx才能执行成功,并且返回1,其它情况都会执行失败,并且返回0.我们就可以认为返回值是1就是获取锁成功,返回值是0就是获取锁失败,实现互斥效果。
1,根据redis中的NX互斥原理
我们用lock作为某个业务的锁的key,获取锁就执行下面命令:
# 获取锁,并记录持有锁的线程 SETNX lock thread1
假设说一开始lock不存在,有很多线程同时对lock执行setnx命令。由于Redis命令本身是串行执行的,也就是各个线程是串行依次执行。因此当第一个线程执行setnx时,会成功添加这个lock。但其余的线程会发现lock已经存在,自然就执行失败。自然就实现了互斥效果。
当业务执行完毕,直接删除lock,自然就释放锁了
# 释放锁 DEL lock
2,但是这样有弊端
比如我们获取锁成功,还未释放锁呢当前实例突然宕机了!那么释放锁的逻辑自然就永远不会被执行,这样lock就永远存在,再也不会有其它线程获取锁成功了!出现了死锁问题。
- 于是我们可以给锁设置过期时间,到时间直接释放锁
# 获取锁,并记录持有锁的线程 SETNX lock thread1 # 设置过期时间,避免死锁 EXPIRE lock 20
为了保证该命令的一致性,我们可以将这两条命令设置成一条
SET lock thread1 NX EX 10
3,这样还有弊端,
如果我们在释放锁之前出现了阻塞,然后到了锁的超时时间自动释放,然后又来了个线程得到锁,这是原有的线程恢复正常释放锁。但是这时候释放的锁就不是自己的锁,这样就出现了锁的误删问题
三,基于以上原因我们使用 Redisson
原子性问题:可以利用Redis的LUA脚本来编写锁操作,确保原子性
超时问题:利用WatchDog(看门狗)机制,获取锁成功时开启一个定时任务,在锁到期前自动续期,避免超时释放。而当服务宕机后,WatchDog跟着停止运行,不会导致死锁。
锁重入问题:可以模拟Synchronized原理,放弃setnx,而是利用Redis的Hash结构来记录锁的持有者以及重入次数,获取锁时重入次数+1,释放锁是重入次数-1,次数为0则锁删除
主从一致性问题:可以利用Redis官网推荐的RedLock机制来解决
基于以上原因我们使用Redisson组件解决上面出现的问题
1, 引入依赖
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>
2,编写Redisson配置类
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.tianji.common.autoconfigure.redisson.aspect.LockAspect;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@ConditionalOnClass({RedissonClient.class, Redisson.class}) //如果引入了Redisson的依赖会自动注入RedissonClient和Redisson
@Configuration
@EnableConfigurationProperties(RedisProperties.class) //读取配置文件中的信息
public class RedissonConfig {
private static final String REDIS_PROTOCOL_PREFIX = "redis://";
private static final String REDISS_PROTOCOL_PREFIX = "rediss://";
@Bean
@ConditionalOnMissingBean
public LockAspect lockAspect(RedissonClient redissonClient){
return new LockAspect(redissonClient);
}
@Bean
@ConditionalOnMissingBean
public RedissonClient redissonClient(RedisProperties properties){
log.debug("尝试初始化RedissonClient");
// 1.读取Redis配置
RedisProperties.Cluster cluster = properties.getCluster(); //集群模式
RedisProperties.Sentinel sentinel = properties.getSentinel(); //哨兵模式
String password = properties.getPassword();
int timeout = 3000;
Duration d = properties.getTimeout();
if(d != null){
timeout = Long.valueOf(d.toMillis()).intValue();
}
// 2.设置Redisson配置
Config config = new Config();
if(cluster != null && !CollectionUtil.isEmpty(cluster.getNodes())){
// 集群模式
config.useClusterServers()
.addNodeAddress(convert(cluster.getNodes()))
.setConnectTimeout(timeout)
.setPassword(password);
}else if(sentinel != null && !StrUtil.isEmpty(sentinel.getMaster())){
// 哨兵模式
config.useSentinelServers()
.setMasterName(sentinel.getMaster())
.addSentinelAddress(convert(sentinel.getNodes()))
.setConnectTimeout(timeout)
.setDatabase(0)
.setPassword(password);
}else{
// 单机模式
config.useSingleServer()
.setAddress(String.format("redis://%s:%d", properties.getHost(), properties.getPort()))
.setConnectTimeout(timeout)
.setDatabase(0)
.setPassword(password);
}
// 3.创建Redisson客户端
return Redisson.create(config);
}
private String[] convert(List<String> nodesObject) {
List<String> nodes = new ArrayList<>(nodesObject.size());
for (String node : nodesObject) {
if (!node.startsWith(REDIS_PROTOCOL_PREFIX) && !node.startsWith(REDISS_PROTOCOL_PREFIX)) {
nodes.add(REDIS_PROTOCOL_PREFIX + node);
} else {
nodes.add(node);
}
}
return nodes.toArray(new String[0]);
}
}
3,基本用法
我们完全可以基于AOP的思想,将业务部分作为切入点,将业务前后的锁操作作为环绕增强。
定义锁的工厂,方便修改锁的类型
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;
import static com.tianji.promotion.utils.MyLockType.*;
@Component
public class MyLockFactory {
private final Map<MyLockType, Function<String, RLock>> lockHandlers;
public MyLockFactory(RedissonClient redissonClient) {
this.lockHandlers = new EnumMap<>(MyLockType.class); //EnumMap 自动获取枚举类中有多少枚举项,当我们Map的key是枚举类型时我们就可以使用EnumMap
this.lockHandlers.put(RE_ENTRANT_LOCK, redissonClient::getLock);
this.lockHandlers.put(FAIR_LOCK, redissonClient::getFairLock);
this.lockHandlers.put(READ_LOCK, name -> redissonClient.getReadWriteLock(name).readLock());
this.lockHandlers.put(WRITE_LOCK, name -> redissonClient.getReadWriteLock(name).writeLock());
}
public RLock getLock(MyLockType lockType, String name){
return lockHandlers.get(lockType).apply(name);
}
}
自定义注解
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;
@Retention(RetentionPolicy.RUNTIME) //标记当前注解运行在什么时候
@Target(ElementType.METHOD) //当前注解用在什么地方
public @interface MyLock {
String name();
long waitTime() default 1; //等待时间,获取锁失败后等待1秒,可以在1秒内重试
long leaseTime() default -1; //锁超时释放时间 -1时会自定启用看门狗机制
TimeUnit unit() default TimeUnit.SECONDS; //时间单位
MyLockFactory lockType() default MyLockFactory.RE_ENTRANT_LOCK; //锁的类型
MyLockStrategy lockStrategy() default MyLockStrategy.FAIL_AFTER_RETRY_TIMEOUT; //s锁失败策略
}
编写环绕通知方法
import lombok.RequiredArgsConstructor;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
@Component
@Aspect //标记当前方法是个环绕通知
@RequiredArgsConstructor
public class MyLockAspect implements Ordered{
private final RedissonClient redissonClient;
//pjp 切入点
@Around("@annotation(myLock)") //基于myLock进行拦截
public Object tryLock(ProceedingJoinPoint pjp, MyLock myLock) throws Throwable {
// 1.创建锁对象
RLock lock = redissonClient.getLock(myLock.name());
// 2.尝试获取锁
boolean isLock = lock.tryLock(myLock.waitTime(), myLock.leaseTime(), myLock.unit());
// 3.判断是否成功
if(!isLock) {
// 3.1.失败,快速结束
return null;
}
try {
// 3.2.成功,执行业务
return pjp.proceed();
} finally {
// 4.释放锁
lock.unlock();
}
}
@Override //让当前切面方法首先执行
public int getOrder() {
return 0;
}
}
Spring中的AOP切面有很多,会按照Order排序,按照Order值从小到大依次执行。Spring事务AOP的order值是Integer.MAX_VALUE,优先级最低。
我们的分布式锁一定要先于事务执行,因此,我们的切面一定要实现Ordered接口,指定order值小于Integer.MAX_VALUE即可。
三,分布式锁失败策略
获取锁失败是否要重试?有三种策略:
不重试,对应API:
lock.tryLock(0, 10, SECONDS)
,也就是waitTime小于等于0有限次数重试:对应API:
lock.tryLock(5, 10, SECONDS)
,也就是waitTime大于0,重试一定waitTime时间后结束无限重试:对应API
lock.lock(10, SECONDS)
, lock就是无限重试重试失败后怎么处理?有两种策略:
直接结束
抛出异常
代码实现
import com.tianji.common.exceptions.BizIllegalException; import org.redisson.api.RLock; public enum MyLockStrategy { //快速结束 SKIP_FAST(){ @Override public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException { return lock.tryLock(0, prop.leaseTime(), prop.unit()); } }, //快速失败 FAIL_FAST(){ @Override public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException { boolean isLock = lock.tryLock(0, prop.leaseTime(), prop.unit()); if (!isLock) { throw new BizIllegalException("请求太频繁"); } return true; } }, //无限重试 KEEP_TRYING(){ @Override public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException { lock.lock( prop.leaseTime(), prop.unit()); return true; } }, //重试超时后结束 SKIP_AFTER_RETRY_TIMEOUT(){ @Override public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException { return lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit()); } }, //重试超时后抛异常 FAIL_AFTER_RETRY_TIMEOUT(){ @Override public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException { boolean isLock = lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit()); if (!isLock) { throw new BizIllegalException("请求太频繁"); } return true; } }, ; public abstract boolean tryLock(RLock lock, MyLock prop) throws InterruptedException; }