Semaphore到底该如何使用
事情的起因是最近在看redisson的源码,刚好看到了RedissonSemaphore的acquire/release实现。
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return new CompletableFutureWrapper<>((Void) null);
}
RFuture<Void> future = commandExecutor.syncedEval(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], value); ",
Arrays.asList(getRawName(), getChannelName()), permits);
if (log.isDebugEnabled()) {
future.thenAccept(o -> {
log.debug("released, permits: {}, name: {}", permits, getName());
});
}
return future;
}
这段代码并没有去校验key是否存在,以及设置的semaphore的最大值是多少,直接进行了incr操作。
如果其他客户端存在重复release()的行为,就会导致凭证超发的情况发生。
因此回过头去看了一下jdk的semaphore的实现。
Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
semaphore.release();
semaphore.release();
semaphore.release();
semaphore.release();
boolean b1 = semaphore.tryAcquire();
boolean b2 = semaphore.tryAcquire();
boolean b3 = semaphore.tryAcquire();
System.out.println(b1 + ":" + b2 + ":" + b3);
这段代码的执行结果是:true:true:true
JDK的semaphore的release()核心代码如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
这里只是在release的时候,将当前current和next的值做比对,防止并发情况下修改值异常问题,并不会去校验new Semaphore()时传入的初始化值。其中getState()返回的就是初始化时的state。
protected final int getState() {
return state;
}
在tryReleaseShared()方法的compareAndSetState(current, next)中,会修改这个state的值。
所以在JDK的semaphore实现中,new Semaphore(int permits)传入的值,只是一个初始化的值,如果在实际使用的时候,进行了重复release()释放,就会导致多余的凭证超发的问题。
在了解了JDK的semaphore的实际实现后,就不难理解redisson的RedissonSemaphore实现了,是完全与JDK保持一致的。
在JDK的semaphore使用时,需要保证多线程进行凭证释放的时候,不会重复release(),防止凭证超发,而使用RedissonSemaphore的时候,一般是分布式的场景,所以常见的凭证超发的情况可能就在于重复消费,所以需要调用方自己保证调用幂等,避免凭证超发。