1.阻塞队列是什么
阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 "生产者消费者模型". 这是一种非常典型的开发模型.
2.生产者消费者模型
生产者消费者模型有啥用?
- 在开发中起到服务器之间"解耦合"的效果~~
- 在请求突然暴增的峰值中,起到"削峰填谷"的效果~~
如果A和B是直接相互调用.
此时A就得知道B的存在,B也得知道A的存在.
并且A需要知道B提供的接口是什么.B也得知道A是通过啥方式来调用的~~(耦合比较强)
如果其中一个服务挂了,另一个服务也好不到哪去~~
A不需要知道B存在,B也不需要知道A存在
双方只要认识这个阻塞队列即可~~
如果其中的一个服务挂了,并不影响另一个服务正常工作~~
如果A收到的请求突然暴增了,此时B收到的请求也会暴增!!!
如果B本身计算量已经很大了请求再次暴增,可能B就挂了~~
(每个请求处理的时候都需要分配硬件资源)
由阻塞队列来承担A这边的压力.
B仍然按照原有的节奏来消费队列中的数据~~
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
1) 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.
比如在 "秒杀" 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.这样做可以有效进行 "削峰", 防止服务器被突然到来的一波请求直接冲垮
2) 阻塞队列也能使生产者和消费者之间 解耦.
比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 "生产者", 包饺子的人就是 "消费者".擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)
3.标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
生产者消费者模型的实现
package thread2;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Test2 {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
Thread producer = new Thread(() -> {
int n = 1;
while (true) {
try {
blockingQueue.put(n);
System.out.println("生产者生产了 " + n);
Thread.sleep(1000);
n++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
Thread customer = new Thread(() -> {
while (true) {
try {
int n = blockingQueue.take();
System.out.println("消费者消费了 " + n);
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
customer.start();
}
}
阻塞队列实现(自己造轮子实现生产者消费者模型)
- 通过 "循环队列" 的方式来实现.
- 使用 synchronized 进行加锁控制.
- put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).
- take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
package thread2;
class MyBlockingQueue {
private int[] items = new int[1000];
private int size = 0;
private int head = 0;
private int tail = 0;
private Object locker = new Object();
public void put(int val) throws InterruptedException {
synchronized (locker) {
if (size == items.length) {
// return;
locker.wait();
}
items[tail] = val;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
locker.notify();//唤醒take
}
}
public Integer take() throws InterruptedException {
synchronized (locker) {
if (size == 0) {
locker.wait();
// return null;
}
int ret = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
locker.notify();//唤醒put
return ret;
}
}
}
public class Test1 {
private static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) throws InterruptedException {
Thread producer = new Thread(() -> {
int n = 1;
while (true) {
try {
queue.put(n);
System.out.println("生产者生产了:" + n);
n++;
//Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
Thread customer = new Thread(() -> {
while (true) {
try {
int n = queue.take();
System.out.println("消费者消费了 " + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}