java数据结构--阻塞队列

news2025/1/4 19:34:13

目录

一.概念

二.生产者消费者问题

三.阻塞队列接口BlockingQueue

四.基于数组实现单锁的阻塞队列

1.加锁方式

2.代码实现

3.解释说明

(1).offer添加元素

(2)poll取出元素

4.timeout超时时间

5.测试

五.基于数组实现双锁的阻塞队列

1.问题

2.关于size共享变量

3.死锁问题

 4.级联唤醒

(1)offer中只唤醒一次,其他交给poll线程唤醒

(2)poll中只唤醒一次,其他交给offer线程唤醒

5.完整代码


一.概念

阻塞队列是一种特殊类型的队列,具有额外的阻塞操作。在阻塞队列中,当队列为空时,从队列中获取元素的操作会被阻塞,直到有元素被添加到队列中为止;当队列满时,向队列中添加元素的操作会被阻塞,直到队列有空闲位置为止。

阻塞队列在多线程编程中非常有用,可以有效地进行线程间的协调和通信。它提供了一种线程安全的方式来共享数据,避免了常见的并发问题,如资源争用和死锁。

常见的阻塞队列有以下几种实现方式:

  1. ArrayBlockingQueue:基于数组实现的有界阻塞队列,需要指定队列的容量。
  2. LinkedBlockingQueue:基于链表实现的可选有界或无界阻塞队列。
  3. PriorityBlockingQueue:基于堆实现的无界优先级阻塞队列,元素按照优先级进行排序。
  4. SynchronousQueue:特殊的阻塞队列,每个插入操作必须等待一个对应的删除操作,反之亦然。

阻塞队列提供了一些常用的操作方法,如put()和take()等。put()方法用于向队列中添加元素,并在队列满时阻塞调用线程;take()方法用于获取队列中的元素,并在队列为空时阻塞调用线程。

使用阻塞队列可以简化并发编程的实现,提高代码的可读性和维护性。它能够有效地控制线程的访问顺序,并提供了一种直观的方式来处理线程间的同步问题。

二.生产者消费者问题

 我们先来分析一下,在我们之前学过的操作系统中,有一个生产者和消费者问题,当生产者每次生产完一个产品后,要往缓冲区中放入,但是如果此时缓冲区是满的,那么就要让生产者进入阻塞状态,进行等待,当消费者从缓冲区中取出一个产品消费后,缓冲区有了空位,此时生产者就被唤醒,往缓冲区中放入生产的产品,如果有多个生产者和消费者,此时为了防止出现混乱,就要在放入缓冲区之前加锁,放入后解锁,消费者在从缓冲区取之前也要加锁,取出后再解锁

三.阻塞队列接口BlockingQueue

/**
 * 阻塞队列接口定义
 * @param <E>
 */
public interface BlockingQueue<E> {

    void offer(E e) throws InterruptedException;

    boolean offer(E e,long timeout) throws InterruptedException;

    E poll() throws InterruptedException;

    E poll(long timeout) throws InterruptedException;


   boolean isFull();

   boolean isEmpty();

}

  我们定义了接口,其中,主要就是成对的两个方法,offer和poll,还有在此基础上加入超时时间的,加入超时时间是,当队列满时,超过了等待时间,就不去添加了,直接返回失败false

四.基于数组实现单锁的阻塞队列

1.加锁方式

  这里我们先说明一下,在java中,可以有两种方式加锁

  1.synocized关键字加锁

 2.ReentrantLock类创建可重入锁对象,该对象还可以创建出条件对象,来执行阻塞和唤醒操作

2.代码实现

  我们先来用单锁实现一下阻塞队列

/**
 * 基于数组实现阻塞队列
 * @param <E>
 */
public class ArrayBlockQueue<E> implements BlockingQueue<E>{


    private final E[] array;
    private int head;
    private int tail;
    private int size;

    @SuppressWarnings("all")
    public ArrayBlockQueue(int capacity){
        array = (E[]) new Object[capacity];
    }

    //锁对象
    private final ReentrantLock reentrantLock = new ReentrantLock();
    //条件对象
    private final Condition headWaits = reentrantLock.newCondition();
    private final Condition tailWaits = reentrantLock.newCondition();

    /**
     * 向队尾添加元素,如果队列为满,则阻塞当前线程
     * @param e 要添加元素
     * @throws InterruptedException
     */
    @Override
    public void offer(E e) throws InterruptedException {
       //先加锁
        reentrantLock.lock();
        try{
            //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
            while (isFull()){
                //如果是满的,就让当前线程阻塞
                tailWaits.await();  //因为是向队尾添加元素,所以用tailWaits
            }
            //当队列不满时,可以进行添加
            array[tail] = e;
            //先判断tail是否越界
            if(++tail == array.length){
                tail = 0;
            }
            //让size+1
            size++;
            //这时要通知poll,因为放入了一个元素,所以队列肯定不为空,通知poll线程可以取元素了
            headWaits.signal(); //唤醒poll线程
        }finally {
            //释放锁
            reentrantLock.unlock();
        }
    }

    /**
     * 向队尾添加元素,加入超时时间,如果队满,并且过了超时时间,返回false
     * @param e 要添加的元素
     * @param timeout 超时时间
     * @return 是否添加成功
     * @throws InterruptedException
     */
    @Override
    public boolean offer(E e, long timeout) throws InterruptedException {
        //先加锁
        reentrantLock.lock();

        try{
            //将传入的时间转换为纳秒
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);
            //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
            while (isFull()){
                /**
                 *  如果是满的,就让当前线程阻塞
                 *  我们用加入时间的方法来阻塞
                 *  注意这里我们每次阻塞完唤醒后,就更新等待时间
                 *  如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
                 *  如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
                 *  如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
                 **/
                if(t <= 0){
                    return false;
                }
                t = tailWaits.awaitNanos(t);
            }
            //当队列不满时,可以进行添加
            array[tail] = e;
            //先判断tail是否越界
            if(++tail == array.length){
                tail = 0;
            }
            //让size+1
            size++;
            //这时要通知poll,因为放入了一个元素,所以队列肯定不为空,通知poll线程可以取元素了
            headWaits.signal(); //唤醒poll线程
            /**
             * 到这里就说明添加成功,返回true
             */
            return true;
        }finally {
            //释放锁
            reentrantLock.unlock();
        }
    }

    /**
     * 移除队头元素,如果队列为空,则阻塞当前线程
     * @return 队头元素
     * @throws InterruptedException
     */
    @Override
    public E poll() throws InterruptedException {
        //先加锁
        reentrantLock.lock();

        try {
          //先循环判断队列是否为空
            while (isEmpty()){
                //如果队列为空,让当前线程阻塞
                headWaits.await();
            }
            //如果队列不为空,可以取了
            E e = array[head];
            array[head] = null; //help GC
            //判断head是否越界
            if(++head == array.length){
                head = 0;
            }
            //让size-1
            size--;
            //这时队列因为取出了一个元素,所以肯定不为满,通知offer线程可以添加元素了
            tailWaits.signal(); //唤醒offer线程
            return e;
        }finally {
            //释放锁
            reentrantLock.unlock();
        }
    }

    /**
     * 移除队头元素,加入超时时间,如果队空,并且超过等待时间,返回null
     * @param timeout 超时时间
     * @return 队头元素
     * @throws InterruptedException
     */
    @Override
    public E poll(long timeout) throws InterruptedException {
        //先加锁
        reentrantLock.lock();

        try {
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);
            //先循环判断队列是否为空
            while (isEmpty()){
                //如果队列为空,让当前线程阻塞
                if(t <= 0){
                    return null;
                }
                 t = headWaits.awaitNanos(t);
            }
            //如果队列不为空,可以取了
            E e = array[head];
            array[head] = null; //help GC
            //判断head是否越界
            if(++head == array.length){
                head = 0;
            }
            //让size-1
            size--;
            //这时队列因为取出了一个元素,所以肯定不为满,通知offer线程可以添加元素了
            tailWaits.signal(); //唤醒offer线程
            return e;
        }finally {
            //释放锁
            reentrantLock.unlock();
        }
    }

    @Override
    public boolean isFull() {
        return size == array.length;
    }

    @Override
    public boolean isEmpty() {
        return size == 0;
    }


    @Override
    public String toString() {
        return Arrays.toString(array);
    }
}

3.解释说明

注释中也都写了,我在这里再次解释一下:

(1).offer添加元素

  当我们执行offer方法向队列中添加元素时:

 1. 我们需要先加锁,

 2. 我们判断队列是否为满,

 3.  如果满了,我们需要让当前线程阻塞 ,

  注意:这里使用的是while循环,为什么不用if呢?我们想,如果用if,那么它只会判断一次,如果当某个时刻,队列从满变为不满,这时我们阻塞的offer线程被唤醒,将要去添加元素,但就在此时,另一个offer1线程可能在offer线程添加之前抢先往队列中添加了元素,那么offer线程再去添加就会报错,也就是虚假唤醒(spurious wakeups),使用while循环判断队列是否为满,可以在阻塞线程被唤醒后重新判断队列是否满足条件。如果队列仍然满,线程会继续被阻塞,直到队列有空闲位置。这样可以预防虚假唤醒的问题,确保线程只有在满足条件的情况下才会执行添加元素的操作。

 4.当while条件不成立时,也就是队列有空为,并且此时没有其他线程来争抢,那么就可以往队列中添加元素了 , queue[tail] = e;

5.判断++tail是否达到了数组末尾位置,如果到了,那么重新调整为0,相当于一个圆圈

6.让size++

7.当offer线程向队列中添加元素后,此时队列肯定不为空,我们应该向poll线程发出信号,可以唤醒,相当于操作系统中的信号量机制

(2)poll取出元素

    1.先加锁

    2.判断队列是否为空,同理使用while循环判断

    3.当队列不为空时,取出队头元素

    4.判断++head是否到数组末尾位置,如果到了,重新置为0

    5.让size--;

    6.当poll线程从队列中取出元素后,队列肯定不为满,我们应该向offer线程发出信号,唤醒offer线程

4.timeout超时时间

 我们可以为offer和poll设置超时时间,当超过了等待时间,将直接返回,不在执行

 看一下offer方法设置timeout

    /**
                 *  如果是满的,就让当前线程阻塞
                 *  我们用加入时间的方法来阻塞
                 *  注意这里我们每次阻塞完唤醒后,就更新等待时间
                 *  如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
                 *  如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
                 *  如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
                 **/

poll方法同理... 

5.测试

 下面让我们来测试一下代码

  

public class TestBlockQueue {


    public static void main(String[] args) throws InterruptedException {


        ArrayBlockQueue<String> queue = new ArrayBlockQueue<>(4);

        queue.offer("task1");
        queue.offer("task2");
        queue.offer("task3");
        queue.offer("task4");

        System.out.println(queue);

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "开始添加元素...");
            try {
                boolean flag = queue.offer("task5", 4000);
                if(flag){
                    System.out.println(Thread.currentThread().getName() + "添加元素成功....");
                }else {
                    System.out.println(Thread.currentThread().getName()+"添加元素超超时失败....");
                }

                System.out.println(queue);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

        }, "t1").start();



    }
}

 我们给队列初始大小设为4,然后向队列中添加4个任务,此时队列为满,然后我们开启一个t1线程向队列中添加元素,设置超时时间为4s,判断是否能添加成功

运行:

[task1, task2, task3, task4]
t1开始添加元素...
t1添加元素超超时失败....
[task1, task2, task3, task4]

进程已结束,退出代码0

可以看到,过了4s,添加失败,因为我们并没有取出任何元素,所以offer线程一直阻塞直到超时失败!

下面,我们让主线程先休眠2s,然后取出一个元素,再次观察t1是否能添加成功:

        //让主线程休眠2s,然后poll
        Thread.sleep(2000);
        queue.poll();

在上面的代码中添加以上代码,然后运行:

[task1, task2, task3, task4]
t1开始添加元素...
t1添加元素成功....
[task5, task2, task3, task4]

进程已结束,退出代码0

这次可以看到,task5被成功的添加到了队列中,因为我们在超时时间之前取出了队列的一个元素,队列有了空位,task5就可以添加到队列中了。

五.基于数组实现双锁的阻塞队列

1.问题

 上面我们是用一把锁来给offer线程和poll线程加锁,他们两个操作用的同一把锁,这样其实并不好,效率比较低,而且添加和取出操作应该是两个互不影响的操作,是互相解耦的,所以我们应该使用双锁来给他们分别加锁和释放锁

2.关于size共享变量

  当我们换成双锁后,需要思考一个问题,这个头指针head和尾指针tail,head是poll线程用来取出队列头元素的,tail是offer线程用来向队尾添加元素使用的,所以说head和tail这两个变量是互不影响的,它们分别在各自的线程里使用,但是对于size,在offer线程中,最后要让size++;在poll线程中,最后要让size--,这就是共享的变量了,可能会出现线程安全问题,如果两个线程不是顺序执行的,而是交错执行,就会是size的值发生混乱,所以我们要对size作约束

 在java中,我们可以实现原子类来对变量进行线程安全保护,对于int类型的我们使用AtomicInteger

加一可以使用getAndIncreament()方法,减一可以使用getAndDecreament()方法

    //offer锁
    private final ReentrantLock headLock = new ReentrantLock();
    //poll锁
    private final ReentrantLock tailLock = new ReentrantLock();
    //条件对象
    private final Condition headWaits = headLock.newCondition();
    private final Condition tailWaits = tailLock.newCondition();

3.死锁问题

  如果我们添加了双锁,那么我们需要设置各自的条件去阻塞和唤醒线程,先看一下offer线程,

    @Override
    public void offer(E e) throws InterruptedException {

       

        //先加锁
        tailLock.lock();
        try{
            //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
            while (isFull()){
                //如果是满的,就让当前线程阻塞
                tailWaits.await();  //因为是向队尾添加元素,所以用tailWaits
            }
            //当队列不满时,可以进行添加
            array[tail] = e;
            //先判断tail是否越界
            if(++tail == array.length){
                tail = 0;
            }
            //让size+1
            size.getAndIncrement();

             headLock.lock();
            try {
              
                headWaits.signal(); //poll1
            }finally {
                headLock.unlock();
            }
            
        }finally {
            //释放锁
            tailLock.unlock();
        }

       
        }

    }

注意看,这里我们用headWaits唤醒线程时,它是在tailLock释放锁之前,也就是一个嵌套结构,这样就会导致死锁的发生,

 

如果tailLock先加锁了,然后headLock也去加锁,之后在offer线程中的headLock想去加锁就加不上了,同理poll线程中的tailLock想加锁也加不上去,他们两个线程互相僵持,陷入了死锁状态,为了防止死锁发生,我们只要把嵌套结构改为平级结构就可以了,这样就能保证一定是释放完锁之后再去加锁,一定可以加锁成功!

 

   @Override
    public void offer(E e) throws InterruptedException {

       

        //先加锁
        tailLock.lock();
        try{
            //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
            while (isFull()){
                //如果是满的,就让当前线程阻塞
                tailWaits.await();  //因为是向队尾添加元素,所以用tailWaits
            }
            //当队列不满时,可以进行添加
            array[tail] = e;
            //先判断tail是否越界
            if(++tail == array.length){
                tail = 0;
            }
            //让size+1
            size.getAndIncrement();

          
        }finally {
            //释放锁
            tailLock.unlock();
        }


       /**
         * 平级可以防止死锁
         * 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
         */  
           headLock.lock();
            try {
              
                headWaits.signal(); //poll1
            }finally {
                headLock.unlock();
            }
            

    }

这样的平级结构就可以防止死锁发生了... 

 4.级联唤醒

  以上的唤醒逻辑,是每次都要进行一次唤醒,这样其实效率还不是最好的,那么我们可不可以减少唤醒的次数来提高效率呢?我们可以通过级联唤醒来实现

(1)offer中只唤醒一次,其他交给poll线程唤醒

  先看一下offer线程,在最后要随机唤醒一个poll线程,我们让它只唤醒一个线程,剩下的线程交给poll线程自己去唤醒,比如有poll1,poll2,poll3,poll4四个线程,那么我们的offer线程只唤醒poll1线程,然后让poll1去唤醒poll2,poll2去唤醒poll3,以此类推...,这样就可以让offer只执行一次唤醒,提高了效率

 

  @Override
    public void offer(E e) throws InterruptedException {

        //记录一下每次size加一之前的值
        int c;

        //先加锁
        tailLock.lock();
        try{
            //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
            while (isFull()){
                //如果是满的,就让当前线程阻塞
                tailWaits.await();  //因为是向队尾添加元素,所以用tailWaits
            }
            //当队列不满时,可以进行添加
            array[tail] = e;
            //先判断tail是否越界
            if(++tail == array.length){
                tail = 0;
            }
            //让size+1
            c = size.getAndIncrement();
            //如果c+1>array.length,说明队列还是有空位置,就自己唤醒后面的线程
            if( c+1 > array.length){
                tailWaits.signal();
            }
        }finally {
            //释放锁
            tailLock.unlock();
        }

        /**
         * 平级可以防止死锁
         * 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
         */
        if( c == 0){   //offer_0,offer_1,offer_2, c=0说明是第一个offer_0,队列时空的,准备添加第一个
            headLock.lock();
            try {
                /**
                 * 这里我们只唤醒第一个poll1线程,其他的
                 * 交给poll线程自己唤醒
                 * 如果c为0,说明队列为空,准备要添加第一个元素,就只让offer_0来唤醒
                 */
                headWaits.signal(); //poll1
            }finally {
                headLock.unlock();
            }
        }

    }

   我们来看改进后的代码,先设置一个c变量来记录每次size改变前的值,在唤醒时,先判断c是否等于0,这里等于0说明队列从空的状态开始,去添加第一个元素,那么也就是第一个offer1线程,让它去唤醒poll线程,其他的poll线程交给poll线程自己去唤醒,再看一下poll的代码:

 @Override
    public E poll() throws InterruptedException {
        E e;
        int c;
        //先加锁
        headLock.lock();
        try {
            //先循环判断队列是否为空
            while (isEmpty()){
                //如果队列为空,让当前线程阻塞
                headWaits.await();
            }
            //如果队列不为空,可以取了
             e = array[head];
            array[head] = null; //help GC
            //判断head是否越界
            if(++head == array.length){
                head = 0;
            }
            //让size-1
            c = size.getAndDecrement();
            //在这里让poll1唤醒poll2,poll2接着唤醒poll3...
            if( c > 1 ){
                //c>1说明队列中还有不止一个元素,可以继续唤醒其他poll线程来去元素
                headWaits.signal();
            }

        }finally {
            //释放锁
            headLock.unlock();
        }

        /**
         * 平级,防止死锁
         * 唤醒offer线程,需要加锁
         *  当 c == array.length时,说明队列从满变为不满,这时才去
         *  给tailLock加锁
         */
        if( c == array.length ){
            tailLock.lock();
            try {
                tailWaits.signal();
            }finally {
                tailLock.unlock();
            }
        }

        return e;
    }

在poll代码中,只需要判断c是否大于1,如果c>1,说明队列中还有元素,可以继续唤醒,那么就让poll1去唤醒poll2,poll2去唤醒poll3.....

(2)poll中只唤醒一次,其他交给offer线程唤醒

再来看poll中,当c==array.length时,说明这时是队列从满变为不满,只有这时才去唤醒,其他情况,比如队列时不满的,也不去唤醒,

然后在offer中,当c+1<array.length时,说明即使c+1也还有空位,可以继续添加,所以由offer自己去唤醒其他offer线程,offer1唤醒offer2,offer2唤醒offer3....

5.完整代码

/**
 * 基于数组的双锁实现阻塞队列
 */
public class ArrayDLBlockQueue<E> implements BlockingQueue<E> {



    private final E[] array;
    private int head;
    private int tail;
    //在双锁条件下,size是共享的变量,需要保证原子性
    private AtomicInteger size = new AtomicInteger();

    @SuppressWarnings("all")
    public ArrayDLBlockQueue(int capacity){
        array = (E[]) new Object[capacity];
    }

    //offer锁
    private final ReentrantLock headLock = new ReentrantLock();
    //poll锁
    private final ReentrantLock tailLock = new ReentrantLock();
    //条件对象
    private final Condition headWaits = headLock.newCondition();
    private final Condition tailWaits = tailLock.newCondition();

    /**
     * 向队尾添加元素,如果队列为满,则阻塞当前线程
     * @param e 要添加元素
     * @throws InterruptedException
     */
    @Override
    public void offer(E e) throws InterruptedException {

        //记录一下每次size加一之前的值
        int c;

        //先加锁
        tailLock.lock();
        try{
            //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
            while (isFull()){
                //如果是满的,就让当前线程阻塞
                tailWaits.await();  //因为是向队尾添加元素,所以用tailWaits
            }
            //当队列不满时,可以进行添加
            array[tail] = e;
            //先判断tail是否越界
            if(++tail == array.length){
                tail = 0;
            }
            //让size+1
            c = size.getAndIncrement();
            //如果c+1>array.length,说明队列还是有空位置,就自己唤醒后面的线程
            if( c+1 > array.length){
                tailWaits.signal();
            }
        }finally {
            //释放锁
            tailLock.unlock();
        }

        /**
         * 平级可以防止死锁
         * 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
         */
        if( c == 0){   //offer_0,offer_1,offer_2, c=0说明是第一个offer_0,队列时空的,准备添加第一个
            headLock.lock();
            try {
                /**
                 * 这里我们只唤醒第一个poll1线程,其他的
                 * 交给poll线程自己唤醒
                 * 如果c为0,说明队列为空,准备要添加第一个元素,就只让offer_0来唤醒
                 */
                headWaits.signal(); //poll1
            }finally {
                headLock.unlock();
            }
        }

    }

    /**
     * 向队尾添加元素,加入超时时间,如果队满,并且过了超时时间,返回false
     * @param e 要添加的元素
     * @param timeout 超时时间
     * @return 是否添加成功
     * @throws InterruptedException
     */
    @Override
    public boolean offer(E e, long timeout) throws InterruptedException {
        int c;
        //先加锁
        tailLock.lock();

        try{
            //将传入的时间转换为纳秒
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);
            //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
            while (isFull()){
                /**
                 *  如果是满的,就让当前线程阻塞
                 *  我们用加入时间的方法来阻塞
                 *  注意这里我们每次阻塞完唤醒后,就更新等待时间
                 *  如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
                 *  如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
                 *  如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
                 **/
                if(t <= 0){
                    return false;
                }
                t = tailWaits.awaitNanos(t);
            }
            //当队列不满时,可以进行添加
            array[tail] = e;
            //先判断tail是否越界
            if(++tail == array.length){
                tail = 0;
            }
            //让size+1
            c = size.getAndIncrement();
            if(c+1 < array.length){
                tailWaits.signal();
            }
        }finally {
            //释放锁
            tailLock.unlock();
        }

        /**
         * 平级防止死锁
         */
        if( c==0){
            headLock.lock();
            try {
                headWaits.signal();
            }finally {
                headLock.unlock();
            }
        }


        return true;
    }

    /**
     * 移除队头元素,如果队列为空,则阻塞当前线程
     * @return 队头元素
     * @throws InterruptedException
     */
    @Override
    public E poll() throws InterruptedException {
        E e;
        int c;
        //先加锁
        headLock.lock();
        try {
            //先循环判断队列是否为空
            while (isEmpty()){
                //如果队列为空,让当前线程阻塞
                headWaits.await();
            }
            //如果队列不为空,可以取了
             e = array[head];
            array[head] = null; //help GC
            //判断head是否越界
            if(++head == array.length){
                head = 0;
            }
            //让size-1
            c = size.getAndDecrement();
            //在这里让poll1唤醒poll2,poll2接着唤醒poll3...
            if( c > 1 ){
                //c>1说明队列中还有不止一个元素,可以继续唤醒其他poll线程来去元素
                headWaits.signal();
            }

        }finally {
            //释放锁
            headLock.unlock();
        }

        /**
         * 平级,防止死锁
         * 唤醒offer线程,需要加锁
         *  当 c == array.length时,说明队列从满变为不满,这时才去
         *  给tailLock加锁
         */
        if( c == array.length ){
            tailLock.lock();
            try {
                tailWaits.signal();
            }finally {
                tailLock.unlock();
            }
        }

        return e;
    }

    /**
     * 移除队头元素,加入超时时间,如果队空,并且超过等待时间,返回null
     * @param timeout 超时时间
     * @return 队头元素
     * @throws InterruptedException
     */
    @Override
    public E poll(long timeout) throws InterruptedException {
        E e;
        int c;
        //先加锁
        headLock.lock();

        try {
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);
            //先循环判断队列是否为空
            while (isEmpty()){
                //如果队列为空,让当前线程阻塞
                if(t <= 0){
                    return null;
                }
                t = headWaits.awaitNanos(t);
            }
            //如果队列不为空,可以取了
             e = array[head];
            array[head] = null; //help GC
            //判断head是否越界
            if(++head == array.length){
                head = 0;
            }
            //让size-1
            c = size.getAndDecrement();
            if( c > 1){
                headWaits.signal();
            }
        }finally {
            //释放锁
            headLock.unlock();
        }

        /**
         * 平级,防止死锁
         * 唤醒offer线程,需要加锁
         */
        if( c == array.length ){
            tailLock.lock();
            try {
                tailWaits.signal();
            }finally {
                tailLock.unlock();
            }
        }
       
        return e;
    }

    @Override
    public boolean isFull() {
        return size.get() == array.length;
    }

    @Override
    public boolean isEmpty() {
        return size.get() == 0;
    }


    @Override
    public String toString() {
        return Arrays.toString(array);
    }


以上就是对阻塞队列的分析了,读者可以在此基础上实现链表的实现等,我们下期再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1193824.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据的使用、表关系的创建、Django框架的请求生命周期流程图

目录 一、数据的增删改查 1. 用户列表的展示 2. 修改数据的逻辑分析 3. 删除功能的分析 二、如何创建表关系 三、Django的请求生命周期流程图 一、数据的增删改查 1. 用户列表的展示 把数据表中得用户数据都给查询出来展示在页面上 查询数据 def userlist(request):&qu…

luckysheet的使用——14.开启表格只读模式(所有单元格无法编辑)

开启只读模式后&#xff0c;所有的单元格都无法编辑&#xff0c;与非编辑模式做区分。 1.在src/global/api.js文件中&#xff0c;新增开启只读模式的方法&#xff1a; /*** 开启工作表只读模式(所有单元格无法编辑)*/ export function setWorkBookReadOnly() {Store.allowEdi…

Docker部署ubuntu1804镜像详细步骤

Docker部署ubuntu1804镜像详细步骤 ubuntu镜像库地址&#xff1a;https://hub.docker.com/_/ubuntu/tags?page1&ordering-name 拉取镜像&#xff08;默认为最新版本&#xff09;&#xff1a; docker pull ubuntu或&#xff0c;拉取指定版本镜像&#xff1a; docker pull…

P1529 [USACO2.4] 回家 Bessie Come Home 题解

文章目录 题目描述输入格式输出格式样例样例输入样例输出 提示完整代码 题目描述 现在是晚餐时间&#xff0c;而母牛们在外面分散的牧场中。 Farmer John 按响了电铃&#xff0c;所以她们开始向谷仓走去。 你的工作是要指出哪只母牛会最先到达谷仓&#xff08;在给出的测试数…

【文件IO】认识文件

文章目录 认识文件文件的结构和目录文件路径 认识文件 我们先来认识狭义上的文件(file)&#xff0c;针对硬盘这种持久化存储的I/O设备&#xff0c;当我们想要进行数据保存时&#xff0c;往往不是保存一个整体&#xff0c;而是独立成一个个单位进行保存&#xff0c;这个独立的单…

人工智能技术的高速发展,普通人如何借助AI实现弯道超车?

人工智能技术的高速发展&#xff0c;普通人如何借助AI实现弯道超车&#xff1f; 随着互联网信息传播的爆炸&#xff0c;人类科技文明的快速发展“人工智能”成为新的话题&#xff0c;科技的进步也让普通人觉得自己与社会脱节&#xff0c;找工作越来越难&#xff0c;创业越来越难…

前端开发学习指南

前端是一个看似入门门槛不高&#xff0c;但要学好很难的领域。前端的知识体系庞杂又松散&#xff0c;技术演进快&#xff0c;如果摸不清脉络的话很容易陷入盲人摸象的困境甚至跑偏。 其实只要掌握了正确的方法&#xff0c;学习前端和学好前端就只是个时间问题&#xff0c;希望下…

阻塞队列和定时器的使用

阻塞队列 谈到队列,大家就能想到队列的先进先出原则,但有些特殊的队列,虽然也是先进先出的,但是带有阻塞功能,我们把这种队列叫做阻塞队列. ★如果队列为空,执行出队操作就会阻塞,阻塞到另外一个线程往队列里添加元素(队列不为空)为止. ★如果队列满了,执行入队操作时,也会阻…

网工内推 | 运维工程师,软考认证优先,全额社保

01 北京中科网威信息技术有限公司 招聘岗位&#xff1a;运维工程师 职责描述&#xff1a; 1 熟悉网络安全标准&#xff0c;等级保护管理制度 2 负责等级保护管理制度的的企业管理要求编写&#xff1b; 3 熟系网络组网和相关安全产品&#xff1b; 4 负责用户需求挖掘、分析和…

关于锁策略

常见的锁策略悲观锁乐观锁读写锁轻量级锁、重量级锁自旋锁公平锁和非公平锁可重入锁 vs 不可重入锁synchronized是什么锁呢&#xff1f; 常见的锁策略 锁策略不仅仅限制于Java;其它锁相关的也是会涉及这些策略;这些特性主要是在实现锁的时候运用的。虽然我们的工作可能就是把轮…

中断处理程序的延迟可能导致中断标志位仍然被置位

当中断处理程序的执行时间超过了中断事件的频率时&#xff0c;可能出现中断标志位仍然被置位的情况。让我们来详细解释一下这种情况。 在一个典型的系统中&#xff0c;中断处理程序会在中断事件发生时被触发执行。中断处理程序负责处理中断事件&#xff0c;并可能执行一系列操…

sjvisualizer,一个超强的Python数据可视化动画库

大家好&#xff0c;今天给大家介绍一个非常棒的数据可视化库&#xff0c;sjvisualizer。 根据时间序列数据制作动态图表&#xff0c;包含条形图、饼图、堆叠条形图、折线图、堆叠面积图。 可以先看一下官方的示例~ 只需几行代码&#xff0c;就可以制作电脑浏览器发展史的动态…

使用jmeter进行简单压力测试

前言 最近项目要上线,需要项目进行简单的压力测试,本次使用的是jmeter来进行的,由于本人不是专业测试,只是对本次使用过程进行简单的记录. 一.jemeter的下载与安装 我这个已经安装很久了,具体过程这个可以查询下其他博客(偷个懒). 二.使用过程 1.测试计划右击-添加(add)-线…

Kubernetes 创建pod的yaml文件-简单版-nginx

apiVersion: v1 #api文档版本 kind: Pod # 资源类型 Deployment,StatefulSet之类 metadata: #pod元数据 描述信息 name: nginx-demo labels: type: app #自定义标签 version: 1.0.0 # 自定义pod版本 namespace: default spec: #期望Pod按照这里的描述创建 cont…

Python类的定义和使用:什么是类?实在不知道啥叫累!

文章目录 前言1.基础概念2.定义一个 Person 类3.类定义4.类方法定义5.类的继承6.类的公有,私有7.子类调用父类的方法关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例…

“第六十五天”

固态硬盘&#xff1a;SSD 原理&#xff1a;基于闪存技术Flash Memory &#xff0c;属于电可擦除ROM&#xff0c;即EEPROM&#xff1b; 由闪存翻译层和存储介质组成&#xff1b;闪存翻译层负责翻译逻辑块号&#xff0c;找到对应页&#xff0c;存储介质是由多个闪存芯片构成的&…

Windows中的Directory Junction和Directory symbolic link

这两者有几点显著的区别&#xff1a; 前者谁都可以创建&#xff0c;后者需要提升至管理员权限才可以创建 这两者创建的手时候都不需要Target存在

C++动态库

C动态库 动态库文件&#xff08;Dynamic Link Library&#xff0c;DLL&#xff09;是程序在运行时所需要调用的库。静态库文件是程序在编译时所需要调用的库。 1 环境介绍 VS版本&#xff1a;VS2017 编程语言&#xff1a;C 2 功能介绍 使用VS2017项目模板创建C动态库生成…

(论文阅读24/100)Visual Tracking with Fully Convolutional Networks

文献阅读笔记&#xff08;sel - CNN&#xff09; 简介 题目 Visual Tracking with Fully Convolutional Networks 作者 Lijun Wang, Wanli Ouyang, Xiaogang Wang, and Huchuan Lu 原文链接 http://202.118.75.4/lu/Paper/ICCV2015/iccv15_lijun.pdf 【DeepLearning】…

AMESim 2021安装教程

主要是AMESim的安装 写在前面&#xff0c;由于项目需要&#xff0c;需要自学AMESim&#xff0c;因此需要安装这个软件&#xff0c;目前仅仅安装使用&#xff0c;还不涉及到与MATLAB的联合仿真&#xff0c;老板说用 RT LAB半实物仿真平台&#xff0c;但是简单搜了一下&#xff0…