深入浅出理解Java并发AQS的共享锁模式

news2025/2/5 9:12:47

自定义共享锁例子

首先我们通过AQS实现一个非常最最最轻量简单的共享锁例子,帮助大家对共享锁有一个整体的感知。

@Slf4j
public class ShareLock {

    /**
     * 共享锁帮助类
     */
    private static class ShareSync extends AbstractQueuedSynchronizer {

        private int lockCount;

        /**
         * 创建共享锁帮助类,最多有count把共享锁,超过了则阻塞
         *
         * @param count 共享锁数量
         */
        public ShareSync(int count) {
           this.lockCount = count;
        }

        /**
         * 尝试获取共享锁
         *
         * @param arg 每次获取锁的数量
         * @return 返回正数,表示后续其他线程获取共享锁可能成功; 返回0,表示后续其他线程无法获取共享锁;返回负数,表示当前线程获取共享锁失败
         */
        @Override
        protected int tryAcquireShared(int arg) {
            // 自旋
            for (;;) {
                int c = getState();
                // 如果持有锁的数量大于指定数量,返回-1,线程进入阻塞
                if(c >= lockCount) {
                    return -1;
                }
                int nextc = c + 1;
                // cas设置成功,返回1,获取到共享锁
                if (compareAndSetState(c, nextc)) {
                    return 1;
                }
            }
        }

        /**
         * 尝试释放共享锁
         *
         * @param arg 释放锁的数量
         * @return 如果释放后允许唤醒后续等待结点返回true,否则返回false
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
            // 自旋操作
            for (; ; ) {
                int c = getState();
                // 如果没有锁了
                if (c == 0) {
                    return false;
                }
                // 否则锁量-1
                int nextc = c - 1;
                // cas修改状态
                if (compareAndSetState(c, nextc)) {
                    return true;
                }
            }
        }
    }

    private final ShareSync sync;

    public ShareLock(int count) {
        this.sync = new ShareSync(count);
    }

    /**
     * 加共享锁
     */
    public void lockShare() {
        sync.acquireShared(1);
    }

    /**
     * 释放共享锁
     */
    public void releaseShare() {
        sync.releaseShared(1);
    }
}
复制代码
  • 创建内部类共享帮助锁ShareSync类,继承自AbstractQueuedSynchronizer类,实现了共享锁相关的方法tryAcquireShared()tryReleaseShared()
  • 创建ShareLock,提供了lockShare()加锁和releaseShare()两个API。

验证:

public static void main(String[] args) throws InterruptedException {
        ShareLock shareLock = new ShareLock(3);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                shareLock.lockShare();
                try {
                    log.info("lock success");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    shareLock.releaseShare();
                    log.info("release success");
                }

            }, "thread-" + i).start();
        }
        Thread.sleep(10000);
    }
复制代码
  • 一共创建最多共同有3个线程共享的共享锁。
  • 创建5个线程去竞争共享锁。

运行结果:

  • 运行结果显示每次最多只有3个lock success,说明同时只有3个线程共享。
  • 只有在释放共享锁以后,其他线程才能获取锁。

下面对它的实现原理一探究竟。

核心原理机制

共享模式也是由AQS提供的,首先我们关注下AQS的数据结构。

AQS内部维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

AQS作为一个抽象方法,提供了加锁、和释放锁的框架,这里采用的模板方模式,在上面中提到的tryAcquireSharedtryReleaseShared就是和共享模式相关的模板方法。

方法名描述
protected int tryAcquireShared(int arg)共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg)共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。

共享模式的入口方法如下:

方法名描述
void acquireShared(int arg)共享模式获取锁,不响应中断。
void acquireSharedInterruptibly(int arg)共享模式获取锁,响应中断。
tryAcquireSharedNanos(int arg, long nanosTimeout)尝试在共享模式下获取锁,如果中断则中止,如果超过给定超时则失败。
boolean releaseShared(int arg)共享模式下释放锁。

源码解析

上图是AQS的类结构图,其中标红部分是组成AQS的重要成员变量。

成员变量

  1. state共享变量

AQS中里一个很重要的字段state,表示同步状态,是由volatile修饰的,用于展示当前临界资源的获锁情况。通过getState(),setState(),compareAndSetState()三个方法进行维护。

关于state的几个要点:

  • 使用volatile修饰,保证多线程间的可见性。
  • getState()、setState()、compareAndSetState()使用final修饰,限制子类不能对其重写。
  • compareAndSetState()采用乐观锁思想的CAS算法,保证原子性操作。
  1. CLH队列(FIFO队列)

AQS里另一个重要的概念就是CLH队列,它是一个双向链表队列,其内部由head和tail分别记录头结点和尾结点,队列的元素类型是Node。

private transient volatile Node head;
private transient volatile Node tail;
复制代码

Node的结构如下:

static final class Node {
    //共享模式下的等待标记
    static final Node SHARED = new Node();
    //独占模式下的等待标记
    static final Node EXCLUSIVE = null;
    //表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
    static final int CANCELLED =  1;
    //表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
    static final int SIGNAL    = -1;
    //表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    static final int CONDITION = -2;
    //共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
    static final int PROPAGATE = -3;
    //状态,包括上面的四种状态值,初始值为0,一般是节点的初始状态
    volatile int waitStatus;
    //上一个节点的引用
    volatile Node prev;
    //下一个节点的引用
    volatile Node next;
    //保存在当前节点的线程引用
    volatile Thread thread;
    //condition队列的后续节点
    Node nextWaiter;
}
复制代码

注意,waitSstatus负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。

  1. exclusiveOwnerThread

AQS通过继承AbstractOwnableSynchronizer类,拥有的属性。表示独占模式下同步器持有的线程。

共享锁获取acquireShared(int)

acquireShared(int)是共享锁模式下线程获取共享资源的入口方法,它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程无法响应中断。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
复制代码

方法的整体流程如下:

  1. tryAcquireShared()尝试获取资源,需要自定义同步器去实现,返回负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
  2. 如果失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

doAcquireShared(int)

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

private void doAcquireShared(int arg) {
    //封装线程为共享Node 加入队列尾部
    final Node node = addWaiter(Node.SHARED);
    //是否成功标志
    boolean failed = true;
    try {
        //等待过程中是否被中断过的标志
        boolean interrupted = false;
        // 自旋操作
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            //如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
            if (p == head) {
                //尝试获取资源
                int r = tryAcquireShared(arg);
                //成功
                if (r >= 0) {
                    //将head指向自己,还有剩余资源可以再唤醒之后的线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    //如果等待过程中被打断过,此时将中断补上。
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }

            //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

doAcquireShared方法的实现和获取独占锁中的acquireQueued方法很类似,但是主要有一点不同,那就是线程在被唤醒后,若成功获取到了共享锁,还需要判断共享锁是否还能被其他线程获取,若可以,则继续向后唤醒它的下一个节点对应的线程。

setHeadAndPropagate(Node, int)

该方法主要将当前节点设置为头节点,同时判断条件是否符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    //head指向自己
    setHead(node);
     //如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 唤醒操作
            doReleaseShared();
    }
}
复制代码

共享释放releaseShared(int)

releaseShared(int)是共享模式下线程释放共享资源的入口,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。

public final boolean releaseShared(int arg) {
    //尝试释放资源
    if (tryReleaseShared(arg)) {
        //唤醒后继结点
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码

方法的整体流程如下:

  • tryReleaseShared尝试释放锁,这由自定义同步器去实现, 返回true表示释放成功。
  • doReleaseShared唤醒后续队列中等待的节点,

doReleaseShared()

此方法主要用于唤醒队列中等待的共享节点。

private void doReleaseShared() {
    // 自旋操作
    for (;;) {
        // 获取头节点
        Node h = head;
        if (h != null && h != tail) {
            // 获取节点的等待状态
            int ws = h.waitStatus;
            // 如果节点等待状态是-1, -1表示有责任唤醒后续节点的状态
            if (ws == Node.SIGNAL) {
                // cas修改当前节点的等待状态为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                //唤醒后续节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化
            break;
    }
}
复制代码
  • 逻辑是一个死循环,每次循环中重新读取一次head,然后保存在局部变量h中,再配合if(h == head) break;,这样,循环检测到head没有变化时就会退出循环。注意,head变化一定是因为:acquire thread被唤醒,之后它成功获取锁,然后setHead设置了新head。而且注意,只有通过if(h == head) break;即head不变才能退出循环,不然会执行多次循环。
  • if (h != null && h != tail)判断队列是否至少有两个node,如果队列从来没有初始化过(head为null),或者head就是tail,那么中间逻辑直接不走,直接判断head是否变化了。
  • 如果队列中有两个或以上个node,那么检查局部变量h的状态:
    • 如果状态为SIGNAL,说明h的后继是需要被通知的。通过对CAS操作结果取反,将compareAndSetWaitStatus(h, Node.SIGNAL, 0)unparkSuccessor(h)绑定在了一起。说明了只要head成功得从SIGNAL修改为0,那么head的后继的代表线程肯定会被唤醒了。
    • 如果状态为0,说明h的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于acquire thread获取锁失败再次设置head为SIGNAL并再次阻塞,要么由于acquire thread获取锁成功而将自己(head后继)设置为新head并且只要head后继不是队尾,那么新head肯定为SIGNAL。所以设置这种中间状态的head的status为PROPAGATE,让其status又变成负数,这样可能被被唤醒线程检测到。
  • 如果状态为PROPAGATE,直接判断head是否变化。
  • 两个continue保证了进入那两个分支后,只有当CAS操作成功后,才可能去执行if(h == head) break;,才可能退出循环。
  • if(h == head) break;保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新head,就会再次循环。目的当然是为了再次执行unparkSuccessor(h),即唤醒队列中第一个等待的线程。

总结

本文主要讲解了AQS的共享模式,通过一个自定义简单的demo帮助大家深入浅出的理解,同时深入分析了源码实现,希望对大家有帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/103229.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C. Sequence Pair Weight(数学贡献法)

Problem - 1527C - Codeforces 序列的权重被定义为具有相同值&#xff08;aiaj&#xff09;的无序索引对&#xff08;i,j&#xff09;&#xff08;这里i<j&#xff09;的数量。例如&#xff0c;序列a[1,1,2,2,1]的权重是4&#xff0c;具有相同值的无序索引对的集合是&#x…

结构篇-适配器模式

文章目录一、跨越鸿沟靠适配二、插头与插孔的冲突1.三相插孔接口2.两相插孔接口3.电视机机类TV4.客户端类二、通用适配1. 适配器2.客户端类三、专属适配1.电视机专属适配器2.客户端类总结1. 对象适配器2. 类适配器总结适配器模式(Adapter)通常也被称为转换器&#xff0c;顾名思…

数字孪生炒得火热,但好像对企业发展还没有任何实质性的突破,是否只是表面功夫?

首先&#xff0c;什么是数字孪生&#xff1f; ​ 数字孪生是充分利用物理模型、传感器更新、运行历史等数据&#xff0c;集成多学科、多物理量、多尺度、多概率的仿真过程&#xff0c;在虚拟空间中完成映射&#xff0c;从而反映相对应的实体装备的全生命周期过程。数字孪生是一…

const成员,流插入,流提取重载,初始化列表!(6千字长文详解!)

c详解之const成员&#xff0c;流插入&#xff0c;流提取重载&#xff0c;初始化列表&#xff01; 文章目录c详解之const成员&#xff0c;流插入&#xff0c;流提取重载&#xff0c;初始化列表&#xff01;<< 流插入 和 >> 流提取的重载解决共有成员函数问题链式访问…

实用!7个强大的Python机器学习库!⛵

&#x1f4a1; 作者&#xff1a;韩信子ShowMeAI &#x1f4d8; 机器学习实战系列&#xff1a;https://www.showmeai.tech/tutorials/41 &#x1f4d8; 本文地址&#xff1a;https://www.showmeai.tech/article-detail/412 &#x1f4e2; 声明&#xff1a;版权所有&#xff0c;转…

如何借助扬尘视频监测系统开展扬尘污染防控工作?

一、方案背景 目前&#xff0c;跟随国家快速发展的步伐&#xff0c;城市化建设也在飞速发展&#xff0c;各种建设工程遍地开花。如何更好抵管理施工扬尘&#xff0c;杜绝各种违规及不文明现象&#xff0c;一直是施工企业、政府管理部门关注的焦点。 二、系统介绍 环境扬尘视频…

手把手教你一套完善且高效的k8s离线部署方案

作者&#xff1a;郝建伟 背景 面对更多项目现场交付&#xff0c;偶而会遇到客户环境不具备公网条件&#xff0c;完全内网部署&#xff0c;这就需要有一套完善且高效的离线部署方案。 系统资源 编号主机名称IP资源类型CPU内存磁盘01k8s-master110.132.10.91CentOS-74c8g40g0…

SkeyeVSS储备地块可视化管理信息系统 助力土地批后全程监管解决方案

一、方案背景 近年来&#xff0c;储备地块经常遭遇倾倒渣土、隐蔽性私搭乱建等违法行为的侵害&#xff0c;在违法行为发生之后又面临追责难度大、效率低的问题&#xff0c;因此可视化监管系统的建设将有效地解决单纯靠人力巡查、巡查时间长、巡查效率低以及发现侵害行为后追责…

阿里云轻量服务器--Docker--Mqtt(eclipse-mosquitto)安装

1 获取镜像&#xff1a; docker pull eclipse-mosquitto:1.6.14查看获取的镜像&#xff1a; docker images 2 安装&#xff1a; 2.1 新建日志和数据目录&#xff1a; # 配置文件存放 mkdir -p /root/mosquitto/config # 数据文件存放 mkdir -p /root/mosquitto/data # 日志文…

安全智能分析技术 神经网络架构搜索

神经网络架构搜索 定义内涵 神经网络架构搜索是为给定数据集自动找到一个或多个架构的任务&#xff0c;这些架构将为给定 的数据集生成具有良好结果的模型&#xff0c;其本质是在高维空间的最优参数搜索问题。 技术背景 深度学习模型的使用越来越大众化&#xff0c;在很多行…

论多线程之中断篇

线程中断一. 启动线程的方式二. 安全中断三. 线程的补充知识3.1 线程常用方法和线程的状态&#xff1a;3.2 线程的优先级概念&#xff1a;一. 启动线程的方式 新启线程的方式 继承Thread类实现Runnable接口&#xff0c;实际上也是通过Thread类来进行线程的操作的 package cn.…

嵌入式:数据处理指令详解

文章目录数据处理指令的特点数据处理指令的汇编格式数据处理指令&#xff0d;指令表&#xff08;1&#xff09;ADD、ADC、SUB、SBC、RSB和RSC&#xff08;2&#xff09;AND、ORR、EOR和BIC&#xff08;3&#xff09;MOV和MVN&#xff08;4&#xff09;CMP和CMN&#xff08;5&am…

如何下载及安装BIGEMAP GIS Office

如何下载及安装BIGEMAP GIS Office 发布时间&#xff1a;2018-01-17 版权&#xff1a; 本产品支持主流winodws操作系统&#xff08;xp sp3,vista,windows 7,windows 8及windows 10 11&#xff09;&#xff0c; 可通过访问Bigemap官网(BIGEMAP-卫星地图_高清卫星地图制图软件_…

791068-69-4,肾素的FRET底物

FRET substrate for renin. excitation at 340 nm, emission at 490 nm.肾素的FRET底物。激发波长为340 nm&#xff0c;发射波长为490 nm。 编号: 182722中文名称: Renin Substrate 1英文名: Renin Substrate 1CAS号: 791068-69-4单字母: H2N-R-E(Edans)-IHPFHLVIHT-K(Dabcyl)-…

PDF如何加密码保护?分享PDF加密的简单方法

PDF 通常是只读的&#xff0c;但如果收件人有特定的编辑软件&#xff0c;它们仍然可以修改&#xff0c;因此当您发送或共享 PDF 文档时&#xff0c;您可能希望使用密码对其进行保护。这样&#xff0c;未经您的许可&#xff0c;任何人都无法读取文件。 如何使用密码保护 pdf 文档…

什么是SD-WAN,它如何改变传统网络?

近年来&#xff0c;网络的构建、管理和运行方式发生了重大变化。许多 IT 管理员现在正在用更高级的网络概念和策略取代传统的网络组件和传统技术。例如&#xff0c;他们越来越依赖网络容器化、自动化、软件定义网络 &#xff08;SDN&#xff09; 和云计算等概念来简化网络。 这…

网络安全和信息化条例

神经网络架构搜索 定义内涵 神经网络架构搜索是为给定数据集自动找到一个或多个架构的任务&#xff0c;这些架构将为给定 的数据集生成具有良好结果的模型&#xff0c;其本质是在高维空间的最优参数搜索问题。 技术背景 深度学习模型的使用越来越大众化&#xff0c;在很多行…

数据处理指令

目录 一、指令 1.1 数据处理指令:数学运算、逻辑运算 1.1.1数据搬移指令 1.1.2机器码 1.1.3立即数 1.1.4数据运算指令基本格式 1.1.5加法指令 1.1.6减法指令、逆向减法指令、乘法指令 1.1.7位运算&#xff08;逻辑运算&#xff09; 1.1.8格式扩展 1.1.9数据运算指令对条…

面试官:Spring Bean的生命周期,你知道吗?

小熊学Java网站&#xff1a;https://javaxiaobear.gitee.io/&#xff0c;每周持续更新干货&#xff0c;建议收藏&#xff01; bean 的生命周期从调用 beanFactory 的 getBean 开始&#xff0c;到这个 bean 被销毁&#xff0c;可以总结为以下七个阶段&#xff1a; 处理名称&…

Aosp系统编译定制系统签名

商业化产品,如果使用默认的签名,一是不安全,而是显得没啥技术。就连谷歌官方也说了,不建议使用testkey作为最终发布版的key,因此,定制系统签名就派上用场了。 具体使用方法谷歌给了一个大致的说法,我们可以在aosp的自述文件中找到,位置位于build\target\product\secur…