目录
1、LinkedBlokingQueue是一个有界队列
2、LinkedBlokingQueue是一个单向队列
3、LinkedBlokingQueue中的非阻塞方法
4、LinkedBlokingQueue中的阻塞方法
LinkedBlockingQueue是通过ReentrantLock实现的(有界/无界)阻塞队列,在线程池TheadPoolExecutor中的workQueue就是一个LinkedBlockingQueue的实例。
思考:为什么说LinkedBlockingQueue是一个队列?
根据数据结构中队列的特点判断:先进先出(FIFO),队尾进,队头出。
- LinkedBlockingQueue中的插入方法offer()、put()都是在队尾添加元素。
- LinkedBlockingQueue中的获取/删除方法peek()、poll()、take()都是在队头获取/删除元素。
与普通队列相比,线程池使用LinkedBlockingQueue作为缓存队列的好处是:
- 当队列满了的时候可以阻塞添加任务的线程(放到条件变量ConditionObject的条件队列notFull里),而不用丢弃当前线程
- 当队列为空时,会阻塞获取任务的线程(放到条件变量ConditionObject的条件队列notEmpty里),而不用丢弃当前线程
在这篇文章中,会详细介绍LinkedBlockingQueue的底层实现原理。
在此之前,你需要了解ReentrantLock、ConditionObject以及LockSupport几个并发相关的API
为了方便快速了解其结构,简单画了一下的LinkedBlockingQueue类图
通过上面类图可以了解到,LinkedBlokingQueue中依赖了ReentrantLock来保证入队(putLock)和出队(takeLock)的线程安全,同时通过Condition(条件变量)来保存take()方法因队列为空而阻塞的线程(对应条件变量为notEmpty)和put()方法因队列已满而阻塞的线程(对应条件变量为notFull)。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
1、LinkedBlokingQueue是一个有界队列
LinkedBlokingQueue是一个有界队列,因为它内部通过int类型的capacity属性来保存当前队列的长度,可以通过实例化时传入int类型参数指定,当通过无参构造方法实例化时,队列长度为Integer.MAX_VALUE,所以这依然是一个无界队列。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
this.capacity = capacity;
last = head = new Node<E>(null);
}
2、LinkedBlokingQueue是一个单向队列
LinkedBlokingQueue是一个单向队列,因为其内部定义的Node是一个单向的链表,并且LinkedBlokingQueue只通过head和last保存了队头和队尾节点。
/**
* Linked list node class
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
3、LinkedBlokingQueue中的非阻塞方法
public boolean offer(E e):往队尾添加元素,如果队列已满,则直接返回false,不会阻塞线程。
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
// 获取队列长度
final AtomicInteger count = this.count;
// 队列已满,返回false,添加失败
if (count.get() == capacity) {
return false;
}
// 创建一个变量保存队列的大小(长度)
int c = -1;
// 根据数据创建Node节点
Node<E> node = new Node<E>(e);
// 加锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
// 入队
enqueue(node);
// 队列长度自增1
c = count.getAndIncrement();
// 如果队列还没有满,唤醒notFull中因为添加失败被阻塞的一个线程
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
// 释放锁
putLock.unlock();
}
// 如果入队之前队列为空,则入队之后队列中有一个元素
// 唤醒一个因为调用take()方法被阻塞的线程
if (c == 0) {
signalNotEmpty();
}
return c >= 0;
}
入队操作
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
public E poll():在队头获取并删除一个元素,如果队列为空,直接返回null,不会阻塞线程。
public E poll() {
final AtomicInteger count = this.count;
// 队列没有元素,返回null
if (count.get() == 0) {
return null;
}
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
// 出队操作
x = dequeue();
// 队列长度自减1
c = count.getAndDecrement();
// 队列不为空
// 唤醒notEmpty中因为队列为空,即通过take()获取元素失败而被阻塞的一个线程
if (c > 1) {
notEmpty.signal();
}
}
} finally {
takeLock.unlock();
}
// 如果出队之前队列是满的,则出队之后队列中还有一个可用的位置
// 唤醒一个因为调用put()方法被阻塞的线程
if (c == capacity) {
signalNotFull();
}
return x;
}
出队操作
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
public E peek():从队头获取一个元素,但是不删除元素。
这个方法非常简单,加锁获取队列的头结点,如果队列为空返回null。
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
4、LinkedBlokingQueue中的阻塞方法
public E take() throws InterruptedException:获取队头的元素,如果队列为空,则阻塞当前线程。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空,当前线程(获取元素的线程)被阻塞
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// 队列长度自增
c = count.getAndDecrement();
// 队列不为空,唤醒notEmpty中的一个线程
if (c > 1) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity) {
signalNotFull();
}
return x;
}
notEmpty.await();这行代码完成了阻塞当前线程,我们看一下他的实现
因为notEmpty是调用ReentrantLock的newCondition()方法得到的,所以用的是AQS的内部Condition实现类ConditionObject。
public final void await() throws InterruptedException {
// 如果当前线程被中断了,清除中断状态,抛出中断异常返回
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 把当前线程放到条件队列中
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
// node节点已经在条件队列中
while (!isOnSyncQueue(node)) {
// 中断线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
所以,最终是通过LockSupport.part()方法来中断线程的,对应的signal()和signalAll()方法也是通过LockSupport.unpark()方法来唤醒线程。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒线程
return true;
}
public void put(E e) throws InterruptedException:往队尾添加元素,如果队列已满,则阻塞当前线程。
public void put(E e) throws InterruptedException {
// 添加的元素不能为空
if (e == null)
throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 队列已满,阻塞线程
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 队列长度自增
c = count.getAndIncrement();
// 如果队列还没有满,唤醒notFull中因为添加失败被阻塞的一个线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
好了,这篇文章就分享到这里了,看完不要忘了点赞+收藏哦~