文章目录
- 阻塞队列
- 1. 概念与特性
- 2. 生产者与消费者模型
- 2.1 生产者消费者模型的两个好处(主要的)
- 3. 标准库中的阻塞队列
- 3.1 代码实现生产者消费者模型
- 4. 阻塞队列实现
- 4.1 普通队列实现
- 4.2 给队列追加阻塞功能
阻塞队列
1. 概念与特性
阻塞队列 是一种特殊的队列,也遵守 “先进先出” 的原则。
阻塞队列 是一种线程安全的数据结构,虽然也是 先进先出 的,
但是它还带有 阻塞 功能。
两个特性:
- 当 队列满 的时候,继续 入队列就会阻塞,直到有其他线程从队列中取走元素。
- 当 队列空 的时候,继续 出队列也会阻塞,直到有其他线程往队列中插入元素。
2. 生产者与消费者模型
生产者消费者模式就是通过一个 容器 来解决生产者和消费者的 强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过 阻塞队列 来进行通讯,
所以 生产者 生产完数据之后不用等待 消费者 处理,直接 扔给阻塞队列,
消费者 不找 生产者 要数据,而是直接 从阻塞队列里取。
举一个例子
过年包饺子的时候,妈妈负责擀饺子皮,并且将擀好的饺子皮放到盖帘上,
爸爸和孩子就直接到盖帘上拿饺子皮开始包饺子。
这里的 妈妈 就相当于是 生产者 ,爸爸和孩子 就相当于是 消费者 ,
盖帘 就相当于是 阻塞队列。
如果此时 妈妈 擀的速度太快了,盖帘的空间就会占满,妈妈就要等爸爸和孩子把饺子皮拿走后,
盖帘有空间了,再继续擀饺子皮。
如果此时 爸爸和孩子 包的速度太快了,盖帘上没有饺子皮了,
此时就要等待着妈妈把饺子皮擀好,才能继续包饺子。
2.1 生产者消费者模型的两个好处(主要的)
1、实现了发送方与接收方的之间的解耦合
这里的 耦合 指的是代码不同模块之间的关系
如果某一个模块的代码发生了改变,对其他模块影响的大小。
比方说你的女朋友生病了,这个时候那就要照顾她,在这个时候她对于我还有很大的关系的。
如果哪一天分手了,她再生病,对于我就没有这么大的关系了。
当然,写代码的时候尽量是 低耦合 ,降低耦合 的过程就叫做 解耦合。
比如解开缠在一起的耳机线。
开发中典型的场景:服务器之间的相互调用
此时就可以视为A调用了B。
A服务器接收到充值请求后转发给B服务器处理,B服务器处理完了再把结果反馈给A服务器。
上述场景中耦合性是比较高的。
A 调用 B 的前提是务必要知道 B 是存在的,如果 B 挂了,此时就很容易引起BUG。
针对上面的场景使用 生产者消费者模型 就可以有效的降低耦合。
此时 A 和 B 之间的耦合就降低了很多。
A 不知道 B 的存在,A 只知道队列(A 的代码中没有任何一行代码与 B 的相关)
B 不知道 A 的存在,B 只知道队列(B 的代码中没有任何一行代码与 A 的相关)
如果此时 B 挂了,对于 A 不会有任何影响,因为队列还好着,
A 仍然可以给队列插入元素,如果队列满了,就先阻塞就好。
如果此时 A 挂了,对于 B 也不会有任何影响,因为队列还好着,
B 仍然可以给队列插入元素,如果队列满了,就先阻塞就好。
A B 任何一方挂了都不会对对方造成影响。
2、削峰填谷,保证系统的稳定性
削峰填谷 就是把波峰削低一点,把波谷填高一点。
三峡大坝就有 削峰填谷 的作用。
如果上游水多了,就关闸蓄水,此时就相当于由三峡大坝承担了上游的冲击,
对下游起到了很好的保护作用,这就是 削峰
如果上游少水了,三峡大坝就卡闸放水,有效保证下游的用水情况,避免出现干旱灾害。
这就是 填谷。
用服务器进行开发,也是和上述模型是非常相似的。
上游就是用户发送的请求,下游就是一些执行具体的业务的服务器。
用户发送多少个请求,是不可控的,有时多有时少。
如果哪天超过了服务器的上限,说不定就会挂了,这个时候就可以采用生产者消费者模型来解决。
3. 标准库中的阻塞队列
BlockingQueue 是一个接口,真正实现的类是 LinkedBlockingQueue。
put 方法用于阻塞式的入队列,
take 用于阻塞式的出队列。
BlockingQueue 也有 offer、poll、peek 等方法,但是这些方法不带有阻塞特性。
代码实现
package thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadDemo1 {
public static void main(String[] args) throws InterruptedException{
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue.put("hello");
System.out.println(blockingQueue.take());
}
}
输出的结果即是,出队列拿到的结果。
如果连续出两次队列,此时队列就为空了,这是就会发生 阻塞。
3.1 代码实现生产者消费者模型
1、先创建两个线程来作为生产者和消费者
2、消费者要做的就是从队列里拿元素。
3、生产者的任务是往队列中放元素
整体代码
package thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadDemo2 {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
//创建两个线程来作为生产者和消费者
Thread customer = new Thread(() ->{ //消费者
//消费者要做的就是从队列里拿元素
while (true) {
try {
Integer result = blockingQueue.take();
System.out.println("消费元素:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start(); //启动
Thread producer = new Thread(() -> { //生产者
//生产者的任务是往队列中放元素
int count = 0; //元素个数
while (true) {
try {
blockingQueue.put(count);
System.out.println("生产元素:" + count);
count++;
//限制生产的速度
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start(); //启动
}
}
根据输出结果可以看出要先生产才能消费,这是由于消费线程阻塞等待了。
在还没有生产出来之前就会一直阻塞。
4. 阻塞队列实现
4.1 普通队列实现
要想实现一个 阻塞队列 先要实现一个普通的队列。
head 表示头部,tail 表示尾部。
此时的 head 和 tail 就组成了一个区间:[head, tail)
如果要插入一个元素(入队列),tail 的位置就往后移一个。
如果要拿出一个元素(出队列),head 的位置就往后移一个。
如果 tail 指向了数组的最后一个位置,此时在插入一个一个元素时,tail 就移动到最左边。
这也就形成了一个 循环队列
如何区分队列是空还是满
可以采取两个方法:
- 浪费一个空间
当 tail 走到 head 的前面是,队列就满了。 - 引入一个 size 来记录元素的个数
size 是 0 就是空的,size 是数组的长度就是满的。
下面是基于数组实现的队列。
完整代码
package thread;
// 次数不考虑泛型,只使用 int 来表示元素类型
class MyBlockQueue {
private int[] items = new int[1000]; //要先有数组
private int head = 0; //头部
private int tail = 0; //尾部
private int size = 0; //元素个数
//入队列
public void put(int value) {
//如果数组空间满了就不能插入了
if (items.length == size) {
return;
}
items[tail] = value;
tail++;
//针对于tail位于数组末尾的处理
if (tail >= items.length) {
tail = 0; //重新放到数组的开头
}
size++; //队列元素个数加一个
}
//出队列
public Integer take() {
//如果队列是空的不能出队列
if (size == 0) {
return null;
}
int result = items[head];
head++; //head向后移一个位置
//针对head位于数组末尾位置的处理
if (head >= items.length) {
head = 0; //重新指向数组的开头
}
size--; //元素个数减一个
return result;
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
MyBlockQueue myBlockQueue = new MyBlockQueue();
//入队列
myBlockQueue.put(1);
myBlockQueue.put(2);
myBlockQueue.put(3);
//出队列
int result = 0;
result = myBlockQueue.take();
System.out.println("result = " + result);
result = myBlockQueue.take();
System.out.println("result = " + result);
result = myBlockQueue.take();
System.out.println("result = " + result);
}
}
关于 队列 更加详细的讲解请参考下面文章:
https://blog.csdn.net/m0_63033419/article/details/127828890?spm=1001.2014.3001.5502
4.2 给队列追加阻塞功能
1、为了保证线程安全需要给入队列和出队列操作加上锁( synchronized)。
2、入队列的时候,如果队列满了就会产生阻塞(wait 和 notify)。
- 这个时候就要等到把队列里的元素拿出以后才可以入队列。
- take 操作结束后,可以使用 notify() 来唤醒 put 操作中的 wait。
- 这里不使用 sleep 是因为 sleep 只能是用于设置时间,
而我们不知道会阻塞等待到什么时候。
3、出队列的时候,如果队列为空就阻塞(wait 和 notify)。
- 这个时候就要等到入队列以后才可以出队列。
4、put 和 take 操作特殊情况的处理(while)。
当 wait 被唤醒的时候,此时 if 的条件一定就不成立了吗?
具体来说,put 中的 wait 被唤醒要求队列不满,但是 wait 被唤醒了之后,队列一定是不满的吗?
虽然当前的代码不会出现这样的情况,但是稳妥起见,最好的办法是:
wait 返回之后再次判断一下,看此时的条件是不是具备了。
完整代码
//入队列
public void put(int value) {
synchronized (this) {
//如果数组空间满了就不能插入了
if (items.length == size) {
//队列满了就要阻塞等待
this.wait();
}
items[tail] = value;
tail++;
//针对于tail位于数组末尾的处理
if (tail >= items.length) {
tail = 0; //重新放到数组的开头
}
size++; //队列元素个数加一个
//唤醒 take 中的 wait
this.notify();
}
}
//出队列
public Integer take() {
int result = 0;
synchronized (this) {
//如果队列是空的不能出队列
if (size == 0) {
//队列为空就阻塞
this.wait();
}
result = items[head];
head++; //head向后移一个位置
//针对head位于数组末尾位置的处理
if (head >= items.length) {
head = 0; //重新指向数组的开头
}
size--; //元素个数减一个
//唤醒 put 中的 wait
this.notify();
}
return result;
}