数据结构基础-队列

news2024/11/26 12:38:32

队列

概述

计算机科学中,queue 是以顺序的方式维护的一组数据集合,在一端添加数据,从另一端移除数据。习惯来说,添加的一端称为,移除的一端称为,就如同生活中的排队买商品

In computer science, a queue is a collection of entities that are maintained in a sequence and can be modified by the addition of entities at one end of the sequence and the removal of entities from the other end of the sequence

先定义一个简化的队列接口

public interface Queue<E> {

    /**
     * 向队列尾插入值
     * @param value 待插入值
     * @return 插入成功返回 true, 插入失败返回 false
     */
    boolean offer(E value);

    /**
     * 从对列头获取值, 并移除
     * @return 如果队列非空返回对头值, 否则返回 null
     */
    E poll();

    /**
     * 从对列头获取值, 不移除
     * @return 如果队列非空返回对头值, 否则返回 null
     */
    E peek();

    /**
     * 检查队列是否为空
     * @return 空返回 true, 否则返回 false
     */
    boolean isEmpty();

    /**
     * 检查队列是否已满
     * @return 满返回 true, 否则返回 false
     */
    boolean isFull();
}

链表实现

下面以单向环形带哨兵链表方式来实现队列

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

代码

public class LinkedListQueue<E>
        implements Queue<E>, Iterable<E> {

    private static class Node<E> {
        E value;
        Node<E> next;

        public Node(E value, Node<E> next) {
            this.value = value;
            this.next = next;
        }
    }

    private Node<E> head = new Node<>(null, null);
    private Node<E> tail = head;
    private int size = 0;
    private int capacity = Integer.MAX_VALUE;

    {
        tail.next = head;
    }

    public LinkedListQueue() {
    }

    public LinkedListQueue(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public boolean offer(E value) {
        if (isFull()) {
            return false;
        }
        Node<E> added = new Node<>(value, head);
        tail.next = added;
        tail = added;
        size++;
        return true;
    }

    @Override
    public E poll() {
        if (isEmpty()) {
            return null;
        }
        Node<E> first = head.next;
        head.next = first.next;
        if (first == tail) {
            tail = head;
        }
        size--;
        return first.value;
    }

    @Override
    public E peek() {
        if (isEmpty()) {
            return null;
        }
        return head.next.value;
    }

    @Override
    public boolean isEmpty() {
        return head == tail;
    }

    @Override
    public boolean isFull() {
        return size == capacity;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            Node<E> p = head.next;
            @Override
            public boolean hasNext() {
                return p != head;
            }
            @Override
            public E next() {
                E value = p.value;
                p = p.next;
                return value;
            }
        };
    }
}

环形数组实现

好处

  1. 对比普通数组,起点和终点更为自由,不用考虑数据移动
  2. “环”意味着不会存在【越界】问题
  3. 数组性能更佳
  4. 环形数组比较适合实现有界队列、RingBuffer 等
    在这里插入图片描述

下标计算

例如,数组长度是 5,当前位置是 3 ,向前走 2 步,此时下标为 (3 + 2)%5 = 0
在这里插入图片描述

(cur + step) % length

  • cur 当前指针位置
  • step 前进步数
  • length 数组长度

注意:

  • 如果 step = 1,也就是一次走一步,可以在 >= length 时重置为 0 即可

判断空

在这里插入图片描述

判断满

在这里插入图片描述

满之后的策略可以根据业务需求决定

  • 例如我们要实现的环形队列,满之后就拒绝入队

代码

public class ArrayQueue<E> implements Queue<E>, Iterable<E>{

    private int head = 0;
    private int tail = 0;
    private final E[] array;
    private final int length;

    @SuppressWarnings("all")
    public ArrayQueue(int capacity) {
        length = capacity + 1;
        array = (E[]) new Object[length];
    }

    @Override
    public boolean offer(E value) {
        if (isFull()) {
            return false;
        }
        array[tail] = value;
        tail = (tail + 1) % length;
        return true;
    }

    @Override
    public E poll() {
        if (isEmpty()) {
            return null;
        }
        E value = array[head];
        head = (head + 1) % length;
        return value;
    }

    @Override
    public E peek() {
        if (isEmpty()) {
            return null;
        }
        return array[head];
    }

    @Override
    public boolean isEmpty() {
        return tail == head;
    }

    @Override
    public boolean isFull() {
        return (tail + 1) % length == head;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            int p = head;
            @Override
            public boolean hasNext() {
                return p != tail;
            }

            @Override
            public E next() {
                E value = array[p];
                p = (p + 1) % array.length;
                return value;
            }
        };
    }
}

判断空、满方法2

引入 size

public class ArrayQueue2<E> implements Queue<E>, Iterable<E> {

    private int head = 0;
    private int tail = 0;
    private final E[] array;
    private final int capacity;
    private int size = 0;

    @SuppressWarnings("all")
    public ArrayQueue2(int capacity) {
        this.capacity = capacity;
        array = (E[]) new Object[capacity];
    }

    @Override
    public boolean offer(E value) {
        if (isFull()) {
            return false;
        }
        array[tail] = value;
        tail = (tail + 1) % capacity;
        size++;
        return true;
    }

    @Override
    public E poll() {
        if (isEmpty()) {
            return null;
        }
        E value = array[head];
        head = (head + 1) % capacity;
        size--;
        return value;
    }

    @Override
    public E peek() {
        if (isEmpty()) {
            return null;
        }
        return array[head];
    }

    @Override
    public boolean isEmpty() {
        return size == 0;
    }

    @Override
    public boolean isFull() {
        return size == capacity;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            int p = head;

            @Override
            public boolean hasNext() {
                return p != tail;
            }

            @Override
            public E next() {
                E value = array[p];
                p = (p + 1) % capacity;
                return value;
            }
        };
    }
}

判断空、满方法3

  • head 和 tail 不断递增,用到索引时,再用它们进行计算,两个问题

    • 如何保证 head 和 tail 自增超过正整数最大值的正确性

    • 如何让取模运算性能更高

  • 答案:让 capacity 为 2 的幂

public class ArrayQueue3<E> implements Queue<E>, Iterable<E> {

    private int head = 0;
    private int tail = 0;
    private final E[] array;
    private final int capacity;

    @SuppressWarnings("all")
    public ArrayQueue3(int capacity) {
        if ((capacity & capacity - 1) != 0) {
            throw new IllegalArgumentException("capacity 必须为 2 的幂");
        }
        this.capacity = capacity;
        array = (E[]) new Object[this.capacity];
    }

    @Override
    public boolean offer(E value) {
        if (isFull()) {
            return false;
        }
        array[tail & capacity - 1] = value;
        tail++;
        return true;
    }

    @Override
    public E poll() {
        if (isEmpty()) {
            return null;
        }
        E value = array[head & capacity - 1];
        head++;
        return value;
    }

    @Override
    public E peek() {
        if (isEmpty()) {
            return null;
        }
        return array[head & capacity - 1];
    }

    @Override
    public boolean isEmpty() {
        return tail - head == 0;
    }

    @Override
    public boolean isFull() {
        return tail - head == capacity;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            int p = head;

            @Override
            public boolean hasNext() {
                return p != tail;
            }

            @Override
            public E next() {
                E value = array[p & capacity - 1];
                p++;
                return value;
            }
        };
    }
}

双端队列

概述

双端队列、队列、栈对比

定义特点
队列一端删除(头)另一端添加(尾)First In First Out
一端删除和添加(顶)Last In First Out
双端队列两端都可以删除、添加
优先级队列优先级高者先出队
延时队列根据延时时间确定优先级
并发非阻塞队列队列空或满时不阻塞
并发阻塞队列队列空时删除阻塞、队列满时添加阻塞

注1:

  • Java 中 LinkedList 即为典型双端队列实现,不过它同时实现了 Queue 接口,也提供了栈的 push pop 等方法

注2:

  • 不同语言,操作双端队列的方法命名有所不同,参见下表

    操作JavaJavaScriptC++leetCode 641
    尾部插入offerLastpushpush_backinsertLast
    头部插入offerFirstunshiftpush_frontinsertFront
    尾部移除pollLastpoppop_backdeleteLast
    头部移除pollFirstshiftpop_frontdeleteFront
    尾部获取peekLastat(-1)backgetRear
    头部获取peekFirstat(0)frontgetFront
  • 吐槽一下 leetCode 命名比较 low

  • 常见的单词还有 enqueue 入队、dequeue 出队

接口定义

public interface Deque<E> {

    boolean offerFirst(E e);

    boolean offerLast(E e);

    E pollFirst();

    E pollLast();

    E peekFirst();

    E peekLast();
    
    boolean isEmpty();

    boolean isFull();
}

链表实现

/**
 * 基于环形链表的双端队列
 * @param <E> 元素类型
 */
public class LinkedListDeque<E> implements Deque<E>, Iterable<E> {

    @Override
    public boolean offerFirst(E e) {
        if (isFull()) {
            return false;
        }
        size++;
        Node<E> a = sentinel;
        Node<E> b = sentinel.next;
        Node<E> offered = new Node<>(a, e, b);
        a.next = offered;
        b.prev = offered;
        return true;
    }

    @Override
    public boolean offerLast(E e) {
        if (isFull()) {
            return false;
        }
        size++;
        Node<E> a = sentinel.prev;
        Node<E> b = sentinel;
        Node<E> offered = new Node<>(a, e, b);
        a.next = offered;
        b.prev = offered;
        return true;
    }

    @Override
    public E pollFirst() {
        if (isEmpty()) {
            return null;
        }
        Node<E> a = sentinel;
        Node<E> polled = sentinel.next;
        Node<E> b = polled.next;
        a.next = b;
        b.prev = a;
        size--;
        return polled.value;
    }

    @Override
    public E pollLast() {
        if (isEmpty()) {
            return null;
        }
        Node<E> polled = sentinel.prev;
        Node<E> a = polled.prev;
        Node<E> b = sentinel;
        a.next = b;
        b.prev = a;
        size--;
        return polled.value;
    }

    @Override
    public E peekFirst() {
        if (isEmpty()) {
            return null;
        }
        return sentinel.next.value;
    }

    @Override
    public E peekLast() {
        if (isEmpty()) {
            return null;
        }
        return sentinel.prev.value;
    }

    @Override
    public boolean isEmpty() {
        return size == 0;
    }

    @Override
    public boolean isFull() {
        return size == capacity;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            Node<E> p = sentinel.next;
            @Override
            public boolean hasNext() {
                return p != sentinel;
            }

            @Override
            public E next() {
                E value = p.value;
                p = p.next;
                return value;
            }
        };
    }

    static class Node<E> {
        Node<E> prev;
        E value;
        Node<E> next;

        public Node(Node<E> prev, E value, Node<E> next) {
            this.prev = prev;
            this.value = value;
            this.next = next;
        }
    }

    Node<E> sentinel = new Node<>(null, null, null);
    int capacity;
    int size;

    public LinkedListDeque(int capacity) {
        sentinel.next = sentinel;
        sentinel.prev = sentinel;
        this.capacity = capacity;
    }
}

数组实现

/**
 * 基于循环数组实现, 特点
 * <ul>
 *     <li>tail 停下来的位置不存储, 会浪费一个位置</li>
 * </ul>
 * @param <E>
 */
public class ArrayDeque1<E> implements Deque<E>, Iterable<E> {

    /*
                    h
            t
        0   1   2   3
        b           a
     */
    @Override
    public boolean offerFirst(E e) {
        if (isFull()) {
            return false;
        }
        head = dec(head, array.length);
        array[head] = e;
        return true;
    }

    @Override
    public boolean offerLast(E e) {
        if (isFull()) {
            return false;
        }
        array[tail] = e;
        tail = inc(tail, array.length);
        return true;
    }

    @Override
    public E pollFirst() {
        if (isEmpty()) {
            return null;
        }
        E e = array[head];
        array[head] = null;
        head = inc(head, array.length);
        return e;
    }

    @Override
    public E pollLast() {
        if (isEmpty()) {
            return null;
        }
        tail = dec(tail, array.length);
        E e = array[tail];
        array[tail] = null;
        return e;
    }

    @Override
    public E peekFirst() {
        if (isEmpty()) {
            return null;
        }
        return array[head];
    }

    @Override
    public E peekLast() {
        if (isEmpty()) {
            return null;
        }
        return array[dec(tail, array.length)];
    }

    @Override
    public boolean isEmpty() {
        return head == tail;
    }

    @Override
    public boolean isFull() {
        if (tail > head) {
            return tail - head == array.length - 1;
        } else if (tail < head) {
            return head - tail == 1;
        } else {
            return false;
        }
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            int p = head;
            @Override
            public boolean hasNext() {
                return p != tail;
            }

            @Override
            public E next() {
                E e = array[p];
                p = inc(p, array.length);
                return e;
            }
        };
    }

    E[] array;
    int head;
    int tail;

    @SuppressWarnings("unchecked")
    public ArrayDeque1(int capacity) {
        array = (E[]) new Object[capacity + 1];
    }

    static int inc(int i, int length) {
        if (i + 1 >= length) {
            return 0;
        }
        return i + 1;
    }

    static int dec(int i, int length) {
        if (i - 1 < 0) {
            return length - 1;
        }
        return i - 1;
    }
}

数组实现中,如果存储的是基本类型,那么无需考虑内存释放,例如
在这里插入图片描述

但如果存储的是引用类型,应当设置该位置的引用为 null,以便内存及时释放
在这里插入图片描述

优先级队列

无序数组实现

要点

  1. 入队保持顺序
  2. 出队前找到优先级最高的出队,相当于一次选择排序
public class PriorityQueue1<E extends Priority> implements Queue<E> {

    Priority[] array;
    int size;

    public PriorityQueue1(int capacity) {
        array = new Priority[capacity];
    }

    @Override // O(1)
    public boolean offer(E e) {
        if (isFull()) {
            return false;
        }
        array[size++] = e;
        return true;
    }

    // 返回优先级最高的索引值
    private int selectMax() {
        int max = 0;
        for (int i = 1; i < size; i++) {
            if (array[i].priority() > array[max].priority()) {
                max = i;
            }
        }
        return max;
    }

    @Override // O(n)
    public E poll() {
        if (isEmpty()) {
            return null;
        }
        int max = selectMax();
        E e = (E) array[max];
        remove(max);
        return e;
    }

    private void remove(int index) {
        if (index < size - 1) {
            System.arraycopy(array, index + 1,
                    array, index, size - 1 - index);
        }
        array[--size] = null; // help GC
    }

    @Override
    public E peek() {
        if (isEmpty()) {
            return null;
        }
        int max = selectMax();
        return (E) array[max];
    }

    @Override
    public boolean isEmpty() {
        return size == 0;
    }

    @Override
    public boolean isFull() {
        return size == array.length;
    }
}
  • 视频中忘记了 help GC,注意一下

有序数组实现

要点

  1. 入队后排好序,优先级最高的排列在尾部
  2. 出队只需删除尾部元素即可
public class PriorityQueue2<E extends Priority> implements Queue<E> {

    Priority[] array;
    int size;

    public PriorityQueue2(int capacity) {
        array = new Priority[capacity];
    }

    // O(n)
    @Override
    public boolean offer(E e) {
        if (isFull()) {
            return false;
        }
        insert(e);
        size++;
        return true;
    }

    // 一轮插入排序
    private void insert(E e) {
        int i = size - 1;
        while (i >= 0 && array[i].priority() > e.priority()) {
            array[i + 1] = array[i];
            i--;
        }
        array[i + 1] = e;
    }

    // O(1)
    @Override
    public E poll() {
        if (isEmpty()) {
            return null;
        }
        E e = (E) array[size - 1];
        array[--size] = null; // help GC
        return e;
    }

    @Override
    public E peek() {
        if (isEmpty()) {
            return null;
        }
        return (E) array[size - 1];
    }

    @Override
    public boolean isEmpty() {
        return size == 0;
    }

    @Override
    public boolean isFull() {
        return size == array.length;
    }
}

阻塞队列

之前的队列在很多场景下都不能很好地工作,例如

  1. 大部分场景要求分离向队列放入(生产者)、从队列拿出(消费者)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
  2. 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
  3. 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试

因此我们需要解决的问题有

  1. 用锁保证线程安全
  2. 用条件变量让等待非空线程等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转

有同学对线程安全还没有足够的认识,下面举一个反例,两个线程都要执行入队操作(几乎在同一时刻)

public class TestThreadUnsafe {
    private final String[] array = new String[10];
    private int tail = 0;

    public void offer(String e) {
        array[tail] = e;
        tail++;
    }

    @Override
    public String toString() {
        return Arrays.toString(array);
    }

    public static void main(String[] args) {
        TestThreadUnsafe queue = new TestThreadUnsafe();
        new Thread(()-> queue.offer("e1"), "t1").start();
        new Thread(()-> queue.offer("e2"), "t2").start();
    }
}

执行的时间序列如下,假设初始状态 tail = 0,在执行过程中由于 CPU 在两个线程之间切换,造成了指令交错

线程1线程2说明
array[tail]=e1线程1 向 tail 位置加入 e1 这个元素,但还没来得及执行 tail++
array[tail]=e2线程2 向 tail 位置加入 e2 这个元素,覆盖掉了 e1
tail++tail 自增为1
tail++tail 自增为2
最后状态 tail 为 2,数组为 [e2, null, null …]

糟糕的是,由于指令交错的顺序不同,得到的结果不止以上一种,宏观上造成混乱的效果

单锁实现

Java 中要防止代码段交错执行,需要使用锁,有两种选择

  • synchronized 代码块,属于关键字级别提供锁保护,功能少
  • ReentrantLock 类,功能丰富

以 ReentrantLock 为例

ReentrantLock lock = new ReentrantLock();

public void offer(String e) {
    lock.lockInterruptibly();
    try {
        array[tail] = e;
        tail++;
    } finally {
        lock.unlock();
    }
}

只要两个线程执行上段代码时,锁对象是同一个,就能保证 try 块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一

线程1线程2说明
lock.lockInterruptibly()t1对锁对象上锁
array[tail]=e1
lock.lockInterruptibly()即使 CPU 切换到线程2,但由于t1已经对该对象上锁,因此线程2卡在这儿进不去
tail++切换回线程1 执行后续代码
lock.unlock()线程1 解锁
array[tail]=e2线程2 此时才能获得锁,执行它的代码
tail++
  • 另一种情况是线程2 先获得锁,线程1 被挡在外面
  • 要明白保护的本质,本例中是保护的是 tail 位置读写的安全

事情还没有完,上面的例子是队列还没有放满的情况,考虑下面的代码(这回锁同时保护了 tail 和 size 的读写安全)

ReentrantLock lock = new ReentrantLock();
int size = 0;

public void offer(String e) {
    lock.lockInterruptibly();
    try {
        if(isFull()) {
            // 满了怎么办?
        }
        array[tail] = e;
        tail++;
        
        size++;
    } finally {
        lock.unlock();
    }
}

private boolean isFull() {
    return size == array.length;
}

之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:

  • 在队列满时,不是立刻返回,而是当前线程进入等待
  • 什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行

ReentrantLock 可以配合条件变量来实现,代码进化为

ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition(); // 条件变量
int size = 0;

public void offer(String e) {
    lock.lockInterruptibly();
    try {
        while (isFull()) {
            tailWaits.await();	// 当队列满时, 当前线程进入 tailWaits 等待
        }
        array[tail] = e;
        tail++;
        
        size++;
    } finally {
        lock.unlock();
    }
}

private boolean isFull() {
    return size == array.length;
}
  • 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
  • 将来我们的队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行

思考为何要用 while 而不是 if,设队列容量是 3

操作前offer(4)offer(5)poll()操作后
[1 2 3]队列满,进入tailWaits 等待[1 2 3]
[1 2 3]取走 1,队列不满,唤醒线程[2 3]
[2 3]抢先获得锁,发现不满,放入 5[2 3 5]
[2 3 5]从上次等待处直接向下执行[2 3 5 ?]

关键点:

  • 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
  • 这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待

最后的实现代码

/**
 * 单锁实现
 * @param <E> 元素类型
 */
public class BlockingQueue1<E> implements BlockingQueue<E> {
    private final E[] array;
    private int head = 0;
    private int tail = 0;
    private int size = 0; // 元素个数

    @SuppressWarnings("all")
    public BlockingQueue1(int capacity) {
        array = (E[]) new Object[capacity];
    }

    ReentrantLock lock = new ReentrantLock();
    Condition tailWaits = lock.newCondition();
    Condition headWaits = lock.newCondition();

    @Override
    public void offer(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (isFull()) {
                tailWaits.await();
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }
            size++;
            headWaits.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void offer(E e, long timeout) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);
            while (isFull()) {
                if (t <= 0) {
                    return;
                }
                t = tailWaits.awaitNanos(t);
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }
            size++;
            headWaits.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await();
            }
            E e = array[head];
            array[head] = null; // help GC
            if (++head == array.length) {
                head = 0;
            }
            size--;
            tailWaits.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    private boolean isEmpty() {
        return size == 0;
    }

    private boolean isFull() {
        return size == array.length;
    }
}
  • public void offer(E e, long timeout) throws InterruptedException 是带超时的版本,可以只等待一段时间,而不是永久等下去,类似的 poll 也可以做带超时的版本,这个留给大家了

注意

  • JDK 中 BlockingQueue 接口的方法命名与我的示例有些差异
    • 方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
    • 方法 poll() 是非阻塞的实现,阻塞实现方法为 take()

双锁实现

单锁的缺点在于:

  • 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
  • 冲突的主要是生产者之间:多个 offer 线程修改 tail
  • 冲突的还有消费者之间:多个 poll 线程修改 head

如果希望进一步提高性能,可以用两把锁

  • 一把锁保护 tail
  • 另一把锁保护 head
ReentrantLock headLock = new ReentrantLock();  // 保护 head 的锁
Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合

ReentrantLock tailLock = new ReentrantLock();  // 保护 tail 的锁
Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合

先看看 offer 方法的初步实现

@Override
public void offer(E e) throws InterruptedException {
    tailLock.lockInterruptibly();
    try {
        // 队列满等待
        while (isFull()) {
            tailWaits.await();
        }
        
        // 不满则入队
        array[tail] = e;
        if (++tail == array.length) {
            tail = 0;
        }
        
        // 修改 size (有问题)
        size++;
        
    } finally {
        tailLock.unlock();
    }
}

上面代码的缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果。因此,size 需要用下面的代码保证原子性

AtomicInteger size = new AtomicInteger(0);	   // 保护 size 的原子变量

size.getAndIncrement(); // 自增
size.getAndDecrement(); // 自减

代码修改为

@Override
public void offer(E e) throws InterruptedException {
    tailLock.lockInterruptibly();
    try {
        // 队列满等待
        while (isFull()) {
            tailWaits.await();
        }
        
        // 不满则入队
        array[tail] = e;
        if (++tail == array.length) {
            tail = 0;
        }
        
        // 修改 size
        size.getAndIncrement();
        
    } finally {
        tailLock.unlock();
    }
}

对称地,可以写出 poll 方法

@Override
public E poll() throws InterruptedException {
    E e;
    headLock.lockInterruptibly();
    try {
        // 队列空等待
        while (isEmpty()) {
            headWaits.await();
        }
        
        // 不空则出队
        e = array[head];
        if (++head == array.length) {
            head = 0;
        }
        
        // 修改 size
        size.getAndDecrement();
        
    } finally {
        headLock.unlock();
    }
    return e;
}

下面来看一个难题,就是如何通知 headWaits 和 tailWaits 中等待的线程,比如 poll 方法拿走一个元素,通知 tailWaits:我拿走一个,不满了噢,你们可以放了,因此代码改为

@Override
public E poll() throws InterruptedException {
    E e;
    headLock.lockInterruptibly();
    try {
        // 队列空等待
        while (isEmpty()) {
            headWaits.await();
        }
        
        // 不空则出队
        e = array[head];
        if (++head == array.length) {
            head = 0;
        }
        
        // 修改 size
        size.getAndDecrement();
        
        // 通知 tailWaits 不满(有问题)
        tailWaits.signal();
        
    } finally {
        headLock.unlock();
    }
    return e;
}

问题在于要使用这些条件变量的 await(), signal() 等方法需要先获得与之关联的锁,上面的代码若直接运行会出现以下错误

java.lang.IllegalMonitorStateException

加上锁不就行吗,于是写出了下面的代码
在这里插入图片描述

发现什么问题了?两把锁这么嵌套使用,非常容易出现死锁,如下所示

在这里插入图片描述

因此得避免嵌套,两段加锁的代码变成了下面平级的样子
在这里插入图片描述
性能还可以进一步提升

  1. 代码调整后 offer 并没有同时获取 tailLock 和 headLock 两把锁,因此两次加锁之间会有空隙,这个空隙内可能有其它的 offer 线程添加了更多的元素,那么这些线程都要执行 signal(),通知 poll 线程队列非空吗?

    • 每次调用 signal() 都需要这些 offer 线程先获得 headLock 锁,成本较高,要想法减少 offer 线程获得 headLock 锁的次数
    • 可以加一个条件:当 offer 增加前队列为空,即从 0 变化到不空,才由此 offer 线程来通知 headWaits,其它情况不归它管
  2. 队列从 0 变化到不空,会唤醒一个等待的 poll 线程,这个线程被唤醒后,肯定能拿到 headLock 锁,因此它具备了唤醒 headWaits 上其它 poll 线程的先决条件。如果检查出此时有其它 offer 线程新增了元素(不空,但不是从0变化而来),那么不妨由此 poll 线程来唤醒其它 poll 线程

这个技巧被称之为级联通知(cascading notifies),类似的原因

  1. 在 poll 时队列从满变化到不满,才由此 poll 线程来唤醒一个等待的 offer 线程,目的也是为了减少 poll 线程对 tailLock 上锁次数,剩下等待的 offer 线程由这个 offer 线程间接唤醒

最终的代码为

public class BlockingQueue2<E> implements BlockingQueue<E> {

    private final E[] array;
    private int head = 0;
    private int tail = 0;
    private final AtomicInteger size = new AtomicInteger(0);
    ReentrantLock headLock = new ReentrantLock();
    Condition headWaits = headLock.newCondition();
    ReentrantLock tailLock = new ReentrantLock();
    Condition tailWaits = tailLock.newCondition();

    public BlockingQueue2(int capacity) {
        this.array = (E[]) new Object[capacity];
    }

    @Override
    public void offer(E e) throws InterruptedException {
        int c;
        tailLock.lockInterruptibly();
        try {
            while (isFull()) {
                tailWaits.await();
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }            
            c = size.getAndIncrement();
            // a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
            if (c + 1 < array.length) {
                tailWaits.signal();
            }
        } finally {
            tailLock.unlock();
        }
        // b. 从0->不空, 由此offer线程唤醒等待的poll线程
        if (c == 0) {
            headLock.lock();
            try {
                headWaits.signal();
            } finally {
                headLock.unlock();
            }
        }
    }

    @Override
    public E poll() throws InterruptedException {
        E e;
        int c;
        headLock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await(); 
            }
            e = array[head]; 
            if (++head == array.length) {
                head = 0;
            }
            c = size.getAndDecrement();
            // b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
            if (c > 1) {
                headWaits.signal();
            }
        } finally {
            headLock.unlock();
        }
        // a. 从满->不满, 由此poll线程唤醒等待的offer线程
        if (c == array.length) {
            tailLock.lock();
            try {
                tailWaits.signal();
            } finally {
                tailLock.unlock();
            }
        }
        return e;
    }

    private boolean isEmpty() {
        return size.get() == 0;
    }

    private boolean isFull() {
        return size.get() == array.length;
    }

}

双锁实现的非常精巧,据说作者 Doug Lea 花了一年的时间才完善了此段代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/616055.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能python:Python声音分析的应用

Python 声音分析的应用 Python是一种强大的编程语言&#xff0c;具有广泛的应用和使用场景&#xff0c;而其在声音分析领域中的应用也是相当广泛的。本文将会介绍Python在声音分析方面的应用。 什么是声音分析&#xff1f; 声音分析是指通过计算机技术对声音信号进行分析&am…

centos 7 安装部署MySQL主主模式

主机&#xff1a;192.168.1.108&#xff0c;192.168.1.109 192.168.1.108主机上操作 上传mysql安装包&#xff08;略&#xff09; tar zxf mysql.5.7.35.tar.gz –C /data mkdir /var/log/mariadb 使用root用户创建 chown –R unioncloud. /var/log/mariadb 使用root用户执行 切…

SCM Manager XSS漏洞复现(CVE-2023-33829)

一、漏洞描述 漏洞简述 SCM-Manager 是一款开源的版本库管理软件&#xff0c;同时支持 subversion、mercurial、git 的版本库管理。安装简单&#xff0c;功能较强&#xff0c;提供用户、用户组的权限管理 &#xff0c;有丰富的插件支持。由于在MIT的许可下是开源的&#xff0…

如何给证件照替换背景颜色?一键替换证件照背景色的方法

证件照换背景的优点 在申请各种证件时&#xff0c;一张合格的证件照是必不可少的。然而&#xff0c;在拍摄证件照时&#xff0c;往往因为背景、光线等问题导致照片质量不佳。因此&#xff0c;将证件照的背景更换为统一的纯色背景就显得尤为重要。 证件照换背景的主要优点包括…

合并文件解决HiveServer2内存溢出方案

一、文件过多导致HiveServer2内存溢出 1.1查看表文件个数 desc formatted yanyu.tmp• 表文件数量为6522102 1.2查看表文件信息 hadoop fs -ls warehouse/yanyu.db/tmp• 分区为string 类型的time字段&#xff0c;分了2001个区。 1.3.查看某个分区下的文件个数为10000个 …

Jmter压测试

1、常规性能测试--压测 1、添加线程组 线程数模拟用户数&#xff0c;线程数1表示1个用户&#xff0c;如果模拟10个用户就设置线程数为10 Ramp-Up表示在多长时间内开启多少个线程&#xff0c;如果设置为10&#xff0c;表示10s内开启对应的线程数 循环次数 永远表示如果不惦记…

Hibernate+Lombok进行表与表之间关系时插入数据时栈溢出

报错信息如下&#xff1a; 当使用Hibernate和Lombok处理表与表之间的关系时&#xff0c;在插入数据时可能会遇到栈溢出错误。这篇博客将详细讨论此问题的原因&#xff0c;并提供解决办法。 标题: HibernateLombok进行表与表之间关系时插入数据时栈溢出 问题背景 Hibernate是一…

如何在Windows 10中创建屏幕保护程序设置快捷方式

屏幕保护程序是指你在电脑上未处于活动状态并等待指定时间后,电脑屏幕上显示的动态图片或图案。 屏幕保护程序最初用于保护旧的单色显示器免受损坏,但现在它们主要是通过提供密码保护来个性化你的电脑或增强其安全性的一种方式。 一、右键单击或按住桌面上的空白区域,然后…

linuxOPS基础_linux umask

1、什么是umask umask表示创建文件时的默认权限&#xff08;即创建文件时不需要设置而天生的权限&#xff09; 例如&#xff1a; root用户下&#xff0c;touch a &#xff0c;文件a的默认权限是644 普通用户下&#xff0c;touch b &#xff0c;文件b的默认权限是664 644和…

AOSP+WSL+adb搭建安卓开发ebpf环境

0.写在前面 首先我们要明白&#xff0c;安卓的AOSP包含了海量的代码&#xff0c;他包含了包括了&#xff1a; 1.不同架构下&#xff08;音响&#xff0c;手机&#xff0c;电视等等各种基于安卓的设备&#xff09;的上层应用 2.Java API Framework&#xff08;大部分安卓开发…

叉积求二维空间两直线交点以及过两点的直线数学原理

叉积求二维空间两直线交点以及过两点的直线数学原理_wang.chen.xue的博客-CSDN博客

ThreeJS教程:屏幕坐标转标准设备坐标

推荐&#xff1a;将 NSDT场景编辑器 加入你的3D工具链 3D工具集&#xff1a; NSDT简石数字孪生 屏幕坐标转标准设备坐标 在讲解下节课鼠标点击选中模型之前&#xff0c;先给大家讲解下坐标系的问题。 获取鼠标事件坐标 先来了解一些&#xff0c;普通的web前端相关知识。 鼠…

aop原理

1. 使用 1.1 依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency>1.2 定义切面类 定义一个切面类&#xff0c;指定增强的方法&#xff0c;方法前两个注解必须…

Axure教程—滚动加载(中继器 )

本文将教大家如何用AXURE中的中继器制作滚动加载效果 一、效果介绍 如图&#xff1a; 预览地址&#xff1a;https://awjggr.axshare.com 下载地址&#xff1a;https://download.csdn.net/download/weixin_43516258/87867798?spm1001.2014.3001.5503 二、功能介绍 向下滚动鼠…

联想YOGA Pro14s电脑运行时总是蓝屏怎么办?

联想YOGA Pro14s电脑运行时总是蓝屏怎么办&#xff1f;最近有用户在使用电脑的时候&#xff0c;电脑总是会自动变成蓝屏&#xff0c;导致自己的操作中断。那么遇到这个情况要怎么去进行问题的解决呢&#xff1f;接下来我们来看看以下的详细解决方法分享吧。 准备工作&#xff1…

通过python封装关键词搜索1688商品列表数据API、1688商品列表API接口、1688API接口

1688商品详情接口是一种用于访问阿里巴巴旗下的批发市场平台上的商品列表信息的API接口。通过该接口&#xff0c;可以获取商品的详细信息&#xff0c;包括商品名称、规格、价格、描述、图片等。这些信息对于买家和卖家来说都非常重要&#xff0c;可以帮助他们更好地了解商品&am…

MyBatisPlus3-条件查询和映射问题(字段、表名)

1. 条件查询三种方式 条件查询多用第三种&#xff1b; 链式写表示且的关系&#xff0c;中间加上or()表示或的关系&#xff1b; 给出相应示例代码&#xff1a; Test public void testGetList(){//方式一&#xff1a;按条件查询/*QueryWrapper<User> userQueryWrapper new…

基于WebGL的智慧化工三维可视化管理系统

前言 作为全球化学品第一生产大国&#xff0c;我国危险化学品规模总量大、涉及品种多、应用范围广、管理链条长、安全风险高&#xff0c;历来是防范化解重大安全风险的重点领域。 危险化学品领域频繁发生的典型事故&#xff0c;暴露出传统安全风险管控手段问题突出。 建设背景…

【裸机驱动LED】使用汇编代码驱动LED(一)—— 寄存器解析篇

为了后续使用C语言驱动LED&#xff0c;事先学习汇编代码驱动LED&#xff0c;有如下好处&#xff1a; 熟悉一些基本的汇编语法了解驱动LED的基本流程了解驱动LED需要用到哪些寄存器作为一个初学者&#xff0c;可以锻炼自己阅读开发文档的能力 本文的主要目的是了解驱动LED的基…

【手撕Spring源码】SpringBoot启动过程中发生了什么?

文章目录 SpringBoot启动过程启动详解启动演示启动过程总结 SpringBoot启动过程 启动详解 SpringBoot的启动分为两个部分&#xff1a; 构造SpringApplication执行run方法 接下来我们先来看看构造方法里面都做了什么事情。 第一步&#xff1a;记录 BeanDefinition 源 大家知…