阻塞队列(BlockingQueue)
阻塞队列是多线程代码中比较常用的一种数据结构。是一种特殊的队列,带有阻塞特性。
为何说是一种特殊的队列?
1.线程安全
2.带有阻塞特性
- 如果队列为空,继续出队列,就会发生阻塞。阻塞到其他线程往队列里添加元素为止。
- 如果队列为满,继续入队列,就会发生阻塞。阻塞到其他线程从队列中取走元素为止。
- 意义:可以用来实现"生产者消费者模型"。生产者消费者模型通俗的来讲,生产者负责生产东西,并将东西放到阻塞队列中,然后消费者就会从阻塞队列中获取内容,如果生产者生产的慢,消费者就得等待 (即消费者从空的队列中获取元素就得等待);相反如果生产者生产的快,生产者就可以休息速度慢下来 (即生产者从满的队列中添加元素就会阻塞等待)。
在java中标准库中针对zuseduilie提供两种实现方式:
基于数组:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);基于链表:
BlockingQueue<String> queue = new LinkedBlockingDeque<>();方法:
put() 入队列 带有阻塞性质
take() 出队列 带有阻塞性质
实现基于数组(循环队列)的阻塞队列:
//实现阻塞队列
class myBlockingQueue{
//锁对象
private Object object = new Object();
//队列采用循环队列 数组
private String[] data = new String[1000];
//队头元素位置 加volatile防止内存可见性问题
private volatile int head = 0;
//队尾元素位置
private volatile int tail = 0;
//有效长度
private volatile int size = 0;
//带有阻塞性质的入队操作put
public void put(String str) throws InterruptedException {
synchronized(object) {
//队列满时
if(size==data.length) {
//阻塞等待 等待另一个线程调用notify方法唤醒
object.wait();
}
//队列不满 入队列
data[tail] = str;
tail++;
size++;
object.notify();
//由于数组循环使用 也防止索引出界
if(tail==data.length) {
tail = 0;
}
}
}
//带有阻塞性质的出队列操作
public String take() throws InterruptedException {
synchronized(object) {
//队列为空
if(size==0) {
//阻塞等待
object.wait();
}
//队列不为空
String tmp = data[head];
head++;
if(head==data.length) {
head = 0;
}
size--;
//唤醒
object.notify();
return tmp;
}
}
}
代码实现中的一些细节:
- 指向队头,队尾元素,size在代码中可能会出现内存可见性问题,要加volatile。
- 入队,出队方法都存在着可能会影响线程安全的读,修改操作,最好给整个方法加锁。
- 虽然在一个方法中有wait和notify,但是一个队列满队列和空队列不会同时出现。并且使用wait进行阻塞等待时,是由另一个线程中的notify唤醒的。
- 抛异常可以是方法后跟throws,也可以是try...catch...,但这里应该使用throws,原因是try...catch...执行后程序不会停止,还是继续向下执行,对应代码就是入队操作判断队列满时,如果使用try...catch...,程序出现异常后,向下再接着执行,是会覆盖掉队列中其他未执行的内容的,而使用throws若程序出现异常,会抛出异常后整个方法就结束了,interrupt唤醒了wait。
- 使用wait的时候,往往都是使用while作为条件判定的方式,java源码解释也是推荐while。目的就是为了让wait唤醒之后还能再确认一次,是否条件仍然满足。
- 一个队列,空和满只能同时出现一种,take和put只有一边能阻塞。如果put阻塞了,其他线程继续调用put也都会阻塞,只有靠take唤醒,如果take阻塞了,其他线程继续调用take也都会阻塞,只能靠put唤醒。
实现生产者消费者模型
生产者 - 消费者模型( Producer-consumer problem) 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。
这个模型由两类线程和一个缓冲区组成来组成
- 生产者线程:生产数据,并把数据放在这个队列里面
- 缓冲区:存放生产者的数据的地方即阻塞队列
- 消费者线程:从队列里面取数据,消费数据
运行流程
- 生产者和消费者在同一时间段内共用同一个存储空间
- 生产者往存储空间中添加产品
- 消费者从存储空间中取走产品
- 当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
实现"生产者消费者模型"好处:
(1)解耦合
两个模块之间联系越紧密,耦合就越高。尤其对于分布式系统来说,十分有意义。
(2)削峰填谷
峰:指短时间内请求多。
比如服务器和客户端之间的请求与响应,当用户量请求增大时,服务器也会受牵连,甚至于 将服务器弄崩溃给挂了,耦合性较高,如果两者之间用一种数据结构如队列存储请求,就不论客户 端用户量请求有多大时,服务器仍然可以按照自己的速度去处理请求。
消息队列:当把阻塞队列封装成单独的服务器程序,部署到特定的机器上,这个时候就把这个队列称为"消息队列"。
实现生产者消费者模型代码:
//实现阻塞队列
class myBlockingQueue{
//锁对象
private Object object = new Object();
//队列采用循环队列 数组
private String[] data = new String[1000];
//头指针 加volatile防止内存可见性问题
private volatile int head = 0;
//尾指针
private volatile int tail = 0;
//有效长度
private volatile int size = 0;
//带有阻塞性质的入队操作put
public void put(String str) throws InterruptedException {
synchronized(object) {
//队列满时
while (size==data.length) {
//阻塞等待 等待另一个线程调用notify方法唤醒
object.wait();
}
//队列不满 入队列
data[tail] = str;
tail++;
size++;
object.notify();
//由于数组循环使用 也防止索引出界
if(tail==data.length) {
tail = 0;
}
}
}
//带有阻塞性质的出队列操作
public String take() throws InterruptedException {
synchronized(object) {
//队列为空
while (size==0) {
//阻塞等待
object.wait();
}
//队列不为空
String tmp = data[head];
head++;
if(head==data.length) {
head = 0;
}
size--;
//唤醒
object.notify();
return tmp;
}
}
}
//借助阻塞队列 实现生产者消费者模型
public class test {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
//生产者模型
Thread t1 = new Thread(()->{
int num = 1;
while(true) {
try {
queue.put(num);
System.out.println("生产者生产"+num);
num++;
//Thread.sleep(1000); //生产者有节奏生产
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//消费者模型
Thread t2 =new Thread(()->{
while(true) {
try {
int tmp = queue.take();
System.out.println("消费者消费"+tmp);
Thread.sleep(1000); //消费者有节奏消费
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}