【分布式锁】Redisson分布式锁的使用(推荐使用)

news2024/12/1 0:40:53

文章目录

  • 前言
  • 一、常见分布式锁方案对比
  • 二、分布式锁需满足四个条件
  • 三、什么是Redisson?
    • 官网和官方文档
    • Redisson使用
  • 四、Redisson 分布式重入锁用法
    • Redisson 支持单点模式、主从模式、哨兵模式、集群模式
    • 自己先思考下,如果要手写一个分布式锁组件,怎么做?
  • 五、加锁&解锁Lua脚本
    • 1、加锁Lua脚本
      • 脚本入参
      • 脚本内容
      • 脚本解读
    • 2、解锁Lua脚本
      • 脚本入参
      • 脚本内容
      • 脚本解读
  • 六、源码解读
    • 加锁流程源码
    • 加锁过程小结
    • 解锁流程源码
    • 加锁&解锁流程串起来
    • 概括下整个流程
  • 七、其它相关
  • 八、redlock算法
    • 用 Redisson 实现分布式锁(红锁 RedissonRedLock)及源码分析(实现三)
  • Redisson 实现redlock算法源码分析(RedLock)
    • 加锁核心代码
  • 参考文献

前言

在某些场景中,多个进程必须以互斥的方式独占共享资源,这时用分布式锁是最直接有效的。

随着技术快速发展,数据规模增大,分布式系统越来越普及,一个应用往往会部署在多台机器上(多节点),在有些场景中,为了保证数据不重复,要求在同一时刻,同一任务只在一个节点上运行,即保证某一方法同一时刻只能被一个线程执行。在单机环境中,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。而在多机部署环境中,不同机器不同进程,就需要在多进程下保证线程的安全性了。因此,分布式锁应运而生。

以往的工作中看到或用到几种实现方案,有基于zk的,也有基于redis的。由于实现上逻辑不严谨,线上时不时会爆出几个死锁case。那么,究竟什么样的分布式锁实现,才算是比较好的方案?

一、常见分布式锁方案对比

分类方案实现原理优点缺点
基于数据库基于mysql 表唯一索引1.表增加唯一索引
2.加锁:执行insert语句,若报错,则表明加锁失败
3.解锁:执行delete语句
完全利用DB现有能力,实现简单1.锁无超时自动失效机制,有死锁风险
2.不支持锁重入,不支持阻塞等待
3.操作数据库开销大,性能不高
基于MongoDB findAndModify原子操作1.加锁:执行findAndModify原子命令查找document,若不存在则新增
2.解锁:删除document
实现也很容易,较基于MySQL唯一索引的方案,性能要好很多1.大部分公司数据库用MySQL,可能缺乏相应的MongoDB运维、开发人员
2.锁无超时自动失效机制
基于分布式协调系统基于ZooKeeper1.加锁:在/lock目录下创建临时有序节点,判断创建的节点序号是否最小。若是,则表示获取到锁;否,则则watch /lock目录下序号比自身小的前一个节点
2.解锁:删除节点
1.由zk保障系统高可用
2.Curator框架已原生支持系列分布式锁命令,使用简单
需单独维护一套zk集群,维保成本高
基于缓存基于redis命令1. 加锁:执行setnx,若成功再执行expire添加过期时间
2. 解锁:执行delete命令
实现简单,相比数据库和分布式系统的实现,该方案最轻,性能最好1.setnx和expire分2步执行,非原子操作;若setnx执行成功,但expire执行失败,就可能出现死锁
2.delete命令存在误删除非当前线程持有的锁的可能
3.不支持阻塞等待、不可重入
基于redis Lua脚本能力1. 加锁:执行SET lock_name random_value EX seconds NX 命令

2. 解锁:执行Lua脚本,释放锁时验证random_value 
-- ARGV[1]为random_value,  KEYS[1]为lock_name

if redis.call("get", KEYS[1]) == ARGV[1] then

    return redis.call("del",KEYS[1])

else

    return 0

end

同上;实现逻辑上也更严谨,除了单点问题,生产环境采用用这种方案,问题也不大。不支持锁重入,不支持阻塞等待

表格中对比了几种常见的方案,redis+lua基本可应付工作中分布式锁的需求。然而,当偶然看到redisson分布式锁实现方案(传送门),相比以上方案,redisson保持了简单易用、支持锁重入、支持阻塞等待、Lua脚本原子操作,不禁佩服作者精巧的构思和高超的编码能力。下面就来学习下redisson这个牛逼框架,是怎么实现的。

二、分布式锁需满足四个条件

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。
  4. 具有容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。

三、什么是Redisson?

官网和官方文档

官网:https://redisson.org/
官方文档: https://github.com/redisson/redisson/wiki

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。
Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

Redisson使用

  • 加入jar包的依赖
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>2.7.0</version>
</dependency>
  • 配置Redisson
public class RedissonManager {
  private static Config config = new Config();
  //声明redisso对象
  private static Redisson redisson = null;
  
   //实例化redisson
	static{
	  config.useClusterServers()
	  // 集群状态扫描间隔时间,单位是毫秒
	 .setScanInterval(2000)
	  //cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
	 .addNodeAddress("redis://127.0.0.1:6379" )
	 .addNodeAddress("redis://127.0.0.1:6380")
	 .addNodeAddress("redis://127.0.0.1:6381")
	 .addNodeAddress("redis://127.0.0.1:6382")
	 .addNodeAddress("redis://127.0.0.1:6383")
	 .addNodeAddress("redis://127.0.0.1:6384");
	 
	  //得到redisson对象
	  redisson = (Redisson) Redisson.create(config);
	}
	
	  //获取redisson对象的方法
	  public static Redisson getRedisson(){
	    return redisson;
	 }
}
  • 锁的获取和释放
public class DistributedRedisLock {
  //从配置类中获取redisson对象
  private static Redisson redisson = RedissonManager.getRedisson();
  private static final String LOCK_TITLE = "redisLock_";
  
  //加锁
  public static boolean acquire(String lockName){
    //声明key对象
    String key = LOCK_TITLE + lockName;
    //获取锁对象
    RLock mylock = redisson.getLock(key);
    //加锁,并且设置锁过期时间3秒,防止死锁的产生  uuid+threadId
    mylock.lock(2,3,TimeUtil.SECOND);
    //加锁成功
    return  true;
  }
  
  //锁的释放
  public static void release(String lockName){
    //必须是和加锁时的同一个key
    String key = LOCK_TITLE + lockName;
    //获取所对象
    RLock mylock = redisson.getLock(key);
    //释放锁(解锁)
    mylock.unlock();
  • 业务逻辑中使用分布式锁
public String discount() throws IOException{
    String key = "lock001";
    //加锁
    DistributedRedisLock.acquire(key);
    //执行具体业务逻辑
    dosoming
    //释放锁
    DistributedRedisLock.release(key);
    //返回结果
    return soming;
 }

四、Redisson 分布式重入锁用法

Redisson 支持单点模式、主从模式、哨兵模式、集群模式

这里以单点模式为例:

// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.获取锁对象实例(无法保证是按线程的顺序获取到)
RLock rLock = redissonClient.getLock(lockKey);
try {
    /**
     * 4.尝试获取锁
     * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
     * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
     */
    boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
    if (res) {
        //成功获得锁,在这里处理业务
    }
} catch (Exception e) {
    throw new RuntimeException("aquire lock fail");
}finally{
    //无论如何, 最后都要解锁
    rLock.unlock();
}

redisson这个框架重度依赖了Lua脚本和Netty,代码很牛逼,各种Future及FutureListener的异步、同步操作转换。

自己先思考下,如果要手写一个分布式锁组件,怎么做?

  • 肯定要定义2个接口:加锁、解锁;大道至简,redisson的作者就是在加锁和解锁的执行层面采用Lua脚本,逼格高,而且重要有原子性保证啊。
  • 当然,redisson的作者毕竟牛逼,加锁和解锁过程中还巧妙地利用了redis的发布订阅功能,后面会讲到。下面先对加锁和解锁Lua脚本了解下。

五、加锁&解锁Lua脚本

在这里插入图片描述

加锁、解锁Lua脚本是redisson分布式锁实现最重要的组成部分。首先不看代码,先研究下Lua脚本都是什么逻辑

1、加锁Lua脚本

脚本入参

参数示例值含义
KEY个数1KEY个数
KEYS[1]my_first_lock_name锁名
ARGV[1]60000持有锁的有效时间:毫秒
ARGV[2]58c62432-bb74-4d14-8a00-9908cc8b828f:1唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID

脚本内容

-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);

脚本解读

在这里插入图片描述

Q:返回nil、返回剩余过期时间有什么目的?
A:当且仅当返回nil,才表示加锁成功;客户端需要感知加锁是否成功的结果

2、解锁Lua脚本

脚本入参

参数示例值含义
KEY个数2KEY个数
KEYS[1]my_first_lock_name锁名
KEYS[2]redisson_lock__channel:{my_first_lock_name}解锁消息PubSub频道
ARGV[1]0redisson定义0表示解锁消息
ARGV[2]30000设置锁的过期时间;默认值30秒
ARGV[3]58c62432-bb74-4d14-8a00-9908cc8b828f:1唯一标识;同加锁流程

脚本内容

-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1; 
end;
 
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end; 
 
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1;
end;
 
return nil;

脚本解读

在这里插入图片描述

Q1:广播解锁消息有什么用?
A:是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。

Q2:返回值0、1、nil有什么不一样?
A:当且仅当返回1,才表示当前请求真正触发了解锁Lua脚本;但客户端又并不关心解锁请求的返回值,好像没什么用?

六、源码解读

加锁流程源码

读加锁源码时,可以把tryAcquire(leaseTime, unit, threadId)方法直接视为执行加锁Lua脚本。直接进入org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)源码

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        // 获取锁能容忍的最大等待时长
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
 
        // 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
 
        // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
 
        current = System.currentTimeMillis();
        // 【核心点2】订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage
        /**
     * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
     * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
     * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
     * 当 this.await返回true,进入循环尝试获取锁
     */
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
 
        // 订阅成功
        try {
            // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                // 超出可容忍的等待时长,直接返回获取锁失败
                acquireFailed(threadId);
                return false;
            }
 
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 尝试获取锁;如果锁被其他线程占用,就返回锁剩余过期时间【同上】
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
 
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
 
                // waiting for message
                currentTime = System.currentTimeMillis();
 
                // 【核心点3】根据锁TTL,调整阻塞等待时长;
                // 注意:这里实现非常巧妙,1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;
               //2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
 
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 取消解锁消息的订阅
            unsubscribe(subscribeFuture, threadId);
        }
    }

接下的再获取锁方法 tryAcquire的实现,真的就是执行Lua脚本!

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // tryAcquireAsync异步执行Lua脚本,get方法同步获取返回结果
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}
 
//  见org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        // 实质是异步执行加锁Lua脚本
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            //先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果
            if (!future.isSuccess()) {
                return;
            }
 
            Long ttlRemaining = future.getNow();
            // lock acquired
            //如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
 
// 见org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
 
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

加锁过程小结

  1. 锁其实也是一种资源,各线程争抢锁操作对应到redisson中就是争抢着去创建一个hash结构,谁先创建就代表谁获得锁;hash的名称为锁名,hash里面内容仅包含一条键值对,键为redisson客户端唯一标识+持有锁线程id,值为锁重入计数;给hash设置的过期时间就是锁的过期时间。放个图直观感受下:
    在这里插入图片描述

  2. 加锁流程核心就3步

Step1:尝试获取锁,这一步是通过执行加锁Lua脚本来做;
Step2:若第一步未获取到锁,则去订阅解锁消息,当获取锁到剩余过期时间后,调用信号量方法阻塞住,直到被唤醒或等待超时
Step3:一旦持有锁的线程释放了锁,就会广播解锁消息。于是,第二步中的解锁消息的监听器会释放信号量,获取锁被阻塞的那些线程就会被唤醒,并重新尝试获取锁。

比如 RedissonLock中的变量internalLockLeaseTime,默认值是30000毫秒,还有调用tryLockInnerAsync()传入的一个从连接管理器获取的getLockWatchdogTimeout(),他的默认值也是30000毫秒,这些都和redisson官方文档所说的watchdog机制有关,看门狗,还是很形象的描述这一机制,那么看门狗到底做了什么,为什么这么做,来看下核心代码.

先思考一个问题,假设在一个分布式环境下,多个服务实例请求获取锁,其中服务实例1成功获取到了锁,在执行业务逻辑的过程中,服务实例突然挂掉了或者hang住了,那么这个锁会不会释放,什么时候释放?回答这个问题,自然想起来之前我们分析的lua脚本,其中第一次加锁的时候使用pexpire给锁key设置了过期时间,默认30000毫秒,由此来看如果服务实例宕机了,锁最终也会释放,其他服务实例也是可以继续获取到锁执行业务。但是要是30000毫秒之后呢,要是服务实例1没有宕机但是业务执行还没有结束,所释放掉了就会导致线程问题,这个redisson是怎么解决的呢?这个就一定要实现自动延长锁有效期的机制。
异步执行完lua脚本执行完成之后,设置了一个监听器,来处理异步执行结束之后的一些工作。在操作完成之后会去执行operationComplete方法,先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果,如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal,回想一下之前的lua脚本,当加锁逻辑
处理结束,返回了一个nil;如此说来 就一定会走定时任务了。来看下定时调度scheduleExpirationRenewal代码

private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }
 
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
 
        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要还是判断在这个服务实例中的加锁客户端的锁key是否存在,如果已经存在了,就直接返回;第一次加锁,肯定是不存在的,接下来就是搞了一个TimeTask,延迟internalLockLeaseTime/3之后执行,这里就用到了文章一开始就提到奇妙的变量,算下来就是大约10秒钟执行一次,调用了一个异步执行的方法

在这里插入图片描述

如图也是调用异步执行了一段lua脚本,首先判断这个锁key的map结构中是否存在对应的key8a9649f5-f5b5-48b4-beaa-d0c24855f9ab:anyLock:1,如果存在,就直接调用pexpire命令设置锁key的过期时间,默认30000毫秒。

OK,现在思路就清晰了,在上面任务调度的方法中,也是异步执行并且设置了一个监听器,在操作执行成功之后,会回调这个方法,如果调用失败会打一个错误日志并返回,更新锁过期时间失败;然后获取异步执行的结果,如果为true,就会调用本身,如此说来又会延迟10秒钟去执行这段逻辑,所以,这段逻辑在你成功获取到锁之后,会每隔十秒钟去执行一次,并且,在锁key还没有失效的情况下,会把锁的过期时间继续延长到30000毫秒,也就是说只要这台服务实例没有挂掉,并且没有主动释放锁,看门狗都会每隔十秒给你续约一下,保证锁一直在你手中。完美的操作。

到现在来说,加锁,锁自动延长过期时间,都OK了,然后就是说在你执行业务,持有锁的这段时间,别的服务实例来尝试加锁又会发生什么情况呢?或者当前客户端的别的线程来获取锁呢?很显然,肯定会阻塞住,我们来通过代码看看是怎么做到的。还是把眼光放到之前分析的那段加锁lua代码上,当加锁的锁key存在的时候并且锁key对应的map结构中当前客户端的唯一key也存在时,会去调用hincrby命令,将唯一key的值自增一,并且会pexpire设置key的过期时间为30000毫秒,然后返回nil,可以想象这里也是加锁成功的,也会继续去执行定时调度任务,完成锁key过期时间的续约,这里呢,就实现了锁的可重入性。

那么当以上这种情况也没有发生呢,这里就会直接返回当前锁的剩余有效期,相应的也不会去执行续约逻辑。此时一直返回到上面的方法,如果加锁成功就直接返回;否则就会进入一个死循环,去尝试加锁,并且也会在等待一段时间之后一直循环尝试加锁,阻塞住,知道第一个服务实例释放锁。对于不同的服务实例尝试会获取一把锁,也和上面的逻辑类似,都是这样实现了锁的互斥。

紧接着,我们来看看锁释放的逻辑,其实也很简单,调用了lock.unlock()方法,跟着代码走流程发现,也是异步调用了一段lua脚本,lua脚本,应该就比较清晰,也就是通过判断锁key是否存在,如果不存在直接返回;否则就会判断当前客户端对应的唯一key的值是否存在,如果不存在就会返回nil;否则,值自增-1,判断唯一key的值是否大于零,如果大于零,则返回0;否则删除当前锁key,并返回1;返回到上一层方法,也是针对返回值进行了操作,如果返回值是1,则会去取消之前的定时续约任务,如果失败了,则会做一些类似设置状态的操作,这一些和解锁逻辑也没有什么关系,可以不去看他。

解锁流程源码

解锁流程相对比较简单,完全就是执行解锁Lua脚本,无额外的代码逻辑,直接看org.redisson.RedissonLock#unlock代码

@Override
    public void unlock() {
        // 执行解锁Lua脚本,这里传入线程id,是为了保证加锁和解锁是同一个线程,避免误解锁其他线程占有的锁
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }
 
// 见org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
 
}

加锁&解锁流程串起来

上面结合Lua脚本和源码,分别分析了加锁流程和解锁流程。下面升级下挑战难度,模拟下多个线程争抢锁会是怎样的流程。示意图如下,比较关键的三处已用红色字体标注。

在这里插入图片描述

概括下整个流程

  1. 线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。

  2. 线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;

  3. 解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁

七、其它相关

Q1:订阅频道名称(如:redisson_lock__channel:{my_first_lock_name})为什么有大括号?
A:

  1. 在redis集群方案中,如果Lua脚本涉及多个key的操作,则需限制这些key在同一个slot中,才能保障Lua脚本执行的原子性。否则运行会报错Lua script attempted to access a non local key in a cluster node . channel;
  2. HashTag是用{}包裹key的一个子串,若设置了HashTag,集群会根据HashTag决定key分配到哪个slot;HashTag不支持嵌套,只有第一个左括号{和第一个右括号}里面的内容才当做HashTag参与slot计算;通常,客户端都会封装这个计算逻辑。
// 见org.redisson.cluster.ClusterConnectionManager#calcSlot
@Override
public int calcSlot(String key) {
    if (key == null) {
        return 0;
    }
 
    int start = key.indexOf('{');
    if (start != -1) {
        int end = key.indexOf('}');
        key = key.substring(start+1, end);
    }
 
    int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
    log.debug("slot {} for {}", result, key);
    return result;
}
  1. 在解锁Lua脚本中,操作了两个key:一个是锁名my_lock_name,一个是解锁消息发布订阅频道redisson_lock__channel:{my_first_lock_name},按照上面slot计算方式,两个key都会按照内容my_first_lock_name来计算,故能保证落到同一个slot

Q2:redisson代码几乎都是以Lua脚本方式与redis服务端交互,如何跟踪这些脚本执行过程?
A:启动一个redis客户端终端,执行monitor命令以便在终端上实时打印 redis 服务器接收到的命令;然后debug执行redisson加锁/解锁测试用例,即可看到代码运行过程中实际执行了哪些Lua脚本

eg:上面整体流程示意图的测试用例位:

@RunWith(SpringRunner.class)
@SpringBootTest
public class RedissonDistributedLockerTest {
 
    private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLocker.class);
 
    @Resource
    private DistributedLocker distributedLocker;
 
    private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();
 
    private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();
 
    @Test
    public void tryLockUnlockCost() throws Exception {
        StopWatch stopWatch = new StopWatch("加锁解锁耗时统计");
        stopWatch.start();
        for (int i = 0; i < 10000; i++) {
            String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
            Assert.assertTrue(optLocked.isPresent());
            optLocked.get().unlock();
        }
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }
 
    @Test
    public void tryLock() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked.isPresent());
 
        Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked2.isPresent());
 
        optLocked.get().unlock();
    }
 
    /**
     * 模拟2个线程争抢锁:A先获取到锁,A释放锁后,B再获得锁
     */
    @Test
    public void tryLock2() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Future<Optional<LockResource>> submit = executorServiceB.submit(() -> {
                    countDownLatch.await();
                    log.info("B尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        log.info("A尝试获得锁:thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);
        Assert.assertTrue(optLocked.isPresent());
 
        log.info("A已获得锁:thread={}", currentThreadId());
        countDownLatch.countDown();
 
        optLocked.get().unlock();
        log.info("A已释放锁:thread={}", currentThreadId());
 
        Optional<LockResource> lockResource2 = submit.get();
        Assert.assertTrue(lockResource2.isPresent());
 
        executorServiceB.submit(() -> {
            log.info("B已获得锁:thread={}", currentThreadId());
            lockResource2.get().unlock();
            log.info("B已释放锁:thread={}", currentThreadId());
        });
    }
 
    /**
     * 模拟3个线程争抢锁:A先获取到锁,A释放锁后,B和C同时争抢锁
     */
    @Test
    public void tryLock3() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
 
        log.info("A尝试获得锁:thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
 
        if (optLocked.isPresent()) {
            log.info("A已获得锁:thread={}", currentThreadId());
        }
        Assert.assertTrue(optLocked.isPresent());
 
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {
                    cyclicBarrier.await();
                    log.info("B尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {
                    cyclicBarrier.await();
                    log.info("C尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        optLocked.get().unlock();
        log.info("A已释放锁:thread={}", currentThreadId());
 
        CountDownLatch countDownLatch = new CountDownLatch(2);
        executorServiceB.submit(() -> {
            log.info("B已获得锁:thread={}", currentThreadId());
            try {
                submitB.get().get().unlock();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            log.info("B已释放锁:thread={}", currentThreadId());
            countDownLatch.countDown();
        });
 
        executorServiceC.submit(() -> {
            log.info("C已获得锁:thread={}", currentThreadId());
            try {
                submitC.get().get().unlock();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            log.info("C已释放锁:thread={}", currentThreadId());
            countDownLatch.countDown();
        });
 
        countDownLatch.await();
    }
 
    private static Long currentThreadId() {
        return Thread.currentThread().getId();
    }
 
    @Test
    public void tryLockWaitTimeout() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString();
 
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);
        Assert.assertTrue(optLocked.isPresent());
 
        Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {
                long now = System.currentTimeMillis();
                Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);
                long cost = System.currentTimeMillis() - now;
                log.info("cost={}", cost);
                return optLockedAgain;
        }).exceptionally(th -> {
            log.error("Exception: ", th);
            return Optional.empty();
        }).join();
 
        Assert.assertTrue(!optLockResource.isPresent());
    }
 
    @Test
    public void tryLockWithLeaseTime() throws Exception {
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLocked.isPresent());
 
        // 可重入
        Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLockedAgain.isPresent());
    }
 
    /**
     * 模拟1000个并发请求枪一把锁
     */
    @Test
    public void tryLockWithLeaseTimeOnMultiThread() throws Exception {
        int totalThread = 1000;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);
 
        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(new Runnable() {
 
                @Override
                public void run() {
                    tryAcquireLockTimes.getAndIncrement();
                    Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);
                    if (optLocked.isPresent()) {
                        acquiredLockTimes.getAndIncrement();
                    }
                }
            });
        }
        executor.awaitTermination(15, TimeUnit.SECONDS);
 
        Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
        Assert.assertTrue(acquiredLockTimes.get() == 1);
    }
 
    @Test
    public void tryLockWithLeaseTimeOnMultiThread2() throws Exception {
        int totalThread = 100;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);
 
        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(new Runnable() {
 
                @Override
                public void run() {
                    long now = System.currentTimeMillis();
                    Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);
                    long cost = System.currentTimeMillis() - now;
                    log.info("tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);
                    if (optLocked.isPresent()) {
                        acquiredLockTimes.getAndIncrement();
                        // 主动释放锁
                        optLocked.get().unlock();
                    }
                }
            });
        }
        executor.awaitTermination(20, TimeUnit.SECONDS);
 
        log.info("tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());
        Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
        Assert.assertTrue(acquiredLockTimes.get() == totalThread);
    }
 
}
 
 
public interface DistributedLocker {
 
    Optional<LockResource> tryLock(String lockKey, int waitTime);
 
    Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);
 
}
 
public interface LockResource {
 
    void unlock();
 
}

执行的Lua脚本如下:

加锁:redissonClient.getLock("my_first_lock_name").tryLock(600000, 600000);
解锁:redissonClient.getLock("my_first_lock_name").unlock();

# 线程A
## 1.1.1尝试获取锁 -> 成功
1568357723.205362 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357723.205452 [0 lua] "exists" "my_first_lock_name"
1568357723.208858 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "1"
1568357723.208874 [0 lua] "pexpire" "my_first_lock_name" "600000"
 
 
# 线程B
### 2.1.1尝试获取锁,未获取到,返回锁剩余过期时间
1568357773.338018 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338161 [0 lua] "exists" "my_first_lock_name"
1568357773.338177 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338197 [0 lua] "pttl" "my_first_lock_name"
 
 
## 2.1.1.3 添加订阅(非Lua脚本) -> 订阅成功
1568357799.403341 [0 127.0.0.1:56421] "SUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
 
 
## 2.1.1.4 再次尝试获取锁 -> 未获取到,返回锁剩余过期时间
1568357830.683631 [0 127.0.0.1:56418] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684371 [0 lua] "exists" "my_first_lock_name"
1568357830.684428 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684485 [0 lua] "pttl" "my_first_lock_name"
 
 
# 线程A
## 3.1.1 释放锁并广播解锁消息,0代表解锁消息
1568357922.122454 [0 127.0.0.1:56420] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123645 [0 lua] "exists" "my_first_lock_name"
1568357922.123701 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123741 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "-1"
1568357922.123775 [0 lua] "del" "my_first_lock_name"
1568357922.123799 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"
 
 
# 线程B
## 监听到解锁消息消息 -> 释放信号量,阻塞被解除;4.1.1.1 再次尝试获取锁  -> 获取成功
1568357975.015206 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357975.015579 [0 lua] "exists" "my_first_lock_name"
1568357975.015633 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "1"
1568357975.015721 [0 lua] "pexpire" "my_first_lock_name" "600000"
 
## 4.1.1.3 取消订阅(非Lua脚本)
1568358031.185226 [0 127.0.0.1:56421] "UNSUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
 
 
# 线程B
## 5.1.1 释放锁并广播解锁消息
1568358255.551896 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552125 [0 lua] "exists" "my_first_lock_name"
1568358255.552156 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552200 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "-1"
1568358255.552258 [0 lua] "del" "my_first_lock_name"
1568358255.552304 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"

需要特别注意的是,RedissonLock 同样没有解决 节点挂掉的时候,存在丢失锁的风险的问题。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法的 RedissonRedLock,RedissonRedLock 真正解决了单点失败的问题,代价是需要额外的为 RedissonRedLock 搭建Redis环境。

所以,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock。

八、redlock算法

Redis 官网对 redLock 算法的介绍大致如下:

The Redlock algorithm

在分布式版本的算法里我们假设我们有N个Redis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在我们的例子里面我们把N设成5,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位。

  2. 依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个尝试从某个Reids实例获取锁的最大等待时间(超过这个时间,则立马询问下一个实例),这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。

  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁消耗的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的总耗时小于锁失效时间时,锁才算获取成功。

  4. 如果取到了锁,key的真正有效时间 = 有效时间(获取锁时设置的key的自动超时时间) - 获取锁的总耗时(询问各个Redis实例的总耗时之和)(步骤3计算的结果)。

  5. 如果因为某些原因,最终获取锁失败(即没有在至少 “N/2+1 ”个Redis实例取到锁或者“获取锁的总耗时”超过了“有效时间”),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,这样可以防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

用 Redisson 实现分布式锁(红锁 RedissonRedLock)及源码分析(实现三)

这里以三个单机模式为例,需要特别注意的是他们完全互相独立,不存在主从复制或者其他集群协调机制。

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
 
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
 
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
 
/**
 * 获取多个 RLock 对象
 */
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);
 
/**
 * 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
 */
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
 
try {
    /**
     * 4.尝试获取锁
     * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
     * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
     */
    boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
    if (res) {
        //成功获得锁,在这里处理业务
    }
} catch (Exception e) {
    throw new RuntimeException("aquire lock fail");
}finally{
    //无论如何, 最后都要解锁
    redLock.unlock();
}

最核心的变化就是需要构建多个 RLock ,然后根据多个 RLock 构建成一个 RedissonRedLock,因为 redLock 算法是建立在多个互相独立的 Redis 环境之上的(为了区分可以叫为 Redission node),Redission node 节点既可以是单机模式(single),也可以是主从模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。这就意味着,不能跟以往这样只搭建 1个 cluster、或 1个 sentinel 集群,或是1套主从架构就了事了,需要为 RedissonRedLock 额外搭建多几套独立的 Redission 节点。 比如可以搭建3个 或者5个 Redission节点,具体可看视资源及业务情况而定。

下图是一个利用多个 Redission node 最终 组成 RedLock分布式锁的例子,需要特别注意的是每个 Redission node 是互相独立的,不存在任何复制或者其他隐含的分布式协调机制。

在这里插入图片描述

在这里插入图片描述

Redisson 实现redlock算法源码分析(RedLock)

加锁核心代码

org.redisson.RedissonMultiLock#tryLock

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        newLeaseTime = unit.toMillis(waitTime)*2;
    }
    
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1. 允许加锁失败节点个数限制(N-(N/2+1))
     */
    int failedLocksLimit = failedLocksLimit();
    /**
     * 2. 遍历所有节点通过EVAL命令执行lua加锁
     */
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.对节点尝试加锁
         */
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 抛出异常表示获取锁失败
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            /**
             *4. 如果获取到锁则添加到已获取锁集合中
             */
            acquiredLocks.add(lock);
        } else {
            /**
             * 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
             * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
             * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
             */
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }
 
            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }
 
        /**
         * 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
         */
        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }
 
    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
 
    /**
     * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
     */
    return true;
}

参考文献

[1]Distributed locks with Redis

[2]Distributed locks with Redis 中文版

[3]SET - Redis

[4]EVAL command

[5] Redisson

[6]Redis分布式锁的正确实现方式

[7]Redlock实现分布式锁

[8]Redisson实现Redis分布式锁

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/512008.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入理解Java虚拟机:JVM高级特性与最佳实践-总结-1

深入理解Java虚拟机&#xff1a;JVM高级特性与最佳实践-总结-1 Java内存区域与内存溢出异常运行时数据区域程序计数器Java虚拟机栈本地方法栈Java堆方法区 OutOfMemoryError异常Java堆溢出 垃圾收集器与内存分配策略对象是否可以被回收引用计数算法可达性分析算法 Java内存区域…

力库华为机试题练习

1、两数之和 arg [2, 3, 6, 5] target 4 for i in range(len(arg)): other target - arg[i] if other in arg[i1:]: print(i, arg[i1:].index(other)i1) else: print(“输入目标数在该列表中不存在”) 2、回文数 方法一&#xff1a; class Solution: def isPalindrome(sel…

抖音小程序怎么压缩图片?教你使用抖音图片压缩助手

图片压缩是将原始图像的数据量进行减少&#xff0c;从而使其文件大小更小&#xff0c;但尽量保持原有图像质量的一种技术。通过对图片进行压缩&#xff0c;可以降低图片在传输过程中所需的带宽和存储空间&#xff0c;提高网站或应用程序的加载速度和响应速度。 此外&#xff0…

亚马逊云科技将帮助GoPlus Security,助力行业健康发展

Gartner 2022年7月发布的技术成熟度曲线分析报告显示&#xff0c;目前Web3技术已经历了第一波创新高峰期&#xff0c;正在从“创新启动阶段”向“创新泡沫阶段”过渡&#xff0c;技术体系逐步成型&#xff0c;市场热度较高&#xff0c;创业投资活跃。高速增长的背后&#xff0c…

浅谈Hutool工具类

一、Hutool简介 Hutool是一个Java工具类库&#xff0c;它封装了很多常用的Java工具类&#xff0c;如加密解密、文件操作、日期时间处理、Http客户端等。它的目标是让Java开发变得更加简单、高效。 二、Hutool的特点 高效&#xff1a;提供了很多高效的工具类和方法。 简单&…

最全的国内chatGPT大模型企业及产品整理

作者 | gongyouliu 编辑 | gongyouliu 自从去年11月30日openAI发布chatGPT以来&#xff0c;chatGPT引爆了新一轮科技革命。最近很多年都没有哪一项科技进步如chatGPT这般吸引全球的目光。除了媒体的大肆报道&#xff0c;国内外各个科技公司、科研机构、高等院校都在跟进&#x…

智能卡接口(ISO7816)

概述 智能卡接口&#xff08;7816&#xff09;是外部智能卡通过2 线交换8 位数据的串行同步通讯手段。芯片提供了2 个7816主机接口模块。 ⚫ 2路独立7816接口 ⚫ 具备卡时钟输出端口&#xff0c;输出频率在1MHz~5MHz之间可设 ⚫ 位传输方向可配置&#xff0c;支持MSB First或LS…

初识C++之C++中的IO流

目录 一、C语言中的输入与输入 二、流 三、C中的流 四、C中的文件IO流 1. 二进制文件 1.1 打开文件 1.2 向文件写入数据 1.3 从文件读取数据 1.4 关闭文件 1.5 综合使用 2. 文本读写 一、C语言中的输入与输入 在C语言中&#xff0c;我们最长使用的输入输出方式就是…

0基础小白简单入门使用emqx的webhook+规则实现Mysql数据持久化

EMQX (opens new window)是一款大规模可弹性伸缩的云原生分布式物联网 MQTT (opens new window)消息服务器。 作为全球最具扩展性的 MQTT 消息服务器&#xff0c;EMQX 提供了高效可靠海量物联网设备连接&#xff0c;能够高性能实时移动与处理消息和事件流数据&#xff0c;帮助…

EndNote X9 参考文献附录列表 格式调整

文章目录 1 参考文献附录列表 格式调整2 EndNote X9 插入参考文献常见问题总结3 EndNote X9 快速上手教程&#xff08;毕业论文参考文献管理器&#xff09; 1 参考文献附录列表 格式调整 注意&#xff1a;这里讲的是对齐格式&#xff0c; 文献规范格式参考EndNote X9 快速上手…

发布会彩排哪些内容?要注意哪些细节?

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 相信发布会前小伙伴都会进行彩排&#xff0c;对发布会的细节&#xff0c;流程&#xff0c;各个工种如何配合进行提前的演练&#xff0c;那么发布会彩排哪些内容&#xff0c;要注意哪些细…

数据结构-图的创建与深度优先遍历DFS(邻接矩阵-动态类型)

图的创建&#xff1a; 我们先构建一个无向图&#xff1a;如图所示 根据规定&#xff0c;如果两个顶点相连&#xff0c;则两顶点的边改为1&#xff0c;否则为0&#xff0c;我们用数组指针arcs来指向标记是否有边的数组。 1.先创建结构体&#xff0c;因为都为动态所以我们都先定…

RabbitMQ详解(四):SpringBoot整合MQ

SpringBoot整合MQ 需要创建两个springboot项目&#xff0c;一个springboot_rabbitmq_producer生产者&#xff0c;一个springboot_rabbitmq_consumer消费者 fanout模式&#xff08;配置文件方式&#xff09; 定义生产者 创建生产者工程 springboot_rabbitmq_producer pom.x…

DragonflyDB 安装使用

前言 全世界最快的内存数据库 Dragonfly是一种针对现代应用程序负荷需求而构建的内存数据库&#xff0c;完全兼容Redis和Memcached的 API&#xff0c;迁移时无需修改任何代码。相比于这些传统的内存数据库&#xff0c;Dragonfly提供了其25倍的吞吐量&#xff0c;高缓存命中率和…

演化博弈模型简介

演化博弈模型简介 文章目录 演化博弈模型简介[toc]1 演化博弈思想2 演化博弈关注的问题3 复制动态中的博弈 1 演化博弈思想 传统博弈苛刻假设&#xff1a; 完全理性完全信息 演化博弈论&#xff1a;演化博弈论(Evolutionary Game Theory)把博弈理论分析和动态演化过程分析结…

【python】Pandas库用法详解!

pandas 是基于NumPy 的一种工具&#xff0c;该工具是为解决数据分析任务而创建的。Pandas 纳入了大量库和一些标准的数据模型&#xff0c;提供了高效地操作大型数据集所需的工具。pandas提供了大量能使我们快速便捷地处理数据的函数和方法。你很快就会发现&#xff0c;它是使Py…

Android系统启动流程(三)——属性服务

1 属性服务 属性服务的启动在init进程中&#xff0c;init进程初始化的第二阶段初始化中启动了属性服务。 system/core/init/init.cpp int SecondStageMain(int argc, char** argv) {...PropertyInit();...StartPropertyService(&property_fd);...2 启动属性服务 system/…

Win系统软件闪屏/Edge闪屏/Office闪屏 - 解决方案

Win系统软件闪屏/Edge闪屏/Office闪屏 - 解决方案 前言原因解决方案方案1&#xff08;推荐&#xff09;&#xff1a;重新安装核显驱动方案2&#xff1a;软件使用独显方案3&#xff1a;软件关闭硬件加速 前言 使用Win10及以上系统时&#xff0c;可能会出现频繁闪现黑屏的状态&a…

【jupyter】mac os系统下的jupyter的实用技巧

Jupyter notebook是一个开源的web应用&#xff0c;可以让你创建和分享包含代码、公式、可视化和叙述文本的文档。它可以用于数据清洗和转换、数值模拟、统计建模、数据可视化、机器学习等多种用途。 在mac os系统下&#xff0c;有多种方法可以安装jupyter notebook&#xff0c…

十大生产力神器,包括5大jupyter插件和五个提升python研发生产力的神器

JupyterLab&#xff1a;一款下一代的笔记本界面&#xff0c;支持多种编程语言&#xff0c;包括python。它具有灵活的界面&#xff0c;可以配置和安排数据科学、科学计算、计算新闻和机器学习等领域的工作流程。 Voil&#xff1a;一款可以将笔记本转换为安全、独立的web应用程序…