【深度长文】聊一聊 Java AbstractQueuedSynchronizer 以及在 ReentrantLock 中的应用

news2024/11/16 20:32:31

文章目录

  • AQS 原理简述
    • 内部数据结构
    • 公平锁 vs. 非公平锁
      • ReentrantLock 非公平锁
      • ReentrantLock 公平锁
  • AQS 源码分析
    • 加锁过程
      • addWaiter
      • acquireQueued
      • shouldParkAfterFailedAcquire
      • cancelAcquire
    • 解锁过程
      • unparkSuccessor

AbstractQueuedSynchronizer (AQS) 是 Java 并发包中,实现各种同步结构和部分其他组成单元( ThreadPoolExecutor.Worker) 的基础。

理论上,同步结构可以相互实现(例如使用 Semaphore 实现 mutex),但 直接用一个同步结构来实现另一个可能会导致实现上的复杂性和难以理解的代码 。为了解决这种复杂性,Doug Lea 设计了 AQS,AQS 作为一个抽象类,内部封装了同步状态的管理、线程的排队和等待、以及条件变量的支持等基础功能。

通过继承 AQS 并实现其部分方法,可以相对容易地构建各种同步结构,如互斥锁 ReentrantLock、读写锁 ReadWriteLockCountDownLatch ,同步结构的开发者只需要关注于同步机制的具体逻辑,而不是底层的线程调度和同步状态管理。

本文旨在介绍 AQS 抽象类以及 ReentrantLock 如何基于 AQS 实现公平、非公平两种模式的互斥锁。

AQS 原理简述

AQS 的核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态。如果共享资源被占用,就需要一定的 阻塞等待唤醒机制 来保证锁分配。

AQS 内部字段和方法有三个重要的组成:

  • 一个先入先出(FIFO) 的等待线程队列 CLH,等待获取锁的线程放入该队列中,实现多线程间的竞争和等待。

    CLH:Craig、Landin and Hagersten队列,单向链表实现。AQS 的队列是 CLH 变体的虚拟双向队列,AQS 将请求共享资源的线程封装为 Node 对象实现锁分配。

  • 共享资源的持有状态通过一个 volatile 的整数成员 state 同步,同时还提供了 setStategetState 方法。

    /* The synchronization state. */
    private volatile int state;
    
  • 各种基于 CAS 操作的基础方法,以及期望具体同步结构(AQS 实现类)去实现的 tryAcquire/tryRelease 方法。tryAcquire 操作,尝试非阻塞地获取资源的独占权tryRelease 操作,释放对某个资源的独占。
    ReentrantLock 的内部类 Sync 继承了 AQS,公平锁和非公平锁模式分别实现了 tryAcquire 方法。

等待线程队列是 AQS 机制的核心之一,它的示意图如下:

CLH队列示意图


内部数据结构

AQS CLH 等待队列中的节点 Node 的数据结构如下:

// 队列中的节点
static final class Node {
    // 标记当前节点等待状态为共享还是排他
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    
    // 当前节点已经取消(因为超时或中断)
    static final int CANCELLED =  1;
    // 后继节点被或将要被阻塞(因park), 当前节点在释放或取消时需要unpark后继
    static final int SIGNAL    = -1;
    // 当前节点位于条件变量等待队列, 不会被用于sync队列节点
    static final int CONDITION = -2;
    // waitStatus value to indicate the next acquireShared should unconditionally propagate
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    ...
}

上述字段的含义如下:

方法和属性值含义
waitStatus当前节点在队列中的状态
thread节点包装的线程对象
prev指向前驱节点的指针
nextWaiter指向下一个处于 CONDITION 状态的节点(Condition Queue使用)
next指向后继节点的指针

详细解释下上述各个字段的用处:

  1. 队列节点 Node 包装 Thread 实例,控制信息由前驱节点 waitStatus 字段持有。例如:当前线程尝试获取锁失败后是否 park,需要查看前驱节点的waitStatus 是否为SIGNAL。
  2. 节点入队时,只需要将节点作为新的 tail 节点链接到链表中;出队时,将节点设置为新的 head。
  3. 使用prev指针处理可能的取消 (等待超时或中断);如果一个节点取消,它的后继节点需要重新链接到一个未被取消的前驱节点。(详见shouldParkAfterFailedAcquire方法)
  4. next 指针用于实现阻塞唤醒机制,unparkSuccessor(node) 方法先使用 node.next node 节点的第一个有效的后继节点,确定向哪个线程发送 unpark 信号。(详见 unparkSuccessor方法)

waitStatus 枚举值如下:

枚举含义
0当一个Node被初始化的时候的默认值
CANCELLED(1)表示线程获取锁的请求已经取消了
CONDITION(-2)节点在等待队列中,节点线程等待唤醒
PROPAGATE(-3)线程处在SHARED情况下,该字段才会使用
SIGNAL(-1)线程已经准备好,等待资源释放

独占锁的获取流程



共享锁的获取流程(参考ReadWriteLock):



公平锁 vs. 非公平锁

公平锁 指的是多个线程排队获取共享资源,获取锁的顺序按照线程请求锁的顺序来分配,线程队列中的第一个线程可以获得共享资源,其余线程等待资源释放。

公平锁的优点:

  • 公平:所有线程都能按照申请锁的顺序获得锁,避免了“饥饿”现象
  • 可预测性:可以预测任何线程获取锁的大致时间。

公平锁的缺点:

  • 效率较低:每次都需要检查和维护获取锁的顺序,增加了开销。
  • 吞吐量相比非公平锁更低:CPU唤醒阻塞线程的开销比非公平锁大。

非公平锁 指的是多个线程加锁时直接尝试获取锁,获取不到才会加入等待队列中。但如果加锁时成功获取锁,则无需进入等待队列。非公平锁可能出现后加锁的线程先于先加锁的线程获得锁:

优点:减少唤起线程的开销,提升整体吞吐率

缺点:等待队列中的线程可能获取不到锁,导致饥饿。


以奶茶店排队为例:

  • 公平锁模式:新同学 whr 到奶茶店领取奶茶,发现已经排了很长队,下一个领奶茶的是小王同学。具有公平观念的 whr 排到队伍尾部等待领取奶茶。
  • 非公平锁模式:左图中新同学 whr 领取奶茶,小王同学开了小差,whr直接插队领到了奶茶。右图中,小王同学坚守道德原则,不允许 whr 插队,whr 领取奶茶失败,排到队伍尾部等待领取。

在介绍 ReentrantLock 两种锁模式前,我先梳理下 ReentrantLock 和 AQS 之间的关系。

  • ReentrantLock 的内部抽象类 Sync 继承 AQS:

    abstract static class Sync extends AbstractQueuedSynchronizer {}
    
  • ReentrantLock 中的成员属性 sync 为 Sync 类型实例的引用,ReentrantLock 构造函数对该属性初始化:

    public class ReentrantLock implements Lock, java.io.Serializable {
    	private final Sync sync;
    	
    	// 默认为非公平锁, 系统吞吐率更高
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
    	// 如果fair为true, 显式指定采用公平锁模式
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    }
    

    NonfairSyncFairSync 均为继承 Sync 抽象类的同步结构实现类,对应于非公平锁和公平锁。

  • ReentrantLock 使用者通常使用 lock API 获取锁,该方法会调用 Sync#lock() 方法,FairSync 和 NonFairSync 各自实现了 Sync#lock 抽象方法。
    注意到:非公平锁 lock 时,先使用 CAS 尝试更新 state,如果更新成功则说明成功获取锁,不考虑是否有其它线程在排队等待

    // ReentrantLock#lock
    public void lock() {
        sync.lock();
    }
    
    // FairSync#lock
    final void lock() {
        acquire(1);
    }
    
    // NonFairSync#lock
    final void lock() {
       if (compareAndSetState(0, 1))
           setExclusiveOwnerThread(Thread.currentThread());
       else
           acquire(1);
    }
    

    AQS 中实现了 acquire 方法,调用 tryAcquire 方法尝试非阻塞地获取锁。如果成功,acquire 方法返回;如果失败,则将当前线程包装为 Node 对象,加入到阻塞队列中。

    public final void acquire(int arg) {
    	// tryAcquire 非阻塞地获取锁
        if (!tryAcquire(arg) &&
        	// 非阻塞获取锁失败, 当前线程进入 CLH 队列中等待其它线程释放锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    tryAcquire 方法在 AQS 中的实现是抛出异常,它的真正实现交由继承了 AQS 的实现类。

介绍完 ReentrantLock 和 AQS 的 API 交互关系,下面我将结合源码介绍 FairSync、NonFairSync 对 tryAcquire 方法的实现。

ReentrantLock 非公平锁

这一章介绍 ReentrantLock 使用 AQS 实现的非公平锁 NonfairSync,下图为加锁流程图:

加锁时调用 ReentrantLock#lock:

  • 调用 sync 属性的 lock 方法,sync 属性默认初始化为 NonfairSync 对象。
  • 如果 state 等于 0,NonfairSync#lock 先通过 CAS设置为 1;
    • 设置成功,将 exclusiveOwnerThread 设置为当前线程;
    • 如果失败,执行 AQS 的 acquire 方法排队等待锁资源的释放;
  • acquire 方法会先调用 tryAcquire 方法尝试获取锁,该方法由同步器实现,因此调用 NonfairSync#tryAcquire,也就是调用 nonfairTryAcquire 方法。
  • nonfairTryAcquire 方法中
    • 判断 state 为 0,尝试 CAS 更新为 1,表示占有锁,失败则返回 false;
    • 如果 state 不等于 0,说明锁已经被某个线程占用,检查 exclusiveOwnerThread 是否等于当前线程。若等于,则说明当前线程已经持有锁,更新 state 表示重入获取;若不为当前线程,返回 false。
// ReentrantLock.NonfairSync 非公平锁实现的lock()方法
// 无论等待队列中是否有等待线程, 直接使用 CAS 尝试获取锁
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

// ReentrantLock.NonfairSync 
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    	// 当前锁是空闲状态, CAS 设置 state 为 1, 设置 exclusiveOwnerThread 为当前线程 
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
    	// 可重入锁的持有线程为当前线程, 增加持有计数, 允许再次获取锁 
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

可以看到,当ReentrantLock 是非公平锁时,在 lock 方法和 tryAcquire 方法中,新来的线程会直接尝试 CAS 设置 state 为 1,而不考虑 CLH 队列中是否有其它线程在等待。这就是买奶茶时的插队行为,系统并发度较高时,争用共享资源的线程很多,可能导致队列中的等待线程饥饿!


ReentrantLock 公平锁

下面代码是公平锁实现的 tryAcquire 方法:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
       // hasQueuedPredecessors 检查队列中是否存在排队的线程
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current); // 设置当前线程为锁占有线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
    	// 当前线程已经持有锁, 可重入地获取锁, 增加重入计数
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

相比 非公平锁,公平锁 FairSync#tryAcquire 在判断 state 等于 0 后,先通过 hasQueuedPredecessors 判断是否已经有线程在等待队列中排队。如果返回 true,说明有线程已经在排队了,当前线程不能直接获取锁。


hasQueuedPredecessors 实现如下:

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

来讨论下实现细节,该方法何时返回 true

return 语句中的条件出现的场景如下:

  • h != t
    • 情况一:已经有节点在排队,h 和 t 都不为 null;
    • 情况二:队列正在初始化,h 和 t 有一个为 null,但这种赋值顺序下,只可能 t 为 null。
  • s = h.next
    • s = h.next 不等于 null,对应情况一,队列中至少有两个不同的节点存在,除去 head 为虚节点,至少有一个线程正在排队等待锁释放。如果队列中第一个等待线程不是当前线程,说明队列中已经有其它线程正在等待,当前线程需要入队等待,保证公平锁策略。
    • s = h.next 等于 null,对应情况二,其它线程 B 正在初始化 CLH 队列,但还未完成【队列初始化并将线程节点入队】,当前线程 A 需要入队等待。

情况一的时序图如下

线程 A 调用 FairSync#tryAcquire 获取公平锁,执行 hasQueuedPredecessors() 检查等待队列中是否已经有线程排队。

线程 B 在线程 A 之前执行 tryAcquire,但是获取锁失败,执行 addWaiter 将自身加入到等待队列中。但是等待队列还未初始化(tail == null),因此使用 enq 方法入队,enq 会执行队列初始化,。(addWaiter 和 enq 方法实现在 AQS,我会在【源码分析】中详细介绍)


最终 h != t && (s = h.next) == null ,hasQueuedPredecessors 返回 true,而这个队列中的前驱节点就是 线程 B 的 Node 对象。



情况二时序图如下

等待队列中已经存在节点(h.next != null),且等待队列中第一个线程不是当前线程 s.thread != Thread.currentThread(),说明存在排队的线程,hasQueuedPredecessors 返回 true。


AQS 源码分析

本章节结合 AQS 源码,带大家更深入地了解调用 ReentrantLock 的 lock 和 unlock 方法时,底层的 AQS 完成了哪些工作。

加锁过程

ReentrantLock#lock 最终会调用 AQS 的 acquire 方法,我以该方法为入口介绍加锁流程:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

AQS acquire 方法会调用同步结构实现的 tryAcquire 方法非阻塞地获取锁,根据 ReentrantLock 不同的锁模式,执行不同的 tryAcquire。这一部分已经在【非公平锁 vs. 非公平锁】中完成详细介绍。


addWaiter

acquire 方法中,如果 tryAcquire 获取锁失败,则会执行 addWaiteraddWaiter当前线程包装为 Node 节点,节点模式为 Node.EXCLUSIVE,加入到 CLH 等待队列中,返回包含当前线程的 Node 对象。

  • 如果 tail 等于 null,说明队列还未初始化,使用 enq 方法先初始化队列,再将 Node 实例入队。
  • 如果 CAS 更新 tail 字段失败,也执行 enq 方法,在 死循环中确保节点入队成功
private Node addWaiter(Node mode) {
    // 锁模式 + 当前线程 新建 CLH 队列的节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        // 前驱指向 pred, 即原 tail 节点
        node.prev = pred;
        // CAS 设置 tail 属性, 设置成功才允许将 prev.next 设置为 node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 队列为空, 初始化队列; CAS 失败, 则死循环直到入队成功
    enq(node);
    return node;
}

如果走到 enq 方法,说明 tail 等于 null (等待队列为空) 或者 CAS 设置 tail 失败

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 入队前先初始化队列, CAS确保只有一个线程完成队列初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // CLH 队列已经初始化, 将 node 追加到队列尾部, CAS 失败则继续循环
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued

执行完 addWaiter 入队操作后,acquireQueued 方法让排队中的线程尝试获取锁,直到成功 或者 不再需要获取(中断)

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // p==head: 当前线程是队列中第一个等待线程, 锁释放后唤醒了当前线程
            if (p == head && tryAcquire(arg)) {
                // tryAcquire返回true, 当前线程成功获取锁, 将node设置为头节点
                // node.prev node.thread 设置为 null
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // shouldParkAfterFailedAcquire 判断当前线程能否park(RUNNABLE 切换为 WAITING), 后面详细解释这个方法
            // 如果可以, 使用 LockSupport.park 挂起当前线程, unpark唤醒后, 返回中断标记
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果使用可中断的 API, parkAndCheckInterrupt返回true后抛出InterruptedException
        // 此时failed等于true, cancelAcquire 取消当前线程获取锁的尝试, node.ws更新为 CANCELLED
        if (failed)
            cancelAcquire(node);
    }
}

// 将Node实例设置为等待队列头节点, 
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

当前线程被包装为 node 对象加入队列后,如果 head 的后继为 node 节点,且当前线程 tryAcquire 返回 true。说明获取锁成功,等待队列的变化如下图所示:

如果 node 节点不为 head 的后继,说明队列中已经有线程在等待锁,应该 park 挂起当前线程,直到当前线程被中断或 unpark 唤醒。

这一过程的详细分析以及等待队列的变化将在 shouldParkAfterFailedAcquire 方法中介绍。现在大家只要了解 shouldParkAfterFailedAcquire 用于判断当前线程是否应该 park 挂起。如果该方法返回 true,则执行 parkAndCheckInterrupt 方法挂起当前线程:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

线程被唤醒后,从LockSupport.park(this)后继续执行,返回线程中断标记(中断标记会被清除)。

如果 parkAndCheckInterrupt 返回 true,则在当前线程获取锁后,acquireQueued 方法也返回 true:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

因为 Thread.interrupted 检查中断时清除了中断标记,因此这里调用 selfInterrupt 重新设置线程中断标记。综上所属:ReentrantLock#lock() 不会响应中断,只是记录中断记录。中断仅仅让等待线程在 acquireQueued 空转一次。


这里简单提一下,如果使用支持中断的 API:ReentrantLock#lockInterruptibly(),在 tryAcquire 返回 false 后,会调用 AQS 的 doAcquireInterruptibly 方法获取锁。

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

该方法的流程与 acquireQueued 基本相同,差异主要在于线程被中断后的响应。如果 parkAndCheckInterrupt 返回 true,则会抛出InterruptedException,调用者可以感知到中断异常,而不需要显式检查中断标记。


shouldParkAfterFailedAcquire


shouldParkAfterFailedAcquire 代码详细注释如下,该方法判断当前线程能否执行后续的 parkAndInterrupted() 方法,park 挂起线程等待锁资源释放:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // ws == -1: 当前线程可以安心地挂起, 如果可获取锁, 前驱节点负责唤醒当前线程
        return true;
    if (ws > 0) {
        // 移除被取消的前驱节点, 再次尝试在 acquireQueued 的循环中获取锁
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 不使用 CAS 更新, 因为node包装的当前线程不可能被取消
        // 不存在其它线程将 pred.next 更新为其它节点的情况
        pred.next = node; 
    } else {
        // ws == 0 可能有如下两种情况: 
        // (1) pred 节点入队时ws一直为初始值 0
        // (2) 锁释放时执行了unparkSuccessor(pred), 但是当前线程没有成功获取锁
        //   设置前驱节点的 ws 为 -1, 等待下一次 unparkSuccessor(pred)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  • 只有 node 的前驱节点的 waitStatus 等于 SIGNAL,才能返回 true,当前线程才能安心地挂起。锁释放时,当前线程会被唤醒。
  • shouldParkAfterFailedAcquire 返回 false 后,进入 acquireQueued 方法的下一轮循环,当前线程会再次尝试获取锁 p == head && tryAcquire(arg)。返回 false 有如下两种情况:
    • 如果 node 的前驱节点已经取消(ws == 1),将取消节点移除等待队列,方法返回 false,再次尝试从 acquireQueued 方法获取锁。
    • 如果 node 的前驱节点 ws == 0,可能因为 pred 节点入队时 ws 一直保持初始值 0;或是 持有线程释放锁时执行了 unparkSuccessor(pred),唤醒了当前线程,但是当前线程在唤醒后还是没有成功获取锁。
      这两种情况下,都需要设置前驱节点的 ws 为 -1,等待下一次锁被释放时,执行 unparkSuccessor(pred)

下图为 node 未成功获取锁时,等待队列发生的变化:

  1. node 节点对应的线程 tryAcquire 未能获取锁,执行 acquireQueued 排队等待,此时 node 节点将作为新的 tail;
  2. node 线程执行 shouldParkAfterFailedAcquire 方法,发现它的两个前驱节点 node1、node2 的 waitStatus 为 CANCELLED(1),因此将它们从队列中移除,方法返回 false。
  3. node 线程再次尝试在 acquireQueued 方法的循环中获取锁,结果 tryAcquire 还是失败。再次进入 shouldParkAfterFailedAcquire 方法,发现 node 前驱节点 head 的 waitStatus 等于 0,因此通过 CAS 修改为 SIGNAL(-1),方法返回 -1。
  4. node 线程执行第三轮循环,终于 tryAcquire 返回 true,成功获取锁,node 成为头节点。

下面,我将分享下阅读源码过程中的一些疑问,以及自己的理解。

ws > 0 说明前驱节点状态为 CANCELLED,这种状态怎么产生的

当使用可中断的 API 时,例如lockInterruptibly,parkAndCheckInterrupt 检测到中断返回 true 时,会抛出 InterruptedException。finally 代码块执行时 failed 等于 true,执行 cancelAcquire 取消 node 获取锁的尝试,从等待队列中移除 node,这一过程中 node.waitStatus 会被修改为 CANCELLED

private void doAcquireInterruptibly(int arg)
    // ...
    boolean failed = true;
    try {
        for (;;) {
            //...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 当前线程被中断后, parkAndCheckInterrupt返回true
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

ws 等于 0 和 -1 分别是什么含义

如果节点 node 作为头节点,node.waitStatus 等于 0,说明:

  • node 没有后继节点(没有线程在它后面排队)。如果有新的线程加入队列,会在 shouldParkAfterFailedAcquire 方法将 node.waitStatus 修改为 SIGNAL。

  • 锁持有者释放锁资源时,已经通过 unparkSuccessor(node) 唤醒了阻塞队列中第一个等待线程,node.waitStatus 会修改为中间状态 0。

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // ...
    }
    

头节点 node.waitStatus 如果为 SIGNAL(-1),锁持有者在释放锁时(执行 AQS release 方法),需要唤醒阻塞队列中的第一个等待线程,即调用 unparkSuccessor(node)。如果 node.waitStatus 等于 0,则无需这样做。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

为什么 ws == 0,CAS 更新 ws 为 SIGNAL(-1) 后,方法不返回 true,执行 park 挂起当前线程

可能存在如下情况:

  1. 锁持有线程 owner_t 释放锁时,使用 unparkSuccessor ,该方法会将 head.ws 设置为 0,并唤醒队列第一个线程 curr_t。
  2. curr_t 被唤醒后,执行 tryAcquire 尝试获取锁,却因为 preempt_t 抢夺锁而获取失败。
  3. 争抢锁的线程 preempt_t 获取锁,快速执行完业务逻辑后,使用 unlock 释放锁,因为 head.ws 等于 0,不执行 unparkSuccessor 方法
  4. curr_t 因为获取锁失败,执行 shouldParkAfterFailedAcquire,检查到 head.ws 等于 0,于是 CAS 将 head.waitStatus 更新为 SIGNAL(-1)。

上述操作的时间序列如下表所示:

owner_tcurr_tpreempt_t
unlock-->unparkSuccessor
head.ws == 0
lock()
tryAcquire failed
unlock()
head.ws==0, no unparkSuccessor
shouldParkAfterFailedAcquire
compareAndSetWaitStatus(0 to -1)
return false;
空转 再次 tryAcquire 获取锁

如果 CAS 更新 pred.waitStatus 后,方法返回 true,当前线程将在本轮循环中执行 park 操作挂起。在这种情况下,锁处于空闲状态,curr_t 线程却阻塞在队列中等待唤醒而不是获取锁,有可能阻塞队列中的线程永远无法获取锁!

而返回 false 空转一次,curr_t 线程能确认锁当前是被其它线程占用的,才能安心地 park 挂起。

该场景地时序图如下:


cancelAcquire

节点因中断或超时,通过 cancelAcquire 方法设置状态为CANCLE。(ReentrantLock 中支持中断的API为lockInterruptibly)

private void doAcquireInterruptibly(int arg)
    // ...
    boolean failed = true;
    try {
        for (;;) {
            //...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 当前线程被中断后, parkAndCheckInterrupt返回true
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

cancelAcquire 方法的执行逻辑如下:

  1. 找到被取消节点node未被取消的前驱节点pred,然后设置node的状态为CANCELLED
  2. 如果node为链表尾部,通过CAS 将tail设置为前驱节点 pred。
  3. 如果 pred 不是头节点,维护 pred 节点的 next 指针,设置 pred.next=node.next
  4. pred为头节点,需要通过unparkSuccessor(node) 唤醒线程争抢锁。

代码详细注释如下:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 通过 prev 指针跳过取消状态的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    // waitStatus为volatile字段, 赋值之后, 其余节点能跳过该节点
    // 如果在赋值之前, 不受其它线程的干扰
    node.waitStatus = Node.CANCELLED;

    // 如果node为尾部, 从队列中移除自身, 将node的前驱pred设置为尾部, pred.next设置为null 
    if (node == tail && compareAndSetTail(node, pred)) {
        // CAS, pred.next可能已经因新节点入队而改变
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 当前节点的前驱不是head && 
        // 1: 判断当前节点前驱节点 ws 是否为 SIGNAL 2: 如果不是但可以 CAS 更新为 SIGNAL(即prev没有被取消)
        // 1 或 2 为 true, 且当前节点线程不为 null, CAS 设置 prev.next 为 node.next
        // 如果当前节点线程为 null, 可能是prev被取消 或 prev线程获得了锁成为head
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            
            // 为什么仅修改pred.next? 
            // 1. unparkSuccessor(node) 优先通过node.next定位下一个未被取消的节点, 唤醒该节点争抢锁
            // 若node.next等于null或ws==0, 利用prev指针从后向前遍历, 效率会降低
            // 2. 不让 pred.next 引用被取消的 cancelled 节点, 有助于被取消节点的垃圾回收
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 兜底方案: 1(1)node的前驱为head 或 (2)pred节点几乎同时被取消; 唤醒最靠近node节点且未被取消的后继
            // 牺牲了部分性能, 因为可能要遍历等待队列, 并且唤醒线程
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

上述代码,将被取消的节点 node 非为了三类:node 为尾节点、node 为中间节点、node 为头节点的后继节点。

  • node 为尾部节点时,将 node 未被取消的前驱节点 pred 设置为 tail,更新 pred.next 为 null,过程如下:

  • node 为中间节点时,维护好前驱节点 pred.next,如果 node.next 有效,将 pred.next 设置为node.next;

  • node 为头节点的后继,此时需要 unpark 唤醒 node 的后继节点,尝试获取锁。


我们来讨论如下几个问题:

为什么 pred == head 情况下,一定要 unparkSuccessor(node)

pred 节点对应的线程获得锁,记为 owner_t;node 节点对应的线程记为 node_t。假设出现如下的指令序列:

owner_tnode_t
owner_t 线程释放锁,unparkSuccessor(head)
head.ws 设置为 0
找到后继第一个未被取消的节点 node
node_t 被中断,执行cancelAcquire(node)
LockSupport.unpark(node.thread)
node.waitStatus = Node.CANCELLED

结论:如果 node 节点取消后不执行 unparkSuccessor,可能 node 节点后的等待线程不会再被 unpark

因为锁持有线程 owner_t 释放时,执行unparkSuccessor 将 node_t 线程唤醒,并将 head.waitStatus 设置为 0。但与此同时node 线程因为中断被取消,并不会尝试获取锁。

此时锁被释放,但是等待队列中的线程却不会被唤醒,永远无法获取锁。

即使是非公平锁,另一个线程 t 抢占了锁,释放时执行 release 方法发现 head.waitStatus 等于 0,无需执行 unparkSuccessor,仍然无法唤醒等待队列中的线程。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 执行 waitStatus 需要 head.ws 不等于 0
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

为什么需要判断 pred.thread != null

if (pred != head &&
    ((ws = pred.waitStatus) == Node.SIGNAL ||
     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    pred.thread != null) {
    //...
}

pred.thread 等于 null 有两种情况:

  • 情况一:pred 成为新的 head 节点,thread 被设置为 null;
  • 情况二:pred 也被取消了,thread 被设置为 null;

结论

  1. 发生情况一,pred 成为新的 head 节点,如果没有 pred.thread !=null,可能导致等待队列中的线程不会再被唤醒!
  2. 如果发生情况二 pred 被取消,没必要判断 thread != null,因为 是否唤醒后续线程的决定交由 pred 节点,node 不执行 unparkSuccessor 不会导致等待队列中的线程得不到唤醒。

假设出现如下序列,pred 节点的 waitStatus 属性等于 0,pred_t 成功获取锁,并将 pred 节点设置为头节点。

随后,pred_t 释放锁,因为检查到 head.ws 等于 0,不执行 unparkSuccessor。

在这之后,node_t 线程 CAS 更新 pred.ws 为 SIGNAL(-1);如果不检查 pred.thread,将不会有其他线程 unpark 等待队列中的线程。

pred_t 线程node 线程
pred != head
pred_t 线程执行 tryAcquire 返回 true (pred.ws == 0)
setHead(pred):
1. head属性设置为 pred
2. pred.thread设置为null
pred 立即释放锁,检查到 head.ws 等于0
不执行 unparkSuccessor
CAS 将 pred.ws 设置为 SIGNAL(-1)

解锁过程

ReentrantLock 通过 unlock 方法释放锁:

public void unlock() {
    sync.release(1);
}

本质是调用 AQS 的 release 方法,release 中调用 tryRelease 方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantLock 内部类 Sync 实现了 tryRelease 方法,并不区分公平锁和非公平锁:

protected final boolean tryRelease(int releases) {
    // c 为剩余重入次数
    int c = getState() - releases;
    // 锁只能持有者线程才能释放, 当前线程若没有持有锁, 抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 锁重入次数扣减为 0, 可以释放锁资源, 独占锁的持有线程 exclusiveOwnerThread 设置为 null
    if (c == 0) {
        free = true;
        // 锁持有线程设置为 null
        setExclusiveOwnerThread(null);
    }
    // 更新state
    setState(c);
    return free;
}
  1. 检查当前线程是否为互斥锁的所有线程,如果不是,抛出 IllegalMonitorStateException 异常。
  2. 检查 state 锁持有次数,减去释放次数 releases 后是否等于 0。如果等于 0,说明锁完全释放,将持有线程设置为 null,此时锁可以被其它线程获取。
  3. setState 更新 state。

阅读代码时的问题

问题一:为什么 release 中判断 h != null && h.waitStatus != 0

  • h 如果为 null,说明锁释放后,等待队列还未初始化,没有线程在等待锁资源;
  • h != null && h.waitStatus == 0 ,说明 head 没有后继节点,或者后继节点没有 park,仍然在运行中,无需唤醒
  • h != null && h.waitStatus < 0,head 的后继节点处于 WAITING 状态,需要唤醒。

问题二:是否可以先 setState,再 setExclusiveOwnerThread不可以!

tryAcquire 获取锁时,会先检查 state 是否为 0,如果为 0,则 CAS 更新为 state 为 1,然后设置 exclusiveOwnerThread 为当前线程。

假设线程 A 执行 tryRelease 释放锁,线程 B 执行 FairSync#tryAcquire 尝试获取锁,可能出现如下序列:

线程 A线程 B
setState(0)
compareAndSetState(0, acquires)
setExclusiveOwnerThread(current)
setExclusiveOwnerThread(null)

线程 A 执行先执行 setState 将 state 设置为 0,线程 B 执行 tryAcquire 是 CAS 更新 state 为 1 成功。

随后,线程 B 将锁持有线程更新为 current 当前线程,结果随后线程 A setExclusiveOwnerThread(null) 操作将 锁持有线程 设置为 null,造成了同步状态的不一致!



unparkSuccessor

tryRelease 执行成功后,如果需要唤醒队列中等待锁的线程,将执行 unparkSuccessor。该方法用于唤醒 node 后第一个未取消的节点

private void unparkSuccessor(Node node) {
	// node.ws 更新为 0, 准备 unpark 后继节点
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
	// 先使用node.next指针找, 相比使用prev遍历, 效率更高
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // node.next被取消, 使用兜底方案
        // 从尾部节点开始找,利用prev指针向前遍历, 找到队列中第一个 ws<=0(未取消)的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

代码我已经添加了详细注释,但有几点我需要解释下:

  • 使用 prev 指针从后往前找未取消节点 (ws <= 0) 是安全的,因为 prev 的所有修改仅仅是 跳过已取消节点 (ws==1),不影响查找未取消的节点

  • 使用 next 指针从前向后遍历是不安全的,因为可能存在 node 的后继节点 succ 完成 cancelAcquire 方法的调用。cancelAcquire 方法最后设置 succ.next==succ。此时 succ.next 指针是不可靠的,因此只能走从后往前遍历的方式。

  • 为什么 next 指针遍历不安全,还会先访问一次 node.next?

    因为 node.next 指针一定有效,unparkSuccessor 会在如下场景中被调用:

    • (1) 在 AQS release方法中执行 unparkSuccessor(head) 用以唤醒队列中的头节点,头节点的 next 指针一定有效。
    • (2) node 节点取消时,node 对应的线程在 cancelAcquire(node) 方法中执行 unparkSuccessor(node),此时 node.next 指针还未修改,一定是有效的。

总结:next 指针用于加快唤醒等待线程的效率,不必遍历整个 CLH 队列;而 prev 指针将作为 next 指向节点被取消后的兜底方案,从队列尾部后向前遍历,确保一定能找到最近的未取消节点




创作过程耗时费力,但我乐在其中(钻研源码的过程和分享知识是让人快乐的事情),如果大家喜欢这种图文结合、代码详细注释的写作风格,就给我点一个免费的赞吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1526338.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Qt问题】解决已经下载好的qt怎么添加或卸载其他组件

问题描述&#xff1a; 使用一段时间Qt以后发现需要用到某个组件&#xff0c;比如Qt Webengine这个组件等&#xff0c;而此时我又不可能把Qt卸载了&#xff0c;重新安装一次。 解决办法&#xff1a; 其实Qt提供了安装或卸载其他组件的方式&#xff0c;以下是操作步骤&#xff…

nuxt3项目总结

nuxt3项目总结 仓库 前言 大半年的时间&#xff0c;项目从秋天到春天&#xff0c;从管理后台到APP再到数据大屏&#xff0c;技术栈从vue3到uniApp再到nuxt3&#xff0c;需求不停的改&#xff0c;注释掉代码都快到项目总体的三分之一。 一、准备-搭建项目架子 1.1 创建一个…

windows无法启动HV主机服务 服务(位于本地计算机上) 错误 1068:依赖服务或组无法启动

背景: LZ本地装了VMware Workstation虚拟机,里面装了Oracle, 又在Docker里装了PostgreSQL, 操作系统是Windows10 专业版 如果启动VM,就报这个错Device/Credential Guard 不兼容 参考这个博客,就可以解决 与Device/Credetial Guard不兼容 我用到的是方法二, 但是想用Docker时, …

吴恩达机器学习笔记 二十四 决策树模型 学习过程 什么时候停止分裂 如何选择结点特征

案例&#xff1a;识别小猫&#xff0c;上面这个分类的特征 x 采用分类值&#xff08;几个离散的值&#xff09; 决策树最顶端的结点称根结点(root node)&#xff0c;除了根结点和叶子结点之外的叫决策结点(decision node)&#xff0c;最底层的叫叶子结点(leaf node)&#xff0c…

JVM的双亲委派模型和垃圾回收机制

jvm的作用是解释执行java字节码.java的跨平台就是靠jvm实现的.下面看看一个java程序的执行流程. 1. jvm中的内存区域划分 jvm也是一个进程,进程在运行过程中,要行操作系统申请一些资源.这些内存空间就支撑了后续java程序的执行. jvm从系统申请了一大块内存,这块内存在java程序使…

蓝桥杯2022年第十三届省赛真题-选数异或

solution1&#xff08;55/100 只保留最后一次的位置&#xff0c;其实有点问题&#xff0c;能骗一点分数 #include<iostream> #include<map> using namespace std; const int maxn 1e5 10; int a[maxn]; int main(){int n, m, x, l, r, t, flag;map<int, int…

知识分享:宣传海报中的APP下载二维码如何制作?

近期&#xff0c;在北京地铁西直门站内&#xff0c;出现了一组十分醒目的海报&#xff0c;海报上加大、加粗的趣味文字深深地共情了每一个苦命的打工人。 比如&#xff1a; 查询我的精神状态 老板画的饼有多大&#xff1f; 为什么早上的会那么好睡&#xff1f; 再比如&#x…

云计算与APP开发,如何利用云端服务提升应用性能?

随着移动应用程序&#xff08;APP&#xff09;的普及&#xff0c;如何提升应用性能成为了开发者们关注的重点之一。而云计算技术的发展为APP开发者提供了全新的解决方案。本文将探讨云计算与APP开发的结合&#xff0c;以及我们公司提出的解决方案&#xff0c;帮助开发者利用云端…

人工智能如何撬动新质生产力发展?

全国两会期间&#xff0c;“新质生产力”成为高频词&#xff0c;引发高度关注。新质生产力是由技术革命性突破、生产要素创新性配置、产业深度转型升级催生的当代先进生产力。而人工智能被视为形成新质生产力的重要引擎。 随着人工智能&#xff08;AI&#xff09;技术跨越奇点…

24.2 SpringCloud电商进阶开发

24.2 SpringCloud电商进阶开发 1. 定时任务1.1 使用场景1.2 CRON表达式1.3 代码实战*****************************************************************************************************1. 定时任务 1.1 使用场景

数学建模-多目标规划算法(美赛建模)

&#x1f49e;&#x1f49e; 前言 hello hello~ &#xff0c;这里是viperrrrrrr~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#xff…

拆解Spring boot:Springboot为什么如此丝滑而简单?源码剖析解读自动装配

&#x1f389;&#x1f389;欢迎光临&#xff0c;终于等到你啦&#x1f389;&#x1f389; &#x1f3c5;我是苏泽&#xff0c;一位对技术充满热情的探索者和分享者。&#x1f680;&#x1f680; &#x1f31f;持续更新的专栏《Spring 狂野之旅&#xff1a;从入门到入魔》 &a…

关于Zookeeper分布式锁

背景 之前说到分布式锁的实现有三种 1、基于数据库实现的分布式锁 2、Redis分布式锁 3、Zookeeper分布式锁 前者redis分布式锁博客已具体介绍&#xff0c;此博客最终决定补齐关于Zookeeper分布式锁的实现原理。 简述 Zoopkeeper&#xff0c;它是一个为分布式的协调服务&…

linux下dlib静态库和动态库编译

本文讲述的linux系统下如何编译dlib的静态库和动态库方法。 dlib源码下载地址 dlib官网&#xff1a;dlib C LibraryGitHub - davisking/dlib: A toolkit for making real world machine learning and data analysis applications in C dlib源码的目录结构如下&#xff1a; 编…

某网乱序拼图验证码快速破解还原

注意,本文只提供学习的思路,严禁违反法律以及破坏信息系统等行为,本文只提供思路 本文的验证码网址如下,使用base64解码获得 aHR0cHM6Ly93d3cuZGluZ3hpYW5nLWluYy5jb20vYnVzaW5lc3MvY2FwdGNoYQ== ———————————————— 数据集如下,每张图片会切割成四个部分,…

git基础命令(二)

目录 git revert 撤消上一次提交的更改但是会创建一个新的提交来撤消该提交所做的更改git show 显示提交详细信息git mv 重命名文件git rm 从工作树和索引中移除文件git clean 从工作树中移除未跟踪文件git checkout 将文件恢复到工作树git reset 撤销更改、移动 HEAD 指针以及…

理论学习:with torch.no_grad()

如果不加上“with torch.no_grad():”&#xff0c;模型参数会发生改变吗&#xff1f; 如果不使用with torch.no_grad():&#xff0c;在进行模型推理&#xff08;即计算outputs_cls net(inputs[batch_size//2:])这一步&#xff09;时&#xff0c;模型参数不会发生改变&#xf…

【Godot4.2】求沿任意直线对称的点的坐标

概述 求一个点沿任意直线对称的点的坐标&#xff0c;这是一个简单的初中几何问题&#xff0c;如果用传统的求解思路&#xff0c;就是用公式&#xff0c;解方程组。但是作为Godot的使用者&#xff0c;要深谙向量的魅力。 我们绘制如下的图来分析&#xff1a; 对称轴L可以被理…

使用python开发GUI程序-用网页做界面

1.背景 之前一直使用PysimpleGUI来做带图形界面的程序&#xff0c;每次都需要编译成exe发给别人用&#xff0c;一来版本更新就要通知挨个通知更新&#xff0c;二来显示的界面也比较丑&#xff0c;要看大面积的数据&#xff0c;就不方便。 今天午餐时问了一下AI&#xff0c;把…

德人合科技|办公电脑文件资料防泄密软件

#天锐绿盾# 办公电脑文件资料防泄密软件通常具备以下几个关键功能来保障公司敏感信息的安全&#xff1a; PC端&#xff1a; https://isite.baidu.com/site/wjz012xr/2eae091d-1b97-4276-90bc-6757c5dfedee 1. 文件透明加密&#xff1a; 这是此类软件的核心功能之一&#xff…