文章目录
- 阻塞队列
- 生产者消费者模型
- 通过BlockingQueue理解阻塞队列
- 自己实现阻塞队列
阻塞队列
我们之前学的队列, 其实是最基础的队列, 实际开发中, 针对队列还有很多种变种
- 普通队列
- 优先级队列
- 阻塞队列
先进先出, 线程安全, 并且带有阻塞功能
阻塞功能指:
如果队列为空, 尝试出队列, 出队列操作就会阻塞, 一直阻塞到队列不为空为止
如果队列为满, 尝试入队列, 入队列操作也会阻塞, 一直阻塞到队列不满为止 - 消息队列
不是普通的先进先出, 而是通过topic这样的参数来对数据进行分类, 出队列的时候, 指定topic, 每个topic下的数据先进先出, 也有阻塞性质
消息队列这样的数据结构, 在实际开发中, 经常会把这样的数据结构封装成单独的服务器程序, 单独部署, 这样的服务器程序, 同时也称为消息队列
消息队列能够起到的作用就是实现==“生产者消费者模型”==
生产者消费者模型
阻塞队列, 也可以实现生产者消费者模型:
如果是在一个进程内, 直接使用阻塞队列, 即可实现生产者消费者模型
如果在分布式系统中, 需要使用单独部署的消息队列服务器, 实现生产者消费者模型
生产者消费者模型, 在开发中主要有两方面的意义:
1. 能够让程序进行解耦
如果让A直接调用B, 意味着A的代码就要包含很多和B相关的逻辑, B的代码也会包含和A相关的逻辑
彼此之间有了一定的耦合
一旦A做出修改 或 A出了bug, 就可能影响到B, 反之亦然
站在A的视角, 不知道B的存在, 只关心和队列的交互
站在B的视角, 不知道A的存在, 只关心和队列的交互
实现了程序的解耦合
此时, 对A做出修改 或 A出了bug, 不太容易影响到B, 反之亦然
未来如果在引入C, 也让A访问C, 那么A中不需要修改任何代码, 直接在队列里读取C的数据即可, 可以提高代码的可扩展能力
2. 能够使程序"削峰填谷"
客户端发来的请求, 个数多少, 我们是没办法提前预知的, 遇到某些突发事件, 就可能导致客户端给服务器发来的请求激增
正常情况下, A收到一个客户端的请求, 就同样要请求B一次
如果A收到的请求激增了, 那么B的请求也会激增
但是由于A做的工作比较简单, 消耗的资源少, 但是B的工作复杂, 消耗的资源多, 一旦请求量大, B就容易挂
那么引入消息队列, 就可以保证, 无论A给队列写多块, B都可以按照固有的节奏来消费数据, B的节奏就不一定完全跟着A了
通过BlockingQueue理解阻塞队列
BlockingQueue 是一个接口, 表示阻塞队列
有三个类实现了这个接口:
阻塞队列只需要考虑, 入队列和出队列即可, 阻塞队列没有"取队首操作"
之前普通队列使用的是offer和poll, 但这组方法是没有阻塞功能的
阻塞队列使用的方法是: put 和 take
ArrayBlockingQueue, 必须要传入capacity, 作为数组的最大容量
如果队列为满, 进行put就会等待阻塞, 直到有现成take走一个元素
如果队列为空, 进行take就会等待阻塞, 直到有线程put一个元素
put和take操作都需要抛InterruptedException, 阻塞过程中, 如果其他线程尝试使用interrupt来终止被阻塞的线程, 此时put就能够返回, 并抛出异常
第二次take时, 队列中已经没有元素, 则线程就会进入阻塞状态
实现生产者消费者模型:
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);
//生产者
Thread t1 = new Thread(() -> {
int i = 1;
while(true){
try {
queue.put(i);
System.out.println("t1生产: " + i);
i++;
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//消费者
Thread t2 = new Thread(() -> {
int elem = 0;
while(true){
try {
elem = queue.take();
System.out.println("t2消费: " + elem);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
上述代码, 我们让t1每秒钟生产一个, 此时t2只能阻塞等待, 等t1生产完后, t2才能消费
下面我们让t2每秒钟消费一次, 那么t1生产到队列满后, 只能等待t2消费完, t1才能生产
自己实现阻塞队列
用数组模拟队列, 记录head和tail下标, 用size记录数组中的有效值(用volatile修饰, 保证线程安全)
入队列:
- 给put操作加锁, 保证线程安全
- **使用wait操作, 最好搭配while循环使用 **
因为wait不仅仅能被notify和notifyAll唤醒, 还可能被其他操作唤醒, 例如interrupt
如果我们使用trycatch, 捕获到异常后, 如果没有继续抛出异常, 后面的代码就会继续执行, 进程不会结束, 此时就会覆盖掉本来的元素
此处我们使用的是throws, 进程不会继续执行
如果搭配while, 可以保证如果wait被其他线程唤醒, 能够再次条件判断, 判断是否真的要继续执行 - 成功put元素后, 记得使用notify唤醒take的阻塞
出队列:
完整代码:
class MyBlockingQueue{
private String[] elems = null;
private volatile int head;
private volatile int tail;
private volatile int size = 0;
public MyBlockingQueue(int capacity){
elems = new String[capacity];
}
//入队列
public void put(String elem) throws InterruptedException {
synchronized(this){
while(size >= elems.length){
//队列满了, 进行阻塞等待
this.wait();
}
//将新的元素加入到队列中
elems[tail] = elem;
tail++;
//循环队列
if(tail >= elems.length){
tail = 0;
}
//更新size的值
size++;
//唤醒take的阻塞
this.notify();
}
}
//出队列
public String take() throws InterruptedException {
synchronized (this){
while(size == 0){
//队列为空, 进行阻塞等待
this.wait();
}
//将队首元素返回给elem
String elem = elems[head];
tail++;
//循环队列
if(head >= elems.length){
head = 0;
}
//更新size的值
size--;
//唤醒put的阻塞
this.notify();
return elem;
}
}
}
public class Demo25 {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(1000);
//生产者
Thread t1 = new Thread(() -> {
int i = 1;
String elem = "aaa";
while(true){
try {
queue.put(elem);
System.out.println("t1生产: " + i);
i++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//消费者
Thread t2 = new Thread(() -> {
while(true){
try {
String elem = queue.take();
System.out.println("t2消费: " + elem);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}