文章目录
- ①. 什么是阻塞队列
- ②. BlockingQueue的主要方法
- ③. BlockingQueue的实现类
- ④. Linked和Array区别
- ⑤. 不推荐使用快捷的线程池
①. 什么是阻塞队列
-
①.阻塞队列:从名字可以看出,它也是队列的一种,那么它肯定是一个先进先出FIFO的数据结构。与普通队列不同的是,他支持两个附加操作,即阻塞添加和阻塞删除方法
-
②. 线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素。而在这一系列操作必须符合以下规定:
- 阻塞添加:当阻塞队列是满时,往队列里添加元素的操作将被阻塞
- 阻塞移除:当阻塞队列是空时,从队列中获取元素/删除元素的操作将被阻塞
- ③. 现有三个角色:顾客,休息区,银行办理窗口。Thread1为顾客,BlockingQueue为休息区,Thread2为银行办理窗口
- 正常情况下,一个银行办理窗口同一时间只能对接一个顾客
- 恰巧今天办理的顾客有3个人,另外2个顾客怎么办,你总不至于给人家说不办了,快回家吧
- 而正确的做法是你可以让这两个人在休息区等候,等银行窗口空闲了,然后去办理
- ④. 阻塞队列的好处:阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作
②. BlockingQueue的主要方法
- ①. BlockingQueue提供的部分方法:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
- ②. 根据插入和取出两种类型的操作,具体分为下面一些类型:
- 抛出异常是指当队列满时,再次插入会抛出异常如果队列未满,插入返回值未true
- 返回布尔是指当队列满时,再次插入会返回false
- 阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入
- 超时是指当一个时限过后,才会插入或者取出
- ③. 生产 - add、offer、put这3个方法都是往队列尾部添加元素,区别如下:
- add:不会阻塞,添加成功时返回true,不响应中断,当队列已满导致添加失败时抛出IllegalStateException
- offer:不会阻塞,添加成功时返回true,因队列已满导致添加失败时返回false,不响应中断
- put:会阻塞会响应中断
- ④. 消费 - take、poll方法能获取队列头部第1个元素,区别如下:
- 会响应中断,会一直阻塞直到取得元素或当前线程中断
- 会响应中断,会阻塞,阻塞时间参照方法里参数timeout.timeUnit,当阻塞时间到了还没取得元素会返回null
- ⑤. add方法代码以及结果
public class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
System.out.println("--------以下为add的相关操作---------");
addRemoveTest(blockingQueue);
}
public static void addRemoveTest(BlockingQueue<String> blockingQueue) {
System.out.println("添加状态+\t"+blockingQueue.add("1"));
System.out.println("添加状态+\t"+blockingQueue.add("2"));
System.out.println("添加状态+\t"+blockingQueue.add("3"));
// System.out.println("添加状态+\t"+blockingQueue.add("4"));
System.out.println("队首元素+\t"+blockingQueue.element());
System.out.println("删除元素+\t"+blockingQueue.remove());
System.out.println("队首元素+\t"+blockingQueue.element());
System.out.println("删除元素+\t"+blockingQueue.remove());
System.out.println("队首元素+\t"+blockingQueue.element());
System.out.println("删除元素+\t"+blockingQueue.remove());
// System.out.println("队首元素+\t"+blockingQueue.element());
// System.out.println("删除元素+\t"+blockingQueue.remove(blockingQueue.element()));
}
}
// 未打开注释代码输出如下:
--------以下为add的相关操作---------
添加状态+ true
添加状态+ true
添加状态+ true
队首元素+ 1
删除元素+ 1
队首元素+ 2
删除元素+ 2
队首元素+ 3
删除元素+ 3
// 当队列已满,继续添加元素时打开注释代码,输出如下:
--------以下为add的相关操作---------
添加状态+ true
添加状态+ true
添加状态+ true
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
- ⑥. offer方法代码
public class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
System.out.println("--------以下为offer的相关操作---------");
offerPollTest(blockingQueue);
}
private static void offerPollTest(BlockingQueue<String> blockingQueue) {
System.out.println("添加状态+\t"+blockingQueue.offer("1"));
System.out.println("添加状态+\t"+blockingQueue.offer("2"));
System.out.println("添加状态+\t"+blockingQueue.offer("3"));
System.out.println("添加状态+\t"+blockingQueue.offer("4"));
System.out.println("队首元素+\t"+blockingQueue.peek());
System.out.println("删除元素+\t"+blockingQueue.poll());
System.out.println("队首元素+\t"+blockingQueue.peek());
System.out.println("删除元素+\t"+blockingQueue.poll());
System.out.println("队首元素+\t"+blockingQueue.peek());
System.out.println("删除元素+\t"+blockingQueue.poll());
System.out.println("删除元素+\t"+blockingQueue.poll());
}
}
// 输出如下:
--------以下为offer的相关操作---------
添加状态+ true
添加状态+ true
添加状态+ true
添加状态+ false
队首元素+ 1
删除元素+ 1
队首元素+ 2
删除元素+ 2
队首元素+ 3
删除元素+ 3
删除元素+ null
// 注意:当队列没有元素的时候使用poll,返回null
- ⑦. put方法代码
public class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
System.out.println("--------以下为put的相关操作---------");
putTakeTest(blockingQueue);
}
private static void putTakeTest(BlockingQueue<String> blockingQueue) {
try{
blockingQueue.put("1");
blockingQueue.put("2");
blockingQueue.put("3");
// blockingQueue.put("4");
System.out.println("删除元素+\t"+blockingQueue.take());
blockingQueue.put("4");
System.out.println("删除元素+\t"+blockingQueue.take());
System.out.println("删除元素+\t"+blockingQueue.take());
System.out.println("删除元素+\t"+blockingQueue.take());
}catch(Exception e){
e.getStackTrace();
}
}
}
// 打开注释代码输出如下:(程序未停止)
--------以下为put的相关操作---------
// 未打开注释代码输出如下:
--------以下为put的相关操作---------
删除元素+ 1
删除元素+ 2
删除元素+ 3
删除元素+ 4
③. BlockingQueue的实现类
- ①. 从整体架构图上来看,BlockingQueue是实现了Queue接口,而Queue是属于Collection接口下的派生类
-
②. ArrayBlockingQueue: 由数组结构组成的有界阻塞队列
-
③. LinkedBlockingQueue: 由链表结构组成的有界(但大小默认值Integer>MAX_VAL UE)阻塞队列
需要注意的是LinkedBlockingQueue虽然是有界的,但有个巨坑,其默认大小是IntegerMAX_VALUE,高达21亿,一般情况下内存早爆了在线程池的ThreadPoolExecutor有体现 -
④. SynchronousQueue:不存储元素的阻塞队列,也即是单个元素的队列
- SynchronousQueue没有容量,与其他BlcokingQueue不同,SynchronousQueue是一个不存储元素的BlcokingQueue
- 每个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
public class SynchronusQueueTest {
public static void main(String[] args) {
BlockingQueue<String> synchronusQueue = new SynchronousQueue<>();
new Thread(() ->{
try{
System.out.println(Thread.currentThread().getName()+"\t put 1");
synchronusQueue.put("1");
System.out.println(Thread.currentThread().getName()+"\t put 2");
synchronusQueue.put("2");
System.out.println(Thread.currentThread().getName()+"\t put 3");
synchronusQueue.put("3");
}catch(Exception e){
e.getStackTrace();
}
},"Prod").start();
new Thread(() ->{
try {
try{ TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t take "+synchronusQueue.take());
try{ TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t take "+synchronusQueue.take());
try{ TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t take"+synchronusQueue.take());
} catch (Exception e) {
e.printStackTrace();
}
},"Cons").start();
}
// 输出如下:有时间间隔
Prod put 1
Cons take 1
Prod put 2
Cons take 2
Prod put 3
Cons take3
-
⑤. PriorityBlockingQueue:支持优先级排序的无界阻塞队列
-
⑥. LinkedTransferQueue:由链表结构组成的无界阻塞队列
-
⑦. LinkedBlockingDeque:由了解结构组成的双向阻塞队列
④. Linked和Array区别
- ①. 队列大小不同:
- arrayBlockingQueue在初始化的时候,必须指定队列的大小
- 而LinkedBlockingQueue在初始化的时候,如果你没有指定大小,则会默认Integer.MAX_VALUE,是一个很大的值,这样就会导致数据在一个不可控范围,一旦添加速度远大于移除的速度时,可能会有内存泄漏的风险
- ②. 底层实现不同
- arrayBlockingQueue的底层是一个数组
- LinkedBlockingQueue底层是一个链表结构
- 官方文档介绍中,LinkedBlockingQueue的吞吐行是高于arrayBlockingQueue;但是在添加或移除元素中,LinkedBlockingQueue则会生成一个额外的Node对象,对GC可能存在影响
-
④. 为什么说LinkedBlockingQueue的吞吐性是高于arrayBlockingQueue?
吞吐性能强是因为有两个锁,试想一下,Array里面使用的是一个锁,不管put还是take行为,都可能被这个锁卡住,而Linked里面put和take是两个锁,put只会被put行为卡住,而不会被take卡住,因此吞吐性能自然强于Array。 而“less predictable performance”这个也是显而易见的,Array采用的时固定内存,而Linked采用的时动态内存,无论是分配内存还是释放内存甚至GC动态内存的性能自然都会比固定内存要差 -
⑤. 锁机制不一样
arrayBlockingQueue使用的一个锁来控制,LinkedBlockingQueue使用了2个锁来控制,一个名为putLock,另一个是takeLock,但是锁的本质都是ReentrantLock
⑤. 不推荐使用快捷的线程池
-
①. 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数
-
②. 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题
-
③. newFixedThreadPool方法的源码不难发现,线程池的工作队列直接new了一个LinkedBlockingQueue,而默认构造方法的 LinkedBlockingQueue是一个Integer.MAX_VALUE 长度的队列,可以认为是无界的
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
...
}
- ④. 翻看newCachedThreadPool的源码可以看到,这种线程池的最大线程数是Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
- ⑤. 线程池的OOM问题,可能是队列满造成的(LinkedBlockingQueue),也可能是线程太多造成的(SynchronousQueue)