基本介绍
队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架。
使用了一个int成员变量(volatile int state)表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
主要使用方式是继承,子类继承该类,实现类中抽象方法来管理同步状态,此时涉及到对同步状态的修改会用到下面三个方法:
getState()
,获取当前同步状态;setState(int newState)
,设置当前同步状态;compareAndSetState(int expect,int update)
,使用CAS设置当前状态,该方法能够保证状态设置的原子性,保证线程安全性。
子类推荐被定义为自定义同步组件的静态内部类,同步器既支持独占式地获取同步状态,也支持共享式地获取同步状态
,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。
AQS共享资源的方式:独占式和共享式
AQS 定义了两种资源共享方式 :独占式 (Exclusive)和共享式(Share)
- 独占式:只有一个线程能执行,具体的 Java 实现有 ReentrantLock。
- 共享式:多个线程可同时执行,具体的 Java 实现有 Semaphore和CountDownLatch。
AQS只是一个框架 ,只定义了一个抽象类,具体资源的获取、释放都由自定义同步器去实现。
独占式:
共享式:
实现类概述:
ReentrantLock对AQS的独占方式实现为:ReentrantLock中的state初始值为0表示无锁状态。在线程执行 tryAcquire()获取该锁后ReentrantLock中的state+1,这时该线程独占ReentrantLock锁,其他线程在通过tryAcquire() 获取锁时均会失败,直到该线程释放锁后state再次为0,其他线程才有机会获取该锁。该线程在释放锁之前可以重复获取此锁,每获取一次便会执行一次state+1, 因此ReentrantLock属于可重入锁(同一个线程可以获得锁多次)。 但获取多少次锁就要释放多少次锁,这样才能保证state最终为0。如果获取锁的次数多于释放锁的次数,则会出现该线程一直持有该锁的情况;如果获取锁的次数少于释放锁的次数,则运行中的程序会报锁异常。
CountDownLatch对AQS的共享方式实现为:CountDownLatch 将任务分为N个子线程去执行,将 state 初始化为 N, N与线程的个数一致,N个子线程是井行执行的,每个子线程都在执行完成后 countDown()1次, state 执行 CAS 操作并减1。在所有子线程都执行完成( state=0)时会unpark()主线程,然后主线程会从 await()返回,继续执行后续的动作。
一般来说,自定义同步器要么采用独占方式,要么采用共享方式 ,实现类只需实现tryAcquire、tryRelease和tryAcquireShared、tryReleaseShared 中的一组即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,例如 ReentrantReadWriteLock 在读取时采用了共享方式,在写入时采用了独占方式。
自定义独占锁
:(不可重入)
class Mutex implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new
IllegalMonitorStateException();
setExclusiveOwnerThread(null);
// setState 保证前面设置owner线程被其他线程可见
setState(0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
// 加锁(不成功加入阻塞队列)
public void lock() {
sync.acquire(1);
}
// 尝试加锁(一次)
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 加锁,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试加锁有超时时间(一次)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
测试1(开启两个线程):
public static void main(String[] args) {
Mutex myLock = new Mutex();
new Thread(() -> {
myLock.lock();
try {
System.out.println("lock... t1 " + LocalDateTime.now());
// 等待两秒
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
System.out.println("unlock... t1 " + LocalDateTime.now());
myLock.unlock();
}
}, "t1").start();
new Thread(() -> {
myLock.lock();
try {
System.out.println("lock... t2 " + LocalDateTime.now());
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
System.out.println("unlock... t2 " + LocalDateTime.now());
myLock.unlock();
}
}, "t2").start();
}
结果:当线程1释放完锁,线程2才能获取锁
lock... t1 2023-07-21T14:51:03.479
unlock... t1 2023-07-21T14:51:05.495
lock... t2 2023-07-21T14:51:05.495
unlock... t2 2023-07-21T14:51:05.495
测试2:
public static void main(String[] args) {
Mutex myLock = new Mutex();
new Thread(() -> {
myLock.lock();
System.out.println("lock1..." + LocalDateTime.now());
myLock.lock();
System.out.println("lock2... " + LocalDateTime.now());
try {
System.out.println("lock... t1 " + LocalDateTime.now());
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
System.out.println("unlock... t1 " + LocalDateTime.now());
myLock.unlock();
}
}, "t1").start();
new Thread(() -> {
myLock.lock();
try {
System.out.println("lock... t2 " + LocalDateTime.now());
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
System.out.println("unlock... t2 " + LocalDateTime.now());
myLock.unlock();
}
}, "t2").start();
}
结果:不可重入锁,线程1获取锁成功之后,再次获取锁会阻塞线程
AQS原理
AQS 为每个共享资源都设置一个共享资源锁,线程在需要访问共享资源时首先需要获取共享资源锁,如果获取到了共享资源锁,便可以在当前线程中使用该共享资源,如果获取不到,则将该线程放入线程等待队列,等待下一次资源调度
同步器提供的模板方法:
模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。
同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理。
当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
队列节点Node(AQS静态内部类):
static final class Node {
// 共享式
static final Node SHARED = new Node();
// 独占式
static final Node EXCLUSIVE = null;
// 线程等待状态枚举
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 线程等待状态
volatile int waitStatus;
// 双向链表
volatile Node prev;
volatile Node next;
// 当前线程
volatile Thread thread;
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前置节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 构造器
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
节点Node属性介绍:
同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
static final class Node {
...
}
private transient volatile Node head;
private transient volatile Node tail;
// 同步状态
private volatile int state;
...
}
当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全
,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update)
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点
独占式
获取锁
一个线程获取同步状态成功,其他线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出
同步器的acquire方法:
public final void acquire(int arg) {
// 做这几件事:同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 取决于实现类实现,一般只有一个线程能获取同步状态
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
主要逻辑:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态
,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)
并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部
,最后调用acquireQueued(Node node,int arg)
方法,使得该节点以“死循环”的方式获取同步状态
。
安全地将节点添加到同步队列尾部:
// 添加为尾节点后,返回该节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试在尾部添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS确保尾节点能够被安全地添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前置节点
final Node p = node.predecessor();
// 只有前驱节点是头节点才能够尝试获取同步状态
if (p == head && tryAcquire(arg)) {
// 获取同步状态成功,将节点设置为头结点(相当于节点出队列的感觉)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
只有前驱节点是头节点才能够尝试获取同步状态,原因有二:
- 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
- 维护同步队列的FIFO原则。
流程图
独占式同步状态获取流程
前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。同步状态获取成功之后,当前线程从acquire(int arg)方法返回,代表着当前线程获取了锁,可以往后继续执行逻辑。
释放锁
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态
。通过调用同步器的release(int arg)方法可以释放同步状态,在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
唤醒头节点的后继节点线程
,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。
小结
独占式同步状态获取和释放过程:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点
。
共享式
共享式获取:同一时刻能否有多个线程同时获取到同步状态
。
以文件读写为例,写操作要求对资源的独占式访问,而读操作可以是共享式访问。
独占资源与共享资源的区别:
获取锁
同步器的acquireShared(int arg)
方法可以共享式地获取同步状态
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 同步状态小于0,加入同步队列 SHARED: 共享
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 头节点
if (p == head) {
int r = tryAcquireShared(arg);
// 值大于等于0
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态
。
共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0。
在**doAcquireShared(int arg)**方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出
。
释放锁
共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以释放同步状态。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
释放同步状态之后,将会唤醒后续处于等待状态的节点。和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。
独占式超时获取
调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false
。
响应中断的同步状态获取
Java 5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待着获取锁
。
Java 5中,同步器提供了**acquireInterruptibly(int arg)**方法,在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException
。
超时获取锁
doAcquireNanos(int arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 等待时间小于0,直接return false
if (nanosTimeout <= 0L)
return false;
// 记录最终等待终止时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将节点添加到同步队列尾部
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 前置节点是 head节点 并且获取同步状态成功 退出自旋 return true
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 最终剩余时间(重新计算超时间间隔)
nanosTimeout = deadline - System.nanoTime();
// 小于0(截止到等待时间)还未退出自旋 return false
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 线程超时等待,超时或者被唤醒继续执行
LockSupport.parkNanos(this, nanosTimeout);
// 线程被中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒),将不会使该线程进行超时等待,而是进入快速的自旋过程。
非常短的超时等待无法做到十分精确,如果这时再进行超时等待,会让nanosTimeout的超时从整体上表现得反而不精确
(本该能获取同步状态,可能由于唤醒判断将时间浪费导致获取同步状态失败)。
因此,在超时非常短的场景下,同步器会进入无条件的快速自旋
(直到获取到同步状态或者超时退出自旋)。
流程图
共享锁实践
设计一个同步工具:该工具在同一时刻,只允许至多两个线程同时访问,超过两个线程的访问将被阻塞,将这个同步工具命名为TwinsLock。
TwinsLock能够在同一时刻支持多个线程的访问,是共享式访问,因此,需要使用同步器提供的acquireShared(int args)方法等和Shared相关的方法,TwinsLock必须重写tryAcquireShared(int args)方法和tryReleaseShared(int args)方法,这样才能保证同步器的共享式同步状态的获取与释放方法得以执行。
TwinsLock在同一时刻允许至多两个线程的同时访问,表明同步资源数为2,这样可以设置初始状态status为2,当一个线程进行获取,status减1,该线程释放,则status加1
,状态的合法范围为0、1和2,其中0表示当前已经有两个线程获取了同步资源,此时再有其他线程对同步状态进行获取,该线程只能被阻塞。在同步状态变更时,需要使用compareAndSet(int expect,int update)方法做原子性保障
。
/**
* 同时两个线程可以获取同步状态
*/
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
// 自旋 + CAS
for (; ; ) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
// 自旋 + CAS
for (; ; ) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
// 主要看下面两个方法
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
}
同步器作为一个桥梁,连接线程访问以及同步状态控制等底层技术与不同并发组件。
测试:开启10个线程,每次都只会打印两个线程名称,同一时刻只有两个线程能获取到锁。
public class TwinsLockTest {
@Test
public void test() {
final Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
lock.lock();
try {
SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}
// 启动10个线程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
// 每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepUtils.second(1);
System.out.println();
}
}
}
效果实现!