文章目录
- J.U.C.包
- Lock
- ReadWriteLock
- LockSupport
- AQS
- ReentrantLock
- 对比synchronized
- 加锁原理
- 释放锁原理
- CountDownLatch
- CyclicBarrier
- Semaphore
J.U.C.包
java.util.concurrent
,简称 J.U.C.。是Java并发工具包,提供了在多线程编程中常用的工具类和框架,帮助开发者简化并发编程的复杂性,并提高程序的性能和可靠性。
java.util.concurrent.locks
包下常用的类与接口是JDK1.5
后新增的。lock
的出现是为了弥补synchronized
关键字解决不了的一些问题。例如,当一个代码块被synchronized
修饰了,一个线程获取了对应的锁,并执行该代码块时,其他线程只能一直等待,等待获取锁的线程释放锁。如果这个线程因为某些原因被堵塞了,没有释放锁,那么其他线程只能一直等待下去,导致效率很低。因此就需要有一种机制可以不让等待的线程一直无期限地等待下去,比如只等待一定的时间或者能够响应中断,通过Lock
就可以办到。
java.util.concurrent
包中的锁在locks
包下:
Lock
和ReadWriteLock
是两大锁的根接口,Lock
代表实现类是ReentrantLock
,ReadWriteLock
的代表实现类是ReentrantReadWriteLock
。
除了锁之外,java.util.concurrent
包还提供了一些其他的工具类和框架,如Semaphore
、CountDownLatch
、CyclicBarrier
等。
Lock
Lock
接口在Java的java.util.concurrent.locks
包中定义,用于实现更灵活的线程同步机制。与传统的 synchronized
关键字相比,Lock
接口提供了更多的操作和更细粒度的控制。在实际使用中,自然是能够替代synchronized
关键字的。
Lock
接口中的方法:
lock()
:lock()
方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已经被另一个线程持有,则当前线程将会被阻塞,直到锁被释放。如果使用lock
方法必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此使用Lock
必须在try-catch
块中进行,并且将释放锁的操作放在finally
块中进行,以保证锁一定被被释放,防止死锁的发生。public void increment() { lock.lock(); try { counter++; System.out.println(Thread.currentThread().getName() + ": " + counter); } finally { lock.unlock(); } }
lockInterruptibly()
:获取锁,但与lock()
方法不同,它允许线程在等待获取锁的过程中被中断。例如,当两个线程同时通过lock.lockInterruptibly()
想获取某个锁时,如果此时线程A获取到了锁,而线程B在等待,那么对线程B调用threadB.interrupt()
能够中断线程B的等待过程。当一个线程获取了锁之后,是不会被interrupt()
方法中断的。因为interrupt()
方法只能中断阻塞过程中的线程而不能中断正在运行过程中的线程。与synchronized
相比,当一个线程处于等待某个锁的状态,是无法被中断的,只有一直等待下去。public class LockInterruptiblyExample { private final Lock lock = new ReentrantLock(); private int counter = 0; public void increment() throws InterruptedException { lock.lockInterruptibly(); try { counter++; System.out.println(Thread.currentThread().getName() + ": " + counter); } finally { lock.unlock(); } } public static void main(String[] args) { LockInterruptiblyExample example = new LockInterruptiblyExample(); Runnable task = () -> { try { example.increment(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " was interrupted."); } }; Thread thread1 = new Thread(task); Thread thread2 = new Thread(task); thread1.start(); thread2.start(); thread2.interrupt(); // Interrupt the second thread } }
trylock()
:该方法的作用是尝试获取锁,如果锁可用则返回true
,不可用则返回false
。public class TryLockExample { private final Lock lock = new ReentrantLock(); private int counter = 0; public void increment() { if (lock.tryLock()) { try { counter++; System.out.println(Thread.currentThread().getName() + ": " + counter); } finally { lock.unlock(); } } else { System.out.println(Thread.currentThread().getName() + " could not acquire the lock."); } } public static void main(String[] args) { TryLockExample example = new TryLockExample(); Runnable task = example::increment; Thread thread1 = new Thread(task); Thread thread2 = new Thread(task); thread1.start(); thread2.start(); } }
newCondition
:Lock
接口提供了方法Condition newCondition();
,返回的Condition
类型也是一个接口,Condition
提供了更细粒度的线程通信控制,用于实现复杂的线程间协作。类似于Object
类中的wait()
、notify()
和notifyAll()
方法。await()
:当前线程等待,直到被通知或被中断。signal()
:唤醒一个等待线程。如果所有线程都在等待,则任意选择一个线程唤醒。signalAll()
:唤醒所有等待线程。
public class ConditionExample { private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private int counter = 0; public void increment() { lock.lock(); try { while (counter == 0) { condition.await(); } counter++; System.out.println(Thread.currentThread().getName() + ": " + counter); condition.signal(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } } public void reset() { lock.lock(); try { counter = 0; condition.signal(); } finally { lock.unlock(); } } public static void main(String[] args) { ConditionExample example = new ConditionExample(); Runnable incrementTask = example::increment; Runnable resetTask = example::reset; Thread thread1 = new Thread(incrementTask); Thread thread2 = new Thread(resetTask); thread1.start(); thread2.start(); } }
ReadWriteLock
ReadWriteLock
接口提供了一种用于在某些情况下可以显著提升并发性能的锁定机制。它允许多个读线程同时访问共享资源,但对写线程使用排他锁,这样读操作不会互相阻塞,而写操作会阻塞所有其他操作。
该接口有两个方法:
readLock()
:返回用于读取操作的锁。writeLock()
:返回用于写入操作的锁。
ReadWriteLock
管理一组锁,一个是只读的锁,一个是写锁。Java并发库中ReetrantReadWriteLock
实现了ReadWriteLock
接口并添加了可重入的特性。对于ReetrantReadWriteLock
其读锁是共享锁而写锁是独占锁,读锁的共享可保证并发读是非常高效的。需要注意的是,读写、写读、写写的过程是互斥的,只有读读不是互斥的。
public class ReadWriteLockExample {
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private int value = 0;
// 读操作
public int readValue() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " Reading: " + value);
return value;
} finally {
readLock.unlock();
}
}
// 写操作
public void writeValue(int value) {
writeLock.lock();
try {
this.value = value;
System.out.println(Thread.currentThread().getName() + " Writing: " + value);
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
ReadWriteLockExample example = new ReadWriteLockExample();
Runnable readTask = () -> {
for (int i = 0; i < 5; i++) {
example.readValue();
try {
Thread.sleep(100); // 模拟读取时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
Runnable writeTask = () -> {
for (int i = 0; i < 5; i++) {
example.writeValue(i);
try {
Thread.sleep(150); // 模拟写入时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
Thread thread1 = new Thread(readTask);
Thread thread2 = new Thread(readTask);
Thread thread3 = new Thread(writeTask);
thread1.start();
thread2.start();
thread3.start();
}
}
LockSupport
LockSupport
是java.util.concurrent.locks
包下的一个工具类。它提供了最基本的线程阻塞和解除阻塞的功能,通常用来构建更高级的同步机制。其中有两个重要的方法,通过park()
和unpark()
方法来实现阻塞和唤醒线程的操作,可以理解为wait()
和notify()
的加强版。
park()
:阻塞当前线程,直到线程被其他线程中断或调用unpark()
方法唤醒。unpark()
:唤醒指定线程。如果该线程尚未阻塞,则下一次调用park()
方法时不会阻塞。
传统等待唤醒机制是使用Object
中的wait()
方法让线程等待,使用Object
中的notify()
方法唤醒线程。或者使用JUC包中Condition
的await()
方法让线程等待,使用signal()
方法唤醒线程。
wait()
和notify()
/await()
和signal()
方法必须要在同步块或同步方法里且成对出现使用,如果没有在synchronized
代码块使用则抛出java.lang.IllegalMonitorStateException
。必须先wait()
/await()
后notify()
/signal()
,如果先notify()
后wait()
会出现另一个线程一直处于等待状态。
LockSupport
对比传统等待唤醒机制,能够解决传统等待唤醒问题。LockSupport
使用的是许可机制,而wait/notify
使用的是监视器机制。每个线程最多只有一个许可,调用park()
会消耗一个许可,如果有许可则会直接消耗这张许可然后退出,如果没有许可就堵塞等待许可可用。调用unpark()
则会增加一个许可,连续调用多次unpark()
和调用一次一样,只会增加一个许可。而且LockSupport
的park()
和unpark()
是可中断的,且无需在同步块中使用。
public class LockSupportProducerConsumer {
private static Object resource = null;
public static void main(String[] args) {
Thread consumer = new Thread(() -> {
System.out.println("Consumer waiting for resource");
while (resource == null) {
LockSupport.park();
}
System.out.println("Consumer consumed resource");
});
Thread producer = new Thread(() -> {
try {
Thread.sleep(2000); // Simulate some work with sleep
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
resource = new Object();
System.out.println("Producer produced resource");
LockSupport.unpark(consumer);
});
consumer.start();
producer.start();
}
}
LockSupport
类使用了一种名为Permit
的概念来做到阻塞和唤醒线程的功能,每个线程都有一个Permit
,Permit
只有两个值1和0,默认是0。官网解释LockSupport
是用来创建锁和同步其他类的基本线程的阻塞原语。LockSupport
最终调用的Unsafe
中的native
方法。以unpark、park
为例:
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
AQS
AQS是指java.util.concurrent.locks
包下的一个抽象类AbstractQueuedSynchronizer
译为,抽象的队列同步器。
同步器是在多线程编程中用于管理线程间协作和同步的机制。同步器通常用于协调线程的执行顺序、控制共享资源的访问以及管理线程的状态。常见的同步器包括:CountDownLatch、CyclicBarrier、Semaphore等。
在JUC包下,能够看到有许多类都继承了AQS,如ReentrantLock
、CountDownLatch
、ReentrantReadWriteLock
、Semaphore
。
AQS是用来构建锁或其它同步器组件的重要基础框架,以及是整个JUC体系的基石,它用于实现依赖先进先出队列的阻塞锁和相关的同步器。
AQS提供了一个框架,用于创建在等待队列中具有独占或共享模式的同步器。
AQS可以理解为一个框架,因为它定义了一些JUC包下常用"锁"的标准。AQS简单来说,包含一个status
和一个队列。status
保存线程持有锁的状态,用于判断该线程获没获取到锁,没获取到锁就去队列中排队。AQS中的队列,是指CLH队列(Craig, Landin, and Hagerste[三个人名组成])锁队列的变体,是一个双向队列。队列中的元素即Node
结点,每个Node
中包含:头结点、尾结点、等待状态、存放的线程等。Node
遵循从尾部入队,从头部出队的规则,即先进先出原则。
在多线程并发环境下,使用lock
加锁,当处在加锁与解锁之间的代码,只能有一个线程来执行。这时候其他线程不能够获取锁,如果不处理线程就会造成了堵塞。在AQS框架中,会将暂时获取不到锁的线程加入到队列里,这个队列就是AQS的抽象表现。它会将这些线程封装成队列的结点,通过CAS、自旋以及LockSupport.park()
的方式,维护state
变量的状态,使并发达到同步的效果。
ReentrantLock
ReentrantLock
译为可重入锁,是一种锁的实现类,它提供了比synchronized
关键字更广泛的锁定操作选项,提供了公平锁和非公平锁两种模式。
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
private int counter = 0;
public void increment() {
lock.lock();
try {
counter++;
System.out.println(Thread.currentThread().getName() + " incremented counter to " + counter);
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockExample example = new ReentrantLockExample();
Runnable task = () -> {
for (int i = 0; i < 5; i++) {
example.increment();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread1.start();
thread2.start();
}
}
对比synchronized
Java提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是JVM实现的 synchronized
,而另一个是 JDK 实现的 ReentrantLock
。
比较 | synchronized | ReentrantLock |
---|---|---|
锁的实现 | JVM实现 | JDK实现 |
性能 | synchronized 与 ReentrantLock 大致相同 | synchronized 与 ReentrantLock 大致相同 |
等待可中断 | 不可中断 | 可中断 |
公平锁 | 非公平锁 | 默认非公平锁,也可以是公平锁 |
锁绑定多个条件 | 不能绑定 | 可以同时绑定多个Condition对象 |
可重入 | 可重入锁 | 可重入锁 |
释放锁 | 自动释放锁 | 调用 unlock() 释放锁 |
等待唤醒 | 搭配wait()、notify或notifyAll()使用 | 搭配await()/singal()使用 |
synchronized
与ReentrantLock
最直观的区别就是,在使用ReentrantLock
的时候需要调用unlock
方法释放锁,所以为了保证一定释放,通常都是和 try-finally
配合使用的。在实际开发中除非需要使用ReentrantLock
的高级功能,否则优先使用synchronized
。这是因为synchronized
是JVM实现的一种锁机制,JVM原生地支持它,而ReentrantLock
不是所有的JDK版本都支持。并且使用synchronized
不用担心没有释放锁而导致死锁问题,因为JVM会确保锁的释放。
加锁原理
ReentrantLock
原理用到了AQS,而AQS包括一个线程队列和一个state
变量,state
,它的值有3种状态:没占用是0,占用了是1,大于1是可重入锁。所以ReentrantLock
加锁过程,可以简单理解为state
变量的变化。
在多线程并发环境下,某个线程持有锁,将state
由0设置为1,如果有其他线程再次进入,线程则会经过一系列判断,然后构建Node
结点,最终形成双向链表结构。最后执行LockSupport.park()
方法,将等待的线程挂起,如果当前持有锁的线程释放了锁,则将state
变量设置为0,调用LockSpoort.unpark()
方法指定唤醒等待队列中的某个线程。
ReentrantLock
加锁有两种形式,默认是非公平锁,但可以通过构造方法来指定为公平锁。
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock(true);
}
//⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
无论是公平锁还是非公平锁,由于用到了AQS框架,所以底层实现的逻辑大致是差不多的,ReentrantLock
加锁核心方法调用栈:
lock()
--> acquire()
--> tryAcquire()
--> addWaiter()
--> acquireQueued()
--> selfInterrupt()
公平锁还是非公平锁虽然大致逻辑差不多,但是区别总是有的,总的来说非公平锁比非公平锁在代码里面多了几行判断。
// ===========重写 lock 方法对比===========
// 公平锁
final void lock() {
acquire(1);
}
// 非公平锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ===========重写 tryAcquire 方法对比===========
// 公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 非公平锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在重写的tryAcquire
方法里,公平锁在获取同步状态时多了一个限制条件即hasQueuedPredecessors()
方法。该方法作用是保证等待队列中的线程按照从头到尾的顺序排队获取锁。举个例子,目前队列中有两个线程A、B,线程A,在线程B的前面。在当前线程释放锁的时候,线程B获取到了锁,该方法会判断当前头结点的下一个结点中存放的线程跟当前线程是否相同。在这个例子中头结点的下一个结点存放的线程是傀儡结点线程为null
,而当前线程是线程B,所以返回true
,回到上一个方法true
取反就是false
所以获取锁失败。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
在执行完tryAcquire
方法之后就会执行addWaiter
方法。addWaiter
方法作用为,当第一次将等待的线程添加到队列时,先会调用enq
方法。如果不是第一次调用,即尾结点不为空,队列中已经有了其他线程结点,则会直接将当前线程的前结点指向尾结点,即队列中最后一个线程结点。然后用CAS将前一个结点的下一个结点指向当前结点,形成链表结构,最后返回添加到队列中的结点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq
方法作用是将等待获取锁的线程封装成Node
结点,并将Node
结点串联起来,形成双向链表结构,简而言之就是将线程添加到等待队列中去。 该方法运用自旋机制,如果添加的结点为第一个结点,则会在第一个实际结点之前,生成一个“傀儡结点”。添加的第一个结点的前结点指向傀儡结点,尾结点指向实际结点。傀儡结点的后结点则指向添加的第一个结点。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
之后执行acquireQueued
方法,该方法用到了自旋机制。首先判断当前结点是否为头结点,如果是头结点,就让头结点中的线程尝试获取锁。如果不是头结点,执行shouldParkAfterFailedAcquire
方法尝试让当前线程挂起,直到持有锁的线程释放锁,唤醒等待的线程之后再去尝试获取锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
方法,该方法首先判断上一个结点的waitStatus
。如果该队列只有一个结点,则上一个结点为头结点,此时头结点的waitStatus=0
,经过该方法会将上一个结点的waitStatus
通过CAS,设置为-1。因为最外部是一个自旋机制,会一直循环,所以当第二次进入该方法,则会直接返回true
。返回true
意味着当前线程将进入堵塞状态,会执行parkAndCheckInterrupt()
方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
方法,底层是调用LockSupport.park()
方法让线程挂起,直到持有锁的线程将它们唤醒。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
ReentrantLock
其加锁核心方法为acquire
方法。最终执行完毕,下面的if
表达式返回true
,则执行selfInterrupt
方法中断线程。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantLock
在采用非公平锁构造时,首先检查锁状态,如果锁可用,直接通过CAS设置成持有状态,且把当前线程设置为锁的拥有者。如果当前锁已经被持有,那么接下来进行可重入检查,如果可重入,需要为锁状态加上请求数。如果不属于上面两种情况,那么说明锁是被其他线程持有,当前线程应该放入等待队列。
在放入等待队列的过程中,首先要检查队列是否为空队列,如果为空队列,需要创建虚拟的头节点,然后把对当前线程封装的节点加入到队列尾部。由于设置尾部节点采用了CAS,为了保证尾节点能够设置成功,ReentrantLock
采用了无限循环的方式,直到设置成功为止。
在完成放入等待队列任务后,则需要维护节点的状态,以及及时清除处于Cancel
状态的节点,来帮助垃圾收集器及时回收。如果当前节点之前的节点的等待状态小于1,说明当前节点之前的线程处于等待状态,那么当前节点的线程也应处于等待状态。通过LockSupport
类实现等待挂起的功能。当等待的线程被唤起后,检查中断状态,如果处于中断状态,那么需要中断当前线程。
释放锁原理
ReentrantLock
释放锁调用栈:
unlock()
--> release()
--> tryRelease()
--> unparkSuccessor()
在release
方法中如果tryRelease
方法返回true
,则判断队列头结点中的waitStatus
,如果不等于0则执行unparkSuccessor
方法,按顺序唤醒队列中等待的线程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
方法作用是尝试释放锁,首先获取当前持有锁线程的state
变量并使其减1。如果减1后的state
值等于0,则认为该线程马上要释放锁,将当前持有锁的线程设置为null
,将0设置为state
的新值并返回true
。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
方法用于唤醒等待队列中的后继节点。首先判断当前节点的等待状态如果小于0,将其设置为0。然后从尾部开始向前查找,直到找到一个有效的后继节点,如果找到一个有效的后继节点,唤醒其线程。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
CountDownLatch
count down latch
直译为倒计时门闩,也可以叫做闭锁。
门闩,汉语词汇。拼音:mén shuān 释义:指门关上后,插在门内使门推不开的滑动插销。
CountDownLatch
JDK文档注释:
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
文档大意:一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CountDownLatch
是Java中的一个同步工具类,用于使一个或多个线程等待其他线程完成一组操作。CountDownLatch
通过一个计数器实现,该计数器的初始值由构造方法指定,底层还是AQS。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
每调用一次countDown()
方法,计数器减一,当计数器到达零时,所有因调用await()
方法而等待的线程都将被唤醒。举个例子,晚上教室关门,要等同学都离开之后,再关门:
public class MainTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(7);
for (int i = 0; i < 7; i++){
new Thread(() -> {
System.out.println("同学"+Thread.currentThread().getName() + "\t 离开");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println("关门...");
}
}
CyclicBarrier
Cyclic Barrier
直译为循环屏障,是Java中关于线程的计数器,也可以叫它栅栏。
CyclicBarrier
JDK文档注释:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
文档大意:一种同步辅助工具,允许一组线程相互等待到达一个共同的障碍点。cyclicbarrier
在包含固定大小的线程组的程序中非常有用,这些线程必须偶尔相互等待。这个屏障被称为cyclic
,因为它可以在等待的线程被释放后被重用。
它与CountDownLatch
的作用是相反的,CountDownLatch
是定义一个次数,然后减直到减到0,再去执行一些任务。而CyclicBarrier
是定义一个上限次数,从零开始加,直到加到定义的上限次数,再去执行一些任务。CountDownLatch
的计数器只能使用一次,而CyclicBarrier
的计数器可以使用reset()
方法重置,可以使用多次,所以CyclicBarrier
能够处理更为复杂的场景。例如,凑齐七颗龙珠召唤神龙:
public class MainTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {
System.out.println("凑齐七颗龙珠,召唤神龙!");
});
for (int i = 1; i <= 7;i++){
new Thread(() -> {
System.out.println("拿到"+Thread.currentThread().getName() + "星龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
CyclicBarrier
要做的事情是,让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier.await()
方法。
CyclicBarrier
是基于ReentrantLock
实现的,其底层也是基于AQS。CyclicBarrier
通过一个内部的计数器和一个锁来实现线程间的协调。当所有线程都调用await
方法时,计数器递减,当计数器为零时,所有等待的线程将被唤醒,并重置计数器,以便下一次使用。
Semaphore
Semaphore
译为信号量,有时被称为信号灯。可以用来控制同时访问特定资源的线程数量,通过协调各个线程,保证合理的使用资源。信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数量的控制。
Semaphore
JDK文档注释:
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {@link #acquire} blocks if necessary until a permit is available, and then takes it. Each {@link #release} adds a permit, potentially releasing a blocking acquirer.
文档大意:Semaphore
是一个计数信号量。从概念上讲,信号量维护一组许可。如果需要,每个acquire
方法调用会阻塞,直到有一个许可可用,然后获取许可。每个release
方法调用会添加一个许可,可能会释放一个阻塞的线程。实际上,Semaphore
并没有维护实际的许可对象,只是维护一个可用许可的计数,并根据计数执行相应的操作。
举个例子,九辆车抢三个车位,车位满了之后只有等里面的车离开停车场外面的车才可以进入。
public class MainTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 9; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("第" + Thread.currentThread().getName() + "辆车,抢到车位");
Thread.sleep(2000);
System.out.println("停车结束.");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
Semaphore
通过一个计数器和一个队列来管理许可和等待线程。它依赖于AQS来实现同步逻辑。Semaphore
是用来保护一个或者多个共享资源的访问,Semaphore
内部维护了一个计数器,其值为可以访问的共享资源的个数。一个线程要访问共享资源,先获得信号量,如果信号量的计数器值大于1,意味着有共享资源可以访问,则使其计数器值减去1,再访问共享资源。如果计数器值为0,线程进入休眠。当某个线程使用完共享资源后,释放信号量,并将信号量内部的计数器加1,之前进入休眠的线程将被唤醒并再次试图获得信号量。
Semaphore
的核心方法为:
acquire()
:获取一个许可,如果没有可用的许可,当前线程将被阻塞,直到有许可可用。当调用semaphore.acquire()
方法时,当前线程会尝试去同步队列获取一个令牌,获取令牌的过程也就是使用原子操作去修改同步队列的state
,获取一个令牌则修改为state=state-1
。当计算出来的state<0
,则代表令牌数量不足,此时会创建一个Node
节点加入阻塞队列,挂起当前线程。当计算出来的state>=0
,则代表获取令牌成功。release()
:释放一个许可,将其返回到Semaphore
。当调用semaphore.release()
方法时,线程会尝试释放一个令牌,释放令牌的过程也就是把同步队列的state
修改为state=state+1
的过程。释放令牌成功之后,同时会唤醒同步队列中的一个线程。被唤醒的节点会重新尝试去修改state=state-1
的操作,如果state>=0
则获取令牌成功,否则重新进入阻塞队列,挂起线程。