AbstractQueueSynchronizer简称AQS(抽象的队列同步器),是重量级基础框架以及JUC体系的基石,主要用于解决锁分配给谁的问题。
AQS入门级理论知识
整体就是一个抽象的FIFO队列来完成线程获取资源排队的工作,并通过一个int类变量(state)表示持有锁的状态。
ReentrantLock、CountDownLatch、ReentrantReadWriteLock、Semaphore底层都是调用的AQS。
AQS中含有静态内部类Node用来存储阻塞线程信息,头尾指针,以及state持有锁状态
ReentrantLock与AQS
ReentrantLock实现了Lock、Serializable接口,ReentrantLock含有静态内部类Sync、FairSync、NofairSync,其中公平锁(FairSync)与非公平锁(NofairSync)都继承了Sync,而Sync继承了AbstractQueueSynchronizer(AQS)类。
创建对象时参数决定是否使用公平锁:
非公平锁在加锁时首先会尝试抢锁,
-
枪锁成功:则会修改锁状态由未被持有0更改为被持有1,然后该锁的持有者为当前线程。
-
枪锁失败:则会和公平锁一样调用AQS中 acquire(1) 方法,然后各自调用自己实现AQS(Sync)中tryAcquire(1)方法。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
非公平锁的tryAcquire()方法:
首先调用NonfairSync的tryAcquire()方法然后进入Sync中的nonfairTryAcquire()方法,
获取当前线程,获取锁持有状态。1被持有,0则未被持有。若锁未被
-
state == 1,锁被持有。比较持有锁的线程是否是当前线程,
-
state == 0,锁未被持有,比较并交换锁状态,设置当前锁持有线程,返回true。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter():
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
Node.EXCLUSIVE 独占、值为null,创建一个含有当前线程信息的Node节点,
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)