阻塞队列:
在介绍生产消费者模型之前,我们先认识一下阻塞队列。
阻塞队列是一种支持阻塞操作的队列,常用于生产者消费者模型,它提供了线程安全的队列操作,并且在队列为空或满时,能够阻塞等待,直到条件满足时。
阻塞队列:是一种更为复杂的队列,和之前学的队列有些相似。
阻塞队列的特点:阻塞队列是线程安全的
当队列为空时,尝试出队列,出队列操作就会形成阻塞,直到添加元素为止。
当队列为满时,尝试入队列,入队列操作就会形成阻塞,直到其他线程取走元素为止。
java阻塞队列主要通过java.util.concurrent包中的BlockingQueue接口及其实现类来实现,常见的实现类包括:
1.ArrayBlockingQueue:基于数组的有界阻塞队列,必须指定队列的容量,按照先进先出原则处理元素
2.LinkedBlockingQueue:基于链表的阻塞队列,可以选择有界或无界的(默认无界的【非常大】),按照先进先出原则处理元素。(实际开发,一般建议需要设置好上限,否则你的队列可能非常大,容易把资源耗尽,产生内存超出范围这样的异常)
3.PriorityBlockingQueue:基于优先级的无界阻塞队列,元素必须实现Comparable接口,或者通过构造函数传入Comparator,按照优先级顺序处理元素。
插入操作:add(E e):插入元素,成功返回true,队列满时抛出IllegalStateException(但由于是无界队列,通常不会满),还有offer(E e),put(E e)。
移除操作:remove():移除元素,并返回队列的头部元素,队列为空则抛出异常。poll():移除并返回头部的元素,若队列为空,则返回null。take()移除并返回头部的元素,队列为空时阻塞,直到有元素可用。
在入队列和出队列时,只有put()方法和take()方法,才带有阻塞功能。
生产者-消费者模型:
生产者消费者模型是一种经典的多线程协作模式,用于解决生产者和消费者之间的数据交换问题。生产者负责生成数据并放入共享缓冲区(一般用上述的阻塞队列来储存),而消费者则从缓冲区中取出数据进行处理。为了避免竞争条件和确保线程安全,通常需要使用同步机制。
示例:平时我们包饺子的时候:
第一种情况:擀饺子皮的人(生产者)直接将擀好的饺子皮递给包饺子的人(消费者),这样子的缺点很明显:如果翰饺子皮的人的速度很快,包饺子的人速度跟不上,那饺子皮就会有过剩的,怎么办呢?
如果翰饺子皮的人将擀好的饺子皮放在桌子上,包饺子的人直接从桌子上面拿饺子皮来包饺子,这样就不会出现上面的问题(也就是生产者消费者模型)
在生产者消费者模型使用阻塞队列的优势:
1.解耦合 :
解耦合不一定是两个线程之间,也可以是两个服务器之间
如果服务器A直接访问服务器B,那么这两个服务器之间的耦合度就更高。编写服务器A的代码会有一些包含服务器B的相关逻辑,编写服务器B的代码多少会包含一些服务器A的相关逻辑。当一个服务器受到影响,另一个服务器也会受到相应的影响(如果耦合度很高,这样就不是很好修改相关代码)。
引入阻塞队列之后,服务器A和队列交互,服务器B和队列交互,服务器A不会直接和服务器B交互
,就降低了服务器A和B之间的耦合度。
2.削峰填谷:
上述情况,像服务器A这种上游服务器(入口服务器),干的活很少(单个请求消耗的资源很少)但是B这种下游服务器,承担着更重的任务,复杂的计算/储存工作,单个请求消耗的资源很多(更加容易挂)
一般流量激增的时间是突发的,也是短暂的,为了让服务器B即不会突发性的面临流量激增,也还能处理请求,所以就可以通过阻塞队列来充当缓冲区(趁着波峰过去了,B继续处理请求,利用波谷的时间,来处理之前积压的数据)
阻塞队列很重要,有的甚至会把队列单独部署成一个服务,队列服务器往往可以抵抗很高的请求量。
生产者消费者模型的代价:
1.引入队列之后,整体的结构会更加复杂 (此时需要更多的机器,进行部署,生产环境的结构会更加复杂,管理起来会更麻烦)
2.效率会有影响
模拟实现一个简单的阻塞队列:
class MyBlockingQueue{
private String[] data=null;
private int head=0;//队列头
private int tail=0;//队列尾
private int size=0;//元素个数
public MyBlockingQueue(int capacity){
data=new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized (this){
while(size>=data.length){
//队列满了,队列未满时,唤醒wait
this.wait();
}
data[tail]=elem;
tail++;
if(tail>data.length){
tail=0;
}
size++;
this.notify();
}
}
public String take() throws InterruptedException {
synchronized (this){
while(size==0){
//队列为空,队列不为空时,唤醒wait
this.wait();
}
String s=data[head];
head++;
if(head>data.length){
head=0;
}
size--;
this.notify();
return s;
}
}
}
public class Demo {
public static void main(String[] args) {
MyBlockingQueue queue=new MyBlockingQueue(1000);
Thread producer=new Thread(()->{
int n=0;
while(true){
try {
queue.put(n+"");
System.out.println("生产元素 " + n);
n++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer=new Thread(()->{
while(true){
String n=null;
try {
n=queue.take();
System.out.println("消耗元素 " + n);
Thread.sleep(1000);//看到结果的变化
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
如果有若干个线程使用这个队列,发生阻塞只有两种可能,要么所有线程阻塞在put方法,要么所有线程阻塞在take方法,但是这些线程不可能既阻塞在put,又阻塞在take里面(因为队列不可能同时为满,又为空)
问题1:为什么这里要用while循环呢?不用if?
答:这里用while循环,是为了“二次验证”,因为wait除了会被notify唤醒之外,还有可能interrupt这样的方法给中断,用if判断,就有可能有提前唤醒的风险。所以用while进行二次验证。
问题2:如果此时队列已经满了,此时三个线程分别put(1),put(2),put(3),那这三个线程都会阻塞,现在第四个线程take()之后,线程1的put(1)的wait被唤醒,继续执行,执行到线程1的notify,因为唤醒是随机的,那有没有可能唤醒线程2的put(2),或者线程3的put(3)。
答:不可能唤醒线程2,线程3,多线程notify对wait的唤醒是随机的,但是此时如果唤醒了线程2/线程3的wait,但是别忘了还有二次验证,当验证发现阻塞队列已经满了,还是会继续阻塞等待。
注意:wait在被设计的时候,就是搭配while使用。