阻塞队列和生产者消费者模型
文章目录
- 阻塞队列和生产者消费者模型
- 1. 阻塞队列
- 2. Java标准库中的阻塞队列 - BlockingQueue
- 3. 阻塞队列的实现
- 4. 生产者消费者模型
1. 阻塞队列
阻塞队列是什么呢?阻塞队列是一种特殊的队列,满足队列的基本要求 - 先进先出。同时阻塞队列使一种线程安全的数据结构。不过阻塞队列相较于普通队列也有着它的特殊之处。
- 线程安全
- 队列满时,继续插入元素,队列会阻塞,直到其它线程从队列中取出元素。
- 队列空时,继续删除元素,队列会阻塞,直到其它线程从队列中插入元素。
阻塞队列的一个典型应用场景就是 “生产者消费者模型”,后面我们也将介绍到。
2. Java标准库中的阻塞队列 - BlockingQueue
Java 标准库中内置了阻塞队列。在大部分场景下,Java提供的阻塞队列已经足够满足我们如果我们需求了。
Java提供的阻塞队列 - BlockingQueue 是一个接口, 真正实现的类是LinkedBlockingQueue我们在使用时需要new其实现类LinkedBlockingQueue的对象。
操作:
- put() - 用于阻塞式的入队列
- take() - 用于阻塞式的出队列.
BlockingQueue也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性,但是一般不会使用。
3. 阻塞队列的实现
这里的阻塞是通过 “循环数组” 的方式来实现的,只需要对其操作使用 synchronized 进行加锁控制.,保证原子性,put 插入元素的时候, 判定如果队列满了, 就进行 wait.
take 取出元素的时候, 判定如果队列为空, 就进行 wait。
public class BlockingQueue {
private int[] arr = new int[1000];
private volatile int size = 0;
private int front = 0;
private int rear = 0;
public void put(int value) throws InterruptedException {
synchronized (this) {
// 此处使用 while,否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
// 就只能继续等待
while (size == arr.length) {
wait();
}
arr[rear] = value;
rear = (rear + 1) % arr.length;
size++;
notifyAll();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = arr[front];
front = (front + 1) % arr.length;
size--;
notifyAll();
}
return ret;
}
public synchronized int size() {
return size;
}
}
注意, 要在循环中进行 wait, 被唤醒时不一定队列就不满了/不空了, 因为同时可能是唤醒了多个线程。
4. 生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
-
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力 - 削峰填谷
-
阻塞队列也能使生产者和消费者之间解耦.
当多个客户端(生产者)同时访问服务器时,服务器的消费能力有限,如果客户端直接将请求发送非服务器,一但超过了服务器的最大承受能力,服务器就可能会挂掉。我们可以让客户端将请求发送给阻塞队列,这样虽然不能及时返回响应但是至少保证了服务的稳定性(削峰)。同时使用这样一种模式,可以降低客户端和服务器直接的耦合,客户端的请求并不是直接到达服务器,而是经过了阻塞队列。