【并发专题】手写LinkedBlockingQueue

news2025/1/15 23:48:12

分析

LinkedBlockingQueue有如下特点:

  1. 近乎无界队列,但可以是有界队列
  2. 实现了BlockingQueue接口
  3. 需要实现take方法和put方法,实现阻塞效果
  4. 数据结构是单链表,有head跟last指针来进行入队出队操作
  5. 有两把锁,读写分离
  6. 所以也有两个条件队列,分别用来提醒消费者继续消费,或者生产者继续生产
    在这里插入图片描述
    手写源码过程中,还是如往常一样,需要注意的是,按照Api要求来实现自己的代码。

源码

下面只是简单的实现了take()put()方法,其他就略过了

package org.tuling.juc.queue;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 手写LinkedBlockingQueue
 *
 * @tag 【LinkedBlockingQueue】 【手写】
 * @Date 2023/8/1 19:13
 * @slogan 编码即学习
 **/
public class MyLinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingDeque<E> {
    /**
     * 容量
     */
    private final int capacity;

    /**
     * 当前队列任务数量
     */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 链表头节点
     */
    transient Node<E> head;

    /**
     * 链表尾节点
     */
    transient Node<E> last;

    /**
     * 拿锁(读锁)
     */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
     * 队列不空,条件
     * 通知消费者可以去消费任务了。当队列为空时,消费者会阻塞在这个条件上,等待唤醒
     */
    private Condition notEmpty = takeLock.newCondition();

    /**
     * 添加锁(写锁)
     */
    private final ReentrantLock putLock = new ReentrantLock();

    /**
     * 队列不满,条件
     * 通知生产者可以去生产任务了。当队列满时,生产者会阻塞在这个条件上,等待唤醒
     */
    private Condition notFull = putLock.newCondition();

    /**
     * 默认的构造函数
     * 会创建一个capacity为Integer.MAX_VALUE的阻塞队列
     */
    public MyLinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * 创建一个指定容量capacity的的阻塞队列
     *
     * @param capacity 容量大小
     */
    public MyLinkedBlockingQueue(int capacity) {
        this.capacity = capacity;
        head = new Node(null);
        last = head;
    }

    /**
     * 添加元素方法(可阻塞等待)
     *
     * <p>
     * 以下是BlockingQueue接口要求:
     * 将指定的元素插入此队列,并在必要时等待可用空间。
     * 参数: E -要添加的元素
     * 抛出:
     * InterruptedException -如果在等待时中断
     * ClassCastException——如果指定元素的类阻止它被添加到这个队列
     * NullPointerException -如果指定的元素为空
     * IllegalArgumentException -如果指定元素的某些属性阻止它被添加到这个队列
     * </p>
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }

        // 添加元素
        Node<E> node = new Node<>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == this.capacity) {

                // 队列满了
                notFull.await();
            }

            // 入队
            enqueue(node);
            int andIncrement = count.getAndIncrement();

            // 判断是否需要唤醒生产者(可能这边生产,另一头在消费了)
            if (andIncrement + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }

        // 还要通知消费者继续消费了
        if (count.get() > 0) {

            // 通知消费者
            this.signalNotEmpty();
        }
    }

    /**
     * 通知消费者,队列不为空了
     * 这里需要上锁,为什么?因为如果不上锁,可能会有其他消费线程并发进来,发现队列是空的,然后进入了沉睡(与其沉睡上下文切换,还不如让它竞争锁)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 入队(尾插法)
     *
     * @param node 需要入队的节点
     */
    private void enqueue(Node<E> node) {
        last.next = node;
        last = last.next;
    }

    /**
     * 出队(可阻塞等待)
     *
     * <p>
     * 以下是BlockingQueue接口要求:
     * 检索并删除此队列的头部,必要时等待,直到元素可用为止。
     * 返回:
     * 这个队列的头
     * 抛出:
     * InterruptedException -如果在等待时中断
     * </p>
     */
    @Override
    public E take() throws InterruptedException {
        E result;
        final ReentrantLock takeLock = this.takeLock;
        final AtomicInteger count = this.count;
        takeLock.lock();
        try {

            while (count.get() == 0) {

                // 队列为空
                notEmpty.await();
            }

            // 出队
            result = dequeue();
            int andDecrement = count.getAndDecrement();

            // 判断是否需要唤醒消费者(可能这边消费,另一头在生产了)
            if (andDecrement > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }

        // 通知身缠这生产
        if (count.get() < capacity) {
            this.signalNotFull();
        }
        return result;
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            this.notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * 出队
     */
    private E dequeue() {

        Node<E> head = this.head;
        Node<E> first = head.next;

        // 这样设置,方便将以前的头节点给GC了
        head.next = null;

        this.head = first;
        E item = first.item;
        first.item = null;
        return item;
    }

    @Override
    public Iterator<E> iterator() {
        return null;
    }

    @Override
    public Iterator<E> descendingIterator() {
        return null;
    }

    @Override
    public void push(E e) {

    }

    @Override
    public E pop() {
        return null;
    }

    @Override
    public void addFirst(E e) {

    }

    @Override
    public void addLast(E e) {

    }

    @Override
    public boolean offerFirst(E e) {
        return false;
    }

    @Override
    public boolean offerLast(E e) {
        return false;
    }

    @Override
    public E removeFirst() {
        return null;
    }

    @Override
    public E removeLast() {
        return null;
    }

    @Override
    public E pollFirst() {
        return null;
    }

    @Override
    public E pollLast() {
        return null;
    }

    @Override
    public E getFirst() {
        return null;
    }

    @Override
    public E getLast() {
        return null;
    }

    @Override
    public E peekFirst() {
        return null;
    }

    @Override
    public E peekLast() {
        return null;
    }

    @Override
    public void putFirst(E e) throws InterruptedException {

    }

    @Override
    public void putLast(E e) throws InterruptedException {
    }

    @Override
    public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public E takeFirst() throws InterruptedException {
        return null;
    }

    @Override
    public E takeLast() throws InterruptedException {
        return null;
    }

    @Override
    public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
        return null;
    }

    @Override
    public E pollLast(long timeout, TimeUnit unit) throws InterruptedException {
        return null;
    }

    @Override
    public boolean removeFirstOccurrence(Object o) {
        return false;
    }

    @Override
    public boolean removeLastOccurrence(Object o) {
        return false;
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return null;
    }

    @Override
    public int remainingCapacity() {
        return 0;
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        return 0;
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return 0;
    }

    @Override
    public int size() {
        return 0;
    }

    @Override
    public boolean offer(E e) {
        return false;
    }

    @Override
    public E poll() {
        return null;
    }

    @Override
    public E peek() {
        return null;
    }

    /**
     * 内部单链表
     * 维护阻塞队列
     */
    public static final class Node<E> {

        /**
         * 当前节点
         */
        E item;

        /**
         * 下一个节点
         */
        Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

使用示例

public class MyLinkedBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {

        // 指定队列的大小创建有界队列
//        BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(10);
        BlockingQueue<Integer> boundedQueue = new MyLinkedBlockingQueue<>(10);

        // 向队列中添加元素
        boundedQueue.put(3);

        // 从队列中取出元素
        Integer take = boundedQueue.take();
        System.out.println(take);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/822532.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux-MySQL的数据目录

总结&#xff1a; window中的my.ini linux 中 /etc/my.cnfwindow中的D:\soft\mysql-5.7.35-winx64\data linux 中 /var/lib/mysql 1.查找与mysql有关的目录 find / -name mysql [rootVM-4-6-centos etc]# find / -name mysql /opt/mysql /etc/selinux/targeted/tmp/modul…

【Linux命令200例】patch 用于将补丁文件应用到源码中

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌&#xff0c;阿里云社区专家博主&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;本文已收录于专栏&#xff1a;Linux命令大全。 &#x1f3c6;本专栏我们会通过具体的系统的命令讲解加上鲜…

glide加载content://com.android.contacts图片源码粗略梳理

获取链路是这样的&#xff1b; UriLoader类里定义了协议头&#xff1a; 里面有个内部类StreamFactory&#xff1a; 通过StreamLocalUriFetcher类的loadResource方法获取InputStream然后把流转换成为图片&#xff1b; 在这里作个草稿笔记给自己看

分布式事务之CAP理论和BASE理论详解

&#x1f680; 分布式事务 &#x1f680; &#x1f332; AI工具、AI绘图、AI专栏 &#x1f340; &#x1f332; 如果你想学到最前沿、最火爆的技术&#xff0c;赶快加入吧✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;CSDN-Java领域优质创作者&#x1f3c6;&…

PHP8的变量-PHP8知识详解

昨天我们讲解了PHP8的常量&#xff0c;今天讲解PHP8的变量。常量有定义常量和预定义常量&#xff0c;变量呢&#xff1f;那就没有定义变量了&#xff0c;那叫给变量赋值&#xff0c;但是还是有预定义变量的。下面就给大家讲解什么是变量、变量赋值及使用及预定义变量。 一、什么…

SE-Net注意力机制

📌本次任务:了解SE-Net原理 SE-Net 是 ImageNet 2017(ImageNet 收官赛)的冠军模型,是由WMW团队发布。具有复杂度低,参数少和计算量小的优点。且SENet 思路很简单,很容易扩展到已有网络结构如 Inception 和 ResNet 中。(这篇论文是2019年的,应该是后续做了更新) 一…

CodeGeeX2 模型全新上线,编程助手能力全面升级!

第二代CodeGeeX代码生成模型CodeGeeX2-6B已于近日发布&#xff0c;并在CodeGeeX编程助手插件上全面上线。新模型基于 ChatGLM2-6B 架构加入代码预训练实现&#xff0c;精度更高、速度更快、能力更强。 下面我们一起来看一下新版模型给CodeGeeX编程助手带来的变化吧&#xff1a;…

C# Blazor 学习笔记(4):blazor代码分离

文章目录 前言代码分离 前言 Blazor可以支持在razor文件里面添加cs代码&#xff0c;但是代码一旦复杂了之后就会变得特别的麻烦。但是VS提供了代码分组的功能。 分离前 分离后 代码分离 我们直接右键razor组件是不能直接添加cs代码部分的 注意新建类的类名是xxx.razor…

安全加固服务器

根据以下的内容来加固一台Linux服务器的安全。 首先是限制连续密码错误的登录次数&#xff0c;由于RHEL8之后都不再使用pam_tally.so和pam_tally2.so&#xff0c;而是pam_faillock.so 首先进入/usr/lib64/security/中查看有什么模块&#xff0c;确认有pam_faillock.so 因为只…

【虚拟数字人】SadTalker简易部署教程

视频教程在这里&#xff1a; sadtalker数字人创建简易教程 项目基于SadTalkers实现视频唇形合成的Wav2lip。通过以视频文件方式进行语音驱动生成唇形&#xff0c;设置面部区域可配置的增强方式进行合成唇形&#xff08;人脸&#xff09;区域画面增强&#xff0c;提高生成唇形的…

【MySQL】数据库基础和SQL分类

文章目录 MySQL数据库基础 数据库的概念主流数据库基本使用连接服务器服务器管理数据库服务器&#xff0c;数据库&#xff0c;表关系 MySQL架构 SQL分类存储引擎查看存储引擎 MySQL数据库基础 数据库的概念 数据库是按照数据结构来组织、存储和管理数据的仓库&#xff0c;…

mysql--InnoDB存储引擎--架构和事务

MySQL进阶篇 文章目录 架构1、逻辑结构InnoDB 逻辑存储单元主层级关系图&#xff1a;1、表空间2、段3、区4、页5、行总结&#xff1a; 2、架构2、1 内存架构2、2 磁盘架构 3、事务3、1事务基础&#xff08;1&#xff09;事务&#xff08;2&#xff09;特性 架构 1、逻辑结构 I…

青少年软件编程(Python) 等级考试试卷(六级)2023年5月

青少年软件编程&#xff08;Python&#xff09; 等级考试试卷&#xff08;六级&#xff09; 分数&#xff1a; 100 题数&#xff1a; 38 一、 单选题(共 25 题&#xff0c; 共 50 分) 1.明明每天坚持背英语单词&#xff0c; 他建立了英语单词错题本文件“mistakes. txt” &…

PPT文件常见的几种格式有哪些?

PPT文件大家都不陌生&#xff0c;那么PPT文件的几种格式&#xff0c;大家也可以学习一下&#xff1a; .ppt & .pptx 这两种PPT格式是最基本的ppt文件后缀&#xff0c;就是我们普通可以编辑的PPT文件格式&#xff0c;2003版的PowerPoint的文件格式是.ppt&#xff0c;而在0…

【C++】文件操作(囊括特殊情况:读文件遇到的空格被跳过、“文件只读一次“)

author&#xff1a;&Carlton tag&#xff1a;C topic&#xff1a;【C】文件操作&#xff08;囊括特殊情况&#xff1a;读文件遇到的空格被跳过、“文件只读一次”&#xff09; website&#xff1a;黑马程序员C date&#xff1a;2023年7月31日 目录 文本文件 写文件 源…

二叉搜索树的模拟实现

基础的二叉树用的其实不多&#xff0c;二叉树的重点在二叉树的延伸&#xff1a;二叉搜索树。二叉搜索树又延伸出了平衡二叉搜索树。搜索数的特点是&#xff1a;查找效率极高。 二叉搜索树的作用&#xff1a; 1. map和set特性需要先铺垫二叉搜索树&#xff0c;而二叉搜索树也是一…

3节点linux服务器集群搭建

一&#xff0c;目的 由于当前集群部署已经成为主流&#xff0c;适当研究一些集群部署的基本操作&#xff0c;有助于后续像k8s集群、doris集群的部署。 大部分集群都是一主两从这种三节点配置。故本文也是采用三节点完成相关学习和记录。 二&#xff0c;说明 因为会关闭防火…

webScoket

webScoket是什么&#xff1f; 支持端对端通讯可以由客户端发起&#xff0c;也可以有服务端发起用于消息通知、直播间讨论区、聊天室、协同编辑等 做一个简单的webScoket 客户端配置&#xff1a; 1、新建一个页面叫web-scoket.html <!DOCTYPE html> <html lang"…

P7243 最大公约数

题目 思路 利用曼哈顿原理求离&#xff08;x&#xff0c;y&#xff09;最远的点 代码 #include<bits/stdc.h> using namespace std; #define int long long #define INF 0x3f3f3f3f const int maxn2005; int gcd(int a,int b) { return b?gcd(b,a%b):a; } int n,m; i…

华为OD机试真题 JavaScript 实现【输入整型数组和排序标识,对其元素按照升序或降序进行排序】【牛客练习题】

目录 一、题目描述二、输入描述三、输出描述四、解题思路五、JavaScript算法源码 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答题思路、详细的代码注释、样例测试&#xff0c;发现新题目&#x…