【JUC源码专题】AQS 源码分析(JDK8)

news2025/1/17 21:36:10

文章目录

  • 同步队列
    • 同步队列结点 Node
    • 同步队列状态 state
  • 获取互斥锁
    • acquire 方法
      • tryAcquire 方法获取互斥锁
      • addWaiter 方法
        • enq() 入队
      • acquireQueued()
        • setHead 方法设置头节点
        • shouldParkAfterFailedAcquire()
        • parkAndCheckInterrupt()
        • cancelAcquire 发生异常,取消线程获取锁的行为
          • unparkSuccessor 方法唤醒后驱结点
    • acquireInterruptibly 响应中断
      • doAcquireInterruptibly
  • 获取共享锁
    • acquireShared 方法
      • tryAcquireShared 方法由子类实现
      • doAcquireShared 方法
        • setHeadAndPropagate 方法唤醒其他读线程
  • 释放锁
    • fullyRelease
    • release 释放互斥锁
    • releaseShared 释放共享锁
    • tryReleaseShared 由子类实现
    • doReleaseShared
    • 总结
  • 条件队列
    • ConditionObject
    • 等待条件满足
      • awaitUninterruptibly
      • await()
        • addConditionWaiter 将线程放入等待队列
          • unlinkCancelledWaiters
      • checkInterruptWhileWaiting
      • transferAfterCancelledWait
      • reportInterruptAfterWait
    • 唤醒等待线程
      • signal
        • doSignal
      • signalAll
        • doSignalAll
      • transferForSignal
    • 总结

AQS 是 JAVA 中管程模型的实现,JUC 包中的很多工具类都是基于 AQS 实现的。所以笔者将本篇文章作为《JDK8 JUC 源码全解》专题的开篇文章。

同步队列

同步队列结点 Node

同步队列的底层是双向链表,链表中的结点就是 Node 类。

    // AQS 中的队列是通过双向链表实现的,Node 就是链表中的结点。
    static final class Node {
    
        static final Node SHARED = new Node(); // 共享锁模式
        static final Node EXCLUSIVE = null; // 互斥锁模式

        // 采用 volatile 修饰的变量
        private transient volatile Node head; // 双向链表头结点
        private transient volatile Node tail; // 双向链表尾结点
        volatile int waitStatus; // Node 结点的等待状态
        volatile Node prev; // 前继结点
        volatile Node next; // 后继结点
        volatile Thread thread; // Node 中封装的线程

        // Node 结点的等待状态枚举
        static final int CANCELLED =  1; // 取消状态
        static final int SIGNAL    = -1; // 后继结点已挂起,需要它来唤醒
        static final int CONDITION = -2; // 结点在条件队列中
        static final int PROPAGATE = -3; // 共享锁模式下,传播唤醒的行为
        
        Node nextWaiter; // 特指条件队列中的 next
    }

同步队列状态 state

表明当前同步队列当前的状态。
[[ReentrantLock]] 中 state 表示互斥锁是否被线程持有,即 0 没有线程持有,1 已经被线程持有。
[[Semaphore]] 中 state 变量来实现信号量计数。
[[ReentrantReadWriteLock]] 中 state 的高 16 位用于表示持有读锁的线程的数量,低 16 位用于表示持有写锁的线程的重入次数

private volatile int state;

protected final int getState() {  
    return state;  
}

// 注:此方法不具备线程安全性,需要调用方自行保证上下文线程安全
protected final void setState(int newState) {  
    state = newState;  
}

// 线程安全的方式修改 state 变量
protected final boolean compareAndSetState(int expect, int update) {  
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  
}

获取互斥锁

try 前缀的方法都是由子类实现的,AQS 中只是定义了方法,比如 tryAcquiretryRelease
do 前缀的方法,都是 AQS 实现的去同步队列排队,比如 doAcquireShared

acquire 方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Node.EXCLUSIVE表示互斥锁
        selfInterrupt(); // 如果在等待过程中发生中断,传递中断标志位
}

acquire 方法依赖于 tryAcquire 方法的返回值,tryAcquire 方法的具体逻辑由 AQS 子类实现。

  • 若 tryAcquire 方法返回值为 false,则先将当前线程封装为 Node 对象,插入同步队列中。
  • 然后判断当前线程是否应该阻塞等待,通过死循环保证当前线程阻塞等待或者发生异常被取消或者直接获得锁。
  • 如果在等待过程中发生中断,则传递中断标志位

tryAcquire 方法获取互斥锁

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

tryAcquire 具体实现由子类决定。[[模版方法模式]]

addWaiter 方法

将当前线程封装为 Node 结点,插入到 AQS 双向链表的末尾。
Pasted image 20221119210757

    // 没竞争到锁资源的线程封装成 Node 插入双向链表排队
    private Node addWaiter(Node mode) {
        // 将当前线程封装为 Node 结点
        Node node = new Node(Thread.currentThread(), mode);
        // 保存 tail 快照
        Node pred = tail;
        if (pred != null) {
            // 分为三步将新结点挂到链表末端
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 若 CAS 失败或者 pred 为 null,则以死循环的方式,保证当前线程挂到双向链表末尾
        enq(node);
        return node;
    }

enq() 入队

enq:
               node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
addWaiter:
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }

为什么 enq 方法和 addWaiter 方法里都有 CAS 替换尾指针来原子性插入 Node 结点这段代码,这是代码冗余么?为什么要这么设计?
因为在竞争激烈的条件下,可以直接通过 CAS 来操作插入,而不需要先判断后插入。从代码层面上来说只是多了一个条件判断。但是如果在 CPU 指令流水线层面上减少判断,则是一个重要的优化。

    private Node enq(final Node node) {
        for (;;) { // 存在多线程竞争,所以这里需要死循环。
            // 拿到尾节点
            Node t = tail;
            // 如果尾节点为空,则构建一个伪头节点作为 head 和 tail。
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued()

通过死循环来让当前结点获取锁或者阻塞。返回值表示等待过程中是否发生了中断(true 表示等待过程中发生了中断)。

// acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 前继结点 p
                final Node p = node.predecessor();
                // 若前继结点是伪头结点,则尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 获取锁资源成功,更新头结点
                    setHead(node);
                    // 释放 p 结点指向当前结点的引用,帮助 GC
                    p.next = null; // help GC
                    failed = false;
                    return interrupted; // 传递中断标志位
                }
                // 当前线程必须将前继结点的 ws 设置为 -1 后,才能将自身挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) 
                    interrupted = true;
            }
        } finally { // tryAcquire(arg) 抛异常的情况下会进这里
            if (failed)
                cancelAcquire(node);
        }
    } 

setHead 方法设置头节点

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

shouldParkAfterFailedAcquire()

如果结点的状态是 SIGNAL(-1),那么它释放锁以后必须唤醒后继结点。shouldParkAfterFailedAcquire() 就是要确保前继结点的 ws 是 SIGNAL 后,再调用 parkAndCheckInterrupt() 将自身挂起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前继结点的 ws
        int ws = pred.waitStatus;
        // 若 ws 为 Node.SIGNAL,说明前继结点阻塞后会唤醒当前线程,当前线程可以放心阻塞
        if (ws == Node.SIGNAL)
            return true;
        // ws 大于 0 的状态只有取消状态 CANCELLED(1)
        if (ws > 0) {
            // 往前找,直到找到一个结点的状态不等于 1 的结点,作为当前结点的上一个结点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // ws 为 0 或 PROPAGATE 的情况
            // 尝试将前继结点的 ws 修改为 SIGNAL(这里没用死循环,不一定成功)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false; // 返回 false,重新进入外层死循环
    }

跳过 ws 是 CANCELLED 的结点:

           do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;

Pasted image 20221119211101

  • 图中红色边框的结点由于没有 GC ROOT 引用,会被垃圾回收器回收

parkAndCheckInterrupt()

将线程设置为可中断的等待并返回线程是否在等待过程中发生中断。如果通过 LockSupport.unpark 方法唤醒线程,那么返回 false。如果通过 Thread.interrupt 方法唤醒线程,那么返回 true。

private final boolean parkAndCheckInterrupt() {
    // 阻塞线程        
    LockSupport.park(this);  
    
    // 可以确认当前挂起的线程是被中断唤醒的还是被正常唤醒的
    // 被中断唤醒返回 true,正常唤醒返回 false
    return Thread.interrupted();  
}

注:Thread.interrupted(); 会清除中断标志位

// The interrupted status of the thread is cleared by this method.
public static boolean interrupted() {  
    return currentThread().isInterrupted(true);
}

cancelAcquire 发生异常,取消线程获取锁的行为

Cancels an ongoing attempt to acquire.

  1. node.thread = null;
  2. Skip cancelled predecessors
  3. node.waitStatus = Node.CANCELLED;
  4. 将当前结点脱离同步队列,这一步分为 3 种情况:
    • 情况 1:要取消的结点就是 tail 结点
    • 情况 2:要取消的结点是伪头结点
    • 情况 3:要取消的结点不是伪头结点也不是 tail 结点

Pasted image 20221119211144
Pasted image 20221119211212
Pasted image 20221119211231

private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;
        Node pred = node.prev;
        // Skip cancelled predecessors
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
                
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;
                
        // 若当前 node 是尾结点,由于是多线程竞争尾指针,必须通过 CAS 操作修改尾指针为前一个结点
        if (node == tail && compareAndSetTail(node, pred)) {
            // 若 CAS 修改尾指针成功后,将前继结点的 next 设置为 null
            compareAndSetNext(pred, predNext, null);
        } else {
            // 当前结点不是尾结点或者 compareAndSetTail 失败
            int ws;
            // 当前结点不是伪头结点的后继结点
            // 如果 pred 结点的状态是 SIGNAL,则表明 pred 需要唤醒后一个结点。尝试设置 pred 结点的 next 指针为需要他来唤醒的结点。
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) 
                 // 存在一个状态 ws == Node.SIGNAL 但是 pred.thread == null
                 // 因为 cancelAcquire 是先执行 node.thread = null; 后执行 node.waitStatus = Node.CANCELLED;
                 // 所以这里需要校验 pred.thread != null
                 && pred.thread != null) {
                Node next = node.next;
                // node 的 next 不为 null,且不是取消结点,则将 pred 的 next 指向当前结点的 next
                // 这里只是试一下,没有用死循环,因为查找有效节点是从 tail 往前找,这里失败也没事
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // pred == head,当前结点是伪头结点的后继结点,则唤醒后继结点
                // 进入条件判断if (p == head && tryAcquire(arg))尝试竞争锁 
                unparkSuccessor(node);
            }
                        
            // 当前结点的 next 指向当前结点
            node.next = node; // help GC
        }
    }
unparkSuccessor 方法唤醒后驱结点
private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // 后继结点为 null 或者是 CANCELLED 状态
        s = null;
        // 从 tail 往前找,找到一个离 node 最近的未取消结点
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // 唤醒
}

acquireInterruptibly 响应中断

响应中断的方法,如果在获取锁过程中发生了线程被中断,则会抛出中断异常。

public final void acquireInterruptibly(int arg)  
        throws InterruptedException {  
    if (Thread.interrupted())  
        throw new InterruptedException();  // 若线程被中断,抛出中断异常
    if (!tryAcquire(arg))  // 调用子类获取锁的方法
        doAcquireInterruptibly(arg);  // 执行可中断的获取锁流程
}

doAcquireInterruptibly

doAcquireInterruptibly 方法和 acquire 方法逻辑大致相同,区别在于 doAcquireInterruptibly 会响应中断即如果线程被中断,则不会继续获取锁,直接抛出中断异常。acquire 方法则不会响应中断标志位,仅仅是把中断标识位传递出去。

// 拿不到锁资源就一直等,直到锁资源释放后被唤醒或者被中断唤醒
private void doAcquireInterruptibly(int arg)  
    throws InterruptedException {  
    final Node node = addWaiter(Node.EXCLUSIVE);  
    boolean failed = true;  
    try {  
        for (;;) {  
            final Node p = node.predecessor();  
            if (p == head && tryAcquire(arg)) {  
                setHead(node);  
                p.next = null; // help GC  
                failed = false;  
                return;  
            }  
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                throw new InterruptedException();  // 响应中断,与 acquire 主要区别
        }  
    } finally {  
        if (failed)  
            cancelAcquire(node);  
    }

获取共享锁

学习共享锁时,我们需要特别关注读写锁是如果解决写线程饥饿(写线程抢不到锁)问题。

acquireShared 方法

acquireShared 和 acquire 主要区别是,当前线程获取锁后是否会唤醒后继结点。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

tryAcquireShared 方法由子类实现

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

Semaphore、CountDownLatch、ReentrantReadWriteLock 中都有 tryAcquireShared 方法的实现。

doAcquireShared 方法

Pasted image 20221119201930

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 若当前结点的前继结点是头节点,则调用子类实现的 tryAcquireShared 获取锁
                if (p == head) {
                    // 若成功获取锁,则设置当前结点为头节点,并唤醒同样等待在 SHARED 模式下的线程
                    // ReentrantReadWriteLock 的实现中返回值为 1 表示获取到读锁,-1 表示未获取到
                    int r = tryAcquireShared(arg); 
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt(); // 若被中断过,传递中断标志位
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate 方法唤醒其他读线程

Sets head of queue, and checks if [[successor]] may be waiting in shared mode, if so propagating if either propagate > 0 or PROPAGATE status was set.

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 保存旧的 head 结点
        setHead(node); // 当前结点设置为头结点
        // 老爷子的习惯,在使用变量之前都会做非空校验,虽然这里 h 和 head 必不为 null,但是还是校验了 h == null 和 (h = head) == null
        // 下述条件判断可简化为 if(propagate > 0 || h.waitStatus < 0 || head.waitStatus < 0)

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared()) // 此处 s 也不可能为 null,所以可简写为 if(s.isShared())。下一个结点是读线程才唤醒,写线程就不唤醒了
                doReleaseShared();
        }
    }

[[ReentrantReadWriteLock]] 的实现中,tryAcquireShared 获取到读锁时返回值为 1,即 propagate 为 1,propagate > 0 恒成立,setHeadAndPropagate 可简写为:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node); 
        Node s = node.next;
        if (s.isShared()) 
            doReleaseShared();
    }

释放锁

fullyRelease

Invokes release with current state value; returns saved state. Cancels node and throws exception on failure.

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 若是重入锁,则 savedState 是重入次数
        int savedState = getState();
        if (release(savedState)) { // 释放锁并唤醒后继结点
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException(); // 若线程未获取到锁就执行该方法,则抛出异常
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED; // 若失败则将结点状态更新为 CANCELLED(-1)
    }
}

release 释放互斥锁

release 方法存在伪唤醒问题,即唤醒的线程又继续阻塞。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 头结点也即当前结点
            Node h = head; 
            // 若当前结点的 waitStatus 状态不等于 0(等于 -1)
            // 说明当前结点需要唤醒后面的结点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

releaseShared 释放共享锁

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared 由子类实现

  • [[CountDownLatch#tryReleaseShared]]
  • [[ReentrantReadWriteLock#tryReleaseShared]]
  • [[Semaphore#tryReleaseShared]]

doReleaseShared

多个线程同时释放读锁时,第一个线程将头结点的状态从 SIGNAL 修改为 0。第二个线程进来发现 ws == 0 了,于是就将 ws 修改为 -3,表明此时有多个线程同时释放了读锁,被唤醒的线程需要将唤醒传播下去。

      private void doReleaseShared() {
        for (;;) { // 允许多个持有读锁的线程同时释放锁
            Node h = head;
            if (h != null && h != tail) { // 校验是否存在等待读锁的线程
                int ws = h.waitStatus;
                // 若当前头结点的状态为 SIGNAL,则表明后面有结点需要它来唤醒。CAS 将其修改为 0,并唤醒后驱结点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            
                    unparkSuccessor(h); // 唤醒后继结点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 传播唤醒
                    continue;                
            }
            if (h == head) // 若头结点发生变化,必须重新进入死循环校验条件                
                break;
        }
    }

总结

Pasted image 20221119205356

条件队列

在生产者消费者模型中,消费者消费时,如果队列为空,则需要等待生产者生产后,唤醒消费者。生产者生产时,如果队列满了,则需要等待消费者消费后,由消费者唤醒生产者。可以参考笔者的 ArrayBlockingQueue 源码分析。
显然只有同步队列是满足不了我们的需求,我们还需要 2 个来队列来保存等待条件满足的生产者的消费者即条件队列。
想象一下,如果没有条件队列,队列为空时,消费者获取锁成功,发现队列为空,于是释放锁,又重新竞争锁资源,导致生产者一直没有获取到锁进行生产。这时极大的性能浪费。

ConditionObject

    public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter; // 第一个等待结点
        private transient Node lastWaiter;  // 最后一个等待结点
        private static final int REINTERRUPT =  1; // 重复中断状态位
        private static final int THROW_IE    = -1; // 发生异常状态位
    }

等待条件满足

先将线程结点放入等待队列中,然后释放同步队列的锁,等待条件达成、中断、超时。当线程等待完成后,将重新调用 acquireQueued 方法获取锁。

awaitUninterruptibly

Implements uninterruptible condition wait.

  1. Save lock state returned by AbstractQueuedSynchronizer.getState().
  2. Invoke AbstractQueuedSynchronizer.release(int) with saved state as argument, throwing IllegalMonitorStateException if it fails.
  3. Block until signalled.
  4. Reacquire by invoking specialized version of AbstractQueuedSynchronizer.acquire(int) with saved state as argument.
        public final void awaitUninterruptibly() {
            // 加入同步队列
            Node node = addConditionWaiter();
            // 释放锁
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            // 若线程不在同步队列中,则阻塞
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this); // 阻塞
                if (Thread.interrupted())
                    interrupted = true; // 中断唤醒
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt(); // 传递中断标志位
        }

await()

await 和 awaitUninterruptibly 的区别在于,await 是会响应中断的。

public final void await() throws InterruptedException {
    if (Thread.interrupted()) 
        throw new InterruptedException(); // 响应中断
    // 将线程添加到条件队列中
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node); // 释放锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // while(不在同步队列中)
        LockSupport.park(this); // 挂起当前线程
        
        // 有两种情况会导致线程被唤醒:1. 中断唤醒 2. singal 将其移入同步队列后唤醒
        // 被唤醒后需要校验等待过程中是否发生中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break; // 若等待过程中发生中断,跳出 while 循环
    }
    // 被唤醒后,重新获取锁,此时线程已在同步队列中
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0) // 处理中断
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter 将线程放入等待队列

Adds a new waiter to wait queue.

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 若最后一个等待结点状态不是 CONDITION,则执行 unlinkCancelledWaiters 方法去掉所有非 Node.CONDITION 状态的变量
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter; // 获取最新的 lastWaiter
            }
            // 新建 Node 结点,插入等待队列
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
unlinkCancelledWaiters

Unlinks cancelled waiter nodes from condition queue. Called only while holding lock. This is called when cancellation occurred during condition wait, and upon insertion of a new waiter when lastWaiter is seen to have been cancelled. This method is needed to avoid garbage retention in the absence of signals. So even though it may require a full traversal, it comes into play only when timeouts or cancellations occur in the absence of signals. It traverses all nodes rather than stopping at a particular target to unlink all pointers to garbage nodes without requiring many re-traversals during cancellation storms.
总结一下就是线程被 transfer 到同步队列以后,可能还在条件队列中,此时需要将他们 unlink 掉。

    private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

checkInterruptWhileWaiting

Checks for interrupt, returning THROW_IE if interrupted before signalled, REINTERRUPT if after signalled, or 0 if not interrupted.
检查在等待过程中是否发生了中断:

  • 调用 signal 方法之前被中断唤醒,则返回 THROW_IE
  • 调用 sgnal 方法之后被中断唤醒,则返回 REINTERRUPT
  • 未被中断,则返回 0
// 传递中断状态位
private static final int REINTERRUPT =  1;
// 发生异常状态位
private static final int THROW_IE    = -1;

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? // 检查中断标志位
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

transferAfterCancelledWait

  • Transfers node, if necessary, to sync queue after a cancelled wait.
  • Returns true if thread was cancelled before being signalled.
    final boolean transferAfterCancelledWait(Node node) {
        // 当前线程是通过中断方式唤醒,需要抛出中断异常
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node); // transfer
            return true; // 抛出中断异常
        }
        // 
        // 其他线程 signal 当前线程时,被中断了。
        while (!isOnSyncQueue(node)) 
            Thread.yield();
        return false; // REINTERRUPT
    }

reportInterruptAfterWait

Throws InterruptedException, reinterrupts current thread, or does nothing, depending on mode.

        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

唤醒等待线程

signal

Moves the longest-waiting thread, if one exists, from the wait queue for this condition to the wait queue for the owning lock.

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter; // 先进先出
            if (first != null)
                doSignal(first); // 唤醒等待队列中的头节点
        }

doSignal

Removes and transfers nodes until hit non-cancelled one or null. Split out from signal in part to encourage compilers to inline the case of no waiters.

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
if ( (firstWaiter = first.nextWaiter) == null)
    lastWaiter = null;
first.nextWaiter = null;

Pasted image 20221119212029

signalAll

相当于 notifyAll

        public final void signalAll() {
            if (!isHeldExclusively()) // 必须持有锁
                throw new IllegalMonitorStateException(); 
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

doSignalAll

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first); // 移动到同步队列
                first = next; // 更新 first 的引用
            } while (first != null);
        }

transferForSignal

  • Transfers a node from a condition queue onto sync queue.
  • Returns true if successful.
    final boolean transferForSignal(Node node) {
        // If cannot change waitStatus, the node has been cancelled.
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 将 node 放入同步队列,并返回前继结点 p
        Node p = enq(node);
        int ws = p.waitStatus;
        // 若 p 的状态大于 0,那么就只有 CANCELLED 状态了,说明前继结点并不能唤醒 node,那么直接调用 unpark 唤醒 node
        // 若 p 不是 CANCELLED 状态,那么尝试将其修改为 -1,后续由 p 来唤醒 node,若 CAS 失败,则直接唤醒 node
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 线程协作
            LockSupport.unpark(node.thread);
        return true;
    }

总结

Pasted image 20221119204227

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/18742.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第七章第一节:顺序查找和折半查找

文章目录教程1. 查找的基本概念1.1 对查找表的常见操作1.2 查找算法的评价指标2. 顺序查找2.1 顺序查找的算法思想2.2. 顺序查找的实现2.3 查找效率分析2.4 顺序查找的优化&#xff08;对有序表&#xff09;2.5 用查找判定树分析ASL2.6 顺序查找的优化&#xff08;被查概率不相…

在centos中注册gitlab runner

注册runner 有几种不同的方式&#xff0c;这里介绍的是在centos7中使用 rpm包来安装 按照gitlab runner的官网链接里面的介绍&#xff1a; gitlab runner 链接 下载 官网描述&#xff1a; ## Replace ${arch} with any of the supported architectures, e.g. amd64, arm, ar…

python 空间滤波

均值滤波器 空域变换包含灰度变换和空间滤波 灰度变换是通过点对点的映射进行图像增强&#xff0c;是一种点运算 空间滤波是基于邻域的一种运算&#xff0c;即图像像素的灰度值不仅和当前点有关&#xff0c;还和这个点周围邻域像素点的灰度值有关。所以空间滤波其实是一种加…

如何管理oralce口令文件和参数文件

口令文件 口令文件审核 Step 1: 使用root账号将 oracle dba的权限移除 [rootoracle-db-19c ~]# su - oracle [oracleoracle-db-19c ~]$ [oracleoracle-db-19c ~]$ id oracle uid1501(oracle) gid1501(oinstall) groups1501(oinstall),1502(dba),1503(oper),1504(backupdba)…

浅析linux 内核 高精度定时器(hrtimer)实现机制(一)

1 hrtimer 概述 在Linux内核中已经存在了一个管理定时器的通用框架。不过它也有很多不足&#xff0c;最大的问题是其精度不是很高。哪怕底层的定时事件设备精度再高&#xff0c;定时器层的分辨率只能达到Tick级别&#xff0c;按照内核配置选项的不同&#xff0c;在100Hz到1000…

灵界的科学丨一、灵界在哪里?

摘自李嗣涔教授《灵界的科学》 在国内物理学界近十位学者的见证下&#xff0c; 发现我们所处四度时空的物质世界之外&#xff0c; 似乎还有一个世界的存在&#xff0c; 当年我把这个世界称作信息场&#xff0c; 也就是俗称的灵界。 二十世纪末宇宙大尺度谜团的重大发现──…

设计模式学习记录

设计模式 UML图&#xff1a; ------> 依赖 ——>关联 -------▲ 实现 —–—▲ 继承 &#x1f53a;———> 聚合 ▲———> 组合&#xff08;关联性更强&#xff09; 一、策略模式&#xff08;行为型&#xff09; 策略模式&#xff1a;是一种定义一系列算法的方法…

Java --- Spring6项目创建及注意事项

目录 一、Spring框架解决的问题 二、Spring介绍 三、Spring八大模块 四、Spring特点 五、第一个Spring6入门程序 六、spring的细节 6.1、配置文件的bean的id不能重复 6.2、spring底层是通过反射调用无参构造方法创建对象 6.3、spring会把创建好的对象存储在Map集合中 6.4…

【数据结构初阶】树+二叉树+堆的实现

真正的勇士&#xff0c;就是在看清生活的真相后&#xff0c;依旧慷慨面对他所遭受的苦难与挫折。 大学究竟教会了我们什么呢&#xff1f;或许答案只有一个&#xff0c;看清自己&#xff0c;与自己和解&#xff0c;和自己坐下来谈一谈。 人生或许本就没有什么意义&#xff0c;…

MySQL的缓冲池(buffer pool)及 LRU算法

1.什么是缓冲池&#xff08;buffer pool&#xff09; buffer pool 是数据库的一个内存组件&#xff0c;里面缓存了磁盘上的真实数据&#xff0c;Java系统对数据库的增删改操作&#xff0c;主要是这个内存数据结构中的缓存数据执行的。 控制块 存的是 数据页所属的表 空间号&a…

MATLAB break语句

MATLAB中 break 语句用于终止 for 或 while 循环的执行&#xff0c;当在循环体内执行到该语句的时候&#xff0c;程序将会跳出循环&#xff0c;继续执行循环语句的下一语句。 注意&#xff1a;在嵌套循环中&#xff0c;break 退出只能在循环发生&#xff0c;后通过的声明控制循…

git关于创建/删除分支常用命令

主要用来介绍git中如何操作分支的命令&#xff1a; 1.git查看所有的分支: git branch -a 2.创建本地分支&#xff1a; git checkout -b <name> 3.有了本地分支之后推送到远程分支; git push origin <name> 4.切换分支&#xff1a; git checkout <name> 5.删除…

黑*头条_第5章_文章发布粉丝管理成形记

黑*头条_第5章_文章发布&粉丝管理成形记 文章目录黑*头条_第5章_文章发布&粉丝管理成形记文章发布&粉丝管理1 需求分析1.1 功能需求1.2 前端需求1.2.1 图文数据需求1.2.2 素材管理需求1.2.3 发布文章需求1.2.4 内容列表需求1.2.5 粉丝概况需求2 定义2.1 后端定义2.…

MacBook磁盘内存空间清理软件CleanMyMac2023

Mac电脑用的时间久了&#xff0c;Mac用户尤其是MacBook用户会经常收到“磁盘几乎已满”的提示&#xff0c;如何解决这个问题&#xff0c;当我们使用苹果MAC一段时间后&#xff0c;就会有大量的垃圾文件占用磁盘空间&#xff0c;例如系统缓存文件、应用程序缓存文件、备份和重复…

2022/11/18[指针] 关于指针的初步理解

先看一段利用指针交换a和b值得代码&#xff1a; #include<stdio.h> void swap(int* p1, int* p2); int main() {int a, b;int* pa, * pb;printf("enter:");scanf_s("%d%d", &a, &b);pa &a;pb &b;printf("%d,%d\n", pa,…

面试:线程池核心线程如何保持一直运行的

对于Java中 Thread 对象&#xff0c;同一个线程对象调用 start 方法后&#xff0c;会在执行完run 后走向终止&#xff08;TERMINATED&#xff09;状态&#xff0c;也就是说一个线程对象是不可以通过多次调用 start 方法重复执行 run 方法内容的。Java的线程是不允许启动两次的&…

frp内网穿透服务

参考博客&#xff1a; https://www.jianshu.com/p/19ea81efffc4 https://blog.csdn.net/yj222333/article/details/124752420 依赖于&#xff1a;Github开源软件FRP 下载地址&#xff1a;https://github.com/fatedier/frp/releases frp 主要由 客户端(frpc) 和 服务端(frps…

基于JAVA景区售票系统设计与实现 开题报告

本科生毕业论文 基于java框架springboot旅游景区景点购票系统 开题报告 学 院&#xff1a; 专 业&#xff1a; 计算机科学与技术 年 级&#xff1a; 学生姓名&#xff1a; 指导教师…

三维模型文件以及obj、ply格式文件生成pcd点云文件

方法一、三维模型文件生成obj文件 要想生成点云文件&#xff0c;要先将三维模型文件保存为obj文件格式&#xff0c;步骤如下&#xff1a; 通过SolidWorks将模型保存为stl文件格式打开SolidWorks的插件选择&#xff0c;在ScanTo3D前面打勾 在solidworks中以网格文件的形式打开…

通信算法之九十六:电力通信系统-HRF多载波通信系统-物理层收发信道开发

目录1.HRF通信系统-物理层收发信道开发1.1 SIG发射机算法功能模块1.2 SIG接收机算法功能模块1.3 PHR发射机算法功能模块1.4 PHR接收机算法功能模块1.5 PSDU发射接收机处理流程1.6前导LTF/STF序列发射接收处理流程1.7PPDU帧&#xff08;前导SIGPHRPSDU&#xff09;发射接收处理流…