Java并发编程(8) —— AQS抽象同步队列详解

news2025/1/16 13:57:47

上一篇:Java并发编程(7) —— 锁的分类概述
在上一篇中我们提到并发包中的ReentrantLock类是一种可重入独占锁,其锁机制是基于AQS实现的。实际上,并发包java.util.concurrent.locks中的锁都是基于AQS 实现的。

一、AQS是什么

AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现锁和同步器的基础组件,并发包中锁的底层就是使用AQS实现的。AQS 为构建锁和同步器提供了一些通用功能的实现,因此使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue等等皆是基于 AQS 的。

二、AQS的类图结构

AQS的类图结构如下:
在这里插入图片描述

1. 主要成员变量

  • exclusiveOwnerThread:当前持有锁的线程。
  • headtail:双向的FIFO同步队列,存放等待获取锁的阻塞状态的线程。
  • state:volatile修饰的int变量,默认值为0。共享资源标志。例如在多线程同时竞争独占锁时,会尝试用CAS操作修改state的值,只有一个线程能修改成功,CAS成功则将exclusiveOwnerThread设为自己。相关操作函数:
    • getState():获取state的值
    • setState(int newState):直接设置state值
    • compareAndSetState(int expect, int update):通过CAS操作尝试更新state值,成功返回true,失败返回false。

2. 内部类

  • ConditionObject:等待队列,用于实现具有等待/通知功能的同步器。
  • Node:同步/等待队列中的节点。属性说明:
    • thread:节点中保存的线程引用
    • waitStatus:当前节点在队列中的状态。CANCELLED(线程被取消了)、SIGNAL(后继节点线程等待被唤醒)、CONDITION(线程在等待队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点)。
    • prev,next:用在同步队列中,表示双向队列的前驱节点与后继节点。
    • nextWaiter:用在等待队列中,表示后继节点。

3. 同步器的通用模板方法

在这些模板方法中AQS实现了资源获取与释放后同步队列的调度维护,具体的资源获取与释放过程则由用户继承AQS时通过钩子方法自行实现(模板方法设计模式)

  • acquire(int arg):独占式获取资源模板。内部调用了tryAcquire(),若其返回true则直接放行获得共享资源,若其返回false则将线程阻塞并加入同步队列。
    在这里插入图片描述

  • release(int arg):独占式释放资源模板。内部调用了tryRelease(),若其返回true则唤醒同步队列中下一个需要唤醒的线程来竞争锁。
    在这里插入图片描述

  • acquireShared(int arg):共享式获取资源模板。内部调用了tryAcquireShared()

  • releaseShared(int arg):共享式释放资源模板。内部调用了tryReleaseShared()

其它

  • acquireInterruptibly(int arg):独占式获取并响应中断。等待获取锁的过程中可响应interrupted()中断。对应的还有共享式的acquireSharedInterruptibly
  • tryAcquireNanos(int arg, long time):独占式超时获取。即当前线程没有在指定时间内获取锁则返回失败。对应的还有共享式的tryAcquireSharedNanos

4. 同步器需实现的钩子方法

在模板方法中会调用这些钩子方法,AQS中这些方法默认直接抛出不支持异常,继承AQS实现同步器时需根据需要选择实现这些方法

  • tryAcquire(int arg):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int arg):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。
  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

三、基于AQS实现同步器的基本原理

继承AQS并重写所需的钩子方法。如下为实现了独占式可重入锁的自定义同步器:

  1. 重写独占锁所需的钩子方法tryAcquire()和tryRelease(),通过操作共享资源标志state来实现资源获取与释放的过程。
  2. 提供加锁方法lock(),内部调用独占式获取资源方法acquire();提供解锁方法unLock(),内部调用独占式释放资源方法release(int arg)
public class MyLock extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int arg) {
        if (getExclusiveOwnerThread() == Thread.currentThread()) {
            //若尝试获取锁的线程已持有锁,则直接返回true,以实现可重入性
            return true;
        }
        //定义锁空闲状态state为0,尝试用CAS操作将其修改为1。多个线程同时竞争时只有一个线程能够修改成功,其余进入同步队列阻塞等待,实现独占性
        if (compareAndSetState(0, 1)) {
            //若修改成功则将exclusiveOwnerThread设置为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        if (getExclusiveOwnerThread() != Thread.currentThread())
            //若当前线程不持有锁而调用了此方法,则抛出非法监视器状态异常
            throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);
        setState(0);//将锁标志置为空闲
        return true;
    }

    public void lock() {//独占式申请锁资源
        acquire(1);
    }

    public void unLock() {//独占式释放锁资源
        release(1);
    }
}

我们通过如下的一个多线程下num++的案例来测试上述同步器

class MyLockTest {
    private static int num = 0;
    public static void main(String[] args) throws InterruptedException {
        MyLock myLock = new MyLock();
        for (int i = 1;i <= 10;i++) {
            new Thread(() -> {
            	myLock.lock();//临界:只有获取到锁的线程才能继续往下执行
                try {
                    for(int j=0;j<1000;j++){
                        myLock.lock();//验证可重入锁,若不可重入则会在这里陷入死锁,阻塞自己等待自己释放锁
                        num++;
                    }
                } catch (Exception ignore) {
                } finally {
                    myLock.unLock();//在finally中释放锁,防止线程被异常终止时锁没有被释放
                }
            }).start();
        }
        Thread.sleep(3000);//等待上面10个子线程执行完毕后打印num值,预期值10000
        System.out.println("num = " + num);
    }
}

四、AQS的等待/通知机制——Condition接口实现

上述我们实现的自定义同步器中只能对资源进行加锁解锁,如需实现线程间的通知等待机制,还需要依赖AQS中的内部类ConditionObject。

ConditionObject实现了Condition接口(JUC.locks包下定义的一个接口),提供了一种类似Object类中notify()/wait()的监视器方法,用于与Lock配合实现通知/等待机制

Obejct的监视器模型中,一个对象关联的监视器中拥有一个同步队列和一个等待队列,而AQS在维护一个同步队列的同时支持创建多个等待队列,一个等待队列对应一个ConditionObject对象

1. Condition接口

  • Condition中定义了等待/通知两种类型的接口方法
    • await():持有锁的线程释放锁进入等待状态,直到被通知或中断。
    • awaitUninterruptly():不响应中断的await()。
    • signal()/signalAll():持有锁的线程唤醒一个/所有等待队列中的线程。
  • Lock接口中定义了newCondition()方法,即在锁的实现中,可以通过调用newCondition()创建一个与锁关联的Condition对象,从而实现等待/通知机制。

2. ConditionObject

每个ConditionObject对象都维护了一个FIFO等待队列,节点类型与同步队列节点类型相同(静态内部类AQS.Node),实例变量firstWaiter和lastWaiter分别指向了等待队列的头结点和尾结点。

在这里插入图片描述
调用condition.await(),将会以当前线程构造节点从尾部加入等待队列并阻塞。新增节点只需将原有尾结点nextWaiter指向新节点,并且更新lastWaiter即可,调用await()方法的线程必定是获取了锁的线程,因此这个过程不需要CAS保证。

3. 等待机制实现

持有锁的线程,可以通过调用await()方法释放锁并进入等待队列。await()源码如下:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); //1.构造Node节点加入等待队列
    int savedState = fullyRelease(node);//2.内部调用release()释放锁并唤醒同步队列中的下一个待唤醒线程
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); //3.阻塞当前线程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //4.被唤醒后开始尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

假设当前持有锁的线程为T:

在这里插入图片描述

此时线程T可以调用CO1.await()方法进入等待状态,同步器的状态发生变化:

  1. 线程T构造Node节点(waitStatus值设为-2 即CONDITION状态)加入到CO1对应的等待队列尾部。
  2. 线程T释放锁,并唤醒T1,HEAD指向T1线程节点,T1线程从阻塞处恢复,重新尝试获取锁。同步队列中T线程的Node节点销毁
  3. 调用LockSupport.park(condition)使线程T进入阻塞状态。

在这里插入图片描述

通知机制在下一节剖析,但通过分析await()方法的源码可以看出:

在这里插入图片描述

  • 被唤醒的线程从LockSupport.park(this)这行代码恢复之后,需要重新获取锁
  • 只有从acquireQueued(node, savedState)方法退出,即获取到锁之后,才会从await()方法返回
  • 总结一下:调用await()的前提是当前线程获取了锁,调用await()后会释放锁进入阻塞态,当重新获取到锁后,才会从await()返回

4. 通知机制实现

持有锁的线程,可以通过调用signal()/signalAll()方法,将等待队列中的节点移动到同步队列中,signal()方法处理的是等待队列中的首个节点,signalAll()处理的是等待队列中的全部节点。以signal()方法为例:

public final void signal() {
    if (!isHeldExclusively()) //1.调用isHeldExclusively()确定当前线程是持有锁的线程。因此同步器需重写此钩子方法
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
    	//2.1首个节点出队,while循环是为了排除已取消的节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
	//3.1更改node的状态为0(同时跳过已取消的节点)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node); //3.2 将节点添加到同步队列尾部(CAS)
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    	//3.3如果前驱节点取消了,需要主动唤醒(这种思想在cancleAcquire方法解析的博文中已经做了详细剖析)
        LockSupport.unpark(node.thread);
    return true;
}

假设上一节中T1线程获取到锁后,调用了CO1.signal()方法:

  • 等待队列中首个节点T3出队,并将其节点状态由-2(CONDITION)置为0,然后添加到同步队列尾部

在这里插入图片描述

  • 可以看出,T1线程调用signal()方法之后,T3线程并没有从阻塞态退出(按signal()的字面意思理解只是发出一个信号,和notify()方法类似,当前线程并不会释放锁),仍依赖于同步队列的前驱节点(即T2线程)唤醒

  • 总结一下:调用signal()的前提是当前线程获取了锁,调用signal()后会将等待队列中的首个节点移动到同步队列尾部,但并不会直接唤醒该节点的阻塞态,仍依赖于同步队列中的前驱节点唤醒

此节转载于:详解AQS对Condition接口的具体实现

五、自定义同步器实现生产者-消费者模型

通过上述分析我们发现如果要使同步器额外实现通知等待功能,只需要开放创建等待队列ConditionObject对象的方法并重写钩子方法isHeldExclusively()。因此我们在前面实现的同步器中加入如下代码即可实现功能拓展:

public class MyLock extends AbstractQueuedSynchronizer {

    final ConditionObject newCondition() {//开放创建等待队列,实现等待/通知机制
        return new ConditionObject();
    }

    @Override
    protected boolean isHeldExclusively() {//判断当前线程是否已独占式持有锁
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

	......
}

在之前的文章(Java并发编程(3) —— 线程的生命周期和状态)中,我们基于Object的监视器方法wait()/notifyAll()实现了一个生产者-消费者队列,但是有个问题就是notifyAll()会唤醒所有生产者和消费者线程(因为一个对象关联的监视器中拥有一个同步队列和一个等待队列,生产者和消费者线程都在同一个等待队列中),这显然是不必要的且会造成性能浪费。而在AQS中可以有多个等待队列,因此在这里我们就可以使用上述基于AQS实现的具有等待通知功能的同步器来实现一个生产者-消费者队列,将生产者等待队列和消费者等待队列分离,这样在需要唤醒生产者/消费者时只需要在生产者/消费者等待队列中去唤醒。实现如下:

/**
 * 生产者消费者等待队列分离的生产者-消费者队列
 */
class ProduceConsumeQueue<T> {
    private final LinkedList<T> queue = new LinkedList<>();
    private final static int MAX_COUNT = 10;//最大库存

    MyLock myLock = new MyLock();
    Condition producerWaitCond = myLock.newCondition();//生产者等待队列
    Condition consumerWaitCond = myLock.newCondition();//消费者等待队列

    public void put(T resource) {
        myLock.lock();
        try {
            while (queue.size() == MAX_COUNT) {
                //若库存已满,则将当前线程(生产者)加入生产者等待队列阻塞等待
                System.out.println("生产者:队列已满,无法插入...");
                producerWaitCond.await();
            }
            //否则生产一个资源,并从消费者等待队列随机唤醒一个消费者线程到同步队列参与锁竞争
            queue.addFirst(resource);
            System.out.println("生产者:插入"+resource + "! ! !");
            consumerWaitCond.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            myLock.unLock();
        }
    }

    public void take() {
        myLock.lock();
        try {
            while (queue.size() == 0) {
                //若库存为空,则将当前线程(消费者)加入消费者等待队列阻塞等待
                System.out.println("消费者:队列为空,无法取出...");
                consumerWaitCond.await();
            }
            //否则消费一个资源,并从生产者者等待队列随机唤醒一个生产者线程到同步队列参与锁竞争
            queue.removeLast();
            System.out.println("消费者:取出消息! !!");
            producerWaitCond.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            myLock.unLock();
        }
    }
}

测试:

class PCQueueTest {
    public static void main(String[] args) {
        ProduceConsumeQueue<String> produceConsumeQueue = new ProduceConsumeQueue<>();
        //生产者线程 可多个
        for (int p = 1;p <= 3;p++) {
            new Thread(() -> {
                for (int i = 0;i < 50;i++) {
                    produceConsumeQueue.put("消息" + Thread.currentThread().getName() + "-msg" + i);//生产
                }
            }).start();
        }
        //消费者线程 可多个
        for (int c = 1;c <= 3;c++) {
            new Thread(() -> {
                for (int i = 0; i < 50; i++) {
                    produceConsumeQueue.take();//消费
                }
            }).start();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/415626.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

13. unity粒子特效--发射模块、各种发射器形状、粒子渐变(颜色/大小)

1. 发射模块&#xff08;Emission&#xff09; 匀速发射&#xff1a; Rate over Time&#xff1a;每秒钟发射的粒子数 Rate over Distance&#xff1a;每移动一米发射的粒子个数 两者可指定其一&#xff1a;若仅指定Rate over Time&#xff0c;则粒子根据时间的变化进行发射&a…

第三节、语言模型

目录 0、介绍 1、N-gram 模型介绍 2、困惑度 3、N-gram 模型的文本评估 4、N-gram 模型的平滑 5、基于 N-gram 模型的文本生成 6、基于统计的语言模型的缺陷 7、实验总结 0、介绍 首先&#xff0c;我们来思考这样一个问题&#xff1a;随便给你一句话&#xff0c;如何判…

MongoDB中的索引

一、说明 索引通常能够极大的提高查询的效率,如果没有索引,MongoDB在读取数据时必须扫描集合中的每个文件并选取那些符合查询条件的记录。这种扫描全集合的查询效率是非常低的,特别在处理大量的数据时,查询可能要花费几十秒甚至几分钟,这对网站的性能是非常致命的。索引是…

Javaweb小练习---在JSP中使用Javabean访问数据库完成用户信息的简单添加

Javaweb小练习---在JSP中使用Javabean访问数据库完成用户信息的简单添加 目录 Javaweb小练习---在JSP中使用Javabean访问数据库完成用户信息的简单添加 0.创建数据库 1. 在resources目录下创建db.properties文件 2. /** * 获取链接与释放资源的工具类--JdbcUtil类 */ 3…

UE-Ueransim-5GC全链路开发记录

目录 1. 系统配置 1.1 Ueransim配置 1.2 UE配置 2. 启动 3. 实际演示 附录 代理1&#xff1a;ueransim-5gc 代理2 ue-ueransim TCPclient TCPserver 1. 系统配置 1.1 Ueransim配置 ueransim的yaml文件如下 version: 3.8 services:ueransim2:container_name: uera…

Ubantu docker学习笔记(六)容器数据卷

文章目录一、容器数据卷二、容器卷挂载2.1 在命令行挂载数据卷2.2 通过dockerfile挂载数据卷三、数据卷容器四、备份数据卷五、数据卷的恢复和迁移5.1 恢复数据卷5.2 迁移数据卷六、管理数据卷6.1 与容器关联&#xff08;1&#xff09;例子一&#xff08;2&#xff09;例子二&a…

【LeetCode】剑指 Offer(28)

目录 题目&#xff1a;剑指 Offer 54. 二叉搜索树的第k大节点 - 力扣&#xff08;Leetcode&#xff09; 题目的接口&#xff1a; 解题思路&#xff1a; 代码&#xff1a; 过啦&#xff01;&#xff01;&#xff01; 题目&#xff1a;剑指 Offer 55 - I. 二叉树的深度 - 力…

MySQL运维11-MySQL的事务隔离级别

文章目录1、MySQL的事务隔离级别2、MySQL事务隔离级别的相关参数和命令2.1、查看事务隔离级别2.2、设置事务隔离级别2.2.1、在会话中设置事务隔离级别2.2.2、在配置文件中设置事务隔离级别3、MySQL的多版本并发控制(MVCC)4、总结1、MySQL的事务隔离级别 事务隔离级别越高&…

Qt5.12实战之规则DLL导出函数使用

1.创建基于MFC的规则DLL工程: 输入工程名,然后点击创建 选择使用共享MFC DLL的常规DLL 创建成功后,解决方案下会多出一个工程 增加导出函数声明 实现导出函数 在模块定义文件def文件中声明导出

YC-B09(原创)基于springboot,vue网上书城

(原创)基于springboot,vue网上书城定制版v4.0 本人原创作品&#xff0c;用户前台、系统管理员后台项目完整&#xff0c;无任何bug。 每行代码都是本人自己写&#xff0c;我在代码上面都写有详细注释 开发工具&#xff1a;IDEA 服务器&#xff1a;Tomcat9.0&#xff0c; jdk…

代码随想录算法训练营第五十九天-单调栈2| 503.下一个更大元素II 42. 接雨水

503. Next Greater Element II 成环就用取模mod方法 import java.util.Arrays; import java.util.Stack;public class NextGreaterElement2 {public int[] nextGreaterElements(int[] nums) {//边界判断if(nums null || nums.length < 1){return new int[]{-1};}int size …

无需兔魔法!国内手机直接畅玩GPT!

为了用上ChatGPT很多同学都是经历一波三折&#xff0c;闯三关过五将&#xff01;因为使用ChatGPT的门槛很高&#xff0c;尤其是这个kx上网把很多人都挡在了门外&#xff01;有的同学说newbing呢&#xff0c;newbing如果你要用聊天功能&#xff0c;一样有这样门槛&#xff01;很…

Ubuntu docker 基本操作

安装docker&#xff1a; curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 也可以使用国内 daocloud 一键安装命令&#xff1a; curl -sSL https://get.daocloud.io/docker | sh 启动Docker命令&#xff1a; systemctl start docker 查看版本号&#xf…

NKCTF2023 babyrust

这道题目适合科普&#xff1a;rust逆向&#xff0c;xmm指令。 rust逆向的一些注意事项 rust题中&#xff0c;“ida给的main”里的第一个lea是“用户写的main”。 如下图&#xff0c;这是ida标记的main。而用户写的main其实是sub_1400012A0。姑且可以认为“ida给的main”里的…

初识springcloud

认识微服务 单体架构&#xff1a; 简单方便&#xff0c;高度耦合&#xff0c;扩展性差&#xff0c;适合小型项目&#xff0c;例如&#xff0c;学生管理系统。 分布式架构 松耦合&#xff0c;扩展性好&#xff0c;但架构复杂&#xff0c;难度大。适合大型互联网项目。如&#x…

【数据结构】解析队列各接口功能实现

目录 前言&#xff1a; 一、队列概述&#xff1a; 1.队列的概念&#xff1a; 二、队列的各种接口功能实现&#xff1a; 1.初始化队列&#xff1a; 2.入队&#xff08;尾插&#xff09;&#xff1a; 3.出队&#xff08;头删&#xff09;&#xff1a; 4.查看队头&#xf…

Zookeeper安装(Win和Linux)

Zookeeper安装 Zookeeper单机安装&#xff08;Windows&#xff09; 下载地址&#xff1a;Apache ZooKeeper 1.1 下载安装 下载好的文件进行解压缩得到apache-zookeeper-3.8.0-bin目录&#xff0c;创建zkdata及log目录&#xff0c;后续作为数据存放目录 1.2 配置启动 我们…

易点易动设备管理系统高效管理海量备品备件

纸质设备备品备件管理是企业运营中的重要环节&#xff0c;其管理效率和精度直接关系到企业的生产效率和经济效益。然而&#xff0c;传统的纸质管理方式存在诸多问题&#xff0c;如信息不透明、数据难以更新、易丢失等。为解决这些问题&#xff0c;易点易动设备管理系统应运而生…

webpack 之 Loader开发(一)

1. You may need an additional loader to handle the result of these loaders.&#xff08;我们可能还需要一个额外的加载器来处理当前加载器的结果&#xff09; 2. Loader可能经过一层层链路、又或者只有一层&#xff0c;最终的处理都是转化成 js&#xff0c;&#xff08;L…

【LeetCode: 139. 单词拆分 | 暴力递归=>记忆化搜索=>动态规划】

&#x1f34e;作者简介&#xff1a;硕风和炜&#xff0c;CSDN-Java领域新星创作者&#x1f3c6;&#xff0c;保研|国家奖学金|高中学习JAVA|大学完善JAVA开发技术栈|面试刷题|面经八股文|经验分享|好用的网站工具分享&#x1f48e;&#x1f48e;&#x1f48e; &#x1f34e;座右…