目录
阻塞队列
生产者消费者模型
消息队列
消息队列的作用
1.解耦
2.削峰填谷
3.异步
演示JDK中的阻塞队列
实现一个阻塞队列
阻塞队列
队列,是一种先进先出(FIFO)数据结构。
阻塞队列也满足队列的特性:
入队元素时,先判断一下队列是否已经满了,如果满了就等待(阻塞),当有空余空间时再插入
出队元素时,先判断一下队列是否已经空了,如果空了就等待(阻塞),当队列中有元素时再取出
生产者消费者模型
生产者和消费者模型使通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。
例1:分别用一个线程模拟生产者和消费者:
生产者:
//创建生产者线程
Thread producer=new Thread(()->{
int num=1;
while(true){
//生产(打印)一条消息
System.out.println("生产了消息"+num);
try {
//把消息放入阻塞队列中
queue.put(num);
num++;
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
消费者:
//创建消费者线程
Thread consumer=new Thread(()->{
while(true){
try {
//从队列中获取消息
int num=queue.take();
//打印一下消费日志
System.out.println("消费了元素"+num);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
正常执行线程,生产者生产一个,消费者消费一个:
将生产者休眠时间改为1ms,消费者休眠时间改为1s,模拟生产块消费慢的过程
生产者:
消费者:
生产块消费慢,使用阻塞队列。
例2:现实生活中包饺子的例子
包饺子有三个步骤:
1.和面
2.擀皮
3.包饺子
而这里,生产者就是擀皮的人,消费者使包饺子的人,放饺子皮的地方是一个交易场所,用阻塞队列来是实现。
这就是一个应用场景,用阻塞队列实现饺子皮的交易。
消息队列
消息队列本质上就是一个阻塞队列,在此基础上上为放入阻塞队列的消息打一个标签。
例如:
在基础数据结构上,做了一些针对应用场景的优化和实现,我们把这样的框架和软件,称为“中间件”。
消息队列的作用
1.解耦
在设计程序的时候提出过一些要求,比如:高内聚,低耦合
高内聚:把功能强相关的代码写在一起,维护起来非常方便,是组织代码的方式
低耦合:不要把相关的代码写的到处都是,一般是通过抽象的方式把代码封装成方法,使用的时候调用即可
例如:
购物功能,三个服务器,服务器A负责订单,服务器B负责支付,服务器C查看物流
现在这个情况,使服务器A与服务器B必须都知道对方存在,且参数必须约定好,如果其中一方损坏那么整个流程都会受到影响。这种情况下,某一个功能需要修改那么涉及的服务器肯都需要重新修改代码。这种情况称之为耦合度非常高。
而通过消息队列把三个系统进行解耦,使它们不再直接通信,起到了单独维护,单独运行的作用。
2.削峰填谷
峰与谷指的是消息的密集程度。
在流量激增的时候用消息队列缓冲,在流量减少的时候,把消息队列中存储的消息一点一点的消费。
例如:
双十一或者微博有热点事件时。
加入消息队列
在流量激增的时候用消息队列缓冲,在流量减少的时候,把消息队列中存储的信息一点一点消费。
3.异步
同步:请求方必须死等对方的响应
异步:发出请求之后,自己去干别的事情,有响应时会接收到通知从而处理响应
演示JDK中的阻塞队列
创建一个阻塞队列,指定队列容量为3.
//创建时可以指定队列的容量,当队满时还需要插入就要阻塞队列
BlockingDeque<Integer>queue=new LinkedBlockingDeque<>(3);
由于我们创建的阻塞容量为3,当第四个元素插入时就会发生阻塞。
//往队列中写入元素
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println("已经插入了三个元素");
queue.put(4);
System.out.println("已经插入了4个元素");
阻塞队列中获取元素用take,会产生阻塞效果。
public static void main(String[] args) throws InterruptedException {
//创建时可以指定队列的容量,当队满时还需要插入就要阻塞队列
BlockingDeque<Integer>queue=new LinkedBlockingDeque<>(3);
//往队列中写入元素
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println("已经插入了三个元素");
// queue.put(4);
// System.out.println("已经插入了4个元素");
//阻塞队列中获取元素用take
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
再次调用queue.take(),此时队列已经空了,进入阻塞状态。
实现一个阻塞队列
1.实现一个普通队列,底层用了两种数据结构,一个是链表,一个是循环数组
2.阻塞队列就是在普通队列上加入了阻塞等待的操作
这里使用数组演示:
普通队列:
定义队列的队首,队尾,元素个数
//定义保存元素的数组
private int[] elementData=new int[3];
//定义队首下标
private int head;
//定义队尾下标
private int tail;
//定义有效元素个数
private int size;
插入操作
//插入
public void put(int value){
if(size>=elementData.length){
return;
}
elementData[tail]=value;
tail++;
if(tail>=elementData.length){
tail=0;
}
size++;
}
获取值操作
//获取元素
public int take(){
if(size<=0){
return -1;
}
int val=elementData[head];
head++;
if(head>=elementData.length){
head=0;
}
size--;
return val;
}
阻塞队列:
阻塞队列需要在普通队列的基础上加入等待和唤醒操作
等待操作:wait()
唤醒操作:notify()
这两个方法和synchronized是强相关的,所需需要先在代码中加入synchronized。
插入操作
//插入
public void put(int value) throws InterruptedException {
//根据修改共享变量的范围加锁
//锁对象用this即可
synchronized (this) {
if (size >= elementData.length) {
//如果数组满了,阻塞等待
this.wait();
}
elementData[tail] = value;
tail++;
if (tail >= elementData.length) {
tail = 0;
}
size++;
//唤醒操作
this.notifyAll();
}
}
查找值操作
//获取元素
public int take() throws InterruptedException {
synchronized (this) {
if (size <= 0) {
this.wait();
}
int val = elementData[head];
head++;
if (head >= elementData.length) {
head = 0;
}
size--;
//唤醒操作
this.notifyAll();
return val;
}
}