分析
LinkedBlockingQueue有如下特点:
- 近乎无界队列,但可以是有界队列
- 实现了BlockingQueue接口
- 需要实现take方法和put方法,实现阻塞效果
- 数据结构是单链表,有head跟last指针来进行入队出队操作
- 有两把锁,读写分离
- 所以也有两个条件队列,分别用来提醒消费者继续消费,或者生产者继续生产
手写源码过程中,还是如往常一样,需要注意的是,按照Api要求来实现自己的代码。
源码
下面只是简单的实现了take()
跟put()
方法,其他就略过了
package org.tuling.juc.queue;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 手写LinkedBlockingQueue
*
* @tag 【LinkedBlockingQueue】 【手写】
* @Date 2023/8/1 19:13
* @slogan 编码即学习
**/
public class MyLinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingDeque<E> {
/**
* 容量
*/
private final int capacity;
/**
* 当前队列任务数量
*/
private final AtomicInteger count = new AtomicInteger();
/**
* 链表头节点
*/
transient Node<E> head;
/**
* 链表尾节点
*/
transient Node<E> last;
/**
* 拿锁(读锁)
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 队列不空,条件
* 通知消费者可以去消费任务了。当队列为空时,消费者会阻塞在这个条件上,等待唤醒
*/
private Condition notEmpty = takeLock.newCondition();
/**
* 添加锁(写锁)
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 队列不满,条件
* 通知生产者可以去生产任务了。当队列满时,生产者会阻塞在这个条件上,等待唤醒
*/
private Condition notFull = putLock.newCondition();
/**
* 默认的构造函数
* 会创建一个capacity为Integer.MAX_VALUE的阻塞队列
*/
public MyLinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 创建一个指定容量capacity的的阻塞队列
*
* @param capacity 容量大小
*/
public MyLinkedBlockingQueue(int capacity) {
this.capacity = capacity;
head = new Node(null);
last = head;
}
/**
* 添加元素方法(可阻塞等待)
*
* <p>
* 以下是BlockingQueue接口要求:
* 将指定的元素插入此队列,并在必要时等待可用空间。
* 参数: E -要添加的元素
* 抛出:
* InterruptedException -如果在等待时中断
* ClassCastException——如果指定元素的类阻止它被添加到这个队列
* NullPointerException -如果指定的元素为空
* IllegalArgumentException -如果指定元素的某些属性阻止它被添加到这个队列
* </p>
*/
@Override
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
// 添加元素
Node<E> node = new Node<>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == this.capacity) {
// 队列满了
notFull.await();
}
// 入队
enqueue(node);
int andIncrement = count.getAndIncrement();
// 判断是否需要唤醒生产者(可能这边生产,另一头在消费了)
if (andIncrement + 1 < capacity) {
notFull.signal();
}
} finally {
putLock.unlock();
}
// 还要通知消费者继续消费了
if (count.get() > 0) {
// 通知消费者
this.signalNotEmpty();
}
}
/**
* 通知消费者,队列不为空了
* 这里需要上锁,为什么?因为如果不上锁,可能会有其他消费线程并发进来,发现队列是空的,然后进入了沉睡(与其沉睡上下文切换,还不如让它竞争锁)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 入队(尾插法)
*
* @param node 需要入队的节点
*/
private void enqueue(Node<E> node) {
last.next = node;
last = last.next;
}
/**
* 出队(可阻塞等待)
*
* <p>
* 以下是BlockingQueue接口要求:
* 检索并删除此队列的头部,必要时等待,直到元素可用为止。
* 返回:
* 这个队列的头
* 抛出:
* InterruptedException -如果在等待时中断
* </p>
*/
@Override
public E take() throws InterruptedException {
E result;
final ReentrantLock takeLock = this.takeLock;
final AtomicInteger count = this.count;
takeLock.lock();
try {
while (count.get() == 0) {
// 队列为空
notEmpty.await();
}
// 出队
result = dequeue();
int andDecrement = count.getAndDecrement();
// 判断是否需要唤醒消费者(可能这边消费,另一头在生产了)
if (andDecrement > 1) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 通知身缠这生产
if (count.get() < capacity) {
this.signalNotFull();
}
return result;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
this.notFull.signal();
} finally {
putLock.unlock();
}
}
/**
* 出队
*/
private E dequeue() {
Node<E> head = this.head;
Node<E> first = head.next;
// 这样设置,方便将以前的头节点给GC了
head.next = null;
this.head = first;
E item = first.item;
first.item = null;
return item;
}
@Override
public Iterator<E> iterator() {
return null;
}
@Override
public Iterator<E> descendingIterator() {
return null;
}
@Override
public void push(E e) {
}
@Override
public E pop() {
return null;
}
@Override
public void addFirst(E e) {
}
@Override
public void addLast(E e) {
}
@Override
public boolean offerFirst(E e) {
return false;
}
@Override
public boolean offerLast(E e) {
return false;
}
@Override
public E removeFirst() {
return null;
}
@Override
public E removeLast() {
return null;
}
@Override
public E pollFirst() {
return null;
}
@Override
public E pollLast() {
return null;
}
@Override
public E getFirst() {
return null;
}
@Override
public E getLast() {
return null;
}
@Override
public E peekFirst() {
return null;
}
@Override
public E peekLast() {
return null;
}
@Override
public void putFirst(E e) throws InterruptedException {
}
@Override
public void putLast(E e) throws InterruptedException {
}
@Override
public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public E takeFirst() throws InterruptedException {
return null;
}
@Override
public E takeLast() throws InterruptedException {
return null;
}
@Override
public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public E pollLast(long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public boolean removeFirstOccurrence(Object o) {
return false;
}
@Override
public boolean removeLastOccurrence(Object o) {
return false;
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public int remainingCapacity() {
return 0;
}
@Override
public int drainTo(Collection<? super E> c) {
return 0;
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
return 0;
}
@Override
public int size() {
return 0;
}
@Override
public boolean offer(E e) {
return false;
}
@Override
public E poll() {
return null;
}
@Override
public E peek() {
return null;
}
/**
* 内部单链表
* 维护阻塞队列
*/
public static final class Node<E> {
/**
* 当前节点
*/
E item;
/**
* 下一个节点
*/
Node<E> next;
public Node(E item) {
this.item = item;
}
}
}
使用示例
public class MyLinkedBlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 指定队列的大小创建有界队列
// BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(10);
BlockingQueue<Integer> boundedQueue = new MyLinkedBlockingQueue<>(10);
// 向队列中添加元素
boundedQueue.put(3);
// 从队列中取出元素
Integer take = boundedQueue.take();
System.out.println(take);
}
}