AQS框架

news2025/1/19 23:10:06

文章目录

    • 概要
    • AQS概述
    • 公平锁与非公平锁原理
    • 可重入

概要

假设现在需要写一个SDK层面的锁,应该如何实现呢?
初步的思路如下:

  1. 搞一个状态标记,用来表示持有或未持有锁,但得是 volatile 类型的保证线程可见性。
  2. 编写一个 lock unlock 函数用于抢锁和释放锁,就是对状态标记的修改操作
  3. unlock函数要保证并发下只能有一个线程能抢到锁,其他线程要等待获取锁(阻塞式),可以采用CAS+自旋的方式实现

初步实现如下:

public class MyLock {

    // 定义一个状态变量status:为1表示锁被持有,为0表示锁未被持有
    private volatile int status;

    private static final long valueOffset;

    private static final Unsafe unsafe = reflectGetUnsafe();

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                    (MyLock.class.getDeclaredField("status"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    private static Unsafe reflectGetUnsafe() {
        Field field = null;
        try {
            field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }

    }

    /**
     * 阻塞式获取锁
     *
     * @return
     */
    public boolean lock() {
        while (!compareAndSet(0, 1)) {

        }
        return true;
    }

    // cas 设置 status
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset,
                expect, update);
    }

    /**
     * 释放锁
     */
    public void unlock() {
        status = 0;
    }
}

初步实现的代码,存在获取不到锁自旋时,是空转,浪费CPU的问题
改进1:

/**
 * 阻塞式获取锁
 *
 * @return
 */
public boolean lock() {
    while (!compareAndSet(0, 1)) {
        Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
    }
    return true;
}

或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。

2、采用等待唤醒机制,但是这里由于没有使用 synchronized 关键字,所以也无法使用 wait/notify ,但是我们可以使用 park/unpark ,获取不到锁的线程 park 并且去队列排队,释放锁时从队列拿出一个线程 unpark


private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>();

/**
 * 阻塞式获取锁
 *
 * @return
 */
public boolean lock() {
    while (!compareAndSet(0, 1)) {
        //Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
        QUEUE.offer(Thread.currentThread());
        LockSupport.park();//线程休眠
    }
    return true;
}
/**
 * 释放锁
 */
public void unlock() {
    status = 0;
    LockSupport.unpark(QUEUE.poll());
}

上面基本能实现一个简单的锁的功能,如果在AQS框架的基础上实现,又应该如何实现呢

AQS概述

AQS(AbstractQueuedSynchronizer):抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,提供了SDK层面的锁机制,JUC中的很多类譬如:ReentrantLock/Semaphore/CountDownLatch…等都是基于它。

  1. AQS用一个 volatile int state; 属性表示锁状态,1表示锁被持有,0表示未被持有,具体的维护由子类去维护,但是提供了修改该属性的三个方法: getState()setState(int newState)compareAndSetState(int expect, int update) ,其中CAS方法是核心。
  2. 框架内部维护了一个FIFO的等待队列,是用双向链表实现的,我们称之为CLH队列
  3. 框架也内部也实现了条件变量 Condition ,用它来实现等待唤醒机制,并且支持多个条件变量
  4. AQS支持两种资源共享的模式:独占模式(Exclusive)和共享模式(Share),所谓独占模式就是任意时刻只允许一个线程访问共享资源,譬如ReentrantLock;而共享模式指的就是允许多个线程同时访问共享资源,譬如Semaphore/CountDownLatch
  5. 使用者只需继承 AbstractQueuedSynchronizer 并重写指定的方法,在方法内完成对共享资源 state 的获取和释放,至于具体线程等待队列的维护,AQS已经在顶层实现好了,在那些 final 的模板方法里
  6. AQS底层使用了模板方法模式,给我们提供了许多模板方法,我们直接使用即可
API说明
final void acquire(int arg)独占模式获取锁,AQS顶层已实现,内部调用了tryAcquire
boolean tryAcquire(int arg)独占模式尝试获取锁,AQS中未实现,由子类去实现,获取到锁返回true
final boolean release(int arg)释放独占锁,AQS顶层已实现,内部调用了tryRelease
boolean tryRelease(int arg)尝试释放独占锁,AQS中未实现,由子类去实现,成功释放返回true
final void acquireShared(int arg)共享模式获取锁,AQS顶层已实现,内部调用了tryAcquireShared
int tryAcquireShared(int arg)尝试获取共享锁,返回负数表示失败,0表示成功,但没有剩余可用资源;
正数表示成功,且有剩余资源,AQS中未实现,由子类实现
final boolean releaseShared(int arg)释放共享锁,返回true代表释放成功,AQS中已实现,内部调用了tryReleaseShared
boolean tryReleaseShared(int arg)尝试释放锁,释放后允许唤醒后续等待结点返回true,否则返回false,
AQS中未实现,需要由子类实现
boolean isHeldExclusively()共享资源是否被独占

AQS基本使用
现在有个场景,基于AQS来实现一个锁

/**
 * 基于 aqs实现锁
 */
public class MyLock2 implements Lock {
    //同步器
    private Syn syn = new Syn();

    @Override
    public void lock() {
        // 模板方法
        syn.acquire(1);
    }

    @Override
    public void unlock() {
        // 模板方法
        syn.release(0);
    }

    class Syn extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, arg)) {
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(arg);
            return true;
        }
    }

    // 其他接口方法暂时先不实现 省略
}

公平锁与非公平锁原理

自己实现的锁在使用过程中发现一个问题,就是有时候有的线程特别容易抢到锁,而有的线程老是抢不到锁。这其实就是涉及到锁是否是公平的,那么什么是公平锁什么是非公平锁呢?这时候就要看看获取锁的模板方法中是如何实现的

/**
  * Acquires in exclusive mode, ignoring interrupts.  Implemented
  * by invoking at least once {@link #tryAcquire},
  * returning on success.  Otherwise the thread is queued, possibly
  * repeatedly blocking and unblocking, invoking {@link
  * #tryAcquire} until success.  This method can be used
  * to implement method {@link Lock#lock}.
  *
  * @param arg the acquire argument.  This value is conveyed to
  *        {@link #tryAcquire} but is otherwise uninterpreted and
  *        can represent anything you like.
  */
 public final void acquire(int arg) {
 	// tryAcquire由子类来实现
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }
  1. 线程一来首先调用 tryAcquire ,在 tryAcquire 中直接CAS获取锁,如果获取不成功通过 addWaiter 加入等待队列,然后走 acquireQueued 让队列中的某个等待线程去获取锁。

  2. 不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那直接获取锁没什么 问题,如果有线程等待就直接去获取锁不就相当于插队么?

如何实现公平性呢?

查看 AbstractQueuedSynchronizer 的类定义,主要定义如下

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    protected AbstractQueuedSynchronizer() { }

    /**
     * Wait queue node class.
     *
     */
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
    
        static final int PROPAGATE = -3;

        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
    }
    public class ConditionObject implements Condition, java.io.Serializable { }
}

内部类 Node 以及其类型的变量head 和 tail 就表示 AQS 内部的一个等待队列,而剩下的 state 变量就用来表示锁的状态。

等待队列应该就是线程获取锁失败时,需要临时存放的一个地方,用来等待被唤醒并尝试获取锁。再看 Node 的属性我们知道, Node 存放了当前线程的指针 thread ,也即可以表示当前线程并对其进行某些操作, prev 和 next 说明它构成了一个双向链表,也就是为某些需要得到前驱或后继节点的算法提供便利。

AQS加锁最核心代码如下:

public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
  }

其获取锁标识的过程,图解如下

在这里插入图片描述

那如何让自定义的锁是公平的呢?
其实导致不公平的原因就是线程每次调用 acquire 时,都会先去tryAcquire ,而该方法目前的实现时直接去抢锁,也不看现在等待队列中有没有线程在排队,如果有线程在排队,那岂不是变成了插队,导致不公平。所以现在的解决办法就是,在 tryAcquire 时先看一下等待队列中是否有在排队的,如果有那就乖乖去排队,不插队,如果没有则可以直接去获取锁。
那如何知道线程AQS等待队列中是否有线程排队呢?其实AQS顶层已经实现好了,它提供了一个 hasQueuedPredecessors 函数:如果在当前线程之前有一个排队的线程,则为True; 如果当前线程位于队列的头部(head.next )或队列为空,则为false。

@Override
protected boolean tryAcquire(int arg) {
    //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
    if (!hasQueuedPredecessors() &&
            compareAndSetState(0, arg)) {
        return true;
    }
    return false;
}

现在已经有公平锁了,稍微改造,就可以既能支持公平锁,也支持非公平锁

public class MyLock2 implements Lock {
    //同步器
    private Sync syn;

    MyLock2() {
        syn = new NoFairSync();
    }

    MyLock2(boolean fair) {
        syn = fair ? new FairSync() : new NoFairSync();
    }

    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }

    @Override
    public void unlock() {
        //调用模板方法
        syn.release(0);
    }

    // 实现一个独占同步器
    class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryRelease(int arg) {
            setState(arg);
            return true;
        }
    }

    class FairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, arg)) {
                return true;
            }
            return false;
        }
    }

    class NoFairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
        //直接去获取锁
            if (compareAndSetState(0, arg)) {
                return true;
            }
            return false;
        }
    }
}

可重入

那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁,也就是说让锁只对不同线程互斥。
查看 AbstractQueuedSynchronizer 的定义我们发现,它还继承自另一个类: AbstractOwnableSynchronizer

public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer
implements java.io.Serializable {...}
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread
thread) {...}
protected final Thread getExclusiveOwnerThread(){...}
}

AQS 中有个变量是可以保存当前持有独占锁的线程的。当我们获取锁时,如果发现锁被持有不要着急放弃,先看看
持有锁的线程是否时当前线程,如果是还能继续获取锁。
另外关于可重入锁,还要注意一点,锁的获取和释放操作是成对出现的,就像下面这样

lock
	lock
		lock
			lock
				....
			unlock
		unlock
	unlock
unlock

对于重入锁不仅要能记录锁被持有,还要记录重入的次数,释放的时候也不是直接将锁真实的释放,而是先减少重入次数,能释放的时候在释放。
故此时状态变量 state 不在只有两个取值 0,1 ,某线程获取到锁state=1 ,如果当前线程重入获取只需增加状态值 state=2 ,依次同理,锁释放时释放一次状态值 -1 ,当 state=0 时才真正释放,其他线程才能继续获取锁

修改后代码如下:

public class MyLock2 implements Lock {
    //同步器
    private Sync syn;

    MyLock2() {
        syn = new NoFairSync();
    }

    MyLock2(boolean fair) {
        syn = fair ? new FairSync() : new NoFairSync();
    }

    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }

    @Override
    public void unlock() {
        //调用模板方法
        syn.release(0);
    }

    // 实现一个独占同步器
    class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() !=
                    getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            boolean realRelease = false;
            int nextState = getState() - arg;
            if (nextState == 0) {
                realRelease = true;
                setExclusiveOwnerThread(null);
            }
            setState(nextState);
            return realRelease;
        }
    }

    class FairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            final Thread currentThread = Thread.currentThread();
            int currentState = getState();
            if (currentState == 0) { // 可以获取锁
                //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, arg)) {
                    setExclusiveOwnerThread(currentThread);
                    return true;
                }
            } else if (currentThread == getExclusiveOwnerThread()) {
                //重入逻辑 增加 state值
                int nextState = currentState + arg;
                if (nextState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextState);
                return true;
            }
            return false;
        }

    }

    class NoFairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            Thread currentThread = Thread.currentThread();
            int currentState = getState();
            if (currentState == 0) {
                //直接去获取锁
                if (compareAndSetState(0, arg)) {
                    setExclusiveOwnerThread(currentThread);
                    return true;
                }
            } else if (currentThread == getExclusiveOwnerThread()) {
                //重入逻辑 增加 state值
                int nextState = currentState + arg;
                if (nextState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextState);
                return true;
            }
            return false;
        }
    }
}    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1989728.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

揭秘公司高效查快递的秘密武器

在快节奏的现代商务环境中&#xff0c;物流管理的效率直接关系到企业的运营成本和客户满意度。对于拥有大量快递业务往来的公司而言&#xff0c;如何快速、准确地追踪每一个包裹的物流信息&#xff0c;成为了一项至关重要的任务。今天&#xff0c;我们将揭秘一款公司高效查快递…

智慧农场数字港系统设计与实现

1 项目介绍 1.1 摘要 农业是一个国家的根本之一&#xff0c;也是国家经济、社会发展的重中之重&#xff0c;从“粮食第一”方针到农业生产市场化&#xff0c;再到乡村振兴、加强扶持农业技术创新和基础建设&#xff0c;我国的农业发展以及走过了几个阶段&#xff0c;并一直在…

Nature教你怎么用GPT做学术

ChatGPT如何助力学术写作&#xff1a;三个关键方式 生成性人工智能&#xff08;AI&#xff09;在近年来逐渐成为学术界的热门话题。Dritjon Gruda在2024年4月发表于《Nature》的一篇文章中&#xff0c;详细探讨了ChatGPT如何在学术写作、编辑和同行评审中提供帮助。这篇文章将…

第R2周:Pytorch实现:LSTM-火灾温度预测

nn.LSTM() 函数详解 nn.LSTM 是 PyTorch 中用于创建长短期记忆&#xff08;Long Short-Term Memory&#xff0c;LSTM&#xff09;模型的类。LSTM 是一种循环神经网络&#xff08;Recurrent Neural Network&#xff0c;RNN&#xff09;的变体&#xff0c;用于处理序列数据&#…

常见的框架漏洞

框架 Web框架(Web framework)或者叫做Web应⽤框架(Web application framework)&#xff0c;是⽤于 进⾏Web开发的⼀套软件架构。⼤多数的Web框架提供了⼀套开发和部署⽹站的⽅式。为Web的 ⾏为提供了⼀套⽀持⽀持的⽅法。使⽤Web框架&#xff0c;很多的业务逻辑外的功能不需要⾃…

微步社区帖子中使用编码数据调戏吃瓜群众初探

什么&#xff0c;居然有人在微步社区公然使用编码后的字符串调戏吃瓜群众。 在演练活动的的某一天&#xff0c;微步威胁情报社区突然流行多重编码后内容的帖子。作者本着为人民群众利益着想的目的&#xff0c;结合毕生所学&#xff0c;决定要将这些奇技淫巧和小把戏公之于众。…

R 语言学习教程,从入门到精通,R 判断语句(7)

1、R 判断语句 判断结构要求程序员指定一个或多个要评估或测试的条件&#xff0c;以及条件为真时要执行的语句&#xff08;必需的&#xff09;和条件为假时要执行的语句&#xff08;可选的&#xff09;。 下面是大多数编程语言中典型的判断结构的一般形式&#xff1a; R 语言…

嵌入式linux系统中USART应用实现

各位开发者大家好,今天主要给大家分享一下,如何在linux系统中使用UART串口的功能。 第一:串口的作用 UART:通用异步收发器简称串口。常用的调试:移植u-boot、内核时,主要使用串口查看打印信息。也可以外接各种模块。 第二:linux系统中的串口 接下来,我们来看一下,linu…

达梦数据库的系统视图v$mem_heap

达梦数据库的系统视图v$mem_heap 达梦数据库的V$MEM_HEAP视图提供了关于内存堆的信息&#xff0c;仅当系统启动时 MEMORY_LEAK_CHECK 为 1 时有效。这个视图通常包含内存堆的使用情况&#xff0c;包括堆的大小、已使用空间、空闲空间等。通过查询V$MEM_HEAP视图&#xff0c;用…

图书馆座位再利用小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;座位信息管理&#xff0c;座位预订管理&#xff0c;互勉信息管理&#xff0c;意见反馈管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&#xff0c;我的 开发…

[算法]第一集 递归(未完待续)

递归啊递归&#xff0c;说简单简单&#xff0c;说难难。 首先我们要知道 一、什么是递归&#xff1f; 我们再C语言和数据结构里都用了不少递归&#xff0c;这里就不多详细介绍。 递归简单来说就是函数自己调用自己的情况 二、为什么要用递归呢&#xff1f; 本质来说其实就…

GIS赋能数字经济的地理脉络

在全球数字化转型的洪流中&#xff0c;数字经济以其惊人的速度与规模&#xff0c;重塑全球经济格局&#xff0c;成为推动社会进步的关键力量。地理信息系统&#xff08;GIS&#xff09;在数字经济的浪潮中扮演着不可替代的角色&#xff0c;它不仅是数字空间信息的集大动脉&…

用户管理①

&#x1f4d1;打牌 &#xff1a; da pai ge的个人主页 &#x1f324;️个人专栏 &#xff1a; da pai ge的博客专栏 ☁️宝剑锋从磨砺出&#xff0c;梅花香自苦寒来 ☁️运维工程师的职责&#xff1a;监…

Vue.js 3.x 必修课|009|Watch API:响应式数据的侦听器(必读+实操)

欢迎关注公众号:CodeFit。 创作不易,如果你觉得这篇文章对您有帮助,请不要忘了 点赞、分享 和 关注,为我的 持续创作 提供 动力! 欢迎订阅《Vue 3.x 必修课| 2024》:http://t.csdnimg.cn/hHRrM 精品内容,物超所值(9.9 元,20+篇内容)。 1. 引言 在 Vue3 的 Composit…

基于 JWT 的模拟登录爬取实战

准备工作 1. 了解 JWT 相关知识 2. 安装 requests 库&#xff0c;并了解其基本使用 案例介绍 爬取网站&#xff1a; https://login3.scrape.center/ 用户名和密码是&#xff1a; admin 模拟登录 基于 JWT 的网站通常采用的是前后端分离式&#xff0c; 前后端的数据传输依…

【C++高阶】:自定义删除器的全面探索

✨ 我凌于山壑万里&#xff0c;一生自由随风起 &#x1f30f; &#x1f4c3;个人主页&#xff1a;island1314 &#x1f525;个人专栏&#xff1a;C学习 &#x1f680; 欢迎关注&#xff1a;&#x1f44d;点赞 &#x1f442;&am…

观测维度过大的一种ceres求解优化思路

详见 文章 这个优化如此重要&#xff0c;以至于需要单列一个文章。 使用场景&#xff1a; 比如lidar SLAM中优化点到面的距离&#xff0c;如果多个点关联到同一个面&#xff0c;那么就可以利用矩阵批量运算&#xff0c;假如有N个点&#xff0c;那么可以用一个factor来代替N个f…

【MYSQL】表操作

目录 查看当前数据库含有表查看表结构创建表插入&#xff08;新增create&#xff09;查询&#xff08;retrieve&#xff09;全列查询指定列查询查询列是表达式别名查询(as)去重查询(distinct)排序查询(order by)条件查询(where)比较/逻辑运算符使用 分页查询(limit) 一条语句各…

微服务-实现nacos的集群和Gateway网关的实现、认证校验、解决跨域

1. nacos的集群模式 1.1 分析 nacos在企业中的使用100%都是集群模式。需要掌握nacos集群的搭建 nacos的数据存放在derby本地磁盘中&#xff0c;nacos集群模式会导致数据库数据不一致&#xff0c;使用加一层思想&#xff0c;修改nacos的数据库&#xff0c;使用mysql数据库&…

kafka producer metrics

背景 做online Service埋点设计&#xff0c;塞了很多节点&#xff0c;采用了base64压缩&#xff0c;希望能监控当前消息的大小&#xff0c;防止超过threshold后无法正常发送。 kafka基本架构 producer metrics 官方文档 其中有两个参数用来表征在kafka的producer的client里&…