目录
1、前言
2、BlockingQueue
2.1、ArrayBlockingQueue
2.1.1、take()
2.1.2、put()
2.2、LinkedBlockingQueue
2.3、PriorityBlockingQueue
2.4、SynchronousQueue
3、简单使用
3.1、创建ArrayBlockingQueue
3.2、Demo
1、前言
对于并发程序而言,高性能自然是一个我们需要追求的目标,但多线程的开发模式还会引入一个问题,那就是如何进行多个线程间的数据交换和共享呢?而JUC库中提供了多种并发队列和环形缓冲区的实现,为我们提供了高性能和线程安全的数据结构。
2、BlockingQueue
BlockingQueue是Java从JDK5开始在并发包(JUC)内引入的。他之所以适合作为数据交换共享的通道,关键在于他的Blocking上。Blocking是阻塞的意思。当服务线程(服务线程指不断获取队列中的消息,进行处理的线程)处理完成队列中所有的消息后,它如何知道下一条消息何时到来呢?
有两种做法:
- 不断轮询监控该队列;
- 监控队列空时,进行等待;当有消息进入队列时,自动唤醒该线程;
很明显第一种方案造成了不必要的资源浪费(线程不停的循环和监控队列)。BlockingQueue则很好的解决了该问题。它会让服务线程在队列为空时进行等待,当有新的消息进入队列后,自动将线程唤醒。
BlockingQueue实际上是个接口。提供了最基本的队列元素操作API,如add(), offer(),put(),take(),poll(),remove()等。
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E var1);
boolean offer(E var1);
void put(E var1) throws InterruptedException;
boolean offer(E var1, long var2, TimeUnit var4) throws InterruptedException;
E take() throws InterruptedException;
E poll(long var1, TimeUnit var3) throws InterruptedException;
int remainingCapacity();
boolean remove(Object var1);
boolean contains(Object var1);
int drainTo(Collection<? super E> var1);
int drainTo(Collection<? super E> var1, int var2);
}
有4个主要的实现类:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue。
2.1、ArrayBlockingQueue
ArrayBlockingQueue是基于数组实现的有界阻塞队列,内部使用一个可重入锁来保证线程安全。我们主要介绍一下该类,下面其他的都基本类似,以此类来讲一下他是如何实现上面说到的数据共享的。
从官方文档可以看出,他是一个有界队列,他会尝试put成满的队列的元件将导致在操作阻挡;尝试take从空队列的元件将类似地阻塞。
用过队列的小伙伴应该都知道,向队列中压入元素可以使用 offer()方法和 put()方法。对于 offer()方法,如果当前队列已经满了,它就会立即返回 false。如果没有满,则执行正常的入队操作。所以,我们不讨论这个方法。现在,我们需要关注的是 put()方法。put()方法也是将元素压入队列末尾。但如果队列满了,它会一直等待,直到队列中有空闲的位置。
从队列中弹出元素可以使用 poll()方法和 take()方法。它们都从队列的头部获得一个元素。不同之处在于:如果队列为空,那么 poll()方法会直接返回 null,而 take()方法会等待,直到队列内有可用元素。
ArrayBlockingQueue类的内部元素都放置在一个对象数组中:
/** The queued items */
final Object[] items;
因此,put()方法和 take()方法才是体现 Blocking 的关键。为了做好等待和通知两件事,在ArrayBlockingQueue 类内部定义了以下一些字段。
2.1.1、take()
从源码可以看到take():
/** Condition for waiting takes */
private final Condition notEmpty;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
当执行 take()操作时,如果队列为空,则让当前线程在 notEmpty 上等待。新元素入以时,则进行一次 notEmpty 上的通知。
notEmpty实际上是个Condition并发类。在前面《【JUC基础】06. 生产者和消费者问题》中有提到过,可以找到该篇文章再熟悉一下。
当代码进行到take()执行到notEmpty.await();时,当前线程会进行等待,当队列中新插入新的元素时,线程便会得到一个通知,自动唤醒。
/**
* 新增一个元素
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
当新元素进入队列后,调用notEmpty.signal();唤醒线程,继续工作。
2.1.2、put()
与take() 类似,put()的操作也是一样的。当队列满的时候,需要让压入的线程等待。
/** Condition for waiting puts */
private final Condition notFull;
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
当有元素从队列中被取走,队列中出现空位置时,自然也需要通知等待入队的线程。
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
从实现上来说,ArrayBlockingQueue在物理上是一个数字,但在逻辑上来说是个环形结构。由于其数组的特性,其容量在初始化时就已指定,并且无法动态调整。
当有元素加入或离开队列时,总是使用takeIndex和putIndex两个变量分别表示队列头部和尾部元素在数组中的位置。每一次入队和出队操作都会调整这两个重要的索引位置。
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
/**
* Circularly decrement i.
*/
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
可以看出,这两个函数将数组的头尾相接,实现了环形数组。
2.2、LinkedBlockingQueue
与ArrayBlockingQueue类似,LinkedBlockingQueue基于链表实现的可选有界或无界阻塞队列,内部使用两个可重入锁来保证线程安全。这里就不详细展开了。
2.3、PriorityBlockingQueue
PriorityBlockingQueue则是基于优先级堆实现的无界阻塞队列,元素根据优先级进行排序。
2.4、SynchronousQueue
SynchronousQueue则是一个没有容量的阻塞队列,每个插入操作都必须等待另一个线程的移除操作,适用于直接传递任务的场景。
3、简单使用
ArrayBlockingQueue提供了接口中所有方法的实现BlockingQueue。这些方法用于插入、访问和删除数组阻塞队列中的元素。前面说的put和take是阻塞操作的方法,其他的可以参看API自己尝试。
3.1、创建ArrayBlockingQueue
为了创建数组阻塞队列,我们必须导入该java.util.concurrent.ArrayBlockingQueue包。导入包后,我们可以使用以下方法在 Java 中创建数组阻塞队列:
/**
* capacity: 数组阻塞队列的大小
*/
ArrayBlockingQueue<Type> animal = new ArrayBlockingQueue<>(int capacity);
3.2、Demo
import java.util.concurrent.ArrayBlockingQueue;
class Main {
public static void main(String[] args) {
ArrayBlockingQueue<String> animals = new ArrayBlockingQueue<>(5);
try {
//Add elements to animals
animals.put("Dog");
animals.put("Cat");
System.out.println("ArrayBlockingQueue: " + animals);
// Remove an element
String element = animals.take();
System.out.println("Removed Element: " + element);
}
catch(Exception e) {
System.out.println(e);
}
}
}
输出:
ArrayBlockingQueue:[Dog,Cat]
Removed Element: Dog