阻塞队列、阻塞队列的实现原理、七种阻塞队列分析及源码解读、使用阻 塞队列来实现生产者-消费者模型

news2024/11/23 22:18:43

文章目录

  • 面试回答参考语术
  • 七种队列分析及源码解读
    • ArrayBlockingQueue
      • 2.1.0 ArrayBlockingQueue分析
      • 2.1.1 ArrayBlockingQueue源码解读:
    • LinkedBlockingQueue
      • 2.2.0 LinkedBlockingQueue分析
      • 2.2.1 LinkedBlockingQueue源码解读
    • 2.3 LinkedBlockingQueue 与 ArrayBlockingQueue 对比
    • 2.4 PriorityBlockingQueue
    • 2.5 DelayQueue
    • 2.6 SynchronousQueue
    • 2.6.0 Coding
      • 源码解读
    • 2.7 分析transferQueue的实现
    • 2.8 LinkedTransferQueue
    • 2.9 LinkedBlockingDeque
  • 阻塞队列使用场景
  • 生产者消费者模式

面试回答参考语术

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。

这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元
素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿
元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

JDK7提供了7个阻塞队列。分别是:

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

Java 5之前实现同步存取时,可以使用普通的一个集合,然后在使用线程的协作和线程同步可以实
现生产者,消费者模式,主要的技术就是用好,wait ,notify,notifyAll,sychronized这些关键字。而
在java 5之后,可以使用阻塞队列来实现,此方式大大简少了代码量,使得多线程编程更加容易,
安全方面也有保障。

BlockingQueue接口是Queue的子接口,它的主要用途并不是作为容器,而是作为线程同步的的工
具,因此他具有一个很明显的特性,当生产者线程试图向BlockingQueue放入元素时,如果队列已
满,则线程被阻塞,当消费者线程试图从中取出一个元素时,如果队列为空,则该线程会被阻塞,
正是因为它所具有这个特性,所以在程序中多个线程交替向BlockingQueue中放入元素,取出元
素,它可以很好的控制线程之间的通信。

阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入
队列,然后解析线程不断从队列取数据解析。

七种队列分析及源码解读

ArrayBlockingQueue

2.1.0 ArrayBlockingQueue分析

ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用先进先出(FIFO)的原则对元素进行排序添加的。

ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。

ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。(ArrayBlockingQueue 内部的阻塞队列是通过 ReentrantLock 和 Condition 条件队列实现的, 所以 ArrayBlockingQueue 中的元素存在公平和非公平访问的区别)

所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素,可以保证先进先出,避免饥饿现象

2.1.1 ArrayBlockingQueue源码解读:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // 通过数组来实现的队列
    final Object[] items;
    //记录队首元素的下标
    int takeIndex;
    //记录队尾元素的下标
    int putIndex;
    //队列中的元素个数
    int count;
    //通过ReentrantLock来实现同步
    final ReentrantLock lock;
    //有2个条件对象,分别表示队列不为空和队列不满的情况
    private final Condition notEmpty;
    private final Condition notFull;
    //迭代器
    transient Itrs itrs;

    //offer方法用于向队列中添加数据
    public boolean offer(E e) {
        // 可以看出添加的数据不支持null值
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //通过重入锁来实现同步
        lock.lock();
        try {
          //如果队列已经满了的话直接就返回false,不会阻塞调用这个offer方法的线程
            if (count == items.length)
                return false;
            else {
               //如果队列没有满,就调用enqueue方法将元素添加到队列中
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    //多了个等待时间的 offer方法
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //获取可中断锁
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                //等待设置的时间
                nanos = notFull.awaitNanos(nanos);
            }
           //如果等待时间过了,队列有空间的话就会调用enqueue方法将元素添加到队列
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    //将数据添加到队列中的具体方法
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
       //通过循环数组实现的队列,当数组满了时下标就变成0了
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
       //激活因为notEmpty条件而阻塞的线程,比如调用take方法的线程
        notEmpty.signal();
    }

    //将数据从队列中取出的方法
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        //将对应的数组下标位置设置为null释放资源
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
       //激活因为notFull条件而阻塞的线程,比如调用put方法的线程
        notFull.signal();
        return x;
    }

    //put方法和offer方法不一样的地方在于,如果队列是满的话,它就会把调用put方法的线程阻塞,直到队列里有空间
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
       //因为后面调用了条件变量的await()方法,而await()方法会在中断标志设置后抛出InterruptedException异常后退出,
      // 所以在加锁时候先看中断标志是不是被设置了,如果设置了直接抛出InterruptedException异常,就不用再去获取锁了
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                //如果队列满的话就阻塞等待,直到notFull的signal方法被调用,也就是队列里有空间了
                notFull.await();
           //队列里有空间了执行添加操作
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    //poll方法用于从队列中取数据,不会阻塞当前线程
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //如果队列为空的话会直接返回null,否则调用dequeue方法取数据
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    //有等待时间的 poll 重载方法
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    //take方法也是用于取队列中的数据,但是和poll方法不同的是它有可能会阻塞当前的线程
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //当队列为空时,就会阻塞当前线程
            while (count == 0)
                notEmpty.await();
            //直到队列中有数据了,调用dequeue方法将数据返回
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    //返回队首元素
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

    //获取队列的元素个数,加了锁,所以结果是准确的
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
    
    // 此外,还有一些其他方法

    //返回队列剩余空间,还能加几个元素
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }
    
    // 判断队列中是否存在当前元素o
		public boolean contains(Object o){}
    
    // 返回一个按正确顺序,包含队列中所有元素的数组
		public Object[] toArray(){}
		
		// 自动清空队列中的所有元素
		public void clear(){}
		
		// 移除队列中所有可用元素,并将他们加入到给定的 Collection 中    
		public int drainTo(Collection<? super E> c){}
		
		// 返回此队列中按正确顺序进行迭代的,包含所有元素的迭代器
		public Iterator<E> iterator()
}

LinkedBlockingQueue

2.2.0 LinkedBlockingQueue分析

LinkedBlockingQueue 是一个用单向链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

如果不是特殊业务,LinkedBlockingQueue 使用时,切记要定义容量 new LinkedBlockingQueue(capacity)

,防止过度膨胀。

2.2.1 LinkedBlockingQueue源码解读

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    // 基于链表实现,肯定要有结点类,典型的单链表结构
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    //容量
    private final int capacity;

    //当前队列元素数量
    private final AtomicInteger count = new AtomicInteger();

    // 头节点,不存数据
    transient Node<E> head;

 		// 尾节点,便于入队
    private transient Node<E> last;

    // take锁,出队锁,只有take,poll方法会持有
    private final ReentrantLock takeLock = new ReentrantLock();

    // 出队等待条件
		// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
    private final Condition notEmpty = takeLock.newCondition();

    // 入队锁,只有put,offer会持有
    private final ReentrantLock putLock = new ReentrantLock();

    // 入队等待条件
	  // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
    private final Condition notFull = putLock.newCondition();

    //同样提供三个构造器
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
       // 初始化head和last指针为空值节点
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    
    public LinkedBlockingQueue() {
        // 如果没传容量,就使用最大int值初始化其容量
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {}
    
    //入队
    public void put(E e) throws InterruptedException {
        // 不允许null元素
        if (e == null) throw new NullPointerException();
        //规定给当前put方法预留一个本地变量
        int c = -1;
        // 新建一个节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 使用put锁加锁
        putLock.lockInterruptibly();
        try {
					// 如果队列满了,就阻塞在notFull条件上
        	// 等待被其它线程唤醒
            while (count.get() == capacity) {
                notFull.await();
            }
            // 队列不满了,就入队
            enqueue(node);
            // 队列长度加1
            c = count.getAndIncrement();
            // 如果现队列长度小于容量
        		// 就再唤醒一个阻塞在notFull条件上的线程
            // 这里为啥要唤醒一下呢?
            // 因为可能有很多线程阻塞在notFull这个条件上的
            // 而取元素时只有取之前队列是满的才会唤醒notFull
            // 为什么队列满的才唤醒notFull呢?
            // 因为唤醒是需要加putLock的,这是为了减少锁的次数
            // 所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程
            // 说白了,这也是锁分离带来的代价
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
			// 释放锁
            putLock.unlock();
        }
        // 如果原队列长度为0,现在加了一个元素后立即唤醒notEmpty条件
        if (c == 0)
            signalNotEmpty();
    }
    
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        // 加take锁
        takeLock.lock();
        try {
            // 唤醒notEmpty条件
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }


    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }


    private void enqueue(Node<E> node) {
        // 直接加到last后面
        last = last.next = node;
    }

    public boolean offer(E e) {
		//用带过期时间的说明
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        //转换为纳秒
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取入队锁,支持等待锁的过程中被中断
        putLock.lockInterruptibly();
        try {
            //队列满了,再看看有没有超时
            while (count.get() == capacity) {
                if (nanos <= 0)
                    //等待时间超时
                    return false;
                //进行等待,awaitNanos(long nanos)是AQS中的方法
                //在等待过程中,如果被唤醒或超时,则继续当前循环
                //如果被中断,则抛出中断异常
                nanos = notFull.awaitNanos(nanos);
            }
            //进入队尾
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            //说明当前元素后面还能再插入一个
            //就唤醒一个入队条件队列中阻塞的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //节点数量为0,说明队列是空的
        if (c == 0)
            //唤醒一个出队条件队列阻塞的线程
            signalNotEmpty();
        return true;
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 如果队列无元素,则阻塞在notEmpty条件上
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 否则,出队
            x = dequeue();
            // 获取出队前队列的长度
            c = count.getAndDecrement();
            // 如果取之前队列长度大于1,则唤醒notEmpty
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 如果取之前队列长度等于容量
    	 // 则唤醒notFull
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                //队列为空且已经超时,直接返回空
                if (nanos <= 0)
                    return null;
                //等待过程中可能被唤醒,超时,中断
                nanos = notEmpty.awaitNanos(nanos);
            }
            //进行出队操作
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //如果出队前,队列是满的,则唤醒一个被take()阻塞的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll() {
		//
    }

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    public boolean contains(Object o) {
    }
   
    static final class LBQSpliterator<E> implements Spliterator<E> {
      
    }
}

2.3 LinkedBlockingQueue 与 ArrayBlockingQueue 对比

  1. ArrayBlockingQueue 入队出队采用一把锁,导致入队出队相互阻塞,效率低下;
  2. LinkedBlockingQueue 入队出队采用两把锁,入队出队互不干扰,效率较高;
  3. 二者都是有界队列,如果长度相等且出队速度跟不上入队速度,都会导致大量线程阻塞;
  4. LinkedBlockingQueue 如果初始化不传入初始容量,则使用最大 int 值,如果出队速度跟不上入队速度,会导致队列特别长,占用大量内存;

2.4 PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。(虽说是无界队列,但是由于资源耗尽的话,也会OutOfMemoryError,无法添加元素)

默认情况下元素采用自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue 是基于最小二叉堆实现,使用基于 CAS 实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞 take 操作的执行。

2.5 DelayQueue

DelayQueue 是一个使用优先级队列实现的延迟无界阻塞队列。

队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:

  1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从DelayQueue 中获取元素时,表示缓存有效期到了。
  2. 定时任务调度。使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,从比如 Timer 就是使用 DelayQueue 实现的。

2.6 SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列,也即是单个元素的队列。

每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景, 比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。

2.6.0 Coding

synchronousQueue 是一个没有数据缓冲的阻塞队列,生产者线程对其的插入操作 put() 必须等待消费者的移除操作 take(),反过来也一样。

对应 peek, contains, clear, isEmpty … 等方法其实是无效的。

但是 poll() 和 offer() 就不会阻塞,举例来说就是 offer 的时候如果有消费者在等待那么就会立马满足返回 true,如果没有就会返回 false,不会等待消费者到来。

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new SynchronousQueue<>();
				
      	//System.out.println(queue.offer("aaa"));   //false
        //System.out.println(queue.poll());         //null

        System.out.println(queue.add("bbb"));      //IllegalStateException: Queue full
      
        new Thread(()->{
            try {
                System.out.println("Thread 1 put a");
                queue.put("a");

                System.out.println("Thread 1 put b");
                queue.put("b");

                System.out.println("Thread 1 put c");
                queue.put("c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2 get:"+queue.take());

                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2 get:"+queue.take());

                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2 get:"+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
Thread 1 put a
Thread 2 get:a
Thread 1 put b
Thread 2 get:b
Thread 1 put c
Thread 2 get:c

源码解读

不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。

synchronousQueue 提供了两个构造器(公平与否),内部是通过 Transferer 来实现的,具体分为两个Transferer,分别是 TransferStack 和 TransferQueue。

TransferStack:非公平竞争模式使用的数据结构是后进先出栈(LIFO Stack)

TransferQueue:公平竞争模式则使用先进先出队列(FIFO Queue)

性能上两者是相当的,一般情况下,FIFO 通常可以支持更大的吞吐量,但 LIFO 可以更大程度的保持线程的本地化。

不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。

synchronousQueue 提供了两个构造器(公平与否),内部是通过 Transferer 来实现的,具体分为两个Transferer,分别是 TransferStack 和 TransferQueue。

TransferStack:非公平竞争模式使用的数据结构是后进先出栈(LIFO Stack)

TransferQueue:公平竞争模式则使用先进先出队列(FIFO Queue)

性能上两者是相当的,一般情况下,FIFO 通常可以支持更大的吞吐量,但 LIFO 可以更大程度的保持线程的本地化。

2.7 分析transferQueue的实现

//构造函数中会初始化一个出队的节点,并且首尾都指向这个节点
TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}
//队列节点,
static final class QNode {
  volatile QNode next;          // next node in queue
  volatile Object item;         // CAS'ed to or from null
  volatile Thread waiter;       // to control park/unpark
  final boolean isData;

  QNode(Object item, boolean isData) {
    this.item = item;
    this.isData = isData;
  }
	// 设置next和item的值,用于进行并发更新, cas 无锁操作
  boolean casNext(QNode cmp, QNode val) {
    return next == cmp &&
      UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  }

  boolean casItem(Object cmp, Object val) {
    return item == cmp &&
      UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  }

  void tryCancel(Object cmp) {
    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
  }

  boolean isCancelled() {
    return item == this;
  }

  boolean isOffList() {
    return next == this;
  }

  // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  private static final long itemOffset;
  private static final long nextOffset;

  static {
    try {
      UNSAFE = sun.misc.Unsafe.getUnsafe();
      Class<?> k = QNode.class;
      itemOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("item"));
      nextOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("next"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }
}

put() 方法和 take() 方法可以看出最终调用的都是 TransferQueue 的 transfer() 方法。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public E take() throws InterruptedException {
  E e = transferer.transfer(null, false, 0);
  if (e != null)
    return e;
  Thread.interrupted();
  throw new InterruptedException();
}

//transfer方法用于提交数据或者是获取数据
E transfer(E e, boolean timed, long nanos) {

  QNode s = null; // constructed/reused as needed
  //如果e不为null,就说明是添加数据的入队操作
  boolean isData = (e != null);

  for (;;) {
    QNode t = tail;
    QNode h = head;
    if (t == null || h == null)         // saw uninitialized value
      continue;                       // spin

    //如果当前操作和 tail 节点的操作是一样的;或者头尾相同(表明队列中啥都没有)。
    if (h == t || t.isData == isData) { // empty or same-mode
      QNode tn = t.next;
      // 如果 t 和 tail 不一样,说明,tail 被其他的线程改了,重来
      if (t != tail)                  // inconsistent read
        continue;
      // 如果 tail 的 next 不是空。就需要将 next 追加到 tail 后面了
      if (tn != null) {               // lagging tail
        // 使用 CAS 将 tail.next 变成 tail,
        advanceTail(t, tn);
        continue;
      }
      // 时间到了,不等待,返回 null,插入失败,获取也是失败的
      if (timed && nanos <= 0)        // can't wait
        return null;
      if (s == null)
        s = new QNode(e, isData);
      if (!t.casNext(null, s))        // failed to link in
        continue;

      advanceTail(t, s);              // swing tail and wait
      Object x = awaitFulfill(s, e, timed, nanos);
      if (x == s) {                   // wait was cancelled
        clean(t, s);
        return null;
      }

      if (!s.isOffList()) {           // not already unlinked
        advanceHead(t, s);          // unlink if head
        if (x != null)              // and forget fields
          s.item = s;
        s.waiter = null;
      }
      return (x != null) ? (E)x : e;

    } else {                            // complementary-mode
      QNode m = h.next;               // node to fulfill
      if (t != tail || m == null || h != head)
        continue;                   // inconsistent read

      Object x = m.item;
      if (isData == (x != null) ||    // m already fulfilled
          x == m ||                   // m cancelled
          !m.casItem(x, e)) {         // lost CAS
        advanceHead(h, m);          // dequeue and retry
        continue;
      }

      advanceHead(h, m);              // successfully fulfilled
      LockSupport.unpark(m.waiter);
      return (x != null) ? (E)x : e;
    }
  }
}

2.8 LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。

LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。

队列实现了 TransferQueue 接口重写了 tryTransfer 和 transfer 方法,这组方法和 SynchronousQueue 公平模式的队列类似,具有匹配的功能

2.9 LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。

所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。

在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀,默认容量也是 Integer.MAX_VALUE。另外双向阻塞队列可以运用在“工作窃取”模式中。

阻塞队列使用场景

我们常用的生产者消费者模式就可以基于阻塞队列实现;

线程池中活跃线程数达到 corePoolSize 时,线程池将会将后续的 task 提交到 BlockingQueue 中;

生产者消费者模式

JDK API文档的 BlockingQueue 给出了一个典型的应用
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/337319.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【浅学Redis】Spring Cache的基础使用

用SpringCache操作Redis缓存数据1. Spring Cache是什么2. Spring Cache 常用注释3. Spring Cache 的使用步骤4. 使用Spring Cache操作Redis1. Spring Cache是什么 Spring Cache是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;只需要简单的加一个注解&#xff0c;…

计算机视觉框架OpenMMLab开源学习(六):语义分割基础

✨写在前面&#xff1a;强烈推荐给大家一个优秀的人工智能学习网站&#xff0c;内容包括人工智能基础、机器学习、深度学习神经网络等&#xff0c;详细介绍各部分概念及实战教程&#xff0c;通俗易懂&#xff0c;非常适合人工智能领域初学者及研究者学习。➡️点击跳转到网站。…

工地安全帽智能识别系统 YOLOv5

工地安全帽智能识别系统通过opencv深度学习技术&#xff0c;实现对现场人员的安全帽反光衣穿戴进行自动实时识别和检测。我们选择当下YOLO最新的卷积神经网络YOLOv5来进行识别检测。6月9日&#xff0c;Ultralytics公司开源了YOLOv5&#xff0c;离上一次YOLOv4发布不到50天。而且…

Allegro172版本线到铜皮不按照设定值避让的原因和解决办法

Allegro172版本线到铜皮不按照设定值避让的原因和解决办法 用Allegro做PCB设计的时候,有时会单独给某块铜皮附上线到铜皮额外再增加一个数值,如下图 在规则的基础上,额外再避让10mil 规则避让line到铜皮10.02mil 额外设置多避让10mil,避让的结果却是30.02mil,正确的是20.…

2023金三银四季跳槽季,啃完这软件测试面试题,跳槽不就稳稳的了

前言 2023年也到来了&#xff0c;接近我们所说的“金三银四”也正在执行了&#xff0c;时间晃眼就过去了&#xff0c;有的人为了2023跳槽早早做足了准备&#xff0c;有的人在临阵磨刀&#xff0c;想必屏幕前的你也想在2023年涨薪吧&#xff0c;那么问题来了&#xff0c;怎么才…

day4——与数组有关的练习

今天是学习java的第四天&#xff0c;主要内容有 给循环起一个标签数组的定义以及数组的初始化 给循环起一个标签 给循环起一个标签&#xff0c;简单的说就是给循环起一个名字&#xff0c;内部的循环可以控制外部的循环&#xff0c;外部的循环可以控制内部的循环&#xff0c;…

第四章:搭建Windows server AD域和树域

由于Windows简单一点&#xff0c;我就先搞Windows了。AD域&#xff1a;视频教程&#xff1a;https://www.bilibili.com/video/BV1f84y1G72x/在创建AD域时要把网卡配置好这是打开网卡界面的命令DNS要改成自己的&#xff0c;因为在创建域的同时也会自动创建DNS打开服务器管理器&a…

LANP架构搭建

安装Apache解压apache安装包&#xff08;httpd-2.4.17.tar.gz&#xff09;到 /usr/src/目录下面tar -zxvf /root/httpd-2.4.17.tar.gz -C /usr/src/安装httpd所需要的依赖包yum -y install zlib* openssl* apr* pcre-devel openssl*进入httpd目录&#xff0c;安装httpd所需要的…

个人ChatGPT账号注册

作者&#xff1a;Bruce.Dgithub&#xff1a;https://github.com/doukoi-BDB文章底部有【技术社群&福利】&#xff0c;不定更新活动、源码&#xff0c;欢迎来撩~~~今日主题&#xff1a;1、ChatGPT 账号注册2、预计阅读 6 分钟&#xff0c;正文2000字。最近啊 一款类似人工智…

服务降级和熔断机制

&#x1f3c6;今日学习目标&#xff1a; &#x1f340;服务降级和熔断机制 ✅创作者&#xff1a;林在闪闪发光 ⏰预计时间&#xff1a;30分钟 &#x1f389;个人主页&#xff1a;林在闪闪发光的个人主页 &#x1f341;林在闪闪发光的个人社区&#xff0c;欢迎你的加入: 林在闪闪…

状态机设计中的关键技术

⭐本专栏针对FPGA进行入门学习&#xff0c;从数电中常见的逻辑代数讲起&#xff0c;结合Verilog HDL语言学习与仿真&#xff0c;主要对组合逻辑电路与时序逻辑电路进行分析与设计&#xff0c;对状态机FSM进行剖析与建模。 &#x1f525;文章和代码已归档至【Github仓库&#xf…

IT行业寒冬,干测试从月薪18k降到了15k,“我”的路在何方

今天已经是2.10了&#xff0c;马上就是金3银4了&#xff0c;2023年才开始&#xff0c;是的&#xff0c;正值春天&#xff0c;想到了一首诗词自古逢秋悲寂寥&#xff0c;我言秋日胜春朝。晴空一鹤排云上&#xff0c;便引诗情到碧霄。秋天&#xff0c;意味着收获&#xff0c;也意…

【C语言】“指针类型”与“野指针”

文章目录一、指针是什么❔二、指针和指针类型1.指针-整数2.指针解引用三.野指针1.引起野指针的原因2.如果避免野指针完结一、指针是什么❔ 指针也就是 内存地址 &#xff0c;在计算机上我们访问数据需要通过内存地址来访问&#xff0c;在C语言中&#xff0c;指针变量是用来存放…

如何编写Python程序调用ChatGPT,只需3步

如何编写Python程序调用ChatGPT&#xff0c;只需3步 在ChatGPT官网进行注册&#xff0c;注册成功后就可以对ChatGPT进行提问&#xff0c;ChatGPT的注册流程参考这篇文章——手把手教你注册ChatGPT&#xff0c;亲测可用。 来看看ChatGPT&#xff0c;如何回答”ChatGPT是什么“…

Deepwalk深度游走算法

主要思想 Deepwalk是一种将随机游走和word2vec两种算法相结合的图结构数据的挖掘算法。该算法可以学习网络的隐藏信息&#xff0c;能够将图中的节点表示为一个包含潜在信息的向量&#xff0c; Deepwalk算法 该算法主要分为随机游走和生成表示向量两个部分&#xff0c;首先…

c++11 标准模板(STL)(std::multimap)(三)

定义于头文件 <map> template< class Key, class T, class Compare std::less<Key>, class Allocator std::allocator<std::pair<const Key, T> > > class multimap;(1)namespace pmr { template <class Key, class T…

SCI论文阅读-使用基于图像的机器学习模型对FTIR光谱进行功能组识别

期刊&#xff1a; Analytical Chemistry中科院最新分区&#xff08;2022年12月最新版&#xff09;&#xff1a;1区(TOP)影响因子&#xff08;2021-2022&#xff09;&#xff1a;8.008第一作者&#xff1a;Abigail A. Enders通讯作者&#xff1a;Heather C. Allen 原文链接&…

自己家用的电脑可以架设游戏吗

自己家用的电脑可以架设游戏吗家用电脑怎么用来做服务器呢&#xff1f;我是艾西&#xff0c;今天我跟大家详细的说家用电脑可以当服务器使用吗&#xff1f;咱们先把家用电脑不足的点列出来就清楚了一、外网端口映射以前的宽带大多数是adsl拨号&#xff0c;再搭配一个TP路由器&a…

SQL语句训练

好文推荐&#xff1a; 21个MySQL表设计的经验准则 后端程序员必备&#xff1a;书写高质量SQL的30条建议 我们为什么要分库分表&#xff1f; 从0.742秒到0.006秒&#xff0c;MySQL百万数据深分页优化实战 2020年MySQL数据库50面试题目含答案 MyBatis 表连接查询写法|三种对…

深度学习实战(11):使用多层感知器分类器对手写数字进行分类

使用多层感知器分类器对手写数字进行分类 1.简介 1.1 什么是多层感知器&#xff08;MLP&#xff09;&#xff1f; MLP 是一种监督机器学习 (ML) 算法&#xff0c;属于前馈人工神经网络 [1] 类。该算法本质上是在数据上进行训练以学习函数。给定一组特征和一个目标变量&#x…