文章目录
- 阻塞队列的由来
- BlockingQueue的操作方法
- BlockingQueue的实现类
- ArrayBlockingQueue
- LinkedBlockingQueue
- DelayQueue
- PriorityBlockingQueue
- SynchronousQueue
- 阻塞队列原理深入分析
- 1. 构造器和监视器初始化
- 2. put操作的实现
- 3. take操作的实现
- 4. 注意事项
- 小结
- 线程池中的阻塞队列
- BlockingQueue 的使用场景
- 生产者-消费者模型
阻塞队列的由来
假设一种典型场景:有一个生产者不断地生产资源,消费者不断地消费资源,所有的资源被存储在一个共享的缓冲池中。生产者将资源放入缓冲池,消费者从缓冲池取出资源进行消费。这种设计模式被称为生产者-消费者模式。
生产者-消费者模式的优势在于:
- 消除了生产者类与消费者类之间的代码依赖性。
- 将数据生产与数据使用的过程解耦,有助于简化开发过程,并提升系统的负载能力。
但在实现这一模式时,多个线程对共享变量的操作会引发线程安全问题,如重复消费或死锁。尤其是在多个生产者和消费者同时存在的情况下,管理共享资源变得更为复杂。当缓冲池为空时,消费者需要等待;当缓冲池满时,生产者需要等待。为了实现这种等待-唤醒机制,需要手动编写复杂的线程同步代码。
幸运的是,Java 的并发包(java.util.concurrent
)中已经为我们提供了一个用于简化这类开发的工具——阻塞队列(BlockingQueue)。使用阻塞队列,开发人员不需要再担心在多线程环境下操作共享变量时的线程安全问题。
阻塞队列是 Java 并发包下的重要数据结构。与普通队列不同,它提供了线程安全的访问方式,是并发包中很多高级同步类的基础。
通常,阻塞队列用于生产者-消费者模式。在这种模式中,生产者将数据添加到队列中,消费者从队列中取出数据。阻塞队列就是用来存放这些数据的容器。
BlockingQueue的操作方法
阻塞队列提供了四组方法用于插入、移除和检查元素:
操作类型 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | - | - |
- 抛出异常:当操作无法立即执行时,方法会抛出异常。例如,当阻塞队列已满时,调用
add(e)
将抛出IllegalStateException("Queue full")
;当队列为空时,调用remove()
将抛出NoSuchElementException
。 - 返回特殊值:如果操作无法立即执行,方法返回一个特殊值(如
true/false
)。 - 一直阻塞:如果操作无法立即执行,方法会阻塞线程直到操作成功或线程被中断。
- 超时退出:方法会在给定时间内等待操作成功,如果超时则返回一个特殊值(如
true/false
)。
注意:
- 阻塞队列中不允许插入
null
值,否则会抛出NullPointerException
。 - 尽量避免调用
remove(o)
移除特定对象,这种操作效率较低。
BlockingQueue的实现类
ArrayBlockingQueue
ArrayBlockingQueue
是基于数组结构的有界阻塞队列。它的内部结构为一个定长数组,且一旦初始化,队列的大小不能改变。该类构造函数允许指定是否使用公平锁,默认是非公平锁。
public ArrayBlockingQueue(int capacity, boolean fair){
// 初始化队列大小和公平性设置
lock = new ReentrantLock(fair);
// 初始化其他参数
}
LinkedBlockingQueue
LinkedBlockingQueue
是基于链表结构的有界阻塞队列。默认情况下,队列大小为Integer.MAX_VALUE
,可以手动指定队列大小。该队列遵循**先进先出(FIFO)**原则。
DelayQueue
DelayQueue
是一个特殊的阻塞队列,队列中的元素只有在指定的延迟时间到期后,才能从队列中获取到。队列中的元素必须实现java.util.concurrent.Delayed
接口。DelayQueue
是一个无界队列,插入操作永远不会被阻塞,但获取操作会在没有到期元素时被阻塞。
PriorityBlockingQueue
PriorityBlockingQueue
是基于优先级的无界阻塞队列。队列元素的优先级由构造函数中传入的Comparator
对象决定。内部使用非公平锁进行线程同步。
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
this.lock = new ReentrantLock(); // 默认使用非公平锁
// 初始化其他参数
}
SynchronousQueue
SynchronousQueue
是一个没有内部容量的特殊队列。每个插入操作必须等待一个相应的移除操作,反之亦然。
iterator()
永远返回空。peek()
永远返回null
。put()
阻塞,直到有消费者取走元素。offer()
在插入成功后立即返回true
,否则返回false
。take()
阻塞,直到有元素可取。poll()
在取不到元素时立即返回null
。isEmpty()
永远返回true
。
阻塞队列原理深入分析
阻塞队列的原理主要基于锁和**条件变量(Condition)**来控制线程的等待与唤醒机制,确保多线程环境下的线程安全和资源同步。
通过分析ArrayBlockingQueue
的源码,我们可以深入理解阻塞队列的工作原理。
1. 构造器和监视器初始化
public ArrayBlockingQueue(int capacity, boolean fair) {
// 初始化队列大小和是否公平锁
lock = new ReentrantLock(fair);
// 初始化消费者和生产者监视器
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
- 队列数组(items): 用于存储队列元素。
- 锁(lock): 控制多线程对队列的并发访问,支持公平锁和非公平锁。
- 条件变量:
notEmpty
用于标记和唤醒消费者线程,notFull
用于标记和唤醒生产者线程。
2. put操作的实现
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 自旋获取锁
try {
while (count == items.length) {
// 如果队列已满,阻塞当前线程,标记为生产者线程
notFull.await();
}
// 插入元素到队列
enqueue(e);
} finally {
lock.unlock(); // 释放锁
}
}
put
操作分为以下几步:
- 获取锁: 线程尝试获取锁。如果没有获取到,线程将自旋等待。
- 判断队列是否已满: 如果队列已满,线程调用
notFull.await()
方法阻塞自己,等待被消费者线程唤醒。此时线程释放锁。 - 插入元素: 当队列未满或被唤醒后再次获取到锁,线程将元素插入队列。
- 唤醒消费者线程: 插入元素后,线程调用
notEmpty.signal()
唤醒一个等待的消费者线程。
3. take操作的实现
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 自旋获取锁
try {
while (count == 0) {
// 如果队列为空,阻塞当前线程,标记为消费者线程
notEmpty.await();
}
// 从队列中取出元素
return dequeue();
} finally {
lock.unlock(); // 释放锁
}
}
take
操作的流程与put
操作类似:
- 获取锁: 线程尝试获取锁,失败则自旋等待。
- 判断队列是否为空: 如果队列为空,线程调用
notEmpty.await()
阻塞自己,等待被生产者线程唤醒,线程释放锁。 - 取出元素: 当队列非空或线程被唤醒并获取到锁后,线程取出队列中的元素。
- 唤醒生产者线程: 取出元素后,线程调用
notFull.signal()
唤醒一个等待的生产者线程。
4. 注意事项
- 锁的竞争:
put
和take
操作必须首先获取到锁,才能进行后续操作。没有获取到锁的线程会自旋等待,避免竞争引发的线程安全问题。 - 条件等待与唤醒: 如果队列满或空,线程将被阻塞并释放锁,等待相应的生产者或消费者线程唤醒。
- 循环判断(while而非if): 使用
while
循环判断队列状态,确保被唤醒的线程在条件改变后,仍能正确执行后续操作。即使被唤醒,线程还需重新检查队列状态,防止并发条件下的误操作。
小结
阻塞队列通过ReentrantLock
和Condition
机制,确保了在多线程环境下的安全操作。put
和take
操作通过相应的条件变量,阻塞等待与唤醒操作,使生产者和消费者线程能够有效地协作,从而实现线程间的同步与资源共享。
线程池中的阻塞队列
Java 线程池(ThreadPoolExecutor
)中的任务队列就是使用阻塞队列实现的。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
使用阻塞队列可以有效地管理任务的执行,实现线程池的负载均衡和任务调度。
BlockingQueue 的使用场景
生产者-消费者模式和线程池是阻塞队列的两个经典应用场景。
生产者-消费者模型
以下是一个简单的生产者-消费者模型示例:
public class ProducerConsumerExample {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
public static void main(String[] args) {
ProducerConsumerExample example = new ProducerConsumerExample();
Producer producer = example.new Producer();
Consumer consumer = example.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while (true) {
try {
queue.take();
System.out.println("取走一个元素,队列剩余:" + queue.size() + " 个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while (true) {
try {
queue.put(1);
System.out.println("插入一个元素,队列剩余空间:" + (queueSize - queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
输出可能如下:
取走一个元素,队列剩余0个元素
取走一个元素,队列剩余0个元素
插入一个元素,队列剩余空间:9
插入一个元素,队列剩余空间:9
注意:由于put()
和System.out.println()
没有加锁,可能会出现日志输出不一致的情况。