💐个人主页:初晴~
📚相关专栏:多线程 / javaEE初阶
一、阻塞队列
- 当队列满的时候, 继续⼊队列就会阻塞, 直到有其他线程从队列中取⾛元素
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插⼊元素
二、生产者消费者模型
1、概念
比如一个生产线需要制作零件,组装两个步骤,这时就由两种方式进行生产部署:
1、每个人都分别进行零件制作和组装两个操作
2、将员工分成两拨人,一拨人专门负责制作零件,另一拨人专门负责组装
显然第一种方法是比较低效的,制作零件的机器数量是有限的,所有人就会去竞争这几台机器,可能会导致阻塞等待导致效率低下。而第二种方法则实现了各司其职,自然效率会更高,这时零件制作就相当于是生产者,组装就相当于是消费者,这种方式就被称之为“生产者消费者模型”,在后端开发中经常会涉及到。
2、作用
⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取.
注意:阻塞队列是一种 数据结构,由于比较好用,会将其单独封装为一个服务器程序,并且在单独的服务器机器上进行部署。这时的阻塞队列就被称之为 “消息队列”(Messae Queue,MQ)了。而消息队列相对是比较成熟的,代码不会频繁修改,因此我们认为A与队列,B与队列之间的交互是 低耦合的。
2. 阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒. (削峰填⾕)
优点:1、解耦合2、削峰填谷缺点:1、需要更多的机器来部署消息队列2、生产者与消费者之间的通信会延时,响应时间会变长
三、实现方式
1、手写一个阻塞队列
在之前的文章中,我们曾在 栈和队列 一文深入探讨过队列的写法,这里就不做过多赘述,直接看一下最基础的队列的代码实现:
class MyBlockingQueue{
private String[] data=null;
private int head=0;
private int tail=0;
private int size=0;
public MyBlockingQueue(int capacity){
data=new String[capacity];
}
public void put(String s){
if(size==data.length){
//队列满了
return;
}
data[tail]=s;
tail++;
if(tail>=data.length){
tail=0;
}
size++;
}
public String take(){
if(size==0){
//队列为空
return null;
}
String ret=data[head];
head++;
if(head>=data.length){
head=0;
}
size--;
return ret;
}
}
由于put与take方法中涉及了很多的修改操作,这样的代码在多线程环境下肯定是会有线程安全问题的,博主在 深入剖析线程安全问题 一文中做过详细分析。
那么该如何解决这一问题呢,显然就是通过加锁操作了:
但这样还是不够的。当线程发现队列为空时,不应该继续再去参与锁的竞争应该直接进入阻塞等待的状态,等到其它线程调用put方法,队列不为空时,再恢复执行,从而避免浪费不必要的资源,提高执行效率,这时就可以利用wait-notify来解决了。这在博主的 等待通知机制 一文过详细的介绍。于是最终代码实现就如下:
class MyBlockingQueue{
private String[] data=null;
private int head=0;
private int tail=0;
private int size=0;
public MyBlockingQueue(int capacity){
data=new String[capacity];
}
public void put(String s){
synchronized (this){
if(size==data.length){
//队列满了
return;
}
data[tail]=s;
tail++;
if(tail>=data.length){
tail=0;
}
size++;
this.notify();
}
}
public String take() throws InterruptedException {
synchronized (this){
if(size==0){
//队列为空
this.wait();
}
String ret=data[head];
head++;
if(head>=data.length){
head=0;
}
size--;
return ret;
}
}
}
2、实现生产者消费者模型
- BlockingQueue 是⼀个接⼝. 真正实现的类LinkedBlockingQueue/ArrayBlockingQueue等
- put ⽅法⽤于阻塞式的⼊队列, take ⽤于阻塞式的出队列
- BlockingQueue 也有 offer, poll, peek 等⽅法, 但是这些⽅法不带有阻塞特性
使用示例:
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue=new ArrayBlockingQueue<>(3);
queue.put("111");
System.out.println("put 成功");
queue.put("111");
System.out.println("put 成功");
queue.take();
System.out.println("take 成功");
queue.take();
System.out.println("take 成功");
queue.take();
System.out.println("take 成功");
}
}
我们可以看到当执行第三个take时,由于此时队列为空,因此线程就会进入阻塞状态了。
接着我们就可以用它来试着简单实现一个生产者消费者模型了:
public class Main{
public static void main(String[] args) {
BlockingQueue<Integer> queue=new ArrayBlockingQueue<>(1000);
//生产者线程
Thread t1=new Thread(()->{
int i=1;
while(true){
try {
queue.put(i);
System.out.println("生产元素 "+ i);
i++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//消费者线程
Thread t2=new Thread(()->{
while (true){
try {
Integer i=queue.take();
System.out.println("消费元素 "+ i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
其中t1就作为生产者线程,t2作为消费者线程,让我们试试看让生产慢于消费会发生什么:
这时我们得益于生产者消费者模型,由于生产会慢一些,消费者就会进入阻塞等待,每当生产者线程生产一个元素,便紧接着消费一个元素,从而形成了这种井然有序的执行结果。
让我们再试试消费慢于生产会发生什么:
我们可以看到虽然上传线程一下产生了非常多生产元素,但是由于阻塞队列的存在,消费线程依旧会不紧不慢的依次处理消费掉元素,这能有效防止服务器在高并发环境下面对激增的需求量无法执行而崩溃。
总结
生产者消费者模型可以有效地让程序解耦合,便于后续代码的维护和优化,并且可以有效地“削峰填谷”,在高并发场景下能有效缓解服务器压力,避免服务器崩溃。在以后的代码开发中是非常重要一种结构。
那么本篇文章就到此为止了,如果觉得这篇文章对你有帮助的话,可以点一下关注和点赞来支持作者哦。作者还是一个萌新,如果有什么讲的不对的地方欢迎在评论区指出,希望能够和你们一起进步✊