目录
一.概念
二.生产者消费者问题
三.阻塞队列接口BlockingQueue
四.基于数组实现单锁的阻塞队列
1.加锁方式
2.代码实现
3.解释说明
(1).offer添加元素
(2)poll取出元素
4.timeout超时时间
5.测试
五.基于数组实现双锁的阻塞队列
1.问题
2.关于size共享变量
3.死锁问题
4.级联唤醒
(1)offer中只唤醒一次,其他交给poll线程唤醒
(2)poll中只唤醒一次,其他交给offer线程唤醒
5.完整代码
一.概念
阻塞队列是一种特殊类型的队列,具有额外的阻塞操作。在阻塞队列中,当队列为空时,从队列中获取元素的操作会被阻塞,直到有元素被添加到队列中为止;当队列满时,向队列中添加元素的操作会被阻塞,直到队列有空闲位置为止。
阻塞队列在多线程编程中非常有用,可以有效地进行线程间的协调和通信。它提供了一种线程安全的方式来共享数据,避免了常见的并发问题,如资源争用和死锁。
常见的阻塞队列有以下几种实现方式:
- ArrayBlockingQueue:基于数组实现的有界阻塞队列,需要指定队列的容量。
- LinkedBlockingQueue:基于链表实现的可选有界或无界阻塞队列。
- PriorityBlockingQueue:基于堆实现的无界优先级阻塞队列,元素按照优先级进行排序。
- SynchronousQueue:特殊的阻塞队列,每个插入操作必须等待一个对应的删除操作,反之亦然。
阻塞队列提供了一些常用的操作方法,如put()和take()等。put()方法用于向队列中添加元素,并在队列满时阻塞调用线程;take()方法用于获取队列中的元素,并在队列为空时阻塞调用线程。
使用阻塞队列可以简化并发编程的实现,提高代码的可读性和维护性。它能够有效地控制线程的访问顺序,并提供了一种直观的方式来处理线程间的同步问题。
二.生产者消费者问题
我们先来分析一下,在我们之前学过的操作系统中,有一个生产者和消费者问题,当生产者每次生产完一个产品后,要往缓冲区中放入,但是如果此时缓冲区是满的,那么就要让生产者进入阻塞状态,进行等待,当消费者从缓冲区中取出一个产品消费后,缓冲区有了空位,此时生产者就被唤醒,往缓冲区中放入生产的产品,如果有多个生产者和消费者,此时为了防止出现混乱,就要在放入缓冲区之前加锁,放入后解锁,消费者在从缓冲区取之前也要加锁,取出后再解锁
三.阻塞队列接口BlockingQueue
/**
* 阻塞队列接口定义
* @param <E>
*/
public interface BlockingQueue<E> {
void offer(E e) throws InterruptedException;
boolean offer(E e,long timeout) throws InterruptedException;
E poll() throws InterruptedException;
E poll(long timeout) throws InterruptedException;
boolean isFull();
boolean isEmpty();
}
我们定义了接口,其中,主要就是成对的两个方法,offer和poll,还有在此基础上加入超时时间的,加入超时时间是,当队列满时,超过了等待时间,就不去添加了,直接返回失败false
四.基于数组实现单锁的阻塞队列
1.加锁方式
这里我们先说明一下,在java中,可以有两种方式加锁
1.synocized关键字加锁
2.ReentrantLock类创建可重入锁对象,该对象还可以创建出条件对象,来执行阻塞和唤醒操作
2.代码实现
我们先来用单锁实现一下阻塞队列
/**
* 基于数组实现阻塞队列
* @param <E>
*/
public class ArrayBlockQueue<E> implements BlockingQueue<E>{
private final E[] array;
private int head;
private int tail;
private int size;
@SuppressWarnings("all")
public ArrayBlockQueue(int capacity){
array = (E[]) new Object[capacity];
}
//锁对象
private final ReentrantLock reentrantLock = new ReentrantLock();
//条件对象
private final Condition headWaits = reentrantLock.newCondition();
private final Condition tailWaits = reentrantLock.newCondition();
/**
* 向队尾添加元素,如果队列为满,则阻塞当前线程
* @param e 要添加元素
* @throws InterruptedException
*/
@Override
public void offer(E e) throws InterruptedException {
//先加锁
reentrantLock.lock();
try{
//判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
while (isFull()){
//如果是满的,就让当前线程阻塞
tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
}
//当队列不满时,可以进行添加
array[tail] = e;
//先判断tail是否越界
if(++tail == array.length){
tail = 0;
}
//让size+1
size++;
//这时要通知poll,因为放入了一个元素,所以队列肯定不为空,通知poll线程可以取元素了
headWaits.signal(); //唤醒poll线程
}finally {
//释放锁
reentrantLock.unlock();
}
}
/**
* 向队尾添加元素,加入超时时间,如果队满,并且过了超时时间,返回false
* @param e 要添加的元素
* @param timeout 超时时间
* @return 是否添加成功
* @throws InterruptedException
*/
@Override
public boolean offer(E e, long timeout) throws InterruptedException {
//先加锁
reentrantLock.lock();
try{
//将传入的时间转换为纳秒
long t = TimeUnit.MILLISECONDS.toNanos(timeout);
//判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
while (isFull()){
/**
* 如果是满的,就让当前线程阻塞
* 我们用加入时间的方法来阻塞
* 注意这里我们每次阻塞完唤醒后,就更新等待时间
* 如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
* 如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
* 如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
**/
if(t <= 0){
return false;
}
t = tailWaits.awaitNanos(t);
}
//当队列不满时,可以进行添加
array[tail] = e;
//先判断tail是否越界
if(++tail == array.length){
tail = 0;
}
//让size+1
size++;
//这时要通知poll,因为放入了一个元素,所以队列肯定不为空,通知poll线程可以取元素了
headWaits.signal(); //唤醒poll线程
/**
* 到这里就说明添加成功,返回true
*/
return true;
}finally {
//释放锁
reentrantLock.unlock();
}
}
/**
* 移除队头元素,如果队列为空,则阻塞当前线程
* @return 队头元素
* @throws InterruptedException
*/
@Override
public E poll() throws InterruptedException {
//先加锁
reentrantLock.lock();
try {
//先循环判断队列是否为空
while (isEmpty()){
//如果队列为空,让当前线程阻塞
headWaits.await();
}
//如果队列不为空,可以取了
E e = array[head];
array[head] = null; //help GC
//判断head是否越界
if(++head == array.length){
head = 0;
}
//让size-1
size--;
//这时队列因为取出了一个元素,所以肯定不为满,通知offer线程可以添加元素了
tailWaits.signal(); //唤醒offer线程
return e;
}finally {
//释放锁
reentrantLock.unlock();
}
}
/**
* 移除队头元素,加入超时时间,如果队空,并且超过等待时间,返回null
* @param timeout 超时时间
* @return 队头元素
* @throws InterruptedException
*/
@Override
public E poll(long timeout) throws InterruptedException {
//先加锁
reentrantLock.lock();
try {
long t = TimeUnit.MILLISECONDS.toNanos(timeout);
//先循环判断队列是否为空
while (isEmpty()){
//如果队列为空,让当前线程阻塞
if(t <= 0){
return null;
}
t = headWaits.awaitNanos(t);
}
//如果队列不为空,可以取了
E e = array[head];
array[head] = null; //help GC
//判断head是否越界
if(++head == array.length){
head = 0;
}
//让size-1
size--;
//这时队列因为取出了一个元素,所以肯定不为满,通知offer线程可以添加元素了
tailWaits.signal(); //唤醒offer线程
return e;
}finally {
//释放锁
reentrantLock.unlock();
}
}
@Override
public boolean isFull() {
return size == array.length;
}
@Override
public boolean isEmpty() {
return size == 0;
}
@Override
public String toString() {
return Arrays.toString(array);
}
}
3.解释说明
注释中也都写了,我在这里再次解释一下:
(1).offer添加元素
当我们执行offer方法向队列中添加元素时:
1. 我们需要先加锁,
2. 我们判断队列是否为满,
3. 如果满了,我们需要让当前线程阻塞 ,
注意:这里使用的是while循环,为什么不用if呢?我们想,如果用if,那么它只会判断一次,如果当某个时刻,队列从满变为不满,这时我们阻塞的offer线程被唤醒,将要去添加元素,但就在此时,另一个offer1线程可能在offer线程添加之前抢先往队列中添加了元素,那么offer线程再去添加就会报错,也就是虚假唤醒(spurious wakeups),使用while循环判断队列是否为满,可以在阻塞线程被唤醒后重新判断队列是否满足条件。如果队列仍然满,线程会继续被阻塞,直到队列有空闲位置。这样可以预防虚假唤醒的问题,确保线程只有在满足条件的情况下才会执行添加元素的操作。
4.当while条件不成立时,也就是队列有空为,并且此时没有其他线程来争抢,那么就可以往队列中添加元素了 , queue[tail] = e;
5.判断++tail是否达到了数组末尾位置,如果到了,那么重新调整为0,相当于一个圆圈
6.让size++
7.当offer线程向队列中添加元素后,此时队列肯定不为空,我们应该向poll线程发出信号,可以唤醒,相当于操作系统中的信号量机制
(2)poll取出元素
1.先加锁
2.判断队列是否为空,同理使用while循环判断
3.当队列不为空时,取出队头元素
4.判断++head是否到数组末尾位置,如果到了,重新置为0
5.让size--;
6.当poll线程从队列中取出元素后,队列肯定不为满,我们应该向offer线程发出信号,唤醒offer线程
4.timeout超时时间
我们可以为offer和poll设置超时时间,当超过了等待时间,将直接返回,不在执行
看一下offer方法设置timeout
/**
* 如果是满的,就让当前线程阻塞
* 我们用加入时间的方法来阻塞
* 注意这里我们每次阻塞完唤醒后,就更新等待时间
* 如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
* 如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
* 如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
**/
poll方法同理...
5.测试
下面让我们来测试一下代码
public class TestBlockQueue {
public static void main(String[] args) throws InterruptedException {
ArrayBlockQueue<String> queue = new ArrayBlockQueue<>(4);
queue.offer("task1");
queue.offer("task2");
queue.offer("task3");
queue.offer("task4");
System.out.println(queue);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "开始添加元素...");
try {
boolean flag = queue.offer("task5", 4000);
if(flag){
System.out.println(Thread.currentThread().getName() + "添加元素成功....");
}else {
System.out.println(Thread.currentThread().getName()+"添加元素超超时失败....");
}
System.out.println(queue);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t1").start();
}
}
我们给队列初始大小设为4,然后向队列中添加4个任务,此时队列为满,然后我们开启一个t1线程向队列中添加元素,设置超时时间为4s,判断是否能添加成功
运行:
[task1, task2, task3, task4]
t1开始添加元素...
t1添加元素超超时失败....
[task1, task2, task3, task4]
进程已结束,退出代码0
可以看到,过了4s,添加失败,因为我们并没有取出任何元素,所以offer线程一直阻塞直到超时失败!
下面,我们让主线程先休眠2s,然后取出一个元素,再次观察t1是否能添加成功:
//让主线程休眠2s,然后poll
Thread.sleep(2000);
queue.poll();
在上面的代码中添加以上代码,然后运行:
[task1, task2, task3, task4]
t1开始添加元素...
t1添加元素成功....
[task5, task2, task3, task4]
进程已结束,退出代码0
这次可以看到,task5被成功的添加到了队列中,因为我们在超时时间之前取出了队列的一个元素,队列有了空位,task5就可以添加到队列中了。
五.基于数组实现双锁的阻塞队列
1.问题
上面我们是用一把锁来给offer线程和poll线程加锁,他们两个操作用的同一把锁,这样其实并不好,效率比较低,而且添加和取出操作应该是两个互不影响的操作,是互相解耦的,所以我们应该使用双锁来给他们分别加锁和释放锁
2.关于size共享变量
当我们换成双锁后,需要思考一个问题,这个头指针head和尾指针tail,head是poll线程用来取出队列头元素的,tail是offer线程用来向队尾添加元素使用的,所以说head和tail这两个变量是互不影响的,它们分别在各自的线程里使用,但是对于size,在offer线程中,最后要让size++;在poll线程中,最后要让size--,这就是共享的变量了,可能会出现线程安全问题,如果两个线程不是顺序执行的,而是交错执行,就会是size的值发生混乱,所以我们要对size作约束
在java中,我们可以实现原子类来对变量进行线程安全保护,对于int类型的我们使用AtomicInteger,
加一可以使用getAndIncreament()方法,减一可以使用getAndDecreament()方法
//offer锁
private final ReentrantLock headLock = new ReentrantLock();
//poll锁
private final ReentrantLock tailLock = new ReentrantLock();
//条件对象
private final Condition headWaits = headLock.newCondition();
private final Condition tailWaits = tailLock.newCondition();
3.死锁问题
如果我们添加了双锁,那么我们需要设置各自的条件去阻塞和唤醒线程,先看一下offer线程,
@Override
public void offer(E e) throws InterruptedException {
//先加锁
tailLock.lock();
try{
//判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
while (isFull()){
//如果是满的,就让当前线程阻塞
tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
}
//当队列不满时,可以进行添加
array[tail] = e;
//先判断tail是否越界
if(++tail == array.length){
tail = 0;
}
//让size+1
size.getAndIncrement();
headLock.lock();
try {
headWaits.signal(); //poll1
}finally {
headLock.unlock();
}
}finally {
//释放锁
tailLock.unlock();
}
}
}
注意看,这里我们用headWaits唤醒线程时,它是在tailLock释放锁之前,也就是一个嵌套结构,这样就会导致死锁的发生,
如果tailLock先加锁了,然后headLock也去加锁,之后在offer线程中的headLock想去加锁就加不上了,同理poll线程中的tailLock想加锁也加不上去,他们两个线程互相僵持,陷入了死锁状态,为了防止死锁发生,我们只要把嵌套结构改为平级结构就可以了,这样就能保证一定是释放完锁之后再去加锁,一定可以加锁成功!
@Override
public void offer(E e) throws InterruptedException {
//先加锁
tailLock.lock();
try{
//判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
while (isFull()){
//如果是满的,就让当前线程阻塞
tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
}
//当队列不满时,可以进行添加
array[tail] = e;
//先判断tail是否越界
if(++tail == array.length){
tail = 0;
}
//让size+1
size.getAndIncrement();
}finally {
//释放锁
tailLock.unlock();
}
/**
* 平级可以防止死锁
* 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
*/
headLock.lock();
try {
headWaits.signal(); //poll1
}finally {
headLock.unlock();
}
}
这样的平级结构就可以防止死锁发生了...
4.级联唤醒
以上的唤醒逻辑,是每次都要进行一次唤醒,这样其实效率还不是最好的,那么我们可不可以减少唤醒的次数来提高效率呢?我们可以通过级联唤醒来实现
(1)offer中只唤醒一次,其他交给poll线程唤醒
先看一下offer线程,在最后要随机唤醒一个poll线程,我们让它只唤醒一个线程,剩下的线程交给poll线程自己去唤醒,比如有poll1,poll2,poll3,poll4四个线程,那么我们的offer线程只唤醒poll1线程,然后让poll1去唤醒poll2,poll2去唤醒poll3,以此类推...,这样就可以让offer只执行一次唤醒,提高了效率
@Override
public void offer(E e) throws InterruptedException {
//记录一下每次size加一之前的值
int c;
//先加锁
tailLock.lock();
try{
//判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
while (isFull()){
//如果是满的,就让当前线程阻塞
tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
}
//当队列不满时,可以进行添加
array[tail] = e;
//先判断tail是否越界
if(++tail == array.length){
tail = 0;
}
//让size+1
c = size.getAndIncrement();
//如果c+1>array.length,说明队列还是有空位置,就自己唤醒后面的线程
if( c+1 > array.length){
tailWaits.signal();
}
}finally {
//释放锁
tailLock.unlock();
}
/**
* 平级可以防止死锁
* 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
*/
if( c == 0){ //offer_0,offer_1,offer_2, c=0说明是第一个offer_0,队列时空的,准备添加第一个
headLock.lock();
try {
/**
* 这里我们只唤醒第一个poll1线程,其他的
* 交给poll线程自己唤醒
* 如果c为0,说明队列为空,准备要添加第一个元素,就只让offer_0来唤醒
*/
headWaits.signal(); //poll1
}finally {
headLock.unlock();
}
}
}
我们来看改进后的代码,先设置一个c变量来记录每次size改变前的值,在唤醒时,先判断c是否等于0,这里等于0说明队列从空的状态开始,去添加第一个元素,那么也就是第一个offer1线程,让它去唤醒poll线程,其他的poll线程交给poll线程自己去唤醒,再看一下poll的代码:
@Override
public E poll() throws InterruptedException {
E e;
int c;
//先加锁
headLock.lock();
try {
//先循环判断队列是否为空
while (isEmpty()){
//如果队列为空,让当前线程阻塞
headWaits.await();
}
//如果队列不为空,可以取了
e = array[head];
array[head] = null; //help GC
//判断head是否越界
if(++head == array.length){
head = 0;
}
//让size-1
c = size.getAndDecrement();
//在这里让poll1唤醒poll2,poll2接着唤醒poll3...
if( c > 1 ){
//c>1说明队列中还有不止一个元素,可以继续唤醒其他poll线程来去元素
headWaits.signal();
}
}finally {
//释放锁
headLock.unlock();
}
/**
* 平级,防止死锁
* 唤醒offer线程,需要加锁
* 当 c == array.length时,说明队列从满变为不满,这时才去
* 给tailLock加锁
*/
if( c == array.length ){
tailLock.lock();
try {
tailWaits.signal();
}finally {
tailLock.unlock();
}
}
return e;
}
在poll代码中,只需要判断c是否大于1,如果c>1,说明队列中还有元素,可以继续唤醒,那么就让poll1去唤醒poll2,poll2去唤醒poll3.....
(2)poll中只唤醒一次,其他交给offer线程唤醒
再来看poll中,当c==array.length时,说明这时是队列从满变为不满,只有这时才去唤醒,其他情况,比如队列时不满的,也不去唤醒,
然后在offer中,当c+1<array.length时,说明即使c+1也还有空位,可以继续添加,所以由offer自己去唤醒其他offer线程,offer1唤醒offer2,offer2唤醒offer3....
5.完整代码
/**
* 基于数组的双锁实现阻塞队列
*/
public class ArrayDLBlockQueue<E> implements BlockingQueue<E> {
private final E[] array;
private int head;
private int tail;
//在双锁条件下,size是共享的变量,需要保证原子性
private AtomicInteger size = new AtomicInteger();
@SuppressWarnings("all")
public ArrayDLBlockQueue(int capacity){
array = (E[]) new Object[capacity];
}
//offer锁
private final ReentrantLock headLock = new ReentrantLock();
//poll锁
private final ReentrantLock tailLock = new ReentrantLock();
//条件对象
private final Condition headWaits = headLock.newCondition();
private final Condition tailWaits = tailLock.newCondition();
/**
* 向队尾添加元素,如果队列为满,则阻塞当前线程
* @param e 要添加元素
* @throws InterruptedException
*/
@Override
public void offer(E e) throws InterruptedException {
//记录一下每次size加一之前的值
int c;
//先加锁
tailLock.lock();
try{
//判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
while (isFull()){
//如果是满的,就让当前线程阻塞
tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
}
//当队列不满时,可以进行添加
array[tail] = e;
//先判断tail是否越界
if(++tail == array.length){
tail = 0;
}
//让size+1
c = size.getAndIncrement();
//如果c+1>array.length,说明队列还是有空位置,就自己唤醒后面的线程
if( c+1 > array.length){
tailWaits.signal();
}
}finally {
//释放锁
tailLock.unlock();
}
/**
* 平级可以防止死锁
* 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
*/
if( c == 0){ //offer_0,offer_1,offer_2, c=0说明是第一个offer_0,队列时空的,准备添加第一个
headLock.lock();
try {
/**
* 这里我们只唤醒第一个poll1线程,其他的
* 交给poll线程自己唤醒
* 如果c为0,说明队列为空,准备要添加第一个元素,就只让offer_0来唤醒
*/
headWaits.signal(); //poll1
}finally {
headLock.unlock();
}
}
}
/**
* 向队尾添加元素,加入超时时间,如果队满,并且过了超时时间,返回false
* @param e 要添加的元素
* @param timeout 超时时间
* @return 是否添加成功
* @throws InterruptedException
*/
@Override
public boolean offer(E e, long timeout) throws InterruptedException {
int c;
//先加锁
tailLock.lock();
try{
//将传入的时间转换为纳秒
long t = TimeUnit.MILLISECONDS.toNanos(timeout);
//判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
while (isFull()){
/**
* 如果是满的,就让当前线程阻塞
* 我们用加入时间的方法来阻塞
* 注意这里我们每次阻塞完唤醒后,就更新等待时间
* 如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
* 如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
* 如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
**/
if(t <= 0){
return false;
}
t = tailWaits.awaitNanos(t);
}
//当队列不满时,可以进行添加
array[tail] = e;
//先判断tail是否越界
if(++tail == array.length){
tail = 0;
}
//让size+1
c = size.getAndIncrement();
if(c+1 < array.length){
tailWaits.signal();
}
}finally {
//释放锁
tailLock.unlock();
}
/**
* 平级防止死锁
*/
if( c==0){
headLock.lock();
try {
headWaits.signal();
}finally {
headLock.unlock();
}
}
return true;
}
/**
* 移除队头元素,如果队列为空,则阻塞当前线程
* @return 队头元素
* @throws InterruptedException
*/
@Override
public E poll() throws InterruptedException {
E e;
int c;
//先加锁
headLock.lock();
try {
//先循环判断队列是否为空
while (isEmpty()){
//如果队列为空,让当前线程阻塞
headWaits.await();
}
//如果队列不为空,可以取了
e = array[head];
array[head] = null; //help GC
//判断head是否越界
if(++head == array.length){
head = 0;
}
//让size-1
c = size.getAndDecrement();
//在这里让poll1唤醒poll2,poll2接着唤醒poll3...
if( c > 1 ){
//c>1说明队列中还有不止一个元素,可以继续唤醒其他poll线程来去元素
headWaits.signal();
}
}finally {
//释放锁
headLock.unlock();
}
/**
* 平级,防止死锁
* 唤醒offer线程,需要加锁
* 当 c == array.length时,说明队列从满变为不满,这时才去
* 给tailLock加锁
*/
if( c == array.length ){
tailLock.lock();
try {
tailWaits.signal();
}finally {
tailLock.unlock();
}
}
return e;
}
/**
* 移除队头元素,加入超时时间,如果队空,并且超过等待时间,返回null
* @param timeout 超时时间
* @return 队头元素
* @throws InterruptedException
*/
@Override
public E poll(long timeout) throws InterruptedException {
E e;
int c;
//先加锁
headLock.lock();
try {
long t = TimeUnit.MILLISECONDS.toNanos(timeout);
//先循环判断队列是否为空
while (isEmpty()){
//如果队列为空,让当前线程阻塞
if(t <= 0){
return null;
}
t = headWaits.awaitNanos(t);
}
//如果队列不为空,可以取了
e = array[head];
array[head] = null; //help GC
//判断head是否越界
if(++head == array.length){
head = 0;
}
//让size-1
c = size.getAndDecrement();
if( c > 1){
headWaits.signal();
}
}finally {
//释放锁
headLock.unlock();
}
/**
* 平级,防止死锁
* 唤醒offer线程,需要加锁
*/
if( c == array.length ){
tailLock.lock();
try {
tailWaits.signal();
}finally {
tailLock.unlock();
}
}
return e;
}
@Override
public boolean isFull() {
return size.get() == array.length;
}
@Override
public boolean isEmpty() {
return size.get() == 0;
}
@Override
public String toString() {
return Arrays.toString(array);
}
以上就是对阻塞队列的分析了,读者可以在此基础上实现链表的实现等,我们下期再见!