Java AQS学习

news2024/12/27 12:00:35

目录

1、AQS初步

2、AQS源码

2.1、ReentrantLock类解析

 2.2、AQS源码


JUC-->AQS-->AbstractQueuedSynchronizer:字面意思:抽象的队列同步器

AQS是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量表示持有锁的状态

CLH:Craig、Landin and Hagersten队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO。

@author Doug Lea
public class ReentrantLock implements Lock, java.io.Serializable {
...
abstract static class Sync extends AbstractQueuedSynchronizer {
...
abstract void lock();
...
}
@author Doug Lea
public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;
@author Doug Lea
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
...
abstract static class Sync extends AbstractQueuedSynchronizer {
@author Doug Lea
public class Semaphore implements java.io.Serializable {
...
abstract static class Sync extends AbstractQueuedSynchronizer {

并发大神 Doug Lea,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。

加锁会导致阻塞:有阻塞就需要排队,实现排队必然需要有某种形式的队列来进行管理

解释说明:抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍然继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS,自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的控制效果。

1、AQS初步

/**
 * Provides a framework for implementing blocking locks and related 
//为实现阻塞锁和相关的同步器提供一个框架,它是依赖于先进先出的一个等待
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues.  This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic {@code int} value to represent state. Subclasses
 * must define the protected methods that change this state, and which 
//依靠单个原子int值来表示状态,通过占用和释放方法,改变状态值
 * define what that state means in terms of this object being acquired
 * or released.  Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated {@code int}
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
......
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

有阻塞就需要排队,实现排队必然需要队列

AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对state值的修改。

AQS=state变量+CLH变种的双端队列

 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...

    /**
     * Wait queue node class.
     *
     * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks.  We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node.  A
     * "status" field in each node keeps track of whether a thread
     * should block.  A node is signalled when its predecessor
     * releases.  Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though.  A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend.  So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
...

    //队列中每个排队的个体就是一个Node,Node<Thread>, Node=waitStatus+前后指针指向
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();//共享模式:表示线程以共享模式等待锁
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;//独占模式:表示线程正在以独占的方式等待锁

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;//线程被取消了
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;//后继线程需要唤醒
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;//等待condition唤醒
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;//共享式同步状态获取将会无条件地传播下去

        volatile int waitStatus;//当前节点在队列中的状态,初始为0,状态是上面的几种

        volatile Node prev;//前置节点

        volatile Node next;//后继节点

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;//表示处于该节点的线程
...
}

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;
....
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

AQS同步队列的基本结构

2、AQS源码

2.1、ReentrantLock类解析

 从ReentrantLock开始解读AQS,Lock接口的实现类,基本都是通过[聚合]了一个[队列同步器]的子类完成线程访问控制的。

对比公平锁和非公平锁的tryAcquire()方法的实现代码,其实差别在于非公平锁获取锁时比公平锁少一个判断!hasQueuedPredecessors()

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

!hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。

非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)

public class ReentrantLock implements Lock, java.io.Serializable {
....
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    static final class FairSync extends Sync {

        final void lock() {
            acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

 2.2、AQS源码

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class AQSDemo {

	public static void main(String[] args) {
		ReentrantLock lock = new ReentrantLock();
		// 引入银行办理业务场景
		// 3个线程模拟3个来银行办理业务的客户,暂时只开一个受理窗口
		// A 顾客是第一个顾客,受理窗口没有人,A可以直接去办理
		new Thread(() -> {
			lock.lock();
			try {
				System.out.println("--A thread come in");
				try {
					TimeUnit.MINUTES.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			} finally {
				lock.unlock();
			}
		}, "A").start();
		// B 顾客,第二个来的只能等待
		new Thread(() -> {
			lock.lock();
			try {
				System.out.println("--B thread come in");
			} finally {
				lock.unlock();
			}
		}, "B").start();
		// C 顾客,第三个来的只能等待
		new Thread(() -> {
			lock.lock();
			try {
				System.out.println("--C thread come in");
			} finally {
				lock.unlock();
			}
		}, "C").start();

	}

}

 下面不贴图了

//AQS主要源码,往下
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

//1.tryAcquire
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

//2.进入队列等待
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

//2.1进入队列
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))//头节点为new Node()空节点,占位使用,可称傀儡节点/哨兵节点
                    tail = head;
            } else {
                node.prev = t;//把当前节点的前指针指向尾指针
                if (compareAndSetTail(t, node)) {//比较并交换把当前要入队的节点和尾节点进行交换
                    t.next = node;//上一个节点的下一个节点是当前节点
                    return t;//
                }
            }
        }
    }

//3.
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

干我们这行,啥时候懈怠,就意味着长进的停止,长进的停止就意味着被淘汰,只能往前冲,直到凤凰涅槃的一天!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/477087.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

InnoDB 引擎 底层逻辑

目录 0 课程视频 1 逻辑存储结构 1.1 结构图 1.2 表空间 -> 记录 索引 存储记录 等数据 1.2.1 储存在 cd/var/lib/mysql -> ll -> 目录 mysql.ibd 1.3 段 -> 索引 存储记录 具体存储 1.3.1 数据段 b树 叶子节点 1.3.2 索引段 b树的 非叶子节点 1.3.3 回滚段…

Java 基础入门篇(一)——— Java 概述

文章目录 一、Java 概述二、Java 的产品 JDK2.1 JDK 安装2.2 Java与 Javac 介绍2.3 Java 程序的开发步骤 三、Java 程序的执行原理四、JDK 的组成五、Java 的跨平台工作原理 一、Java 概述 Java 是 sun 公司在 1995 年推出的一门计算机高级编程语言&#xff0c;其语言风格接近人…

javaEE+mysql学生竞赛管理系统

本系统是基于JAVA平台开发的一套学生竞赛信息管理的系统。系统采用JSP为编程语言。数据库采用Mysql建立数据之间的转换。论文主要介绍了本课题的开发背景&#xff0c;所要完成的功能和开发的过程。重点的说明了系统设计的重点、设计思想、难点技术和解决方案。 本课题的目的是使…

none模式配置网络操作

基于Docker引擎启动Nginx WEB容器&#xff0c;默认以None方式启动Docker容器&#xff0c;此处使用pipework工具手工给容器指定桥接网卡&#xff0c;并且手工配置IP地址&#xff0c;操作指令如下&#xff1a; #查看镜像列表&#xff1b; docker images#运行新的容器 docker ru…

【Python】【进阶篇】17、如何配置settings.py文件

目录 17、如何配置settings.py文件1) 修改语言与时区配置2) 设置时区不敏感3) 配置项目所需数据库4&#xff09;学会阅读报错信息 17、如何配置settings.py文件 《settings.py配置文件详解》一文中&#xff0c;将 settings.py 配置文件的每一项给大家做了介绍。在开发的过程中…

MYSQL进阶02

MYSQL进阶02 数据类型char与varchartext与blob浮点数与定点数日期类型的选择 数据类型 char与varchar char和varchar类型类似&#xff0c;都用来存储字符串&#xff0c;但是他们保存和检索的方式不同。char属于固定长度的字符类型&#xff0c;而varchar属于可变长度的字符类型…

JavaWeb学习--RequestResponse

目录 JavaWeb学习--Request&Response 1&#xff0c;Request和Response的概述 request:获取请求数据 response:设置响应数据 **小结** 2&#xff0c;Request对象 **小结** 2.2 Request获取请求数据 **小结** 2.4 请求参数中文乱码问题 URL编码 2.5 Request请求转…

c提高学习——选择排序算法

#define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <string.h> #include <stdlib.h>//选择排序 void mySort(int arr[], int len) {for (int i 0; i < len; i){//认定i是最小值的下标int min i;for (int j i1; j < len; j){if (arr[min…

ABTEST平台建设思路与方案

导读 ABTest的作用&#xff1a; 用ABTEST的结果数据&#xff0c;论证是因为某个业务方案的调整&#xff0c;对产品能力的影响。ABTEST是一个过程&#xff0c;只是为了证明改动的效果&#xff0c;其最终的阶段一定是对某个方案进行推全结束实验&#xff0c;避免稳定的业务流程…

STM32的位带操作

STM32的位带操作 为什么需要位带操作&#xff1f; 因为编程需要操作某个bit位来达到我们想要的功能&#xff0c;比如点灯需要操作GPIOA->ODR 的某个bit假设是第2bit&#xff0c;写1就可以让GPIO输出一个高电平。 GPIOA->ODR | 1<<2;这样写其实有三个隐含的操作…

SpringSecurity-从入门到精通

SpringSecurity从入门到精通 一、简介1.1 官网介绍1.2 认证与授权 二、使用步骤2.1 快速入门2.2 认证流程2.3 相关概念 三、解决问题3.1 思路分析3.2 准备工作3.3 具体实现3.4 加密存储3.5 登录接口3.6 认证过滤器 一、简介 1.1 官网介绍 SpringSecurity 是一个强大且高度自定…

Stable Diffusion-生式AI的新范式

! 扩散模型&#xff08;Stable Diffusion)现在是生成图像的首选模型。由于扩散模型允许我们以提示( prompts)为条件生成图像&#xff0c;我们可以生成我们所选择的图像。在这些文本条件的扩散模型中&#xff0c;稳定扩散模型由于其开源性而最为著名。 在这篇文章中&#xff0…

通用智能的瓶颈及可能的解决途径

通用智能是指能够在各种不同的任务和环境中灵活地适应和执行任务的智能。通用智能与特定任务的智能相反&#xff0c;后者只能在特定领域或任务中表现出色。通用智能的理论基础是人工智能领域的通用人工智能&#xff08;AGI&#xff09;研究&#xff0c;旨在设计出能够像人类一样…

【Java笔试强训 5】

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点! 欢迎志同道合的朋友一起加油喔&#x1f93a;&#x1f93a;&#x1f93a; 目录 一、选择题 二、编程题 &#x1f525;统计回文…

Zynq-7000、FMQL45T900的GPIO控制(三)---linux管脚编号计算

本文主要对在Linux下使用zynq-7000或者FMQL45T900控制MIO/EMIO 首先内核配置项 如下&#xff0c;这个不用太多关注&#xff0c;一般都是默认打开的 CONFIG_GPIO_SYSFSy CONFIG_SYSVIPCy CONFIG_GPIO_ZYNQy两者的控制都是流程都是一样的&#xff0c;在细节上又区别 首先都在…

Go | 一分钟掌握Go | 9 - 通道

作者&#xff1a;Mars酱 声明&#xff1a;本文章由Mars酱编写&#xff0c;部分内容来源于网络&#xff0c;如有疑问请联系本人。 转载&#xff1a;欢迎转载&#xff0c;转载前先请联系我&#xff01; 前言 在Java中&#xff0c;多线程之间的通信方式有哪些&#xff1f;记得吗&…

浪潮之巅 OpenAI有可能是历史上第一个10万亿美元的公司

淘金时代很像 如果你那个时候去加州淘金&#xff0c;一大堆人会死掉&#xff0c;但是卖勺子的人、卖铲子的人永远可以赚钱。所谓的shove and pick business。 大模型是平台型机会。按照我们几天的判断&#xff0c;以模型为先的平台&#xff0c;将比以信息为先的平台体量更大。…

带你深入学习k8s--(四) 控制器(k8s核心)

目录 一、概念 1、什么是控制器 2、控制器执行流程 3、控制器类型 二、控制器的使用 1、ReplicaSet 2、Deployment 1、版本迭代 2、回滚 3、修改滚动更新策略 4、暂停与恢复 3、daemonset 4、job 5、cronjob 前言&#xff1a; 上一章我们说到&#xff0c;pod有…

C++——入门基础知识

0.关注博主有更多知识 C知识合集 目录 1.命名空间 1.1命名空间的定义 1.2命名空间的使用 1.3命名空间定义的补充 2.输入与输出 3.缺省参数 3.1全缺省参数 3.2半缺省参数 3.3缺省参数的补充 4.函数重载 4.1C为什么支持函数重载&#xff1f; &#xff15;.引用 5.…

Wine运行器3.2.1——Windows虚拟机模块支持非X86架构

不写太多啥了&#xff0c;详细介绍看这里就行&#xff1a;https://bbs.deepin.org/post/248098 更新内容 ※1、Windows 虚拟机安装工具支持非 X86 架构&#xff1b; ※2、应用打包器可以与星火应用商店配合构建 arm/all 全架构的 Wine 包&#xff1b; ※3、Windows 虚拟机安装…