聊聊AQS

news2024/11/15 13:58:25

Java中 AQS 是 AbstractQueuedSynchronizer 类,AQS 依赖 FIFO 队列来提供一个框架,这个框架用于实现锁以及锁相关的同步器,比如信号量、事件等。

在 AQS 中,主要有两部分功能,一部分是操作 state 变量,第二部分是实现排队和阻塞机制。

注意,AQS 并没有实现任何同步接口,它只是提供了类似 acquireInterruptible 的方法,调用这些方法可以实现锁和同步器。

1 管程模型

Java 使用 MESA 管程模型来管理类的成员变量和方法,让这个类的成员变量和方法的操作是线程安全的。下图是 MESA 管程模型,里面除了定义共享变量外,还定义了条件变量和条件变量等待队列:

上图中有三个知识点:

  • MESA 管程模型封装了共享变量和对共享变量的操作,线程要进入管程内部,必须获取到锁,如果获取锁失败就进入入口等待队列阻塞等待。

  • 如果线程获取到锁,就进入到管程内部。但是进入到管程内部,也不一定能立刻操作共享变量,而是要看条件变量是否满足,如果不满足,只能进入条件变量等待队列阻塞等待。

  • 在条件变量等待队列中,如果被其他线程唤醒,也不一定能立刻操作共享变量,而是需要去入口等待队列重新排队等待获取锁。

Java 中的 MESA 管程模型有一点改进,就是管程内部只有一个条件变量和一个等待队列。下图是 AQS 的管程模型:

AQS 的管程模型依赖 AQS 中的 FIFO 队列实现入口等待队列,要进入管程内部,就由各种并发锁的限制。而 ConditionObject 则实现了条件队列,这个队列可以创建多个。

下面就从入口等待队列、并发锁、条件等待队列三个方面来带你彻底理解 AQS。

2 入口等待队列

2.1 获取独占锁

独占, 忽略 interrupts

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这里的 tryAcquire 是抽象方法,由 AQS 的子类来实现,因为每个子类实现的锁是不一样的。

2.1.1 入队

上面的代码可以看到,获取锁失败后,会先执行 addWaiter 方法加入队列,然后执行 acquireQueued 方法自旋地获取锁直到成功。

addWaiter 代码逻辑如下图,简单说就是把 node 入队,入队后返回 node 参数给 acquireQueued 方法:

这里有一个点需要注意,如果队列为空,则新建一个 Node 作为队头。

2.1.2 入队后获取锁

acquireQueued 自旋获取锁逻辑如下图:

这里有几个细节:

1.waitStatus

  • CANCELLED(1):当前节点取消获取锁。当等待超时或被中断(响应中断),会触发变更为此状态,进入该状态后节点状态不再变化;

  • SIGNAL(-1):后面节点等待当前节点唤醒;

  • CONDITION(-2):Condition 中使用,当前线程阻塞在 Condition,如果其他线程调用了 Condition 的 signal 方法,这个结点将从等待队列转移到同步队列队尾,等待获取同步锁;

  • PROPAGATE(-3):共享模式,前置节点唤醒后面节点后,唤醒操作无条件传播下去;

  • 0:中间状态,当前节点后面的节点已经唤醒,但是当前节点线程还没有执行完成。

2.获取锁失败后挂起

如果前置节点不是头节点,或者前置节点是头节点但当前节点获取锁失败,这时当前节点需要挂起,分三种情况:

  • 前置节点 waitStatus=-1,如下图:

  • 前置节点 waitStatus > 0,如下图:

  • 前置节点 waitStatus < 0 但不等于 -1,如下图:

3.取消获取锁

如果获取锁抛出异常,则取消获取锁,如果当前节点是 tail 节点,分两种情况如下图:

如果当前节点不是 tail 节点,也分两种情况,如下图:

4.对中断状态忽略

5.如果前置节点的状态是 0 或 PROPAGATE,会被当前节点自旋过程中更新成 -1,以便之后通知当前节点。

2.1.3 独占 + 响应中断

对应方法 acquireInterruptibly(int arg)。

跟忽略中断(acquire方法)不同的是要响应中断,下面两个地方响应中断:

  • 获取锁之前会检查当前线程是否中断。

  • 获取锁失败入队,在队列中自旋获取锁的过程中也会检查当前线程是否中断。如果检查到当前线程已经中断,则抛出 InterruptedException,当前线程退出。

2.1.4 独占 + 响应中断 + 考虑超时

对应方法 tryAcquireNanos(int arg, long nanosTimeout)。

这个方法具备了独占 + 响应中断 + 超时的功能,下面2个地方要判断是否超时:

  • 自旋获取锁的过程中每次获取锁失败都要判断是否超时;

  • 获取锁失败 park 之前要判断超时时间是否大于自旋的阈值时间 (spinForTimeoutThreshold = 1ns) 另外,park 线程的操作使用 parkNanos 传入阻塞时间。

2.2 释放独占锁

独占锁释放分两步:释放锁,唤醒后继节点。

释放锁的方法 tryRelease 是抽象的,由子类去实现。

我们看一下唤醒后继节点的逻辑,首先需要满足两个条件:

  • head 节点不等于 null;

  • head 节点 waitStatus 不等于 0。这里有两种情况(在方法 unparkSuccessor):

  • 情况一,后继节点 waitStatus <= 0,直接唤醒后继节点,如下图:

  • 情况二:后继节点为空或者 waitStatus > 0,从后往前查找最接近当前节点的节点进行唤醒,如下图:

2.3 获取共享锁

之前我们讲了独占锁,这一小节我们谈共享锁,有什么不同呢?

2.3.1 共享,忽略 interrupts

对应方法 acquireShared,代码如下:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

2.3.2 tryAcquireShared

这里获取锁使用的方法是 tryAcquireShared,获取的是共享锁。获取共享锁跟获取独占锁不同的是,会返回一个整数值,说明如下:

  • 返回负数:获取锁失败。

  • 返回 0:获取锁成功但是之后再由线程来获取共享锁时就会失败。

  • 返回正数:获取锁成功而且之后再有线程来获取共享锁时也可能会成功。所以需要把唤醒操作传播下去。tryAcquireShared 获取锁失败后(返回负数),就需要入队后自旋获取,也就是执行方法 doAcquireShared。

2.3.3 doAcquireShared

怎么判断队列中等待节点是在等待共享锁呢?nextWaiter == SHARED,这个参数值是入队新建节点的时候构造函数传入的。

自旋过程中,如果获取锁成功(返回正数),首先把自己设置成新的 head 节点,然后把通知传播下去。如下图:

之后会唤醒后面节点并保证唤醒操作可以传播下去。但是需要满足四个条件中的一个:

  • tryAcquireShared 返回值大于0,有多余的锁,可以继续唤醒后继节点。

  • 旧的 head 节点 waitStatus < 0,应该是其他线程释放共享锁过程中把它的状态更新成了 -3。

  • 新的 hade 节点 waitStatus < 0,只要不是 tail 节点,就可能是 -1。这里会造成不必要的唤醒,因为唤醒后获取不到锁只能继续入队等待。

  • 当前节点的后继节点是空或者非空但正在等待共享锁。

唤醒后面节点的操作,其实就是释放共享锁,对应方法是 doReleaseShared,见释放共享锁一节。

2.3.4 共享 + 响应中断

对应方法 acquireSharedInterruptibly(int arg)。

跟共享忽略中断(acquireShared 方法)不同的是要响应中断,下面两个地方响应中断:

  • 获取锁之前会检查当前线程是否中断。

  • 获取锁失败入队,在队列中自旋获取锁的过程中也会检查当前线程是否中断。

如果检查到当前线程已经中断,则抛出 InterruptedException,当前线程退出。

2.3.5 共享 + 响应中断 + 考虑超时

对应方法 tryAcquireSharedNanos(int arg, long nanosTimeout)。

这个方法具备了共享 + 响应中断 + 超时的功能,下面两个个地方要判断是否超时:

  • 自旋获取锁的过程中每次获取锁失败都要判断是否超时。

  • 获取锁失败 park 之前要判断超时时间是否大于自旋的阈值时间(spinForTimeoutThreshold = 1ns)。

另外,park 线程的操作使用 parkNanos 传入阻塞时间。

2.4 释放共享锁

释放共享锁代码如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

首先尝试释放共享锁,tryReleaseShared 代码由子类来实现。释放成功后执行AQS中的 doReleaseShared 方法,是一个自旋操作。

自旋的条件是队列中至少有两个节点,这里分三种情况。

情况一:当前节点 waitStatus 是 -1,如下图:

情况二:当前节点 waitStatus 是 0(被其他线程更新新成了中间状态),如下图:

情况三:当前节点 waitStatus 是 -3,为什么会这样呢?需要解释一下,head节点唤醒后继节点之前 waitStatus 已经被更新中间态 0 了,唤醒后继节点动作还没有执行,又被其他线程更成了 -3,也就是其他线程释放锁执行了上面情况二。这时需要先把 waitStatus 再更成 0 (在方法 unparkSuccessor),如下图:

2.5 抽象方法

上面的讲解可以看出,如果要基于 AQS 来实现并发锁,可以根据需求重写下面四个方法来实现,这四个方法在 AQS 中没有具体实现:

  • tryAcquire(int arg):获取独占锁

  • tryRelease(int arg):释放独占锁

  • tryAcquireShared(int arg):获取共享锁

  • tryReleaseShared(int arg):释放共享锁

AQS 的子类需要重写上面的方法来修改 state 值,并且定义获取锁或者释放锁时 state 值的变化。子类也可以定义自己的 state 变量,但是只有更新 AQS 中的 state变量才会对同步起作用。

还有一个判断当前线程是否持有独占锁的方法 isHeldExclusively,也可以供子类重写后使用。

获取/释放锁的具体实现放到下篇文章讲解。

2.6 总结

AQS 使用 FIFO 队列实现了一个锁相关的并发器模板,可以基于这个模板来实现各种锁,包括独占锁、共享锁、信号量等。

AQS 中,有一个核心状态是 waitStatus,这个代表节点的状态,决定了当前节点的后续操作,比如是否等待唤醒,是否要唤醒后继节点。

3 并发锁

这一章节讲解 Java AQS 中的并发锁。其实 Java AQS 中的并发锁主要是基于 state 这个变量值来实现的。

3.1 ReentrantLock

我们先来看一下 UML 类图:

从图中可以看到,ReentrantLock 使用抽象内部类 Sync 来实现了 AQS 的方法,然后基于 Sync 这个同步器实现了公平锁和非公平锁。主要实现了下面 3 个方法:

  • tryAcquire(int arg):获取独占锁

  • tryRelease(int arg):释放独占锁

  • isHeldExclusively:当前线程是否占有独占锁。ReentrantLock 默认实现的是非公平锁,可以在构造函数指定。

从实现的方法可以看到,ReentrantLock 中获取的锁是独占锁,我们再来看一下获取和释放独占锁的代码:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

独占锁的特点是调用上面 acquire 方法,传入的参数是 1。

3.1.1 获取公平锁

获取锁首先判断同步状态(state)的值。

3.1.1.1 state 等于 0

这说明没有线程占用锁,当前线程如果符合下面两个条件,就可以获取到锁:

没有前任节点,如下图:

CAS 的方式更新 state 值(把 0 更新成 1)成功。如果获取独占锁成功,会更新 AQS 中 exclusiveOwnerThread 为当前线程,这个很容易理解。

3.1.1.2 state 不等于 0

这说明已经有线程占有锁,判断占有锁的线程是不是当前线程,如下图:

state += 1 值如果小于 0,会抛出异常。

如果获取锁失败,则进入 AQS 队列等待唤醒。

3.1.2 获取非公平锁

跟公平锁相比,非公平锁的唯一不同是如果判断到 state 等于 0,不用判断有没有前任节点,只要 CAS 设置 state 值(把 0 更新成 1)成功,就获取到了锁。

3.1.3 释放锁

公平锁和非公平锁,释放逻辑完全一样,都是在内部类 Sync 中实现的。释放锁需要注意两点,如下图:

为什么 state 会大于 1,因为是可以重入的,占有锁的线程可以多次获取锁。

3.1.4 总结

公平锁的特点是每个线程都要进行排队,不用担心线程永远获取不到锁,但有个缺点是每个线程入队后都需要阻塞和被唤醒,这一定程度上影响了效率。非公平锁的特点是每个线程入队前都会先尝试获取锁,如果获取成功就不会入队了,这比公平锁效率高。但也有一个缺点,队列中的线程有可能等待很长时间,高并发下甚至可能永远获取不到锁。

3.2 ReentrantReadWriteLock

我们先来看一下 UML 类图:

从图中可以看到,ReentrantReadWriteLock 使用抽象内部类Sync来实现了 AQS 的方法,然后基于 Sync 这个同步器实现了公平锁和非公平锁。主要实现了下面 3 个方法:

  • tryAcquire(int arg):获取独占锁

  • tryRelease(int arg):释放独占锁

  • tryAcquireShared(int arg):获取共享锁

  • tryReleaseShared(int arg):释放共享锁

  • isHeldExclusively:当前线程是否占有独占锁 可见ReentrantReadWriteLock里面同时用到了共享锁和独占锁。

下图是定义的几个常用变量:

下面这 2 个方法用户获取共享锁和独占锁的数量:

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

从sharedCount 可以看到,共享锁的数量要右移 16 位获取,也就是说共享锁占了高 16 位。从上图 EXCLUSIVE_MASK 的定义看到,跟 EXCLUSIVE_MASK 进行与运算,得到的是低 16 位的值,所以独占锁占了低 16 位。如下图:

这样上面获取锁数量的方法就很好理解了。

3.2.1 读锁

读锁的实现对应内部类 ReadLock。

3.2.1.1 获取读锁

获取读锁实际上是 ReadLock 调用了 AQS 的下面方法,传入参数是 1:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

ReentrantReadWriteLock 内部类 Sync 实现了 tryAcquireShared 方法,主要包括如下三种情况:

  1. 使用 exclusiveCount 方法查看 state 中是否有独占锁,如果有并且独占线程不是当前线程,返回 -1,获取失败;

  2. 使用 sharedCount 查看 state 中共享锁数量,如果读锁数量小于最大值(MAX_COUNT=65535),则再满足下面 3 个条件就可以获取成功并返回 1:

a.当前线程不需要阻塞(readerShouldBlock)。在公平锁中,需要判断是否有前置节点,如下图就需要阻塞:

在非公平锁中,则是判断第一个节点是不是有独占锁,如下图就需要阻塞:

b.使用 CAS 把 state 的值加 SHARED_UNIT(65536)。这里是不是就更理解读锁占高位的说法了,获取一个读锁,state 的值就要加 SHARED_UNIT 这么多个。

c.给当前线程的 holdCount 加 1。

  1. 如果 2 失败,自旋,重复上面的步骤直到获取到锁。tryAcquireShared (获取共享锁)会返回一个整数,如下:

  • 返回负数:获取锁失败。

  • 返回 0:获取锁成功但是之后再由线程来获取共享锁时就会失败。

  • 返回正数:获取锁成功而且之后再有线程来获取共享锁时也可能会成功。

3.2.1.2 释放读锁

ReentrantReadWriteLock 释放读锁是在 ReadLock 中调用了 AQS 下面方法,传入的参数是1:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

ReentrantReadWriteLock 内部类 Sync 实现了 releaseShared 方法,具体逻辑分为下面两步:

  1. 当前线程 holdCounter 值减 1。

  2. CAS的方式将 state 的值减去 SHARED_UNIT。

3.2.2 写锁

写锁的实现对应内部类 WriteLock。

3.2.2.1 获取写锁

ReentrantReadWriteLock 获取写锁其实是在 WriteLock 中调用了 AQS 的下面方法,传入参数 1:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

在ReentrantReadWriteLock 内部类 Sync 实现了 tryAcquire 方法,首先获取 state 值和独占锁数量(exclusiveCount),之后分如下两种情况,如下图:

1.state 不等于 0:

  • 独占锁数量等于 0,这时说明有线程占用了共享锁,如果当前线程不是独占线程,获取锁失败。

  • 独占锁数量不等于 0,独占锁数量加 1 后大于 MAX_COUNT,获取锁失败。

  • 上面 2 种情况不符合,获取锁成功,state 值加 1。2.state 等于 0,判断当前线程是否需要阻塞(writerShouldBlock)。在公平锁中,跟 readerShouldBlock 的逻辑完全一样,就是判断队列中 head 节点的后继节点是不是当前线程。在非公平锁中,直接返回 false,即可以直接尝试获取锁。

如果当前线程不需要阻塞,并且给 state 赋值成功,使用 CAS 方式把 state 值加 1,把独占线程置为当前线程。

3.2.2.2 释放写锁

ReentrantReadWriteLock 释放写锁其实是在 WriteLock 中调用了 AQS 的下面方法,传入参数 1:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantReadWriteLock 在 Sync 中实现了 tryRelease(arg) 方法,逻辑如下:

  1. 判断当前线程是不是独占线程,如果不是,抛出异常。

  2. state值减1后,用新state值判断独占锁数量是否等于0

  • 如果等于0,则把独占线程置为空,返回true,这样上面的代码就可以唤醒队列中的后置节点了

  • 如果不等于0,返回false,不唤醒后继节点。

3.3 CountDownLatch

我们先来看一下UML类图:

从上面的图中看出,CountDownLatch 的内部类 Sync 实现了获取共享锁和释放共享锁的逻辑。

使用 CountDownLatch 时,构造函数会传入一个 int 类型的参数 count,表示调动 count 次的 countDown 后主线程才可以被唤醒。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

上面的 Sync(count) 就是将 AQS 中的 state 赋值为 count。

3.3.1 await

CountDownLatch 的 await 方法调用了 AQS 中的 acquireSharedInterruptibly(int arg),传入参数 1,不过这个参数并没有用。代码如下:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

Sync 中实现了 tryAcquireShared 方法,await 逻辑如下图:

上面的自旋过程就是等待 state 的值不断减小,只有 state 值成为 0 的时候,主线程才会跳出自旋执行之后的逻辑。

3.3.2 countDown

CountDownLatch 的 countDown 方法调用了 AQS 的 releaseShared(int arg),传入参数 1,不过这个参数并没有用。内部类 Sync 实现了 tryReleaseShared 方法,逻辑如下图:

3.3.3 总结

CountDownLatch 的构造函数入参值会赋值给 state 变量,入队操作是主线程入队,每个子线程调用了countDown 后 state 值减 1,当 state 值成为 0 后唤醒主线程。

3.4 Semaphore

Semaphore 是一个信号量,用来保护共享资源。如果线程要访问共享资源,首先从 Semaphore 获取锁(信号量),如果信号量的计数器等于 0,则当前线程进入 AQS 队列阻塞等待。否则,线程获取锁成功,信号量减 1。使用完共享资源后,释放锁(信号量加 1)。

Semaphore 跟管程模型不一样的是,允许多个(构造函数的 permits)线程进入管程内部,因此也常用它来做限流。

UML 类图如下:

Semaphore的构造函数会传入一个int类型参数,用来初始化state的值。

3.4.1 acquire

获取锁的操作调用了 AQS 中的 acquireSharedInterruptibly 方法,传入参数 1,代码见 CountDownLatch 中 await 小节。Semaphore 在公平锁和非公平锁中分别实现了 tryAcquireShared 方法。

3.4.1.1 公平锁

Semaphore 默认使用非公平锁,如果使用公平锁,需要在构造函数指定。获取公平锁逻辑比较简单,如下图:

3.4.1.2 非公平锁

acquire 在非公平的锁唯一的区别就是不会判断 AQS 队列是否有前置节点(hasQueuedPredecessors),而是直接尝试获取锁。

除了 acquire 方法外,还有其他几个获取锁的方法,原理类似,只是调用了 AQS 中的不同方法。

3.4.2 release

释放锁的操作调用了 AQS 中的 releaseShared(int arg) 方法,传入参数 1,在内部类 Sync 中实现了 tryReleaseShared 方法,逻辑很简单:使用 CAS 的方式将 state 的值加 1,之后唤醒队列中的后继节点。

3.5 ThreadPoolExecutor

ThreadPoolExecutor 中也用到了 AQS,看下面的 UML 类图:

Worker 主要在 ThreadPoolExecutor 中断线程的时候使用。Worker 自己实现了独占锁,在中断线程时首先进行加锁,中断操作后释放锁。按照官方说法,这里不直接使用 ReentrantLock 的原因是防止调用控制线程池的方法(类似 setCorePoolSize)时能够重新获取到锁,

3.5.1 tryAcquire

使用 CAS 的方式把 AQS 中 state 从 0 改为 1,把当前线程置为独占线程。

3.5.2 tryRelease

把独占线程置为空,把 AQS 中 state 改为 0。

Worker 初始化的时候会把 state 置为 -1,这样是不能获取锁成功的。只有调用了 runWorker 方法,才会通过释放锁操作把 state 更为 0。这样保证了只中断运行中的线程,而不会中断等待中的线程。

3.6 总结

AQS 基于双向队列实现了入口等待队列,基于 state 变量实现了各种并发锁,上篇文章讲了入口等待队列,而这篇文章主要讲了基于 AQS 的并发锁原理。

4 条件变量等待队列

本章节主要讲解管程模型中条件变量等待队列。

4.1 官方示例

首先我们看一下官方给出的示例代码:

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

这个代码定义了两个条件变量,notFull 和 notEmpty,说明如下:

  1. 如果 items 数组已经满了,则 notFull 变量不满足,线程需要进入 notFull 条件等待队列进行等待。当 take 方法取走一个数组元素时,notFull 条件满足了,唤醒 notFull 条件等待队列中等待线程。

  2. 如果 items 数组为空,则 notEmpty 变量不满足,线程需要进入 notEmpty 条件等待队列进行等待。当 put 方法加入一个数组元素时,notEmpty 条件满足了,唤醒 notEmpty 条件等待队列中等待线程。

  3. 条件变量是绑定在 Lock 上的,示例代码使用了 ReentrantLock。在执行 await 和 signal 方法时首先要获取到锁。

4.2 原理简介

Java AQS 的条件变量等待队列是基于接口 Condition 和 ConditionObject 来实现的,URM 类图如下:

Condition 接口主要定义了下面3个方法:

  • await:进入条件等待队列

  • signal:唤醒条件等待队列中的元素

  • signalAll:唤醒条件等待队列中的所有元素

4.3 await

条件等待队列跟入口等待队列有两个不同:

  • 虽然二者共用了 Node 类,但是条件等待队列是单向队列,入口等待队列是双向队列,条件队列中下一个节点的引用是 nextWaiter,入口等待队列中下一个节点的引用是 next。

  • 条件等待队列中元素的 waitStatus 必须是 -2。await 方法的流程如下图:

4.3.1 进入条件等待队列

入队方法对应方法 addConditionWaiter,这里有三种情况:

  • 队列为空,则新建一个节点,如下图:

  • 队列非空,最后一个元素的 waitStatus 是 -2,如下图:

  • 队列非空,最后一个元素的 waitStatus 不是 -2,如下图:

可以看到,这种情况会从队列第一个元素开始检查 waitStatus 不是 -2 的元素,并从队列中移除。

4.3.2 释放锁

AQS 的并发锁是基于 state 变量实现的,线程进入条件等待队列后,要释放锁,即 state 会变为 0,释放操作会唤醒入口等待队列中的线程。对应方法 fullyRelease,返回值是释放锁减掉的 state 值 savedState。

4.3.3 阻塞等待

释放锁后,线程阻塞,自旋等待被唤醒。

4.3.4 唤醒之后

唤醒之后,当前线程主要有四个动作:

  • 转入入口等待队列,并把 waitStatus 改为 0。waitStatus 等于 0 表示中间状态,当前节点后面的节点已经唤醒,但是当前节点线程还没有执行完成。

  • 重新获取锁,如果获取成功,则当前线程成为入口等待队列头结点,interruptMode 置为 1。

  • 如果当前节点在条件等待队列中有后继节点,则剔除条件等待队列中 waitStatus!=-2 的节点,即队列中状态为取消的节点。

  • interruptMode 如果不等于 0,则处理中断。

4.3.5 一个细节

上面提到了 interruptMode,这个属性有三个值:

  • 0:没有被中断

  • -1:中断后抛出 InterruptedException,这种情况是当前线程阻塞,没有被 signal 之前发生了中断

  • 1:重新进入中断状态,这种情况是指当前线程阻塞,被 signal 之后发生了中断

4.3.6 扩展

AQS 还提供了其他几个 await 方法,如下:

  • awaitUninterruptibly:不用处理中断。

  • awaitNanos:自旋等待唤醒过程中有超时时间限制,超时则转入入口等待队列。

  • awaitUntil:自旋等待唤醒过程中有截止时间,时间到则转入入口等待队列。

4.4 signal

唤醒条件等待队列中的元素,首先判断当前线程是否持有独占锁,如果没有,抛出异常。

唤醒条件队列中的元素,会从第一个元素也就是 firstWaiter 开始,根据 firstWaiter 的 waitStatus 是不是 -2,分两种情况。

4.4.1 waitStatus==-2

条件队列第一个节点进入入口等待队列,等待获取锁,如下图:

这里有两个注意点:

  • 如果入口等待队列中 tail 节点的 waitStatus 小于等于 0,则 firstWaiter 加入后需要把旧 tail 节点置为 -1 (表示后面节点等待当前节点唤醒),如下图:

如果重置 waitStatus 状态失败,则 unpark 节点 firstWaiter。

  • 如果入口等待队列中 tail 节点的 waitStatus 大于 0,则 unpark 节点 firstWaiter。

4.4.2 waitStatus!=-2

如果 firstWaiter 的 waitStatus 不等于 -2,则查找 firstWaiter 的 nextWaiter,直到找到一个 waitStatus 等于 -2 的节点,然后将这个节点加入入口等待队列队尾,如下图:

4.4.3 waitStatus 修改

上面的两种情况无论哪种,进入入口等待队列之前都要用 CAS 的方式把 waitStatus 改为 0。

4.5 signalAll

理解了 signal 的逻辑,signalAll 的逻辑就非常容易理解了。首先判断当前线程是否持有独占锁,如果没有,抛出异常。

将条件等待队列中的所有节点依次加入入口等待队列。如下图:

4.6 使用案例

4.6.1 示例代码

Java 并发包下有很多类使用到了 AQS 中的 Condition,如下图:

这里我们以 CyclicBarrier 为例来讲解。CyclicBarrier 是让一组线程相互等待共同达到一个屏障点。从 Cyclic 可以看出 Barrier 可以循环利用,也就是当线程释放之后可以继续使用。

看下面这段示例代码:

public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
        System.out.println("栅栏中的线程执行完成");
    });
    ExecutorService executorService = Executors.newFixedThreadPool(2);

    executorService.submit(() -> {
        try {
            System.out.println("线程1:" + Thread.currentThread().getName());
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    });

    executorService.submit(() -> {
        try {
            System.out.println("线程2:" + Thread.currentThread().getName());
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    });

    executorService.shutdown();
}

执行结果:

线程1:pool-1-thread-1
线程2:pool-1-thread-2
栅栏中的线程执行完成

4.6.2 原理讲解

CyclicBarrier 初始化的时候,会指定线程的数量 count,每个线程执行完逻辑后,调用 CyclicBarrier 的 await 方法,这个方法首先将 count 减 1,然后调用 Condition的 await,让当前线程进入条件等待队列。当最后一个线程将 count 减 1 后,count 数量等于 0,这时就会调用 Condition 的 signalAll 方法唤醒所有线程。

4.7 总结

Java 的管程模型使用了 MESA 模型,基于 AQS 实现的 MESA 模型中,使用双向队列实现了入口等待队列,使用变量 state 实现了并发锁,使用 Condition 实现了条件等待队列。

在 AQS 的实现中,使用同步队列这个术语来表示双向队列,本文中使用入口等待队列来描述是为了更好的配合管程模型来讲解。

AQS 的 Condition 中,使用 await 方法将当前线程放入条件变量等待队列阻塞等待,使用 notify 来唤醒条件等待队列中的线程,被唤醒之后,线程并不能立刻执行,而是进入入口等待队列等待获取锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/129583.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

调用html5播放器时,出现播放器按钮太小的问题

用手机浏览器打开视频&#xff0c;有时会出现播放器按钮太小的情况&#xff0c;此时只需在<head>中加入下面这段viewport代码即可解决&#xff1a; <meta name"viewport" content"widthdevice-width, initial-scale1, maximum-scale1,minimum-scale1…

Docker下Mysql应用部署

目录 环境搭建 进入mysql 外部连接mysql 外部插入数据 查询容器数据 环境搭建 docker pull mysqlmkdir /root/mysql cd /root/mysqldocker run -id \ -p 3307:3306 \ --name my_sql \ -v $PWD/logs:/logs \ -v $PWD/data:/var/lib/mysql \ -v $PWD/conf:/etc/mysql/conf…

【开源项目】任务调度框架PowerJob介绍及源码解析

项目介绍 PowerJob&#xff08;原OhMyScheduler&#xff09;是全新一代分布式调度与计算框架&#xff0c;能让您轻松完成作业的调度与繁杂任务的分布式计算。 项目地址 源码&#xff1a;https://gitee.com/KFCFans/PowerJob官网&#xff1a;http://www.powerjob.tech/index…

前端期末考试试题及参考答案(01)

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 一、 填空题 ______表示页面中一个内容区块或整个页面的标题。______表示页面中一块与上下文不相关的独立内容&#xff0c;比如一篇文章。CSS的引入方式有3种&#xff0c;分…

Python数据分析案例15——超市零售购物篮关联分析(apriori)

啤酒和纸尿裤的故事大多数人都听说过&#xff0c;纸尿裤的售卖提升了啤酒的销售额。 关联分析就是这样的作用&#xff0c;可以研究某种商品的售卖对另外的商品的销售起促进还是抑制的作用。 案例背景 本次案例背景是超市的零售数据&#xff0c;研究商品之间的关联规则。使用的…

移植SFUD,驱动SPI FLASH ZD25WQ80

1、关于SFUD SFUD (Serial Flash Universal Driver) 串行 Flash 通用驱动库&#xff0c;支持众多spi flash&#xff0c;关于SFUD的详细资料可参考&#xff1a;https://github.com/armink/SFUD。 2、为什么会有通用驱动 JEDEC &#xff08;固态技术协会&#xff09;针对串行 …

Python的22个万用公式,你确定不看看吗

前言 在大家的日常python程序的编写过程中&#xff0c;都会有自己解决某个问题的解决办法&#xff0c;或者是在程序的调试过程中&#xff0c;用来帮助调试的程序公式。 小编通过几十万行代码的总结处理&#xff0c;总结出了22个python万用公式&#xff0c;可以帮助大家解决在…

TypeScript中type和interface区别

typescript中interface介绍&#xff1a;TypeScript 中的接口 interface_疆~的博客-CSDN博客通常使用接口&#xff08;Interface&#xff09;来定义对象的类型。https://blog.csdn.net/qq_40323256/article/details/128478749 type type关键字是声明类型别名的关键字。用来给一…

windows 编译C++ boost库(超详细)

系列文章目录 文章目录系列文章目录前言一、windows二、b2.exe 参数前言 boost库其实不进行编译&#xff0c;大部分库也是可以正常使用的 而且也有一个开源工具vcpkg可以帮组我们下载编译&#xff0c;只是在国内用起来比较麻烦&#xff0c;而且还时常出bug 所以这里详细记录…

mac下,使用 docker 搭建,单机机器集群

背景&#xff1a; 在 Mac本下&#xff0c;通过 docker 完成一个 es 集群&#xff08;3台-或许可多台&#xff09;搭建。&#xff08;后续如果有真实的机器&#xff0c;只需要又该对应的 ip 地址即可&#xff0c;需要关注的是&#xff0c;机器间是可以互相 ping通的&#xff0c;…

4.3.5、IPv4 地址的应用规划

给定一个 IPv4 地址块&#xff0c;如何将其划分成几个更小的地址块&#xff0c;并将这些地址快分配给互联网中的不同网络&#xff0c; 进而可以给各网络中的主机和路由器接口分配 IPv4 地址。 一般有两种方法&#xff1a; 定长的子网掩码 FLSM &#xff08;Fixed Length Sub…

线程,进程以及Java中创建线程的多种方式

1. 前言 今天的这篇文章的目的还是为了讲述下什么叫线程&#xff0c;什么是进程。可能之前很多人都是通过背书得来的&#xff0c;今天就从通俗易懂的角度来分析下 2. 适合人群 线程以及进程的初学者 3. 开始 3.1 什么是程序 其实不管是程序/ 进程/ 线程都是基于操作系统而言…

141.环形链表

给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#xff08;…

SuperMap iDesktop地质体模型匹配地形——精修地质体模型路线

作者&#xff1a;超图研究院技术支持中心-于丁 地质体模型匹配地形——精修地质体模型路线 相信大家开展地质体业务时&#xff0c;常常会遇到构建的精模地质体与DEM地形数据的交界面&#xff0c;嵌合效果不佳、相互压盖、渲染冲突不稳定&#xff08;闪面&#xff09;、掩盖、漂…

前端期末考试试题及参考答案(03)

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 一、 填空题 transition属性中______规定应用过渡的CSS属性的名称。transition属性中______定义过渡效果花费的时间。transition属性中______属性规定过渡效果的时间曲线。…

GitHub入门指南(上)

前言 我去年入门 GitHub&#xff0c;一开始不知道从哪开始学&#xff0c;在网上查找了很多文章、教程学习。这篇文章就是以我刚学习时的小白视角写的&#xff0c;希望能帮助到想开始学习 GitHub 又不知如何上手的学习者。因为我也是初级水平&#xff0c;文中介绍的知识基于我自…

6.移动端布局-rem布局

1.rem基础 优点&#xff1a;可以通过修改html里边文字的大小来改变页面中其他元素的大小&#xff0c;可以实现整体控制 1.1 rem单位 rem(root em)是一个相对单位&#xff0c;类似于em。 em是相对于自身元素字体大小&#xff08;若自身没有设置font-size则基础父元素的字体大…

一次SQL调优 聊一聊 SQLSERVER 数据页

一&#xff1a;背景 1.讲故事 最近给一位朋友做 SQL 慢语句 优化&#xff0c;花了些时间调优&#xff0c;遗憾的是 SQLSERVER 非源码公开&#xff0c;玩起来不是那么顺利&#xff0c;不过从这次经历中我觉得明年的一个重大任务就是好好研究一下它&#xff0c;争取在 SQLSERVE…

uniapp实现音频播放抢占系统音频焦点

项目为使用uniapp框架开发的Android/iOS APP应用 实现功能需求 假设手机正在播放音乐&#xff0c;当前APP处于前台收到消息&#xff0c;需播放提示音提示用户。目标为降低后台正在播放音乐的音量&#xff0c;播放提示音&#xff0c;播放完毕后恢复后台音乐音量 需求分析 乍…

拉伯证券|新能源汽车前11月产销翻倍,渗透率升至三分之一

2022年11月&#xff0c;国内新能源轿车渗透率已升至33.8%&#xff0c;创前史新高。 2022年的最终一个交易日早盘&#xff0c;两市高开高走&#xff0c;沪指涨0.61%&#xff0c;深证成指涨0.35%&#xff0c;创业板指涨0.3%。板块上来看&#xff0c;Web3.0、虚拟人、网络游戏概念…