队列
队列的特点先进先出(FIFO)。
如图: 进入队列的顺序是1,2,3,那么出队列的顺序只能是1,2,3,不可能是其他顺序,这是由队列的特点保证的。
保存数据的基本数据结构有数组和链表,基于此,实现队列分为数组队列(ArrayQueue)和链表队列(LinkedQueue)。
BolckingQueue分类
从名称可知 Blocking Queue 阻塞队列,具体可分为数组阻塞队列和链表阻塞队列。
ArrayBlockingQueue源码:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//存储元素的数组
final Object[] items;
//锁 因此是多线程安全
final ReentrantLock lock;
//条件队列
private final Condition notEmpty;
private final Condition notFull;
...
}
从ArrayBlockingQueue 源码中不难看出,使用Object数组存储元素,使用ReentrantLock 保证多线程操作安全,
阻塞功能通过“条件锁”Condition 实现。
LinkedBlockingQueue源码:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//节点定义
static class Node<E> {
E item;
//只有尾指针 --> 单线链表
Node<E> next;
Node(E x) { item = x; }
}
//指向头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
...
}
从LinkedBlockingQueue源码中不难看出,使用单项链表存储元素,使用ReentrantLock 保证多线程操作安全,
阻塞功能通过“条件锁”Condition 实现。
BolckingQueue 常用方法
首先初始化一个容量为5的队列:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
1、add(E e);
函数说明:向队列中添加一个元素,当队列为空时抛NPE异常,队列满时,抛IllegalStateException异常。
返回true 表示添加元素成功,其他返回值均不成功;
该方法不会阻塞当前线程往下执行!
for (int i = 0; i < 8; i++) {
try {
System.out.println(blockingQueue.add("aaa"));
} catch (Exception e) {
}
System.out.println("继续执行....");
}
结果:
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
继续执行… //这里此时队列元素已满,会抛出异常,由于对异常进行了处理,因此可以继续往下执行
继续执行…
继续执行…
add(E e)底层调用offer(E e)。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
2、offer(E e)
向队列中添加元素, 若队列为空,抛NPE异常;
true:添加元素成功;
false: 添加元素失败
不会阻塞当前线程执行
for (int i = 0; i < 8; i++) {
System.out.println(blockingQueue.offer("aaa"));
System.out.println("插入操作 继续执行...");
}
结果:
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
false ------------------------------------------队列满后,插入失败
插入操作 继续执行…
false
插入操作 继续执行…
false
插入操作 继续执行…
public boolean offer(E e) {
//元素非空判断
checkNotNull(e);
final ReentrantLock lock = this.lock;
//获取互斥锁 因此是线程安全的
lock.lock();
try {
//队列已满
if (count == items.length)
return false;
else {
//插入队列
enqueue(e);
return true;
}
} finally {
//释放锁
lock.unlock();
}
}
//插入队列
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//元素插入成功后,唤醒条件队列中的线程! ********
notEmpty.signal();
}
2、offer(E e, long timeout, TimeUnit unit)
在指定时间内成功插入元素返回true, 失败返回false;
与offer(E e)的不同点在于,offer(E e)只插入一次,成功或者失败立即返回;
offer(E e, long timeout, TimeUnit unit) 先判断队列是否已满,若已满,则等待一定时间后再尝试插入元素。
不会阻塞当前线程执行
for (int i = 0; i < 10; i++) {
System.out.println(blockingQueue.offer("aaa",3, TimeUnit.SECONDS));
System.out.println("继续执行...");
}
结果:
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
/**
* 如果此时队列已满
* 等待一段时间再操作
*/
while (count == items.length) {
if (nanos <= 0)
return false;
//处理等待时间的操作
nanos = notFull.awaitNanos(nanos);
}
//元素插入到队列中
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
3、put(E e)
向队列中放入元素;
会阻塞当前线程执行
for (int i = 0; i < 10; i++) {
blockingQueue.put("aaa");
System.out.println("继续执行...");
}
结果:
继续执行…
继续执行…
继续执行…
继续执行…
继续执行…
程序在此卡住不再执行…
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
/**
* 如果此时队列已满,当前线程加入到条件队列中(进行阻塞)
*/
while (count == items.length)
notFull.await();
//当前队列未满,元素进队
enqueue(e);
} finally {
lock.unlock();
}
}
4、take()
获取队首元素,若元素为空,
则阻塞等待
for (int i = 0; i < 10; i++) {
System.out.println(blockingQueue.take());
System.out.println("获取操作 继续执行...");
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列为空
while (count == 0)
//阻塞等待
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
5、poll()
获取队首元素,如果此时队列为空,返回null;
不会阻塞当前线程执行!
for (int i = 0; i < 10; i++) {
System.out.println(blockingQueue.poll());
System.out.println("获取操作 继续执行...");
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
6、poll(long timeout, TimeUnit unit)
同poll(); 只不过当队列为空时,等待一定时间再获取队首元素
for (int i = 0; i < 10; i++) {
System.out.println(blockingQueue.poll(3, TimeUnit.SECONDS));
System.out.println("获取操作 继续执行...");
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列为空
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//获取队首元素
return dequeue();
} finally {
lock.unlock();
}
}
7、drainTo(Collection<? super E> c);
获取队列中的全部元素,保存到指定的集合中,结果返回元素个数。
不会阻塞当前线程执行
List<String> list = new ArrayList<>();
int i = blockingQueue.drainTo(list);
System.out.println(i);
list.stream().forEach(str -> {
System.out.println(str);
});
8、drainTo(Collection<? super E> c, int maxElements);
从队列中获取指定数量的元素,保存在给定的集合中
List<String> list = new ArrayList<>();
int i = blockingQueue.drainTo(list, 3);
System.out.println(i);
list.stream().forEach(str -> {
System.out.println(str);
});
OK,对上述所有方法做个总结可如下图所示:
存/取 搭配使用。
使用BolckingQueue实现生产者消费者模式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
private static final int QUEUE_CAPACITY = 5; // 队列容量
private static final int TOTAL_ITEMS = 10; // 生产和消费的总数量
public static void main(String[] args) {
// 创建一个大小为 QUEUE_CAPACITY 的阻塞队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
// 创建生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < TOTAL_ITEMS; i++) {
String item = "Item " + i;
queue.put(item); // 如果队列已满,生产者线程会被阻塞
System.out.println("Produced: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Producer interrupted");
}
});
// 创建消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < TOTAL_ITEMS; i++) {
String item = queue.take(); // 如果队列为空,消费者线程会被阻塞
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Consumer interrupted");
}
});
// 启动生产者和消费者线程
producer.start();
consumer.start();
// 等待线程结束
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Main thread interrupted");
}
System.out.println("Production and consumption completed.");
}
}
ArrayBlockingQueue是在哪一步唤醒等待条件的线程的 ?
唤醒生产者线程:
//获取队头元素
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒生产者线程 ****************
notFull.signal();
return x;
}
生产者放入元素后会唤醒消费者
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//唤醒消费者
notEmpty.signal();
}