【阻塞队列】

news2025/1/18 7:04:17

文章目录

  • 普通队列存在的问题
  • 单锁实现
  • 双锁实现

普通队列存在的问题

  1. 大部分场景要求分离向队列放入(生产者)、从队列拿出(消费者)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
  2. 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
  3. 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试

因此我们需要解决的问题有

  1. 用锁保证线程安全
  2. 用条件变量让等待非空线程等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转

单锁实现

ava 中要防止代码段交错执行,需要使用锁,有两种选择

  • synchronized 代码块,属于关键字级别提供锁保护,功能少
  • ReentrantLock 类,功能丰富

ReentrantLock 配合条件变量来实现:

在队列满时,不是立刻返回,而是当前线程进入等待
什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行

offer方法:

ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition(); // 条件变量
int size = 0;

public void offer(String e) {
    lock.lockInterruptibly();
    try {
        while (isFull()) {
            tailWaits.await();	// 当队列满时, 当前线程进入 tailWaits 等待
        }
        array[tail] = e;
        tail++;
        
        size++;
    } finally {
        lock.unlock();
    }
}

private boolean isFull() {
    return size == array.length;
}
  • 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
  • 将来我们的队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行

上述关键点:

  • 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
  • 这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待(while循环)

最终版本:

/**
 * 单锁实现
 * @param <E> 元素类型
 */
public class BlockingQueue1<E> implements BlockingQueue<E> {
    private final E[] array;
    private int head = 0;
    private int tail = 0;
    private int size = 0; // 元素个数

    @SuppressWarnings("all")
    public BlockingQueue1(int capacity) {
        array = (E[]) new Object[capacity];
    }

    ReentrantLock lock = new ReentrantLock();//单锁
    Condition tailWaits = lock.newCondition();
    Condition headWaits = lock.newCondition();//条件变量,底层也是队列

    @Override
    public void offer(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (isFull()) {
                tailWaits.await();
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }
            size++;
            headWaits.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void offer(E e, long timeout) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);
            while (isFull()) {
                if (t <= 0) {
                    return;
                }
                t = tailWaits.awaitNanos(t);
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }
            size++;
            headWaits.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await();
            }
            E e = array[head];
            array[head] = null; // help GC
            if (++head == array.length) {
                head = 0;
            }
            size--;
            tailWaits.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    private boolean isEmpty() {
        return size == 0;
    }

    private boolean isFull() {
        return size == array.length;
    }
}

注意

  • JDK 中 BlockingQueue 接口的方法命名与我的示例有些差异
    • 方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
    • 方法 poll() 是非阻塞的实现,阻塞实现方法为 take()

双锁实现

单锁的缺点在于:

  • 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
  • 冲突的主要是生产者之间:多个 offer 线程修改 tail
  • 冲突的还有消费者之间:多个 poll 线程修改 head

如果希望进一步提高性能,可以用两把锁

  • 一把锁保护 tail
  • 另一把锁保护 head
    在这里插入图片描述

初步实现:

@Override	
public void offer(E e) throws InterruptedException {
    tailLock.lockInterruptibly();
    try {
        // 队列满等待
        while (isFull()) {
            tailWaits.await();
        }
        
        // 不满则入队
        array[tail] = e;
        if (++tail == array.length) {
            tail = 0;
        }
        
        // 修改 size (有问题)
        size++;
        
    } finally {
        tailLock.unlock();
    }
}

上面代码的缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果。因此,size 需要用下面的代码保证原子性
在这里插入图片描述
最终版本:

  1. 两把锁,一把 锁生产者,一把 锁消费者,生产者的signel需要加生产者的锁,然后唤醒消费者,消费者的signel要加消费者的锁,然后唤醒生产者。
  2. 当向队列取元素时:当队列由满到不满时,由消费者唤醒生产者(此时是生产者锁+生产者条件变量.signel),其他情况(由不满到不满)由消费者唤醒消费者。
  3. 当向队列存元素时:当队列由空到不空时,由生产者线程唤醒消费者(此时是消费者锁+消费者条件变量.signel),其他情况(由不空到不空时),生产者唤醒生产者。
  4. 生产者 / 消费者 获得生产者锁与消费者锁是串行,不能嵌套(避免死锁)。
public class BlockingQueue2<E> implements BlockingQueue<E> {

    private final E[] array;
    private int head = 0;
    private int tail = 0;
    private final AtomicInteger size = new AtomicInteger(0);
    ReentrantLock headLock = new ReentrantLock();
    Condition headWaits = headLock.newCondition();
    ReentrantLock tailLock = new ReentrantLock();
    Condition tailWaits = tailLock.newCondition();

    public BlockingQueue2(int capacity) {
        this.array = (E[]) new Object[capacity];
    }

    @Override
    public void offer(E e) throws InterruptedException {
        int c;
        tailLock.lockInterruptibly();
        try {
            while (isFull()) {
                tailWaits.await();
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }            
            c = size.getAndIncrement();
            // a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
            if (c + 1 < array.length) {
                tailWaits.signal();
            }
        } finally {
            tailLock.unlock();
        }
        // b. 从0->不空, 由此offer线程唤醒等待的poll线程
        if (c == 0) {
            headLock.lock();
            try {
                headWaits.signal();
            } finally {
                headLock.unlock();
            }
        }
    }

    @Override
    public E poll() throws InterruptedException {
        E e;
        int c;
        headLock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await(); 
            }
            e = array[head]; 
            if (++head == array.length) {
                head = 0;
            }
            c = size.getAndDecrement();
            // b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
            if (c > 1) {
                headWaits.signal();
            }
        } finally {
            headLock.unlock();
        }
        // a. 从满->不满, 由此poll线程唤醒等待的offer线程
        if (c == array.length) {
            tailLock.lock();
            try {
                tailWaits.signal();
            } finally {
                tailLock.unlock();
            }
        }
        return e;
    }

    private boolean isEmpty() {
        return size.get() == 0;
    }

    private boolean isFull() {
        return size.get() == array.length;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/951782.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【记录】手机QQ和电脑QQ里的emoji种类有什么差异?

版本 手机 QQ&#xff1a;V 8.9.76.12115 电脑 QQ&#xff1a;QQ9.7.15&#xff08;29157&#xff09; 偶然发现&#xff0c;有一种emoji手机上怎么找都找不到&#xff0c;一开始以为自己失忆了&#xff0c;后来发现这种emoji只在电脑上有。 接下来简单说一下找emoji差异的方式…

912.排序数组

目录 一、题目 二、代码 一、题目 912. 排序数组 - 力扣&#xff08;LeetCode&#xff09; 二、代码 class Solution { public:void _MergeSort(vector<int>&data,vector<int>&tmp,int begin,int end){if(begin>end)return;//结束条件int mid (beg…

解决博客不能解析PHP直接下载源码问题

背景&#xff1a; 在网站设置反向代理后&#xff0c;网站突然不能正常访问&#xff0c;而是会直接下载访问文件的PHP源码 解决办法&#xff1a; 由于在搞完反向代理之后&#xff0c;PHP版本变成了纯静态&#xff0c;所以网站不能正常解析&#xff1b;只需要把PHP版本恢复正常…

记录一些问题

1、如何下载从数据库中查询出来的数据 查询结果List 写到文件中&#xff0c;然后下载 GetMapping(value "/download")public void download(HttpServletResponse response)throws IOException {List<ticket> tickets getTickets();File tmpFile write2CSVF…

Python的os.walk()函数使用案例

在Python中&#xff0c;os模块是一个非常实用的工具&#xff0c;它可以让我们与操作系统进行交互&#xff0c;操作文件和目录。在本文中&#xff0c;我们将详细介绍os模块中的遍历文件功能&#xff0c;并通过具体案例和使用场景来解释。 首先&#xff0c;导入os模块。在Pytho…

嵌入式学习之exec族函数

今天&#xff0c;主要学习的内容是exec族函数和system函数&#xff0c;以及system函数和fork函数的配合使用。今日写的代码如下&#xff1a;

《Kubernetes部署篇:Ubuntu20.04基于containerd部署kubernetes1.24.17集群(多主多从)》

一、架构图 如下图所示: 二、环境信息 1、部署规划主机名K8S版本系统版本内核版本IP地址备注k8s-master-631.24.17Ubuntu 20.04.5 LTS5.15.0-69-generic192.168.1.63master节点 + etcd节点k8s-master-641.24.17Ubuntu 20.04.5 LTS5.15.0-69-generic192.168.1.64master节点 + …

Linux禅道上修改Apache 和 MySQL 默认端口号

1. 修改Apache默认端口号 80 cd /opt/zbox/etc/apachevim httpd.conf :wq 保存 2. 修改MySQL默认端口号 3306 cd /opt/zbox/etc/mysql vim my.cnf :wq 保存 3. 重启服务 ./zbox restart

计算机网络-笔记-第六章-应用层

目录 六、第六章——应用层 1、应用层概述 2、(C/S)客户-服务器方式 & &#xff08;P2P&#xff09;对等方式 &#xff08;1&#xff09;客户-服务器方式【C/S】 &#xff08;2&#xff09;对等方式【P2P】 3、DHCP——动态主机配置协议 &#xff08;1&#xff09;诞…

面试结束后:如何写一封有效的感谢信

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

开源且强大的网络嗅探分析工具——Wireshark

Wireshark是一款强大的开源网络协议分析工具&#xff0c;旨在帮助用户深入了解网络通信的细节。通过捕获、解析和展示网络数据包&#xff0c;Wireshark能够帮助工程师诊断问题、优化性能&#xff0c;以及解决各种网络难题。无论是深入分析还是快速调试&#xff0c;Wireshark都是…

学习pytorch7 神经网络的基本骨架--nn,module的使用

神经网络的基本骨架--nn,module的使用 官网Module介绍Python父类子类继承关系前向神经网络pycharm快捷键重写类方法codedebug B站小土堆视频学习笔记 官网Module介绍 https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module Python父类子类继承关系…

4.4 对幻灯片进行动画制作

动画是演示文稿的重要构成要素&#xff0c;WPS演示为用户提供了多种动画类型&#xff0c;通过学习设置页面切换、动画效果等相关功能&#xff0c;可使演示文稿更加生动&#xff0c;富于表现力。 4.4.1 设置页面的切换方式 页面的切换是指从一张幻灯片切换到另一张幻灯片时的页…

【ES6】JavaScript中的Symbol

Symbol是JavaScript中的一种特殊的、不可变的、不可枚举的数据类型。它通常用于表示一个唯一的标识符&#xff0c;可以作为对象的属性键&#xff0c;确保对象的属性键的唯一性和不可变性。 Symbol.for()是Symbol的一个方法&#xff0c;它用于创建一个已经注册的Symbol对象。当…

ThePASS研究院|以Safe为例,解码DAO国库管理

本研究文章由ThePASS团队呈现。ThePASS是一家开创性的DAO聚合器和搜索引擎&#xff0c;在为DAO提供洞察力和分析方面发挥着关键作用。 Intro 随着去中心化自治组织&#xff08;DAOs&#xff09;的发展&#xff0c;它们被赋予了越来越多的角色和期望。在这种巨幅增长的背景下&…

大数据平台与数据仓库的五大区别

随着大数据的快速发展&#xff0c;很多人难以区分大数据平台与数据仓库的区别&#xff0c;两者傻傻分不清楚。今天我们小编就给大家汇总了大数据平台与数据仓库的五大区别&#xff0c;希望有用哦&#xff01;仅供参考&#xff01; 大数据平台与数据仓库的五大区别 一、概念不同…

docker安装grafana,prometheus,exporter以及springboot整合详细教程(GPE)

springboot项目ip:192.168.168.1 测试服务器ip:192.168.168.81 文章来自互联网,自己略微整理下,更容易上手,方便自己,方便大家 最终效果: node springboot 1.下载镜像 docker pull prom/node-exporter docker pull prom/mysqld-exporter docker pull google/cadvisor dock…

微前端-monorepo-无界

文章目录 前言一、微前端二 、monorepo三 、pnpm硬链接软链接&#xff08;符号链接&#xff09;幽灵依赖依赖安装耗时长monorepo项目搭建子模块复用 四、无界接入无界无界预加载无界传参 总结 前言 本文主要记录微前端框架 无界 的使用与理解以及monorepo代码管理方式。 一、微…

小红书群禁言

群禁言在2023上半年炒得沸沸扬扬&#xff0c;结果官方一个关闭接口操作&#xff0c;谁都没办法了 不过还好&#xff0c;现在有了完美的解决方案&#xff1a;群撤回助手&#xff01; 24小时帮你管群&#xff0c;自定义规则、白名单&#xff0c;有广告自动秒撤&#xff01;多发广…

PyQt6 GUI界面设计和Nuitka包生成exe程序(全笔记)

PyQt6 GUI界面设计和Nuitka包,生成exe程序全笔记 目录一、PyQt6包安装1.1 进行环境配置和安装1.2 检查包是否安装成功。1.3 运行desinger.exe二、GUI界面设计,写程序,并能运行成功。三、Nuitka打包生成exe程序3.1 做Nuitka安装准备工作(1)安装C编译器,设置环境变量3.2 安…