文章目录
- 什么是阻塞队列?
- 使用阻塞队列有什么好处?
- 代价
- BlockingQueue的使用
- 自己实现一个阻塞队列(基于数组)
什么是阻塞队列?
阻塞队列是在普通的(先进先出)的基础上,做出了扩充.
- 线程安全
- 具有阻塞特性
a. 如果队列为空,进行出队列操作,此时就会出现阻塞,一直阻塞到其他线程往队列里添加元素为止
b. 如果队列为满,进行入队列操作,此时也会出现阻塞,一直阻塞到其他线程从队列里取走元素为止.
(标准库中原有的队列Queue和其子类,默认都是线程不安全的)
通常谈到的"阻塞队列"是代码中的一个数据结构,但是由于这个东西太好用了,以至于会把这样的数据结构单独封装成一个服务器程序,并且在单独的服务器机器上进行部署.
此时,这样的阻塞队列,有了一个新的名字—“消息队列”(Message Queue,简称MQ)
基于阻塞队列,最大的应用场景就是实现"生产者消费者模型".
什么是"生产者消费者模型"呢?
举个栗子,大年三十包饺子.
第一种典型的包法,我们三个人,每个人都分别进行擀饺子皮和包饺子这两操作.但是这种方式其实是比较低效的,我们三个人就需要竞争擀面杖,就可能涉及到阻塞等待~
第二种典型的包法,也是实际生活中大家普遍采取的方案.
我负责擀饺子皮,其他两个人负责包.
使用阻塞队列有什么好处?
使用生产者消费者模型,主要有两方面的好处
-
服务器之间的"解耦合"(模块之间的关联程度/影响程度)
引入生产者消费者模型后,结构就成了下列摸样
可能有人会说了,看起来AB之间是解耦合了,但是A和队列,B和队列,这不是引入了新的耦合吗?
诶,这就要说说我们为啥害怕耦合?因为耦合的代码在后续的变更过程中,比较复杂,容易产生bug.
但是消息队列是成熟稳定的产品,代码不会频繁修改,也就是说代码是稳定的.A和队列,B和队列之间的交互逻辑,基本写一次就固定下来了. -
通过中间的阻塞队列,可以起点"削峰填谷"的作用,在遇到请求量激增的突发情况下,仍可以有效保护下游的服务器,让它们不会被请求冲垮.
到这里,就出现了两个问题:- 为啥一个服务器收到的请求更多,就可能会挂?
答:一台服务器的配置再好,它的硬件资源也是有限的.服务器每次收到一个请求,在处理这个请求的过程中,就会执行代码,就要消耗一定的硬件资源.当这些请求消耗的总硬件资源的量,超过了服务器能提供的上限,当然就会挂啦. - 在请求激增的时候,A为啥不会挂?队列为啥不会挂?
答:A的角色是一个"网关服务器",他负责收到客户端的请求,再把请求转发给其他的服务器,这样的服务器里面的代码做的工作比较简单(单纯的数据转发),消耗的硬件资源,通常更少.
处理一个请求,消耗的资源更少,同样的配置下,就能支持更多的请求处理.
同理,队列,其实也是比较简单的程序,单位请求消耗的硬件资源,也是比较少的.
而B这个服务器,是真正干活的服务器,它要真正完成一系列的业务逻辑.这一系列的工作,代码量非常庞大,消耗的时间很多,消耗系统的硬件资源更多,因此更容易挂.类似的,MySQL这样的数据库,处理每个请求的时候,做的工作就是比较多的,消耗的硬件资源也是比较多的,因此MySQL也是后端系统中,容易挂的部分.
对应的,想Redis这种内存数据库,处理请求做的工作远远少于mysql做的工作,因此消耗的资源更少,Redis就比MySQL皮实很多.不容易挂~
- 为啥一个服务器收到的请求更多,就可能会挂?
代价
- 需要更多的机器,来部署这样的消息队列
- A和B之间通信的延迟会变长~
因此,对于A和B之间的调用,如果要求响应的时间比较短,那就不太适合使用了~
BlockingQueue的使用
阻塞队列在Java标准库中提供了现成的封装.
Queue的一些操作,offer,poll这些,在BlockingQueue中同样也能够使用(不能阻塞).
BlockingQueue提供了另外两个专属方法(能阻塞):
- put 入队列
- take 出队列
阻塞队列没有提供"阻塞版本的"获取队首元素的操作.
需要注意的是,put和take产生的阻塞会被interrupt方法唤醒.
自己实现一个阻塞队列(基于数组)
代码:
class MyBlockingQueue {
private int size = 0;
private int[] data = null;
private int head = 0;
private int tail = 0;
public MyBlockingQueue(int capacity) {
data = new int[capacity];
}
public void put(int value) throws InterruptedException {
synchronized (this) {
while (size == data.length) {
//阻塞
this.wait();
}
data[tail] = value;
tail++;
size++;
if (tail >= data.length) {
tail = 0;
}
this.notify();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
//阻塞
this.wait();
}
ret = data[head];
head++;
if (head >= data.length) {
head = 0;
}
size--;
this.notify();
}
return ret;
}
}
public class Demo18 {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(1000);
Thread t1 = new Thread(() -> {
int n = 1;
while (true) {
try {
myBlockingQueue.put(n);
System.out.println("生产:" + n);
n++;
//Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(() -> {
while (true) {
try {
int n = myBlockingQueue.take();
System.out.println("消费:" + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
一个小知识:
while的作用,就是在wait被唤醒之后,再次确认条件,看是否能继续执行.
注释里也说了,最好把wait放在一个循环中.
本文到这里就结束啦~