(十七)抽象队列同步器AQS

news2025/1/15 22:52:55

AQS

AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现。

类图如下,AbstractQueuedLongSynchronizer与AbstractQueuedSynchronizer结构一模一样,只是AbstractQueuedSynchronizer内部状态变量是 int类型的,AbstractQueuedLongSynchronizer内部的状态变量是 long类型的。

AbstractOwnableSynchronizer

这个类主要记录独占模式下占用资源的线程。
//线程独占的同步器
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    /**
     * 独占模式同步的所有者线程
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 设置当前拥有独占访问权的线程。null表示没有线程拥有访问权。
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * 返回setExclusiveOwnerThread方法最后设置的线程,如果从未设置,则返回null。
     * @return exclusiveOwnerThread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractQueuedSynchronizer

结构

AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队头和队尾元素,队列元素为Node类型。
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    /**
     * 延迟初始化的等待队列头部。除了初始化之外,它只能通过setHead方法修改
     * 注意:如果head已经存在,它的waitStatus保证不是CANCELLED。
     */
    private transient volatile Node head;

    /**
     * 延迟初始化的等待队列尾部。只能通过enq方法添加新的等待节点修改
     */
    private transient volatile Node tail;

    /**
     * 同步状态,由子类实现其具体含义。对于ReentrantLock来说,state可以用来表示当前线程
     * 获取锁的可重入次数;
     */
    private volatile int state;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // state字段的偏移量,CAS用
    private static final long stateOffset;
    // head字段的偏移量,CAS用
    private static final long headOffset;
    // tail字段的偏移量,CAS用
    private static final long tailOffset;
    // Node的waitStatus字段的偏移量,CAS用
    private static final long waitStatusOffset;
    // Node的next字段的偏移量,CAS用
    private static final long nextOffset;
    //......
}

enq方法

入队的方法,返回node的前驱节点
private Node enq(final Node node) {
    //死循环入队,直到成功退出
    for (;;) {
        Node t = tail;
        if (t == null) { 队尾为空先初始化队头,队尾,插入一个哨兵节点做头
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //前驱先链接
            node.prev = t;
            if (compareAndSetTail(t, node)) {//CAS操作,将队尾指针指向node
                t.next = node;//CAS成功则将t的后继指向node
                return t;
            }
        }
    }
}
对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于某个线程,操作state的方式分为 独占方式和共享方式

独占模式

使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state变量获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。

在独占方式下获取和释放资源使用的方法为:
//不响应中断
public final void acquire(int arg);
//响应中断
public final void acquireInterruptibly(int arg);
public final boolean release(int arg);
acquire
tryAcquire在AQS里没有实现,需要子类配合state变量的具体含义实现。使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park方法挂起自己。
public final void acquire(int arg) {
    // 如果tryAcquire失败了,则线程包装程node入队并挂起
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {//成功acquire
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果失败后应该挂起,则调用LockSupport.park挂起当前线程,并返回中断状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 先尝试一次入队,不行再调用enq死循环入队,直到成功
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
release
tryRelease也是交给子类实现。
当一个线程调用release方法时会尝试使用tryRelease操作释放资源,一般是设置状态变量state的值,然后调用LockSupport.unpark方法唤醒AQS队列里面被阻塞的一个线程。被唤醒的线程在acquireQueued方法里继续使用tryAcquire尝试,如果成功则该线程被真正唤醒继续运行,否则仍然会被放入AQS队列并被挂起。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)//唤醒head的后继
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    //node的后继是空或者被取消,则从队列尾部开始查找,找到第一个不是取消状态的node,唤醒
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享模式

获取共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。

在共享方式下获取和释放资源的方法为:
//不响应中断
public final void acquireShared(int arg);
//响应中断
public final void acquireSharedInterruptibly(int arg);
public final boolean releaseShared(int arg);
acquireShared
tryAcquireShared也是交给子类实现。
首先使用tryAcquireShared尝试获取资源,成功则直接返回,失败则将当前线程封装为类型为SHARED的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park方法挂起自己。
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    //共享模式的节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //如果失败后应该挂起,则调用LockSupport.park挂起当前线程,并返回中断状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
releaseShared
和tryRelease实现差不多,首先尝试使用tryReleaseShared操作释放资源,成功则使用LockSupport.unpark方法唤醒AQS队列里面被阻塞的一个线程。被唤醒的线程继续调用tryReleaseShared获取资源,如果成功则该线程被真正唤醒继续运行,否则仍然会被放入AQS队列并被挂起。
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        if (h == head)                   // 如果队头被修改了则再次循环
            break;
    }
}

内部类Node

Node是AQS内部类,有两个用途,一是用于AQS阻塞队列,二是用来作为条件变量的条件队列。

Node中的thread变量用来存放进入AQS队列里的线程;
Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的;
waitStatus记录当前线程等待状态,可以为CANCELLED(线程被取消了)、SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点);
prev记录当前节点的前驱节点,next记录当前节点的后继节点。
//等待队列的节点
static final class Node {
    /** 标记节点正在共享模式下等待 */
    static final Node SHARED = new Node();
    /** 标记节点正在独占模式下等待 */
    static final Node EXCLUSIVE = null;

    /** waitStatus的值,表示线程已取消 */
    static final int CANCELLED =  1;
    /** waitStatus的值,表示后继节点的线程需要被唤醒 */
    static final int SIGNAL    = -1;
    /** waitStatus的值,表示线程正在条件变量上等待 */
    static final int CONDITION = -2;
    /** waitStatus的值,表示释放共享资源时需要通知其他节点 */
    static final int PROPAGATE = -3;

    /**
     * 状态:取值为CANCELLED、SIGNAL、CONDITION、PROPAGATE或0
     */
    volatile int waitStatus;

    /** 前驱节点 */
    volatile Node prev;

    /** 后继节点 */
    volatile Node next;

    /** 进入AQS队列的等待线程 */
    volatile Thread thread;

    /** 下一个在条件变量上等待的节点,或者是SHARED */
    Node nextWaiter;

    Node() {}    // 用来初始化头结点或者标记为SHARED模式

    Node(Thread thread, Node mode) {     // addWaiter方法使用
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // 条件变量使用
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

内部类ConditionObject

这个类是Condition接口的实现,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,如state状态值和AQS队列。每个ConditionObject对应一个条件队列(单向链表),其用来存放调用条件变量的await方法后被阻塞的线程,这个条件队列的头、尾元素分别为firstWaiter和lastWaiter。
public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** 条件队列的首个节点 */
        private transient Node firstWaiter;
        /** 条件队列的最后一个节点 */
        private transient Node lastWaiter;

        public ConditionObject() { }

        /** 退出等待时重新中断 */
        private static final int REINTERRUPT =  1;
        /** 退出等待时抛出InterruptedException异常 */
        private static final int THROW_IE    = -1;
        //......
    }

await方法

条件变量的挂起方法,响应中断,awaitUninterruptibly不响应中断。该方法将当前线程封装为
CONDITION类型的node进入条件队列,并释放掉获得的锁,然后挂起。
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //当前线程封装为node入条件队列
    Node node = addConditionWaiter();
    //释放获取的锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);//挂起
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 清理取消的node
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal

条件变量的唤醒方法,isHeldExclusively方法由AQS子类实现,该方法主要是将条件队列的头结点转移到AQS队列中,并唤醒该节点的线程,等待机会获取锁。
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&//将头结点从条件队列转移到AQS队列
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    //防止节点取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //AQS队列入队
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);//唤醒node的线程
    return true;
}

锁、条件变量、队列的关系

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/178588.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Springboot+java师生交流答疑作业系统

&#xff0c;本系统拥有学生&#xff0c;教师&#xff0c;管理员三个角色&#xff0c;学生可以注册登陆系统&#xff0c;查看新闻&#xff0c;查看教学&#xff0c;在线提问答疑&#xff0c;提交作业&#xff0c;发布交流&#xff0c;留言反馈等功能&#xff0c;教师可以发布教…

恶意代码分析实战 14 反虚拟机技术

14.1 Lab17-01 题目 这个恶意代码使用了什么反虚拟机技术&#xff1f; 恶意代码用存在漏洞的x86指令来确定自己是否运行在虚拟机中。 如果你有一个商业版本IDAPro&#xff0c;运行第17章中代码清单17-4所示的IDAPython脚本&#xff08;提供如jindAniM.py&#xff09;&#…

spring boot前后端交互之数据格式转换

在前后端分离开发的项目种&#xff0c;前端获取数据的方式基本都是通过Ajax。请求方法也有所不同,常见的有POST,GET,PUT,DELETE等。甚至连请求的数据类型都不一样&#xff0c;x-www-form-urlencodeed,form-data,json等。 那么在前后端交互过程中&#xff0c;具体的数据该如何接…

ESP32设备驱动-8x8LED点阵驱动(基于Max7219+SPI)

8x8LED点阵驱动(基于Max7219+SPI) 1、Max7219介绍 MAX7219/MAX7221是紧凑型串行输入/输出共阴极显示驱动器,可将微处理器(Ps)连接到多达8位的7段数字LED显示器、条形图显示器或64个独立LED。片上包括一个 BCD 代码 B 解码器、多路扫描电路、段和数字驱动器,以及存储每个数字…

通信电子、嵌入式类面试题刷题计划04

文章目录036——看门狗电路的作用是什么&#xff1f;【社招】037——你了解CAN总线协议吗&#xff1f;说一说你的理解【社招】038——锁存器、触发器、寄存器三者的区别&#xff1f;【校招】039——D触发器和D锁存器的区别是什么&#xff1f;【校招】040——三极管和MOS管的区别…

Cadence PCB仿真使用Allegro PCB SI生成单网络EMI报告Single Net EMI Report及报告导读图文教程

🏡《Cadence 开发合集目录》   🏡《Cadence PCB 仿真宝典目录》 目录 1,概述2,生成报告3,报告导读4,总结1,概述 单网络EMI报告是值将差分模式下的网络视为单个网络,分析来自时钟上升沿的辐射影响。本文简单介绍使用Allegro PCB SI生成单网络EMI报告的方法,及Singl…

搜索引擎位置跟踪应用SerpBear

什么是 SerpBear ? SerpBear 是一款开源搜索引擎位置跟踪应用程序。它允许你跟踪你的网站在谷歌中的关键词位置&#xff0c;并得到他们的位置通知。 软件特点&#xff1a; 无限关键词&#xff1a;添加无限域名和无限关键词以跟踪其 SERP电子邮件通知&#xff1a;每天/每周/每…

车载以太网简介

车载以太网简介 基本概念 传统车载网络 LIN&#xff1a;用于通信速率低的场景&#xff0c;比如车窗、座椅等。CAN&#xff1a;目前车载网络首先&#xff0c;低成本高可靠。FlexRay &#xff1a;具备故障容错的车载总线系统。MOST&#xff1a;内置流媒体数据信道&#xff0c;…

2023年企业信息安全缺陷和解决方案,防止职员外泄信息

随着网络的发展和普及&#xff0c;信息安全与每个人息息相关&#xff0c;包含方方面。每个人既是独立个体又必须和社会交换资源。这就需要把控一个尺度。 要了解信息安全&#xff0c;首先需要对信息有个大体了解。从拥有者和使用者分类分为&#xff0c;个人&#xff0c;企业&a…

恶意代码分析实战 11 恶意代码的网络特征

11.1 Lab14-01 问题 恶意代码使用了哪些网络库&#xff1f;它们的优势是什么&#xff1f; 使用WireShark进行动态分析。 使用另外的机器进行分析对比可知&#xff0c;User-Agent不是硬编码。 请求的URL值得注意。 回答&#xff1a;使用了URLDownloadToCacheFileA函数&#…

JavaEE多线程-定时器

目录一、定时器1.1 什么是定时器&#xff1f;1.2 定时器的构成二、简单实现定时器一、定时器 1.1 什么是定时器&#xff1f; 定时器是多线程编码中的一个重要组件,它就好比一个闹钟,例如我们想去坐车,但是不想现在去坐车,想8:30去坐车,于是我们订了一个8点钟的闹钟,也就是说定…

Linux内核驱动初探(四) 内部看门狗

目录 0. 前言 1. menuconfig 2. 设备树 3. 拓展试验 0. 前言 这次的内部看门狗驱动也比较顺利&#xff0c;重点看了 原理图和4.19.x 内核的配置。 内部看门狗设备名叫做 /dev/watchdog 。 1. menuconfig 我们在 linux-menuconfig 里面如下设置&#xff1a;进入 Device D…

[Java]JavaWeb学习笔记(动力节点老杜2022)

文章目录&#x1f97d; Tomcat服务器&#x1f30a; 下载与安装&#x1f30a; 关于Tomcat服务器的目录&#x1f30a; 启动Tomcat&#x1f30a; 实现一个最基本的web应用&#xff08;这个web应用中没有java小程序&#xff09;&#x1f97d; 静态资源与动态资源&#x1f97d; 模拟…

GPU虚拟化(留坑)

文章内容大程度参考B站王利明老师对《GPU虚拟化技术分享》的演讲&#xff1a;https://b23.tv/uQKBpcK GPU 有什么用&#xff1f; GPU可以用于图形渲染&#xff0c;也能够用于高性能计算和编解码等场景。 图&#xff1a;GPU 的典型软件架构&#xff08;不含虚拟化&#xff09; …

注解存储对象到Spring,详解 五大类注解 和方法注解

上一篇博客我们介绍了如何使用xml来引入bean对象&#xff0c;当项目多的时候&#xff0c;显然那样是比较麻烦的。现在我们只需要 个注解就可以替代了。注意&#xff1a;注解和xml可以同时使用准备工作:配置扫描路径我们需要配置 下存储对象的扫描包路径&#xff0c;只有被配置的…

【笔记】openwrt - full cone NAT(全锥NAT)、解决“arp_cache: neighbor table overflow!”

最近安装了比特彗星&#xff08;bitcomet&#xff09;后&#xff0c;老是收到警告说日志的接收超过每秒上限了。一看日志&#xff0c;好家伙&#xff0c;一堆的kern.info kernel: [194004.157620] neighbour: arp_cache: neighbor table overflow!日志&#xff0c;还是kernel的…

损失函数总结

回归损失与分类损失 回归用于逼近某个数值,预测的结果是连续的,例如预测小明的身高,160,161,162,163cm。平方损失即MSE: 分类用于预测物体属于某一个标签,预测的结果是离散的,例如预测明天是否下雪:是or否。 由于预测分类,最终的输出需要介于(0,1)之间,通常在网络…

Redis消息队列 | 黑马点评

目录 一、认识消息队列 二、List模拟消息队列 三、PubSub的消息队列 四、Stream的消息队列&#xff08;重点&#xff09; 1、单消费模式 2、消费者组 五、redis三种消息队列对比 六、优化秒杀实战 1、创建消息队列 2、修改下单脚本 3、接收消息处理 一、认识消息队列 …

设计模式 - 创建型模式_工厂方法模式

文章目录创建型模式概述CaseBad ImplBetter Impl &#xff08;⼯⼚模式优化代码&#xff09;创建型模式 创建型模式提供创建对象的机制&#xff0c; 能够提升已有代码的灵活性和可复⽤性。 类型实现要点工厂方法定义⼀个创建对象的接⼝&#xff0c;让其⼦类⾃⼰决定实例化哪⼀…

【蓝桥杯-筑基篇】基础数学思维与技巧(1)

&#x1f353;系列专栏:蓝桥杯 &#x1f349;个人主页:个人主页 目录 1.一百以内的AB 2.小学生算术求进位次数 3.最大公约数 4.最小公倍数 5.十进制转换其他进制 6.其他进制转十进制 7.天空数 8.求集合的所有子集 9.判断一个数是否为2的次方数 10.二进制中1的个数 1.一…