线程阻塞队列

news2025/1/23 7:07:19

阻塞队列

一、BlockingQueue 接口

  • BlockingQueue 是阻塞队列接口
  • 实现机制是使用两条线程,允许两个线程同时操作队列
  • 一个线程用于写入 Put ,一个线程用于读取 Take
  • 当队列中没有数据的情况下,读取线程会自动阻塞,直到有数据放入队列
  • 当队列中数据写满,写入线程被自动阻塞
  • 在保证并发的同时,提高了队列的存取效率

在这里插入图片描述

二、实现类

1、ArrayBlockingQueue (基于数组)

(1)实现原理

ArrayBlockingQueue 是一个有界队列,基于数组实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,按照FIFO的方式排序;入队与出队的操作,使用同一个 ReentrantLock 来进行控制;

(2)源码展示
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // ArrayBlockingQueue使用定长数组做为存储结构
    final Object[] items;
            
	/** Main lock guarding all access */
    final ReentrantLock lock;

    // 创建时传入数组容量(长度)
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
}
(3)创建自定义线程池
// 使用ArrayBlockingQueue创建自定义线程池
ExecutorService executorService = 
     new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,  
                    new ArrayBlockingQueue<Runnable(10),
                            Executors.defaultThreadFactory(),
                            	new ThreadPoolExecutor.AbortPolicy());
(4)工作机制
  1. 若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待工作队列中
  2. 若等待队列已满,即超过ArrayBlockingQueue有界队列的初始化容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量
  3. 若大于maximumPoolSize,则执行拒绝策略

2、LinkedBlockingQueue(基于链表)

(1)实现原理

LinkedBlockingQueue是一个无界队列,基于单向链表结构,可以选择进行设置容量。如果不设置容量的话,最大长度为 Integer.MAX_VALUE。入队与出队的操作,使用不同ReentrantLock来进行控制,所以LinkedBlockingQueue 吞吐量通常要高于 ArrayBlockingQuene。

(2)源码展示
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // 单向链表Node节点
	static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }
	/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
            
    // 按照Integer.MAX_VALUE设置容量
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
}

FixedThreadPool、SingleThreadExecutor线程池使用LinkedBlockingQueue 队列;

  • 注意:

由于LinkedBlockingQueue是无界队列,线程池的任务队列可以无限制的添加新的任务,在这种情况下maximumPoolSize 参数是无效的,当线程池中的数量达到核心线程数时,线程数也不会增加,后续的任务会直接加到等待队列中

当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

3、DelayedWorkQueue(基于数组)

(1)实现原理

DelayedWorkQueue是基于堆结构的延迟队列,基于数组实现,初始容量为16,leader线程用于获取堆顶元素(队列头部元素)。该队列根据指定的延迟时间从小到大排序,如果延迟时间相同,则根据插入到队列的先后排序。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
	private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private Thread leader = null;
}
  • ScheduledThreadPool线程池使用了这个队列
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//.....

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());

}

4、PriorityBlockingQueue

(1)实现原理

PriorityBlockingQueue 是一个基于优先级的无界队列(优先级的判断通过构造函数传入的Compator或元素实现Comparable接口来决定)。

**注意:**PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

(2)案例

每个订单使用一个线程进行支付,支付时按照订单金额的优先级。

// 订单类
public class PayOrder implements Runnable, Comparable<PayOrder> {

	private int orderNo; // 订单编号
	private BigDecimal payment; // 支付金额

	public PayOrder(int orderNo, BigDecimal payment) {
		this.orderNo = orderNo;
		this.payment = payment;
	}

	@Override
	public int compareTo(PayOrder o) {
		
		return this.payment.compareTo(o.payment);
		
	}

	@Override
	public void run() {
		System.out.printf("订单编号为%d,订单金额为:¥%.1f的订单已完成支付!【%s】\n",orderNo,payment,Thread.currentThread().getName());
	}

	public int getOrderNo() {
		return orderNo;
	}

	public void setOrderNo(int orderNo) {
		this.orderNo = orderNo;
	}

	public BigDecimal getPayment() {
		return payment;
	}

	public void setPayment(BigDecimal payment) {
		this.payment = payment;
	}
}
// 创建10个线程模拟订单支付
public class Test01 {
	
	public static void main(String[] args) {
		
		ThreadPoolExecutor pool=new ThreadPoolExecutor(2, 20, 10, TimeUnit.SECONDS, 
															new PriorityBlockingQueue<Runnable>());
		
		pool.execute(new PayOrder(1, new BigDecimal("1943")));
		pool.execute(new PayOrder(2, new BigDecimal("2000")));
		pool.execute(new PayOrder(3, new BigDecimal("4000")));
		pool.execute(new PayOrder(4, new BigDecimal("4356")));
		pool.execute(new PayOrder(5, new BigDecimal("6543")));
		pool.execute(new PayOrder(6, new BigDecimal("7433")));
		pool.execute(new PayOrder(7, new BigDecimal("234")));
		pool.execute(new PayOrder(8, new BigDecimal("1567")));
		
		pool.shutdown();		
	}

// 运行结果
订单编号为1,订单金额为:1943.0的订单已完成支付!【pool-1-thread-1】
订单编号为2,订单金额为:7894.0的订单已完成支付!【pool-1-thread-2】
订单编号为10,订单金额为:1100.0的订单已完成支付!【pool-1-thread-1】
订单编号为4,订单金额为:1353.0的订单已完成支付!【pool-1-thread-2】
订单编号为3,订单金额为:3253.0的订单已完成支付!【pool-1-thread-1】
订单编号为7,订单金额为:3574.0的订单已完成支付!【pool-1-thread-2】
订单编号为8,订单金额为:3673.0的订单已完成支付!【pool-1-thread-1】
订单编号为6,订单金额为:5430.0的订单已完成支付!【pool-1-thread-2】
订单编号为5,订单金额为:6344.0的订单已完成支付!【pool-1-thread-1】
订单编号为9,订单金额为:8653.0的订单已完成支付!【pool-1-thread-2

可以看到除了前2个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为2个(corePoolSize核心线程数)。

通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

5、SynchronousQueue

(1)实现原理

SynchronousQueue是一个同步队列,它是一个不存储元素的阻塞队列(内部没有保存元素的数据结构容器),每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。

CachedThreadPool线程池使用这个队列

(2)案例
public class Main {
	public static void main(String[] args) {

		// maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略(直接抛出异常)
		ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10, TimeUnit.MILLISECONDS,
				new SynchronousQueue<Runnable>(), 
				new ThreadPoolExecutor.AbortPolicy());

		// 执行的线程任务大于maximumPoolSize,执行拒绝策略
		for (int i = 1; i <= 3; i++) {
			pool.execute(new Runnable() {
				@Override
				public void run() {
					System.out.println(Thread.currentThread().getName() + "被执行!");
				}
			});
		}
		
		// 关闭线程池
		pool.shutdown();
	}
}

说明:

  • 当任务队列为 SynchronousQueue,创建的线程数大于 maximumPoolSize 时,直接执行了拒绝策略抛出异常。
  • 使用 SynchronousQueue 队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于 maximumPoolSize ,则尝试创建新的线程,如果达到 maximumPoolSize 设置的最大值,则根据你设置的handler执行拒绝策略。
  • 因此在使用了 SynchronousQueue 队列的线程池中,你提交的线程任务不会被存入工作队列,而是会被马上执行,在这种情况下,你需要对程序的并发量有个准确的评估,才能设置合适的 maximumPoolSize数量,否则很容易就会执行拒绝策略。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/906100.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

opencv进阶12-EigenFaces 人脸识别

EigenFaces 通常也被称为 特征脸&#xff0c;它使用主成分分析&#xff08;Principal Component Analysis&#xff0c;PCA&#xff09; 方法将高维的人脸数据处理为低维数据后&#xff08;降维&#xff09;&#xff0c;再进行数据分析和处理&#xff0c;获取识别结果。 基本原理…

蓝牙资讯|消息称富士康投资4亿美元在印度生产苹果 AirPods 耳机

根据印度最大通讯社 PTI 报道&#xff0c;苹果和富士康已经签署一项新的协议&#xff0c;富士康将投资 4 亿美元在印度第四大城市海得拉巴扩建工厂&#xff0c;负责为苹果生产 AirPods TWS 耳机。 报道称苹果已经决定在印度本土生产 AirPods 耳机&#xff0c;富士康计划投资 …

测试框架pytest教程(2)-用例依赖库-pytest-dependency

对于 pytest 的用例依赖管理&#xff0c;可以使用 pytest-dependency 插件。该插件提供了更多的依赖管理功能&#xff0c;使你能够更灵活地定义和控制测试用例之间的依赖关系。 Using pytest-dependency — pytest-dependency 0.5.1 documentation 安装 pytest-dependency 插…

ipad手写笔有必要买吗?开学便宜又好用电容笔推荐

苹果电容笔之所以能够被iPad用户广泛使用&#xff0c;很大程度上是因为其的优秀性能&#xff0c;具有着独特的重力压感功能。但苹果原装的电容笔&#xff0c;价格相对比较高&#xff0c;所以很多人&#xff0c;都选择了普通的平替电容笔。如今许多人都爱用iPad来画图或写笔记&a…

Go 数组

一、复合类型&#xff1a; 二、数组 如果要存储班级里所有学生的数学成绩&#xff0c;应该怎样存储呢&#xff1f;可能有同学说&#xff0c;通过定义变量来存储。但是&#xff0c;问题是班级有80个学生&#xff0c;那么要定义80个变量吗&#xff1f; 像以上情况&#xff0c;最…

攻防世界-command_execution

原题 解题思路 题目告诉了&#xff0c;这可以执行ping命令且没WAF&#xff0c;那就可以在ping命令后连接其他命令。 服务器一般使用Linux&#xff0c;在Linux中可使用“&”连接命令。 ping 127.0.0.1&find / -name "flag*" ping 127.0.0.1&cat /home/f…

Linux中shell脚本常用命令、条件语句与if、case语句

目录 一.shell脚本常用命令 1.1.echo命令 1.2.date命令 1.3.cal命令 1.4.tr命令 1.5.cut命令 1.6.sort命令 1.7.uniq命令 1.8.cat多行重定向 二.条件语句 2.1.条件测试&#xff08;三种测试方法&#xff09; 2.2.正整数值比较 2.3.字符串比较 2.4.逻辑测试 三.i…

深入了解 Java 中 Files 类的常用方法及抽象类的 final 修饰

文章目录 Files 类常用方法抽象类的 final 修饰 &#x1f389;欢迎来到Java学习路线专栏~深入了解 Java 中 Files 类的常用方法及抽象类的 final 修饰 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;✨博客主页&#xff1a;IT陈寒的博客&#x1f388;该系列文章专栏&#xff1a…

【C语言学习】二分法查找有序数组中的数

二分查找的基本原理 二分查找的基本逻辑就是每次找区间的中间数&#xff0c;然后与要查找的数进行比较&#xff0c;不断的缩小区间&#xff0c;最后区间中只剩一个数&#xff0c;即为要查找的数。如果不是&#xff0c;则没有该数。 二分查找只适用于有序数组 以数组中的数从左…

计算机视觉领域文献引用

Bag of freebies 炼丹白嫖加油包 Bag of freebies、致力于解决数据集中语义分布可能存在偏差的问题。在处理语义分布偏差问题时&#xff0c;一个非常重要的问题是不同类别之间存在数据不平衡的问题。 一、数据增强篇 Data Augmentation &#xff08;1&#xff09;图片像素调整…

安全模式进不去,解决方法在这!

“我想让电脑进入安全模式&#xff0c;但无论我怎么操作都无法进入。这是怎么回事呢&#xff1f;我怎么才能让电脑进入安全模式呢&#xff1f;请求帮助&#xff01;” 安全模式是Windows操作系统的一种启动选项&#xff0c;用于解决系统问题和故障。然而&#xff0c;有时候用户…

PON测试,“信”助力 | 信而泰测试解决方案浅析

PON介绍 一、什么是PON网络 PON是“Passive Optical Network”的缩写&#xff0c;是一种基于光纤的网络技术。PON网络通过单向的光信号传输来实现数据、语音和视频等信息的传输。PON网络可以支持多个传输速率和距离要求&#xff0c;因此广泛应用于FTTH、FTTB&#xff08;Fibe…

Three.js 实现模型分解,拆解效果

原理&#xff1a;通过修改模型材质的 x,y,z 轴坐标 positon.set( x,y,z) 来实现拆解&#xff0c;分解的效果。 注意&#xff1a;支持模型材质position 修改的材质类型为 type“Mesh” ,其他类型的材质修改了position 可能没有实际效果 在上一篇 Three.js加载外部glb,fbx,gltf…

金融市场中的机器学习;快手推出自研语言模型“快意”

&#x1f989; AI新闻 &#x1f680; OpenAI可能面临《纽约时报》的起诉&#xff0c;侵犯知识产权引发争议 摘要&#xff1a;OpenAI使用《纽约时报》的文章和图片来训练AI模型&#xff0c;违反了《纽约时报》的服务条款&#xff0c;可能面临巨大损失。此前&#xff0c;也有其…

冠达管理:定增获批后会大涨吗?

近年来&#xff0c;跟着我国资本商场的稳步发展&#xff0c;定向增发&#xff08;定增&#xff09;已成为不少上市公司的一种重要融资方法&#xff0c;其比较于揭露发行股票&#xff0c;更能够满足少量出资者的融资需求。然而&#xff0c;对于很多出资者来说&#xff0c;一个问…

游戏服务端性能测试

导语&#xff1a;近期经历了一系列的性能测试&#xff0c;涵盖了Web服务器和游戏服务器的领域。在这篇文章中&#xff0c;我将会对游戏服务端所做的测试进行详细整理和记录。需要注意的是&#xff0c;本文着重于记录&#xff0c;而并非深入的编程讨论。在这里&#xff0c;我将与…

Visual Studio 2022 你必须知道的实用调试技巧

目录 1、什么是bug&#xff1f; 2.调试是什么&#xff1f;有多重要&#xff1f; 2.1我们是如何写代码的&#xff1f; 2.2又是如何排查出现的问题的呢&#xff1f; ​编辑 2.3 调试是什么&#xff1f; 2.4调试的基本步骤 2.5Debug和Release的介绍 3.Windows环境调试介绍…

spark第四课

countByValue 数据源中相同的值有多少个,也就是WordCount countByKey 表的是键值对中的key出现了几次,与Value的值无关 不推荐collect,因为他是将数据放入内存,但是内存不够大的话,就容易崩,所以使用saveAsTextFile更好,直接放入磁盘. 保存成对象文件,需要序列化 启动了2个 J…

前端PWA应用的相关知识和基础Demo

一、什么是PWA应用&#xff1f; 1、PWA简介 ​ 渐进式Web应用&#xff08;Progressive Web App&#xff09;&#xff0c;简称PWA&#xff0c;是 Google 在 2015 年提出的一种使用web平台技术构建的应用程序&#xff0c;官方认为其核心在于Reliable&#xff08;可靠的&#xf…

修改窗口类的属性将影响所有该类的窗口

正如标题所指出的&#xff1a;窗口类的属性将影响所有由该窗口类创建的窗口。这就是它为什么称之为窗口类的原因。 我用这么多话来强调这一点&#xff0c;似乎是一件显而易见的事情&#xff0c;但我看到许多”解决方案”忽视了这个简单的事实。 在 WNDCLASS&#xff08;或 WND…