文章目录
- Queue
- 队列
- Queue
- Deque
- 阻塞队列
- BlockingQueue
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
- DelayQueue
- BlockingDeque
- LinkedBlockingDeque
- TransferQueue
- LinkedTransferQueue
Queue
Queue(队列)是一种特殊的线性表,它只允许在表的前端(front)进行删除操作,只允许在表的后端(rear)进行插入操作。进行插入操作的端称为队尾,进行删除操作的端称为队头。
每个元素总是从队列的rear端进入队列,然后等待该元素之前的所有元素出队之后,当前元素才能出对,遵循先进先出(FIFO)原则。下面是Queue类的继承关系图:
图中我们可以看到,最上层是Collection接口,Queue满足集合类的所有方法:
- add(E e):增加元素
- remove(Object o):删除元素
- clear():清除集合中所有元素
- size():集合元素的大小
- isEmpty():集合是否没有元素
- contains(Object o):集合是否包含元素o
队列
Queue
Queue:队列的上层接口,提供了插入、删除、获取元素这3种类型的方法,而且对每一种类型都提供了两种方式。
插入方法
- add(E e):插入元素到队尾,插入成功返回true,没有可用空间抛出异常 IllegalStateException
- offer(E e): 插入元素到队尾,插入成功返回true,否则返回false
add和offer作为插入方法的唯一不同就在于队列满了之后的处理方式。add抛出异常,而offer返回false。
删除和获取元素方法(和插入方法类似)
- remove():获取并移除队首的元素,该方法和poll方法的不同之处在于,如果队列为空该方法会抛出异常,而poll不会
- poll():获取并移除队首的元素,如果队列为空,返回null
- element():获取队列首的元素,该方法和peek方法的不同之处在于,如果队列为空该方法会抛出异常,而peek不会
- peek():获取队列首的元素,如果队列为空,返回null
如果队列是空,remove和element方法会抛出异常,而poll和peek返回null。当然,Queue只是单向队列,为了提供更强大的功能,JDK在1.6的时候新增了一个双向队列Deque,用来实现更灵活的队列操作。
Deque
Deque在Queue的基础上,增加了以下几个方法:
- addFirst(E e):在前端插入元素,异常处理和add一样
- addLast(E e):在后端插入元素,和add一样的效果
- offerFirst(E e):在前端插入元素,异常处理和offer一样
- offerLast(E e):在后端插入元素,和offer一样的效果
- removeFirst():移除前端的一个元素,异常处理和remove一样
- removeLast():移除后端的一个元素,和remove一样的效果
- pollFirst():移除前端的一个元素,和poll一样的效果
- pollLast():移除后端的一个元素,异常处理和poll一样
- getFirst():获取前端的一个元素,和element一样的效果
- getLast():获取后端的一个元素,异常处理和element一样
- peekFirst():获取前端的一个元素,和peek一样的效果
- peekLast():获取后端的一个元素,异常处理和peek一样
- removeFirstOccurrence(Object o):从前端开始移除第一个是o的元素
- removeLastOccurrence(Object o):从后端开始移除第一个是o的元素
- push(E e):和addFirst一样的效果
- pop():和removeFirst一样的效果
其实很多方法的效果都是一样的,只不过名字不同。比如Deque为了实现Stack的语义,定义了push和pop两个方法。
阻塞队列
BlockingQueue
BlockingQueue(阻塞队列),在Queue的基础上实现了阻塞等待的功能。它是JDK 1.5中加入的接口,它是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。
先给出BlockingQueue新增的方法:
- put(E e):向队尾插入元素。如果队列满了,阻塞等待,直到被中断为止。
- boolean offer(E e, long timeout, TimeUnit unit):向队尾插入元素。如果队列满了,阻塞等待timeout个时长,如果到了超时时间还没有空间,抛弃该元素。
- take():获取并移除队首的元素。如果队列为空,阻塞等待,直到被中断为止。
- poll(long timeout, TimeUnit unit):获取并移除队首的元素。如果队列为空,阻塞等待timeout个时长,如果到了超时时间还没有元素,则返回null。
- remainingCapacity():返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的元素数量,如果该队列是无界队列,返回Integer.MAX_VALUE。
- drainTo(Collection<? super E> c):移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
- drainTo(Collection<? super E> c, int maxElements):最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
BlockingQueue最重要的也就是关于阻塞等待的几个方法,而这几个方法正好可以用来实现生产-消费的模型。
从图中我们可以知道实现了BlockingQueue的类有以下几个:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。
特性
- 内部使用循环数组进行存储
- 内部使用ReentrantLock来保证线程安全
- 由Condition的await和signal来实现等待唤醒功能
- 支持对生产者线程和消费者线程进行公平的调度。默认情况下是不保证公平性的。公平性通常会降低吞吐量,但是减少了可变性和避免了线程饥饿问题
数据结构 —— 数组
通常,队列的实现方式有数组和链表两种方式。对于数组这种实现方式来说,我们可以通过维护一个队尾指针,使得在入队的时候可以在O(1)的时间内完成。但是对于出队操作,在删除队头元素之后,必须将数组中的所有元素都往前移动一个位置,这个操作的复杂度达到了O(n),效果并不是很好。如下图所示:
数据结构 —— 环型结构
为了解决这个问题,我们可以使用另外一种逻辑结构来处理数组中各个位置之间的关系。假设现在我们有一个数组A[1…n],我们可以把它想象成一个环型结构,即A[n]之后是A[1],相信了解过一致性Hash算法的应该很容易能够理解。如下图所示:
我们可以使用两个指针,分别维护队头和队尾两个位置,使入队和出队操作都可以在O(1)的时间内完成。当然,这个环形结构只是逻辑上的结构,实际的物理结构还是一个普通的数据。
入队方法
ArrayBlockingQueue 提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:
-
boolean add(E e):其调用的是父类,即AbstractQueue的add方法,实际上调用的就是offer方法
-
void put(E e):在count等于items长度时,一直等待,直到被其他线程唤醒。唤醒后调用enqueue方法放入队列
-
boolean offer(E e):offer方法在队列满了的时候返回false,否则调用enqueue方法插入元素,并返回true。
enqueue:方法首先把元素放在items的putIndex位置,接着判断在putIndex+1等于队列的长度时把putIndex设置为0,也就是上面提到的圆环的index操作。最后唤醒等待获取元素的线程。
-
boolean offer(E e, long timeout, TimeUnit unit):只是在offer(E e)的基础上增加了超时时间的概念
出队方法
ArrayBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:
-
E poll():poll方法是非阻塞方法,如果队列没有元素返回null,否则调用dequeue把队首的元素出队列。
dequeue:会根据takeIndex获取到该位置的元素,并把该位置置为null,接着利用圆环原理,在takeIndex到达列表长度时设置为0,最后唤醒等待元素放入队列的线程。
-
E poll(long timeout, TimeUnit unit):该方法是poll()的可配置超时等待方法,和上面的offer一样,使用while循环+Condition的awaitNanos来进行等待,等待时间到后执行dequeue获取元素
-
E take():
获取元素方法
- peek():这里获取元素时上锁是为了避免脏数据的产生
删除元素方法
- remove(Object o):从takeIndex一直遍历到putIndex,直到找到和元素o相同的元素,调用removeAt进行删除。removeAt():
- 当removeIndex == takeIndex时就不需要后面的元素整体往前移了,而只需要把takeIndex的指向下一个元素即可(还记得前面说的ArrayBlockingQueue可以类比为圆环吗)
- 当removeIndex != takeIndex时,通过putIndex将removeIndex后的元素往前移一位
LinkedBlockingQueue
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE
,也就是无界队列,所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。此队列按照先进先出的原则对元素进行排序。
LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。
LinkedBlockingQueue和ArrayBlockingQueue的不同点
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响
- 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
入队方法
LinkedBlockingQueue提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:
- void put(E e):
- 队列已满,阻塞等待
- 队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没有元素,放完以后要唤醒消费线程进行消费
- boolean offer(E e):offer仅仅对put方法改动了一点点,当队列没有可用元素的时候,不同于put方法的阻塞等待,offer方法直接方法false
- boolean offer(E e, long timeout, TimeUnit unit):该方法只是对offer方法进行了阻塞超时处理,使用了Condition的awaitNanos来进行超时等待。为什么要用while循环?因为awaitNanos方法是可中断的,为了防止在等待过程中线程被中断,这里使用while循环进行等待过程中中断的处理,继续等待剩下需等待的时间
出队方法
入队列的方法说完后,我们来说说出队列的方法。LinkedBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:
- E take():
- 队列为空,阻塞等待
- 队列不为空,从队首获取并移除一个元素,如果消费后还有元素在队列中,继续唤醒下一个消费线程进行元素移除。如果放之前队列是满元素的情况,移除完后要唤醒生产线程进行添加元素
- E poll():poll方法去除了take方法中元素为空后阻塞等待
- E poll(long timeout, TimeUnit unit):利用了Condition的awaitNanos方法来进行阻塞等待直至超时
获取元素方法
- peek():加锁获取。枷锁后获取到head节点的next节点,如果为空返回null,如果不为空,返回next节点的item值
删除元素方法
- remove(Object o):因为remove方法使用两个锁(put锁和take锁)全部上锁,所以其它操作都需要等待它完成,而该方法需要从head节点遍历到尾节点,所以时间复杂度为O(n)
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。
SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将DelayQueue运用在以下应用场景:
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
- 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
BlockingDeque
**BlockingDeque(阻塞双端队列)**在Deque的基础上实现了双端阻塞等待的功能。和第2节说的类似,BlockingDeque也提供了双端队列该有的阻塞等待方法:
- putFirst(E e):在队首插入元素,如果队列满了,阻塞等待,直到被中断为止。
- putLast(E e):在队尾插入元素,如果队列满了,阻塞等待,直到被中断为止。
- offerFirst(E e, long timeout, TimeUnit unit):向队首插入元素。如果队列满了,阻塞等待timeout个时长,如果到了超时时间还没有空间,抛弃该元素。
- offerLast(E e, long timeout, TimeUnit unit):向队尾插入元素。如果队列满了,阻塞等待timeout个时长,如果到了超时时间还没有空间,抛弃该元素。
- takeFirst():获取并移除队首的元素。如果队列为空,阻塞等待,直到被中断为止。
- takeLast():获取并移除队尾的元素。如果队列为空,阻塞等待,直到被中断为止。
- pollFirst(long timeout, TimeUnit unit):获取并移除队首的元素。如果队列为空,阻塞等待timeout个时长,如果到了超时时间还没有元素,则返回null。
- pollLast(long timeout, TimeUnit unit):获取并移除队尾的元素。如果队列为空,阻塞等待timeout个时长,如果到了超时时间还没有元素,则返回null。
- removeFirstOccurrence(Object o):从队首开始移除第一个和o相等的元素。
- removeLastOccurrence(Object o):从队尾开始移除第一个和o相等的元素。
LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。LinkedBlockingDeque
是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE
。
相比于其它阻塞队列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first结尾的方法,表示插入、获取获移除双端队列的第一个元素。以last结尾的方法,表示插入、获取获移除双端队列的最后一个元素。
LinkedBlockingDeque和LinkedBlockingQueue的相同点
- 基于链表
- 容量可选,不设置的话,就是Int的最大值
LinkedBlockingDeque和LinkedBlockingQueue的不同点
- 双端链表和单链表
- 不存在头节点
- 一把锁+两个条件
入队方法
LinkedBlockingDeque提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:
- add(E e)、addFirst(E e)、addLast(E e)
- offer(E e)、offerFirst(E e)、offerLast(E e)
- offer(E e, long timeout, TimeUnit unit)、offerFirst(E e, long timeout, TimeUnit unit)、offerLast(E e, long timeout, TimeUnit unit)
- put(E e)、putFirst(E e)、putLast(E e)
出队方法
入队列的方法说完后,我们来说说出队列的方法。LinkedBlockingDeque提供了多种出队操作的实现来满足不同情况下的需求,如下:
- remove()、removeFirst()、removeLast()
- poll()、pollFirst()、pollLast()
- take()、takeFirst()、takeLast()
- poll(long timeout, TimeUnit unit)、pollFirst(long timeout, TimeUnit unit)、pollLast(long timeout, TimeUnit unit)
获取元素方法
获取元素前加锁,防止并发问题导致数据不一致。利用first和last节点直接可以获得元素。
- element()
- peek()
删除元素方法
删除元素是从头/尾向两边进行遍历比较,故时间复杂度为O(n),最后调用unlink把要移除元素的prev和next进行关联,把要移除的元素从链中脱离,等待下次GC回收。
- remove(Object o):
TransferQueue
TransferQueue是JDK 1.7对于并发类库新增加的一个接口,它扩展自BlockingQueue,所以保持着阻塞队列的所有特性。
TransferQueue对比与BlockingQueue更强大的一点是,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的transfer方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程transfer到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立Java内存模型中的happens-before关系的方式)。
该接口提供的标准方法:
- tryTransfer(E e):若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e并立即返回true;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作
- transfer(E e):若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素
- tryTransfer(E e, long timeout, TimeUnit unit):若当前存在一个正在等待获取的消费者线程,会立即传输给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除
- hasWaitingConsumer():判断是否存在消费者线程
- getWaitingConsumerCount():获取所有等待获取元素的消费线程数量
LinkedTransferQueue
LinkedTransferQueue 是单向链表结构的无界阻塞队列。
LinkedTransferQueue(LTQ) 相比 BlockingQueue 更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的 transfer 方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程 transfer 到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立 Java 内存模型中的 happens-before 关系的方式)。**Doug Lea 说从功能角度来讲,LinkedTransferQueue 实际上是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。**而且 LinkedTransferQueue 更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。