所谓的阻塞队列:就是带有阻塞特性的《——》线程安全的
- 如果队列为空,尝试出队列,就会阻塞等待,等到队列不为空为止
- 如果队列为满,尝试入队列,也会阻塞等待,等到队列不为满为止
这个东西非常有用,尤其是写多线程代码的时候,多个线程之间进行数据交互,可以使用阻塞队列来简化代码编写!
Java标准库提供了阻塞队列的使用:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
//阻塞队列
public class Main3 {
public static void main(String[] args) throws InterruptedException{
// BlockingDeque<>是一个接口,不能直接new,因此,可以new一个 BlockingDeque<>的实现类LinkedBlockingDeque<>()
BlockingDeque<String> queue=new LinkedBlockingDeque<>();//基于链表实现
//ArrayBlockingQueue<>()基于数组实现
//put入队列
queue.push("hello1");
queue.push("hello2");
//take出队列
String result=null;
result= queue.take();//取出队首元素
System.out.println(result);
result=queue.take();
System.out.println(result);
result=queue.take();
System.out.println(result);
}
}
上述代码的运行结果为:
上述代码,入队列两次,当第三次出队列的时候,会进行阻塞!!想要解除阻塞,就需要有另一个线程往阻塞队列中放入元素!!
编写一个“生产者消费者模型”,多线程使用阻塞队列
案列:包饺子~
生产者消费者模型:初心是啥??能解决啥问题??
能解决的问题有很多,最主要的是俩方面:
1.可以让上下游模块之间进行更好的“解耦合”
耦合:两个模块之间的关联关系是强还是弱(关联越强,耦合越高)
写代码的要追求低耦合,避免代码牵一发动全身
内聚:
低内聚,相关联的东西没有放到一起,随便乱放的,
相关联的代码没有放到一起,东一块西一块,
高内聚:相关联的代码,分门别类的规制起来
如:两个服务器A与服务器B之间,有没有阻塞队列的情况:
2.削峰填谷
A收到的请求数量是和用户行为相关的
用户行为是随机的情况,有些情况下会出现“峰值”,暴涨一波!如果A和B是直接调用的关系,A收到峰值,B也同样收到峰值!假设A平时收到的请求是1秒1W个,突然间A收到了1秒5W个请求,则B也会1秒出现5W个请求,此时如果B设计的时候,没有考虑峰值,可能会直接挂了~~
如果有了阻塞队列这种情况:此时A收到的请求多了,队列里的元素也就多了,此时A仍然可以按照之前的速率来取元素,则队列帮B承担了压力!
比如:三峡大坝的闸门《——》削峰填谷
接下来就编写一个“生产者消费者模型”多线程使用阻塞队列的情况!
阻塞队列的简单用法:
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class Main4 {
public static void main(String[] args) {
//阻塞队列
BlockingDeque<Integer> blockingDeque=new LinkedBlockingDeque<>();
//用法比较简单,重点在于如何实现一个阻塞队列
//消费者
Thread t1=new Thread(()->{
while (true){
try {
//从队列中取元素
int value = blockingDeque.take();
System.out.println("消费元素:"+ value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
//生产者
Thread t2=new Thread(()->{
int value=0;
while (true){
try {
System.out.println("生产元素:"+value);
blockingDeque.put(value);
value++;
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//上述代码,让生产者每隔1s生产一个元素
//让消费者直接消费,不受限制
t2.start();
}
}
上述代码的运行结果为:
重点:如何模拟写一个阻塞队列
泛型??普通程序猿很少在工作中会用到
一般实现库的程序员会涉及泛型
因此再后续面试的时候,尽量不要写泛型!!(重要)
能不用泛型就不用泛型~
不用泛型,直接用朴素的代码,假定有存储的元素为int,基于数组来实现队列
class MyBlockingQueue{
private int[] items=new int[1000];//数组
//约定[head,tail)队列的有效元素
volatile private int head=0;//指向队首元素下标
volatile private int tail=0;//指向队尾元素下标
volatile private int size=0;//获取队列中的元素个数
//入队列
synchronized public void put(int elem) throws InterruptedException{
if (size==items.length){
//如果队列满了,插入失败
//return;
this.wait();
//如果队列满了,就进行阻塞
//出队列的notify()唤醒
}
//把新元素放到tail所在的位置上
items[tail]=elem;
tail++;
//万一tail达到末尾,要让tail从头再来
if (tail==items.length){
tail=0;
}
//一样的写法,用:tail%items.length不高效
//tail=tail%items.length;不推荐
size++;
this.notify();
}
//出队列
synchronized public Integer take()throws InterruptedException{
while (size==0){
//return null;
this.wait();
//队列为空,阻塞等待
//入队列的notify唤醒
}
int value=items[head];
head++;//队首元素下标往后走
if (head==items.length){
head=0;
}
size--;
this.notify();
return value;
}
}
public class Main5 {
public static void main(String[] args) {
MyBlockingQueue queue=new MyBlockingQueue();
//消费者
Thread t1=new Thread(()->{
while (true){
try {
int value = queue.take();
System.out.println("消费:"+value);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//生产者
Thread t2=new Thread(()->{
int value=0;
while (true){
try {
System.out.println("生产:"+value);
queue.put(value);
//Thread.sleep(1000);
value++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}
实现阻塞队列分为三步:
- 先实现一个普通队列
- 加上线程安全
- 加上阻塞功能