Java阻塞队列:ArrayBlockingQueue
ArrayBlockingQueue
是Java中的一个阻塞队列(Blocking Queue)实现,它是线程安全的,并且基于数组实现。ArrayBlockingQueue
常用于生产者-消费者模型,在这种模型中,生产者线程负责将元素放入队列,而消费者线程负责从队列中取出元素。
ArrayBlockingQueue
是一个有界队列,这意味着它有一个固定的容量。在队列已满时,试图向队列中添加元素的操作将被阻塞,直到队列有空间可用。同样地,在队列为空时,试图从队列中取出元素的操作也将被阻塞,直到队列中有可用的元素。
主要特性
- 线程安全:
ArrayBlockingQueue
内部使用锁和条件变量来确保线程安全。 - 有界:队列的容量在创建时指定,并且无法改变。
- FIFO顺序:元素按照先进先出的顺序进行处理。
构造方法
ArrayBlockingQueue
提供了多个构造方法,常用的有以下两种:
public ArrayBlockingQueue(int capacity)
public ArrayBlockingQueue(int capacity, boolean fair)
capacity
:指定队列的容量。fair
:指定是否使用公平策略。如果设置为true
,则队列的操作将按照公平的顺序进行;否则,不保证公平性。
主要方法
put(E e)
:将指定元素添加到队列中,如果队列已满,则等待空间可用。take()
:从队列中获取并移除元素,如果队列为空,则等待元素可用。offer(E e)
:尝试将指定元素添加到队列中,如果队列已满,则返回false
。poll()
:从队列中获取并移除元素,如果队列为空,则返回null
。
使用场景
ArrayBlockingQueue
非常适合以下场景:
- 生产者-消费者模型:多个生产者线程向队列中添加任务,多个消费者线程从队列中取出任务进行处理。
- 线程池:用于存放待处理任务的队列,线程池中的工作线程从队列中取出任务并执行。
示例代码
下面是一个简单的示例,展示了如何使用ArrayBlockingQueue
实现生产者-消费者模型。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
private static final int CAPACITY = 10;
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);
public static void main(String[] args) {
Thread producer = new Thread(new Producer());
Thread consumer = new Thread(new Consumer());
producer.start();
consumer.start();
}
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 20; i++) {
System.out.println("Produced: " + i);
queue.put(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
int value = queue.take();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
线程安全
ArrayBlockingQueue
的线程安全性主要依赖于内部的锁机制和条件变量来管理并发访问。这种设计确保了多个线程可以安全地进行入队和出队操作,而不会导致数据不一致或其他并发问题。具体来说,ArrayBlockingQueue
通过以下几种方式实现线程安全:
ReentrantLock
ArrayBlockingQueue
使用java.util.concurrent.locks.ReentrantLock
来管理对共享资源的访问。ReentrantLock是一种可重入的互斥锁,允许同一个线程多次获得锁而不会导致死锁。ArrayBlockingQueue
通常会使用两种锁:
- 主锁(Main Lock):用于保护队列的所有变更操作,如插入、删除等。
- 分离锁(Separate Locks):在某些实现中,可能会为插入和删除操作使用不同的锁,以减少锁竞争并提高并发性能。
在ArrayBlockingQueue
中,通常只有一个锁来保护整个队列。
Condition条件变量
ArrayBlockingQueue
还使用了java.util.concurrent.locks.Condition
条件变量来实现线程间的协作。Condition变量提供了类似Object
类中的wait
、notify
和notifyAll
方法,但更强大和灵活。通过Condition变量,可以让线程在特定条件下等待或被唤醒,这对于实现阻塞操作非常重要。
在ArrayBlockingQueue
中,通常会有两个Condition变量:
- notFull:表示队列未满的条件。当队列已满时,试图执行插入操作的线程会在这个条件上等待,直到有空间可用。
- notEmpty:表示队列不为空的条件。当队列为空时,试图执行移除操作的线程会在这个条件上等待,直到有元素可用。
线程安全机制的实现
以下是ArrayBlockingQueue
实现线程安全的几个关键点:
-
加锁与解锁
在每次修改队列状态(如插入或删除元素)之前,
ArrayBlockingQueue
都会先获取主锁,以确保只有一个线程能够进行修改操作。当操作完成后,再释放锁。final ReentrantLock lock = this.lock; lock.lock(); try { // 修改队列状态 } finally { lock.unlock(); }
-
等待和通知
使用Condition变量来处理队列满和空的情况。当队列已满时,插入操作会调用
notFull.await()
进入等待状态,直到有空间可用。同样,当队列为空时,移除操作会调用notEmpty.await()
进入等待状态,直到有新元素被插入。final ReentrantLock lock = this.lock; lock.lock(); try { while (count == items.length) { notFull.await(); } // 插入元素 notEmpty.signal(); } finally { lock.unlock(); }
final ReentrantLock lock = this.lock; lock.lock(); try { while (count == 0) { notEmpty.await(); } // 移除元素 notFull.signal(); } finally { lock.unlock(); }
完整示例代码
以下是ArrayBlockingQueue
的一个简化示例,展示了如何使用ReentrantLock和Condition来实现线程安全的阻塞队列:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class SimpleArrayBlockingQueue<E> {
private final E[] items;
private int putIndex, takeIndex, count;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public SimpleArrayBlockingQueue(int capacity) {
items = (E[]) new Object[capacity];
}
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
E e = items[takeIndex];
if (++takeIndex == items.length) takeIndex = 0;
count--;
notFull.signal();
return e;
} finally {
lock.unlock();
}
}
}