介绍
Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。
Redisson在基于NIO
的Netty
框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
特色
-
支持云托管服务模式(同时支持亚马逊云的ElastiCache Redis和微软云的Azure Redis Cache):
自动发现主节点变化
-
支持Redis集群模式(同时支持亚马逊云的ElastiCache Redis Cluster和微软云的Azure Redis Cache):
自动发现主从节点
自动更新状态和组态拓扑
自动发现槽的变化
-
支持Redis哨兵模式:
自动发现主、从和哨兵节点
自动更新状态和组态拓扑
-
支持Redis主从模式
-
支持Redis单节模式
-
多节点模式均支持读写分离:从读主写,主读主写,主从混读主写
-
所有对象和接口均支持异步操作
-
自行管理的弹性异步连接池
-
所有操作线程安全
-
支持LUA脚本
-
提供分布式对象
通用对象桶(Object Bucket)、二进制流(Binary Stream)、地理空间对象桶(Geospatial Bucket)、BitSet、原子整长形(AtomicLong)、原子双精度浮点数(AtomicDouble)、话题(订阅分发)、 布隆过滤器(Bloom Filter)和基数估计算法(HyperLogLog) -
提供分布式集合
映射(Map)、多值映射(Multimap)、集(Set)、列表(List)、有序集(SortedSet)、计分排序集(ScoredSortedSet)、字典排序集(LexSortedSet)、列队(Queue)、双端队列(Deque)、阻塞队列(Blocking Queue)、有界阻塞列队(Bounded Blocking Queue)、 阻塞双端列队(Blocking Deque)、阻塞公平列队(Blocking Fair Queue)、延迟列队(Delayed Queue)、优先队列(Priority Queue)和优先双端队列(Priority Deque) -
提供分布式锁和同步器
可重入锁(Reentrant Lock)、公平锁(Fair Lock)、联锁(MultiLock)、 红锁(RedLock)、读写锁(ReadWriteLock)、信号量(Semaphore)、可过期性信号量(PermitExpirableSemaphore)和闭锁(CountDownLatch) -
提供分布式服务
分布式远程服务(Remote Service, RPC)、分布式实时对象(Live Object)服务、分布式执行服务(Executor Service)、分布式调度任务服务(Scheduler Service)和分布式映射归纳服务(MapReduce) -
支持Spring框架
-
提供Spring Cache集成
-
提供Hibernate Cache集成
-
提供JCache实现
-
提供Tomcat Session Manager
-
提供Spring Session集成
-
支持异步流方式执行操作
-
支持Redis管道操作(批量执行)
-
支持安卓(Andriod)系统
-
支持断线自动重连
-
支持命令发送失败自动重试
-
支持OSGi
-
支持采用多种方式自动序列化和反序列化(Jackson JSON, Avro, Smile, CBOR, MsgPack, Kryo, FST, LZ4, Snappy和JDK序列化)
引入包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.1</version>
</dependency>
https://mvnrepository.com/artifact/org.redisson/redisson
源码解析
基于3.12.1 版本。
RedissonClient
使用redis之前必须要创建连接,也即RedissonClient
。
RedissonClient
是个接口,提供操作Redis的一系列命令。
Redisson
类是RedissonClient
接口的唯一实现。Redisson
类提供了几个静态方法创建Client。
public static RedissonClient create();//使用默认配置
public static RedissonClient create(Config config) //使用指定配置
public static RedissonRxClient createRx(); //RxJava2
public static RedissonRxClient createRx(Config config);
public static RedissonReactiveClient createReactive();//Reactive interface,Project Reactor
public static RedissonReactiveClient createReactive(Config config);
redisson
支持3种方式访问Redis
,大多数情况使用RedissonClient
,即Redisson
,同步操作。
在创建client时需要指定配置,通过Config
类完成。
public class Config {
//几种模式
private SentinelServersConfig sentinelServersConfig;
private MasterSlaveServersConfig masterSlaveServersConfig;
private SingleServerConfig singleServerConfig;
private ClusterServersConfig clusterServersConfig;
private ReplicatedServersConfig replicatedServersConfig;
//连接管理
private ConnectionManager connectionManager;
/**
* Threads amount shared between all redis node clients
*/
private int threads = 16;
private int nettyThreads = 32;
/**
* Redis key/value codec. FST codec is used by default
*/
private Codec codec;
private ExecutorService executor;
/**
* Config option for enabling Redisson Reference feature.
* Default value is TRUE
*/
private boolean referenceEnabled = true;
private TransportMode transportMode = TransportMode.NIO;
private EventLoopGroup eventLoopGroup;
//锁默认释放时间,30秒
private long lockWatchdogTimeout = 30 * 1000;
private boolean keepPubSubOrder = true;
private boolean decodeInExecutor = false;
private boolean useScriptCache = false;
private int minCleanUpDelay = 5;
private int maxCleanUpDelay = 30*60;
private int cleanUpKeysAmount = 100;
/**
* AddressResolverGroupFactory switch between default and round robin
*/
private AddressResolverGroupFactory addressResolverGroupFactory = new DnsAddressResolverGroupFactory();
使用方式:
// 1. 构造Config对象
Config = ...
// 2. 构造Redisson实例
RedissonClient redisson = Redisson.create(config);
// 3. 获取需要的对象
RMap map = redisson.getMap("myMap");
RLock lock = redisson.getLock("myLock");
RExecutorService executor = redisson.getExecutorService("myExecutorService");
锁 Lock
了解Redisson的锁,最好先了解Java JUC
包中的各种锁。
Java Lock接口
//package java.util.concurrent.locks;
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
主要实现类:
RLock
(重入锁)
RLock
实现Java的重入锁。扩展了 Lock
。
public interface RLock extends Lock, RLockAsync {
//获取锁对象名称
String getName();
void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
void lock(long leaseTime, TimeUnit unit);
/**强制释放锁。如果锁成功释放则返回true。
*/
boolean forceUnlock();
/**是否被任何线程锁定
*/
boolean isLocked();
/**
* 是否被线程锁定
*/
boolean isHeldByThread(long threadId);
/**
* 是否被当前线程锁定
*/
boolean isHeldByCurrentThread();
/**被当前线程锁定次数(可冲入)
*/
int getHoldCount();
long remainTimeToLive();
}
锁的异步操作,都返回一个RFuture
。
public interface RLockAsync {
RFuture<Boolean> forceUnlockAsync();
RFuture<Void> unlockAsync();
RFuture<Void> unlockAsync(long threadId);
RFuture<Boolean> tryLockAsync();
RFuture<Void> lockAsync();
RFuture<Void> lockAsync(long threadId);
RFuture<Void> lockAsync(long leaseTime, TimeUnit unit);
RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId);
RFuture<Boolean> tryLockAsync(long threadId);
RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit);
RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);
RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId);
RFuture<Integer> getHoldCountAsync();
RFuture<Boolean> isLockedAsync();
RFuture<Long> remainTimeToLiveAsync();
}
RLock
源码
获取锁getLock
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
返回一个RedissonLock
对象。
RedissonLock
派生于RedissonExpirable
,实现了RLock
public class RedissonLock extends RedissonExpirable implements RLock {
//存储entryName和其过期时间
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
//锁默认释放时间,30秒
protected long internalLockLeaseTime;
final String id;
final String entryName;
protected final LockPubSub pubSub;
//命令执行器,异步执行器
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
}
lock()
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
/**
* leaseTime:锁过期时间,
* interruptibly:是否允许中断。
*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly ) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//加锁。 先尝试一次。
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired,null表示加锁成功。
if (ttl == null) {
return;
}
//加锁失败,则:订阅锁,等待锁释放。
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
/**死循环,直到满足:
*/
while (true) {
//再次尝试。
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired。获取到锁。
if (ttl == null) {
break;
}
// waiting for message。
//通过 Semaphore 来阻塞当前线程。
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
//ttl < 0 表示可以获取了。
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
//设置了过期时间
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//没有设置过期,则定时续期。
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//设置 watch dog ,每1/3 * expire 设置一次超时
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
/*加锁,返回nil表示加锁成功,其他表示其他锁的剩余时间。
*/
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
//redis执行lua脚本,KEYS下标从1开始。
//KYES[1]:是key名称,ARGV[1]是超时时间,ARGV[2]是KEY的 value。
//getLockName返回格式:{collection id} + ':' + {threadId}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//不存在hset 则创建。设置第一个entry是key,value。
"if (redis.call('exists', KEYS[1]) == 0) then " +
//为key增加field,并设置field的值为1。
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//存在key,则判断当前线程是否已加锁(field存在),存在则把引用计数加1。
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//返回剩余过期时间:毫秒
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
watch dog
用于KEY 续期,以免锁过期导致问题。使用的是netty的 时间轮。
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
public static class ExpirationEntry {
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
private volatile Timeout timeout;
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
RedissonExpirable
RedissonExpirable
是个抽象类,实现了过期操作。
RedissonObject
:Redis对象。
public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
//对象名称
protected String name;
//编码
protected final Codec codec;
}
unlock()
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
//
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
//设置处理成功
result.trySuccess(null);
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
/*
KEY[1]:key name
KEY[2]:
ARGV[1]:
ARGV[2]:
ARGV[3]: field name
*/
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//计数减1.
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//计数仍然大于0,则设置过期时间。
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//计数等于0,则删除key,
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
@Override
public RFuture<Boolean> deleteAsync() {
return forceUnlockAsync();
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName());
}
状态控制
@Override
public boolean isHeldByThread(long threadId) {
//判断hset的field是否存在,2个参数:getName(),getLockName(threadId)
RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId));
return get(future);
}
@Override
public int getHoldCount() {
return get(getHoldCountAsync());
}
public RFuture<Integer> getHoldCountAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, HGET, getName(), getLockName(Thread.currentThread().getId()));
}
公平锁(RedissonFairLock)
RedissonFairLock是RedissonLock的子类。实现公平锁。
公平锁的设计思路是通过List实现等待队列。
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
//等待队列名称
private final String threadsQueueName;
private final String timeoutSetName;
}
tryLockInnerAsync()
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
//KEY[1]:name。
//KEY[2]:threadsQueue ,线程队列(list),元素为threadid。
//KEY[3]:timeoutSet
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads //把所有过期时间小于当前时间的都抛弃掉。
"while true do " +
//获取线程队列(list)第一个元素。firstThreadId2:第1个元素。
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
//没有等待对象,直接跳出。
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//第一个元素的的score:超时时间
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//超时了,则从timeoutSet移除元素,threadsQueue 移除第一个元素
"if timeout <= tonumber(ARGV[3]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
//不存在KEY,并且 threadsQueue也无元素 或者 是第一个元素当前线程。则移除当前线程的元素。
"if (redis.call('exists', KEYS[1]) == 0) " + //锁存在
"and ((redis.call('exists', KEYS[2]) == 0) " + // 锁没有被任何线程持有。
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + //当前线程持有锁。
"redis.call('lpop', KEYS[2]);" + //移除自等待队列
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
//所有等待线程的超时减少。
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" +
"end;" +
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//是重入(再次加锁)
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"return 1;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime, threadWaitTime);
}
if (command == RedisCommands.EVAL_LONG) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads //把所有过期时间小于当前时间的都抛弃掉。
"while true do " +
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
// check if the lock can be acquired now
//可以获取到锁的情况。
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
// 当前线程获取到锁, 从queue、timeout中移除
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
//把等待队列中的都减少超时时间。
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
"redis.call('hset', KEYS[1], ARGV[2], 1);" + // 标记当前锁已被某个线程获取
"redis.call('pexpire', KEYS[1], ARGV[1]);" + // 设置标记的失效时常, 默认是 30 * 1000 ms
"return nil;" +
"end;" +
// check if the lock is already held, and this is a re-entry
//当前线程是再次重入。
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// the lock cannot be acquired
// check if the thread is already in the queue
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
// the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
// threadWaitTime
//获取最后一个等待线程的过期时间,然后处理,就是当前线程的等待时间。后加入后执行。
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//获取不到锁,当前线程,加入等待队列中。
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime);
}
throw new IllegalArgumentException();
}
加锁成功条件:
1、KEY:xxx 不存在
2、KEY:xxx 存在,并且 ( KEY:redisson_lock_timeout:{xxxx} 不存在或者 第一个等待 线程是当前线程 )。
unlock()
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads。 //把所有过期时间小于当前时间的都抛弃掉。
"while true do "
//等待队列中取第一个线程。
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
//把所有过期时间小于当前时间的都抛弃掉。
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
//当前线程没有引用计数,(没在等待队列)。
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//当前线程 释放一次锁(引用计数减1)。如果计数大于0,则设置过期时间。
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
//删除锁。
"redis.call('del', KEYS[1]); " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
//发布释放锁消息。
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
RedissonMultiLock
MultiLock可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁,一次性锁定多个资源,再去处理一些事情,然后一次性释放所有的资源对应的锁。
public class RedissonMultiLock implements RLock {
//存储多个RLock
final List<RLock> locks = new ArrayList<>();
//传入多个锁。
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
}
}
//使用
RedissonClient redisson = Redisson.create(config);
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1,lock2,lock3);
lock.lock();
lock.unlock();
lock()
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId) {
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
waitTime = baseWaitTime;
} else {
leaseTime = unit.toMillis(leaseTime);
waitTime = leaseTime;
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
RPromise<Void> result = new RedissonPromise<Void>();
tryLockAsync(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime, result);
return result;
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
LockState state = new LockState(waitTime, leaseTime, unit, threadId);
state.tryAcquireLockAsync(locks.listIterator(), result);
return result;
}
RReadWriteLock
示例:
RReadWriteLock rwLock = redisson.getReadWriteLock("anyRWLock");
// 代码片段
rwLock.readLock().lock();
rwLock.readLock().unlock();
rwLock.writeLock().lock();
rwLock.writeLock().unlock();
public interface RReadWriteLock extends ReadWriteLock {
@Override
RLock readLock();
@Override
RLock writeLock();
}
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
@Override
public RLock readLock() {
return new RedissonReadLock(commandExecutor, getName());
}
@Override
public RLock writeLock() {
return new RedissonWriteLock(commandExecutor, getName());
}
}
RedissonReadLock
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//获取当前是锁模式。read/write。
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
//未加锁。
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " + //设置read 模式。
"redis.call('hset', KEYS[1], ARGV[2], 1); " + //设置锁计数。
"redis.call('set', KEYS[2] .. ':1', 1); " + //超时信息
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果是读模式,或者 写模式下,当前线程持有锁。
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //当前线程锁计数加1,
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"local remainTime = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + //无锁,pub 消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + //锁计数减1 。
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " + //删除写锁。
"end;" +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
}
RedissonWriteLock
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + //未加锁
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'write') then " + //已存在写锁
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //是当前线程持有锁。
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local currentExpire = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
"return redis.call('pttl', KEYS[1]);", //不是获取锁,返回过期剩余时间。
Arrays.<Object>asList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (mode == 'write') then " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
"if (lockExists == 0) then " +
"return nil;" +
"else " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('hdel', KEYS[1], ARGV[3]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"else " +
// has unlocked read-locks
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"end; " +
"return 1; "+
"end; " +
"end; " +
"end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
CountDownLatch
public interface RCountDownLatch extends RObject, RCountDownLatchAsync {
void await() throws InterruptedException;
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
void countDown();
long getCount();
boolean trySetCount(long count);
}
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {
private final CountDownLatchPubSub pubSub;
private final String id;
}
trySetCount
@Override
public boolean trySetCount(long count) {
return get(trySetCountAsync(count));
}
@Override
public RFuture<Boolean> trySetCountAsync(long count) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//设置KEY的值为:count,并发布 NEW_COUNT_MESSAGE 消息。
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count);
}
await
@Override
public void await() throws InterruptedException {
if (getCount() == 0) {
return;
}
RFuture<RedissonCountDownLatchEntry> future = subscribe();
try {
commandExecutor.syncSubscriptionInterrupted(future);
while (getCount() > 0) {
// waiting for open state
future.getNow().getLatch().await();
}
} finally {
unsubscribe(future);
}
}
@Override
public long getCount() {
return get(getCountAsync());
}
@Override
public RFuture<Long> getCountAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName());
}
countDown
@Override
public void countDown() {
get(countDownAsync());
}
@Override
public RFuture<Void> countDownAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE);
}
pubsub
Java的Lock在获取不到锁的时候,会阻塞,然后通过notify(),notifyAll()等来唤醒。Redisson的锁,类似的流程,使用了Redis的 pubsub 功能来唤醒线程。
RLock在获取锁时,会创建一个订阅。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException
{
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
......
}
} finally {
unsubscribe(future, threadId);
}
}
//protected final LockPubSub pubSub;
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName());
}
//channel名称:redisson_lock__channel:{XXX}
String getChannelName() {
return prefixName("redisson_lock__channel", getName());
}
LockPubSub是PublishSubscribe
的一个子类。
RedissonLockEntry
RedissonLockEntry用于控制阻塞,唤醒。
public interface PubSubEntry<E> {
void acquire();
int release();
RPromise<E> getPromise();
}
public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
//计数。
private int counter;
//使用Java 信号量进行阻塞
private final Semaphore latch;
private final RPromise<RedissonLockEntry> promise;
//监听器
private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();
public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {
super();
this.latch = new Semaphore(0);// 信号量是0,则acquire时阻塞。
this.promise = promise;
}
public void acquire() {
counter++; //计算加1
}
public int release() {
return --counter; //计算减1
}
public RPromise<RedissonLockEntry> getPromise() {
return promise;
}
public void addListener(Runnable listener) {
listeners.add(listener);
}
public boolean removeListener(Runnable listener) {
return listeners.remove(listener);
}
public ConcurrentLinkedQueue<Runnable> getListeners() {
return listeners;
}
public Semaphore getLatch() {
return latch;
}
}
PublishSubscribe
abstract class PublishSubscribe<E extends PubSubEntry<E>> {
//
private final PublishSubscribeService service;
//PubSubEntry 的map。
private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();
PublishSubscribe(PublishSubscribeService service) {
super();
this.service = service;
}
//创建订阅
public RFuture<E> subscribe(String entryName, String channelName) {
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};
//listener
Runnable listener = new Runnable() {
@Override
public void run() {
//获取Entry
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
//创建新Entry
E value = createEntry(newPromise);
value.acquire();
//
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
//创建Listener。
RedisPubSubListener<Object> listener = createListener(channelName, value);
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}
public void unsubscribe(E entry, String entryName, String channelName) {
//
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
semaphore.acquire(new Runnable() {
@Override
public void run() {
if (entry.release() == 0) {//引用计数为0,则释放channel。
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (!removed) {
throw new IllegalStateException();
}
service.unsubscribe(new ChannelName(channelName), semaphore);
} else {
semaphore.release();
}
}
});
}
//对子类的覆盖的方法的封装。
private RedisPubSubListener<Object> createListener(String channelName, E value) {
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence channel, Object message) {
if (!channelName.equals(channel.toString())) {
return;
}
//onMessage
PublishSubscribe.this.onMessage(value, (Long) message);
}
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (!channelName.equals(channel.toString())) {
return false;
}
if (type == PubSubType.SUBSCRIBE) {
value.getPromise().trySuccess(value);
return true;
}
return false;
}
};
return listener;
}
}
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
public static final Long UNLOCK_MESSAGE = 0L;
public static final Long READ_UNLOCK_MESSAGE = 1L;
public LockPubSub(PublishSubscribeService service) {
super(service);
}
@Override
protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
//消息处理
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
//锁释放消息
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
//Semaphore release。
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {//读锁释放
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
//Semaphore release。
value.getLatch().release(value.getLatch().getQueueLength());
}
}
}
public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
public SemaphorePubSub(PublishSubscribeService service) {
super(service);
}
@Override
protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
//Semaphore relese。
value.getLatch().release(message.intValue());
}
}
AsyncSemaphore
public class AsyncSemaphore {
//信号量计数
private volatile int counter;
//
private final Set<Entry> listeners = new LinkedHashSet<Entry>();
private static class Entry {
private Runnable runnable;
private int permits;
}
public AsyncSemaphore(int permits) {
counter = permits;
}
public void acquire(Runnable listener) {
acquire(listener, 1);
}
public void acquire(Runnable listener, int permits) {
boolean run = false;
synchronized (this) {
if (counter < permits) { //信号量acquire 不满足,则把请求加到等待队列中。
listeners.add(new Entry(listener, permits));
return;
} else {
counter -= permits; //满足,
run = true;
}
}
if (run) { //满足就运行。
listener.run();
}
}
public void release() {
Entry entryToAcquire = null;
synchronized (this) {
counter++; //信号量加1.
Iterator<Entry> iter = listeners.iterator();
if (iter.hasNext()) {
Entry entry = iter.next();
if (entry.getPermits() <= counter) { //唤醒一个可以满足的。
iter.remove();
entryToAcquire = entry;
}
}
}
if (entryToAcquire != null) {
acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits()); //有唤醒的,继续请求。
}
}
}
PublishSubscribeService
PublishSubscribeService用于操作Redis,进行pubsub。
public class PublishSubscribeService {
private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
//
private final ConnectionManager connectionManager;
//
private final MasterSlaveServersConfig config;
//AsyncSemaphore 缓存
private final AsyncSemaphore [] locks = new AsyncSemaphore[50];
private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();
//Semaphore 控制 唤醒线程。
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
//
private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
private final LockPubSub lockPubSub = new LockPubSub(this);
public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super();
this.connectionManager = connectionManager;
this.config = config;
for (int i = 0; i < locks.length; i++) {
locks[i] = new AsyncSemaphore(1); //设置信号量为 1 。
}
}
}
//PublishSubscribe:
public RFuture<E> subscribe(String entryName, String channelName)
{
RedisPubSubListener<Object> listener = createListener(channelName, value);
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
//PublishSubscribe:
public void unsubscribe(E entry, String entryName, String channelName) {
service.unsubscribe(new ChannelName(channelName), semaphore);
}
// 1)
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
return promise;
}
// 2)
private void subscribe(Codec codec, ChannelName channelName,
RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
//缓存
PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
if (connEntry != null) {
addListeners(channelName, promise, type, lock, connEntry, listeners);
return;
}
//
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
if (promise.isDone()) { //done
lock.release();
freePubSubLock.release();
return;
}
PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); //下一个
if (freeEntry == null) {
connect(codec, channelName, promise, type, lock, listeners);
return;
}
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
addListeners(channelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);
ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
future = freeEntry.psubscribe(codec, channelName);
} else {
future = freeEntry.subscribe(codec, channelName);
}
//netty :ChannelFutureListener
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
subscribeFuture.cancel(false);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}
});
}
});
}
public RFuture<Void> unsubscribe(ChannelName channelName, AsyncSemaphore lock) {
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null || connectionManager.isShuttingDown()) {
lock.release();
return RedissonPromise.newSucceededFuture(null);
}
AtomicBoolean executed = new AtomicBoolean();
RedissonPromise<Void> result = new RedissonPromise<Void>();
ChannelFuture future = entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
executed.set(true);
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
lock.release();
result.trySuccess(null);
return true;
}
return false;
}
});
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (executed.get()) {
return;
}
entry.getConnection().onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channelName));
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}
});
return result;
}
附录
参考
https://github.com/mrniko/redisson/wiki
中文文档:https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D
redisson-2.10.4源代码分析 : https://blog.csdn.net/ly1028826685/article/details/84922706