1. 前言
今天来逐个方法解析下
LinkedBlockingQueue
. 其实跟上篇文件深入了解ArrayBlockingQueue 阻塞队列很类似。只不过底层实现不同。其实看名字就能看出来到底依据什么实现的。好了,废话不多说了,接下来让我们开始吧
至于API的使用情况跟
ArrayBlockingQueue
保持一致。这里就不过多的赘述
2. 代码解析
2.1 构造方法
public LinkedBlockingQueue(int capacity) {
// 如果队列设置的大小 <= 0的话 直接报异常
if (capacity <= 0) throw new IllegalArgumentException();
// 设置队列最终的长度
this.capacity = capacity;
// 设置last 以及head。 是一个虚拟node节点
last = head = new Node<E>(null);
}
2.2 属性介绍
class Node
表示队列中的每个元素,因为是链表结构,所以每个节点都是Nodecapacity;
表示设置链表的最大长度count
表示累计的链表的长度head
表示链表的头节点last
表示链表ode尾节点takeLock
消费者使用锁notEmpty
消费者挂起线程的conditionputLock
生产者使用的锁notFull
生产者挂起线程的condition
2.3 实现方法
2.3.1 生产者实现方法
2.3.1.1 add
添加元素
,阻塞队列满后直接报异常
public boolean add(E e) {
// 直接调用offer方法 如果添加元素成功后直接返回 反之队列满后返回异常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
2.3.1.2 offer
添加元素
添加成功返回true,反之返回false
public boolean offer(E e) {
// 如果元素为空,直接报空指针异常
if (e == null) throw new NullPointerException();
// 获取计数器
final AtomicInteger count = this.count;
// 如果队列满了 直接返回false
if (count.get() == capacity)
return false;
int c = -1;
// 封装node节点
Node<E> node = new Node<E>(e);
// 获取生产者锁
final ReentrantLock putLock = this.putLock;
// 启动锁
putLock.lock();
try {
// 队列不满的情况
if (count.get() < capacity) {
// 添加元素
enqueue(node);
// 先返回累加之前的值 然后进行自增
c = count.getAndIncrement();
// 队列中还存在位置
if (c + 1 < capacity)
// 唤醒生产者线程
notFull.signal();
}
} finally {
// 解锁
putLock.unlock();
}
// 如果之前的队列元素个数为0的话,现在就有元素了,唤醒消费者线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
2.3.1.3 有参offer
跟
2.3.1.2
的API用法保持一致。只不过当队列满的时候,该API对阻塞线程到一定时间。要么时间到了过期了,要么就是被别人唤醒了。
// InterruptedException 可被打断的锁
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 元素为null的话 直接报异常
if (e == null) throw new NullPointerException();
// 时间转换
long nanos = unit.toNanos(timeout);
int c = -1;
// 获取生产者锁
final ReentrantLock putLock = this.putLock;
// 累加器
final AtomicInteger count = this.count;
// 上锁 可打断锁
putLock.lockInterruptibly();
try {
// 如果队列是满的话 就一直在循环中
while (count.get() == capacity) {
// 如果时间到了 直接返回false
if (nanos <= 0)
return false;
// 线程挂起指定的时间
nanos = notFull.awaitNanos(nanos);
}
// 队列中添加元素
enqueue(new Node<E>(e));
// count 累加 返回上一次的值
c = count.getAndIncrement();
// 如果队列不满 唤醒挂起的生产者
if (c + 1 < capacity)
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
// 如果刚开始的时候 队列是空的 直接唤醒消费者
if (c == 0)
signalNotEmpty();
return true;
}
private void enqueue(Node<E> node) {
将元素添加到链表的尾部
last = last.next = node;
}
2.3.1.3 put
添加元素
如果队列是满的话就将线程挂起,一直处于等待状态,直到被唤醒。
public void put(E e) throws InterruptedException {
// 判断是否为空
if (e == null) throw new NullPointerException();
// 默认状态
int c = -1;
// 封装节点
Node<E> node = new Node<E>(e);
// 生产者锁
final ReentrantLock putLock = this.putLock;
// 累加器
final AtomicInteger count = this.count;
// 上锁。可打断锁
putLock.lockInterruptibly();
try {
// 如果队列是满的话 就一直处于线程挂起状态,直到被别人唤醒
while (count.get() == capacity) {
notFull.await();
}
// 添加元素
enqueue(node);
// 累加count 返回上一次的值
c = count.getAndIncrement();
// 如果队列未满的话,唤醒挂起的生产者线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果队列之前是空的话 唤醒消费者线程
if (c == 0)
signalNotEmpty();
}
2.3.2 消费者实现方法
2.3.2.1 remove
删除元素
本质上还是调用了poll
方法。如果删除失败的时候,直接报异常
public E remove() {
// 本质上是调用了poll方法
E x = poll();
// 如果返回值不为null的话 直接返回。反之就是异常
if (x != null)
return x;
else
throw new NoSuchElementException();
}
2.3.2.2 poll
删除元素
如果队列为空的话,表示删除失败,反之就是删除成功
public E poll() {
// 获取累加器
final AtomicInteger count = this.count;
// 如果此时队列是空的话 直接返回null
if (count.get() == 0)
return null;
// 初始返回状态
E x = null;
// 初始值
int c = -1;
// 获取消费锁
final ReentrantLock takeLock = this.takeLock;
// 上锁
takeLock.lock();
try {
// 如果队列中有值的话 进入if条件
if (count.get() > 0) {
// 删除元素 返回值
x = dequeue();
// 先返回上一次的值 然后递减
c = count.getAndDecrement();
// 如果队列中还有值的话 唤醒消费者线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 如果上一次满的 但是这次不是了 就开始唤醒产生者线程
if (c == capacity)
signalNotFull();
return x;
}
2.3.2.3 有参poll
删除元素
当队列为空的时候,会将线程挂起一段时间。被唤醒或是时间到期后,继续执行。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 初始值
E x = null;
// 初始长度
int c = -1;
// 统一时间格式转换
long nanos = unit.toNanos(timeout);
// 累加器
final AtomicInteger count = this.count;
// 消费者锁
final ReentrantLock takeLock = this.takeLock;
// 上锁 可被打断
takeLock.lockInterruptibly();
try {
// 如果队列为空的话 就一直在while中
while (count.get() == 0) {
// 如果等待时间到了 直接返回null
if (nanos <= 0)
return null;
// 线程暂时挂起
nanos = notEmpty.awaitNanos(nanos);
}
// 删除元素
x = dequeue();
// 返回上一次的值 同时对count 递减
c = count.getAndDecrement();
// 如果队列中还存在值 唤醒消费者线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果上次是队列是满的,但是这次是递减1 唤醒生产者线程
if (c == capacity)
signalNotFull();
return x;
}
2.3.2.4 take
删除元素
如果队列是空的话 就一直处于等待状态。
public E take() throws InterruptedException {
// 初始值
E x;
// 初始长度
int c = -1;
// 累加器
final AtomicInteger count = this.count;
// 消费者锁
final ReentrantLock takeLock = this.takeLock;
// 上锁 可被打断
takeLock.lockInterruptibly();
try {
// 如果队列为空的时候 就一直处于while循环中
while (count.get() == 0) {
// 挂起线程
notEmpty.await();
}
// 删除元素
x = dequeue();
// 先返回上一次的值 然后进行递减
c = count.getAndDecrement();
// 如果队列中还存在元素 唤醒消费者线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 上一次队列是满的 经过这次执行后 队列出现了空余。 唤醒生产者线程
if (c == capacity)
signalNotFull();
return x;
}
3. 结束
其实
LinkedBlockingQueue
实现方法很简单。但是勤能补拙
所以一定要总结出来。好了,今天的总结就这么多了如果大家有什么好的想法可以再评论区跟我交流啊