AbstractQueuedSynchronizer 独占式源码阅读

news2025/1/12 16:06:15

概述

● 一个int成员变量 state 表示同步状态
● 通过内置的FIFO队列来完成资源获取线程的排队工作

属性

AbstractQueuedSynchronizer属性

   /**
     * 同步队列的头节点     
     */
    private transient volatile Node head;

    /**
     * 同步队列尾节点,enq 加入
     */
    private transient volatile Node tail;

    /**
     * 同步状态
     */
    private volatile int state;

    /**
     * 获取状态
     */
    protected final int getState() {
        return state;
    }

    /**
     * 设置状态
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * CAS 设置状态
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

Node 节点属性

 static final class Node {
        /** 共享节点 */
        static final Node SHARED = new Node();
     
        /** 独占节点 */
        static final Node EXCLUSIVE = null;

        // 在同步队列中等待的线程等待超时或被中断, 需要从同步队列中取消等待, 状态不会变化 |
        static final int CANCELLED = 1;
          
        // 后继节点处于等待状态, 当前节点释放了同步状态或者被取消, 通知后续节点, 使后续节点得以运行
        static final int SIGNAL = -1;
        
        // 值为-2, 节点在等待队列, 当其他线程 signal(),从等待队列中移除到同步队列中 |
        static final int CONDITION = -2;
        
        // 值为-3, 下一次共享获取同步状态将会无条件传播下去
        static final int PROPAGATE = -3;

        /**
         * 节点初始状态,初始化为0
         */
        volatile int waitStatus;

        /**
         * 前一个节点
         */
        volatile Node prev;

        /**
         * 后一个节点
         */
        volatile Node next;

        /*
          * 节点的线程
          */
        volatile Thread thread;

        /**
         * 下一个等待者
         */
        Node nextWaiter;

        /**
         * 是否是共享节点
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         *  前一个节点
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

常用方法

同步状态的三个方法:
● getState() 获取同步状态
● setState(int newState) 设置当前同步状态
● compareAndSetState(int expect, int update) CAS设置同步状态,原子操作

AbstractQueuedSynchronizer可重写的方法:

方法名称方法描述
boolean tryAcquire(int arg)独占式获取同步状态,查询当前状态是否符合预期,并且CAS设置
boolean tryRelease(int arg)独占式释放同步状态,释放后,等待获取同步状态的线程有机会获取同步状态
int tryAcquireShared(int arg)共享式获取同步状态,如果大于等于0,表示获取成功
boolean tryReleaseShared(int arg)共享式释放同步状态
boolean isHeldExclusively()在独占模式下被线程占用,表示是否被当前线程独占

AbstractQueuedSynchronizer提供的模版方法

方法名称方法描述
boolean acquire(int arg)独占式获取同步状态, 成功返回, 失败队列等待, 调用tryAcquire()
boolean acquireInterruptibly(int arg)acquire 相同, 但是可以中断
int tryAcquireNanos(int arg, long nanos)acquireInterruptibly 基础上增加了超时限制, 超时返回false, 返回true
acquireShared(int arg)共享式获取同步状态, 和acquire差不多, 区别是同一时刻可以有多个线程获取同步状态
acquireSharedInterruptibly(int arg)acquireShared 相同, 但是可以中断
int tryAcquireSharedInterruptibly(int arg, long nanos)acquireSharedInterrup

流程图

在这里插入图片描述

流程图主要方法源码阅读

acquire

独占式获取同步状态, 成功返回, 失败队列等待

public final void acquire(int arg) {
    // tryAcquire获取信号量
    // 如果失败 tryAcquire(arg)=false addWaiter入队列、acquireQueued 排队获取锁
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 前一个节点是头节点 尝试获取锁 获取锁成功 设置自己为头节点 
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }

           // 前面节点设置为 singal,自己就可以睡眠了
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 被中断 尝试获取信号量
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

addWaiter

节点进入同步队列

    private Node addWaiter(Node mode) {
        // 创建节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 尾节点不为空
        if (pred != null) {
            // 设置当前节点的前一个节点为尾节点
            node.prev = pred;
            // cas 设置自己为尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 尾节点为空 或 cas 设置自己为尾节点失败了
        enq(node);
        return node;
    }
    
    /**
     * 入队
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
           // 尾节点为空,设置新的头节点
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 设置当前节点的前一个节点为尾节点
                node.prev = t;
                // cas 设置自己为尾节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

shouldParkAfterFailedAcquire

前面节点设置为 singal,设置成功返回true,失败false

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前面的节点SIGNAL自己就可以park了
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            // 找到第一个不是取消状态的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 设置 WaitStatus SIGNAL
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt

   private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

acquireInterruptibly

acquire 相同, 但是可以中断

 public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        // 被中断抛出InterruptedException
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 被中断抛出InterruptedException
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }

tryAcquireNanos

acquireInterruptibly 基础上增加了超时限制, 超时返回false, 返回true

   public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
    
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                // 超时返回false
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    // park指定时间
                    LockSupport.parkNanos(this, nanosTimeout);
                // 中断抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

release

释放信号量, 如果头节点不为空 状态为SINGAL, 唤醒头节点的下一个节点

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 释放arg信号量成功
        Node h = head;
        // 如果头节点不为空 状态为SINGAL, 唤醒头节点的下一个节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 唤醒先修改waitStatus从SINGAL->0初始化
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 找到node之后第一个不被取消的节点, LockSupport.unpark唤醒该节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

参考文献

  • Java并发编程的艺术第二版 方腾飞、魏鹏、程晓明

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1540042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

水果销售管理网站|jsp+ Mysql+Java+ B/S结构(可运行源码+数据库+设计文档)

本项目包含可运行源码数据库LW&#xff0c;文末可获取本项目的所有资料。 推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 2024年56套包含java&#xff0c;…

深度学习(二)安装tensorflow深度学习框架

0.前言 速度更新新的一期&#xff0c;快夸奖我。前情提要这是我在window10系统下完成的操作&#xff0c;并不是ubuntu&#xff0c;所以有相应的区别。 1.安装tensorflow和d2l 这里默认大家已经安装好了anconda或者miniconda并且以及创建了虚拟环境。 conda create -n huahuaji…

JavaScript 权威指南第七版(GPT 重译)(六)

第十五章&#xff1a;JavaScript 在 Web 浏览器中 JavaScript 语言是在 1994 年创建的&#xff0c;旨在使 Web 浏览器显示的文档具有动态行为。自那时以来&#xff0c;该语言已经发生了显著的演变&#xff0c;与此同时&#xff0c;Web 平台的范围和功能也迅速增长。今天&#…

浮点数在计算机中的存储

1. 引言 我们知道&#xff0c;整数在计算机中是以二进制补码的形式存储的&#xff0c;那么浮点数呢&#xff1f; 考虑到这个问题&#xff0c;你会感到十分苦恼&#xff0c;因为你并不知道要如何将一个浮点数转化成一段二进制序列。 那我们不妨先来验证一下&#xff0c;整数与…

【Node.js】npx

概述 npx 可以使用户在不安装全局包的情况下&#xff0c;运行已安装在本地项目中的包或者远程仓库中的包。 高版本npm会自带npx命令。 它可以直接运行 node_modules/.bin 下的 exe 可执行文件。而不像之前&#xff0c;我们需要在 scripts 里面配置&#xff0c;然后 npm run …

相交链表:寻找链表的公共节点

目录 一、公共节点 二、题目 三、思路 四、代码 五、代码解析 1.计算长度 2.等长处理 3.判断 六、注意点 1.leetcode的尿性 2.仔细观察样例 3.经验总结 一、公共节点 链表不会像两直线相交一样&#xff0c;相交之后再分开。 由于单链表只有一个next指针&#xff0…

MySQL 8.0-索引- 不可见索引(invisible indexes)

概述 MySQL 8.0引入了不可见索引(invisible index)&#xff0c;这个在实际工作用还是用的到的&#xff0c;我觉得可以了解下。 在介绍不可见索引之前&#xff0c;我先来看下invisible index是个什么或者定义。 我们依然使用拆开来看&#xff0c;然后再把拆出来的词放到MySQL…

Redis高级数结构HyperLogLog

HyperLogLog 概述 概述。 HyperLogLog并不是一种新的数据结构(实际类型为字符串类型),而是一种基数算法&#xff0c;通过HyperLogLog可以利用极小的内存空间完成独立总数的统计&#xff0c;数据集可以是IP、Email、ID等 如果你负责开发维护一个大型的网站&#xff0c;有一天产…

使用 VMWare 安装 Android-x86 系统(小白版)

文章目录 VMWare 介绍Android 系统介绍概述最终效果前置步骤开始安装 VMWare 介绍 VMware Workstation是VMware公司开发的一款桌面虚拟化软件。它允许用户在一台物理计算机上同时运行多个操作系统&#xff0c;每个操作系统都在自己的虚拟机中运行。这使得用户可以在同一台计算…

如何使用swiprt插件

首先可以来到swiper网站看文档(中文) swiper网址地点 可以在这里面去下载swiper版本插件 需要注意的是从Swiper7开始&#xff0c;容器默认类名由’.swiper-container’变更为’.swiper’ 下载后呢 找到swiper-bundle.min.js和swiper-bundle.min.css文件 并放在你的项目中&…

Apollo配置中心

一、介绍 简要介绍 Apollo&#xff08;阿波罗&#xff09;是携程框架部门研发的分布式配置中心&#xff0c;能够集中化管理应用不同环境、不同集群的配置&#xff0c;配置修改后能够实时推送到应用端&#xff0c;并且具备规范的权限、流程治理等特性&#xff0c;适用于微服务…

在Linux中同一个tomcat出现多个进程

第一步&#xff0c;查看服务器所有的启动进程。 命令&#xff1a;top -c 第二步&#xff0c;通过点击“shiftM”&#xff0c;按照内存大小排序&#xff1b;点击“shiftP”&#xff0c;按照CPU大小排序。 在[COMMAND]列找到相同的tomcat进程&#xff0c;可以得到对应的PID。 …

【JavaEE初阶系列】——单例模式 (“饿汉模式“和“懒汉模式“以及解决线程安全问题)

目录 &#x1f6a9;单例模式 &#x1f388;饿汉模式 &#x1f388;懒汉模式 ❗线程安全问题 &#x1f4dd;加锁 &#x1f4dd;执行效率提高 &#x1f4dd;指令重排序 &#x1f36d;总结 单例模式&#xff0c;非常经典的设计模式&#xff0c;也是一个重要的学科&#x…

iStoreOS R4S软路由结合内网穿透实现公网远程本地电脑桌面

文章目录 简介一、配置远程桌面公网地址二、家中使用永久固定地址 访问公司电脑**具体操作方法是&#xff1a;** 简介 软路由是PC的硬件加上路由系统来实现路由器的功能&#xff0c;也可以说是使用软件达成路由功能的路由器。 使用软路由控制局域网内计算机的好处&#xff1a…

(2022级)成都工业学院软件构造实验二:面向对象软件构造

写在前面 1、基于2022级软件工程实验指导书 2、代码仅提供参考 3、如果代码不满足你的要求&#xff0c;请寻求其他的途径 运行环境 window11家庭版 IntelliJ IDEA 2023.2.2 jdk17.0.6 实验要求 任务&#xff1a;在第2章构造任务的基础上用面向对象构造技术&#xff0c…

错误centos docker版本过低导致 is not a valid repository/tag: invalid reference format

文章目录 错误centos docker版本过低导致 is not a valid repository/tag: invalid reference format1、查看免费主机刚才下载的docker版本2、卸载旧版本3、安装yum依赖包4、安装镜像信息5、安装docker CE6、查看docker版本7、再次运行就成功了&#xff01;&#xff01;&#x…

Codeforces Round 930 (Div. 2)(A,B,C,D)

比赛链接 C是个交互&#xff0c;D是个前缀和加二分。D还是很难写的。 A. Shuffle Party 题意&#xff1a; 您将得到一个数组 a 1 , a 2 , … , a n a_1, a_2, \ldots, a_n a1​,a2​,…,an​ 。最初&#xff0c;每个 1 ≤ i ≤ n 1 \le i \le n 1≤i≤n 对应 a i i a_ii…

STM32 使用gcc编译介绍

文章目录 前言1. keil5下的默认编译工具链用的是哪个2. Arm编译工具链和GCC编译工具链有什么区别吗&#xff1f;3. Gcc交叉编译工具链的命名规范4. 怎么下载gcc-arm编译工具链参考资料 前言 我们在STM32上进行开发时&#xff0c;一般都是基于Keil5进行编译下载&#xff0c;Kei…

QT文件读写操作和内容提取

访问IO设备&#xff0c;需要先调用open()来设置正确的OpenMode(例如ReadOnly或ReadWrite) 打开设备后后&#xff0c;使用write() 或putChar() 写入数据到文件和设备&#xff0c;并通过调用read()&#xff0c;readLine() 或readAll() 进行读取&#xff1b;使用完设备后&#xf…

深度学习十大算法之长短时记忆网络(LSTM)

一、长短时记忆网络&#xff08;LSTM&#xff09;的基本概念 长短时记忆网络&#xff08;LSTM&#xff09;是一种特殊类型的循环神经网络&#xff08;RNN&#xff09;&#xff0c;主要用于处理和预测序列数据的任务。LSTM由Hochreiter和Schmidhuber于1997年提出&#xff0c;其…