一、阻塞队列
阻塞队列是⼀种特殊的队列,其也遵守队列 "先进先出" 的原则;
阻塞队列是⼀种线程安全的数据结构,并且具有以下特性:
当队列满的时候,继续入队列就会阻塞,直到有其他线程从队列中取走元素;
当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插⼊元素;
阻塞队列的⼀个典型应用场景就是 "⽣产者消费者模型",这是⼀种非常典型的开发模型;
二、生产者消费者模型
生产者消费者模型就是通过⼀个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取;
使用阻塞队列实现生产者消费者模型的好处:
1. 阻塞队列就相当于⼀个缓冲区,平衡了生产者和消费者的处理能力;(削峰填谷)
2. 阻塞队列也能使生产者和消费者之间解耦合;
Java 标准库中的阻塞队列:
BlockingQueue 是⼀个接口,真正实现的类是 LinkedBlockingQueue,ArrayBlockingQueue,PriorityBlockingQueue 等;
put 方法用于阻塞式的入队列,take 用于阻塞式的出队列,BlockingQueue 也有 offer,poll,peek 等方法,但是这些方法并不带有阻塞特性;
下面就来实现一个阻塞队列,并基于该阻塞队列实现生产者消费者模型;
public class MyBlockingQueue {
private int[] items = new int[10000];
private volatile int size = 0;
private volatile int head = 0;
private volatile int tail = 0;
/**
* 实现进队列
* @param value value
* @throws InterruptedException e
*/
public synchronized void put(int value) throws InterruptedException {
while (size == items.length){
this.wait();
}
items[tail] = value;
tail = (tail + 1) % items.length;
size++;
notifyAll();
}
/**
* 实现出队列
* @return value
* @throws InterruptedException e
*/
public synchronized int take() throws InterruptedException {
while (size == 0){
this.wait();
}
int tmp = items[head];
head = (head + 1) % items.length;
size--;
notifyAll();
return tmp;
}
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue blockingQueue = new MyBlockingQueue();
Thread customer = new Thread(() -> {
while (true){
try {
int value = blockingQueue.take();
System.out.println("消费者消费 " + value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread producer = new Thread(() -> {
while (true){
try {
int value = new Random().nextInt(100);
blockingQueue.put(value);
System.out.println("生产者生产 " + value);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
customer.start();
producer.start();
customer.join();
producer.join();
}
}