目录
1. 堵塞队列
2. 生产者消费者模型
2.1 解耦合
2.2 削峰填谷
2.3 代码实现生产者消费者模型
3. 构建堵塞队列
3.1 实现普通队列(循环队列)
3.2 普通队列加上线程安全
3.3 普通队列实现堵塞功能
3.4 堵塞队列最终代码
4. 使用生产者消费者模型测试自己构建的堵塞队列
1. 堵塞队列
阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 "生产者消费者模型". 这是一种非常典型的开发模型.
代码演示
2. 生产者消费者模型
- 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
- 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
生产者消费者模型生活举例(包饺子)
那么生产者消费者模型主要解决是什么呢?
1. 解耦合
2. 削峰填谷
2.1 解耦合
在未来我们编写代码的时候,我们写的代码一定要尽量满足"高内聚,低耦合"这样的形式.
低耦合:两个模块之间的关联程度越高,耦合就越高,我们写代码要做到低耦合,防止牵一发动全身的情况
高内聚:相关联的代码,要规整的放在一起,避免想用一段代码的时候出现找不到或者找错的行为.
那么堵塞队列怎么实现的解耦合呢?
举例说明
如果引入生产者消费者模型,就能解耦合.
2.2 削峰填谷
2.3 代码实现生产者消费者模型
package blockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Created with IntelliJ IDEA.
* Description:基于堵塞队列构建一个生产者消费者模型
* User: YAO
* Date: 2023-05-16
* Time: 16:40
*/
public class ProducerConsumerModel {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
//消费者
Thread t1 = new Thread(()->{
while (true){
try {
int value = queue.take();
System.out.println("消费元素"+value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
//生产者
Thread t2 = new Thread(()->{
int value = 0;
while (true){
System.out.println("生产元素"+value);
try {
queue.put(value);
value++;
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t2.start();
//上述代码,让生产者,每隔1秒,生产一个元素
// 消费者不受限制
}
}
运行结果:
我们生活中的例子:三峡大坝
3. 构建堵塞队列
自己实现堵塞队列主要有三步:
- 实现一个普通队列
- 加上线程安全
- 加上阻塞功能
3.1 实现普通队列(循环队列)
public class MyBlockingQueue {
private int[] items = new int[100];
// 约定[head, tail)队列 循环队列
private int head;
private int tail;
private int size;
public void put(int elem){
if (isFull()){
return;
}
items[tail] = elem;
tail++;
if (tail == items.length){
tail = 0;
}
// 取余数 tail = tail % items.length; 但是并不推荐(可读行不好),执行效率(低)
size++;
}
public Integer take(){
if (isEmpty()){
return null;
}
int value = items[head];
head++;
if (head == items.length){
head = 0;
}
size--;
return value;
}
public boolean isFull(){
return size == items.length;
}
public boolean isEmpty(){
return size == 0;
}
}
3.2 普通队列加上线程安全
观察上述代码,我们想加入多线程,当我们往队列里面加入元素,和往外面取出元素的这个过程,都要判断是否为空(满),并且此过程对变量有读写的过程,那么我们就应该将我们的成员变量,使用volatile进行修饰,防止出现指令重排序.
3.3 普通队列实现堵塞功能
1.当我们判断队列满的时候,再往队列添加元素的时候,我们不返回,而等待队列出元素的时候,进入堵塞的状态,等待接收通知,再往队列添加元素.
2.同理:当我们判断队列为空的时候,要从对列取出元素的时候,我们不进行返回null,同样进入堵塞的状态,等待入队的通知,在从队列中取元素
实现以上操作,就可以使用wait notify
上述代码就是对于入队列和出队列使用wait notify 实现堵塞功能.
思考:上述代码,是否还是存在弊端?
答案:是的,很有可能再别的代码中使用interrupt给wait唤醒,那么我们就需要再次进行判断是否符合唤醒的条件,也就是将判断队列是否为空(满)写成一个while循环.
3.4 堵塞队列最终代码
package blockingQueue;
/**
* Created with IntelliJ IDEA.
* Description:针对wait部分使用while优化
* User: YAO
* Date: 2023-05-17
* Time: 9:49
*/
public class MyBlockingQueue2 {
private int[] items = new int[100];
// 约定[head, tail)队列 循环队列
volatile private int head;
// 给所有的读操作对应的变量使用volatile关键字进行修饰
volatile private int tail;
volatile private int size;
synchronized public void put(int elem) throws InterruptedException {
// 针对this加锁
while (isFull()){
//return;
this.wait();
//如果队列满了,就进入阻塞,等待出队列的notify进行唤醒
//注意:这样使用wait()是不太正确的,很有可能在别的代码暗中interrupt,把wait唤醒了
//明明条件还没满足条件,但是把wait()给唤醒了,此时就会在这种情况导致栈溢出
//进行优化,wait()被唤醒之后,在确认一下条件是不是满足,如果还不满足就继续wait(),就使用while循环
}
items[tail] = elem;
tail++;
if (tail == items.length){
tail = 0;
}
// 取余数 tail = tail % items.length; 但是并不推荐(可读行不好),执行效率(低)
size++;
this.notify();
// 此时往空队列加入了元素,用来唤醒因为队列为空,而进入堵塞的出队列操作
}
synchronized public Integer take() throws InterruptedException {
while (isEmpty()){
//return null;
this.wait();
//如果队列为空,就进入阻塞,等待入队列的notify进行唤醒
}
int value = items[head];
head++;
if (head == items.length){
head = 0;
}
size--;
this.notify();
// 此时满队列出了元素,用来唤醒因为队列满,而进入堵塞的入队列操作
return value;
}
public boolean isFull(){
return size == items.length;
}
public boolean isEmpty(){
return size == 0;
}
}
4. 使用生产者消费者模型测试自己构建的堵塞队列
package blockingQueue;
/**
* Created with IntelliJ IDEA.
* Description:基于自己构建的堵塞队列构建一个生产者消费者模型
* User: YAO
* Date: 2023-05-17
* Time: 9:32
*/
public class MyProducerConsumerModel {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
Thread t1 = new Thread(()->{
while (true){
try {
int value = queue.take();
Thread.sleep(1000);
System.out.println("消费:"+value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
Thread t2 = new Thread(()->{
int value = 0;
while (true){
try {
System.out.println("生产:"+value);
queue.put(value);
//Thread.sleep(1000);
value++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t2.start();
System.out.println(" ");
}
}
运行结果:
根据我们的设置,生产者瞬间生产了100个元素,进入堵塞,然后设置消费者,每隔1秒消费一个元素,生产者再继续生产.