搭建Redisson流程以及解读MutilLock源码解决分布式锁的主从一致性问题
- 1、搭建3台独立主节点的redis服务
- 2、创建java redisson客户端
- 3、获取分布式锁
- 4、分析获取锁源码
- getMultiLock
- tryLock(long waitTime, long leaseTime, TimeUnit unit)
- 5、总结
1、搭建3台独立主节点的redis服务
为了方便,采用docker进行搭建
搭建前准备:使用docker容器搭建redis服务,不同于其他服务,启动后并没有找到有关redis的配置,但是其实docker 提供的redis镜像已经默认帮我们添加了许多配置,比如说通过命令直接搭建的redis服务,可以直接被其他服务器访问
docker run -d -p 6379:6379 --name redis_6379 redis
但是搭建过redis服务器的小伙伴都知道,必须在redis.conf配置文件中修改bind 0.0.0.0 才能被除本机外其他服务器所访问
现在我们是基于学习搭建的测试服务器,所以我们还是通过制定配置文件的方式启动
红色框部分不需要创建,只需要保证 /root/redis 多其他文件夹以及redis.conf文件即可
redis.conf配置文件
bind 0.0.0.0
appendonly yes
创建文件 vim docker_run_redis.sh
挂载redis.conf文件、以及aof、rbd的持久化文件,并且通过redis-server命令指定 目标文件启动
docker run -d -p 6379:6379 --name redis_zs -v /root/redis/redis.conf:/data/redis.conf -v /root/redis/data:/data redis redis-server /data/redis.conf
docker run -d -p 6380:6379 --name redis_zs1 -v /root/redis/redis_zs1/redis.conf:/data/redis.conf -v /root/redis/redis_zs1/data:/data redis redis-server /data/redis.conf
docker run -d -p 6381:6379 --name redis_zs2 -v /root/redis/redis_zs2/redis.conf:/data/redis.conf -v /root/redis/redis_zs2/data:/data redis redis-server /data/redis.conf
sh docekr_cun_redis.sh 启动当前命令
查看redis服务节点启动情况
搭建完成
2、创建java redisson客户端
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient() {
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.79.128:6379");
// 创建 RedissonClient 对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient1() {
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.79.128:6380");
// 创建 RedissonClient 对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.79.128:6381");
// 创建 RedissonClient 对象
return Redisson.create(config);
}
}
3、获取分布式锁
@SpringBootTest
public class RedissonTest {
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient1;
@Resource
private RedissonClient redissonClient2;
private RLock lock;
@BeforeEach
void setUp() {
RLock lock1 = redissonClient.getLock("test");
RLock lock2 = redissonClient1.getLock("test");
RLock lock3 = redissonClient2.getLock("test");
// 创建联锁
lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}
@Test
void method1() throws InterruptedException {
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("获取锁失败 ... 1");
return;
}
try {
log.info("获取锁成功 ... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 ... 1");
lock.unlock();
}
}
@Test
void method2() {
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 ... 2");
return;
}
try {
log.info("获取锁成功 ... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 ... 2");
lock.unlock();
}
}
}
4、分析获取锁源码
getMultiLock
该方法入参是可变参数
最终赋值给RedissonMutilLock 的常量locks
@Override
public RLock getMultiLock(RLock... locks) {
return new RedissonMultiLock(locks);
}
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
}
tryLock(long waitTime, long leaseTime, TimeUnit unit)
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
// 没有设置锁的过期释放时间默认leaseTime为-1
return tryLock(waitTime, -1, unit);
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 代码迭代
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }
long newLeaseTime = -1;
// 判断是否手动设置了释放锁时间
if (leaseTime != -1) {
// 当前手动设置了释放锁时间
// 判断是否手动设置了获取锁的等待时间
if (waitTime == -1) {
// 当前没有手动设置获取锁等待时间,获取锁失败不进入重试
// 释放时间不变
newLeaseTime = unit.toMillis(leaseTime);
} else {
// 当前有手动设置获取锁的等待时间以及释放锁时间
// 重试可能耗时较久,如果还没有重试完锁就释放了,那么要同时多个节点获取锁还有什么意义的?
// 将获取锁的等待时间的2倍赋值给newLeaseTime 这样做是为了防止锁过期释放时间leaseTime比等待获取锁时间waitTime小
// 为什么呢? 如果执行到这一行代码,则表示调用者同时手动设置了leaseTime以及waitTime,设置了leaseTime则不会有watchDog机制进行锁续命,那么在多次获取锁的情况下,并且是多节点同时获取锁成功,一定要保证等待获取锁的过程中,锁一定不能过期被释放,因为redisson的getMultiLock,是通过多个RedissonClient获取的锁,那么可能存在某个线程获取不到锁而等待waitTime,在等待过程中这个锁过期释放了,就不能够保证多个节点同时获取锁
newLeaseTime = unit.toMillis(waitTime)*2;
}
}
// 记录当前时间
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
// 如果设置了等待获取锁的时间
// remainTime 赋值为等待获取锁的时间
remainTime = unit.toMillis(waitTime);
}
// 计算所等待时间,该方法返回值还是remainTime
long lockWaitTime = calcLockWaitTime(remainTime);
// 获取锁失败限制为 0
int failedLocksLimit = failedLocksLimit();
// 创建集合的大小为locks集合中元素的大小,当前为3
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 通过for循环遍历每一个lock
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
--------------------------- 内循环开始-----------------------------
// 取出lock
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 没有传递获取锁等待时间、锁释放时间
if (waitTime == -1 && leaseTime == -1) {
// 直接获取锁 tryLock(-1, -1, null)
lockAcquired = lock.tryLock();
} else {
// 可能传递了waitTime、leaseTime,或者都传递了
// 重置获取锁的等待时间,这里lockWaitTime、remainTime是一样的
long awaitTime = Math.min(lockWaitTime, remainTime);
// 当前 awaitTime = remainTime = lockWaitTime
// newLeaseTime 可能为调用者传入的leaseTime,如果调用者没有传入leaseTime,则设置为2被的waitTime
// 保证newLeaseTime一定大于awatiTime
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 捕获redis响应异常,尝试释放当前可能获取到的锁
unlockInner(Arrays.asList(lock));
// 标记当前获取锁失败
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
// 获取锁成功
// 加入到获取锁成功的集合
acquiredLocks.add(lock);
} else {
// 获取锁失败
// 应该获取到RedissonLock个数 - 已经获取到RedissonLock个数 是否等允许失败最大锁个数
// failedLocksLimit 方法返回值默认为0
// 也就是说for循环遍历到的这个RedissonLock,并且获取锁失败了
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
// 所有的锁都拿到了才能够结束
break;
}
if (failedLocksLimit == 0) {
// 把已经获取到的锁释放掉
unlockInner(acquiredLocks);
if (waitTime == -1) {
// 没有获取锁的等待时间,则说明不想重试,直接返回失败
return false;
}
// 想重试
failedLocksLimit = failedLocksLimit();
// 把已经拿到的锁释放掉
acquiredLocks.clear();
// reset iterator
// 迭代器指针重置
while (iterator.hasPrevious()) {
// 迭代器指针前移
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
// 判断剩余等待时间
if (remainTime != -1) {
// 更新剩余等待时间
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
// 说明刚才获取锁已经把等待时间耗尽
// 把已经获取到的锁释放掉,失败后前面的锁已经不能再拿了,避免其他线程获取锁失败
unlockInner(acquiredLocks);
// 返回false获取锁失败
return false;
}
// 如果时间还很充足,则进去下一层循环,继续获取下一把锁
}
--------------------------- 内循环结束 -----------------------
} // for循环结束
if (leaseTime != -1) {
// 手动设置leaseTime才会触发延长锁的过期时间(没有手动设置才会触发watchDog机制)
// 设置了锁的过期时间
// 获取所有已经拿到的锁
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
// 延长锁的过期时间
// 因为MutilLock要求获取多节点的锁,在成功获取第一把锁时,锁的过期时间倒计时就已经开始了,这样等多节点获取锁完成之后,获取的第一把锁的过期时间会比最后一把锁的过期时间明显要短
// 所以等所有锁都拿完了,在为每一把锁延长锁的过期时间
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
5、总结
原理:搭建多个独立的Redis节点,必须在所有节点获取到锁才算真正的获取锁成功
缺陷:运维成本高,实现复杂,如果要保证业务的高可用性,需要搭建多个节点,并且为每个主节点配置从节点,实现主从复制
以上便是搭建Redisson流程以及MutilLock的源码解读,如有误解,请在评论区指出,谢谢