文章目录
- 一、阻塞队列
- 阻塞队列的概念
- 阻塞队列相关类和方法
- 生产者消费者模型
- 二、自定义实现阻塞队列
- 自定义实现循环队列
- 自定义实现阻塞队列
- 生产者消费者模型体验
一、阻塞队列
阻塞队列的概念
队列我们并不默认,一提起队列,我们立马就能想到 "先进先出"的特性。
今天我们就来学习一下特殊的队列: 阻塞队列,它具有下面两个特性:
- 如果队列为空,执行出队列操作时,就会阻塞,直到另一个线程往队列里添加元素(队列不为空时)
- 如果队列为满,执行入队列操作时,就会阻塞,直到另一个线程从队列里取走元素
(队列不为满时)
阻塞队列相关类和方法
我们的JUC下为我们提供了一个泛型接口BlockingQueue.
同时也提供了一下具体的实现类:
ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
SynchronousQueue: 一个不存储元素的阻塞队列。
同时也为我们提供了一些操作阻塞队列的方法。
方法 | 作用 |
---|---|
void put() | 带有阻塞特性的入队操作 |
E take() | 带有阻塞特性的出队特性 |
boolean contains(Object o) | 判断阻塞队列是否包含某元素 |
我们来简单的使用一下阻塞队列
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
queue.put(1);
queue.put(2);
Integer take = queue.take();
System.out.println("取出阻塞队列中的元素: " + take);
take = queue.take();
System.out.println("取出阻塞队列中的元素: " + take);
take = queue.take();
System.out.println("取出阻塞队列中的元素: " + take);
}
我们可以发现我们阻塞队列只放入了2个元素,当我们取元素时,取第三个时,因为阻塞队列没有元素,所以一直在阻塞等待。
生产者消费者模型
为什么会有生产者消费者模型?
在我们多线程编程种,为了解决生产者(生产数据的线程)和消费者(消费数据的线程)之间的执行速度的差异和解决生产者和消费者之间的强耦合关系。
通过在生产者和消费者之间增加一个队列来避免生产者和消费者之间的直接通信,生产者将数据直接添加到队列中,消费者直接从队列中取数据处理。
那么这样的模型有什么好处呢?
1.降低生产者和消费者之间的耦合度
2.增加一个阻塞队列,平衡生产者和消费者之间的处理能力
比如我们的双11,12的购物狂欢节,某一时刻的秒杀活动,我们的服务器A某一时刻会向服务器B发送巨量的强求数据,但是我们的服务器处理速度有限,一次性招架不住这么多请求,有可能服务器之间就崩了。
我们使用生产者消费者模型时,就不会受到影响,即使服务器的请求暴涨,但是他发送到阻塞队列中,我们的服务器B按照自己的处理速度去阻塞队列中取元素,所以不会产生实质性的影响。
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
Thread customer = new Thread(() -> {
while(true) {
try {
int val = queue.take();
System.out.println("消费者消费了: " + val);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
}
二、自定义实现阻塞队列
我们的阻塞队列可以由链表实现,也可以由数组实现,由于链表实现比较简单,尾插头删或者头插尾删即可,这里我们重点介绍数组实现阻塞队列,大家也可以先去学习一下队列这篇文章:java详解队列
自定义实现循环队列
在实现循环队列时,我们主要面临的问题是,什么情况下队列为空,什么情况下队列为满,在判断满时:我们有两种方案,定义一个size变量,如果等于0为空,等于队列容量为满,这种过于简单,我们采用浪费一个空间的办法,如果head == tail队列为空,如果tail的下一个位置为head为满。
public class MyCircularQueue {
//定义队列,默认大小50
private int[] num = new int[50];
//定义队头指针
private int head;
//定义队尾指针
private int tail;
//记录队列元素
private int size;
//入队
public void put(int val) {
if(size == num.length) {
//队列为满
return;
}
num[tail] = val;
tail++;
// 等价tail = tail % num.length
if(tail >= num.length) {
tail = 0; //这样的写法,可读性较高
}
size++;
}
//出队
public Integer take() {
if(size == 0) {
//队列为空
return null;
}
int ret = num[head];
head++;
//等价 head = head % num.length
if(head >= num.length) {
head = 0;
}
size--;
return ret;
}
}
自定义实现阻塞队列
因为我们的阻塞队列基本都是在多线程情况下使用的,我们的put和take方法都涉及了多线程的读写操作,所以我们加锁。
public class MyBlockingQueue {
//定义队列,默认大小50
private int[] num = new int[50];
//定义队头指针
private int head;
//定义队尾指针
private int tail;
//记录队列元素
private int size;
//入队
public void put(int val) {
synchronized (this) {
if(size == num.length) {
//队列为满
return;
}
num[tail] = val;
tail++;
// 等价tail = tail % num.length
if(tail >= num.length) {
tail = 0; //这样的写法,可读性较高
}
size++;
}
}
//出队
public Integer take() {
synchronized (this) {
if(size == 0) {
//队列为空
return null;
}
int ret = num[head];
head++;
//等价 head = head % num.length
if(head >= num.length) {
head = 0;
}
size--;
return ret;
}
}
}
阻塞队列,其最重要的功能就是阻塞,既然我们要实现阻塞功能,我们这里的业务场景需要,主要使用wait 和 notify实现阻塞等待,以及唤醒。我们需要在两处阻塞
- 当我们take时,队列为空,我们需要wait方法使其阻塞,直到有元素入队列时,在notify将其唤醒。
- 当我们put时,队列为满,我们需要wait方法使其阻塞,直到有元素出队列时,在notify将其唤醒。
public class MyBlockingQueue {
//定义队列,默认大小50
private int[] num = new int[50];
//定义队头指针
private int head;
//定义队尾指针
private int tail;
//记录队列元素
private int size;
//入队
public void put(int val) throws InterruptedException {
synchronized (this) {
if(size == num.length) {
//队列为满,阻塞等待
this.wait();
}
num[tail] = val;
tail++;
// 等价tail = tail % num.length
if(tail >= num.length) {
tail = 0; //这样的写法,可读性较高
}
size++;
//唤醒take方法中因队列为空阻塞的线程
this.notify();
}
}
//出队
public Integer take() throws InterruptedException {
synchronized (this) {
if(size == 0) {
//队列为空,阻塞等待
this.wait();
}
int ret = num[head];
head++;
//等价 head = head % num.length
if(head >= num.length) {
head = 0;
}
size--;
//唤醒put方法中因队列为满阻塞的线程
this.notify();
return ret;
}
}
}
上述代码,我们已经基本实现了阻塞队列的功能,但是有一点美中不足的地方,我们的wait被唤醒之后,一定能确保if的条件一定是不满足的吗?
虽然这里可能是肯定的,但我们是一个逻辑严密的程序员,我们在被唤醒的时候在去判断一下是否满足更为妥善。
我们wait的源码中,也是这样建议的,所以我们修改完善一下。
//入队
public void put(int val) throws InterruptedException {
synchronized (this) {
while(size == num.length) {
//队列为满,阻塞等待
this.wait();
}
num[tail] = val;
tail++;
// 等价tail = tail % num.length
if(tail >= num.length) {
tail = 0; //这样的写法,可读性较高
}
size++;
//唤醒take方法中因队列为空阻塞的线程
this.notify();
}
}
//出队
public Integer take() throws InterruptedException {
synchronized (this) {
while(size == 0) {
//队列为空,阻塞等待
this.wait();
}
int ret = num[head];
head++;
//等价 head = head % num.length
if(head >= num.length) {
head = 0;
}
size--;
//唤醒put方法中因队列为满阻塞的线程
this.notify();
return ret;
}
}
生产者消费者模型体验
我们用我们自定义实现的阻塞队列体验一下生产者消费者模型。
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
Thread producer = new Thread(() -> {
int count = 0;
while (true) {
try {
queue.put(count);
System.out.println("生产了" + count);
Thread.sleep(50);
count++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(() -> {
while (true) {
try {
int val = queue.take();
System.out.println("消费了: " + val);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
这样就实现了一个源源不断的生产者消费者模型。这些内容大家还需要多加练习。