目录
1.阻塞队列是什么?
2.生产者消费者模型
3.标准库中的阻塞队列
4.阻塞队列的实现
1.阻塞队列是什么?
阻塞队列是⼀种特殊的队列. 也遵守 "先进先出" 的原则
阻塞队列能是⼀种线程安全的数据结构, 并且具有以下特性:
当队列满的时候, 继续⼊队列就会阻塞, 直到有其他线程从队列中取⾛元素.
当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插⼊元素.
阻塞队列的⼀个典型应⽤场景就是 "⽣产者消费者模型". 这是⼀种⾮常典型的开发模型.
2.生产者消费者模型
⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。 ⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤ 等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取.
实际开发中, 经常会涉及到 "分布式系统", 服务器整个功能不是由一个服务器全部完成的. 而是每个服务器负责一部分功能, 通过服务器之间的网络通信, 最终完成整个功能.
上述过程中, A 和 B , A 和 C 之间的耦合性是比较轻的, A中的代码需要设计到一些B相关的操作, B中的代码也涉及到和A相关的操作, A 的代码中也需要涉及和 C 相关的操作, C 的代码也涉及和 A 相关的操作, 另外, 如果 B 或者 C 服务器出现故障, 对 A 的影响就很大.
引用生产者消费者模型, 就可以降低上述的耦合
这样 A B C 之间就不是直接交互了, 而是通过队列在中间经常传递.
3.标准库中的阻塞队列
使用实例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadDemo28 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("hello");
System.out.println(queue.take());
System.out.println(queue.take());
}
}
当队列中的元素为空时, 在执行 take() 方法, 进程就会进入阻塞状态:
此时进程并会不结束.
4.阻塞队列的实现
基于环形队列来简单实现阻塞队列:
import static java.lang.Thread.sleep;
class MyBlockingQueue {
private String[] elems = null;
private int head = 0;
private int tail = 0;
private int size = 0;
Object locker = new Object();
public MyBlockingQueue(int capacity) {
elems = new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized (locker) {
while(size >= elem.length()) {
//队列满了就阻塞
locker.wait();
}
elems[tail] = elem;
tail++;
if(tail >= elems.length) {
tail = 0;
}
size++;
// 入队列成功后唤醒
locker.notify();
}
}
public String take() throws InterruptedException {
String elem = null;
synchronized (locker) {
while(size == 0) {
// 队列空了
// 也需要这个代码阻塞
locker.wait();
}
elem = elems[head];
head++;
if(head >= elems.length) {
head = 0;
}
size--;
//元素出队列成功后, 唤醒
locker.notify();
}
return elem;
}
}
细节注意:
基于阻塞队列实现简单的生产者消费者模型:
public class ThreadDeom29 {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(1000);
//生产者
Thread t1 = new Thread(()->{
int n = 1;
while(true) {
try {
queue.put(n + "");
System.out.println("生产元素 " + n);
sleep(1000);
n++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//消费者
Thread t2 = new Thread(()-> {
while(true) {
try {
String n = queue.take();
System.out.println("消费元素 " + n);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
t1线程每隔一秒钟存放一个元素到阻塞队列中, t2 线程则在队列中有元素时将元素取出, 队列没有元素就进入阻塞状态, 直到队列不再为空