BlockingQueue
BlockingQueue
是Java并发包(java.util.concurrent)中提供的一个阻塞队列接口,它继承自 Queue
接口。
BlockingQueue
中的元素采用 FIFO 的原则,支持多线程环境并发访问,提供了阻塞读取和写入的操作,当前线程在队列满或空的情况下会被阻塞,直到被唤醒或超时。
常用的实现类有:
ArrayBlockingQueue
LinkedBlockingQueue
:并发容器 LinkedBlockingQueue 详解PriorityBlockingQueue
:SynchronousQueue
:LinkedBlockingDeque
:并发容器 LinkedBlockingDeque 详解
等类,它们的实现方式各有不同。
适用场景
BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。
如果该阻塞队列到达了其临界点,生产者线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到消费者线程从队列中拿走一个对象。
消费者线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。
常用方法
-
put(E e)
:将元素 e 插入到队列中,如果队列已满,则会阻塞当前线程,直到队列有空闲空间 -
offer(E e)
:将元素 e 插入到队列中,如果队列已满,则返回 false。 -
offer(E element, long timeout, TimeUnit unit)
方法是BlockingQueue
:在指定的时间内将元素添加到队列中。-
timeout
:超时时间,表示在指定的时间内等待队列空间可用。如果超过指定的时间仍然无法将元素添加到队列中,将返回 false。 -
unit
:超时时间的单位。
-
-
take()
:移除并返回队列头部的元素,如果队列为空,则会阻塞当前线程,直到队列有元素 -
poll()
:移除并返回队列头部的元素,如果队列为空,则返回null
-
poll(long timeout, TimeUnit unit)
:在指定的时间内从队列中检索并移除元素。返回移除的元素。如果超过指定的时间仍然没有可用的元素,将返回 null。 -
peek()
:返回队列头部的元素,但不会移除。如果队列为空,则返回null
-
size()
:返回队列中元素的数量 -
isEmpty()
:判断队列是否为空,为空返回 true,否则返回 false -
isFull()
:判断队列是否已满,已满返回 true,否则返回 false -
clear()
:清空队列中的所有元素
行为 | 抛异常 | 返回特定值 | 阻塞 | 超时 | |
---|---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) | |
移除 | remove() | poll() | take() | poll(timeout, timeunit) | |
检查 | element() | peek() |
-
抛异常: 如果试图的操作无法立即执行,抛一个异常。
-
特定值: 如果试图的操作无法立即执行,返回 true / false / null)。
-
阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
-
超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
若向 BlockingQueue 中插入 null,将会抛出 NullPointerException
死锁问题
需要注意的是,在使用 BlockingQueue
时要注意防止死锁的问题:
-
在队列满之后调用
offer()
方法插入元素会返回false
,此时不能直接调用put()
方法,因为在插入之前还需要获取其它资源,如果在获取资源时一直阻塞在这里,就会发生死锁。 -
为了防止死锁的问题,建议使用
offer(E e, long timeout, TimeUnit unit)
和poll(long timeout, TimeUnit unit)
带有超时时间的方法。
ArrayBlockingQueue
ArrayBlockingQueue
是一个有界阻塞队列,它是基于数组实现的。按 FIFO(先进先出)原则对元素进行排序。
队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。
ArrayBlockingQueue
适用于多生产者多消费者的场景,可以很好地控制生产者和消费者的速度,以平衡系统的负载。
特点
-
有界队列:
ArrayBlockingQueue
有固定的容量限制,队列容量在创建时就确定,无法改变。 -
阻塞队列:当生产者向队列中添加元素的时候,如果队列已满,就会阻塞产生线程,等待队列中有空闲的位置;
当消费者从队列中取出元素的时候,如果队列为空,则会阻塞消费者线程,等待有新的元素加入到队列中。
-
线程安全:
ArrayBlockingQueue
是线程安全的队列,多个线程可以同时操作队列,而不会出现线程冲突和数据不一致的问题。
由于 ArrayBlockingQueue
是有界的,所以使用时需要注意队列的容量。
如果生产者的速度过快,而消费者的速度跟不上,队列会满,生产者将被阻塞,直到有空闲位置;
如果消费者的速度过快,而生产者的速度跟不上,队列会空,消费者将被阻塞,直到有新的元素加入到队列中。
使用场景
-
生产者-消费者模式:在生产者-消费者模式中,生产者生成数据并添加到队列中,而消费者从队列中取出数据并处理。
ArrayBlockingQueue
的阻塞特性能够自动调节生产者和消费者的速度,确保系统的稳定运行。 -
流量控制:在需要限制系统中待处理任务或数据数量的场景中,可以使用
ArrayBlockingQueue
作为缓冲区,通过设置队列的最大容量来控制流量。 -
任务调度与负载均衡:在并发系统中,
ArrayBlockingQueue
可以作为任务队列使用,存储待执行的任务,由线程池中的工作线程进行处理。
构造方法
- 根据指定的容量创建一个
ArrayBlockingQueue
对象。队列的容量是固定的,不能更改。
ArrayBlockingQueue()
-
根据指定的容量和公平性参数创建一个
ArrayBlockingQueue
对象。公平性参数决定是否按照 FIFO 的顺序来处理阻塞的线程。如果传入
true
,则队列会按照线程等待时间的顺序进行处理如果传入
false
,不考虑线程等待时间的先后顺序,而是尽可能地让新到达的线程获取共享资源。
ArrayBlockingQueue(int capacity, boolean fair)
- 根据指定的容量、公平性参数和初始元素集合创建一个
ArrayBlockingQueue
对象。初始元素集合会按照集合的迭代器顺序依次添加到队列中。
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> collection)
使用示例
以下是一个简单的生产者-消费者模式示例,展示了如何使用ArrayBlockingQueue
:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
System.out.println("生产者生产了数据: " + i);
queue.put(i);
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(() -> {
while (true) {
try {
Integer data = queue.take();
System.out.println("消费者消费了数据: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
工作原理:
-
生产者:不断地向队列中添加数据,每添加一个数据就休眠 200 毫秒。如果队列满了(即达到 5 个元素),
put
方法会阻塞,直到队列中有空间。 -
消费者:不断地从队列中取出数据。如果队列为空,
take
方法会阻塞,直到队列中有数据。
注意事项
在使用 ArrayBlockingQueue
时,应注意:
-
选择合适的队列大小:队列的大小应根据具体的应用场景来设置,避免过大或过小导致的性能问题或资源浪费。
-
合理使用阻塞方法:根据具体需求选择合适的阻塞方法(如
put
、take
、offer
、poll
等),以控制生产者和消费者的行为。 -
注意避免死锁:在使用
ArrayBlockingQueue
时,要注意避免在持有其他锁的情况下调用其阻塞方法,以防止死锁的发生。 -
考虑公平性需求:在决定是否使用公平策略时,需要综合考虑系统的性能和公平性要求。