AQS概括
核心思想
AQS(AbstractQueuedSynchronizer)是Java并发包中的一个核心同步器框架,它定义了一套多线程访问共享资源的同步机制。
其核心思想是:利用一个volatile的int类型的变量state
来表示同步状态,并通过一个FIFO(先进先出)队列来管理获取同步状态失败的线程。
当线程无法获取同步状态时,会被放入等待队列中阻塞,直到同步状态被释放,队列中的线程被唤醒并重新尝试获取同步状态。
数据结构
AQS中的等待队列是一个基于双向链表的FIFO队列。
队列中的每个节点(Node)代表一个等待获取同步状态的线程,节点之间通过prev
和next
指针相互连接。
此外,每个节点还包含线程引用(thread
)、等待状态(waitStatus
)等信息。
工作原理
AQS使用一个int
类型的成员变量state
来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
线程通过CAS(Compare-And-Swap)操作来修改AQS的同步状态。
如果线程获取同步状态失败(例如state
不为0),AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程。
当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
资源共享方式
AQS支持两种资源共享方式:
-
独占模式(Exclusive Mode):在这种模式下,一次只有一个线程能够获取到同步状态。例如,
ReentrantLock
就是以独占模式实现的。 -
共享模式(Shared Mode):在这种模式下,允许多个线程同时获取到同步状态,但是每次获取到的资源量可能不同。例如,
Semaphore
和CountDownLatch
就是以共享模式实现的。
重要方法
AQS提供了一系列重要方法用于实现同步状态的管理和线程的阻塞与唤醒,包括:
-
acquire(int arg)
: 以独占模式获取同步状态,如果获取失败则进入等待队列。 -
release(int arg)
: 释放同步状态,并唤醒等待队列中的一个或多个线程。 -
tryAcquire(int arg)
: 尝试以独占模式获取同步状态,成功则返回true,失败则返回false。 -
tryRelease(int arg)
: 尝试释放同步状态,成功则返回true,失败则返回false。 -
tryAcquireShared(int arg)
: 尝试以共享模式获取同步状态。 -
tryReleaseShared(int arg)
: 尝试以共享模式释放同步状态。
应用场景
AQS广泛应用于Java并发包中的各种同步组件中,如ReentrantLock
、Semaphore
、CountDownLatch
等。
它为这些同步组件提供了一个统一的框架和机制来实现多线程的同步和协调。
赛跑起跑场景中的AQS应用
CountDownLatch
是Java并发包中的一个同步工具,它允许一个或多个线程等待其他线程完成一组操作。
CountDownLatch
是基于AQS框架实现的,它继承了AQS并重写了tryAcquireShared
方法来尝试获取同步状态。
在CountDownLatch
中,state
的初始值被设置为构造方法中传入的计数值,表示需要等待的线程数量。
当调用await
方法时,如果state
不为0,当前线程会进入等待状态,并将其封装成Node
节点加入AQS的等待队列。
当调用countDown
方法时,state
的值会递减。如果state
减至0,AQS会唤醒等待队列中的所有线程。
赛跑起跑场景中的AQS应用
假设有一个田径比赛,有8名选手参赛。为了保证比赛的公平性,我们需要确保所有选手在听到枪声后同时起跑。
这里,我们可以使用CountDownLatch
来实现这一功能,而CountDownLatch
则是基于AQS实现的。
以下是赛跑起跑场景的示例代码:
package com.hmblogs.backend.study.thread;
import java.util.concurrent.CountDownLatch;
public class RaceStartWithCountDownLatch {
public static void main(String[] args) throws InterruptedException {
int numberOfRunners = 8; // 参赛选手数量
CountDownLatch startSignal = new CountDownLatch(numberOfRunners); // 初始化起跑信号
for (int i = 0; i < numberOfRunners; i++) {
new Thread(new Runner(startSignal, "Runner " + (i + 1))).start();
}
// 这里为了示例的简洁性,只调用了一次countDown,实际使用时需要确保调用次数与选手数量一致
// 正确的做法是在所有选手都准备好后,再一次性调用numberOfRunners次countDown
System.out.println("Starting the race...");
for (int i = 0; i < numberOfRunners; i++) {//如果不加该for逻辑则不会有选手起跑。
startSignal.countDown(); // 发出起跑信号,让所有选手起跑(实际应调用多次)
}
}
static class Runner implements Runnable {
private final CountDownLatch startSignal;
private final String name;
Runner(CountDownLatch startSignal, String name) {
this.startSignal = startSignal;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " is ready and waiting for the start signal...");
startSignal.await(); // 等待起跑信号
System.out.println(name + " has started running!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
在上面的代码中,我们创建了一个CountDownLatch
实例startSignal
,其计数器的初始值为参赛选手的数量。
每个选手线程在调用startSignal.await
时,都会尝试获取同步状态。由于state
不为0,它们会进入等待状态。
当主线程(或起跑控制线程)调用startSignal.countDown
时,state
的值会递减。但是,由于示例代码中只调用了一次countDown
,所以state
不会减至0,选手线程不会立即被唤醒。
为了修正这一点,应该确保在发出起跑信号时调用足够次数的countDown
,使得state
减至0,从而唤醒所有等待的选手线程。
最后
AQS作为一个通用的同步框架,为Java并发编程提供了极大的便利。
通过继承AQS并重写相应的方法,开发者可以轻松地实现自定义的同步逻辑,而无需深入了解底层的同步机制。
这使得Java并发编程变得更加简单、高效。
此外,AQS还提供了丰富的同步特性,如可重入性、可中断性、超时等,这些特性使得基于AQS实现的同步工具更加灵活、强大。