Java多线程案例之阻塞队列

news2025/1/11 17:54:38

文章目录

  • 一. 认识阻塞队列
    • 1. 什么是阻塞队列
    • 2. 生产者消费者模型
    • 3. 标准库中阻塞队列类
    • 二. 基于循环队列实现的简单阻塞队列
    • 1. 循环队列的简单实现
    • 2. 阻塞队列的简单实现

一. 认识阻塞队列

1. 什么是阻塞队列

阻塞队列本质上还是一种队列, 和普通队列一样, 遵循先进先出, 后进后出的规则, 但阻塞队例相比于普通队列的特殊之处在于阻塞队列的阻塞功能, 主要基于多线程使用.

  1. 如果队列为空, 执行出队列操作, 就会使线程陷入阻塞, 阻塞到另一个线程往队列里添加元素(队列不空)为止.
  2. 如果队列满了,执行入队列操作, 也会使线程阻塞, 阻塞到另一个线程从队列取走元素位置(队列不满)为止.

2. 生产者消费者模型

基于阻塞队列的阻塞特性是可以实现 “生产者消费者模型” 的, 那么什么是生产者消费者模型呢?

还是用生活中的例子来解释, 这不还有半个月就要过年了, 大年当晚我们会吃年夜饭, 饺子就是年夜饭当中的一份主食, 那么要想吃到饺子, 最好就是一家人在一起把饺子包好, 简单来讲包饺子的步骤有: 擀饺子皮+包饺子.

有下面两种包饺子方式 :

  1. 每个人, 都分别进行 擀饺子皮+包饺子这样的操作, 但毕竟家里面的擀面杖不会准备那么多吧, 大家会竞争面杖, 一个人使用擀面杖的使用, 其他人就得阻塞等待, 这就影响了包饺子的效率了.
  2. 一个人专门负责擀饺子皮, 另外三个人负责包, 擀饺子的人每次擀好一个皮, 就放到 盖帘 上, 其他人每次都从盖帘上取一个皮包饺子.

其实第二种包饺子方式就是生产者消费者模型的运用, 饺子皮的那个人就是生产者, 其他负责包饺子的人就是消费者, 放饺子皮的盖帘就相当于阻塞队列, 如果擀饺子皮的人擀的太慢生产的饺子皮供不上使用, 不一会盖帘上没皮了, 包饺子的人就得等一会儿再包; 如果擀饺子的人擀的太快了, 包的速度跟不上擀的速度, 盖帘上放满了饺子皮, 擀饺子皮的人就得等一会儿再擀.

img

生产者消费者模型能够给程序带来两个非常重要的好处, 一是可以实现实现了发送方和接收方之间的 “解耦” , 二是可以 “削峰填谷” , 保证系统的稳定性, 具体理解如下:

在服务器相互调用的场景中假设有两个服务器A(请求服务器), B(应用服务器), A把请求转发给B处理, B处理完了把结果反馈给A, 这种情况下, A和B之间的耦合是比较高的, A要调用B, A 务必要知道B的存在, 如果B挂了, 很容易引起A的bug, 再比如再加一个C服务器, 此时也需要对A修改不少代码, 就需要针对A重新修改代码, 重新测试, 重新发布, 重新部署等, 这就非常麻烦了.

img

而针对上述场景, 使用生产者消费者模型就可以有效的降低耦合,

img

A和B之间通过一个阻塞队列来通信, 此时A是不知道B的, A只知道队列, 也就是说A的代码中没有任何一行代码和B相关; 同样的, B也是不知道A的, B也是只知道队列, B的代码中,也没有任何一行代码和A相关.

如果B挂了, 对于A没有任何影响, 因为队列还是正常的, A仍然可以给队列插入元素, 如果队列满就先阻塞等待; 同样如果A挂了, 也对于B没啥影响, 队列是正常的B就仍然可以从队列取元素, 如果队列空了, 也就阻塞等待就好了; 也就是说, AB任何一方挂了不会对对方造成影响, 同时, 新增一个C来作为消费者, 对于A来说仍然是无感知的.

削峰填谷” 可以联想三峡大坝的水库, 三峡大坝的水库,

img

如果上游水多了, 三峡大坝就会关闸蓄水, 此时就相当于由三峡大坝承担了上游的冲击, 对下游起到了很好的保护左右, 这就是 “削峰” 作用; 如果上游水少了, 三峡大坝开闸放水, 有效保证下游的用水情况, 避免出现干旱灾害, 这就是 “填谷” 作用.

而在服务器开发中, 上游就是用户发送的请求, 下游就是一些执行具体业务的服务器, 用户发多少请求这是不可控的, 有的时候请求多, 有的时候请求少, 而如果没有使用生产者消费者模型, A服务器用户请求暴涨, 此时如果没有充分的准备, B服务器来不及响应一下没抗住, 就可能会挂掉.

但是, 如果使用生产者消费者模型, 那么即使A服务器请求暴涨, 也不会影响到B, 这是因为A请求暴涨后, 用户的请求都被打包到阻塞队列中(如果阻塞队列有界, 则会引起队列阻塞, 不会影响到B), B可以从阻塞队列中取出元素以合适的速度来处理这些请求, 这就是 “削峰填谷” 的作用了.

img

3. 标准库中阻塞队列类

Java标准库也提供了阻塞队列的标准类, 常用的有下面几个:

  • ArrayBlockingQueue : 基于数组实现界阻塞队列
  • LinkedBlockingQueue : 基于链表实现的有界阻塞队列
  • PriorityBlockingQueue : 带有优先级(堆)的无界阻塞队列
  • BlockingQueue接口 : 上面的类实现了该接口

阻塞队列类的核心方法:

方法解释
void put(E e) throws InterruptedException带有阻塞特性的入队操作方法
E take() throws InterruptedException带有阻塞特性的出队操作方法
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException带有阻塞特性的入队操作方法, 并且可以设置最长等待时间
E poll(long timeout, TimeUnit unit) throws InterruptedException带有阻塞特性的出队操作方法, 并且可以设置最长等待时间

注意 : 其他一些重载的offer, poll等普通队列方法也是支持使用的, 但是这些方法就不带有阻塞功能了.

代码示例:

下面的代码是基于标准库的阻塞队列简单实现的生产者消费者模型.

public class TestDemo19 {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
        //消费者线程
        Thread customer = new Thread(() -> {
           while (true) {
               try {
                   Integer result = blockingQueue.take();
                   System.out.println("消费元素: " + result);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
        });
        customer.start();

        //生产者线程
        Thread producer = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    blockingQueue.put(count);
                    System.out.println("生产元素: " + count);
                    count++;
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
    }
}

执行结果:

img

二. 基于循环队列实现的简单阻塞队列

1. 循环队列的简单实现

要实现一个阻塞队列, 需要先实现一个普通的循环队列, 循环队列是基于数组实现的, 这里最重要的是如何将队列为空状态与满状态区分开来, 对于这里的实现不懂得可以看看看我前面博客中关于循环队列的内容, 这里就是不做过多的赘述了 : 队列与集合Queue,Deque的理解和使用 , 用栈实现队列,用队列实现栈,设计循环队列 , 阻塞队列最核心的就是出队和入队操作, 所以我们这里重点实现这两个方法, 代码如下:

//普通的循环队列
class MyBlockingQueue {
    //存放元素的数数组
    private int[] items = new int[1000];
    //队头指针
    private int head = 0;
    //队尾指针
    private int tail = 0;
    //记录队列元素的个数
    private int size = 0;

    //入队操作
    public void put (int val) {
        if (size == items.length) {
            //队列满了
            return;
        }
        items[tail++] = val;
        //等价于 tail %= items.length
        if (tail >= items.length) {
            tail = 0;
        }
        size++;
    }

    //出队操作
    public Integer take() {
        int resulet = 0;
        if (size == 0) {
            //队列空了
            return null;
        }
        resulet = items[head++];
        //等价于 head %= elem.length
        if (head >= items.length) {
            head = 0;
        }
        size--;
        return resulet;
    }
}

2. 阻塞队列的简单实现

首先要实现的阻塞队列在大多数情况下是在多线程情况下使用的, 所以要考虑线程安全问题, 上面循环队列的代码takeput方法都有写操作, 直接加锁即可.

//线程安全的循环队列
class MyBlockingQueue {
    //存放元素的数数组
    private int[] items = new int[1000];
    //队头指针
    private int head = 0;
    //队尾指针
    private int tail = 0;
    //记录队列元素的个数
    private int size = 0;

    //入队操作
    public void put (int val) {
        synchronized (this) {
            if (size == items.length) {
                //队列满了
                return;
            }
            items[tail++] = val;
            //等价于 tail %= items.length
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
        }
    }

    //出队操作
    public Integer take() {
        int resulet = 0;
        synchronized (this) {
            if (size == 0) {
                //队列空了
                return null;
            }
            resulet = items[head++];
            //等价于 head %= elem.length
            if (head >= items.length) {
                head = 0;
            }
            size--;
            return resulet;
        }
    }
}

然后就是实现阻塞效果了, 主要是使用waitnotify实现线程的阻塞等待.

入队时, 队列满了需要使用wait方法使线程阻塞, 直到有元素出队队列不满了再使用notify通知线程执行.

出队时, 队列为空也需要使用wait方法使线程阻塞, 直到有新元素入队再使用notify通知线程执行.

代码如下:

class MyBlockingQueue {
    //存放元素的数数组
    private int[] items = new int[1000];
    //队头指针
    private int head = 0;
    //队尾指针
    private int tail = 0;
    //记录队列元素的个数
    private int size = 0;

    //入队操作
    public void put (int val) throws InterruptedException {
        synchronized (this) {
            if (size == items.length) {
                //队列满了,阻塞等待
                this.wait();
            }
            items[tail++] = val;
            //等价于 tail %= items.length
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            //唤醒因队列空造成的阻塞wait
            this.notify();
        }
    }

    //出队操作
    public Integer take() throws InterruptedException {
        int resulet = 0;
        synchronized (this) {
            if (size == 0) {
                //队列空了,阻塞等待
                this.wait();
            }
            resulet = items[head++];
            //等价于 head %= elem.length
            if (head >= items.length) {
                head = 0;
            }
            size--;
            //唤醒因队列满造成的阻塞wait
            this.notify();
            return resulet;
        }
    }
}

上述代码已经基本实现了阻塞队列的功能, 但不够完善, 这里的代码再改良一下把判断队列满或者空的wait部分的代码, 把if改成while是更好的, 为什么这样写呢?

我们思考当代码中当wait被唤醒的时候, 此时if的条件一定就不成立了吗?

具体来说, 思考put方法中的wait被唤醒, 往下执行是要要求,队列不满但是wait被唤醒了之后, 队列一定是不满的嘛? 其实当前代码中是不会出现这样的问题的, 但是稳妥起见, 最好的办法就是wait唤醒之后再判定一下条件是否满足, 而且Java标准库当中就是建议这么写的.

img

调整部分的代码如下:

//出队部分
while (size == items.length) {
    //队列满了,阻塞等待
    this.wait();
}

//入队部分
while (size == 0) {
    //队列空了,阻塞等待
    this.wait();
}

最后就是测试一下我们所写的这个阻塞队列了, 我们创建两个线程分别是消费者线程customer和生产者线程producer, 生产者生产数字, 消费者消费数字, 为了让执行结果中的阻塞效果明显一些, 我们可以使用sleep方法来控制一下生产者/消费者的生产/消费的频率, 这里我们让开始时生产的速度快一些, 消费的速度慢一些, 全部代码如下:

class MyBlockingQueue {
    //存放元素的数数组
    private int[] items = new int[1000];
    //队头指针
    private int head = 0;
    //队尾指针
    private int tail = 0;
    //记录队列元素的个数
    private int size = 0;

    //入队操作
    public void put (int val) throws InterruptedException {
        synchronized (this) {
            while (size == items.length) {
                //队列满了,阻塞等待
                this.wait();
            }
            items[tail++] = val;
            //等价于 tail %= items.length
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            //唤醒因队列空造成的阻塞wait
            this.notify();
        }
    }

    //出队操作
    public Integer take() throws InterruptedException {
        int resulet = 0;
        synchronized (this) {
            while (size == 0) {
                //队列空了,阻塞等待
                this.wait();
            }
            resulet = items[head++];
            //等价于 head %= elem.length
            while (head >= items.length) {
                head = 0;
            }
            size--;
            //唤醒因队列满造成的阻塞wait
            this.notify();
            return resulet;
        }
    }
}

public class Test {
    public static void main(String[] args) {
        //消费线程
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread customer = new Thread(() -> {
            while(true) {
                try {
                    int result = queue.take();
                    System.out.println("消费元素: " + result);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        customer.start();
        //生产线程
        Thread producer = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    queue.put(count);
                    System.out.println("生产元素: " + count);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
    }
}

执行结果:
img

可以看到执行结果中因为生产者生产快, 消费者消费慢, 所以一开始生产者生产的速度是极快的, 当生产到阻塞队列满了之后生产者需要等待消费者消费后才能生产, 此时生产者的速度就跟消费者同步了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/141862.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

291. 蒙德里安的梦想(状态压缩dp详解)

求把 NM 的棋盘分割成若干个 12 的长方形&#xff0c;有多少种方案。 例如当 N2&#xff0c;M4 时&#xff0c;共有 5 种方案。当 N2&#xff0c;M3 时&#xff0c;共有 3 种方案。 如下图所示&#xff1a; 输入格式 输入包含多组测试用例。 每组测试用例占一行&#xff0c…

龙芯机器JDK安装和配置

龙芯机器&#xff1a;[rootlocalhost j2sdk-image]# cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c4 Loongson-3A R4 (Loongson-3A4000) 1800MHz龙芯机器JDK安装和配置下载地址&#xff1a;http://www.loongnix.cn/zh/api/java/选择JDK8&#xff0c;选择MIPS64进行下…

2-3进程管理-进程同步

文章目录一.进程同步、互斥二.实现临界区互斥的基本方法&#xff08;一&#xff09;软件实现方法&#xff08;二&#xff09;硬件实现方法三.互斥锁四.信号量机制五.经典同步问题&#xff08;一&#xff09;生产者-消费者问题&#xff08;二&#xff09;读者-写者问题&#xff…

流逝的一年

昨天远方的大哥打来了电话&#xff0c;我们聊了下近况。当他问及去年是否有新的著作问世&#xff0c;我不禁有些赧然&#xff0c;解释说还在学习中… 放下电话后&#xff0c;我陷入了思索&#xff1a;又是一年划上了句号&#xff0c;这一年我做了什么&#xff1f;我又有什么收…

数据库的一些基本概念

一、服务器&#xff1a;&#xff08;更正大家头脑中的一个错误认识&#xff09; 1、服务器是一种软件&#xff0c;不是硬件&#xff0c;不是计算机。 2、不同服务器负责调用不同类型的文件。 二、表文件、数据库、数据库服务器以及SQL语句&#xff1a; 1、表文件: …

一条 select 语句的执行过程

MySQL 从大方向来说&#xff0c;可以分为 Server 层和存储引擎层。而 Server 层包括连接器、查询缓存、解析器、预处理器、优化器、执行器等&#xff0c;最后 Server 层再通过 API 接口形式调用对应的存储引擎层提供的接口来执行增删改查操作。 如下即为一个简略的 select 语句…

Android动态运行时权限

android 6.0(API 级别 23)开始&#xff0c;android引入了运行时权限&#xff0c;应用安装时不向其授予权限&#xff0c;应用运行时向其授予权限。如果在运行时该功能没有动态地申请相应的权限&#xff0c;就会抛出SecurityException异常。 android的运行时权限的申请过程主要有…

C语言画一个正方体

程序截图 操作方法 鼠标拖动。左键拖动及滚轮能看到不同角度下正方体的形状&#xff0c;右键拖动能将最近的正方体顶点挪到这个投影面的相应位置。 按键控制。wasd 控制投影面旋转&#xff0c;ws 关于 x 轴旋转&#xff0c;ad 关于 y 轴旋转。 个人思路 首先投影面的确立需…

【寒假第一天】LeetCode刷题

&#x1f308;一.选择题&#x1f47f;1.1.堆是一种有用的数据结构。下列那个关键码序列是一个堆&#xff08; &#xff09;。 A. 94,31,53,23,16,72 B. 94,53,31,72,16,23 C. 16,53,23,94,31,72 D. 16,31,23,94,53,72D堆排序有两种排序方法&#xff1a;大堆排序-----根结点要大…

【Kotlin】Kotlin 函数总结 ( 具名函数 | 匿名函数 | Lambda 表达式 | 闭包 | 内联函数 | 函数引用 )

文章目录一、函数头声明二、函数参数1、默认参数值2、具名参数三、Unit 函数四、TODO 函数抛出异常返回 Nothing 类型五、反引号函数名六、匿名函数七、匿名函数的函数类型八、匿名函数的隐式返回九、匿名函数参数十、匿名函数 it 关键字十一、匿名函数变量类型推断十二、匿名函…

JS中BOM 浏览器对象 提供的定时器

window对象提供了2个定时器方法&#xff1a; settTimeout()和setInterval() 1.setTimeout()定时器 语法&#xff1a; window.setTimeout(调用函数,[延迟的毫秒数]);延迟时间可以省略&#xff0c;省略则为0 用于设置一个定时器&#xff0c;该定时器再定时器到期后执行调用函数 …

【nodejs】npm与包

1、什么是包 Node.js中的第三方模块又叫包 2、包的来源 由第三方个人或团队开发出来的&#xff0c;免费供所有人使用 3、为什么需要包 由于Node.js的内置模块仅提供了一些底层的API&#xff0c;导致在基于内置模块进行项目开发时&#xff0c;效率很低。 包是基于内置模块封装出…

杨校老师课堂之IntellJ IDEA的使用技巧

下载地址&#xff1a; https://www.jetbrains.com.cn/idea/download/#sectionwindows 一、常规操作 1、忽略大小写&#xff0c;进行提示 2、启用Idea时&#xff0c;默认不再打开上次项目 3、设置主题 4、设置默认的字体 5、修改类头的文档注释信息 6、设置项目文件编码 7、统一…

electron与jquery起冲突,使用jquery报错解决方法

问题原因&#xff1a;Electron 为了整合 Node.js&#xff0c;会在 DOM 加入 module、exports、require 等模块和函数&#xff0c;和jQuery、RequireJS、Meteor、AngularJS 等发生冲突。 暴力解决方法&#xff1a;去除node功能加持&#xff0c;在加载browserWindow或者browserVi…

C++:闭包:闭包Closure理解

一&#xff1a;什么是闭包 闭包有很多定义&#xff0c;一种说法是&#xff1a;闭包是带有上下文的函数&#xff0c;说白了&#xff0c;就是有状态的函数&#xff0c;这其实就是一个类&#xff0c;换个名字而已。 一个函数&#xff0c;带上一个状态&#xff0c;就变成了闭包&…

共享模型之管程(四)

1.wait/notify 1.1.为什么需要wait? 小故事: ①.假设多个用户(线程)都需要进入房间使用算盘(CPU)进行计算工作,但是为了保证计算过程中的安全,老王设计了一把锁(Synchronized),每次只允许一个用户(线程)拿到钥匙进入房间(成为Owner线程); ②.小南(线程)费了九牛二虎之力,抢…

【Docker】搭建Zookeeper集群

【Docker】搭建Zookeeper集群 下载镜像 docker pull zookeeper:3.5.8wy:study wy$ docker pull zookeeper:3.5.8 3.5.8: Pulling from library/zookeeperDigest: sha256:12af523731cbe390f5332d6c1e254f1d56c734a786910d5582653445a5cee299 Status: Downloaded newer image f…

Allegro174版本新功能介绍之动态铜皮对单独层面参数设置

Allegro174版本新功能介绍之动态铜皮对单独层面参数设置 Allegro升级到了174版本的时候,可以支持动态铜皮对单独的层面进行参数设置,如下图 具体操作如下 在低版本166以及172的时候,只有Global Dynamic Shape Parameter设置,如下图,只有全局的铜皮参数设置升级到了174时候…

WMS智能仓储管理系统源码 SpringMVC物流仓库管理系统源码

淘源码&#xff1a;国内知名的源码免费下载平台 需要源码学习可私信我。 系统介绍&#xff1a; 基于SpringMVCHibernatMinidao&#xff08;类Mybatis&#xff09;Easyui&#xff08;UI库&#xff09; Jquery Boostrap Ehcache Redis Ztree等基础架构开发的物流仓库管理系…

人脸识别:我是如何工作的?

任何自动人脸识别过程都必须考虑导致其复杂性的几个因素&#xff0c;因为人脸是一个动态实体&#xff0c;在多个因素的影响下不断变化&#xff0c;例如光照、姿势、年龄……这三个参数中的任何一个的变化都会导致同一个人的两幅图像之间的误差值大于不同个体的两幅图像之间的误…