目录
- BlockingQueue详解
- 0、BlockingQueue简介
- BlockingQueue接口中方法注释
- BlockingQueue的实现,总结计划
- 1、ArrayBlockingQueue简介
- 2、ArrayBlockingQueue的继承体系
- 3、ArrayBlockingQueue的构造方法
- ①、 `ArrayBlockingQueue(int capacity)`
- ②、`ArrayBlockingQueue(int capacity, boolean fair)`
- ③、`ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)`
- 4、ArrayBlockingQueue的适用场景
- ①、资源池管理
- ②、多线程任务调度
- ③、实现生产者-消费者模式
- ④、使用有界队列
- ArrayBlockingQueue 的简单使用代码示例:
- 5、ArrayBlockingQueue的数据结构
- 看下ArrayBlockingQueue类的部分属性
- 6、ArrayBlockingQueue的`put`方法
- 7、ArrayBlockingQueue的`take`方法
- 8、ArrayBlockingQueue,take和put方法动画演示
- 9、ArrayBlockingQueue的其他方法
- `add(E e)`方法
- `offer(E e)` 方法
- `offer(E e, long timeout, TimeUnit unit) ` 方法
- `poll()`方法
- `poll(long timeout, TimeUnit unit)` 方法
- `peek()`方法
- `remove()`方法
- `contains(Object o)`方法
- `drainTo(Collection<? super E> c)`方法
- 10、LinkedBlockingQueue简介和数据结构
- LinkedBlockingQueue属性和构造函数
- LinkedBlockingQueue的`take`和`put`方法
- ArrayBlockingQueue和LinkedBlockingQueue的一些区别?
- 为什么ArrayBlockingQueue不设计成读写锁分离的模式?
- 11、其他的BlockingQueue实现
- PriorityBlockingQueue 简单介绍
- SynchronousQueue简单介绍
- LinkedTransferQueue简单介绍
- LinkedBlockingDeque简单介绍
BlockingQueue详解
总结到这儿,总感觉这集合的多线程味越来越重了~
为什么这样呢,因为 BlockingQueue和前面说过的CopyOnWriteArrayList、
ConcurrentHashMap都是 java.util.concurrent
包下的。
这个包就是为了解决多线程的各种问题而设计的,所以java.util.concurrent
包下的集合大都和多线程有关系。
0、BlockingQueue简介
阻塞队列的基本概念:
阻塞队列(Blocking Queue)是一种特殊的队列,支持两个特殊的操作:
- ①、在队列为空时,获取元素的操作将会被阻塞,直到队列变为非空。
- ②、在队列已满时,插入元素的操作将会被阻塞,直到队列不再是满的。
阻塞队列是线程安全的,且通常用于生产者-消费者模型中。
BlockingQueue 提供了四种不同类型的操作方式:抛出异常、特殊值、阻塞和超时。
BlockingQueue接口中方法注释
public interface BlockingQueue<E> extends Queue<E> {
// 尝试添加元素到队列中,如果队列已满,则抛出 IllegalStateException
boolean add(E e);
// 尝试添加元素到队列中,如果队列已满,则返回 false
boolean offer(E e);
// 将元素添加到队列中,如果队列已满,则等待空间可用
void put(E e) throws InterruptedException;
// 尝试将元素添加到队列中,在指定的等待时间内,如果队列已满,则等待空间可用
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 获取并移除队列头部的元素,如果队列为空,则等待直到有元素可用
E take() throws InterruptedException;
// 尝试获取并移除队列头部的元素,在指定的等待时间内,如果队列为空,则返回 null
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// 返回队列剩余的容量
int remainingCapacity();
// 从队列中移除指定的元素
boolean remove(Object o);
// 检查队列中是否包含指定的元素
boolean contains(Object o);
// 从队列中移除所有可用的元素,并将它们添加到指定的集合中
int drainTo(Collection<? super E> c);
// 从队列中移除最多 maxElements 个可用的元素,并将它们添加到指定的集合中
int drainTo(Collection<? super E> c, int maxElements);
}
知道了抽象层接口的规范后。再去看实现就有整体的把握了。
BlockingQueue的实现,总结计划
ArrayBlockingQueue 下面会详细讲解。
LinkedBlockingQueue 讲解一下底层数据结构和 take、put方法。
PriorityBlockingQueue 简单介绍。
SynchronousQueue简单介绍。
LinkedTransferQueue简单介绍。
LinkedBlockingDeque简单介绍。
1、ArrayBlockingQueue简介
ArrayBlockingQueue是JDK的JUC包下对阻塞队列的一种实现。
ArrayBlockingQueue的主要特性如下:
- ①、有界:队列有固定的容量,容量在创建时指定,且不能改变。如果队列已满,插入操作将被阻塞,直到有空间可用。
- ②、阻塞:队列支持阻塞的插入和移除操作。这意味着当队列满时,插入操作会等待,直到有空间可用;当队列空时,移除操作会等待,直到有元素可用。
- ③、线程安全:内部使用可重入锁(ReentrantLock)和两个条件变量(notEmpty和notFull)来管理并发访问。
2、ArrayBlockingQueue的继承体系
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
可以看到ArrayBlockingQueue实现了Collection和Queue接口,是单值集合,具有队列的特性。
又实现了BlockingQueue接口,具有阻塞队列的特性。
3、ArrayBlockingQueue的构造方法
①、 ArrayBlockingQueue(int capacity)
这个构造方法创建一个具有指定容量的 ArrayBlockingQueue,其内部锁的公平性设置为 false。
/**
* 创建一个具有指定容量的ArrayBlockingQueue,锁的公平性设置为false。
*
* @param capacity 队列的容量
* @throws IllegalArgumentException 如果容量小于等于0
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
②、ArrayBlockingQueue(int capacity, boolean fair)
这个构造方法创建一个具有指定容量的 ArrayBlockingQueue,并允许指定锁的公平性。如果 fair 为 true,锁将采用公平策略,即按顺序分配锁;如果 false,锁将采用非公平策略。
/**
* 创建一个具有指定容量的ArrayBlockingQueue,并指定锁的公平性。
*
* @param capacity 队列的容量
* @param fair 指定锁的公平性,如果为true则锁是公平的
* @throws IllegalArgumentException 如果容量小于等于0
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException(); // 如果容量小于等于0,抛出异常
this.items = new Object[capacity]; // 初始化存储元素的数组
lock = new ReentrantLock(fair); // 创建一个可重入锁,并指定其公平性
notEmpty = lock.newCondition(); // 创建一个条件变量,用于在队列不为空时通知
notFull = lock.newCondition(); // 创建一个条件变量,用于在队列未满时通知
}
③、ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
这个构造方法创建一个具有指定容量和锁公平性的 ArrayBlockingQueue,并将指定集合中的元素添加到队列中。
/**
* 创建一个具有指定容量和锁公平性的ArrayBlockingQueue,并将集合中的元素添加到队列中。
*
* @param capacity 队列的容量
* @param fair 指定锁的公平性,如果为true则锁是公平的
* @param c 要添加到队列中的集合
* @throws IllegalArgumentException 如果容量小于等于0或者集合中的元素数量超过队列容量
* @throws NullPointerException 如果集合或其中任何一个元素为null
*/
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
this(capacity, fair); // 调用前一个构造方法,初始化容量和锁的公平性
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁以确保可见性,而不是互斥
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e); // 检查元素是否为null,如果是则抛出NullPointerException
items[i++] = e; // 将元素添加到队列中
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException(); // 如果集合元素超过容量,抛出异常
}
count = i; // 设置队列中的元素数量
putIndex = (i == capacity) ? 0 : i; // 设置放置下一个元素的索引
} finally {
lock.unlock(); // 释放锁
}
}
4、ArrayBlockingQueue的适用场景
我们先看下这个集合的用途,然后简单的使用体验一下,之后再去看数据结构和一些方法的实现。
①、资源池管理
适用于实现资源池管理,例如数据库连接池。资源池中的资源可以放入 ArrayBlockingQueue,线程可以安全地获取和释放资源。
②、多线程任务调度
在多线程任务调度中,可以使用 ArrayBlockingQueue 来存放任务。工作线程从队列中取任务并执行,新的任务可以被安全地添加到队列中。
③、实现生产者-消费者模式
这个本质上也是多线程任务调度。
④、使用有界队列
当需要限制队列的最大容量时,ArrayBlockingQueue 是一个很好的选择。它在队列满时阻塞生产者,在队列空时阻塞消费者,从而有效地控制系统资源的使用。
还有哪些应用呢? 随便找个Spring项目,然后 Ctrl+鼠标左键点击JDK源码中ArrayBlockingQueue 的类名
就能看到了,可以挑几个感兴趣的研究研究,比如说 兔子消息队列的模板方法类 RabbitTemplate里面有用到ArrayBlockingQueue来实现同步的请求-回复机制。
ArrayBlockingQueue 的简单使用代码示例:
import java.util.concurrent.ArrayBlockingQueue;
public class TestA {
private static final ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
public static void main(String[] args) {
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
blockingQueue.put(i);
System.out.println("生产者生产了:" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "生产者");
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
blockingQueue.take();
System.out.println("消费者消费了:" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "消费者");
producer.start();
consumer.start();
}
}
执行结果:
生产者生产了:1
消费者消费了:1
生产者生产了:2
消费者消费了:2
生产者生产了:3
消费者消费了:3
生产者生产了:4
消费者消费了:4
生产者生产了:5
消费者消费了:5
是不是感觉变味了,明明在讲集合,咋又扯上多线程了。
因为JUC包里的集合基本上都是为了解决多线程问题而设计的,没办法,不扯多线程不行呀。
不过使用ArrayBlockingQueue 来实现生产者和消费者模型是真的简单呀,只需要利用put和take方法即可,什么线程安全问题,线程通信问题都被ArrayBlockingQueue 给解决了,这些内部的线程安全实现对使用者来说算是透明的,后面会分析这些方法是如何实现的。
5、ArrayBlockingQueue的数据结构
毫无疑问 又是Object[]
数组 。
看下ArrayBlockingQueue类的部分属性
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 存储队列元素的数组
final Object[] items;
// 下一个要取出、轮询、查看或移除的元素的索引
int takeIndex;
// 下一个要放置、提供或添加的元素的索引
int putIndex;
// 队列中的元素数量
int count;
// 主锁,用于保护所有访问
final ReentrantLock lock;
// 条件变量,用于等待取出操作
private final Condition notEmpty;
// 条件变量,用于等待放置操作
private final Condition notFull;
}
6、ArrayBlockingQueue的put
方法
public void put(E e) throws InterruptedException {
// 检查要添加的元素是否为空
checkNotNull(e);
// 获取队列的锁
final ReentrantLock lock = this.lock;
// 以可中断的方式获取锁
//
lock.lockInterruptibly();
try {
// 如果队列已满,则等待
while (count == items.length)
notFull.await();
// 将元素加入队列
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}
**注意:**当线程尝试通过lockInterruptibly()方法获取锁时,如果在此期间线程被其他线程中断(通过Thread.interrupt()方法),那么这个方法会立即抛出InterruptedException,从而允许线程优雅地处理中断,比如停止执行某些操作或清理资源。
enqueue
方法
private void enqueue(E x) {
// 获取队列数组
final Object[] items = this.items;
// 将元素放入当前插入位置
items[putIndex] = x;
// 更新插入位置索引,如果到达数组末尾则回绕
if (++putIndex == items.length)
putIndex = 0;
// 增加元素计数
count++;
// 唤醒等待“非空”条件的线程
notEmpty.signal();
}
7、ArrayBlockingQueue的take
方法
public E take() throws InterruptedException {
// 获取ReentrantLock实例,lock是ArrayBlockingQueue类中的一个成员变量
final ReentrantLock lock = this.lock;
// 以中断方式获取锁,如果当前线程被中断则抛出InterruptedException
lock.lockInterruptibly();
try {
// 如果队列为空(count == 0),则等待notEmpty条件
while (count == 0)
notEmpty.await();
// 从队列中移除并返回队头元素
return dequeue();
} finally {
// 确保在任何情况下都释放锁,以避免死锁
lock.unlock();
}
}
private E dequeue() {
// 获取存储队列元素的数组
final Object[] items = this.items;
// 获取takeIndex位置的元素,并将其强制转换为E类型
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 将取出位置的元素设为null,以便GC回收
items[takeIndex] = null;
// 增加takeIndex,如果到达数组末尾则回绕到开头
if (++takeIndex == items.length)
takeIndex = 0;
// 递减count,表示队列中的元素数减少
count--;
// 如果有迭代器在迭代队列,则通知它们元素已被移除
if (itrs != null)
itrs.elementDequeued();
// 唤醒等待队列非满的线程
notFull.signal();
// 返回取出的元素
return x;
}
8、ArrayBlockingQueue,take和put方法动画演示
先看下面的代码示例:
import java.util.concurrent.ArrayBlockingQueue;
public class TestA {
public static void main(String[] args) {
try {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
System.out.println("主线程添加元素:1 2 3");
// 使用新线程 t1 再添加 一个元素 由于阻塞队列满了 所以t1线程阻塞
Thread t1 = new Thread(() -> {
try {
blockingQueue.put(4);
System.out.println("t1线程添加元素:4");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
Integer take1 = blockingQueue.take();// 主线程拿走一个元素
System.out.println("主线程拿走元素:" + take1);
t1.join(); // 等待t1 添加完成
System.out.println("当前队列中元素:" + blockingQueue);
blockingQueue.clear(); // 清空 阻塞队列
System.out.println("==========队列已清空===========");
// 使用新线程 t2 获取元素 由于阻塞队列为空 所以t2线程阻塞
Thread t2 = new Thread(() -> {
try {
Integer take = blockingQueue.take();
System.out.println("t2线程获取元素: " + take + "成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t2.start();
blockingQueue.put(5);
System.out.println("主线程添加元素:5");
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果:
一定要结合代码的执行顺序看结果
主线程添加元素:1 2 3
主线程拿走元素:1
t1线程添加元素:4
当前队列中元素:[2, 3, 4]
==========队列已清空===========
主线程添加元素:5
t2线程获取元素: 5成功
下面用动画演示下
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
添加元素,把队列添加满的情况
下面再来看当队列满了之后再去添加元素的情况:
// 使用新线程 t1 再添加 一个元素 由于阻塞队列满了 所以t1线程阻塞
Thread t1 = new Thread(() -> {
try {
blockingQueue.put(4);
System.out.println("t1线程添加元素:4");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
Integer take1 = blockingQueue.take();// 主线程拿走一个元素
最后再看下队列清空后 执行take的情况:
blockingQueue.clear(); // 清空 阻塞队列
System.out.println("==========队列已清空===========");
// 使用新线程 t2 获取元素 由于阻塞队列为空 所以t2线程阻塞
Thread t2 = new Thread(() -> {
try {
Integer take = blockingQueue.take();
System.out.println("t2线程获取元素: " + take + "成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t2.start();
blockingQueue.put(5);
9、ArrayBlockingQueue的其他方法
add(E e)
方法
将元素添加到队列中,当队列已满时,抛出 IllegalStateException。
public boolean add(E e) {
// 调用父类的add方法(实际就是调用下面重写的add方法)
return super.add(e);
}
public boolean add(E e) {
// 调用offer方法 尝试将元素插入队列
if (offer(e))
return true;
else
// 如果插入失败(队列已满),抛出IllegalStateException
throw new IllegalStateException("Queue full");
}
offer(E e)
方法
将元素添加到队列中,如果队列已满,立即返回 false。
public boolean offer(E e) {
// 检查传入的元素是否为null,如果为null,抛出NullPointerException
checkNotNull(e);
// 获取队列的重入锁
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 如果队列已满,返回false
if (count == items.length)
return false;
else {
// 将元素添加到队列
enqueue(e);
return true;
}
} finally {
// 释放锁
lock.unlock();
}
}
offer(E e, long timeout, TimeUnit unit)
方法
offer方法的重载,带超时时间。
尝试将元素添加到队列中,如果队列已满,则等待指定的时间。
如果在等待时间内队列有空闲空间,则将元素添加到队列中并返回 true;如果等待超时,则返回 false。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 检查传入的元素是否为null,如果为null,抛出NullPointerException
checkNotNull(e);
// 将等待时间转换为纳秒
long nanos = unit.toNanos(timeout);
// 获取队列的重入锁
final ReentrantLock lock = this.lock;
// 以可中断的方式获取锁,如果当前线程在等待获取锁时被中断,则抛出InterruptedException
lock.lockInterruptibly();
try {
// 如果队列已满,进入等待状态,直到队列有空闲空间或等待超时
while (count == items.length) {
// 如果等待时间已到,返回false
if (nanos <= 0)
return false;
// 等待notFull条件变量,返回剩余的等待时间 = 传入的nanos - notFull条件实际等待的时间
nanos = notFull.awaitNanos(nanos);
}
// 将元素添加到队列
enqueue(e);
return true;
} finally {
// 释放锁
lock.unlock();
}
}
poll()
方法
从队列中移除并返回队头元素,如果队列为空,返回 null。
public E poll() {
// 获取队列的重入锁
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 如果队列为空,返回null,否则移除并返回队头元素
return (count == 0) ? null : dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
poll(long timeout, TimeUnit unit)
方法
从队列中移除并返回队头元素。如果队列为空,则等待指定的时间。
如果在等待时间内队列有元素入队,则返回队头元素;如果等待超时,则返回 null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 将等待时间转换为纳秒
long nanos = unit.toNanos(timeout);
// 获取队列的重入锁
final ReentrantLock lock = this.lock;
// 以可中断的方式获取锁,如果当前线程在等待获取锁时被中断,则抛出InterruptedException
lock.lockInterruptibly();
try {
// 如果队列为空,进入等待状态,直到队列有元素或等待超时
while (count == 0) {
// 如果等待时间已到,返回null
if (nanos <= 0)
return null;
// 等待notEmpty条件变量,返回剩余的等待时间 = 传入的nanos - notFull条件实际等待的时间
nanos = notEmpty.awaitNanos(nanos);
}
// 从队列中移除并返回队头元素
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
peek()
方法
返回队头元素但不移除它,如果队列为空,返回 null。
public E peek() {
// 获取队列的重入锁
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 返回队头元素但不移除它,如果队列为空,返回null
return itemAt(takeIndex);
} finally {
// 释放锁
lock.unlock();
}
}
final E itemAt(int i) {
// 获取底层数组中相应索引位置的元素
return (E) items[i];
}
remove()
方法
从队列中移除并返回队头元素,如果队列为空,抛出 NoSuchElementException。
public E remove() {
// 尝试移除并返回队头元素
E x = poll();
// 如果队头元素不为空,返回该元素
if (x != null)
return x;
else
// 如果队列为空,抛出NoSuchElementException
throw new NoSuchElementException();
}
contains(Object o)
方法
检查队列中是否包含指定元素。
public boolean contains(Object o) {
// 如果传入的对象为null,返回false
if (o == null) return false;
// 获取队列的数组
final Object[] items = this.items;
// 获取队列的重入锁
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 如果队列中有元素,检查每个元素是否与传入的对象相等
if (count > 0) {
// 获取当前插入位置索引
final int putIndex = this.putIndex;
// 从队列的读取位置开始检查
int i = takeIndex;
do {
// 如果找到相等的元素,返回true
if (o.equals(items[i]))
return true;
// 更新检查位置索引
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
// 如果未找到相等的元素,返回false
return false;
} finally {
// 释放锁
lock.unlock();
}
}
drainTo(Collection<? super E> c)
方法
将队列中的所有元素转移到指定的集合中。
此操作是一个批量操作,试图一次性地清空队列并把队列中所有元素转移到指定的集合中。
public int drainTo(Collection<? super E> c) {
// 调用重载方法
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
// 检查目标集合是否为null,如果为null,抛出NullPointerException
checkNotNull(c);
// 检查目标集合是否为队列本身,如果是,抛出IllegalArgumentException
if (c == this)
throw new IllegalArgumentException();
// 如果maxElements小于等于0,直接返回0
if (maxElements <= 0)
return 0;
// 获取队列的重入锁
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 计算实际要转移的元素数量
int n = Math.min(maxElements, count);
// 将元素从队列中转移到目标集合
for (int i = 0; i < n; i++) {
c.add(this.dequeue());
}
// 返回实际转移的元素数量
return n;
} finally {
// 释放锁
lock.unlock();
}
}
10、LinkedBlockingQueue简介和数据结构
LinkedBlockingQueue属性和构造函数
看下面两个类属性就知道了
transient Node<E> head;
private transient Node<E> last;
LinkedBlockingQueue 是一个基于链表实现的阻塞队列。
再看下构造函数:
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 实例。
public LinkedBlockingQueue() {
// 调用带有容量参数的构造函数,默认容量为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}
// 创建一个指定容量的 LinkedBlockingQueue 实例。
public LinkedBlockingQueue(int capacity) {
// 检查容量是否大于0,否则抛出IllegalArgumentException
if (capacity <= 0) throw new IllegalArgumentException();
// 初始化队列容量
this.capacity = capacity;
// 初始化头节点和尾节点为哨兵节点,不存储实际数据
last = head = new Node<E>(null);
}
// 使用给定的集合创建一个 LinkedBlockingQueue 实例,并将集合中的元素添加到队列中。
public LinkedBlockingQueue(Collection<? extends E> c) {
// 调用带有容量参数的构造函数,默认容量为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
// 获取插入锁
final ReentrantLock putLock = this.putLock;
// 获取锁以确保可见性
putLock.lock();
try {
int n = 0;
// 遍历集合中的每个元素
for (E e : c) {
// 如果元素为null,抛出NullPointerException
if (e == null)
throw new NullPointerException();
// 如果元素数量达到容量,抛出IllegalStateException
if (n == capacity)
throw new IllegalStateException("Queue full");
// 将元素包装成节点并添加到队列中
enqueue(new Node<E>(e));
++n;
}
// 更新队列中元素的数量
count.set(n);
} finally {
// 释放锁
putLock.unlock();
}
}
LinkedBlockingQueue的take
和put
方法
首先先明确一点LinkedBlockingQueue的读锁和写锁是分开的,这点和ArrayBlockingQueue不同。ArrayBlockingQueue读写用的都是同一个锁。
LinkedBlockingQueue 使用了读写分离锁,即 takeLock 和 putLock,来分别控制读取和插入操作。这种设计可以减少锁竞争,提高并发性能。
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
take
方法
public E take() throws InterruptedException {
E x; // 存储从队列中取出的元素
int c = -1; // 用于记录当前队列元素数量
final AtomicInteger count = this.count; // 队列中元素的数量 缓存成员变量到局部变量
final ReentrantLock takeLock = this.takeLock; // 读取操作的锁
takeLock.lockInterruptibly(); // 获取读取锁,可被中断
try {
// 当队列为空时,等待notEmpty条件
while (count.get() == 0) {
notEmpty.await(); // 等待队列非空
}
x = dequeue(); // 从队列中取出一个元素
c = count.getAndDecrement(); // 获取并递减当前队列中的元素数量
// 如果队列中还有剩余元素,唤醒其他等待线程
if (c > 1)
notEmpty.signal(); // 唤醒其他等待的读取操作
} finally {
takeLock.unlock(); // 释放读取锁
}
// 如果取出元素后,队列变得不满,唤醒等待的插入操作
if (c == capacity)
signalNotFull(); // 通知等待的插入操作
return x; // 返回取出的元素
}
take方法步骤说明:
- 获取 takeLock 锁,确保只有一个线程可以执行读取操作。
- 如果队列为空,等待 notEmpty 条件。
- 从队列中取出一个元素,并递减元素数量。
- 如果队列中仍有元素,唤醒其他等待的读取操作。
- 释放 takeLock 锁。
- 如果队列之前已满,现在有空闲,唤醒等待的插入操作。
- 返回取出的元素。
put
方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException(); // 检查插入元素是否为null
int c = -1; // 用于记录当前队列元素数量
Node<E> node = new Node<E>(e); // 创建一个新的节点包装插入的元素
final ReentrantLock putLock = this.putLock; // 插入操作的锁
final AtomicInteger count = this.count; // 队列中元素的数量 缓存成员变量到局部变量
putLock.lockInterruptibly(); // 获取插入锁,可被中断
try {
// 当队列已满时,等待notFull条件
while (count.get() == capacity) {
notFull.await(); // 等待队列非满
}
enqueue(node); // 将新节点插入到队列中
c = count.getAndIncrement(); // 获取并递增当前队列中的元素数量
// 如果插入后队列仍未满,唤醒其他等待线程
if (c + 1 < capacity)
notFull.signal(); // 唤醒其他等待的插入操作
} finally {
putLock.unlock(); // 释放插入锁
}
// 如果插入元素前队列为空,唤醒等待的读取操作
if (c == 0)
signalNotEmpty(); // 通知等待的读取操作
}
put方法步骤说明:
- 检查插入的元素是否为 null,如果是,则抛出 NullPointerException。
- 创建一个新节点包装插入的元素。
- 获取 putLock 锁,确保只有一个线程可以执行插入操作。
- 如果队列已满,等待 notFull 条件。
- 将新节点插入到队列中,并递增元素数量。
- 如果插入后队列仍未满,唤醒其他等待的插入操作。
- 释放 putLock 锁。
- 如果插入元素前队列为空,现在有元素,唤醒等待的读取操作。
注意: 队列中元素数量使用AtomicInteger保证原子性。同时缓存成员变量到局部变量有一些好处。
例如可以提高代码的可读性和可维护性,同时也有助于性能优化和确保线程安全的引用。
final AtomicInteger count = this.count; // 队列中元素的数量
并且源码中对于上面这段代码不在lock锁范围内使用有一段注释:
关于count的非保护使用: 注释指出,尽管count变量没有直接由putLock保护(即在读取count时没有加锁),这种做法在这里是安全的。原因是当线程持有putLock时,其他试图插入或者移除元素的线程都被阻塞了,因此count的值只可能因为当前线程或其他线程从等待中被唤醒并完成了插入或移除操作而改变。这意味着,即使在等待期间检查count的值,也不会因为并发修改而导致不一致。
ArrayBlockingQueue和LinkedBlockingQueue的一些区别?
特性 | ArrayBlockingQueue | LinkedBlockingQueue |
---|---|---|
底层数据结构 | 数组 | 链表 |
有界/无界 | 有界(在构造时指定固定容量) | 可以有界(指定容量)也可以无界(默认) |
锁机制 | 单一锁(用于 put 和 take 操作) | 读写锁分离(分别用于 put 和 take 操作) |
性能 | 在单线程或低并发场景中性能较好,因为锁开销较少 | 在高并发场景中性能较好,因为读写操作分离减少了锁竞争 |
内存开销 | 固定内存开销(数组大小) | 潜在的更高内存开销(链表节点) |
容量扩展 | 不支持动态扩展,容量固定 | 支持动态扩展(如果无界) |
适用场景 | 适用于固定大小的队列和低并发场景 | 适用于需要动态扩展的队列和高并发场景 |
为什么ArrayBlockingQueue不设计成读写锁分离的模式?
我觉得有如下原因:
- 数据结构导致的设计差异
ArrayBlockingQueue:
使用数组作为底层数据结构。
需要在固定大小的数组中进行元素的插入和删除操作,这些操作涉及到对数组索引的维护和管理。
队列满或空时,需要阻塞插入或删除操作,涉及到队列头尾指针的调整。
如果使用读写锁分离,需要考虑并处理队列头尾指针调整的线程安全问题,设计起来比较复杂。
LinkedBlockingQueue:
使用链表作为底层数据结构。
插入和删除操作只需调整链表的头尾指针,不涉及到数组索引的管理。
链表的结构使得读写操作更容易分离,适合使用不同的锁进行管理。
- 性能和设计权衡
ArrayBlockingQueue是针对固定大小的队列,设计目标就是简单、高效。
ArrayBlockingQueue也并非不能设计成读写锁模式,只是投入和回报比不成正比。读写锁的引入会增加复杂度,性能提升可能有限,不一定值得。
11、其他的BlockingQueue实现
PriorityBlockingQueue 简单介绍
PriorityBlockingQueue 是一个基于优先级的无界阻塞队列。
继承体系
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
数据结构:
底层数据结构:PriorityBlockingQueue 基于一个可调整大小的数组实现的二叉小顶堆(最小堆),它使用数组来表示二叉堆。后面会再写一篇PriorityQueue详解的文章详细介绍其内部数据结构(二叉小顶堆)。
优先级排序:
队列中的元素按照自然顺序(通过实现 Comparable 接口)或通过提供的比较器(Comparator)进行排序。元素的优先级决定了它们在队列中的顺序。
适用场景:
**任务调度:**适用于需要按照优先级处理任务的场景。
路径查找: 在图算法中,优先队列通常用于实现最短路径查找算法,如 Dijkstra 算法。
**多线程环境:**需要在多线程环境中进行优先级调度的场景。
SynchronousQueue简单介绍
SynchronousQueue 是 Java 并发包中的一种特殊类型的队列。
继承体系
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
特点:
无存储元素: SynchronousQueue 内部并不存储任何元素,即使是空间大小为 0。
直接传输: 它主要用于线程之间直接传输元素,生产者线程将元素直接交给消费者线程,而不会将元素存储在队列中。
阻塞操作: SynchronousQueue 的插入和移除操作是阻塞的。
公平性: 它是一个公平的队列,采用公平的顺序来处理元素的访问。
容量: 容量为 0,即只能容纳一个正在进行交换的元素。
适用场景
直接传输: 适用于需要在生产者和消费者之间进行直接传输的场景,例如线程池任务调度等。
流水线处理: 适用于将生产者生成的数据直接传输给消费者处理的情况,避免数据存储和额外的线程开销。
同步控制: 在并发编程中,用于线程同步和控制流量的传输。
LinkedTransferQueue简单介绍
LinkedTransferQueue 是 Java 并发包中的一个特殊类型的队列,它结合了队列(Queue)和传输队列(TransferQueue)的特性。
继承体系
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable
数据结构:
链表
特点
无界队列: LinkedTransferQueue 是一个无界队列,可以动态增长以容纳更多的元素。
插入和移除操作: 支持常规的插入(offer、add)和移除(poll、take)操作。
传输操作: 支持直接的元素传输操作,即使队列为空也可以进行元素传输。
阻塞和非阻塞操作: 提供了阻塞和非阻塞的插入和移除方法,以及等待传输的操作。
公平性: 在处理元素时采用公平的顺序,即按照等待时间长短处理。
适用场景
生产者-消费者模式: 适用于多线程环境下的生产者和消费者模式,支持高并发的元素传输和处理。
任务调度: 在任务调度器中,可以使用 LinkedTransferQueue 来管理和调度任务,实现任务之间的依赖和传递。
优先级处理: 可以基于元素的属性或优先级进行传输和处理,支持优先级队列的特性。
LinkedBlockingDeque简单介绍
LinkedBlockingDeque 是 Java 并发包中的双向阻塞队列,结合了双向队列(Deque)和阻塞队列(BlockingDeque)的特性。
继承体系
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable
数据结构:
双向链表
特点
双向队列: LinkedBlockingDeque 是一个双向队列,支持在队列两端(头部和尾部)进行插入和移除操作。
阻塞操作: 队列的插入和移除操作是阻塞的,即当队列为空或已满时,插入和移除操作会阻塞调用线程,直到有可用的空间或元素。
无界队列: 与 LinkedBlockingQueue 类似,LinkedBlockingDeque 也是一个无界队列,可以动态增长以容纳更多的元素。
线程安全: LinkedBlockingDeque 是线程安全的,支持多个线程同时进行插入、移除和访问操作。
公平性: 使用公平的顺序来处理等待队列的线程。