一、引言
并发编程在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360° 的刁难。
二、使用
对于阻塞队列,想必大家应该都不陌生,我们这里简单的介绍一下,对于 Java
里面的阻塞队列,其使用了 生产者和消费者 的模型
对于生产者来说,主要有以下几部分:
add(E) // 添加数据到队列,如果队列满了,无法存储,抛出异常
offer(E) // 添加数据到队列,如果队列满了,返回false
offer(E,timeout,unit) // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回false
put(E) // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
复制代码
对于消费者来说,主要有以下几部分:
remove() // 从队列中移除数据,如果队列为空,抛出异常
poll() // 从队列中移除数据,如果队列为空,返回null,么的数据
poll(timeout,unit) // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取
take() // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取
复制代码
我们本篇来讲讲堵塞队列中的第二员猛将,LinkedBlockingQueue
的故事
我们先来看其基本使用
public class LinkedBlockingQueueTest {
public static void main(String[] args) throws Exception {
LinkedBlockingQueue queue = new LinkedBlockingQueue();
// 生产者扔数据
queue.add("1");
queue.offer("2");
queue.offer("3", 2, TimeUnit.SECONDS);
queue.put("2");
// 消费者取数据
System.out.println(queue.remove());
System.out.println(queue.poll());
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.take());
}
}
复制代码
三、源码
1、初始化
由于我们的 LinkedBlockingQueue
底层是链表实现的,所以我们初始化的时候不需要指定其大小
LinkedBlockingQueue queue = new LinkedBlockingQueue();
// 如果我们不指定容量大小的话,这里的容量默认为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
// 如果容量传进来是小于等于0的,直接抛异常
if (capacity <= 0){
throw new IllegalArgumentException();
}
// 当前的容量赋值
this.capacity = capacity;
// 这里其实和我们的AQS有点像
// 搞一个虚拟的头结点,减少后面的判空
last = head = new Node<E>(null);
}
复制代码
当然,除了我们初始化的这些成员变量,我们还有一部分:
class Node<E> {
// 当前的数据
E item;
// 指向下一个数据的指针
Node<E> next;
Node(E x) {
item = x;
}
}
// 当前链表中存在的数据数量
private final AtomicInteger count = new AtomicInteger();
// 读锁
private final ReentrantLock takeLock = new ReentrantLock();
// 唤醒消费者线程
private final Condition notEmpty = takeLock.newCondition();
// 写锁
private final ReentrantLock putLock = new ReentrantLock();
// 唤醒生产者线程
private final Condition notFull = putLock.newCondition();
复制代码
这里可能有的小伙伴有点懵逼,为什么这哥们(LinkedBlockingQueue
)用了两个锁呢?为什么我 ArrayBlockingQueue
只能用一把锁?
不要急,我们慢慢的往下看他源码
2、生产者的源码
2.1 add()源码实现
public boolean add(E e) {
return super.add(e);
}
// 走到这里会发现,我们的add方法就是调用了offer方法
// offer: 添加数据到队列,如果队列满了,返回false
// 所以这里offer满了,就会抛出异常:"Queue full"
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
复制代码
2.2 offer()源码实现
public boolean offer(E e) {
// 如果是空值,直接抛出异常
if (e == null) throw new NullPointerException();
// 引用,上篇我们分析过
final AtomicInteger count = this.count;
// 判断当前数据量是否和我们总容量一样
if (count.get() == capacity){
return false;
}
// 标记位
int c = -1;
// 创建节点
Node<E> node = new Node<E>(e);
// 引用写锁
final ReentrantLock putLock = this.putLock;
// 上锁
putLock.lock();
try {
// 如果当前数据量小于总容量
// 这里我们上面也检查过,相当于DCL的意思
if (count.get() < capacity) {
// 插入队列
enqueue(node);
// 得到当前数据量
// 这里需要注意:getAndIncrement先返回数据,再加一
c = count.getAndIncrement();
// 如果我们发现当前数据量还小于总容量
// 也就是我们可以继续放数据
if (c + 1 < capacity)
// 唤醒其他的生产者线程扔数据
// 当然这里稍微多说一点,这里的唤醒指的是将生产者从Condition队列放到AQS队列中
// 具体什么时候执行还需要看AQS的调度
notFull.signal();
}
} finally {
// 解锁
putLock.unlock();
}
// 如果我们当前数据量为0,代表队列中原来无数据
// 但上面现在扔进去了一个
if (c == 0)
// 需要唤醒所有的消费者消费数据
signalNotEmpty();
return c >= 0;
}
private void enqueue(Node<E> node) {
// 将当面节点挂在last节点后
// 将last节点指向当前节点
last = last.next = node;
}
// 这里我们的Condition聊过
// 必须持有当前锁资源才可以使用Condition的方法
private void signalNotEmpty() {
// 拿到读锁
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lock();
try {
// 唤醒消费者线程
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}
}
复制代码
2.3 offer(time)源码实现
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 如果是空值,直接抛出异常
if (e == null) throw new NullPointerException();
// 转成统一的单位
long nanos = unit.toNanos(timeout);
int c = -1;
// 写锁
final ReentrantLock putLock = this.putLock;
// 当前容量
final AtomicInteger count = this.count;
// 加锁
putLock.lockInterruptibly();
try {
// 如果当前数据量小于总容量
// 这里我们上面也检查过,相当于DCL的意思
while (count.get() == capacity) {
// 如果我们剩余时间小于0,直接失败即可
if (nanos <= 0)
return false;
// 反之生产者线程写入挂起nanos时间
nanos = notFull.awaitNanos(nanos);
}
// 添加至队列
enqueue(new Node<E>(e));
// 得到当前数据量
// 这里需要注意:getAndIncrement先返回数据,再加一
c = count.getAndIncrement();
// 如果我们发现当前数据量还小于总容量
// 也就是我们可以继续放数据
if (c + 1 < capacity)
// 唤醒其他的生产者线程扔数据
// 当然这里稍微多说一点,这里的唤醒指的是将生产者从Condition队列放到AQS队列中
// 具体什么时候执行还需要看AQS的调度
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
// 如果我们当前数据量为0,代表队列中原来无数据
// 但现在扔进去了一个,唤醒消费者线程
if (c == 0)
signalNotEmpty();
return true;
}
复制代码
2.4 put()源码实现
- 这里就不写了,其实和我们的 offer 一样,大家自己看看就好
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
复制代码
3、消费者的源码
3.1 remove()源码实现
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
复制代码
3.2 poll()源码实现
public E poll() {
// 获取当前链表的数据量
final AtomicInteger count = this.count;
// 如果数据量为0,说明无数据
// 消费者无法消费,直接返回null即可
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// 拿到读锁
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lock();
try {
// 如果数据量大于0,说明有数据
// 这里我们上面也检查过,相当于DCL的意思
if (count.get() > 0) {
// 取数
x = dequeue();
// 得到当前数据量
// 这里需要注意:getAndIncrement先返回数据,再减一
c = count.getAndDecrement();
// 如果我们的数据量大于1,则唤醒消费者来消费
if (c > 1)
notEmpty.signal();
}
} finally {
// 解锁
takeLock.unlock();
}
// 如果数据量等于当前的总容量
// 说明当前的链表已经有空余了,唤醒生产者生产
if (c == capacity)
signalNotFull();
return x;
}
// 这个取数据和我们的AQS有点像
// 去除当前数据并且将当前节点作为头结点
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
// 拿到写锁
final ReentrantLock putLock = this.putLock;
// 上锁
putLock.lock();
try {
// 唤醒生产者
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
}
复制代码
3.3 poll(time)源码实现
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 统一时间单位
long nanos = unit.toNanos(timeout);
// 拿到当前数据量 + 读锁
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 加可中断锁
takeLock.lockInterruptibly();
try {
// 如果当前的数据量为0
while (count.get() == 0) {
// 如果时间没有剩余,直接返回null即可
if (nanos <= 0)
return null;
// 让消费者线程等待nanos时间
nanos = notEmpty.awaitNanos(nanos);
}
// 取数据
x = dequeue();
// 后面都是一样的
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
复制代码
3.4 take()源码实现
- 这个大家可以自己看一下补充,也算一个小测试
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
复制代码
4、疑惑
看到这里,我想大家可能有和我一样的疑惑?
之前我们聊 ArrayBlockingQueue
的时候,他只用了一把锁(互斥锁),但 LinkedBlockingQueue
却使用了两把锁(读锁、写锁)
这时候你脑子会不会有一种疑问,我 ArrayBlockingQueue
能不能使用两把锁(读锁、写锁)来进行访问
如果你有这种想法,说明你确实思考了,哈哈哈
没错,博主我查阅了相关的资料,ArrayBlockingQueue
确实可以使用两把锁进行逻辑的更改
整体的逻辑基本上是仿造 LinkedBlockingQueue
的业务逻辑改造的,经测试这种性能要比原始的 ArrayBlockingQueue
要快 20%~30% 左右,感兴趣的也可以自己去测试一下。
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ArrayBlockingQueueUsingTwoLockApproach {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
public ArrayBlockingQueueUsingTwoLockApproach(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
}
public void put(Object e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == items.length) {
notFull.await();
}
enqueue(e);
c = count.getAndIncrement();
if (c + 1 < items.length)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public Object take() throws InterruptedException {
Object x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == items.length)
signalNotFull();
return x;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(Object x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count.incrementAndGet();
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private Object dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
Object x = (Object) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count.decrementAndGet();
return x;
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
}
复制代码
四、流程图
其实,我们 LinkedBlockingQueue
整体的代码逻辑和 ArrayBlockingQueue
类似,只不过底层数据结构不同罢了
我们这里简单的画一下,有兴趣的同学也可以自己画吆