目录
队列同步器(AQS)
独占锁示例
AQS之同步队列结构
解析AQS实现
队列同步器(AQS)
队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组 件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获 取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。
-
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
-
使用同步器提供的3 个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))对同步状态进行更改,因为它们能够保证状态的改变是安全的。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交 互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者, 它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
独占锁示例
Lock接口的实现基本都是通过聚合了一个同步器的子类来完成线程访问控制的,通过独占锁了解下队列同步器(AQS)。
public class Mutex implements Lock { // 子类推荐被定义为自定义同步组件的静态内部类 // 同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用 private static class Sync extends AbstractQueuedSynchronizer { // 是否处于独占状态 protected boolean isHeldExclusively() { return getState() == 1; } // 当状态为0的时候获取锁 public boolean tryAcquire(int acquire) { if (compareAndSetState(0, 1)) { return true; } return false; } // 释放锁,将状态设置为0 protected boolean tryRelease(int release) { if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } // 返回一个Condition,每一个condition都包含了一个condition队列 Condition newCondition() { return new ConditionObject(); } } // 通过Sync进行代理操作,实现Lock接口的API private final Sync sync = new Sync(); // 获取锁 @Override public void lock() { sync.acquire(1); } // 可中断地获取锁 @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } // 尝试非阻塞的获取锁 @Override public boolean tryLock() { return sync.tryAcquire(1); } /* 超时的获取锁,当前线程在以下3种情况下会返回: 1.当线程在超时时间获得了锁 2.当线程在超时时间被中断 3.超时时间结束,返回false */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } // 释放锁 @Override public void unlock() { sync.release(1); } /* 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用用,当前线程将释放锁 */ @Override public Condition newCondition() { return sync.newCondition(); } }
AQS之同步队列结构
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其 加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
节点属性:
属性类型与名称 | 描述 |
---|---|
int waitStatus | 等待状态,包含如下状态: CANCELLED:值为1,表示节点已取消,通常是因为线程被中断或者等待超时而被取消。 SIGNAL:值为-1,表示后继节点需要被唤醒,即当前节点的释放(signal)会通知后继节点继续尝试获取锁或资源。 CONDITION:值为-2,表示节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中 PROPAGATE:值为-3,表示释放共享锁时需要向后继节点传播共享性质,以确保后继节点可以被唤醒。这在CountDownLatch等场景中会使用到。 INITIAL:值为0,初始状态。 |
Node prev | 前驱节点,当节点加入同步队列时被设置(尾部添加) |
Node next | 后继节点 |
Node nextWaiter | 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段 |
Thread thread | 获取同步状态的线程 |
节点是构成同步队列的基础,同步器拥有首节点(head)和尾结点(tail),没有成功获取同步状态的线程会成为节点加入该队列的尾部。
-
当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
-
首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可
解析AQS实现
以ReentrantLock的非公平锁为例,看看lock的实现。
-
ReentrantLock.lock()—获取锁的入口
public void lock() { sync.lock(); }
sync 实际上是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑。
Sync 有两个具体的实现类,分别是: NofairSync:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他 线程等待,新线程都有机会抢占锁 FailSync: 表示所有线程严格按照 FIFO 来获取锁。
ReentrantLock的无参构造函数默认创建的是非公平锁。
public ReentrantLock() { sync = new NonfairSync(); }
-
NonfairSync.lock()—获取同步状态/锁。
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
-
非公平锁的特点:抢占锁的逻辑,不管有没有线程排队,上来先CAS抢占一下。
-
CAS成功,表示成功获得锁。
-
CAS失败,调用获取独占锁acquire()走锁竞争逻辑。
-
-
AQS.acquire(1)—尝试获取独占锁or加入同步队列自旋获取锁。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
-
通过 tryAcquire 尝试获取独占锁,如果成功返回 true,失败返回 false
-
如果 tryAcquire 失败,则会通过 addWaiter 方法将当前线程封装成 Node 添加 到 AQS 队列尾部。
-
acquireQueued(),将 Node 作为参数,通过自旋去尝试获取锁。
-
-
NonfairSync.tryAcquire(1)—尝试获取独占锁
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
它是重写 AQS 类中的 tryAcquire 方法
-
ReentrantLock.nofairTryAcquire(1)—尝试获取独占锁
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取当前执行的线程 int c = getState(); // 获取state的值 if (c == 0) { // 表示无锁状态 if (compareAndSetState(0, acquires)) { // CAS替换state的值,case成功表示获取锁成功 setExclusiveOwnerThread(current); // 保存当前获得锁的线程,下次再来的时候不用尝试竞争锁 return true; } } else if (current == getExclusiveOwnerThread()) { // 如果同一线程竞争锁,直接增加重入次数 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
-
获取当前线程,判断当前的锁的状态
-
如果 state=0 表示当前是无锁状态,通过 cas 更新 state 状态的值
-
当前线程是属于重入,则增加重入次数
-
-
AQS.addWaiter(Node.EXCLUSIVE) —线程构造成节点加入同步队列 (static final Node EXCLUSIVE = null;)
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成 Node.
入参 mode 表示当前节点的状态,传递的参数是 Node.EXCLUSIVE,表示独占状 态。意味着重入锁用到了 AQS 的独占锁功能
-
将当前线程封装成 Node
-
当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的 node 添加到 AQS 队列
-
如果为空或者 cas 失败,调用 enq 将节点添加到 AQS 队列
-
-
enq(node)—通过自旋把当前节点加入到队列中
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
图解分析:
8.AQS.acquireQueued(node, 1)—把node加入到链表去争抢锁
-
获取当前节点的 prev 节点
-
如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占锁
-
抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head 节点
-
如果获得锁失败,则根据 waitStatus 决定是否需要挂起线程
-
最后,通过 cancelAcquire 取消获得锁的操作
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 获取当前节点的prev节点 if (p == head && tryAcquire(arg)) { // 如果是head节点,说明有资格去争抢锁 setHead(node); // 获取锁成功,也就是ThreadA已经释放了锁,然后设置head为ThreadB获得执行权限 p.next = null; // help GC failed = false; return interrupted; } // ThreadA可能还没释放锁,使得ThreadB在执行tryAcquire返回false if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
9.shouldParkAfterFailedAcquire—竞争锁失败后应该挂起
这个方法的主要作用是,通过 Node 的状态来判断,ThreadA 竞争锁失败以后是 否应该被挂起。
-
如果 ThreadA 的 pred 节点状态为 SIGNAL,那就表示可以放心挂起当前线程
-
通过循环扫描链表把 CANCELLED 状态的节点移除
-
修改 pred 节点的状态为 SIGNAL,返回 false.
-
返回 false 时,也就是不需要挂起,返回 true,则需要调用 parkAndCheckInterrupt 挂起当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // pred是前置节点 int ws = pred.waitStatus; // 前置节点的waitStatus if (ws == Node.SIGNAL) // 如果前置节点为 SIGNAL,意味着只需要等待其前置节点的线程被释放 return true; if (ws > 0) { // ws大于 0,意味着prev节点取消了排队,直接移除这个节点就行 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 利用cas设置prev节点的状态为SIGNAL(-1) } return false; }
图解分析:
waitStatus = -1(SIGNAL:值为-1,表示后继节点需要被唤醒,即当前节点的释放会通知后继节点继续尝试获取锁或资源。)
10.parkAndCheckInterrupt
Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识 true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true,意味着在acquire方法中会执行 selfInterrupt()。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); // 1.中断 2.复位 }
selfInterrupt: 标识如果当前线程在 acquireQueued 中被中断过,则需要产生一 个中断请求,原因是线程在调用 acquireQueued 方法的时候是不会响应中断请求的。
11.ReentrantLock.unlock()—锁释放
public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { // 释放锁成功 Node h = head; // 获取aqs中的head节点 if (h != null && h.waitStatus != 0) // 如果head节点不为空且状态!=0.调用unparkSuccessor(h)唤醒后续节点 unparkSuccessor(h); return true; } return false; }
12.ReentrantLock.tryRelease()—设置锁状态
这个方法可以认为是一个设置锁状态的操作,通过将 state 状态减掉传入的参数值 (参数是 1),如果结果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它 的线程有机会进行执行。 在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时 候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock() 的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true.
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
13.AQS.unparkSuccessor()—唤醒后续节点
private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 获取head节点的状态 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 设置head节点状态为0 Node s = node.next; // 得到head节点的下一个节点 //如果下一个节点为 null 或者 status>0 表示 cancelled 状态. if (s == null || s.waitStatus > 0) { s = null; //通过从尾部节点开始扫描,找到距离head最近的一个waitStatus<=0 的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // next节点不为空,直接唤醒这个线程即可 LockSupport.unpark(s.thread); }
-