BolckingQueue

news2024/11/13 13:13:35
队列

队列的特点先进先出(FIFO)。
如图: 进入队列的顺序是1,2,3,那么出队列的顺序只能是1,2,3,不可能是其他顺序,这是由队列的特点保证的。
在这里插入图片描述
保存数据的基本数据结构有数组链表,基于此,实现队列分为数组队列(ArrayQueue)和链表队列(LinkedQueue)。

BolckingQueue分类

从名称可知 Blocking Queue 阻塞队列,具体可分为数组阻塞队列和链表阻塞队列。
在这里插入图片描述
ArrayBlockingQueue源码:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

		//存储元素的数组
		final Object[] items;
		
		//锁  因此是多线程安全
		final ReentrantLock lock;
	
		//条件队列
		private final Condition notEmpty;
		private final Condition notFull;
	
		...
}

从ArrayBlockingQueue 源码中不难看出,使用Object数组存储元素,使用ReentrantLock 保证多线程操作安全,
阻塞功能通过“条件锁”Condition 实现。

LinkedBlockingQueue源码:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
		
		//节点定义
		static class Node<E> {
	        E item;
			//只有尾指针 --> 单线链表
	        Node<E> next;
	        Node(E x) { item = x; }
	    }
	
		//指向头节点
		transient Node<E> head;
		
		//尾节点
		private transient Node<E> last;
	
		private final ReentrantLock takeLock = new ReentrantLock();
	
		private final Condition notEmpty = takeLock.newCondition();
		private final ReentrantLock putLock = new ReentrantLock();
		private final Condition notFull = putLock.newCondition();
	
		...
}

从LinkedBlockingQueue源码中不难看出,使用单项链表存储元素,使用ReentrantLock 保证多线程操作安全,
阻塞功能通过“条件锁”Condition 实现。

BolckingQueue 常用方法

首先初始化一个容量为5的队列:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
1、add(E e);

函数说明:向队列中添加一个元素,当队列为空时抛NPE异常,队列满时,抛IllegalStateException异常。
返回true 表示添加元素成功,其他返回值均不成功;
该方法不会阻塞当前线程往下执行!

 for (int i = 0; i < 8; i++) {
   try {
         System.out.println(blockingQueue.add("aaa"));
     } catch (Exception e) {

     }
     System.out.println("继续执行....");
 }

结果:

true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
继续执行… //这里此时队列元素已满,会抛出异常,由于对异常进行了处理,因此可以继续往下执行
继续执行…
继续执行…

add(E e)底层调用offer(E e)。

public boolean add(E e) {
   if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
2、offer(E e)

向队列中添加元素, 若队列为空,抛NPE异常;
true:添加元素成功;
false: 添加元素失败
不会阻塞当前线程执行

for (int i = 0; i < 8; i++) {
   System.out.println(blockingQueue.offer("aaa"));
    System.out.println("插入操作  继续执行...");
}

结果:

true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
false ------------------------------------------队列满后,插入失败
插入操作 继续执行…
false
插入操作 继续执行…
false
插入操作 继续执行…

 public boolean offer(E e) {
 		//元素非空判断
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //获取互斥锁  因此是线程安全的
        lock.lock();
        try {
            //队列已满
            if (count == items.length)
                return false;
            else {
                //插入队列
                enqueue(e);
                return true;
            }
        } finally {
        	//释放锁
            lock.unlock();
        }
    }

	//插入队列
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //元素插入成功后,唤醒条件队列中的线程! ********
        notEmpty.signal();
    }
2、offer(E e, long timeout, TimeUnit unit)

在指定时间内成功插入元素返回true, 失败返回false;
与offer(E e)的不同点在于,offer(E e)只插入一次,成功或者失败立即返回;
offer(E e, long timeout, TimeUnit unit) 先判断队列是否已满,若已满,则等待一定时间后再尝试插入元素。
不会阻塞当前线程执行

for (int i = 0; i < 10; i++) {
     System.out.println(blockingQueue.offer("aaa",3, TimeUnit.SECONDS));
     System.out.println("继续执行...");
 }

结果:

true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            /**
             * 如果此时队列已满
             * 等待一段时间再操作
             */
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                //处理等待时间的操作
                nanos = notFull.awaitNanos(nanos);
            }
            
            //元素插入到队列中
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
3、put(E e)

向队列中放入元素;
会阻塞当前线程执行


for (int i = 0; i < 10; i++) {
    blockingQueue.put("aaa");
    System.out.println("继续执行...");
}

结果:

继续执行…
继续执行…
继续执行…
继续执行…
继续执行…
程序在此卡住不再执行…

public void put(E e) throws InterruptedException {
  checkNotNull(e);
    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();
    try {
        /**
         * 如果此时队列已满,当前线程加入到条件队列中(进行阻塞)
         */
        while (count == items.length)
            notFull.await();
        //当前队列未满,元素进队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

4、take()

获取队首元素,若元素为空,
则阻塞等待

for (int i = 0; i < 10; i++) {
    System.out.println(blockingQueue.take());
    System.out.println("获取操作  继续执行...");
}
 public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
         //队列为空
         while (count == 0)
             //阻塞等待
             notEmpty.await();
         return dequeue();
     } finally {
         lock.unlock();
     }
 }
5、poll()

获取队首元素,如果此时队列为空,返回null;
不会阻塞当前线程执行!

for (int i = 0; i < 10; i++) {
    System.out.println(blockingQueue.poll());
    System.out.println("获取操作  继续执行...");
}
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
6、poll(long timeout, TimeUnit unit)

同poll(); 只不过当队列为空时,等待一定时间再获取队首元素

 for (int i = 0; i < 10; i++) {
   System.out.println(blockingQueue.poll(3, TimeUnit.SECONDS));
     System.out.println("获取操作  继续执行...");
 }
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //队列为空
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        //获取队首元素
        return dequeue();
    } finally {
        lock.unlock();
    }
}
7、drainTo(Collection<? super E> c);

获取队列中的全部元素,保存到指定的集合中,结果返回元素个数。
不会阻塞当前线程执行

  List<String> list = new ArrayList<>();
  int i = blockingQueue.drainTo(list);
  System.out.println(i);
  list.stream().forEach(str -> {
       System.out.println(str);
   });
8、drainTo(Collection<? super E> c, int maxElements);

从队列中获取指定数量的元素,保存在给定的集合中

 List<String> list = new ArrayList<>();
 int i = blockingQueue.drainTo(list, 3);
 System.out.println(i);
 list.stream().forEach(str -> {
     System.out.println(str);
 });

OK,对上述所有方法做个总结可如下图所示:
在这里插入图片描述
存/取 搭配使用。

使用BolckingQueue实现生产者消费者模式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {
    private static final int QUEUE_CAPACITY = 5; // 队列容量
    private static final int TOTAL_ITEMS = 10; // 生产和消费的总数量

    public static void main(String[] args) {
        // 创建一个大小为 QUEUE_CAPACITY 的阻塞队列
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

        // 创建生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < TOTAL_ITEMS; i++) {
                    String item = "Item " + i;
                    queue.put(item); // 如果队列已满,生产者线程会被阻塞
                    System.out.println("Produced: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Producer interrupted");
            }
        });

        // 创建消费者线程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < TOTAL_ITEMS; i++) {
                    String item = queue.take(); // 如果队列为空,消费者线程会被阻塞
                    System.out.println("Consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Consumer interrupted");
            }
        });

        // 启动生产者和消费者线程
        producer.start();
        consumer.start();

        // 等待线程结束
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Main thread interrupted");
        }

        System.out.println("Production and consumption completed.");
    }
}

在这里插入图片描述

ArrayBlockingQueue是在哪一步唤醒等待条件的线程的 ?
唤醒生产者线程:

//获取队头元素
private E dequeue() {
    // assert lock.getHoldCount() == 1;
     // assert items[takeIndex] != null;
     final Object[] items = this.items;
     @SuppressWarnings("unchecked")
     E x = (E) items[takeIndex];
     items[takeIndex] = null;
     if (++takeIndex == items.length)
         takeIndex = 0;
     count--;
     if (itrs != null)
         itrs.elementDequeued();
     //唤醒生产者线程 ****************
     notFull.signal();
     return x;
 }

生产者放入元素后会唤醒消费者

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;

        //唤醒消费者
        notEmpty.signal();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2140409.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

彻底理解浅拷贝和深拷贝

目录 浅拷贝实现 深拷贝实现自己手写 浅拷贝 浅拷贝是指创建一个新对象&#xff0c;这个对象具有原对象属性的精确副本 基本数据类型&#xff08;如字符串、数字等&#xff09;&#xff0c;在浅拷贝过程中它们是通过值传递的&#xff0c;而不是引用传递&#xff0c;修改值并不…

基于yolov8的茶叶病害检测系统python源码+onnx模型+评估指标曲线+精美GUI界面

【算法介绍】 基于YOLOv8的茶叶病害检测系统&#xff0c;是利用深度学习技术&#xff0c;特别是YOLOv8这一先进的目标检测算法&#xff0c;来精准识别和监测茶叶生长过程中出现的各种病害。该系统通过无人机、地面机器人或固定摄像头等设备&#xff0c;定期采集茶园的高分辨率…

力扣刷题(6)

两数之和 II - 输入有序数组 两数之和 II - 输入有序数组-力扣 思路&#xff1a; 因为该数组是非递减顺序排列&#xff0c;因此可以设两个左右下标当左右下标的数相加大于target时&#xff0c;则表示右下标的数字过大&#xff0c;因此将右下标 - -当左右下标的数相加小于targ…

??Ansible——ad-hoc

文章目录 一、ad-hoc介绍二、ad-hoc的使用1、语法2、ad-hoc常用模块1&#xff09;shell模块2&#xff09;command模块3&#xff09;script模块4&#xff09;file模块5&#xff09;copy模块6&#xff09;yum模块7&#xff09;yum-repository模块8&#xff09;service模块9&#…

优化算法(一)—遗传算法(Genetic Algorithm)附MATLAB程序

遗传算法&#xff08;Genetic Algorithm, GA&#xff09;是一种启发式搜索算法&#xff0c;用于寻找复杂优化问题的近似解。它模拟了自然选择和遗传学中的进化过程&#xff0c;主要用于解决那些传统算法难以处理的问题。 遗传算法的基本步骤&#xff1a; 初始化种群&#xff0…

【GO语言】Go语言详解与应用场景分析,与Java的对比及优缺点

Go is an open source programming language that makes it easy to build simple, reliable, and efficient software. Go是一种开源编程语言&#xff0c;可以轻松构建简单、可靠和高效的软件。 文章目录 一、引言二、Go语言详解1. 简史2. 特点3. 核心库 三、应用场景四、与Ja…

comfyui中,sam detector与yoloworld图像分割算法测试以及影响

&#x1f356;背景 图像处理中&#xff0c;经常会用到图像分割&#xff0c;在默认的comfyui图像加载中就有一个sam detector的功能&#xff0c;yoloworld是前一段时间公开的一个更强大的图像分割算法&#xff0c;那么这两个差别大吗&#xff1f;在实际应用中有什么区别吗&…

普推知产:明知商标驳回也要去申请注册!

有个去年加的网友让普推知产商标老杨看在32类申请如何&#xff0c;去年是把33类的申请复审下来&#xff0c;这个网友想的名称都是存在已存在的商标名称&#xff0c;直接都是申请不下来的&#xff0c;需要申请和再加驳回复审。 去年那个在33类的名称&#xff0c;当时查过只有一个…

函数(下)

static 代码1的test函数中的局部变量i是每次进⼊test函数先创建变量&#xff08;⽣命周期开始&#xff09;并赋值为0&#xff0c;然后 &#xff0c;再打印&#xff0c;出函数的时候变量⽣命周期将要结束&#xff08;释放内存&#xff09;。 代码2中&#xff0c;我们从输出结果…

论文阅读-Demystifying Misconceptions in Social Bots Research

论文链接&#xff1a; https://arxiv.org/pdf/2303.17251 目录 摘要: Introduction Methodological issues Information leakage Cherry-picking&#xff08;采摘樱桃&#xff09; Straw-man methodology &#xff08;稻草人&#xff09; Data biases Conceptual issu…

Spring高手之路23——AOP触发机制与代理逻辑的执行

文章目录 1. 从整体视角学习Bean是如何被AOP代理的2. AOP代理的触发机制2.1 postProcessAfterInitialization方法源码分析2.2 wrapIfNecessary方法源码分析2.3 时序图演示触发机制 3. AOP代理逻辑的执行3.1 AOP代理如何使用拦截器3.2 proceed方法源码分析3.3 时序图 1. 从整体视…

【Linux】线程锁条件变量信号量生产消费者模型线程池

文章目录 线程概念线程控制接口和线程id线程优缺点线程互斥和条件变量锁和条件变量相关接口POSIX 信号量生产消费者模型阻塞队列实现生产消费者模型环形队列实现生产消费者模型简易懒汉线程池自旋锁和读写锁&#xff08;了解&#xff09; 线程概念 在操作系统的的视角下&#x…

SysML图例-农业无人机

DDD领域驱动设计批评文集>> 《软件方法》强化自测题集>> 《软件方法》各章合集>>

828华为云征文 | 华为云FlexusX实例下的Kafka集群部署实践与性能优化

前言 华为云FlexusX实例&#xff0c;以创新的柔性算力技术&#xff0c;为Kafka集群部署带来前所未有的性能飞跃。其灵活的CPU与内存配比&#xff0c;结合智能调度与加速技术&#xff0c;让Kafka在高并发场景下依然游刃有余。在828华为云企业上云节期间&#xff0c;FlexusX实例携…

亲测好用,ChatGPT 3.5/4.0新手使用手册,最好论文指令手册~

本以为遥遥领先的GPT早就普及了&#xff0c;但小伙伴寻找使用的热度一直高居不下&#xff0c;其实现在很简单了&#xff01; 国产大模型快200家了&#xff0c;还有很多成熟的国内AI产品&#xff0c;跟官网一样使用&#xff0c;还更加好用~ ① 3.5 大多数场景是够用的&#xff…

【Java】多线程:Thread类并行宇宙

欢迎浏览高耳机的博客 希望我们彼此都有更好的收获 感谢三连支持&#xff01; 在现代编程中&#xff0c;多线程是提高程序性能和响应能力的一种重要手段。Java 通过 Thread 类和 Runnable 接口提供了丰富的线程管理功能。本文是对 Thread 类基本用法的总结。 线程创建 线程可以…

Ubuntu 22.04上安装Java JDK 8

在Ubuntu 22.04上安装Java JDK 8可以通过以下步骤完成&#xff1a; 前言 本文特别感谢浪浪云的赞助发布。浪浪云&#xff0c;其卓越的云服务和技术支持&#xff0c;一直致力于为用户提供高效、可靠的解决方案。无论是个人开发者、小型企业还是大型组织&#xff0c;浪浪云都能…

11.01类的定义和对象的使用(练习)

类的定义 类名&#xff1a;手机(Phone) 成员变量&#xff1a;品牌(brand&#xff09;&#xff0c;价格&#xff08;price&#xff09; 成员方法&#xff1a;打电话(calL)&#xff0c;发短信&#xff08;sendMessage&#xff09; 调用类变量和方法

商标申请注册加字加成通用词等于没加!

以前普推知产商标曾分析过“东方甄选”火遍全网后&#xff0c;许多人申请注册商标都喜欢加“甄选”&#xff0c;但是“甄选”基本属于通用词了&#xff0c;加“甄选”后还是属于前面那个词。 近期看到有人加“心选”&#xff0c;甄选&#xff0c;优选&#xff0c;心选等还都是选…

HTTPTomcat

HTTP&Tomcat&Servlet 今日目标&#xff1a; 了解JavaWeb开发的技术栈理解HTTP协议和HTTP请求与响应数据的格式掌握Tomcat的使用掌握在IDEA中使用Tomcat插件 1&#xff0c;Web概述 1.1 Web和JavaWeb的概念 Web是全球广域网&#xff0c;也称为万维网(www)&#xff0c;…