AQS
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现。
类图如下,AbstractQueuedLongSynchronizer与AbstractQueuedSynchronizer结构一模一样,只是AbstractQueuedSynchronizer内部状态变量是
int类型的,AbstractQueuedLongSynchronizer内部的状态变量是
long类型的。
AbstractOwnableSynchronizer
这个类主要记录独占模式下占用资源的线程。
//线程独占的同步器
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
/**
* 独占模式同步的所有者线程
*/
private transient Thread exclusiveOwnerThread;
/**
* 设置当前拥有独占访问权的线程。null表示没有线程拥有访问权。
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* 返回setExclusiveOwnerThread方法最后设置的线程,如果从未设置,则返回null。
* @return exclusiveOwnerThread
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractQueuedSynchronizer
结构
AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队头和队尾元素,队列元素为Node类型。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 延迟初始化的等待队列头部。除了初始化之外,它只能通过setHead方法修改
* 注意:如果head已经存在,它的waitStatus保证不是CANCELLED。
*/
private transient volatile Node head;
/**
* 延迟初始化的等待队列尾部。只能通过enq方法添加新的等待节点修改
*/
private transient volatile Node tail;
/**
* 同步状态,由子类实现其具体含义。对于ReentrantLock来说,state可以用来表示当前线程
* 获取锁的可重入次数;
*/
private volatile int state;
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state字段的偏移量,CAS用
private static final long stateOffset;
// head字段的偏移量,CAS用
private static final long headOffset;
// tail字段的偏移量,CAS用
private static final long tailOffset;
// Node的waitStatus字段的偏移量,CAS用
private static final long waitStatusOffset;
// Node的next字段的偏移量,CAS用
private static final long nextOffset;
//......
}
enq方法
入队的方法,返回node的前驱节点
private Node enq(final Node node) {
//死循环入队,直到成功退出
for (;;) {
Node t = tail;
if (t == null) { 队尾为空先初始化队头,队尾,插入一个哨兵节点做头
if (compareAndSetHead(new Node()))
tail = head;
} else {
//前驱先链接
node.prev = t;
if (compareAndSetTail(t, node)) {//CAS操作,将队尾指针指向node
t.next = node;//CAS成功则将t的后继指向node
return t;
}
}
}
}
对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于某个线程,操作state的方式分为
独占方式和共享方式。
独占模式
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state变量获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。
在独占方式下获取和释放资源使用的方法为:
//不响应中断
public final void acquire(int arg);
//响应中断
public final void acquireInterruptibly(int arg);
public final boolean release(int arg);
acquire
tryAcquire在AQS里没有实现,需要子类配合state变量的具体含义实现。使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park方法挂起自己。
public final void acquire(int arg) {
// 如果tryAcquire失败了,则线程包装程node入队并挂起
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//成功acquire
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果失败后应该挂起,则调用LockSupport.park挂起当前线程,并返回中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先尝试一次入队,不行再调用enq死循环入队,直到成功
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
release
tryRelease也是交给子类实现。
当一个线程调用release方法时会尝试使用tryRelease操作释放资源,一般是设置状态变量state的值,然后调用LockSupport.unpark方法唤醒AQS队列里面被阻塞的一个线程。被唤醒的线程在acquireQueued方法里继续使用tryAcquire尝试,如果成功则该线程被真正唤醒继续运行,否则仍然会被放入AQS队列并被挂起。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)//唤醒head的后继
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//node的后继是空或者被取消,则从队列尾部开始查找,找到第一个不是取消状态的node,唤醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
共享模式
获取共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。
在共享方式下获取和释放资源的方法为:
//不响应中断
public final void acquireShared(int arg);
//响应中断
public final void acquireSharedInterruptibly(int arg);
public final boolean releaseShared(int arg);
acquireShared
tryAcquireShared也是交给子类实现。
首先使用tryAcquireShared尝试获取资源,成功则直接返回,失败则将当前线程封装为类型为SHARED的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park方法挂起自己。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//共享模式的节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//如果失败后应该挂起,则调用LockSupport.park挂起当前线程,并返回中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
releaseShared
和tryRelease实现差不多,首先尝试使用tryReleaseShared操作释放资源,成功则使用LockSupport.unpark方法唤醒AQS队列里面被阻塞的一个线程。被唤醒的线程继续调用tryReleaseShared获取资源,如果成功则该线程被真正唤醒继续运行,否则仍然会被放入AQS队列并被挂起。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果队头被修改了则再次循环
break;
}
}
内部类Node
Node是AQS内部类,有两个用途,一是用于AQS阻塞队列,二是用来作为条件变量的条件队列。
Node中的thread变量用来存放进入AQS队列里的线程;
Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的;
waitStatus记录当前线程等待状态,可以为CANCELLED(线程被取消了)、SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点);
prev记录当前节点的前驱节点,next记录当前节点的后继节点。
//等待队列的节点
static final class Node {
/** 标记节点正在共享模式下等待 */
static final Node SHARED = new Node();
/** 标记节点正在独占模式下等待 */
static final Node EXCLUSIVE = null;
/** waitStatus的值,表示线程已取消 */
static final int CANCELLED = 1;
/** waitStatus的值,表示后继节点的线程需要被唤醒 */
static final int SIGNAL = -1;
/** waitStatus的值,表示线程正在条件变量上等待 */
static final int CONDITION = -2;
/** waitStatus的值,表示释放共享资源时需要通知其他节点 */
static final int PROPAGATE = -3;
/**
* 状态:取值为CANCELLED、SIGNAL、CONDITION、PROPAGATE或0
*/
volatile int waitStatus;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 进入AQS队列的等待线程 */
volatile Thread thread;
/** 下一个在条件变量上等待的节点,或者是SHARED */
Node nextWaiter;
Node() {} // 用来初始化头结点或者标记为SHARED模式
Node(Thread thread, Node mode) { // addWaiter方法使用
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // 条件变量使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
内部类ConditionObject
这个类是Condition接口的实现,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,如state状态值和AQS队列。每个ConditionObject对应一个条件队列(单向链表),其用来存放调用条件变量的await方法后被阻塞的线程,这个条件队列的头、尾元素分别为firstWaiter和lastWaiter。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列的首个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;
public ConditionObject() { }
/** 退出等待时重新中断 */
private static final int REINTERRUPT = 1;
/** 退出等待时抛出InterruptedException异常 */
private static final int THROW_IE = -1;
//......
}
await方法
条件变量的挂起方法,响应中断,awaitUninterruptibly不响应中断。该方法将当前线程封装为
CONDITION类型的node进入条件队列,并释放掉获得的锁,然后挂起。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程封装为node入条件队列
Node node = addConditionWaiter();
//释放获取的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);//挂起
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 清理取消的node
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal
条件变量的唤醒方法,isHeldExclusively方法由AQS子类实现,该方法主要是将条件队列的头结点转移到AQS队列中,并唤醒该节点的线程,等待机会获取锁。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&//将头结点从条件队列转移到AQS队列
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//防止节点取消
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//AQS队列入队
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);//唤醒node的线程
return true;
}
锁、条件变量、队列的关系