阻塞队列 详解
- 一. 什么是阻塞队列
- 二. 生产者消费者模型
- 三. 标准库中的阻塞队列
- 四. 阻塞队列实现
一. 什么是阻塞队列
阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.
阻塞队列是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
二. 生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等
待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
生产者消费者模型的优点:
- 能对请求 “削峰填谷”
比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,
服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放
到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.
不使用阻塞队列:
使用阻塞队列:
- 阻塞队列也能使生产者和消费者之间 解耦.
比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.
擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的).
不使用阻塞队列:
使用阻塞队列:
三. 标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
生产者消费者模型
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消费元素: " + value);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) {
try {
int num = random.nextInt(1000);
System.out.println("生产元素: " + num);
blockingQueue.put(num);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
四. 阻塞队列实现
- 通过 “循环队列” 的方式来实现.
- 使用 synchronized 进行加锁控制.
- put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).
- take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
class MyBlockingQueue {
private int[] items = new int[1000];
private volatile int size = 0; // 注意使用 volatile 保证内存可见性
private int head = 0;
private int tail = 0;
public void put(int value) throws InterruptedException {
synchronized (this) {
// 此处最好使用 while.
// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
// 就只能继续等待
while (size == items.length) {
wait();
}
items[tail] = value;
tail = (tail + 1) % items.length;
size++;
notifyAll();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = items[head];
head = (head + 1) % items.length;
size--;
notifyAll();
}
return ret;
}
public synchronized int size() {
return size;
}
// 测试代码
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue blockingQueue = new MyBlockingQueue();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消费元素: " + value);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) {
try {
int num = random.nextInt(1000);
System.out.println("生产元素: " + num);
blockingQueue.put(num);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
}
注意:
- 针对 this 加锁, 即针对当前类对象加锁, 只有多个线程使用同一个阻塞队列时才会发生阻塞。
- 使用 volatile 保证内存可见性。
- 判断是否需要 wait 时使用 while 循环判断。 因为可能线程被唤醒时, 条件又再次不满足了,
比如: t1 线程取元素时发现队列空的就阻塞了, t2 插入了一个元素, 唤醒了所有的线程, 但是 t1 没有抢到锁, t2 抢到了, 把唯一的一个元素取走了, 当 t1 抢到锁时, 队列已经再次为空了,t1 还得继续阻塞等待。
好啦! 以上就是对 阻塞队列 的讲解,希望能帮到你 !
评论区欢迎指正 !