AQS
AQS(AbstractQueuedSynchronizer):抽象队列同步器,是一种用来构建锁和同步器的框架。在是JUC下一个重要的并发类,例如:ReentrantLock、Semaphore、CountDownLatch、LimitLatch等并发都是由AQS衍生出来的。
理解
CLH队列
是一种基于链表的可扩展,高性能,公平的自旋锁。它的队列中每个节点等待前驱节点释放锁,当前置节点执行完成,才会唤醒后置节点,这样最多只有后置节点和新进入的线程(非公平锁状态下)来抢占CPU资源,其余线程处于阻塞状态,极大地减少了CPU开销(同时将多个线程唤醒时,唤醒的线程将进入RUNABLE状态,但是只有一个线程会从竞争获取到锁,而其他线程将会处于BLOCKED状态)。
CLH的出列
当node1出列时,就会把当head的指向出队的下个节点node2,同时也把node1的上下关系断开。
CLH的入列
队列初始化的时候,head和tail都为空。此时有节点入列,这把当前节点存放到队列尾部节点的下个节点中,在让tail指向这个新增节点,同时也会处理入队的节点的上下节点关系。
CAS
全称为Compare-And-Swap,它是一条CPU并发原语,也是一种乐观锁(类似数据库设计表时添加的version字段)。执行过程:如果当前状态值(内存中在stateOffset位置的值)等于预期值(expect),则自动将同步状态设置为给定的更新值(update);反之则重试(好像是10次),重试后依旧无果,那么就更新失败。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
// stateOffset指向一段内存,通过这个可以准确地告诉你某个字段相对于对象的起始内存地址的字节偏移,便于比较。
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
过程
-
抢锁
公平锁
按照线程锁的添加顺序(由CLH来保证)来获取锁。public final void acquire(int arg) { if (!tryAcquire(arg) && // 尝试获取锁 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取锁失败,添加到队列中 selfInterrupt();// 添加到队列成功后,就进行中断状态 } // 返回false时,获取锁失败,返回true,获取成功 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 获取锁的状态state if (c == 0) { if (!hasQueuedPredecessors() && //判断队列中是否有线程在等待 compareAndSetState(0, acquires)) {// 尝试去修改锁的状态 setExclusiveOwnerThread(current);// 修改成功,就把自己设置成独占访问权限 return true; } } else if (current == getExclusiveOwnerThread()) { // 自旋 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
非公平锁
等待队列中的首位和正要获取的锁的线程(没有添加到队列中),有同等的机会获取锁。final void lock() { if (compareAndSetState(0, 1))//尝试修改锁的状态 setExclusiveOwnerThread(Thread.currentThread());// 修改成功,就把自己设置成独占访问权限 else acquire(1); // 就加入等待队列中 } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);// 已非公平锁的方式获取锁, } // final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {// 处于空闲时,和队列中等待的线程竞争获取锁。 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {// 自旋 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
-
释放锁
asdfasd -
入队、阻塞
// 线程入列 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 节点的前面的节点 if (p == head && tryAcquire(arg)) {// 如果是前置节点是头结点,就再次尝试获取锁 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && // 如果获取锁失败,修改线程状态 parkAndCheckInterrupt()) interrupted = true;// 线程状态 } } finally { if (failed) // 当前添加的节点获取到锁时,取消正在尝试获取的节点 cancelAcquire(node); } } // 判断节点状态,用于判断是否可以中断,同时处理掉线程中已经中断的节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL)// 正常阻塞的节点 SIGNAL=-1 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) {// 节点状态错误,删除掉这样的节点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 将前置节点修改位-1,表示后续节点将被阻塞。 } return false; } // 队列是否有前置线程 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); // h = t时,队列中至少有两个队列(未初始化的时候,h,t都为null,说明没有前执行的线程,返回false;如多队列中只有一个线程的时候,h=t, 这个线程一定正在执行,返回false;队列中存在多个时, // h != t时,(s = h.next) == null 这个条件可能是为了在end()中,给head设置new Node时,还未将head赋值给tail时做的处理,s.thread != Thread.currentThread()用于判断是不是自旋) } // 添加到队列中 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {// 尾节点不是空花,就添加到最后 node.prev = pred; if (compareAndSetTail(pred, node)) {// 把当前节点设置成尾节点 pred.next = node;// 把原来的未节点的下分节点设置成当前添加的节点 return node; } } enq(node);// 兜底的手段:1)一定要把当前节点添加到队列中 2)对位未初始化,初始化队列中 return node; } // 通过死循环添加到队列中 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node()))// 头节未nul时,增加一个空的新节点 tail = head;// 让尾节点等于头节点 } else { node.prev = t; if (compareAndSetTail(t, node)) {// 把当前节点添加到未节点 t.next = node; return t; } } } }
-
唤醒
// 释放锁
public void unlock() {
sync.release(1);
}
// 释放锁的具体实现
public final boolean release(int arg) {
if (tryRelease(arg)) {// 释放成功后,
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒当前的头结点
return true;
}
return false;
}
// 尝试释放
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())// 释放锁的线程和获取锁的线程不同,抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {// 为0是,说明锁处于空闲状态
free = true;
setExclusiveOwnerThread(null);// 修改当前获取锁的线程为空
}
setState(c);// 修改锁的状态
return free;// c != 0时,说明自选还未完成,释放锁失败。
}
// 唤醒下个节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)// 节点的线程状态处于等待中,就更新为0.
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {// 线程可能超时,或者中断了,无法在唤醒了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)// 从队列尾部开始唤醒可以获取锁的线程,直到上一个完成的线程(t).这里从尾部开始的原因可能是越早的线程越容易超时(中断),所以从尾部获取比较快速。
if (t.waitStatus <= 0)// 线程状态必须<=0。由于线程在由初始化0变成-1时,不是一个原子,所以这里位<=0
s = t;
}
if (s != null)// 当线程存在时,唤醒线程
LockSupport.unpark(s.thread);
}
- 出列
在添加队列的时候调用acquireQueued时,会将head出列,交给gc回收
机制
- state
锁的状态:- 0:锁处于空闲状态
- >=1: 已经有线程获取到锁了(大于1 自选)
/**
* The synchronization state.
*/
private volatile int state;
- waitState
线程状态:- -1:表示该节点的后继节点被阻塞(或即将被阻塞),因此当前节点在释放或取消时必须解除其后继节点的阻塞。
- 0:初始化的状态
- -2:条件状态(暂时不清楚)
- >=1:表示由于超时或中断,此节点被取消,节点永远不会离开这种状态。取消了节点的线程不会再阻塞其它线程。