一、park/unpark阻塞唤醒线程
LockSupport是JDK中用来实现线程阻塞和唤醒的工具。使用它可以在任何场合使线程阻塞,可以指定任何线程进行唤醒,并且不用担心阻塞和唤醒操作的顺序,但要注意连续多次唤醒的效果和一次唤醒是一样的。JDK并发包下的锁和其他同步工具的底层实现中大量使用了LockSupport进行线程的阻塞和唤醒,掌握它的用法和原理可以让我们更好的理解锁和其它同步工具的底层实现。
public class LockSupportTest {
public static void main(String[] args) {
Thread parkThread = new Thread(new ParkThread());
parkThread.start();
System.out.println("唤醒parkThread");
LockSupport.unpark(parkThread);
}
static class ParkThread implements Runnable{
public void run() {
System.out.println("ParkThread开始执行");
LockSupport.park();
System.out.println("ParkThread执行完成");
}
}
}
一、LockSupport阻塞和唤醒线程原理
每个线程都有Parker实例
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...
}
LockSupport就是通过控制变量_counter来对线程阻塞唤醒进行控制的。原理有点类似于信号量机制。
当调用park()方法时,会将_counter置为0,同时判断前值,大于1说明前面被unpark过,则直接退出,否则将使该线程阻塞。
当调用unpark()方法时,会将_counter置为1,同时判断前值,小于1会进行线程唤醒,否则直接退出。
形象的理解,线程阻塞需要消耗凭证(permit),这个凭证最多只有1个。当调用park方法时,如果有凭证,则会直接消耗掉这个凭证然后正常退出;但是如果没有凭证,就必须阻塞等待凭证可用;而unpark则相反,它会增加一个凭证,但凭证最多只能有1个。
1、为什么可以先唤醒线程后阻塞线程?
因为unpark获得了一个凭证,之后调用park因为有凭证消费,故不会阻塞。
2、为什么唤醒两次后阻塞两次会阻塞线程?
因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证。
二、AQS原理
一、什么是AQS
java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。
JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的。
AQS具备的特性:
1、阻塞等待队列
2、共享/独占
3、公平/非公平
4、可重入
5、允许中断。
AQS内部维护属性volatile int state,state表示资源的可用状态。
State三种访问方式:
1、getState()
2、setState()
3、compareAndSetState()
AQS定义两种资源共享方式
1、Exclusive-独占,只有一个线程能执行,如ReentrantLock
2、Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列
同步等待队列: 主要用于维护获取锁失败时入队的线程。
条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁。
AQS 定义了5个队列中节点状态:
1. 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
2. CANCELLED,值为1,表示当前的线程被取消;
3. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
4. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
5. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
不同的自定义同步器竞争共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
二、同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列。
AQS 依赖CLH同步队列来完成同步状态的管理:
当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程。当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)。
三、条件等待队列
AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:
调用await方法阻塞线程;
当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)
四、Condition接口详解
1. 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列尾部添加一个节点,所以调用Condition#await方法的时候必须持有锁。
2. 调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。
三、ReentrantLock源码流程
下面是三个线程启动争抢lock的一个demo
public class ReentrantLockDemo {
private static int sum = 0;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(()->{
lock.lock();
try {
for (int j = 0; j < 10000; j++) {
sum++;
}
} finally {
lock.unlock();
}
});
thread.start();
}
Thread.sleep(2000);
System.out.println(sum);
}
}
一、ReentrantLock#lock方法
从构造方法可知,ReentrantLock默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
如果compareAndSetState方法是原子化的,如果执行成功则表示获取到锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
获取到锁成功后则执行方法setExclusiveOwnerThread将lock中的exclusiveOwnerThread属性修改为当前线程,标记被该线程持有。
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
一、ReentrantLock#compareAndSetState
加锁逻辑是通过CAS修改lock本身的state,state为0的时候表示锁为未被持有状态,state大于1的时候表示锁被线程持有。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
二、ReentrantLock#acquire
如果compareAndSetState方法执行失败,那么就会继续执行tryAcquire方法,该方法仍然是尝试获取锁,如果获取失败则将该线程包装为Node节点入队并阻塞。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
一、ReentrantLock#tryAcquire
nonfairTryAcquire中当前线程先去获取当前锁state状态,如果为0就再次执行CAS去争抢锁,抢不到就会入队;如果不为0,就判断当前线程跟持有锁的线程是否是同一个线程,如果是就将state加1,那么此时也就再次获取到锁,这里体现了ReentrantLock锁的可重入逻辑。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
二、ReentranLock#addWaiter
Node node = new Node(Thread.currentThread(), mode);将当前线程包装为一个Node入队,
首次进来的时候队列是null,tail节点也是null,就会执行enq方法,创建一个空节点Node作为队尾,然后再将线程节点加到队尾;如果队列不为null,那么就直接加到当前队尾节点tail节点的后面
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
三、ReentranLock#acquireQueued
入队之后开始执行acquireQueued方法,for循环中判断当前节点的前驱节点是头结点的话,就再次执行tryAcquire获取锁,如果获取到了,就将该节点设置为头结点,p.next=null删除前驱节点再垃圾回收。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果没有获取到锁则执行到shouldParkAfterFailedAcquire方法中,如果前驱节点的waitStatus为Node.SIGNAL(-1),这个状态上面已经提到,表示该状态节点的后继节点包含的线程需要运行,也就是说node节点后面会通过pred节点执行unpark方法唤醒node节点线程。
如果前驱节点的waitStatus大于0(CANCELLED:1),表示当前线程被取消,无需再争抢线程,需要将这些线程从队列中移除。
如果这两个状态都不是,那么waitStatus就一定是0或-3,这种场景需要将前驱节点waitStatus更新SINGAL(-1),后面通过前驱节点将该线程唤醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire执行成功后,就执行park方法将该线程阻塞。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
四、ReentranLock#cancelAcquire
cancelAcquire主要是将释放锁的节点从队列中移除。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
三、ReentranLock#unlock
释放锁的过程就两步,将state更新为0,唤醒下个节点线程。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
一、ReentranLock#tryRelease
将当前state减一,如果减去一之后为0,就讲exclusiveOwnerThread设置为null,此时锁就是未被持有状态。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
二、ReentranLock#unparkSuccessor
如果当前头结点的状态不为0,就执行unparkSuccessor方法。正常情况下,节点状态为-1(SIGNAL),然后需要更新为0,并开始从尾结点循环查找waitStatus小于等于0的最后一个节点,对于节点状态大于0的都是需要取消的,无需获取锁,最终调用LockSupport.unpark(s.thread)唤醒线程,继续获取锁。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
四、condition源码流程
public class ProducerConsumerTest {
private final ReentrantLock lock = new ReentrantLock();
//非空
private final Condition notEmpty = lock.newCondition();
//非满
private final Condition notFull = lock.newCondition();
//缓冲区:初始元素都为0
private final int[] buffer = new int[5];
//当前元素数量
private int count = 0;
//生产者
public void produce() {
lock.lock();
try {
while (count == buffer.length) { // 如果队列满了
System.out.println("====== 缓冲区已满,生产者正在等待...... ======");
//等待notFull非满条件
notFull.await();
}
//缓冲区添加1个元素
buffer[count++] = 1;
System.out.println("生产了一个项目。缓冲区大小:" + count);
//唤醒消费者
notEmpty.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//消费者
public void consume() {
lock.lock();
try {
while (count == 0) { // 如果队列空了
System.out.println("====== 缓冲区是空的,消费者正在等待...... ======");
//等待notEmpty非空条件
notEmpty.await();
}
//缓冲区消费1个元素
buffer[--count] = 0;
System.out.println("消费了一个项目。缓冲区大小:" + count);
//唤醒生产者
notFull.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ProducerConsumerTest pc = new ProducerConsumerTest();
Thread producer = new Thread(() -> {
try {
while (true) {
pc.produce();
Thread.sleep(500); // 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
while (true) {
pc.consume();
Thread.sleep(1000); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
一、condition#await
await中会调用addConditionWaiter方法将节点加入到条件队列,然后fullyRelease方法释放锁,最后将线程给park;
当执行signal方法unpark线程后,线程就会执行accquireQueue方法去重新获取锁。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
二、condition#signal
signal方法将条件队列中的中的线程转移到同步队列的末尾,然后unpark该线程。去重新获取锁
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}