文章目录
- ReentrantLock
- AQS 队列
- AbstractOwnableSynchronizer
- AbstractQueuedSynchronizer
- Node
- waitStatus
- SHARED/EXCLUSIVE 模式
- 加锁流程
- 尝试加锁 tryAcquire
- 加锁失败入队
- addWaiter
- enq
- 阻塞等待 acquireQueued
- parkAndCheckInterrupt
- 放弃加锁 cancelAcquire
- 唤醒阻塞线程 unparkSuccessor
- 释放锁流程
- 中断处理
- 中断加锁模式 lockInterruptibly
- Condition
- ConditionObject
- await
- transferAfterCancelledWait
- reportInterruptAfterWait
- signal
- signalAll
- 对比Object#wait, notify/notifyAll
ReentrantLock
ReentrantLock 是 Java 中的一种可重入锁,属于 java.util.concurrent.locks 包。它提供了比传统的 synchronized 关键字更灵活的锁机制。以下是 ReentrantLock 的主要特点:
- 可重入性
同一线程可以多次获得同一把锁,而不会导致死锁。当线程第一次获取锁时,锁的持有计数增加;每次释放锁时,计数减少,直到计数为零时,锁才真正被释放。 - 公平性
ReentrantLock 可以选择公平性策略,默认非公平。公平锁按请求的顺序允许线程获取锁,而非公平锁则允许某些线程“插队”。可以通过构造方法指定锁的公平性:ReentrantLock lock = new ReentrantLock(true);
- 可中断的锁请求
ReentrantLock 提供了lockInterruptibly()
方法,允许线程在等待锁时响应中断。这使得线程在获取锁时可以被中断,增强了程序的灵活性。 - 条件变量支持
ReentrantLock 提供了 Condition 类的支持,允许线程在特定条件下等待和通知。通过 newCondition() 方法可以创建条件变量,支持更加复杂的线程协调。 - 锁的状态查询
可以通过isHeldByCurrentThread()
方法检查当前线程是否持有锁,通过getHoldCount()
方法获取当前线程持有锁的次数。 - 灵活的锁管理
ReentrantLock 允许显式地尝试获取锁,使用 tryLock() 方法可以尝试立即获取锁而不阻塞。如果获取失败,可以选择执行其他操作。 - 性能优势
在高竞争环境下,ReentrantLock 通常比 synchronized 更具性能优势,尤其是在高并发的场景中。 - 非阻塞锁请求
tryLock(long timeout, TimeUnit unit)
方法允许线程在指定的时间内尝试获取锁,如果无法在该时间内获取锁,则返回失败,避免了永久阻塞。 - 实现复杂的同步算法
由于其灵活的特性,ReentrantLock 可以用于实现一些复杂的同步算法和数据结构。
AQS 队列
ReentrantLock 基于AQS同步机制实现加锁的机制
AbstractOwnableSynchronizer
java.util.concurrent.locks.AbstractOwnableSynchronizer
是JUC里提供的一个抽象类,维护了当前获取锁的线程
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
}
一般使用的是其子类 AbstractQueuedSynchronizer,这个才是 AQS 本身
AbstractQueuedSynchronizer
java.util.concurrent.locks.AbstractQueuedSynchronizer 基于模板方法设计模式,
public abstract class AbstractQueuedSynchronizer
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
}
Node
Node是AbstractQueuedSynchronizer的一个内部类,表示AQS队列的节点对象
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
waitStatus
class Node {
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/* waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
}
状态字段,仅采用以下值:
- SIGNAL:此节点的后继节点已(或即将)被阻止(通过停放),因此当前节点必须在释放或取消后继节点时取消停放。为避免竞争,获取方法必须首先指示它们需要信号,然后重试原子获取,然后在失败时阻止。
- CANCELLED:此节点由于超时或中断而被取消。节点永远不会离开此状态。特别是,具有已取消节点的线程永远不会再被阻止。
- CONDITION:此节点当前位于条件队列中。在传输之前,它将不会用作同步队列节点,此时状态将设置为 0。(此处使用此值与字段的其他用途无关,但简化了机制。)
- PROPAGATE:releaseShared 应传播到其他节点。这在 doReleaseShared 中设置(仅适用于头节点),以确保传播继续,即使其他操作已经介入。
- 0:以上都不是
为简化使用,这些值按数字排列。非负值表示节点不需要发信号。因此,大多数代码不需要检查特定值,只需检查与0的大小关系。对于普通同步节点,该字段初始化为 0,对于条件节点,该字段初始化为 CONDITION。使用 CAS(或在可能的情况下,使用无条件volatile写入)对其进行修改。
SHARED/EXCLUSIVE 模式
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
}
加锁流程
非公平方式加锁流程如下
final void lock() {
// 先尝试CAS加锁
if (compareAndSetState(0, 1))
// 成功则
setExclusiveOwnerThread(Thread.currentThread());
else acquire(1);
}
公平方式加锁流程如下
final void lock() {
acquire(1);
}
都是调用 acquire(1)
,这是AbstractQueuedSynchronizer的方法
//
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
尝试加锁 tryAcquire
公平和非公平的区别在于 tryAcquire 的各自的实现。tryAcquire 方法在AbstractQueuedSynchronizer
中默认实现是直接抛异常
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
在NonfairSync
和FairSync中有各自的实现实现
// NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 锁未被任何线程持有,直接尝试加锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 锁重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 加锁失败
return false;
}
// FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 没有等待的线程才尝试加锁
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 锁重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
重入次数限制
重入次数最大支持2147483647,在文档中有提到
加锁失败入队
加锁失败后,封装为一个Node进行等待队列
addWaiter
private Node addWaiter(Node mode) {
// mode: Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 说明有线程在排队
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
如果有多个线程同时执行 addWaiter,那么多个线程可能node.prev = pred;
都执行成功,如下图所示,此时pred.next = node;
就需要进行并发控制了,这里采用的是 CAS。
enq
如果当前队列为空,没有线程排队或者前面并发情况下CAS失败的线程
private Node enq(final Node node) {
for (;;) {
// 获取 tail 节点
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 有线程排队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
// 如果有多个线程,失败的线程继续下一次循环,直到设置成功
}
}
}
阻塞等待 acquireQueued
处在等待队列中的线程只有在队头时有机会尝试加锁,返回值表示在阻塞过程中是否中断过
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// p == head 表明该节点在队首,再次尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取锁失败的线程: 阻塞该线程,等待被唤醒获取锁资源
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
获取失败后是否应该阻塞,只要是Node.SIGNAL的节点都需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) { // CANCELLED
// node.prev 指向队列中向前第一个不为 CANCELLED 的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 0 | CONDITION -2 | PROPAGATE -3
// 满足条件: <= 0 & != -1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
由前驱节点负责唤醒后一个节点
parkAndCheckInterrupt
中断会唤醒阻塞的线程
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
// 返回线程是否被中断过
return Thread.interrupted();
}
放弃加锁 cancelAcquire
cancelAcquire意为取消获取锁,什么时候取消获取,如下图所示,由变量 fasle 控制,但是如果加锁成功,failed 肯定为 false,所以不会执行cancelAcquire,而如果不成功那么会阻塞,或者是再次循环,所以也不会执行cancelAcquire。所以想要执行cancelAcquire要满足的条件只有1个,就是for循环里面操作抛了异常
下面来看取消获取锁的逻辑
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 跳过取消的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0) // CANCELLED
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
唤醒阻塞线程 unparkSuccessor
唤醒指定节点的后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0) // 非 CANCELLED 状态
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) { // CANCELLED
s = null;
// 从后往前找到node的第一个waitStatus <= 0的后继节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 解除阻塞状态
}
释放锁流程
如果当前线程持有锁,则会减少该线程的持有计数。每当线程成功获取锁时,持有计数会增加;释放锁时,计数会相应减少。
释放锁的条件
- 持有计数为零:当持有计数减小到零时,表示当前线程完全释放了锁,此时会进行锁的状态更新。
- 唤醒等待线程:如果有其他线程在等待获取该锁,ReentrantLock 会唤醒等待队列中的一个或多个线程,以便它们可以尝试获取锁。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 存在等待队列
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 检查当前线程(释放锁的线程)是否持有锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
因此非公平锁和公平锁释放锁的流程是一样的
线程可以反复加锁,但也需要释放同样加锁次数的锁,即重入了多少次,就要释放多少次,不然也会导致锁不被释放。什么情况下会重入呢?比如下面这个:
public void m() {
for (int i = 0; i < 10; i++) {
lock.lock();
}
try {
// ... method body
} finally {
lock.unlock(); // 只释放一次,不会导致锁释放
}
}
中断处理
因为 acquireQueued 方法如果返回,那必然是获取锁成功,但是从入队到获取锁成功这段时间内线程是可能被中断的,中断需要由调用方进行处理。
acquireQueued 在获取锁失败进行阻塞时,会清除中断标志位
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 调用 currentThread().isInterrupted(true)
}
所以框架在此时中断当前线程,重置中断标志位,以让线程自己处理中断
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
调用interrupt()
方法会将当前线程的中断状态设置为 true。这表示该线程希望被中断。该状态可以在其他地方通过Thread.interrupted()
或isInterrupted()
方法检查。
如果当前线程正在执行某些阻塞操作,如Object.wait()
、Thread.sleep()
或 BlockingQueue.take()
等,调用interrupt()
会导致这些操作抛出 InterruptedException。
这使得线程能够在等待或睡眠状态中被唤醒,以便做出适当的响应。
为什么要调用
currentThread().isInterrupted(true)
清除中断标志后然后又通过Thread.currentThread().interrupt()
重置中断状态?
中断加锁模式 lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果有中断过,直接抛出中断异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Condition
下面这个示例使用Condition:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
private final int[] buffer;
private int count, putIndex, takeIndex;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public BoundedBuffer(int size) {
buffer = new int[size];
}
public void put(int value) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) {
notFull.await(); // 等待缓冲区不满
}
buffer[putIndex] = value;
if (++putIndex == buffer.length) putIndex = 0; // 循环使用
count++;
notEmpty.signal(); // 唤醒等待取数据的线程
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 等待缓冲区不空
}
int value = buffer[takeIndex];
if (++takeIndex == buffer.length) takeIndex = 0; // 循环使用
count--;
notFull.signal(); // 唤醒等待放数据的线程
return value;
} finally {
lock.unlock();
}
}
}
ConditionObject
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
内部维护了一个单向链表
public class ConditionObject implements Condition {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 如果在同步队列中,则进行阻塞
LockSupport.park(this);
// checkInterruptWhileWaiting 调用的是
// Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter 方法用于将等待的节点加入链表尾部
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
// 尾节点
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 没有等待的线程
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
删除所有取消的节点,其实就是链表操作,删除t.waitStatus != Node.CONDITION
的节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 头节点
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
trail = t;
// 下一个节点
t = next;
}
}
调用 AQS 释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 必然释放锁失败,也就是抛出 IllegalMonitorStateException 异常
node.waitStatus = Node.CANCELLED;
}
}
判断是否在AQS同步队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
// 从尾节点开始找
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
signal
该方法用于唤醒一个在该条件上等待的线程。如果有多个线程在等待,signal 方法只会唤醒其中一个线程。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
将等待状态由Node.CONDITION
改为0,然后进入AQS同步队列
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
// CANCELLED
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signalAll
该方法用于唤醒所有在该条件上等待的线程。对每个Node都调用transferForSignal
方法
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
对比Object#wait, notify/notifyAll
Condition 和 Object#wait
都用于实现线程间的协调与通信,但它们在设计、使用方式和功能上有一些重要的区别。以下是它们之间的比较:
-
基本概念
Object#wait
是 Java 的内置方法,任何对象都可以调用。线程在调用 wait() 方法时会释放该对象的监视器锁,并进入等待状态,直到被其他线程调用 notify() 或 notifyAll() 唤醒。
Condition是 java.util.concurrent.locks 包中的一个接口,通常与 ReentrantLock 一起使用。提供了更灵活的线程间通信机制,可以在条件不满足时阻塞线程,并在条件满足时唤醒。 -
使用方式
Object#wait
需要配合synchronized
使用,线程必须持有对象的监视器锁才能调用 wait(),否则会抛出IllegalMonitorStateException。唤醒等待的线程时,必须在同一个对象的锁上调用 notify() 或 notifyAll()。而Condition需要与 ReentrantLock 结合使用,调用 lock() 方法获取锁后,才能调用 await()。唤醒等待的线程时,可以使用 signal() 或 signalAll(),不需要在同一个对象上。
synchronized (obj) {
obj.wait(); // 进入等待状态
// 处理逻辑
}
lock.lock();
try {
condition.await(); // 进入等待状态
// 处理逻辑
} finally {
lock.unlock();
}
- 灵活性:Object#wait只能在对象的监视器锁上使用,缺乏灵活性。Condition支持多个条件变量,允许在同一个锁下定义多个不同的条件。更适合复杂线程协调需求
- 性能:
Object#wai
t在高并发场景下,性能可能较低,因为synchronized
会导致线程竞争和上下文切换。Condition通常在高并发环境中性能更优,因为它支持更细粒度的控制,可以避免不必要的上下文切换。 - 可读性和维护性
Object#wait代码可读性较低,容易导致死锁和复杂的错误。Condition提供更好的可读性和维护性,可以更清晰地表达线程之间的关系和条件。