AQS自身属性:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
Node属性:
// 共享
static final Node SHARED = new Node();
// 独占
static final Node EXCLUSIVE = null;
// 线程被取消了
static final int CANCELLED = 1;
// 后继线程需要唤醒
static final int SIGNAL = -1;
// 等待condition唤醒
static final int CONDITION = -2;
// 共享式同步状态获取将会无条件地传播下去
static final int PROPAGATE = -3;
// 初始为0,状态是上面的几种
volatile int waitStatus; // 很重要
// 前置节点
volatile Node prev;
// 后置节点
volatile Node next;
volatile Thread thread;
-
1. 以ReentrantLock为例,其他类举一反三,方法lock()
-
2. Lock接口实现类,基本都是通过【聚合】了一个 【队列同步器】的子类完成线程访问控制的Sync实现lock;Sync继承AQS;Sync又有两个实现类。
-
// 公平与非公平锁的构造 public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平锁与非公平锁的实现:
-
1.0 // 非公平锁 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { if (compareAndSetState(0, 1)) // 这里一开始省略了acquire,能抢到直接抢,抢不到再排队 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);// 实现展开在下面 } } 1.1 // 非公平锁的 tryAcquire实现细节 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 这里判断状态,1是被占用,0是不被占用。再尝试抢一下,如果正好线程释放锁那么就正好抢到 if (compareAndSetState(0, acquires)) { // 这里和公平锁相比缺少了一个hasQueuedPredecessors() setExclusiveOwnerThread(current); // 设置获取排他锁的线程 return true; // } } else if (current == getExclusiveOwnerThread()) { // 这里判断是不是自己持有锁,自己持有就进去,毕竟可重入锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; // 即不空闲也不是自己占用锁,则返回false } 2.0 // 公平锁: static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); // acquire方法 -> tryAcquire(arg) -> protected boolean tryAcquire(int arg) // tryAcquire是父类定义的,这里使用了模版设计模式 // 见下面的函数:2.1 } protected final boolean tryAcquire(int acquires) { // tryAcquire的实现 final Thread current = Thread.currentThread(); int c = getState(); // 获取状态位 if (c == 0) { if (!hasQueuedPredecessors() && // 公平锁分支,公平与非公平就这里不一样,详细在下面2.2 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 2.1 // 父类不实现,下放到子类实现 // 子类有很多实现类,这里参考FairSync的实现,代码在上面 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 2.2 // hasQueuedPredecessors // 逻辑很简单,就是判断一下队列前面是否有排队线程,如果返回true,说明有一个排队的线程;返回false说明这个线程是队列的头 public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
接下来以非公平锁为突破口:
-
1. 对比公平锁和非公平锁的 tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断hasQuevedPredecessors()
-
2. hasQueuedPredecessors() 中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
-
1. 公平锁:公平锁讲究先来先到,线程在获取锁时如果这个锁的等待队列中己经有线程在等待,那么当前线程会进入等待队列中
-
2. 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程苏醒后,不一定就是排头的这个线程得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了。
-
引出源码三部曲:
-
1. tryAcquire
-
2. addWaiter
-
3. acquireQueued
lock方法:
// 对比公平与非公平的lock方法:
// 公平锁lock
final void lock() {
acquire(1);
}
// 非公平锁
final void lock() {
if (compareAndSetState(0, 1)) // CAS操作直接抢锁
setExclusiveOwnerThread(Thread.currentThread()); // 设置拥有独占量的线程
else
acquire(1); // 抢不到再去排队
}
// 这里非公平的复杂,我们看非公平锁的代码:
acquire三大流向:
1.0 acquire方法
// 公平与非公平的acquire方法一样的
public final void acquire(int arg) {
if (!tryAcquire(arg) && // tryAcquire方法是再次尝试抢一下锁,看看是否空闲或者自己持有锁,具体实现在前面的代码里。这里如果失败就是false,取反就是true,继续往下走,执行addWaiter
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Node.EXCLUSIVE表示独占排他
selfInterrupt();
}
2.0 tryAcquire方法
// acquire方法第一个执行的方法,也是分支的第一支
// 顶层父类并未实现改方法,AQS类中
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 公平锁和非公平锁都对此方法进行了实现,非公平只是缺少了hasQueuedPredecessors()方法,代码在前面有展示
3.0 addWaiter方法
// 如果执行2.0抢锁失败
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 封装线程,进入队列
Node pred = tail; // 设置前置为tail
if (pred != null) { // 如果pred不为空,意思就是不是队列的第一个元素,执行下列操作
node.prev = pred; // 更新前置节点
if (compareAndSetTail(pred, node)) { // 3.0.1CAS的修改尾部元素,并且直接返回node,不再执行enq入队操作
pred.next = node;
return node;
}
}
enq(node); // 进入队列,当且仅当node是pred==null时才会执行
return node;
}
3.1 enq方法:
// 入队操作:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 如果tail为空,则需要初始化,这里头节点并没有使用实参node,而是new了一个空节点,
if (compareAndSetHead(new Node())) // CAS的修改队首,期望值为null,改为空节点,也叫虚拟节点,作用就是站位
tail = head; // 将头尾指针都指向这个节点
} else { // 如果tail不为空(可能一开始tail为空,但是是一个for true循环,第一次初始化之后就会走这一步)
node.prev = t; // 将t设置为node节点的前置节点,
if (compareAndSetTail(t, node)) { //CAS的修改尾节点,这部分代码逻辑和3.0.1 逻辑一样
t.next = node;
return t;
}
}
}// 注意这是个循环,保证这个里面一定会执行到else,
}
4.0 acquireQueued方法
// 解释了线程节点如何在队列里自旋
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 中断标记
for (;;) {
final Node p = node.predecessor(); // 获取当前节点的前置节点
if (p == head && tryAcquire(arg)) { // 如果前置节点是head,再次尝试获取抢锁
setHead(node); // 细节见4.1
p.next = null; // 意思是将获取到锁的节点变成虚拟头节点,之前的虚拟头节点废弃掉,直接GC
failed = false; // 入队成功,则改为false
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 细节见4.2 既改变状态,又返回boolean值
parkAndCheckInterrupt()) // 细节见4.3 等待lockSupport发放许可
interrupted = true; // 修改interrupted为true
}
} finally {
if (failed) // 如果入队失败,则取消排队
cancelAcquire(node);
}
}
4.1 setHead
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
4.2 shouldParkAfterFailedAcquire方法
//
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 此处pred就是head节点
int ws = pred.waitStatus; // head节点状态默认为wait 0
if (ws == Node.SIGNAL) // 如果是-1则执行
return true;
if (ws > 0) { // 大于0
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // status = 0跑到这里来
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 后面的节点将他前面的节点status改为-1
}
return false;
}
4.3 parkAndCheckInterrupt
// 发放许可并不是卡住线程的关键,他只是保证,进入队列的线程节点一定是按照顺序一个个执行的
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 从acquireQueued方法过来,如果没有获得许可就一直在这卡着
return Thread.interrupted(); // 正常执行,没有意外或被打断就是false,跳转到4.0中继续
}
unlock方法:
0.0 unlock
//
public void unlock() {
sync.release(1);
}
1.0 release
// 释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) { // 细节见1.1 此时执行结果为true
Node h = head; // 获取头节点
if (h != null && h.waitStatus != 0) // 头节点不等于null,且头节点wait!=0,(之前修改为SIGNAL -1)
unparkSuccessor(h); // 细节见1.2,传入参数为头节点
return true;
}
return false;
}
1.1
// tryRelease
protected final boolean tryRelease(int releases) { // releases = 1
int c = getState() - releases; // c =0
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 进入这里
free = true; // free = true
setExclusiveOwnerThread(null);
}
setState(c);
return free; // 返回true
}
1.2 unparkSuccessor
//
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 头节点进来之后,会将wait的-1改为0
Node s = node.next; // 获取头节点的下一个节点,即第一个真实任务节点
if (s == null || s.waitStatus > 0) { // 节点不为null,不进来
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 节点不为null,进入这里
LockSupport.unpark(s.thread); // 给线程节点发放许可,继续进入到1.0中;此处受影响的还有acquire方法代码块的4.3
// 不过发放许可并不代表中一定是这个线程获取锁,这就是非公平锁的精髓,总会有不讲武德插队的,获取许可只是有获取锁的可能,是否真的获取到还要看能否抢到锁。这里发放许可,使得被park的线程唤醒了,可以去tryAcquire操作,依然可能会失败,继续被阻塞
// 这里如果不使用park和unpark,则会一直执行for,导致cpu负载过重
}
异常情况
模拟某个线程不想等待了,想取消排队
-
1. 队尾元素离开
-
2. 队中间的元素离开
-
3. 队中间多个元素离开
1.0 cancelAcquire方法 // 取消入队 private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // 将线程赋值null Node pred = node.prev; // 获取前一个节点 while (pred.waitStatus > 0) // waitStatus>0意思就是取消状态,一直向前获取到一个不会取消的头节点, node.prev = pred = pred.prev; // 最坏情况就是获取到虚拟头节点 Node predNext = pred.next; // 前置节点的next指向当前节点的next node.waitStatus = Node.CANCELLED; // 状态设置为取消 if (node == tail && compareAndSetTail(node, pred)) { // 如果node是队尾节点,CAS将pre节点设置为尾节点 compareAndSetNext(pred, predNext, null); // 同时CAS设置pred节点的next为null } else { // 如果不是队尾节点 int ws; if (pred != head && // 如果前置节点不是头节点且 ((ws = pred.waitStatus) == Node.SIGNAL || //前置节点的状态为-1或状态小于0并且修改为-1成功 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 且pred线程不为空 Node next = node.next; // 获取当前节点的next if (next != null && next.waitStatus <= 0) // next不为空且status<=0时 compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }