聊聊并发编程——多线程之AQS

news2025/1/11 22:42:16

目录

队列同步器(AQS)

独占锁示例

AQS之同步队列结构

解析AQS实现


队列同步器(AQS)

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组 件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获 取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

  • 同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

  • 使用同步器提供的3 个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))对同步状态进行更改,因为它们能够保证状态的改变是安全的。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交 互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者, 它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。

独占锁示例

Lock接口的实现基本都是通过聚合了一个同步器的子类来完成线程访问控制的,通过独占锁了解下队列同步器(AQS)。

public class Mutex implements Lock {
​
    // 子类推荐被定义为自定义同步组件的静态内部类
    // 同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否处于独占状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        // 当状态为0的时候获取锁
        public boolean tryAcquire(int acquire) {
            if (compareAndSetState(0, 1)) {
                return true;
            }
            return false;
        }
        // 释放锁,将状态设置为0
        protected boolean tryRelease(int release) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // 返回一个Condition,每一个condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }
​
​
    // 通过Sync进行代理操作,实现Lock接口的API
    private final Sync sync = new Sync();
​
    // 获取锁
    @Override
    public void lock() {
        sync.acquire(1);
    }
​
    // 可中断地获取锁
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
​
    // 尝试非阻塞的获取锁
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
​
    /*
    超时的获取锁,当前线程在以下3种情况下会返回:
    1.当线程在超时时间获得了锁
    2.当线程在超时时间被中断
    3.超时时间结束,返回false
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
​
    // 释放锁
    @Override
    public void unlock() {
        sync.release(1);
    }
​
    /*
    获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用用,当前线程将释放锁
     */
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
AQS之同步队列结构

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其 加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

节点属性:

属性类型与名称描述
int waitStatus等待状态,包含如下状态: CANCELLED:值为1,表示节点已取消,通常是因为线程被中断或者等待超时而被取消。 SIGNAL:值为-1,表示后继节点需要被唤醒,即当前节点的释放(signal)会通知后继节点继续尝试获取锁或资源。 CONDITION:值为-2,表示节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中 PROPAGATE:值为-3,表示释放共享锁时需要向后继节点传播共享性质,以确保后继节点可以被唤醒。这在CountDownLatch等场景中会使用到。 INITIAL:值为0,初始状态。
Node prev前驱节点,当节点加入同步队列时被设置(尾部添加)
Node next后继节点
Node nextWaiter等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段
Thread thread获取同步状态的线程

节点是构成同步队列的基础,同步器拥有首节点(head)和尾结点(tail),没有成功获取同步状态的线程会成为节点加入该队列的尾部。

  • 当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

        

  • 首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可

解析AQS实现

以ReentrantLock的非公平锁为例,看看lock的实现。

  1. ReentrantLock.lock()—获取锁的入口

        public void lock() {
            sync.lock();
        }

    sync 实际上是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑。

    Sync 有两个具体的实现类,分别是: NofairSync:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他 线程等待,新线程都有机会抢占锁 FailSync: 表示所有线程严格按照 FIFO 来获取锁。

    ReentrantLock的无参构造函数默认创建的是非公平锁。

    public ReentrantLock() {
            sync = new NonfairSync();
        }
  2. NonfairSync.lock()—获取同步状态/锁。

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    ​
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    ​
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    1. 非公平锁的特点:抢占锁的逻辑,不管有没有线程排队,上来先CAS抢占一下。

    2. CAS成功,表示成功获得锁。

    3. CAS失败,调用获取独占锁acquire()走锁竞争逻辑。

  3. AQS.acquire(1)—尝试获取独占锁or加入同步队列自旋获取锁。

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    1. 通过 tryAcquire 尝试获取独占锁,如果成功返回 true,失败返回 false

    2. 如果 tryAcquire 失败,则会通过 addWaiter 方法将当前线程封装成 Node 添加 到 AQS 队列尾部。

    3. acquireQueued(),将 Node 作为参数,通过自旋去尝试获取锁。

  4. NonfairSync.tryAcquire(1)—尝试获取独占锁

    protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

    它是重写 AQS 类中的 tryAcquire 方法

  5. ReentrantLock.nofairTryAcquire(1)—尝试获取独占锁

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread(); // 获取当前执行的线程
                int c = getState(); // 获取state的值
                if (c == 0) { // 表示无锁状态
                    if (compareAndSetState(0, acquires)) { // CAS替换state的值,case成功表示获取锁成功
                        setExclusiveOwnerThread(current); // 保存当前获得锁的线程,下次再来的时候不用尝试竞争锁
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) { // 如果同一线程竞争锁,直接增加重入次数
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    1. 获取当前线程,判断当前的锁的状态

    2. 如果 state=0 表示当前是无锁状态,通过 cas 更新 state 状态的值

    3. 当前线程是属于重入,则增加重入次数

  6. AQS.addWaiter(Node.EXCLUSIVE) —线程构造成节点加入同步队列 (static final Node EXCLUSIVE = null;)

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成 Node.

    入参 mode 表示当前节点的状态,传递的参数是 Node.EXCLUSIVE,表示独占状 态。意味着重入锁用到了 AQS 的独占锁功能

    1. 将当前线程封装成 Node

    2. 当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的 node 添加到 AQS 队列

    3. 如果为空或者 cas 失败,调用 enq 将节点添加到 AQS 队列

  7. enq(node)—通过自旋把当前节点加入到队列中

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    图解分析:

    8.AQS.acquireQueued(node, 1)—把node加入到链表去争抢锁

    1. 获取当前节点的 prev 节点

    2. 如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占锁

    3. 抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head 节点

    4. 如果获得锁失败,则根据 waitStatus 决定是否需要挂起线程

    5. 最后,通过 cancelAcquire 取消获得锁的操作

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor(); // 获取当前节点的prev节点
                    if (p == head && tryAcquire(arg)) { // 如果是head节点,说明有资格去争抢锁
                        setHead(node); // 获取锁成功,也就是ThreadA已经释放了锁,然后设置head为ThreadB获得执行权限
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // ThreadA可能还没释放锁,使得ThreadB在执行tryAcquire返回false
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    9.shouldParkAfterFailedAcquire—竞争锁失败后应该挂起

    这个方法的主要作用是,通过 Node 的状态来判断,ThreadA 竞争锁失败以后是 否应该被挂起。

    1. 如果 ThreadA 的 pred 节点状态为 SIGNAL,那就表示可以放心挂起当前线程

    2. 通过循环扫描链表把 CANCELLED 状态的节点移除

    3. 修改 pred 节点的状态为 SIGNAL,返回 false.

    4. 返回 false 时,也就是不需要挂起,返回 true,则需要调用 parkAndCheckInterrupt 挂起当前线程

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // pred是前置节点
            int ws = pred.waitStatus; // 前置节点的waitStatus
            if (ws == Node.SIGNAL) // 如果前置节点为 SIGNAL,意味着只需要等待其前置节点的线程被释放
                return true;
            if (ws > 0) { // ws大于 0,意味着prev节点取消了排队,直接移除这个节点就行
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 利用cas设置prev节点的状态为SIGNAL(-1)
            }
            return false;
        }

    图解分析:

    waitStatus = -1(SIGNAL:值为-1,表示后继节点需要被唤醒,即当前节点的释放会通知后继节点继续尝试获取锁或资源。)

    10.parkAndCheckInterrupt

    Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识 true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true,意味着在acquire方法中会执行 selfInterrupt()。

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted(); // 1.中断 2.复位
        }

    selfInterrupt: 标识如果当前线程在 acquireQueued 中被中断过,则需要产生一 个中断请求,原因是线程在调用 acquireQueued 方法的时候是不会响应中断请求的。

    11.ReentrantLock.unlock()—锁释放

        public void unlock() {
            sync.release(1);
        }
        public final boolean release(int arg) {
            if (tryRelease(arg)) { // 释放锁成功
                Node h = head; // 获取aqs中的head节点
                if (h != null && h.waitStatus != 0) // 如果head节点不为空且状态!=0.调用unparkSuccessor(h)唤醒后续节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    12.ReentrantLock.tryRelease()—设置锁状态

    这个方法可以认为是一个设置锁状态的操作,通过将 state 状态减掉传入的参数值 (参数是 1),如果结果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它 的线程有机会进行执行。 在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时 候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock() 的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true.

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }

    13.AQS.unparkSuccessor()—唤醒后续节点

    private void unparkSuccessor(Node node) {
            int ws = node.waitStatus; // 获取head节点的状态
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0); // 设置head节点状态为0
            Node s = node.next; // 得到head节点的下一个节点
            //如果下一个节点为 null 或者 status>0 表示 cancelled 状态.
            if (s == null || s.waitStatus > 0) { 
                s = null;
                //通过从尾部节点开始扫描,找到距离head最近的一个waitStatus<=0 的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null) // next节点不为空,直接唤醒这个线程即可
                LockSupport.unpark(s.thread);
        }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1049238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TikTok美国市场爆品:美牙仪一周售出3.36万单,GMV近百万刀

最近一周&#xff0c;超店有数洞察到TikTok Shop美国市场出现一款爆火美牙仪&#xff0c;该款商品售价为31.95美金&#xff0c;佣金比率为25%&#xff0c;一周内销量达3.36万单&#xff0c;GMV近94万美金。自今年7月底上架以来在TikTok上关联视频播放量高达140W&#xff0c;属于…

企业长假期间如何应对突发业务需求?提前部署远程控制为上策

没有人想在长假期间加班&#xff0c;包括管理层也是一样的。但客观来说&#xff0c;很多企业的业务在假期中也是不能中断的&#xff0c;如果业务线遇到紧急需要处理的问题&#xff0c;有没有办法不用长途跋涉跑回公司一趟呢&#xff1f;远程控制现在就是很多企业的选择。 时值…

静态住宅代理是什么?为什么要选择它?

静态住宅代理是互联网服务提供商(ISP)分配的住宅ISP代理。正如名称“静态”所指&#xff0c;他的IP永久不会变化。在当今的数字时代&#xff0c;数据安全、隐私和在线访问已变得至关重要&#xff0c;具有无限带宽的静态住宅代理提供了出色的解决方案。下面给大家具体介绍。 一、…

Django实战项目-学习任务系统-需求说明

一&#xff0c;需求说明   在我最近的阅读中&#xff0c;我深深被一些关于智能或系统的小说吸引。这些小说的主角意外获得某种神秘的智能或系统&#xff0c;然后通过完成系统发布的各种任务&#xff0c;逐渐提升自己的知识和能力。即使是普通的屌丝&#xff0c;也能在系统的管…

win10环境mysql8.10免安装版本配置

MySQL :: Download MySQL Community Server 下载免安装包 解压到相应目录。 以管理员身份启动cmd net start mysql 服务无法启动。 运行mysqld --initialize --console初始化 生成临时密码 验证临时密码并登录测试 mysql -u root -p出错 启动 net start mysql 运行phpmya…

Leetcode684. 冗余连接

Every day a Leetcode 题目来源&#xff1a;684. 冗余连接 解法1&#xff1a;并查集 因为需要判断是否两个节点被重复连通&#xff0c;所以我们可以使用并查集来解决此类问题。 代码&#xff1a; /** lc appleetcode.cn id684 langcpp** [684] 冗余连接*/// lc codestart…

PID温度控制器,全球市场总体规模,前17大厂商排名及市场份额

PID温度控制器全球市场总体规模 PID温度控制器是一种常用的温度控制设备&#xff0c;能够通过使用比例、积分和微分控制算法来实现精确的温度调节。它可以监测和调整温度&#xff0c;保持设定的温度稳定。PID代表比例、积分和微分&#xff0c;比例&#xff08;P&#xff09;控…

【Verilog教程】6.7 Verilog流水线

关键词&#xff1a;流水线&#xff0c;乘法器 硬件描述语言的一个突出优点就是指令执行的并行性。多条语句能够在相同时钟周期内并行处理多个信号数据。 但是当数据串行输入时&#xff0c;指令执行的并行性并不能体现出其优势。而且很多时候有些计算并不能在一个或两个时钟周期…

linux权限机制,

目录 用户与组,id,passwd 查看登录用户whomi,who,w 创建用户 useradd 修改用户信息usermod 删除指定用户userdel 组 ​编辑创建修改删除组groupadd groupmod groupdel 权限 ls-l 修改文件所属用户&#xff0c;所属组 chown,chgrp(change group) 修改权限 chmod 默认权…

针对http接口进行测试,使用Jmeter工具实现

前言&#xff1a; 本文主要针对http接口进行测试&#xff0c;使用Jmeter工具实现。 Jmter工具设计之初是用于做性能测试的&#xff0c;它在实现对各种接口的调用方面已经做的比较成熟&#xff0c;因此&#xff0c;本次直接使用Jmeter工具来完成对Http接口的测试。 一、开发接口…

26381-2011 合成纤维丝织坯绸 阅读笔记

声明 本文是学习GB-T 26381-2011 合成纤维丝织坯绸. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本标准规定了合成纤维丝织坯绸的术语和定义、要求、试验方法、检验规则、包装和标志。 本标准适用于评定各类合成纤维丝织坯绸品质。 2 规…

需求堆积,如何排序产品优先极

面对堆积的产品需求&#xff0c;到底该如何排序优先极呢&#xff1f; 需求排期的目标 在谈具体的排期方法之前&#xff0c;有必要先探讨一下——合理的需求排期应该达到什么的目标呢&#xff1f;如果站在与项目相关的利益人员的角度来看&#xff0c;至少应该使以下四方面的收…

消化酶在婴幼儿中的重要作用

婴幼儿的健康和发育是每个家庭都格外关心的事情。良好的营养对于婴幼儿的生长和发育至关重要。然而&#xff0c;在婴幼儿的早期生活阶段&#xff0c;由于其胃肠道系统尚未充分发育&#xff0c;他们对于食物的消化和吸收能力有限。在这个过程中&#xff0c;消化酶扮演了至关重要…

C# Task任务详解

文章目录 前言Task返回值无参返回有参返回 async和await返回值await搭配使用Main async改造 Task进阶Task线程取消测试用例超时设置 线程暂停和继续测试用例 多任务等最快多任务全等待 结论 前言 Task是对于Thread的封装&#xff0c;是极其优化的设计&#xff0c;更加方便了我…

张量-数值操作函数

tf.ones(shape,dtype)该函数可以按指定类型与形状生成值为1的张量。 示例代码如下: import tensorflow.compat.v1 as tf tf.disable_v2_behavior()temp tf.ones([2,3],tf.int32)with tf.Session() as sess:print(sess.run(temp)) tf.ones_like(input)该函数可生成和输入张量…

基于SpringBoot+Bootstrap的旅游管理系统的设计与实现

目录 前言 一、技术栈 二、系统功能介绍 登录模块的实现 景点信息管理界面 订票信息管理界面 用户评价管理界面 用户管理界面 景点资讯界面 系统主界面 用户注册界面 景点信息详情界面 订票信息界面 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言…

OS 模拟进程状态转换

下面的这个博主写的很好 但是他给的代码print部分和语言风格python三识别不了 这个特别感谢辰同学帮我调好了代码 我放在主页上了 估计过两天就可以通过了 《操作系统导论》实验一&#xff1a;模拟进程状态转换_process-run.py-CSDN博客 这个补充一下他没有的&#xff1a;OS…

深入props --React进阶指南笔记

一次render的过程&#xff1a; 调用React.createElement形成新的element过程&#xff0c;新的element上就会有新的props属性&#xff08;即重新渲染视图的关键&#xff09;。 来看一个demo&#xff1a; /* children 组件 */ function ChidrenComponent(){return <div> I…

自研多模态追踪算法 PICO 为「手柄小型化」找到新思路

作者&#xff1a;张韬、林泽一 、闻超 、赵洋 研发背景 作为头戴的追踪配件&#xff0c;VR手柄可以通过HMD&#xff08;头戴显示设备&#xff09;的inside-out光学追踪定位原理&#xff0c;计算出手柄的空间运动轨迹&#xff0c;同时结合6轴传感器实现6DoF空间定位。与此同时&a…

Stm32_标准库_1_GPIOA初始化

代码&#xff1a; #include "stm32f10x.h" // Device headerGPIO_InitTypeDef GPIO_InitStructur;//定义变量结构体int main(void){/*使用RCC开启GPIO的时钟*/RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE);//开启PA端口时钟/*使用GPIO_I…