深入理解Semaphore

news2024/9/23 19:26:53

Semaphore(信号量)是操作系统中PV操作的原语在java中的实现,它也是基于AQS实现的。其中PV操作是操作系统中一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理有关,P表示通过,V表示释放。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。

P操作的主要动作如下:

  1. S减1;
  2. 若S减1后仍大于等于0,则进程继续执行;
  3. 若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,转进程调度。

V操作的主要动作如下:

  1. S加1;
  2. 若相加后结果大于0,则进程继续执行;
  3. 若相加后结果小于等于0,则从该信号的等待队列中释放一个等待进程,再返回原进程继续执行或转进程调度。

ReentrantLock是AQS的独占锁实现,Semaphore是AQS的共享锁实现。Semaphore通过设置资源数量可以实现限流的功能,即控制同时只能有n个线程获取信号量。AQS的state对Semaphore来说可以是共享资源的数量,也可以是许可证的数量。当state>0时线程可以获得许可证继续执行,state-1;当state=0时线程不能获得许可证进入同步等待队列,阻塞直到被唤醒。

Semaphore有两个构造函数,如下:

//传入许可证数量
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
//传入共享资源数量和是否公平锁,默认是非公平锁
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

其常用方法和方法作用如下:

  • public void acquire() throws InterruptedException:尝试获取锁,如果获取失败则阻塞;
  • public boolean tryAcquire():尝试获取锁,如果获取失败则直接返回false,不会阻塞;
  • public void release():释放许可;
  • public int availablePermits():获取可用的许可证数;
  • public final int getQueueLength():返回正在等待获取许可证的线程数;
  • public final boolean hasQueuedThreads():是否有线程正在等待许可证;
  • protected void reducePermits(int reduction):减少reduction个许可证;
  • public final Collection<Thread> getQueuedThreads():获取等待许可证的线程集合。

Semaphore使用示例如下,初始化一个许可证数量为3的信号量,doSomething()方法每次都需要获取一个信号量才能执行,执行时间为2S,因此即便是main方法中的线程池每秒执行5次doSomething()方法,最终的效果仍然是每两秒执行三次doSomething()方法,这就达到了限流的目的。

@Slf4j
public class SemaphoreExample {

    //定义一个许可证数量为3的信号量
    private static Semaphore semaphore = new Semaphore(3);

    //线程池
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,30, TimeUnit.SECONDS,new ArrayBlockingQueue<>(50));

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            executor.execute(() -> doSomething());

            //每秒执行5次
            Thread.sleep(200);
        }
    }

    private static void doSomething() {
        try {
            //尝试获取一个许可证
            semaphore.acquire(1);
            log.info("正在执行...");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            log.error("操作失败",e);
        } finally {
            //释放许可
            semaphore.release();
        }
    }
}

控制台打印如下,可以看到大概每两秒才会执行三次doSomething()方法,这就是由信号量Semaphore来控制的。

17:42:54.199 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:54.409 [pool-1-thread-2] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:54.609 [pool-1-thread-3] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.203 [pool-1-thread-4] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.421 [pool-1-thread-2] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.622 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.204 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.421 [pool-1-thread-3] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.623 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.214 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.427 [pool-1-thread-4] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.626 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...

源码解析

Semaphore的AQS实现也区分公平和非公平,由于这两种锁的区别很小,此处只介绍较常用的也是默认的非公平锁的实现。

acquire(permits)获取许可证

  1. Semaphore获取许可证的方法是acquire(permits),实现如下:
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
  1. acquireSharedInterruptibly(permits)方法是AQS中定义的共享锁获取锁的通用方法,实现如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
	if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
  1. tryAcquireShared(arg)是提供给子类实现的模版方法,该方法在Semaphore中的实现如下:
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //获取state许可证数量
        int available = getState();
        int remaining = available - acquires;
        //remaining小于0,表示获取许可证失败,返回一个负值
        //remaining大于等于0,表示剩余许可证是足够的,使用CAS尝试修改state许可证数量,如果获取失败则重复获取直到获取成功,返回一个大于等于0的值
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
  1. 在第2步中,如果tryAcquireShared(arg)方法返回值不小于0,则表示当前线程使用CAS获取许可证成功,否则获取失败调用doAcquireSharedInterruptibly(arg)方法进入同步等待队列阻塞,doAcquireSharedInterruptibly(arg)方法实现如下:
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //将当前线程封装成一个Node对象
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //如果当前节点的的前一个节点是head,则尝试获取许可证
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取许可证成功则设置当前节点为head节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //如果当前节点不是head的下一个节点,则直接阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
             cancelAcquire(node);
    }
}

acquire(permits)获取许可证方法的流程总结如下:

  1. 调用Semaphore实现的tryAcquireShared(arg)方法尝试获取许可证;
  2. 如果当前许可证数量足够(即state-要获取的许可证数量>=0),则循环调用CAS尝试修改state获取许可证,直到获取成功直接返回或者许可证被其他线程获取导致数量不够;
  3. 如果当前许可证数量不够(即state-要获取的许可证数量<0),则将当前线程封装到Node对象并添加到同步队列中;
  4. 判断当前节点是否是head节点的下一个节点,如果是则尝试获取许可证、出队head节点、设置当前Node为head节点并返回;如果不是,则调用LockSupport.park()方法阻塞当前线程。

release(permits)释放许可证

  1. release(permits)方法的实现如下:
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
  1. releaseShared(permits)是AQS中实现的共享锁释放锁的通用方法,实现如下:
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
  1. tryReleaseShared(arg)是AQS定义的给子类实现的模版方法,该方法在Semaphore的实现如下:
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取许可证数量
        int current = getState();
        int next = current + releases;
        //next < current表示要释放的许可证releases<0,抛出异常
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //使用CAS修改许可证数量,如果失败则重复调用,直到修改成功返回true
        if (compareAndSetState(current, next))
            return true;
    }
}
  1. tryReleaseShared(arg)返回true后,调用doReleaseShared()方法,该方法在AQS中实现,具体实现如下:
private void doReleaseShared() {
    //遍历唤醒同步等待队列节点,释放许可证
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
        	}
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //前面如果没有节点被唤醒,则h仍然指向head,表示许可证已经释放完成或者许可证数量已经不够了,直接返回
        if (h == head)                   // loop if head changed
            break;
    }
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

release(permits)方法释放许可证的流程总结如下:

  1. 调用tryReleaseShared(arg)方法尝试释放许可证,如果释放失败则一直调用CAS修改state许可证数量,直到成功返回true;
  2. 释放成功后,唤醒同步等待队列中的head的下一个节点;
  3. head的下一个节点被唤醒后,会继续执行doAcquireSharedInterruptibly()方法中的循环语句尝试获取许可证;
  4. 另外,此处会一直唤醒同步等待队列中的节点,直到同步等待队列节点为空或者许可证数量不够。

需要注意的是,tryReleaseShared(arg)方法释放许可证方法并没有判断许可证上限,例如定义了信号量的许可证数量为1,直接调用release()方法,在tryReleaseShared()方法中调用CAS是可以修改成功的,这里直接修改的是AQS中的state,因此先调用release()方法会影响限流效果。

例如下面初始化Semaphore的许可证数量为1,调用10次release()方法后,许可证数量变成了11。

public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore = new Semaphore(1);
    for (int i=0;i<10;i++) {
        semaphore.release(1);
    }
    System.out.println(semaphore.availablePermits());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/908483.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023.8 - java - 泛型

泛型问题的引出&#xff1a; jdk 1.5 引出泛型 // package 泛型; public class index {public static void main (String[] args){test t new test();t.setContent("aaa");int a (int) t.getContent();System.out.println(a);} }class test{Object content;publi…

分享图片 | 快速浏览网页资源,批量保存、一键分享图片

前言 小伙伴学习吉他&#xff0c;有时需要在互联网搜索曲谱资源&#xff0c;而多数曲谱均为图片&#xff0c;并且为多页&#xff0c;在电脑上显示练习很不方便&#xff0c;需要停下来点击鼠标进行翻页&#xff0c;影响练习的连贯性。 为了解决上述问题&#xff0c;通常把图片…

【数据分析入门】Jupyter Notebook

目录 一、保存/加载二、适用多种编程语言三、编写代码与文本3.1 编辑单元格3.2 插入单元格3.3 运行单元格3.4 查看单元格 四、Widgets五、帮助 Jupyter Notebook是基于网页的用于交互计算的应用程序。其可被应用于全过程计算&#xff1a;开发、文档编写、运行代码和展示结果。 …

产品流程图是什么?怎么做?

产品流程图是什么&#xff1f; 产品流程图是一种图形化的表达方式&#xff0c;用于描述产品开发、制造、销售、使用等各个阶段中涉及的流程、步骤和关系。它通过图形符号、箭头、文本等元素&#xff0c;展示了产品的各个环节之间的关联和顺序&#xff0c;通常被用于可视化产…

IT项目即将上线:项目经理的前夜清单

在IT项目的生命周期中&#xff0c;投产前的准备是至关重要的。作为项目经理&#xff0c;你需要确保所有的细节都已经准备好&#xff0c;以确保项目的顺利上线。以下是一份详细的清单&#xff0c;帮助项目经理在项目投产前进行全面的准备。 1. 项目的回顾 在项目即将上线之前&…

stm32的命令规则

stm32型号的说明&#xff1a;以STM32F103RBT6这个型号的芯片为例&#xff0c;该型号的组成为7个部分&#xff0c;其命名规则如下&#xff1a;

前端(十三)——JavaScript 闭包的奥秘与高级用法探索

&#x1f636;博主&#xff1a;小猫娃来啦 &#x1f636;文章核心&#xff1a;深入理解 JavaScript 中的闭包 文章目录 不理解闭包&#xff1f;这玩意很难&#xff1f;闭包的定义与原理闭包是什么创建一个闭包 闭包的应用场景闭包与作用域闭包与作用域之间的关系全局作用域、函…

python之Pandas

1.Pandas简介 Pandas 是 Python 语言的一个扩展程序库&#xff0c;用于数据分析。 Pandas 名字衍生自术语 “panel data”&#xff08;面板数据&#xff09;和 “Python data analysis”&#xff08;Python 数据分析&#xff09;。 Pandas 一个强大的分析结构化数据的工具集…

苹果手机桌面APP带云图标有个箭头,过一段时间经常要下载才能使用APP

环境&#xff1a; IPhone 11 IOS13.0 问题描述&#xff1a; 苹果手机桌面APP带云图标有个箭头&#xff0c;过一段时间经常要下载才能使用APP 解决方案&#xff1a; 1.打开设置&#xff0c;往下找到iTunes Store与App Store 2.找到下面卸载未使用的APP 关闭按钮

记录几个Hudi Flink使用问题及解决方法

前言 如题&#xff0c;记录几个Hudi Flink使用问题&#xff0c;学习和使用Hudi Flink有一段时间&#xff0c;虽然目前用的还不够深入&#xff0c;但是目前也遇到了几个问题&#xff0c;现在将遇到的这几个问题以及解决方式记录一下 版本 Flink 1.15.4Hudi 0.13.0 流写 流写…

一百六十三、Kettle——Linux上安装Kettle9.2(亲测有效,附截图)

一、目的 由于之前发现kettle8.2和kettle9.3这两个版本&#xff0c;或多或少的存在问题 比如kettle8.2的本地服务没问题&#xff0c;但在Linux上创建共享资源库时就有问题&#xff1b; 比如kettle9.3由于不自带shims驱动包&#xff0c;目前在新的下载官网上无法找到下载路径…

Gitbook超详细使用教程,搭建属于你自己的博客!

文章目录 简介与github同步1.创建space2.安装github插件3.同步github4.生成space的url 博客搭建指南1.自定义域名2.发表博客内容3.设置域名默认页面4.界面设置注意事项 End 简介 Gitbook 是一个平台&#xff0c;允许用户创建和分享内容丰富的在线书籍。它有一个用户友好的界面…

JDK JRE JVM 三者之间的详解

JDK : Java Development Kit JRE: Java Runtime Environment JVM : JAVA Virtual Machine JDK : Java Development Kit JDK : Java Development Kit【 Java开发者工具】&#xff0c;可以从上图可以看出&#xff0c;JDK包含JRE&#xff1b;java自己的一些开发工具中&#…

SpringBootWeb案例 Part 2

3. 员工管理 完成了部门管理的功能开发之后&#xff0c;我们进入到下一环节员工管理功能的开发。 基于以上原型&#xff0c;我们可以把员工管理功能分为&#xff1a; 分页查询 带条件的分页查询 删除员工 新增员工 修改员工 那下面我们就先从分页查询功能开始学习。 3.…

内存分布(以及new,delete)

今天给大家说下内存分布&#xff0c;我们都知道的是&#xff0c;像局部变量都在栈区&#xff0c;但是像我们自己有时候申请的空间都在堆区&#xff0c;当然&#xff0c;内存分布不只只是栈区和堆区&#xff0c;还有常量区&#xff0c;代码区等等。如下图&#xff1a; 这就是内存…

项目实战笔记4:敏捷

术语介绍 敏捷项目管理是一种以快速响应变化为核心的项目管理方法。与传统的瀑布模型不同&#xff0c;敏捷方法强调迭代开发和紧密的团队合作。其目的是尽可能快地交付可用的产品&#xff0c;然后在客户和团队之间进行反馈和迭代&#xff0c;以不断优化产品和开发过程。 在敏捷…

电商转化率是什么意思,怎么计算和提高电商转化率?

电商转化率是指访问电商网站的用户中&#xff0c;实际完成购买行为的比例。它可以衡量电商网站的销售能力和用户转化效果&#xff0c;是衡量电商运营效果的重要指标之一。 一、电商转化率的计算公式 电商转化率的计算公式为&#xff1a;转化率完成购买的用户数/访问网站的用户…

PyTorch DataLoader 报错 “DataLoader worker exited unexpectedly“ 的解决方案

注意&#xff1a;博主没有重写d2l的源代码文件&#xff0c;而是创建了一个新的python文件&#xff0c;并重写了该方法。 一、代码运行日志 C:\Users\Administrator\anaconda3\envs\limu\python.exe G:/PyCharmProjects/limu-d2l/ch03/softmax_regression.py Traceback (most r…

Python Opencv实践 - 图像中值滤波

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png", cv.IMREAD_COLOR) print(img.shape) pixel_count img.shape[0] * img.shape[1] print(pixel_count)#为图像添加椒盐噪声 #参考资料&#xf…

Java后端开发面试题——框架篇

Spring框架中的bean是单例的吗&#xff1f;Spring框架中的单例bean是线程安全的吗&#xff1f; singleton : bean在每个Spring IOC容器中只有一个实例。 prototype&#xff1a;一个bean的定义可以有多个实例。 Spring bean并没有可变的状态(比如Service类和DAO类)&#xff0c…