Redis客户端框架Redisson

news2025/1/10 16:46:41

介绍

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。

Redisson在基于NIONetty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

特色

  1. 支持云托管服务模式(同时支持亚马逊云的ElastiCache Redis和微软云的Azure Redis Cache):

    自动发现主节点变化

  2. 支持Redis集群模式(同时支持亚马逊云的ElastiCache Redis Cluster和微软云的Azure Redis Cache):

    自动发现主从节点

    自动更新状态和组态拓扑

    自动发现槽的变化

  3. 支持Redis哨兵模式:

    自动发现主、从和哨兵节点

    自动更新状态和组态拓扑

  4. 支持Redis主从模式

  5. 支持Redis单节模式

  6. 多节点模式均支持读写分离:从读主写,主读主写,主从混读主写

  7. 所有对象和接口均支持异步操作

  8. 自行管理的弹性异步连接池

  9. 所有操作线程安全

  10. 支持LUA脚本

  11. 提供分布式对象
    通用对象桶(Object Bucket)、二进制流(Binary Stream)、地理空间对象桶(Geospatial Bucket)、BitSet、原子整长形(AtomicLong)、原子双精度浮点数(AtomicDouble)、话题(订阅分发)、 布隆过滤器(Bloom Filter)和基数估计算法(HyperLogLog)

  12. 提供分布式集合
    映射(Map)、多值映射(Multimap)、集(Set)、列表(List)、有序集(SortedSet)、计分排序集(ScoredSortedSet)、字典排序集(LexSortedSet)、列队(Queue)、双端队列(Deque)、阻塞队列(Blocking Queue)、有界阻塞列队(Bounded Blocking Queue)、 阻塞双端列队(Blocking Deque)、阻塞公平列队(Blocking Fair Queue)、延迟列队(Delayed Queue)、优先队列(Priority Queue)和优先双端队列(Priority Deque)

  13. 提供分布式锁和同步器
    可重入锁(Reentrant Lock)、公平锁(Fair Lock)、联锁(MultiLock)、 红锁(RedLock)、读写锁(ReadWriteLock)、信号量(Semaphore)、可过期性信号量(PermitExpirableSemaphore)和闭锁(CountDownLatch)

  14. 提供分布式服务
    分布式远程服务(Remote Service, RPC)、分布式实时对象(Live Object)服务、分布式执行服务(Executor Service)、分布式调度任务服务(Scheduler Service)和分布式映射归纳服务(MapReduce)

  15. 支持Spring框架

  16. 提供Spring Cache集成

  17. 提供Hibernate Cache集成

  18. 提供JCache实现

  19. 提供Tomcat Session Manager

  20. 提供Spring Session集成

  21. 支持异步流方式执行操作

  22. 支持Redis管道操作(批量执行)

  23. 支持安卓(Andriod)系统

  24. 支持断线自动重连

  25. 支持命令发送失败自动重试

  26. 支持OSGi

  27. 支持采用多种方式自动序列化和反序列化(Jackson JSON, Avro, Smile, CBOR, MsgPack, Kryo, FST, LZ4, Snappy和JDK序列化)

引入包

 <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.1</version>
 </dependency>

https://mvnrepository.com/artifact/org.redisson/redisson

源码解析

基于3.12.1 版本。

RedissonClient

使用redis之前必须要创建连接,也即RedissonClient

RedissonClient是个接口,提供操作Redis的一系列命令。

Redisson类是RedissonClient接口的唯一实现。Redisson类提供了几个静态方法创建Client。

public static RedissonClient create();//使用默认配置
public static RedissonClient create(Config config)  //使用指定配置
public static RedissonRxClient createRx();          //RxJava2
public static RedissonRxClient createRx(Config config);
public static RedissonReactiveClient createReactive();//Reactive interface,Project Reactor
public static RedissonReactiveClient createReactive(Config config);  

redisson支持3种方式访问Redis,大多数情况使用RedissonClient,即Redisson,同步操作。

在创建client时需要指定配置,通过Config类完成。

public class Config {
	 //几种模式
    private SentinelServersConfig sentinelServersConfig;
    private MasterSlaveServersConfig masterSlaveServersConfig;
    private SingleServerConfig singleServerConfig;
    private ClusterServersConfig clusterServersConfig;
    private ReplicatedServersConfig replicatedServersConfig;
    //连接管理
    private ConnectionManager connectionManager;

    /**
     * Threads amount shared between all redis node clients
     */
    private int threads = 16;
    private int nettyThreads = 32;

    /**
     * Redis key/value codec. FST codec is used by default
     */
    private Codec codec;

    private ExecutorService executor;

    /**
     * Config option for enabling Redisson Reference feature.
     * Default value is TRUE
     */
    private boolean referenceEnabled = true;

    private TransportMode transportMode = TransportMode.NIO;

    private EventLoopGroup eventLoopGroup;
    //锁默认释放时间,30秒
    private long lockWatchdogTimeout = 30 * 1000;

    private boolean keepPubSubOrder = true;

    private boolean decodeInExecutor = false;

    private boolean useScriptCache = false;

    private int minCleanUpDelay = 5;

    private int maxCleanUpDelay = 30*60;

    private int cleanUpKeysAmount = 100;
    
    /**
     * AddressResolverGroupFactory switch between default and round robin
     */
    private AddressResolverGroupFactory addressResolverGroupFactory = new DnsAddressResolverGroupFactory();

使用方式:

// 1. 构造Config对象
Config = ...

// 2. 构造Redisson实例
RedissonClient redisson = Redisson.create(config);

// 3. 获取需要的对象
RMap map = redisson.getMap("myMap");

RLock lock = redisson.getLock("myLock");

RExecutorService executor = redisson.getExecutorService("myExecutorService");

锁 Lock

了解Redisson的锁,最好先了解Java JUC包中的各种锁。

Java Lock接口

//package java.util.concurrent.locks;
public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

主要实现类:

image-20210808160559862

RLock(重入锁)

RLock实现Java的重入锁。扩展了 Lock

public interface RLock extends Lock, RLockAsync {
//获取锁对象名称
    String getName();

    void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    void lock(long leaseTime, TimeUnit unit);

    /**强制释放锁。如果锁成功释放则返回true。
     */
    boolean forceUnlock();

    /**是否被任何线程锁定
     */
    boolean isLocked();

    /**
     * 是否被线程锁定
     */
    boolean isHeldByThread(long threadId);

    /**
     * 是否被当前线程锁定
     */
    boolean isHeldByCurrentThread();

    /**被当前线程锁定次数(可冲入)
     */
    int getHoldCount();

    long remainTimeToLive();
    
}

锁的异步操作,都返回一个RFuture

public interface RLockAsync {
    RFuture<Boolean> forceUnlockAsync();
    RFuture<Void> unlockAsync();
    RFuture<Void> unlockAsync(long threadId);
    RFuture<Boolean> tryLockAsync();
    RFuture<Void> lockAsync();
    RFuture<Void> lockAsync(long threadId);
    RFuture<Void> lockAsync(long leaseTime, TimeUnit unit);
    RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId);
    RFuture<Boolean> tryLockAsync(long threadId);
    RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit);
    RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);
    RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId);
    RFuture<Integer> getHoldCountAsync();
    RFuture<Boolean> isLockedAsync();
    RFuture<Long> remainTimeToLiveAsync();
}

RLock源码

获取锁getLock
    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }

返回一个RedissonLock对象。

RedissonLock派生于RedissonExpirable,实现了RLock

public class RedissonLock extends RedissonExpirable implements RLock {
  	//存储entryName和其过期时间
    private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
     //锁默认释放时间,30秒
    protected long internalLockLeaseTime;
   
    final String id;
    final String entryName;

    protected final LockPubSub pubSub;
		//命令执行器,异步执行器
    final CommandAsyncExecutor commandExecutor;
  
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }  
}
lock()
    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }

    @Override
    public void lock(long leaseTime, TimeUnit unit) {
        try {
            lock(leaseTime, unit, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }

		
	  /**
	  * leaseTime:锁过期时间,
	  * interruptibly:是否允许中断。
	  */
    private void lock(long leaseTime, TimeUnit unit, boolean   interruptibly ) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        //加锁。 先尝试一次。
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired,null表示加锁成功。
        if (ttl == null) {
            return;
        }
			//加锁失败,则:订阅锁,等待锁释放。
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
          	/**死循环,直到满足:
          	
          	*/
            while (true) {
              //再次尝试。
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired。获取到锁。
                if (ttl == null) {
                    break;
                }

                // waiting for message。
                //通过 Semaphore  来阻塞当前线程。
                if (ttl >= 0) {
                    try {
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                  //ttl < 0 表示可以获取了。
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }
    
    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
      //设置了过期时间
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
      //没有设置过期,则定时续期。
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
              //设置 watch dog ,每1/3 * expire 设置一次超时
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
 
	/*加锁,返回nil表示加锁成功,其他表示其他锁的剩余时间。
	*/
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
//redis执行lua脚本,KEYS下标从1开始。
//KYES[1]:是key名称,ARGV[1]是超时时间,ARGV[2]是KEY的 value。
//getLockName返回格式:{collection id} + ':' +  {threadId}
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //不存在hset 则创建。设置第一个entry是key,value。
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                       //为key增加field,并设置field的值为1。
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //存在key,则判断当前线程是否已加锁(field存在),存在则把引用计数加1。
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //返回剩余过期时间:毫秒                            
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    
    protected RFuture<RedissonLockEntry> subscribe(long threadId) {
        return pubSub.subscribe(getEntryName(), getChannelName());
    }
image-20210809154939864
watch dog

用于KEY 续期,以免锁过期导致问题。使用的是netty的 时间轮。

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

   public static class ExpirationEntry {
        
        private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
        private volatile Timeout timeout;
   }     


    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        //
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                //
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }
    
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }
RedissonExpirable

RedissonExpirable是个抽象类,实现了过期操作。

image-20210808091938473

RedissonObject:Redis对象。

public abstract class RedissonObject implements RObject {

    protected final CommandAsyncExecutor commandExecutor;
    //对象名称
    protected String name;
    //编码
    protected final Codec codec;
}    
unlock()
@Override
public void unlock() {
  try {
  	get(unlockAsync(Thread.currentThread().getId()));
  } catch (RedisException e) {
    if (e.getCause() instanceof IllegalMonitorStateException) {
   	 throw (IllegalMonitorStateException) e.getCause();
    } else {
    	throw e;
    }
  }
}        

   @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);
				//
        future.onComplete((opStatus, e) -> {
            cancelExpirationRenewal(threadId);

            if (e != null) {
                result.tryFailure(e);
                return;
            }

            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
					//设置处理成功
            result.trySuccess(null);
        });

        return result;
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
      /*
      KEY[1]:key name
      KEY[2]:
      ARGV[1]:
      ARGV[2]:
      ARGV[3]: field name
      */
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                //计数减1.
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                //计数仍然大于0,则设置过期时间。
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                //计数等于0,则删除key,                              
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }


    @Override
    public RFuture<Boolean> deleteAsync() {
        return forceUnlockAsync();
    }

    @Override
    public RFuture<Boolean> forceUnlockAsync() {
        cancelExpirationRenewal(null);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('del', KEYS[1]) == 1) then "
                + "redis.call('publish', KEYS[2], ARGV[1]); "
                + "return 1 "
                + "else "
                + "return 0 "
                + "end",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
    }
    
    protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
        pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName());
    }

状态控制
    @Override
    public boolean isHeldByThread(long threadId) {
      //判断hset的field是否存在,2个参数:getName(),getLockName(threadId)
        RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId));
        return get(future);
    }

    @Override
    public int getHoldCount() {
        return get(getHoldCountAsync());
    }
    public RFuture<Integer> getHoldCountAsync() {
        return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, HGET, getName(), getLockName(Thread.currentThread().getId()));
    }
    

公平锁(RedissonFairLock)

RedissonFairLock是RedissonLock的子类。实现公平锁。

公平锁的设计思路是通过List实现等待队列。

public class RedissonFairLock extends RedissonLock implements RLock {

    private final long threadWaitTime;
    private final CommandAsyncExecutor commandExecutor;
  	//等待队列名称
    private final String threadsQueueName;
    private final String timeoutSetName;
}

tryLockInnerAsync()


    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
			
        long currentTime = System.currentTimeMillis();
        if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
          	//KEY[1]:name。
            //KEY[2]:threadsQueue ,线程队列(list),元素为threadid。
            //KEY[3]:timeoutSet
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // remove stale threads //把所有过期时间小于当前时间的都抛弃掉。
                    "while true do " +
                        //获取线程队列(list)第一个元素。firstThreadId2:第1个元素。
                        "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                         //没有等待对象,直接跳出。
                        "if firstThreadId2 == false then " +
                            "break;" +
                        "end;" +
                        //第一个元素的的score:超时时间
                        "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                        //超时了,则从timeoutSet移除元素,threadsQueue 移除第一个元素
                        "if timeout <= tonumber(ARGV[3]) then " +
                            // remove the item from the queue and timeout set
                            // NOTE we do not alter any other timeout
                            "redis.call('zrem', KEYS[3], firstThreadId2);" +
                            "redis.call('lpop', KEYS[2]);" +
                        "else " +
                            "break;" +
                        "end;" +
                    "end;" +
												//不存在KEY,并且 threadsQueue也无元素 或者 是第一个元素当前线程。则移除当前线程的元素。
                    "if (redis.call('exists', KEYS[1]) == 0) " +          //锁存在
                        "and ((redis.call('exists', KEYS[2]) == 0) " +    // 锁没有被任何线程持有。
                            "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +  //当前线程持有锁。
                        "redis.call('lpop', KEYS[2]);" +                     //移除自等待队列
                        "redis.call('zrem', KEYS[3], ARGV[2]);" + 

                        // decrease timeouts for all waiting in the queue
                        //所有等待线程的超时减少。
                        "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
                        "for i = 1, #keys, 1 do " +
                            "redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" +
                        "end;" +

                        "redis.call('hset', KEYS[1], ARGV[2], 1);" +
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        "return nil;" +
                    "end;" +
                    //是重入(再次加锁)                              
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        "return nil;" +
                    "end;" +
                    "return 1;",
                    Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
                    internalLockLeaseTime, getLockName(threadId), currentTime, threadWaitTime);
        }

        if (command == RedisCommands.EVAL_LONG) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // remove stale threads //把所有过期时间小于当前时间的都抛弃掉。
                    "while true do " +
                        "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                        "if firstThreadId2 == false then " +
                            "break;" +
                        "end;" +

                        "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                        "if timeout <= tonumber(ARGV[4]) then " +
                            // remove the item from the queue and timeout set
                            // NOTE we do not alter any other timeout
                            "redis.call('zrem', KEYS[3], firstThreadId2);" +
                            "redis.call('lpop', KEYS[2]);" +
                        "else " +
                            "break;" +
                        "end;" +
                    "end;" +

                    // check if the lock can be acquired now
                    //可以获取到锁的情况。                              
                    "if (redis.call('exists', KEYS[1]) == 0) " +
                        "and ((redis.call('exists', KEYS[2]) == 0) " +
                            "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +

                        // remove this thread from the queue and timeout set
                        // 当前线程获取到锁, 从queue、timeout中移除
                        "redis.call('lpop', KEYS[2]);" +
                        "redis.call('zrem', KEYS[3], ARGV[2]);" +

                        // decrease timeouts for all waiting in the queue
                        //把等待队列中的都减少超时时间。
                        "local keys = redis.call('zrange', KEYS[3], 0, -1);" +  
                        "for i = 1, #keys, 1 do " +
                            "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
                        "end;" +

                        // acquire the lock and set the TTL for the lease
                        "redis.call('hset', KEYS[1], ARGV[2], 1);" +   // 标记当前锁已被某个线程获取
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +   // 设置标记的失效时常, 默认是 30 * 1000 ms
                        "return nil;" +
                    "end;" +

                    // check if the lock is already held, and this is a re-entry
                   //当前线程是再次重入。                               
                    "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        "return nil;" +
                    "end;" +

                    // the lock cannot be acquired
                    // check if the thread is already in the queue
                    "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
                    "if timeout ~= false then " +
                        // the real timeout is the timeout of the prior thread
                        // in the queue, but this is approximately correct, and
                        // avoids having to traverse the queue
                        "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
                    "end;" +

                    // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
                    // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
                    // threadWaitTime
                    //获取最后一个等待线程的过期时间,然后处理,就是当前线程的等待时间。后加入后执行。                              
                    "local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
                    "local ttl;" +
                    "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
                        "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
                    "else " +
                        "ttl = redis.call('pttl', KEYS[1]);" +
                    "end;" +
                    "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
                    //获取不到锁,当前线程,加入等待队列中。
                    "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
                        "redis.call('rpush', KEYS[2], ARGV[2]);" +
                    "end;" +
                    "return ttl;",
                    Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
                    internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime);
        }

        throw new IllegalArgumentException();
    }

image-20210809173929271

加锁成功条件:

1、KEY:xxx 不存在

2、KEY:xxx 存在,并且 ( KEY:redisson_lock_timeout:{xxxx} 不存在或者 第一个等待 线程是当前线程 )。

unlock()

    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // remove stale threads。 //把所有过期时间小于当前时间的都抛弃掉。
                "while true do "
                //等待队列中取第一个线程。                              
                + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                + "if firstThreadId2 == false then "
                    + "break;"
                + "end; "
                  //把所有过期时间小于当前时间的都抛弃掉。
                + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                + "if timeout <= tonumber(ARGV[4]) then "
                    + "redis.call('zrem', KEYS[3], firstThreadId2); "
                    + "redis.call('lpop', KEYS[2]); "
                + "else "
                    + "break;"
                + "end; "
              + "end;"
               
              + "if (redis.call('exists', KEYS[1]) == 0) then " + 
                    "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                    "if nextThreadId ~= false then " +
                        "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                    "end; " +
                    "return 1; " +
                "end;" +
                //当前线程没有引用计数,(没在等待队列)。                              
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                //当前线程 释放一次锁(引用计数减1)。如果计数大于0,则设置过期时间。                              
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "end; " +
                //删除锁。
                "redis.call('del', KEYS[1]); " +
                "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                "if nextThreadId ~= false then " +
                 //发布释放锁消息。
                    "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                "end; " +
                "return 1; ",
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), 
                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
    }

RedissonMultiLock

MultiLock可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁,一次性锁定多个资源,再去处理一些事情,然后一次性释放所有的资源对应的锁。

public class RedissonMultiLock implements RLock {
  //存储多个RLock
 final List<RLock> locks = new ArrayList<>();
//传入多个锁。
    public RedissonMultiLock(RLock... locks) {
        if (locks.length == 0) {
            throw new IllegalArgumentException("Lock objects are not defined");
        }
        this.locks.addAll(Arrays.asList(locks));
    }  
}

//使用
RedissonClient redisson = Redisson.create(config);

RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1,lock2,lock3);

lock.lock();

lock.unlock();

lock()

    public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId) {
        long baseWaitTime = locks.size() * 1500;
        long waitTime = -1;
        if (leaseTime == -1) {
            waitTime = baseWaitTime;
        } else {
            leaseTime = unit.toMillis(leaseTime);
            waitTime = leaseTime;
            if (waitTime <= 2000) {
                waitTime = 2000;
            } else if (waitTime <= baseWaitTime) {
                waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
            } else {
                waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
            }
        }
        
        RPromise<Void> result = new RedissonPromise<Void>();
        tryLockAsync(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime, result);
        return result;
    }
    
    @Override
    public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RPromise<Boolean> result = new RedissonPromise<Boolean>();
        LockState state = new LockState(waitTime, leaseTime, unit, threadId);
        state.tryAcquireLockAsync(locks.listIterator(), result);
        return result;
    }

RReadWriteLock

示例:

 RReadWriteLock rwLock = redisson.getReadWriteLock("anyRWLock");
 // 代码片段
 rwLock.readLock().lock();
 rwLock.readLock().unlock();
 rwLock.writeLock().lock();
 rwLock.writeLock().unlock();
public interface RReadWriteLock extends ReadWriteLock {
    @Override
    RLock readLock();

    @Override
    RLock writeLock();

}

public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
    @Override
    public RLock readLock() {
        return new RedissonReadLock(commandExecutor, getName());
    }

    @Override
    public RLock writeLock() {
        return new RedissonWriteLock(commandExecutor, getName());
    }

}

RedissonReadLock

    
    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                 //获取当前是锁模式。read/write。             
                                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                                 //未加锁。             
                                "if (mode == false) then " +
                                  "redis.call('hset', KEYS[1], 'mode', 'read'); " +  //设置read 模式。
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +      //设置锁计数。
                                  "redis.call('set', KEYS[2] .. ':1', 1); " +        //超时信息
                                  "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                                "end; " +
                                 //如果是读模式,或者 写模式下,当前线程持有锁。             
                                "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                                  "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //当前线程锁计数加1,
                                  "local key = KEYS[2] .. ':' .. ind;" +
                                  "redis.call('set', key, 1); " +
                                  "redis.call('pexpire', key, ARGV[1]); " +
                                  "local remainTime = redis.call('pttl', KEYS[1]); " +
                                  "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                                  "return nil; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",
                        Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                        internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
    }

    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
        String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                "if (mode == false) then " +           //无锁,pub 消息
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
                "if (lockExists == 0) then " +
                    "return nil;" +
                "end; " +
                    
                "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +   //锁计数减1 。
                "if (counter == 0) then " +
                    "redis.call('hdel', KEYS[1], ARGV[2]); " +       //删除写锁。
                "end;" +
                "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
                
                "if (redis.call('hlen', KEYS[1]) > 1) then " +
                    "local maxRemainTime = -3; " + 
                    "local keys = redis.call('hkeys', KEYS[1]); " + 
                    "for n, key in ipairs(keys) do " + 
                        "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                        "if type(counter) == 'number' then " + 
                            "for i=counter, 1, -1 do " + 
                                "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                                "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                            "end; " + 
                        "end; " + 
                    "end; " +
                            
                    "if maxRemainTime > 0 then " +
                        "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                        "return 0; " +
                    "end;" + 
                        
                    "if mode == 'write' then " + 
                        "return 0;" + 
                    "end; " +
                "end; " +
                    
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; ",
                Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), 
                LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
    }

RedissonWriteLock

   @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                            "if (mode == false) then " +         //未加锁
                                  "redis.call('hset', KEYS[1], 'mode', 'write'); " +       
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                              "end; " +
                              "if (mode == 'write') then " +        //已存在写锁
                                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //是当前线程持有锁。
                                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                      "local currentExpire = redis.call('pttl', KEYS[1]); " +
                                      "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                                      "return nil; " +
                                  "end; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",     //不是获取锁,返回过期剩余时间。
                        Arrays.<Object>asList(getName()), 
                        internalLockLeaseTime, getLockName(threadId));
    }

    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                "if (mode == false) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (mode == 'write') then " +
                    "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
                    "if (lockExists == 0) then " +
                        "return nil;" +
                    "else " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('hdel', KEYS[1], ARGV[3]); " +
                            "if (redis.call('hlen', KEYS[1]) == 1) then " +
                                "redis.call('del', KEYS[1]); " +
                                "redis.call('publish', KEYS[2], ARGV[1]); " + 
                            "else " +
                                // has unlocked read-locks
                                "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                            "end; " +
                            "return 1; "+
                        "end; " +
                    "end; " +
                "end; "
                + "return nil;",
        Arrays.<Object>asList(getName(), getChannelName()), 
        LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
    

CountDownLatch

public interface RCountDownLatch extends RObject, RCountDownLatchAsync {
    void await() throws InterruptedException;
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    void countDown();
    long getCount();
    boolean trySetCount(long count);
}

public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {

    private final CountDownLatchPubSub pubSub;
    private final String id;
}    

trySetCount

    @Override
    public boolean trySetCount(long count) {
        return get(trySetCountAsync(count));
    }

    @Override
    public RFuture<Boolean> trySetCountAsync(long count) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
               //设置KEY的值为:count,并发布        NEW_COUNT_MESSAGE 消息。                       
                "if redis.call('exists', KEYS[1]) == 0 then "
                    + "redis.call('set', KEYS[1], ARGV[2]); "
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    + "return 1 "
                + "else "
                    + "return 0 "
                + "end",
                Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count);
    }

await

    @Override
    public void await() throws InterruptedException {
        if (getCount() == 0) {
            return;
        }

        RFuture<RedissonCountDownLatchEntry> future = subscribe();
        try {
            commandExecutor.syncSubscriptionInterrupted(future);

            while (getCount() > 0) {
                // waiting for open state
                future.getNow().getLatch().await();
            }
        } finally {
            unsubscribe(future);
        }
    }

    @Override
    public long getCount() {
        return get(getCountAsync());
    }

    @Override
    public RFuture<Long> getCountAsync() {
        return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName());
    }

countDown

    @Override
    public void countDown() {
        get(countDownAsync());
    }
    
    @Override
    public RFuture<Void> countDownAsync() {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "local v = redis.call('decr', KEYS[1]);" +
                        "if v <= 0 then redis.call('del', KEYS[1]) end;" +
                        "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
                    Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE);
    }

pubsub

Java的Lock在获取不到锁的时候,会阻塞,然后通过notify(),notifyAll()等来唤醒。Redisson的锁,类似的流程,使用了Redis的 pubsub 功能来唤醒线程。

RLock在获取锁时,会创建一个订阅。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException
{
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
             ......
            }
        } finally {
            unsubscribe(future, threadId);
        }
}

    //protected final LockPubSub pubSub;
		protected RFuture<RedissonLockEntry> subscribe(long threadId) {
        return pubSub.subscribe(getEntryName(), getChannelName());
    }
    protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
        pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName());
    }

		//channel名称:redisson_lock__channel:{XXX}
    String getChannelName() {
        return prefixName("redisson_lock__channel", getName());
    }

LockPubSub是PublishSubscribe的一个子类。

RedissonLockEntry

RedissonLockEntry用于控制阻塞,唤醒。

public interface PubSubEntry<E> {

    void acquire();

    int release();

    RPromise<E> getPromise();

}

public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
	  //计数。
    private int counter;
    //使用Java 信号量进行阻塞
    private final Semaphore latch;
    private final RPromise<RedissonLockEntry> promise;
    //监听器
    private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();

    public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {
        super();
        this.latch = new Semaphore(0);// 信号量是0,则acquire时阻塞。
        this.promise = promise;
    }

    public void acquire() {
        counter++;  //计算加1
    }

    public int release() {
        return --counter; //计算减1
    }

    public RPromise<RedissonLockEntry> getPromise() {
        return promise;
    }

    public void addListener(Runnable listener) {
        listeners.add(listener);
    }

    public boolean removeListener(Runnable listener) {
        return listeners.remove(listener);
    }

    public ConcurrentLinkedQueue<Runnable> getListeners() {
        return listeners;
    }

    public Semaphore getLatch() {
        return latch;
    }

}

PublishSubscribe


abstract class PublishSubscribe<E extends PubSubEntry<E>> {
   //
    private final PublishSubscribeService service;
  //PubSubEntry 的map。
    private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();
    PublishSubscribe(PublishSubscribeService service) {
        super();
        this.service = service;
    }
  //创建订阅
    public RFuture<E> subscribe(String entryName, String channelName) {
        AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        RPromise<E> newPromise = new RedissonPromise<E>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return semaphore.remove(listenerHolder.get());
            }
        };
				//listener
        Runnable listener = new Runnable() {

            @Override
            public void run() {
              //获取Entry
                E entry = entries.get(entryName);
                if (entry != null) {
                    entry.acquire(); 
                    semaphore.release();
                    entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                    return;
                }
                //创建新Entry
                E value = createEntry(newPromise);
                value.acquire();
                //
                E oldValue = entries.putIfAbsent(entryName, value);
                if (oldValue != null) {
                    oldValue.acquire();
                    semaphore.release();
                    oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                    return;
                }
                //创建Listener。
                RedisPubSubListener<Object> listener = createListener(channelName, value);
                service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            }
        };
        semaphore.acquire(listener);
        listenerHolder.set(listener);
        
        return newPromise;
    }
  

    public void unsubscribe(E entry, String entryName, String channelName) {
        //
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        semaphore.acquire(new Runnable() {
            @Override
            public void run() {
                if (entry.release() == 0) {//引用计数为0,则释放channel。
                    // just an assertion
                    boolean removed = entries.remove(entryName) == entry;
                    if (!removed) {
                        throw new IllegalStateException();
                    }
                    service.unsubscribe(new ChannelName(channelName), semaphore);
                } else {
                    semaphore.release();
                }
            }
        });

    }
  
  	//对子类的覆盖的方法的封装。
    private RedisPubSubListener<Object> createListener(String channelName, E value) {
        RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {

            @Override
            public void onMessage(CharSequence channel, Object message) {
                if (!channelName.equals(channel.toString())) {
                    return;
                }
								//onMessage
                PublishSubscribe.this.onMessage(value, (Long) message);
            }

            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                if (!channelName.equals(channel.toString())) {
                    return false;
                }

                if (type == PubSubType.SUBSCRIBE) {
                    value.getPromise().trySuccess(value);
                    return true;
                }
                return false;
            }

        };
        return listener;
    }

}

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long UNLOCK_MESSAGE = 0L;
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }
    
    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }
		//消息处理
    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
      	//锁释放消息
        if (message.equals(UNLOCK_MESSAGE)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }
					//Semaphore release。
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {//读锁释放
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }
					//Semaphore release。
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

}
public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {

    public SemaphorePubSub(PublishSubscribeService service) {
        super(service);
    }

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        //Semaphore relese。
        value.getLatch().release(message.intValue());
    }

}

AsyncSemaphore

public class AsyncSemaphore {
  //信号量计数
    private volatile int counter;
  //
    private final Set<Entry> listeners = new LinkedHashSet<Entry>();
    private static class Entry {
        private Runnable runnable;
        private int permits;
        
    }  
  
    public AsyncSemaphore(int permits) {
        counter = permits;
    }  
  
    public void acquire(Runnable listener) {
        acquire(listener, 1);
    }
    
   public void acquire(Runnable listener, int permits) {
        boolean run = false;
        
        synchronized (this) {
            if (counter < permits) {   //信号量acquire 不满足,则把请求加到等待队列中。
                listeners.add(new Entry(listener, permits));
                return;
            } else {
                counter -= permits;  //满足,
                run = true;
            }
        }
        
        if (run) {       //满足就运行。
            listener.run();
        }
    }
    

    public void release() {
        Entry entryToAcquire = null;
        
        synchronized (this) {
            counter++;         //信号量加1.
            Iterator<Entry> iter = listeners.iterator();
            if (iter.hasNext()) {
                Entry entry = iter.next();
                if (entry.getPermits() <= counter) {           //唤醒一个可以满足的。
                    iter.remove();
                    entryToAcquire = entry;
                }
            }
        }
        
        if (entryToAcquire != null) {
            acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits());  //有唤醒的,继续请求。
        }
    }
  
}

PublishSubscribeService

PublishSubscribeService用于操作Redis,进行pubsub。

public class PublishSubscribeService {

    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
    //
    private final ConnectionManager connectionManager;
    //
    private final MasterSlaveServersConfig config;
    //AsyncSemaphore 缓存
    private final AsyncSemaphore [] locks = new AsyncSemaphore[50];
    
    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
    
    private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
    
    private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();
	  //Semaphore 控制 唤醒线程。
    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
    //
    private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
    
    private final LockPubSub lockPubSub = new LockPubSub(this);
    
    public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        super();
        this.connectionManager = connectionManager;
        this.config = config;
        for (int i = 0; i < locks.length; i++) {
            locks[i] = new AsyncSemaphore(1);   //设置信号量为 1 。
        }
    }
}    
                //PublishSubscribe: 
    public RFuture<E> subscribe(String entryName, String channelName)
    {
                    RedisPubSubListener<Object> listener = createListener(channelName, value);
                    service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);  
    }
                //PublishSubscribe: 
    public void unsubscribe(E entry, String entryName, String channelName) {
                    service.unsubscribe(new ChannelName(channelName), semaphore);
    }

		// 1)
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
        subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
        return promise;
    }
	// 2)
    private void subscribe(Codec codec, ChannelName channelName, 
            RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
       //缓存
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }
			// 
        freePubSubLock.acquire(new Runnable() {

            @Override
            public void run() {
                if (promise.isDone()) {   //done
                    lock.release();
                    freePubSubLock.release();
                    return;
                }
                
                PubSubConnectionEntry freeEntry = freePubSubConnections.peek();  //下一个
                if (freeEntry == null) {
                    connect(codec, channelName, promise, type, lock, listeners);
                    return;
                }
                
                int remainFreeAmount = freeEntry.tryAcquire();
                if (remainFreeAmount == -1) {
                    throw new IllegalStateException();
                }
                
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                if (oldEntry != null) {
                    freeEntry.release();
                    freePubSubLock.release();
                    
                    addListeners(channelName, promise, type, lock, oldEntry, listeners);
                    return;
                }
                
                if (remainFreeAmount == 0) {
                    freePubSubConnections.poll();
                }
                freePubSubLock.release();
                
                RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);
                
                ChannelFuture future;
                if (PubSubType.PSUBSCRIBE == type) {
                    future = freeEntry.psubscribe(codec, channelName);
                } else {
                    future = freeEntry.subscribe(codec, channelName);
                }
                //netty :ChannelFutureListener
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            if (!promise.isDone()) {
                                subscribeFuture.cancel(false);
                            }
                            return;
                        }
                        
                        connectionManager.newTimeout(new TimerTask() {
                            @Override
                            public void run(Timeout timeout) throws Exception {
                                subscribeFuture.cancel(false);
                            }
                        }, config.getTimeout(), TimeUnit.MILLISECONDS);
                    }
                });
            }
            
        });
    }



    public RFuture<Void> unsubscribe(ChannelName channelName, AsyncSemaphore lock) {
        PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null || connectionManager.isShuttingDown()) {
            lock.release();
            return RedissonPromise.newSucceededFuture(null);
        }
        
        AtomicBoolean executed = new AtomicBoolean();
        RedissonPromise<Void> result = new RedissonPromise<Void>();
        ChannelFuture future = entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
            
            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
                    executed.set(true);
                    
                    if (entry.release() == 1) {
                        freePubSubConnections.add(entry);
                    }
                    
                    lock.release();
                    result.trySuccess(null);
                    return true;
                }
                return false;
            }
            
        });
        
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                
                connectionManager.newTimeout(new TimerTask() {
                    @Override
                    public void run(Timeout timeout) throws Exception {
                        if (executed.get()) {
                            return;
                        }
                        entry.getConnection().onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channelName));
                    }
                }, config.getTimeout(), TimeUnit.MILLISECONDS);
            }
        });
        
        return result;
    }

附录

参考

https://github.com/mrniko/redisson/wiki

中文文档:https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D

redisson-2.10.4源代码分析 : https://blog.csdn.net/ly1028826685/article/details/84922706

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/130046.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Stable Diffusion进行Ai+艺术设计(以智慧灯杆为例)

目录一. 安装环境二. 配置模型2.1 stable diffusion v12.2 运行并测试生成效果Stable Diffusion 是一种以 CLIP ViT-L/14 文本编码器的&#xff08;非池化&#xff09;文本嵌入为条件的潜在扩散模型。一. 安装环境 创建并激活一个合适的名为conda的环境&#xff1a;ldm conda…

来自2022的年终总结,迎接新的2023

来自2022的年终总结&#xff0c;迎接新的2023&#x1f389;2022&#x1f389;&#x1f339;CSDN博客数据&#x1f339;2022年度也在持续原创博文&#xff0c;累计超过100篇&#xff0c;也收获了很多同学支持付费专栏订阅不断上升&#xff0c;帮助越来越多的同学学习&#x1f33…

java多线程(11):线程同步线程协作

1 线程通信 应用场景 : 生产者和消费者问题 假设仓库中只能存放一件产品 , 生产者将生产出来的产品放入仓库 , 消费者将仓库中产品取走消费 如果仓库中没有产品 , 则生产者将产品放入仓库 , 否则停止生产并等待 , 直到仓库中的产品被消费者取走为止 如果仓库中放有产品 ,…

CSS——结构和布局

1. 自适应内部元素的宽度max-width: min-content; 如果不给元素指定一个具体的 height&#xff0c;它就会自动适应其内容的高度。尝试对width 也实现类似的行为。 使 figure 元素能跟它所包含的图片一样宽&#xff08;图片的尺寸往往不是固定的&#xff09;&#xff0c;而且是…

win10录屏文件在哪?怎么更改win10录屏保存位置

在我们日常使用的win10电脑是自带录屏的功能的&#xff0c;可以将一些精彩画面录屏下来&#xff1b;当录制完视频后&#xff0c;系统会自动将视频保存起来。那win10录屏文件在哪&#xff1f;怎么更改win10录屏保存位置&#xff1f;今天小编就给大家分享一下如何查看win10录屏文…

智能车|直流电机、编码器与驱动器---驱动器

智能车|直流电机、编码器与驱动器---驱动器驱动器TB6612FNG 电机驱动器TB6612FNG 的主要参数引脚说明驱动器 需要驱动器原因&#xff1a; 改变施加给电机电源极之间的电压来调整转速&#xff0c;手动去改变电压太过于麻烦&#xff0c;可以通过微控制器&#xff08;单片机&…

ahooks中的核心hook-useRequest(上)

前言 useRequest是一个异步数据管理的hooks&#xff0c;是ahooks Hooks库的核心hook&#xff0c;因为其通过插件式组织代码&#xff0c;大部分功能都通过插件的形式来实现&#xff0c;所以其核心代码行数较少&#xff0c;简单易懂&#xff0c;还可以支持我们自定义扩展功能。可…

基础知识总结

Java 基础 1. JDK 和 JRE 有什么区别&#xff1f; JDK&#xff1a; Java Development Kit 的简称&#xff0c;Java 开发工具包&#xff0c;提供了 Java 的开发环境和运行环境。JRE&#xff1a; Java Runtime Environment 的简称&#xff0c;Java 运行环境&#xff0c;为 Java…

Android App加固原理与技术历程

App为什么会被破-jie入侵 随着黑客技术的普及化平民化&#xff0c;App&#xff0c;这个承载我们移动数字工作和生活的重要工具&#xff0c;不仅是黑客眼中的肥肉&#xff0c;也获得更多网友的关注。百度一下“App破-jie”就有5290万条结果。 ​ 一旦App被破-jie&#xff0c;不…

【图像处理】图像的锐化操作 | 边缘检测sobel算子,拉普拉斯算子,Canny算子| opencv

文章目录前言一、一阶导数算子&#xff1a;sobel算子二、二阶导数算子&#xff1a;拉普拉斯算子三.Canny算子前言 参考视频&#xff1a;opencv教程&#xff08;跟着视频敲了一遍代码&#xff09; 参考教材&#xff1a;《数字图像处理基础》 作者&#xff1a;朱虹 一、一阶导数…

【unity笔记】图解 Vector3.SignedAngle()方法的返回值

这个方法可以理解为&#xff1a;“两个向量之间的夹角&#xff08;有符号的&#xff09;”。 我会将它想象成&#xff1a;将两个向量都放在坐标原点&#xff0c;一个向量要向哪个方向旋转多少度 才能与另一个向量重合。 于是我在坐标原点放置了两个向量&#xff1a;OB和OA。 …

Java Object 类

Java Object 类是所有类的父类&#xff0c;也就是说 Java 的所有类都继承了 Object&#xff0c;子类可以使用 Object 的所有方法。 Object 类位于 java.lang 包中&#xff0c;编译时会自动导入&#xff0c;我们创建一个类时&#xff0c;如果没有明确继承一个父类&#xff0c;那…

Python采集股票数据信息

前言 今天打算来整整股票&#xff0c;简简单单的采集一些股票数据 对这个有兴趣的就一起来瞧瞧吧 准备 开发环境 & 第三方模块 解释器版本: python 3.8代码编辑器: pycharm 2021.2requests: pip install requests 爬虫pyecharts: pip install pyecharts 数据分析pandas…

图像属性操作

数字图像处理本质是对多维矩阵的操作。按照处理对象不同&#xff0c;可分为黑白图像处理&#xff0c;灰度图像处理&#xff0c;彩色图像处理。按照处理方法分为空间域处理和频域处理。按照策略分为全局处理和局部处理。 #一般步骤输入图像 多维数组 数组运算 图像…

排序算法:堆排序,快速排序,归并排序。内附完整代码和算法思路详解。

目录 一&#xff1a;堆排序 1.1&#xff1a;堆的数据结构 1.2&#xff1a;大小根堆 1.3&#xff1a;向下调整算法构建小根堆 1.4&#xff1a;堆排序思想 二&#xff1a;快速排序 2.1&#xff1a;快排单趟思想 三&#xff1a;归并排序 3.1&#xff1a;分组过程 3.2&am…

JSP ssh学习管理系统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 JSP ssh学习管理系统是一套完善的web设计系统&#xff08;系统采用ssh框架进行设计开发&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式 开发。开发环境为TOMCAT7.0,Mye…

MMDetection框架入门教程之环境部署

创建虚拟环境 打开Anaconda Prompt&#xff0c;创建一个新的虚拟环境mmlab。注意我这里选择了python3.6&#xff0c;因为在python3.7下出现了mmdet3d.apis中的类无法无法import的情况&#xff08;但mmdet3d可以import&#xff09;&#xff0c;暂时不清楚原因。 conda create …

SQLSERVER,求平均数,最大,最小,中位数,众数

SQLSERVER&#xff0c;求平均数&#xff0c;最大&#xff0c;最小&#xff0c;中位数&#xff0c;众数 SQLSERVER&#xff0c;求平均数&#xff0c;最大&#xff0c;最小&#xff0c;中位数&#xff0c;众数 SELECT -- *, t1.remark, t1.my_count, t1.my_sum, …

ArcGIS Python ​影像批量裁剪

该工具在&#xff1a;“14综合\工具箱.tbx\影像裁剪\按记录批量裁剪影像”&#xff0c;影像数据按矢量面要素批量裁剪&#xff0c;界面如图14-5所示。 图14-5 影像批量裁剪 按一个矢量面数据&#xff0c;按字段值相同的融合在一起裁剪影像&#xff0c;字段值是裁剪后的影像名字…

【再学Tensorflow2】TensorFlow2的高层封装

TensorFlow2的高层封装使用Tensorflow2构建模型的3种方法使用Sequential按层顺序构建模型使用函数式API创建任意结构的模型使用Model子类化创建自定义模型训练模型的3种方法内置fit方法内置train_on_batch方法自定义训练循环使用GPU训练模型使用单GPU训练模型使用多GPU训练模型…