作者简介:
专注于研究Linux内核、Hotspot虚拟机、汇编语言、JDK源码、各大中间件源码等等
喜欢的话,可以三连+关注~
ArrayBlockingQueue的介绍
在JUC包下关于线程安全的队列实现有很多,那么此篇文章讲解ArrayBlockingQueue的实现原理。相对于LinkedBlockingQueue和SynchronousQueue来说,ArrayBlockingQueue效率比较低,但是实现比较容易,从类名也可以看出,这个是基于数组实现的队列。从简单入手,再一步步学习复杂的。
JUC并发编程之LinkedBlockingQueue的底层原理
JUC并发编程之SynchronousQueue的底层原理
// 因为基于数组实现的队列,全局数组
final Object[] items;
// 消费者索引
int takeIndex;
// 生产者索引
int putIndex;
// 队列总数计数器
int count;
// 同步锁,所以生产者和消费者同一时刻只能一个运行。
final ReentrantLock lock;
// 消费者条件等待队列
private final Condition notEmpty;
// 生产者条件等待队列
private final Condition notFull;
ArrayBlockingQueue实现基于数组,但是DougLea把数组玩的比较灵活,这里是一个循环数组,把一个数组完成了复用。
整体只使用了一把锁,所以生产者和消费者共用一把锁,也即生产的时候不能消费(这里对比LinkedBlockingQueue,所以ArrayBlockingQueue效率比较低)。条件等待队列还是分为了消费者队列和生产者队列。
构造方法把数组和同步锁给初始化了,没啥好讲,那么下面就开始介绍生产者和消费者方法。
put方法(生产者)
// 生产者,带阻塞
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 可响应中断
lock.lockInterruptibly();
try {
// 如果当前队列满了,就去阻塞
while (count == items.length)
// 就去阻塞,等待消费线程消费了节点把我唤醒,然后抢锁再去插入。
notFull.await();
// 插入节点
enqueue(e);
} finally {
lock.unlock();
}
}
// 插入数据
private void enqueue(E x) {
// 当前是单线程,已经外面已经上锁了,能到这里代表已经获取到锁了。
// 因为当前队列是数组实现的,所以得到当前数组对象
final Object[] items = this.items;
// 插入,从0开始
items[putIndex] = x;
// ++putIndex调整索引,方便下次插入。
// 如果当前插入后达到了最大容量,让数组下标又从0开始,循环数组。
if (++putIndex == items.length)
// 清零,方便下一轮插入。
putIndex = 0;
// 当前队列中总共的数量。
count++;
// 唤醒因为当前队列中没有节点而阻塞的消费者线程,
notEmpty.signal();
}
要注意这里是一个循环数组,当putIndex生产索引达到数组长度后,把索引清零。此时会不会出现数组越界呢?答案肯定是:不会,因为count全局计数器保证了当队列满了以后,生产者会去条件队列阻塞,等待消费者消费再唤醒。
大致流程如下:
上可响应中断锁
如果队列满了就去阻塞等待
如果没满就往队列(数组)中插入元素
如果生产索引达到了数组长度,清空索引,达到数组复用(循环数组)。
队列元素计数器+1
唤醒消费者节点
释放锁
take方法(消费者)
// 消费者,带阻塞
public E take() throws InterruptedException {
// 获取到锁
final ReentrantLock lock = this.lock;
// 可响应中断锁
lock.lockInterruptibly();
try {
// 如果队列为空,消费者直接去阻塞
while (count == 0)
// 阻塞等待被唤醒
notEmpty.await();
// 被中断唤醒或者是被生产者唤醒。
// 此时消费节点。
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// 得到当前的数组对象。
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 取出节点
E x = (E) items[takeIndex];
// 把节点置为null,方便下次使用。
items[takeIndex] = null;
// 如果当前取完节点后,是最后一个节点,那么清空索引,方便下轮使用,循环数组
if (++takeIndex == items.length)
takeIndex = 0;
// 总数量-1
count--;
//
if (itrs != null)
itrs.elementDequeued();
// 唤醒那些因为队列满了而阻塞等待的生产者线程。
notFull.signal();
return x;
}
这里跟生产者方法基本思想一致,存在一个消费索引,当消费索引等于数组长度时,清空索引,达到复用。
大致流程如下:
上可响应中断锁
如果队列为空就去阻塞等待
如果不为空就往队列(数组)中根据takeIndex消费索引获取元素
如果消费索引达到了数组长度,清空索引,达到数组复用(循环数组)。
队列元素计数器-1
唤醒生产者节点
释放锁
总结
循环队列,生产者操作putIndex索引,消费者操作takeIndex索引。队列满了生产者阻塞,等待消费者消费节点唤醒生产者。队列为空消费者阻塞,等待生产者生产节点唤醒消费者。
最后,如果本帖对您有一定的帮助,希望能点赞+关注+收藏!您的支持是给我最大的动力,后续会一直更新各种框架的使用和框架的源码解读~!