文章目录
- 二者区别
- 实现原理
-
- 同步队列
-
- 原码-内部类Node
- 源码-获取锁
- 源码-释放锁
- 条件队列
-
- 原码-内部类Node
- 源码-等待
- 源码-唤醒
- Demo-比较典型的条件队列使用场景
二者区别
首先,AQS中的 同步队列 和 条件队列 是两种不同队列:
- 目的不同:同步队列主要用于实现锁机制(也就是锁的获取和释放),而条件队列用于实现条件变量,条件变量是并发编程中一种用于线程间通信的机制,它允许一个或多个线程在特定条件成立之前等待并释放相关的锁,直到其他线程改变了条件并显式的唤醒等待在该条件中的线程。比较典型的一个条件队列使用场景就是 ReentrantLock 的 Condition。
- 使用方式不同:同步队列是AQS自动管理的,开发者通常不需要直接与之交互;而条件队列是通过Condition接口暴露给开发者的,需要显示地调用 等待await()方法 和 通知signal()/signalAll()方法
- 队列类型不同:虽然它们都是队列结构,但同步队列是所有基于AQS同步器共享的,每个同步器实例只有一个同步队列;但条件队列是每个 Condition实例特有的,一个同步器可以有多个 Condition对象,因此也就有多个条件队列。
实现原理
同步队列
同步队列的实现原理比较简单,AQS中同步队列是一个FIFO队列,节点类型为AQS的内部类Node。Node有两个指针,prev和next,分别指向前置节点和后置节点,一个个Node节点组成了一个双向链表。
- 当一个线程尝试获取锁失败时,它会被封装成一个Node节点加入到队列尾部。
- 这个节点会处于等待状态,直到锁被其他线程释放。
- 当锁被释放时,队列的头节点(持有锁的线程)会通知其后继节点(如果存在的话),后继节点尝试获取锁。这个过程会一直持续,直到队列为空。
原码-内部类Node
static final class Node {
// 前置节点和后置节点构成双向链表
volatile Node prev;
volatile Node next;
// 线程本身
volatile Thread thread;
// 状态信息,标识节点在同步队列中的等待状态
volatile int waitStatus;
}
源码-获取锁
public final void acquire(int arg) {
// 尝试获取锁失败之后,将线程封装成一个Node节点加入队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 首先尝试直接在尾部插入节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 插入失败时,进入完整的入队操作
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 队列为空,初始化(第一个节点为虚节点,也叫哨兵节点,并不存储任何信息,只是占位。真正的第一个有数据的节点,是从第二个节点开始的。)
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued() 中方法链路太长,这里拆开讲解。
// 在加入队列排队的同时尝试抢夺资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 获得当前节点的前置节点,判断当前节点是否是头节点。若是则进行抢占锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 若抢占成功,则设置当前节点为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 修改Node节点状态,使其线程等待。等候唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 线程被唤醒,进行自旋判断抢占锁
interrupted = t