Java 并发队列详解

news2024/12/24 2:37:57

一,简介

1,并发队列两种实现

  • 以ConcurrentLinkedQueue为代表的高性能非阻塞队列
  • 以BlockingQueue接口为代表的阻塞队列

2,阻塞队列与非阻塞队列的区别

  • 当阻塞队列是空的时,从队列中获取元素的操作将会被阻塞,试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。
  • 当阻塞队列是满时,往队列里添加元素的操作会被阻塞。试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。

3,类接口关系

二,ConcurrentLinkedQueue

ConcurerntLinkedQueue一个基于链表的非阻塞的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序队列的头部是队列中时间最长的元素。队列的尾部是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue是一个恰当的选择。此队列不允许使用null元素。

1,ConcurrentLinkedQueue数据结构

通过源码分析可知,ConcurrentLinkedQueue的数据结构与LinkedBlockingQueue的数据结构相同,都是使用的链表结构。ConcurrentLinkedQueue的数据结构如下:

 说明: ConcurrentLinkedQueue采用的链表结构,并且包含有一个头节点和一个尾结点。

2,ConcurrentLinkedQueue源码分析

类的继承关系

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {}

说明: ConcurrentLinkedQueue继承了抽象类AbstractQueue,AbstractQueue定义了对队列的基本操作;同时实现了Queue接口,Queue定义了对队列的基本操作,同时,还实现了Serializable接口,表示可以被序列化。

类的内部类

private static class Node<E> {
    // 元素
    volatile E item;
    // next域
    volatile Node<E> next;

    /**
        * Constructs a new node.  Uses relaxed write because item can
        * only be seen after publication via casNext.
        */
    // 构造函数
    Node(E item) {
        // 设置item的值
        UNSAFE.putObject(this, itemOffset, item);
    }
    // 比较并替换item值
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    
    void lazySetNext(Node<E> val) {
        // 设置next域的值,并不会保证修改对其他线程立即可见
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    // 比较并替换next域的值
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    // 反射机制
    private static final sun.misc.Unsafe UNSAFE;
    // item域的偏移量
    private static final long itemOffset;
    // next域的偏移量
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

说明: Node类表示链表结点,用于存放元素,包含item域和next域,item域表示元素,next域表示下一个结点,其利用反射机制和CAS机制来更新item域和next域,保证原子性。

类的属性

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    // 版本序列号        
    private static final long serialVersionUID = 196745693267521676L;
    // 反射机制
    private static final sun.misc.Unsafe UNSAFE;
    // head域的偏移量
    private static final long headOffset;
    // tail域的偏移量
    private static final long tailOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    // 头节点
    private transient volatile Node<E> head;
    // 尾结点
    private transient volatile Node<E> tail;
}

说明: 属性中包含了head域和tail域,表示链表的头节点和尾结点,同时,ConcurrentLinkedQueue也使用了反射机制和CAS机制来更新头节点和尾结点,保证原子性。

类的构造函数

  • ConcurrentLinkedQueue()型构造函数
public ConcurrentLinkedQueue() {
    // 初始化头节点与尾结点
    head = tail = new Node<E>(null);
}

说明: 该构造函数用于创建一个最初为空的 ConcurrentLinkedQueue,头节点与尾结点指向同一个结点,该结点的item域为null,next域也为null。

  • ConcurrentLinkedQueue(Collection<? extends E>)型构造函数
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) { // 遍历c集合
        // 保证元素不为空
        checkNotNull(e);
        // 新生一个结点
        Node<E> newNode = new Node<E>(e);
        if (h == null) // 头节点为null
            // 赋值头节点与尾结点
            h = t = newNode;
        else {
            // 直接头节点的next域
            t.lazySetNext(newNode);
            // 重新赋值头节点
            t = newNode;
        }
    }
    if (h == null) // 头节点为null
        // 新生头节点与尾结点
        h = t = new Node<E>(null);
    // 赋值头节点
    head = h;
    // 赋值尾结点
    tail = t;
}

说明: 该构造函数用于创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。

核心函数分析

offer函数

public boolean offer(E e) {
    // 元素不为null
    checkNotNull(e);
    // 新生一个结点
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) { // 无限循环
        // q为p结点的下一个结点
        Node<E> q = p.next;
        if (q == null) { // q结点为null
            // p is last node
            if (p.casNext(null, newNode)) { // 比较并进行替换p结点的next域
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // p不等于t结点,不一致    // hop two nodes at a time
                    // 比较并替换尾结点
                    casTail(t, newNode);  // Failure is OK.
                // 返回
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q) // p结点等于q结点
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            // 原来的尾结点与现在的尾结点是否相等,若相等,则p赋值为head,否则,赋值为现在的尾结点
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            // 重新赋值p结点
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

说明: offer函数用于将指定元素插入此队列的尾部。下面模拟offer函数的操作,队列状态的变化(假设单线程添加元素,连续添加10、20两个元素)。

  • 若ConcurrentLinkedQueue的初始状态如上图所示,即队列为空。单线程添加元素,此时,添加元素10,则状态如下所示

  • 如上图所示,添加元素10后,tail没有变化,还是指向之前的结点,继续添加元素20,则状态如下所示

  • 如上图所示,添加元素20后,tail指向了最新添加的结点。

poll函数

public E poll() {
    restartFromHead:
    for (;;) { // 无限循环
        for (Node<E> h = head, p = h, q;;) { // 保存头节点
            // item项
            E item = p.item;

            if (item != null && p.casItem(item, null)) { // item不为null并且比较并替换item成功
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // p不等于h    // hop two nodes at a time
                    // 更新头节点
                    updateHead(h, ((q = p.next) != null) ? q : p); 
                // 返回item
                return item;
            }
            else if ((q = p.next) == null) { // q结点为null
                // 更新头节点
                updateHead(h, p);
                return null;
            }
            else if (p == q) // p等于q
                // 继续循环
                continue restartFromHead;
            else
                // p赋值为q
                p = q;
        }
    }
}

说明: 此函数用于获取并移除此队列的头,如果此队列为空,则返回null。下面模拟poll函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,poll两次)。

  •  队列初始状态如上图所示,在poll操作后,队列的状态如下图所示

  • 如上图可知,poll操作后,head改变了,并且head所指向的结点的item变为了null。再进行一次poll操作,队列的状态如下图所示。

  • 如上图可知,poll操作后,head结点没有变化,只是指示的结点的item域变成了null。

remove函数

public boolean remove(Object o) {
    // 元素为null,返回
    if (o == null) return false;
    Node<E> pred = null;
    for (Node<E> p = first(); p != null; p = succ(p)) { // 获取第一个存活的结点
        // 第一个存活结点的item值
        E item = p.item;
        if (item != null &&
            o.equals(item) &&
            p.casItem(item, null)) { // 找到item相等的结点,并且将该结点的item设置为null
            // p的后继结点
            Node<E> next = succ(p);
            if (pred != null && next != null) // pred不为null并且next不为null
                // 比较并替换next域
                pred.casNext(p, next);
            return true;
        }
        // pred赋值为p
        pred = p;
    }
    return false;
}

说明: 此函数用于从队列中移除指定元素的单个实例(如果存在)。其中,会调用到first函数和succ函数,first函数的源码如下

Node<E> first() {
    restartFromHead:
    for (;;) { // 无限循环,确保成功
        for (Node<E> h = head, p = h, q;;) {
            // p结点的item域是否为null
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) { // item不为null或者next域为null
                // 更新头节点
                updateHead(h, p);
                // 返回结点
                return hasItem ? p : null;
            }
            else if (p == q) // p等于q
                // 继续从头节点开始
                continue restartFromHead;
            else
                // p赋值为q
                p = q;
        }
    }
}

说明: first函数用于找到链表中第一个存活的结点。succ函数源码如下

final Node<E> succ(Node<E> p) {
    // p结点的next域
    Node<E> next = p.next;
    // 如果next域为自身,则返回头节点,否则,返回next
    return (p == next) ? head : next;
}

说明: succ用于获取结点的下一个结点。如果结点的next域指向自身,则返回head头节点,否则,返回next结点。下面模拟remove函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,执行remove(10)、remove(20)操作)。

  • 如上图所示,为ConcurrentLinkedQueue的初始状态,remove(10)后的状态如下图所示

  • 如上图所示,当执行remove(10)后,head指向了head结点之前指向的结点的下一个结点,并且head结点的item域置为null。继续执行remove(20),状态如下图所示

  • 如上图所示,执行remove(20)后,head与tail指向同一个结点,item域为null。

size函数

public int size() {
    // 计数
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个存活的结点开始往后遍历
        if (p.item != null) // 结点的item域不为null
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE) // 增加计数,若达到最大值,则跳出循环
                break;
    // 返回大小
    return count;
}

说明: 此函数用于返回ConcurrenLinkedQueue的大小,从第一个存活的结点(first)开始,往后遍历链表,当结点的item域不为null时,增加计数,之后返回大小。

3,ConcurrentLinkedQueue示例

下面通过一个示例来了解ConcurrentLinkedQueue的使用

import java.util.concurrent.ConcurrentLinkedQueue;

class PutThread extends Thread {
    private ConcurrentLinkedQueue<Integer> clq;
    public PutThread(ConcurrentLinkedQueue<Integer> clq) {
        this.clq = clq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("add " + i);
                clq.add(i);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class GetThread extends Thread {
    private ConcurrentLinkedQueue<Integer> clq;
    public GetThread(ConcurrentLinkedQueue<Integer> clq) {
        this.clq = clq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("poll " + clq.poll());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ConcurrentLinkedQueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
        PutThread p1 = new PutThread(clq);
        GetThread g1 = new GetThread(clq);
        
        p1.start();
        g1.start();
        
    }
}

运行结果(某一次):

add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8

说明: GetThread线程不会因为ConcurrentLinkedQueue队列为空而等待,而是直接返回null,所以当实现队列不空时,等待时,则需要用户自己实现等待逻辑。

4,HOPS(延迟更新的策略)的设计

通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

  • tail更新触发时机:当tail指向的节点的下一个节点不为null的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail。

  • head更新触发时机:当head指向的节点的item域为null的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head。

并且在更新操作时,源码中会有注释为:hop two nodes at a time。所以这种延迟更新的策略就被叫做HOPS的原因是这个,从上面更新时的状态图可以看出,head和tail的更新是“跳着的”即中间总是间隔了一个。那么这样设计的意图是什么呢?

如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS进行tail的更新,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率,所以doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。对head的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。

5,ConcurrentLinkedQueue适合的场景

ConcurrentLinkedQueue通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的逻辑好设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其的使用的改良算法和对哨兵的处理。整体的思路都是比较严谨的,这个也是使用了无锁造成的,我们自己使用无锁的条件的话,这个队列是个不错的参考。

三,BlockingQueue

1,BlockingQueue和BlockingDeque

BlockingQueue

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:

一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。

一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。 负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。

BlockingQueue 的方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

抛异常特定值阻塞超时
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
移除remove()poll()take()poll(timeout, timeunit)
检查element()peek()

四组不同的行为方式解释:

  • 抛异常: 如果试图的操作无法立即执行,抛一个异常。
  • 特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。 可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注: 基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

BlockingDeque

java.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。

BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。 deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到 BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用 BlockingDeque。BlockingDeque 图解:

BlockingDeque 的方法

一个 BlockingDeque - 线程在双端队列的两端都可以插入和提取元素。 一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

BlockingDeque 具有 4 组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

抛异常特定值阻塞超时
插入addFirst(o)offerFirst(o)putFirst(o)offerFirst(o, timeout, timeunit)
移除removeFirst(o)pollFirst(o)takeFirst(o)pollFirst(timeout, timeunit)
检查getFirst(o)peekFirst(o)
抛异常特定值阻塞超时
插入addLast(o)offerLast(o)putLast(o)offerLast(o, timeout, timeunit)
移除removeLast(o)pollLast(o)takeLast(o)pollLast(timeout, timeunit)
检查getLast(o)peekLast(o)

四组不同的行为方式解释:

  • 抛异常: 如果试图的操作无法立即执行,抛一个异常。
  • 特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

BlockingDeque 与BlockingQueue关系

BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一样。

以下是 BlockingDeque 对 BlockingQueue 接口的方法的具体内部实现:

BlockingQueueBlockingDeque
add()addLast()
offer() x 2offerLast() x 2
put()putLast()
remove()removeFirst()
poll() x 2pollFirst()
take()takeFirst()
element()getFirst()
peek()peekFirst()

BlockingQueue 的例子

这里是一个 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 实现。 首先,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer 和 一个 Consumer。Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer 则会从中把它们拿出来。

public class BlockingQueueExample {
 
    public static void main(String[] args) throws Exception {
 
        BlockingQueue queue = new ArrayBlockingQueue(1024);
 
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
 
        new Thread(producer).start();
        new Thread(consumer).start();
 
        Thread.sleep(4000);
    }
}

以下是 Producer 类。注意它在每次 put() 调用时是如何休眠一秒钟的。这将导致 Consumer 在等待队列中对象的时候发生阻塞。

public class Producer implements Runnable{
 
    protected BlockingQueue queue = null;
 
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
 
    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

以下是 Consumer 类。它只是把对象从队列中抽取出来,然后将它们打印到 System.out。

public class Consumer implements Runnable{
 
    protected BlockingQueue queue = null;
 
    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }
 
    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2,数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。

ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注: 因为它是基于数组实现的,也就具有数组的特性: 一旦初始化,大小就无法修改)。 ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。 以下是在使用 ArrayBlockingQueue 的时候对其初始化的一个示例:

BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take();

以下是使用了 Java 泛型的一个 BlockingQueue 示例。注意其中是如何对 String 元素放入和提取的:

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take();

3,延迟队列 DelayQueue

DelayQueue 实现了 BlockingQueue 接口。

DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口,该接口定义:

public interface Delayed extends Comparable<Delayed< {
    public long getDelay(TimeUnit timeUnit);
}

DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。

传递给 getDelay 方法的 getDelay 实例是一个枚举类型,它表明了将要延迟的时间段。TimeUnit 枚举将会取以下值:

  • DAYS
  • HOURS
  • INUTES
  • SECONDS
  • MILLISECONDS
  • MICROSECONDS
  • NANOSECONDS

正如你所看到的,Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。这个可能在对 DelayQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。 以下是使用 DelayQueue 的例子:

public class DelayQueueExample {
 
    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue();
        Delayed element1 = new DelayedElement();
        queue.put(element1);
        Delayed element2 = queue.take();
    }
}

DelayedElement 是我所创建的一个 DelayedElement 接口的实现类,它不在 java.util.concurrent 包里。你需要自行创建你自己的 Delayed 接口的实现以使用 DelayQueue 类。

4,链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 类实现了 BlockingQueue 接口。

LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。

LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。 以下是 LinkedBlockingQueue 的初始化和使用示例代码:

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take();

5,具有优先级的阻塞队列 PriorityBlockingQueue

PriorityBlockingQueue 类实现了 BlockingQueue 接口。

PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。 所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。 注意 PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。

同时注意,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。 以下是使用 PriorityBlockingQueue 的示例:

BlockingQueue queue   = new PriorityBlockingQueue();
//String implements java.lang.Comparable
queue.put("Value");
String value = queue.take();

6,同步队列 SynchronousQueue

SynchronousQueue 类实现了 BlockingQueue 接口。

SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。 据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

7,BlockingDeque 的例子

既然 BlockingDeque 是一个接口,那么你想要使用它的话就得使用它的众多的实现类的其中一个。java.util.concurrent 包提供了以下 BlockingDeque 接口的实现类: LinkedBlockingDeque。

以下是如何使用 BlockingDeque 方法的一个简短代码示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
 
String two = deque.takeLast();
String one = deque.takeFirst();

8,链阻塞双端队列 LinkedBlockingDeque

LinkedBlockingDeque 类实现了 BlockingDeque 接口。

deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

LinkedBlockingDeque 是一个双端队列,在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据。

以下是 LinkedBlockingDeque 实例化以及使用的示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
 
String two = deque.takeLast();
String one = deque.takeFirst();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/531858.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【BFS】华子20230506笔试第三题(动态迷宫问题)Java实现

文章目录 题目链接思路BFS板子我的解答 题目链接 塔子哥的codeFun2000&#xff1a;http://101.43.147.120/p/P1251 测试样例1 输入 3 2 1 0 1 2 2 1 2 0 100 100 100 100 000 100 000 000 001输出 1测试样例2 输入 3 2 1 0 2 0 0 1 2 2 000 000 001 010 101 101 110 010 …

在docker容器中启动docker服务并实现构建多平台镜像的能力

在docker容器中启动docker服务并实现构建多平台镜像的能力 背景 在容器中运行docker&#xff0c;是devops中无法避免的场景&#xff0c;通常被应用于提供统一的镜像构建工具&#xff0c;出于安全考虑&#xff0c;不适合将主机的docker进程暴露给公司的内部人员使用&#xff0…

SpringCloud alibaba微服务b2b2c电子商务平台

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务、系统服务、中间件服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot2、My…

飞书开发流程

1、进入飞书并创建一个应用 链接: 创建应用 创建应用成功后需要审核通过&#xff0c;如果你拥有管理权限则可以自己进入管理后台通过审核&#xff0c;否则需要联系管理员通过审核 2、进入开发者后台 链接: 发者后台 3、在该调试平台上测试 以这个订阅审批事件为例 这一步…

DHCP协议简单配置

实验原理 网络中主机需要与外界进行通信时,需要配置自己的IP地址、网关地址、DNS服务器等网络参数信息。手工在每台主机上配置维护成本高,容易出错,而且不利于管理员统一维护。 通过DHCP地址自动配置协议,使终端设备能自动获取地址,实现即插即用且IP地址统一由服务器管理…

springboot+java充电桩充电额维修管理系统

项目介绍 Spring Boot 是 Spring 家族中的一个全新的框架&#xff0c;它用来简化Spring应用程序的创建和开发过程。也可以说 Spring Boot 能简化我们之前采用SSM&#xff08;Spring MVC Spring MyBatis &#xff09;框架进行开发的过程。 系统基于B/S即所谓浏览器/服务器模式…

STM32 学习笔记_9 定时器中断:编码器接口模式

TIM编码器接口 之前我们处理旋转编码器&#xff0c;是转一下中断一次&#xff0c;挺消耗资源的。 我们可以利用TIM的编码器功能&#xff0c;隔一段时间取一下旋转器值使得cnt或–&#xff0c;以此判断旋转位置以及计算速度&#xff0c;相比中断节约资源。相当于外接了一个有方…

Kubernetes那点事儿——暴露服务之Service

Kubernetes那点事儿——暴露服务之Service 前言一、Service二、Service与Pod关系三、Service常用类型ClusterIPNodePortLoadBalancer 四、Service代理模式IptablesIPVS修改代理模式 前言 K8s中&#xff0c;我们将应用跑在Pod里。多数情况下是一组Pod&#xff0c;用户如何访问这…

凌恩生物美文分享 | 提升科研有一套 | 宏基因组磷循环分析又出新!

磷是包括微生物在内的所有生命体中不可缺少的元素。在生物大分子核酸、高能量化合物ATP、以及生物体内糖代谢的某些中间体中&#xff0c;都有磷的存在。在自然界中&#xff0c;磷的循环包括可溶性无机磷的同化、有机磷的矿化、不溶性磷的溶解等。微生物分解含磷化合物的作用&am…

操作系统面试相关知识

目录 一、简介1、什么是操作系统2、操作系统主要有哪些功能&#xff1f; 二、操作系统结构1、什么是内核&#xff1f;2、什么是用户态和内核态&#xff1f;3、 用户态和内核态是如何切换的&#xff1f; 三、 进程和线程1、并行和并发有什么区别&#xff1f;2、什么是进程上下文…

无线传感器网络的时钟同步估计问题(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 随着无线传感器网络的快速发展,其应用领域也越来越广。在诸多的应用环境中都需要大量已同步的传感器节点通过协同作用执行一个…

Python 中IndexError: list assignment index out of range 错误解决

文章目录 Python IndexError&#xff1a;列表分配索引超出范围修复 Python 中的 IndexError: list assignment index out of range修复 IndexError: list assignment index out of range 使用 append() 函数修复 IndexError: list assignment index out of range 使用 insert()…

怎么把文本翻译成英文?安利三个文本翻译方法

在当今全球化的时代&#xff0c;跨国交流和合作已经成为常态。然而&#xff0c;不同语言之间的沟通障碍经常阻碍着信息传递和理解。为了帮助我们更好地进行国际交流&#xff0c;文本翻译英文软件应运而生。这类软件能够将各种语言的文本迅速准确地翻译成英文&#xff0c;使我们…

【起飞】让你电脑速度快到飞起的一些牛逼的设置整理【电脑卡顿反应慢等问题解决】

对于开发来说电脑的反应速度简直影响了思维的速度&#xff0c;要让电脑速度跟上我们的思维&#xff0c;提高工作效率&#xff0c;早点打卡下班回家陪老婆孩子哈哈 这篇文章主要对windows系统做的一些优化&#xff0c;是真的好用&#xff0c;仿佛在访问静态页面一样&#xff0c;…

超实用!年薪40W的项目经理都在用的6个项目管理软件

项目管理软件是帮助团队进行项目计划、任务分配、进度跟踪和团队协作等方面的工具&#xff0c;已经成为了项目经理必不可少的工具之一。 市面上的项目管理软件有很多&#xff0c;这就来分享一下几款我认为好用的项目管理软件&#xff01; 一、六款好用的项目管理软件 1.简道…

C++开发工具 VTK技术实现三维重建CT医学影像PACS系统

一、信息管理 1、支持对患者、检查项目、申请医生、申请单据、设备等信息进行管理&#xff1b; 2、支持检查病人排队管理功能&#xff1b; 3、支持大屏幕队列显示和语音呼叫&#xff1b; 4、提供预约调整、插队管理和掉队处理等功能&#xff1b; 5、支持急诊申请优先安排。…

美股股指期货重要吗?要注意哪些风险?

美股股指期货是一种以美股股票价格指数作为标的物的金融期货合约。美股股指期货的表现对全球股指市场都有重要的影响力&#xff0c;具体体现在以下方面。 美股股指期货成为全球股指市场风向标 自20世纪70年代起&#xff0c;美国芝加哥商品交易所&#xff08;CME&#xff09;推…

Metasploit基础和实操-渗透测试察打一体(7)

Metasploit 基础包含专业术语,Metasploit 接口介绍,Metasploit 功能介绍 这些是弄渗透测试必须要了解的基础知识,也是0开始玩渗透测试和渗透开发的准备之一。 metasploit命令演示的环境搭建可以参考:https://luozhonghua.blog.csdn.net/article/details/123549917 作者用…

2023.05.14-微调ResNet参加kaggle上猫狗大战比赛打到99%的分类准确率_convert

文章目录 1. 前言2. 下载数据集3. 比赛成绩排名 4. baseline 5. 尝试5.1. 数据归一化&#xff08;98.994%&#xff09;5.2. 使用AdamW优化器&#xff08;98.63%&#xff09;5.3. 使用AdamW优化器SegNet模块&#xff08;95.05%&#xff09; 6. 结语7. 感慨 8. 代码8.1. ResNetNo…

贝尔曼福特算法——负权值单源最短路径

title: 贝尔曼福特算法——负权值单源最短路径 date: 2023-05-16 11:42:26 tags: 数据结构与算法 贝尔曼福特算法——负权值单源最短路径 **问题&#xff1a;**具有负权值非环图的单源最短路径算法 git地址&#xff1a;https://github.com/944613709/HIT-Data-Structures-and-A…