1、阻塞队列
队列是一种先进先出的数据结构。而阻塞队列也是一种特殊的队列,也遵守”先进先出“的原则。
阻塞队列是一种线程安全的的数据结构,并且具有以下特性:
1、队列往进写元素是从队尾插入,队首取出
2、当插入元素的时候,先判断一下,队列是否已经满了,如果满了就继续等(阻塞),等到队列有空余的位置的时候再去插入。
3、当取出元素的时候,先判断一下,队列是否为空,如果空了就继续等(阻塞),等到队列中有元素的时候再去取出。
阻塞队列有一个典型的应用场景就是”生产者消费者模型“,这是一种非常典型的开发模型。
生产者消费者模型:
生产者和消费者模式就是通过一个容器来解决生产者和消费者强耦合问题。
生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用原地等待消费者进行处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中获取。
1、阻塞队列相当于是一个缓冲区,平衡了消费者和生产者之间的处理能力
比如在一些购物软件有"秒杀"场景,服务器在同一时刻可能会收到大量的支付请求,如果直接处理这些请求,服务器很有可能会扛不住大量数据的冲击(每一个支付请求的处理都需要比较复杂的流程),这个时候就可以把它放到一个阻塞队列中,然后由服务器慢慢处理每个支付请求。
这样做可以有效的进行“削峰",防止服务器被突然到来的一波请求直接冲垮。
2、阻塞队列也能使生产者和消费者之间解耦
比如过年一家人在一起包饺子,一般都是分工明确,比如一个人负责擀饺子皮,其他人负责包饺子,那么擀饺子皮的人就是生产者,包饺子的人就是消费者。
擀饺子皮的不关心包饺子的人是谁,只管擀包子皮,包饺子的人也不管擀包子皮的人是谁,只管包饺子。
1.1、消息队列
消息队列是阻塞队列一种典型应用,基于消费者生产者模型实现的,是在业务的驱使下,应用队列这个数据结构做了一些自定义的功能开发,满组一些真实业务工作,类似这样的框架或是软件,被叫做“中间件”。
工作原理:
1、正常队列,先进先出,完全遵守这个顺序。
2、消息队列,把每个消息打了个”标签“。标签可以理解为”类型",把消息类型分类
1.2、为什么要使用消息队列(阻塞队列)?
1、解耦
现在的程序尽量做到高内聚,低耦合。 也就是业务强相关的代码放到一起,为了维护程序方便,设计和组织代码的一种方式。需要哪一种方法去接口调用即可,避免代码分散开发。
2、削峰填谷
微博很难应对流量暴增的情况,流量暴增会在系统中申请很多很多线程,各种资源,最终会瞬间把服务器资源耗尽。
陶宝应对流量冲击案例:
削峰:在流量暴增的时候用消息队列把消息缓存起来,后面的服务器一点一点正常处理。
填谷:消费信息的服务器在流量不多的情况下,处理之前堆积的消息,就是填谷
3、异步操作
在发起请求到接收到响应的过程中,啥也不干,叫做同步;如果发起请求之后去执行别任务,那么就叫做异步。
1.3、标准库中的阻塞队列
在java标准库中内置了阻塞队列,如果需要在一些程序中使用阻塞队列,直接使用标准库中的即可。
*BlockingQueue是一个接口,真正实现类的是LinkedBlockingQueue。
*put方法用于阻塞式的入队列,take用于阻塞式的出队列。
*BlockingQueue也有offer,poll,peek等方法,但是这些方法不带有阻塞特性。
代码示例:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Exe_01 {
public static void main(String[] args) throws InterruptedException {
//JDK提供的创建方式
BlockingQueue<Integer> queue=new LinkedBlockingQueue<>(3);
//往阻塞队列添加3个元素
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println("添加了3个元素");
//再添加第四个
//queue.put(4);
//System.out.println("添加了4个元素");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println("take了三个元素");
}
}
1.4、阻塞队列的实现
*通过“循环队列”的方式来实现
*使用synchronized来进行加锁控制
*put插入元素的时候,判定如果队列满的话,就进行wait。(注意:要在循环的时候进行wait,被唤醒时不一定队列就不满了,因为同时可能唤醒了多个线程)
*take取出元素的时候,判定如果队列为空,就进行wait。(也是循环wait)
阻塞队列实现分析:
1、在普通队列的基础上加了等待操作,在入队时如果队列已满就要等,出队时队列为空就要等
2、在普通队列的基础上加上了唤醒操作,执行完入队操作就会唤醒出队线程,执行完出队操作就会唤醒入队线程
阻塞队列不可能出现即是空的,又是满的这种状态,所以不会出现相互等待的现象。
代码示例:
/**
* 实现阻塞队列
*/
public class MyBlockingQueue {
//需要一个数组来保存数据
private Integer[] elementData=new Integer[20];
//定义队首队尾的下标
private volatile int head=0;
private volatile int tail=0;
//有效元素的个数
private volatile int size=0;
/**
* 添加元素
* @param value
*/
public void put(Integer value) throws InterruptedException {
synchronized(this){
//判断队列是否已经满了
if(size>=elementData.length){
this.wait();
}
//从队尾入队
elementData[tail]=value;
//队尾向前移动
tail++;
//处理循环
if(tail>= elementData.length){
tail=0;
}
size++;
//添加新的元素唤醒线程
this.notifyAll();
}
}
/**
* 获取元素
* @return
*/
public Integer take() throws InterruptedException {
synchronized(this){
//先判断是否为空
if(size==0){
//出队时,如果为空,继续等待
this.wait();
}
//出队队首元素
Integer value=elementData[head];
//向后移动head
head++;
//处理循环
if(head>=elementData.length){
head=0;
}
size--;
//出队时唤醒其他线程
this.notifyAll();
return value;
}
}
}
public class Exe_02 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue=new MyBlockingQueue();
//往阻塞队列添加三个元素
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println("添加了3个元素");
//再添加第四个
//queue.put(4);
//System.out.println("添加了4个元素");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println("take了三个元素");
}
}
运行结果:
1.5、实现生产者消费者模型
代码示例:
public class Exe_03 {
//用两个线程生产者消费者模型
public static void main(String[] args) {
//创建一个阻塞队列,表示交易场所
MyBlockingQueue queue=new MyBlockingQueue();
//创建生产者线程
Thread producer=new Thread(() ->{
int num=0;
while(true){
try {
queue.put(num);
System.out.println("生产了元素:"+num);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者");
//启动线程
producer.start();
//创建消费者线程
Thread consumer=new Thread(() ->{
while(true) {
try {
Integer value = queue.take();
//睡眠一会
Thread.sleep(1000);
System.out.println("消费了了元素" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者");
//启动线程
consumer.start();
}
}
运行结果:
现象就是 ,生产者都把队列填满后,开始阻塞,等消费者一点一点消费,生产者一点一点生产。
调用wait()解决虚假唤醒问题
更新代码:
public class Exe_03 {
//用两个线程生产者消费者模型
public static void main(String[] args) {
//创建一个阻塞队列,表示交易场所
MyBlockingQueue queue=new MyBlockingQueue();
//创建生产者线程
Thread producer=new Thread(() ->{
int num=0;
while(true){
try {
queue.put(num);
System.out.println("生产了元素:"+num);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者");
//启动线程
producer.start();
//创建消费者线程
Thread consumer=new Thread(() ->{
while(true) {
try {
Integer value = queue.take();
//睡眠一会
Thread.sleep(1000);
System.out.println("消费了了元素" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者");
//启动线程
consumer.start();
}
}