【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码

news2025/4/22 21:14:24

👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD

🔥 2025本人正在沉淀中… 博客更新速度++

👍 欢迎点赞、收藏、关注,跟上我的更新节奏

📚欢迎订阅专栏,专栏名《在2B工作中寻求并发是否搞错了什么》

前言

经过上一篇的学习,我们知道了。AQS的基本原理和使用。

【Java并发】【AQS】适合初学者体质的AQS入门

主播觉得,AQS的原理,就是通过这2个队列的协助,实现核心功能,同步队列(CLH队列)和条件队列(Condition队列)。

在这里插入图片描述

同步队列(CLH队列)

  • 作用:管理需要获取锁的线程。当多个线程竞争共享资源时,未获取到锁的线程会被封装成节点,按FIFO顺序加入阻塞队列,等待唤醒后重新尝试获取锁。
  • 解决的问题
    实现锁的公平性线程排队机制。通过CLH队列,AQS可以按顺序唤醒线程(如公平锁),避免线程无休止自旋竞争资源,减少CPU开销。

条件队列(Condition队列)

  • 作用:管理等待特定条件的线程。当线程调用Condition.await()时,会释放锁并进入条件队列;当其他线程调用Condition.signal()时,条件队列中的线程会被转移到阻塞队列,重新参与锁竞争。
  • 解决的问题
    实现线程间的精细化协作(如生产者-消费者模式)。例如:
    • 生产者线程在队列满时,通过条件队列挂起,而非占用锁空等。
    • 消费者线程消费数据后,通过signal()唤醒生产者,解耦等待条件与锁竞争。

下面,主播会通过ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier的源码的角度来分析。它们都是基于AQS的实现。带大家看看AQS到底有啥?

以ReentrantLock的角度看AQS独占实现

ReentrantLock的简单使用

简单看下ReentrantLock怎么使用的独占锁。

public class SimpleLockDemo {
    static ReentrantLock lock = new ReentrantLock(); // 1. 创建锁对象
    static int count = 0; // 共享资源

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            lock.lock();          // 2. 加锁
            try {
                count++;         // 3. 操作共享资源
            } finally {
                lock.unlock();  // 4. 解锁(必须执行)
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("结果: " + count); // 输出 2
    }
}

ReentrantLock独占锁源码分析

这里多说下ReentrantLock,公平锁和非公平锁吧!

其实非公平就是多了一步,setExclusiveOwnerThread将当前线程所有者改为当前线程。

这个exclusiveOwnerThread字段,是AQS继承AbstractOwnableSynchronizer来的字段。

state字段是AQS定义的。在ReentrantLock中,这个state0就是没有线程获锁,1就是有线程获取到锁。

// 非公平锁  ReentrantLock.NonfairSync
final void lock() {
    // 尝试获取锁,将state由0改1
    if (compareAndSetState(0, 1))
        // 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程
        setExclusiveOwnerThread(Thread.currentThread()); 
    else
        acquire(1);	// 抢锁失败,放入阻塞队列
}

// AbstractOwnableSynchronizer#setExclusiveOwnerThread
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}


// 公平锁  ReentrantLock.FairSync
final void lock() {
    acquire(1);
}

下面是AQS的独占锁具体逻辑:

在这里插入图片描述

首先是执行子类(ReentrantLock)的实现:

  1. tryAcquire方法,尝试再获取锁1次。
  2. addWaiter将当前线程封装为Node,加入CLH队列。
  3. acquireQueued将线程挂起。
public final void acquire(int arg) {
    if (!tryAcquire(arg) && 	// 1.tryAcquire调用子类实现
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // AQS实现,加入同步队列等待
        selfInterrupt();		// 线程中断复位
}

tryAcquire

tryAcquire方法,由于子类具体实现,下面是公平锁的实现源码:

// 不公平的实现
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取当前状态,ReentrantLock中 0是锁没有被抢占,1是已经被其他线程抢占了
    int c = getState();
    // 如果锁没有被抢占
    if (c == 0) {
        // cas尝试抢占,state由0改到1
        if (compareAndSetState(0, acquires)) {
            // 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 可重入锁的情况,如果持有锁的线程为当前线程
    else if (current == getExclusiveOwnerThread()) { 
        // 线程重入数量+1,ReentrantLock独占,这里的acquires就是1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 修改state的状态
        setState(nextc);
        return true;
    }
    return false;
}

公平锁的实现,和非公平不一样的地方在hasQueuedPredecessors方法这里,hasQueuedPredecessors方法的作用是判断当前线程是否排队等待获取锁

🍪这里我们不展开说,主播把大家当初学者来看,现在你并不知道同步队列的结构,主播会在下面的Semaphore源码分析的时候,再说这个东西。你现在只需要知道这个方法是用来判断当前线程是否排队等待获取锁

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 当前线程不需要等待获取锁 且 cas获取锁成功
        if (!hasQueuedPredecessors() &&		
            compareAndSetState(0, acquires)) {
            // 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程
            setExclusiveOwnerThread(current);
            return true;
...

addWaiter

创建一个独占的Node,并将它放入同步队列中。

在这里插入图片描述

// 独占模式这里是mode是Node.EXCLUSIVE。值为null
private Node addWaiter(Node mode) {
    // 创建1个独占模式的 Node。在同步队列中nextWaiter字段用来区分是独占还是共享模式。
    // waitStatus初始值就是0,Node.EXCLUSIVE值为null
    Node node = new Node(Thread.currentThread(), mode);
    // pred赋值为当前同步队列的tail
    Node pred = tail;
    // 如果当前同步队列有tail(就是已经构建过同步队列了)
    if (pred != null) {
        // 当前要加入同步队列Node的前序,指向同步队列的尾部
        node.prev = pred;
        // cas将同步队列的tail赋值为当前要加入的Node
        if (compareAndSetTail(pred, node)) {
            // 同步队列的tail的下一个Node赋值为当前要加入的Node
            pred.next = node;
            return node;
        }
    }
    // 没有构建过同步队列,node入队
    enq(node);
    return node;
}


Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

enq方法

private Node enq(final Node node) {
    // cas修改
    for (;;) {
        // 临时Node赋值为tail节点
        Node t = tail;
        // 当前同步队列没有tail节点(说明没有初始化过)
        if (t == null) { 
            // cas将head节点设置为新创建的节点(注意,这里是new的Node,不是入参的Node)
            if (compareAndSetHead(new Node()))
                tail = head;	// 将tail赋值为head
        } else {
            // ====== 下面是同步队列初始化过的逻辑 ========
            // 要加入同步队列的node的prev设置为tail节点
            node.prev = t;
            // cas将tail节点,由t设置为要加入同步队列的node
            if (compareAndSetTail(t, node)) {
                // tail节点的下一个节点赋值为当前要加入node的节点
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued

在这里插入图片描述

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取当前node的前一个node
            final Node p = node.predecessor();
            // 前一个node为头节点 且 再次尝试获取1次成功
            if (p == head && tryAcquire(arg)) {
                // 当前要加入的节点设置为头节点
                setHead(node);
                // 前一个node的next设置为null(方便当前node的前驱被gc回收)
                p.next = null; 
                failed = false;
                return interrupted;
            }
            // 抢占失败,判断是否要将线程挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);	// 因为抢占异常,将等待状态设置为CANCELLED
    }
}


// setHead方法
private void setHead(Node node) {
    // 同步队列头节点设置为当前要加入的node
    head = node;
    // 当前要加入的节点的线程,设置为null
    node.thread = null;
    // 当前要加入的节点的前驱,设置为null
    node.prev = null;
}

下面是判断是否要将线程挂起shouldParkAfterFailedAcquire,和具体挂起线程parkAndCheckInterrupt的代码方法。

// 判断是否要将线程挂起
// 入参:
// - pred:当前要加入的CLH队列节点的前驱节点,下面会称为前驱节点
// - node:当前要加入的CLH队列的节点,下面会称为当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前驱节点的等待状态
    int ws = pred.waitStatus;
    // 判断当前节点,是否为Node.SIGNAL(值为-1)
    // Node.SIGNAL表示,pred节点释放后,会通知node,当前线程可以安心的挂起。
    if (ws == Node.SIGNAL)
        return true;
    // 当前节点的状态为CANCELLED(值为1),说明前驱节点已因超时/中断被取消
    if (ws > 0) {
        // 前驱节点向前找,将当前节点的前驱设置为,不为CANCELLED状态的节点。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 前驱节点的后继节点设置为当前节点。
        pred.next = node;
    } else {
        // cas将前驱节点的等待状态设置为Node.SIGNAL。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


// 中断线程
private final boolean parkAndCheckInterrupt() {
    // 将当前线程挂起
    LockSupport.park(this);
    // 唤醒后执行下面的代码
    return Thread.interrupted();
}

什么情况下,节点的等待状态会变成CANCELLED呢?

  1. 线程被中断:当线程在acquire过程中被中断(调用Thread.interrupt()),会触发cancelAcquire方法将节点状态设为CANCELLED
  2. 超时未获锁:在doAcquireNanos等带超时的获取方法中,若超时仍未获得锁,会通过cancelAcquire标记为取消
  3. 节点失效处理:在shouldParkAfterFailedAcquire中,若发现前驱节点是CANCELLED状态,会主动跳过这些失效节点

ReentrantLock释放资源分析

ReentrantLock释放资源的入口unlock方法,调用

// ReentrantLock#unlock
public void unlock() {
    sync.release(1);
}

// AQS#release
public final boolean release(int arg) {
    // 调用ReentrantLock.Sync子类实现的tryRelease方法
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

因为这篇是AQS源码阅读,这里我们简单看ReentrantLock如何tryRelease方法的

// ReentrantLock.Sync#tryRelease
// 入参 releases = 1
protected final boolean tryRelease(int releases) {
    // 这c,不一定是0,因为锁是可重入的,每次重入state+1
    int c = getState() - releases;
    // 只有持有锁的线程,才能释放锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // free表示是否是否成功,只有c=0的时候才算释放成功
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

所以有lock一定要有对应的unlock来减少state数量,不然就线程安全问题了💀。下面是可重入锁,但是没有unlock释放锁,导致线程获取不到锁。

public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + ",获取到锁了。");
                Thread.sleep(1000);
                lock.lock();
                System.out.println("重入获取锁");
                // 缺少unlock
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }).start();
    }
}

// 输出结果
Thread-0,获取到锁了。
重入获取锁

好了,主播好像又说了一堆别的内容,现在继续来说说AQS那块是怎么实现释放资源的吧 !

省流版就是:

  1. 获取同步队列中的头节点。
  2. 如果有头节点且等待状态不为0,就唤醒头结点后续的节点。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 获取同步队列的head节点
        Node h = head;
        // 同步队列的头节点不为空且头节节点等待状态不为0
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);	// 唤醒head节点
        return true;
    }
    return false;
}

unparkSuccessor做了什么呢?省流来咯:

  1. CAS将头节点的等待状态改为0。
  2. 唤醒同步队列中最先进入不为CANCELLED的节点。
// 这里的入参node是头节点,为了方便理解下面都说头节点。
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 将头节点的等待状态设置0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 获取头节点的后继节点
    Node s = node.next;
    // 头节点没有后继节点 或 头节点的等待状态大于0(CANCELLED)
    // 那就从尾节点向前,不断找不最前面,不为CANCELLED的节点,并赋值给s
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // s不空就唤醒s节点的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

以为Semaphore的角度看AQS共享锁实现

Semaphore的简单使用

Semaphore可以用来控制资源并发访问数量,可以用来做限流,下面的代码例子,我们限制每次只能由2个线程来获取共享资源。

public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(2);
    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            try {
                // 访问共享资源
                semaphore.acquire();
                // 模拟执行业务时间2s
                Thread.sleep(2000);
                System.out.println("Thread " + Thread.currentThread().getName() + " 获取到共享资源");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                semaphore.release();
            }
        }).start();
    }
}

输出结果

Thread Thread-0 获取到共享资源
Thread Thread-1 获取到共享资源
Thread Thread-2 获取到共享资源
Thread Thread-4 获取到共享资源
Thread Thread-3 获取到共享资源

Semaphore获取共享锁的源码分析

Semaphore也是有分为公平和非公平的说法的。

// Semaphore#acquire
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)		// 	调用子类实现
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared,让我们看看Semaphore子类是怎么实现的,Semaphore也有非公平和公平的说法。

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

公平和非公平的区别在,公平的会先判断,当前线程是否需要排队。

我们先来看看公平的实现:

// Semaphore.NonfairSync#tryAcquireShared 
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判断当前线程是否需要排队。
        if (hasQueuedPredecessors())
            return -1;
        // state在这里表示共享资源的可占有数量
        int available = getState();
        // 减去本次想要占有的数量
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;    // 返回的remaining要是小于0就是抢共享资源失败的意思
    }
}

之前主播在ReentrantLock说的,这里要详细说说这个hasQueuedPredecessors方法

现在来具体说下这个hasQueuedPredecessors方法,判断当前线程是否需要排队。

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    // 具体下面的判断
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

步骤1(判断头节点和尾节点是否不同?): h != t

目的是为了快速判断当前同步队列是否为空,如果头节点和尾节点相同,说明同步队列为空,如下图所示,所以当前线程是不需要排队的。

在这里插入图片描述

步骤2(头节点的下一个节点 s 是否存在?): (s = h.next) == null

在并发场景中,可能有其他线程正在入队(比如刚设置完 tail,但还未更新 head.next),导致 h.next 暂时为 null。如下图所示。这时,认为存在并发竞争,保守判定为需要排队

在这里插入图片描述

步骤3(队列中第一个有效等待线程(s.thread)是否是当前线程?)s.thread != Thread.currentThread()

如果是当前线程 ,说明自己是队列中的第一个等待者 ,不用排队(返回 false)。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

如果不是 ,其他线程更早排队(下图thread0更早) , 必须排队(返回 true)。

在这里插入图片描述

非公平的其实很公平的差不多,少了个判断是需要排队。

// Semaphore.FairSync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

// Semaphore.Sync#nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // state在这里表示共享资源的可占有数量
        int available = getState();
        // 减去本次想要占有的数量
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining; // 返回的remaining要是小于0就是抢共享资源失败的意思
    }
}

好了,现在回到我们AQS的获取共享资源的代码里来吧!

// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)		
        doAcquireSharedInterruptibly(arg);	// AQS的具体获取共享资源
}

doAcquireSharedInterruptibly 方法

nNIMnhkRjz5F90tQ%253D&pos_id=img-HkcWOFLz-1745232961337)

// AQS#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 创建一个共享的Node,加入同步队列中(这里的addWaiter和ReentrantLock是一样的流程)
    // static final Node SHARED = new Node();
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 如果当前节点的前驱节点是头节点
            if (p == head) {
                // 尝试获取锁,返回的r是,后面方法的入参propagate,意思是还有多个共享资源可以占有
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 将当前节点设置为头节点,并试试唤醒后继节点
                    p.next = null; 
                    failed = false;
                    return;
                }
            }
            // 获取锁失败。找到等待状态为Node.SIGNAL的前继节点 + 挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate方法,做了2件事:

  1. 设置当前节点为头节点
  2. 尝试唤醒后继节点
private void setHeadAndPropagate(Node node, int propagate) {
    // 备份head节点
    Node h = head;
    // 将当前节点设置为头节点
    setHead(node);

    // 尝试唤醒后继节点
    // 1.propagate > 0: 表示当前有剩余资源(如Semaphore的许可),可以继续唤醒后续线程。
    // 2.h == null: h 是旧的头部节点,若为 null 说明队列异常(实际极少发生)
    // 3.h.waitStatus < 0:表示旧头部节点处于需唤醒后续节点的状态(如 SIGNAL)。
    // 4.(h = head) == null || h.waitStatus < 0:重新获取下head的值再试一次
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 当前唤醒节点的后继节点为null或者是共享类型的Node,就唤醒后继节点。
        if (s == null || s.isShared())
            doReleaseShared();		// 具体的释放资源逻辑,下面会说
    }
}

// setHead方法
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

Semaphore释放共享锁的源码分析

当我们调用release方法的时候,会释放共享资源。

// Semaphore#release
public void release() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
    // 子类实现尝试释放资源
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;

    }
    return false;
}

让我先看看子类Semaphore是怎么实现释放资源的:

简单来说,就是CAS的,修改state。

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

好了,我们看看AQS那部分是怎么做的释放共享资源,跟上主播的节奏,来到doReleaseShared方法:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 当前同步队列不为空,不为刚刚初始化完。
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 头节点的状态为Node.SIGNAL,就唤醒这个线程。并将当前头节点的waitState改为0。
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                unparkSuccessor(h);
            }
            // 处理头节点处于0状态的情况,确保在并发释放时唤醒信号能正确传播。
            // 你一定会好奇这个ws == 0是怎么来的?
            // 线程A将头节点状态从SIGNAL改为0并唤醒线程B。
            // 线程B获取资源后成为新头节点,此时线程C进入doReleaseShared(),发现新头节点状态为0
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        // 如果head节点发生改变,说明存在竞争就需要重新判断唤醒。没有变的话就结束。
        if (h == head)                   
            break;
    }
}

以CutDownLunch的角度看AQS共享锁的实现

CutDownLunch的简单使用

让我简单看看CountDownLatch的使用,通过一个计数器实现线程等待,适用于“主线程等待子线程完成任务”或“多线程同时启动”等场景。

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(3); // 初始化计数器为3

    // 创建并启动3个子线程
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            try {
                System.out.println("子线程执行任务...");
                Thread.sleep(1000);
                latch.countDown(); // 任务完成后计数器减1
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }

    latch.await(); // 主线程等待所有子线程完成任务
    System.out.println("所有子线程已完成任务,主线程继续执行");
}

输出结果

子线程执行任务...
子线程执行任务...
子线程执行任务...
所有子线程已完成任务,主线程继续执行

CutDownLunch获取共享锁源码分析

让我们从CountDownLatch的await方法开始看起:

// CountDownLatch#await()
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

AQS怎么获取共享资源的?

// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)			// 调用子类(CutDownLunch)tryAcquireShared实现
        doAcquireSharedInterruptibly(arg);	// 获取锁失败执行
}

让我们简单看看CutDownLunchtryAcquireShared方法

  • state == 0:返回 1,表示倒计时已完成,线程可以直接通过。
  • state > 0:返回 -1,表示倒计时未完成,线程需阻塞等待。
// CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

我们看看AQS是怎么做的,其实如果上面你看了Semaphore的共享资源获取实现,你就会惊奇的发现,好像差不多哈。

// AQS#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 这里返回是1或者-1
                // - 返回1,线程可以直接通过。
                // - 返回-1,表示倒计时未完成,线程需阻塞等待。
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 尝试唤醒后继节点
                    p.next = null; 
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate

// 入参解释下
// node:当前新增的节点 propagate:这里是1
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);	// 将当前节点设置为头节点

    // 重点在这里
    // CountDownLatch中:propagate 表示是否已完全释放(即 state 是否减到 0)
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();	// 
    }
}

CutDownLunch释放共享锁源码分析

我们看看countDown是怎么释放共享资源的。

// CountDownLatch#countDown
public void countDown() {
    sync.releaseShared(1);
}

CountDownLatch#countDown中调用AQS的releaseShared方法

public final boolean releaseShared(int arg) {
    // 子类实现tryReleaseShared方法获取共享资源
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared是AQS的实现,我们在上面的Semaphore讲过了。

这里具体看看子类CountDownLatch是怎么重写tryReleaseShared方法的。

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        // 将state数量-1,state减到0就表示可以放行所有被await()阻塞的线程。
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

以CyclicBarrier的角度看AQS条件队列

CyclicBarrier的简单使用

CyclicBarrier(循环屏障)用于让一组线程互相等待,直到所有线程都到达某个屏障点后,再一起继续执行。

public static void main(String[] args) {
    int threadCount = 3;
    // 创建 CyclicBarrier,指定等待的线程数和到达屏障后的回调动作
    CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
        System.out.println("所有线程已到达屏障,开始下一阶段");
    });

    for (int i = 0; i < threadCount; i++) {
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 开始第一阶段任务");
                Thread.sleep(1000);
                barrier.await(); // 等待其他线程到达屏障

                System.out.println(Thread.currentThread().getName() + " 开始第二阶段任务");
                Thread.sleep(1000);
                barrier.await(); // 再次等待

                System.out.println(Thread.currentThread().getName() + " 完成所有任务");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "线程" + (i + 1)).start();
    }
}

CyclicBarrier循环屏障源码分析

await方法

// CyclicBarrier#await
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

dowait方法

// CyclicBarrier#dowait
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 这里加锁是为了使用Condition的await(),具体原因会在ReentrantLock源码解读中说。
    lock.lock();
    try {
        // generation代的概念,线程数量达到构造CyclicBarrier传的parties数量,可以执行构造时传的任务,这就是1代。
        final Generation g = generation;

        // 标记屏障是否被破坏,如果屏障被破坏,其他等待线程需要立即感知到这一状态,而不是无限等待。
        if (g.broken)
            throw new BrokenBarrierException();

        // 出现线程被中断,主动破坏屏障(breakBarrier()),唤醒所有等待线程,避免它们无限等待
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 需要打破屏障的线程数量--
        // 数量为0,就是打破屏障,执行构造CyclicBarrier传的任务。
        int index = --count;
        if (index == 0) {  
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();	// 执行任务
                ranAction = true;	
                nextGeneration();	// 复原,准备下一代的数据
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果没有达到打破屏障的线程数,那就挂起这个线程
        for (;;) {
            try {
                // 是否启用超时机制
                if (!timed)
                    trip.await();		// 释放锁 + 挂起线程
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();	// 释放锁
    }
}

await方法,这里会涉及到线程的挂起和锁的释放。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();	  // 加入到条件队列中
    int savedState = fullyRelease(node);  // 释放当前线程持有的锁
    int interruptMode = 0;
    // 如果不在同步队列中,那么就挂起这个线程。
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);		// 挂起线程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter方法,创建Node.CONDITION的Node,添加到条件队列里面。

下图模拟了条件队列的可能的添加新Node的情况。
在这里插入图片描述
在这里插入图片描述

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 有最后一个节点,且等待状态不为Node.CONDITION
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

条件队列转同步队列

比如,我们在await的线程数量,达到可以打破屏障的时候,我们会执行nextGeneration方法。这时候就会涉及到条件队列转同步队列。

int index = --count;
if (index == 0) {  
    boolean ranAction = false;
    try {
        final Runnable command = barrierCommand;
        if (command != null)
            command.run();	
        ranAction = true;	
        nextGeneration();	// 唤醒所有线程,从条件队列进入到同步队列

nextGeneration方法,会为下一代准备数据。

// CyclicBarrier#nextGeneration
private void nextGeneration() {
    // 重点在这里,唤醒线程
    trip.signalAll();

    count = parties;	// 复原打破屏障的数量 
    generation = new Generation();	// 创建新的一代
}

// AbstractQueuedSynchronizer#signalAll
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);		// 重点在这里
}

doSignalAll,这里会逐步的将每个节点都放入,放入到同步队列种。

在这里插入图片描述

// AbstractQueuedSynchronizer#doSignalAll
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

具体转换同步队列

// AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
    // CAS将等待状态修改从-2到0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 加入同步队列,enq具体做啥之前说过了(‾◡◝)
    Node p = enq(node);
    int ws = p.waitStatus;
    // 等待状态为cancel 或者 cas修改等待状态Node.SIGNAL失败。就唤醒node的线程。
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

// AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

后话

怎么样?有毅力的你,能看到这里,你真的非常的厉害了,给你一个👍

通过对ReentrantLock独占锁的分析,聪明的你一定明白了Node是什么?Node.SIGNAL的意思,AQS的同步队列是什么样子的,怎么加入同步队列的?

通过Semaphore、CutDownLunch共享锁的分析,聪明的你一定明白了Node.PROPAGATE是干嘛的,它们是怎么基于AQS实现共享模式的?nextWaite是用来区分独占和共享模式的字段。

通过CyclicBarrier的分析,我们知道了条件队列,和AQS实现的条件队列转同步队列。

最后的最后

其实还有很多内容还是可以补充的,也欢迎各位大佬指出我的不足🙇‍♂️🙇‍♂️🙇‍♂️

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2340344.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

k8s介绍与实践

第一节 理论 基础介绍&#xff0c;部署实践&#xff0c;操作实践&#xff0c;点击这里学习 第二节 dashboard操作 查看安装的dashboard服务信息 kubectl get pod,svc -n kubernetes-dashboard 网页登录地址&#xff1a;https://server_ip:30976/#/login 创建token kube…

KRaft面试思路引导

Kafka实在2.8之后就用KRaft进行集群管理了 Conroller负责选举Leader&#xff0c;同时Controller管理集群元数据状态信息&#xff0c;并将元数据信息同步给各个分区的Leader 和Zookeeper管理一样&#xff0c;会选出一个Broker作为Controller去管理整个集群&#xff0c;但是元数…

FreeRTOS菜鸟入门(六)·移植FreeRTOS到STM32

目录 1. 获取裸机工程模版 2. 下载 FreeRTOS V9.0.0 源码 3. FreeRTOS文件夹内容简介 3.1 FreeRTOS文件夹 3.1.1 Demo文件夹 3.1.2 License 文件夹 3.1.3 Source 文件夹 3.2 FreeRTOS-Plus 文件夹 4. 往裸机工程添加 FreeRTOS 源码 5. 拷贝 FreeRTOSConfig…

14.第二阶段x64游戏实战-分析人物的名字

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 上一个内容&#xff1a;13.第二阶段x64游戏实战-分析人物等级和升级经验 名字&#xff08;中文英文符号…

【CS*N是狗】亲测可用!!WIN11上禁用Chrome自动更新IDM插件

现象&#xff1a;每次打开chrome后IDM会弹出提示插件版本不一致。经过排查后发现是chrome把IDM插件给更新了&#xff0c;导致IDM提示版本不匹配。经过摸索后&#xff0c;得到了可行的方案。 第一步&#xff0c;打开Chrome&#xff0c;把IDM插件卸载掉&#xff0c;然后重新安装I…

漫游git rebase + 浅谈git checkout和git branch -f的分支命令

今天学了两个命令非常有意思&#xff1a;一个是git checkout&#xff0c;一个是git branch -f。我们可以认为在提交树上&#xff0c;任何一个节点代表着一次提交。并且&#xff0c;git commit将会在 H E A D HEAD HEAD指针指向的节点上进行进一步提交。将每一个分支名视为标记当…

深入理解 React 组件的生命周期:从创建到销毁的全过程

React 作为当今最流行的前端框架之一&#xff0c;其组件生命周期是每个 React 开发者必须掌握的核心概念。本文将全面剖析 React 组件的生命周期&#xff0c;包括类组件的各个生命周期方法和函数组件如何使用 Hooks 模拟生命周期行为&#xff0c;帮助开发者编写更高效、更健壮的…

OpenCV 图形API(44)颜色空间转换-----将图像从 BGR 色彩空间转换为 RGB 色彩空间函数BGR2RGB()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 将图像从BGR色彩空间转换为RGB色彩空间。 该函数将输入图像从BGR色彩空间转换为RGB。B、G和R通道值的常规范围是0到255。 输出图像是8位无符号3通…

配置nginx服务,通过多ip区分多网站

首先关闭防火墙,setenforce 0 关过了,不截图了 多IP,首先配置多个IP地址 可以在vm增加虚拟网卡,也可以在同一网卡配置多个IP,我用第一种 记得点确定 查看新的虚拟网卡IP 没有IP,配置一个 安装nginx 写配置 server{listen 192.168.214.130:80;root /www/ip/130; # 资源根目…

[k8s实战]Containerd 1.7.2 离线安装与配置全指南(生产级优化)

[k8s实战]Containerd 1.7.2 离线安装与配置全指南&#xff08;生产级优化&#xff09; 摘要&#xff1a;本文详细讲解在无外网环境下部署 Containerd 1.7.2 容器运行时的完整流程&#xff0c;涵盖二进制包安装、私有镜像仓库配置、Systemd服务集成等关键步骤&#xff0c;并提供…

解决Windows安全中心显示空白页面

1、电脑重装系统后&#xff0c;发现原本一些软件打不开了&#xff0c;电脑莫名认为有病毒&#xff0c;自动删除插件。附图。 2、第一反应是电脑防火墙的原因&#xff0c;默认威胁防护识别到了病毒软件&#xff0c;自动删除。在开始屏幕搜Windows安全中心&#xff0c;打开之后发…

【MQ篇】初识MQ!

目录 一、什么是MQ&#xff1f;简单来说就是个“快递中转站” &#x1f4e6;二、为什么要用MQ&#xff1f;用了它&#xff0c;好处多多&#xff01;&#x1f929;三、MQ的应用场景&#xff1a;各行各业都能用&#xff01;&#x1f30d;四、MQ的优缺点&#xff1a;硬币的两面&am…

2、SpringAI接入ChatGPT与微服务整合

2、SpringAI接入ChatGPT与微服务整合 小薛博客AI 大模型资料 1、SpringAI简介 https://spring.io/projects/spring-ai Spring AI是一个人工智能工程的应用框架。其目标是将Spring生态系统的设计原则&#xff08;如可移植性和模块化设计&#xff09;应用于人工智能领域&#…

榕壹云预约咨询系统:基于ThinkPHP+MySQL+UniApp打造的灵活预约小程序解决方案

数字化咨询场景的痛点与解决方案 在心理咨询、医疗问诊、法律咨询等需要预约服务的场景中&#xff0c;传统线下预约存在效率低、管理复杂、资源分配不均等问题。榕壹云预约咨询系统基于ThinkPHPMySQLUniApp技术栈开发&#xff0c;为咨询类行业提供了一套高效、安全、可扩展的数…

opencv 图像矫正的原理

图像矫正的原理是透视变换&#xff0c;下面来介绍一下透视变换的概念。 听名字有点熟&#xff0c;我们在图像旋转里接触过仿射变换&#xff0c;知道仿射变换是把一个二维坐标系转换到另一个二维坐标系的过程&#xff0c;转换过程坐标点的相对位置和属性不发生变换&#xff0c;…

计算机前沿技术课程论文 K-means算法在图像处理的应用

K-means算法在图像处理的应用 这是本人在计算机前沿技术课程中的课程论文文章&#xff0c;为了方便大家参考学习&#xff0c;我把完整的论文word文档发到了我的资源里&#xff0c;有需要的可以自取。 点击完整资源链接 目录 K-means算法在图像处理的应用摘要&#xff1a;引言1…

WSL2-Ubuntu22.04安装URSim5.21.3

WSL2-Ubuntu22.04安装URSim5.21.3 准备安装启动 准备 名称版本WSL2Ubuntu22.04URSim5.21.3VcXsrvNaN WSL2安装与可视化请见这篇:WSL2-Ubuntu22.04-配置。 安装 我们是wsl2-ubuntu22.04&#xff0c;所以安装Linux版本的URSim&#xff0c;下载之前需要注册一下&#xff0c;即…

blender 录课键位显示插件(图文傻瓜式安装)

1、下载 点击这个链接进行下载https://github.com/nutti/Screencast-Keys 下载好不用解压 2、安装 打开blender进行安装 点击编辑选择偏好设置 选择插件再点击这个下箭头 选择从磁盘安装 然后找到自己刚刚下载好的&#xff0c;点击从磁盘安装 安装完成后勾选上插件 …

天翼云手机断开连接2小时关机

2025-04-21 天翼云手机断开连接2小时自动 天翼云手机 4元1个月 天翼云手机永不关机 天翼云手机不休眠 天翼云手机断开连接时&#xff0c;界面显示&#xff1a;离线运行&#xff0c;2小时后自动关机 电脑每小时自动连接一次 手机每小时自动连接一次

基于 FFmpeg 的音视频处理基础原理与实验探究

目录 1 基本知识1.1 解封装1.2 AAC和ADTS说明 1.3 H2641.3.1 H264编码结构解析1.3.2 NALU1.3.2 分类 2 实验1 探究音视频信息2.1 重要结构体介绍2.2 相关的API 3 实验二 提取AAC数据4 实验三 提取h264 1 基本知识 1.1 解封装 封装的逆向操作&#xff1a;封装是把音频流、视频流…