- 阻塞队列介绍
- 标准库阻塞队列使用
- 基于阻塞队列的简单生产者消费者模型。
- 实现一个简单型阻塞队列 (基于数组实现)
阻塞队列介绍
不要和之前学多线程的就绪队列搞混;
阻塞队列:也是一个队列,先进先出。带有特殊的功能 ;阻塞
1:如果队列为空,执行出队列操作,就会阻塞阻塞到另一个线程往队列里添加元素(队列不空)为止
2:如果队列满了,执行入队列操作,也会阻塞阻塞到另一个线程从队列取走元素位置(队列不满)
消息队列:
先简单介绍消息队列:特殊的队列,相当于在阻塞队列的基础上,加上一个消息类型,按照类别进行先进先出。
因为这个消息队列使用起来太香;所以有大佬把这样的数据结构单独实现成一个程序;可以单独的部署到一组服务器上。使储存、转发能力大大提升。现在已经发展可以和mysql、redis相提并论的一个重要组件。
消息队列之所以这么好用和阻塞队列阻塞特性关系非常大;基于这种特性可以实现 “生产者消费者模型”
生产者消费者模型(常用的并发设计模式):比如包饺子;一个人负责制作饺子皮;一个人负责包;一个人负责下锅。不会说每个人单独包一个饺子,这样会导致撵面杖不够用,影响效率。两个好处。
1:实现发送方与接收方之间的解耦。
比如下面这种是耦合度比较高;A要调用B(A把请求转发给B处理,B把结果反馈给A),A得知道B的存在。如果B挂了,很容易引起A的bug。。如果你要增加一个C服务器,对A也需要修改代码,对A又得重新测试等又不知道是否改动A会对B有影响。
这种模型下;A、B耦合就降低很多;A中无B,B中无A的代码。两边有一方挂了都互相没任何影响;因为阻塞队列是正常的。B挂了,A仍然可以插入元素,满了就阻塞。A挂了,B仍然可以从队列获取元素;空了就阻塞。增加一个C服务器对A是无感知的。
2:削峰填谷的作用,保证系统的稳定性。
服务器开发也很类似;说不定坤哥唱一首只因你太美;热搜瞬间上来,很多用户给你发请求,如果没有削峰填谷的准备服务器很容易就挂掉。
标准库阻塞队列使用
Queue提供:入队列 offer 、出队列 poll、取队首元素 peek。
阻塞队列主要方法:入队列 put、出队列 take。抛异常阻塞得需要唤醒,空队列你再取就阻塞。阻塞队列也有offer和poll方法,但是这些是不带阻塞功能的。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.PriorityBlockingQueue;
public class test14 {//标准库阻塞队列使用
public static void main(String[] args) throws InterruptedException {
// BlockingDeque<String> q=new PriorityBlockingQueue<>() 基于堆;带有优先级阻塞队列
BlockingDeque<String> q=new LinkedBlockingDeque<>();//基于链表实现
// BlockingDeque<String> q=new ArrayBlockingQueue<>(); 基于数组实现
q.put("hello");
System.out.println(q.take());//输出结果hello
q.take();//空的时候取就会阻塞
}
}
基于阻塞队列的简单生产者消费者模型。
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class test15 {//基于阻塞队列生产者消费者模型
static int i=0;
public static void main(String[] args) {
BlockingDeque<Integer> blockingDeque=new LinkedBlockingDeque();
Thread t1=new Thread(()->{//生产者
while (true){
try {
blockingDeque.put(i);
System.out.println(i);//生产的元素
i++;
Thread.sleep(500);//生产的很慢;消费者就得阻塞先;等到元素生产出来
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
Thread t2=new Thread(()->{//消费者
while (true){
try {
System.out.println(blockingDeque.take());//消费的元素
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t2.start();
}
}
执行效果
实现一个简单型阻塞队列 (基于数组实现)
先复习一下循环队列的实现。
public class test16 {//简单阻塞队列的实现;;先复习一遍循环队列
private int[] elem;
public int front;//队头
public int rear;//队尾
public test16(int k) {//创建这个对象,就有循环队列k大小数组
elem=new int[k];
}
//判满;入队的前提条件,入队是满就直接返回。或者你在满的时候扩容,入之前判断是否满;如果满扩容再入
public boolean isfull(){
//怎么判;队尾+1 回到头。但是刚好队头在原点就会产生;位置相同;但是队尾+1的值不等于0.
if((rear+1)%elem.length==front){
return true;
}
return false;
}
//判空;出队的前提条件,出队是空直接返回
public boolean isnull(){
//对头==队就是空
if(rear==front){
return true;
}
return false;
}
//入队方法;
public boolean offer1(int i) {
if(isfull()){//满的情况返回入队失败
return false;
}
//怎么入;是入在后面还是前面呢;队列尾入;这时候浪费的那个空间直接赋值进去;队尾往后移就好了
elem[rear]=i;//会不会有点不合适;万一前面是没有元素呢
rear=(rear+1)%elem.length; //这个队尾两种情况;第一种是单纯的加1;未到数组最后位置
//比如我把前面空间删除。刚好到最后一个位置;就得回归原点。%elem.length
//这里分开写效果会更好;rear++; if(rear>=elem.length){ rear=0; }
//相比之下rear++;read=read%elem.length 。这个不易读,不直观;每次得理解好一会 。效率上又没有优势
return true;
}
//出队方法
public Int poll1(){//头出、怎么出呢。感觉不合理;出队应该返回这个元素.所以得先记录
if(isnull()){
return -1;
}
int value=front;
front=(front+1)%elem.length;//往后移一位;elem.length避免逛了一圈
return elem[value];
}
//获取队首
public int peek1(){
if(isnull()){
return -1;
}
return elem[front];
}
//获取队尾
public int Rear(){
if(isnull()){
return -1;
}
//得注意;不能直接返回read-1.因为如果read是原点;那么减1不就变成负一
if(rear==0){
return elem[elem.length-1];
}
else
return elem[rear-1];
}
public static void main(String[] args) {
}
}
我们在此基础上加上阻塞功能就好了;也就是在多线程环境下使用;得保证线程安全。
在线程里使用这些方法时;如果出现符合阻塞条件情况就阻塞。
public class test17 {
static int i;
public static void main(String[] args) throws InterruptedException {
//看看能不能基于我们创建这个阻塞队列实现生产者消费者模型
my blockingDeque=new my(100);//这里浪费一个空间;达到98;就会阻塞
Thread t1=new Thread(()->{//生产者
while (true){
try {
blockingDeque.offer1(i);
System.out.println(i);//生产的元素
i++;
Thread.sleep(50);//生产的很慢;消费者就得阻塞先;等到元素生产出来
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
Thread t2=new Thread(()->{//消费者
while (true){
try {
System.out.println(blockingDeque.poll1());//消费的元素
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
// t2.start();
}
}
class my {//简单阻塞队列的实现;;先复习一遍循环队列
private int[] elem;
public int front;//队头
public int rear;//队尾
public my(int k) {
elem = new int[k];
}
public void offer1(int i) throws InterruptedException {
synchronized (this) {
if (isfull()) {
this.wait();
}
elem[rear] = i;
rear = (rear + 1) % elem.length;
// 这个 notify 唤醒 poll1 中的 wait
this.notify();
}
}
//出队方法
public int poll1() throws InterruptedException {
synchronized (this) {
if (isnull()) {
this.wait();//两个wait是不可能同时触发的;因为一个队列不可能既是空又是满。只要this是同一个对象就不会.
}
int value = front;
front = (front + 1) % elem.length;
// 这个 notify 唤醒 offer1 中的 wait
this.notify();
return elem[value];
}
}
public boolean isfull() {
if ((rear + 1) % elem.length ==front) {
return true;
}
return false;
}
public boolean isnull() {
//队头==队尾就是空
if (rear == front) {
return true;
}
return false;
}
}
上述代码还有一丝丝的瑕疵;offer1中wait被唤醒的时候;if的条件一定就不成立?(也就是队列一定是不满的?)
我们当前代码是取元素的时候;就唤醒,是不存在这个问题。万一其它操作也可能唤醒这个wait;但是情况又是队列不满的呢?
所以我们改成
whlie (isnull()) {
this.wait();
}