高并发下阻塞队列的选择
一、队列
队列:queue。简称队,它和堆栈一样,也是一种运算受限的线性表,其限制是仅允许在表的一端进行插入,而在表的另一端进行删除。
简单的说,采用该结构的集合,对元素的存取有如下的特点:
- 先进先出(即,存进去的元素,要在它
前面
的元素依次取出后,才能取出该元素) - 队列的入口、出口各占一侧
数据结构演示网站:
https://www.cs.usfca.edu/~galles/visualization/QueueArray.html
Queue接口
队列在Java中有对应的接口Queue
- add:添加元素并返回true,如果队列满抛出异常
- offer:添加元素并返回true,如果队列满返回false
- remove:删除队首元素并返回元素,队列为空抛出异常
- poll:删除队首元素并返回元素,队列为空返回null
- element:返回队首元素,不移除,队列为空抛出异常
- peek:返回队首元素,不移除,队列为空返回null
二、阻塞队列
阻塞队列(BlockingQueue)是java.util.concurrent包下的阻塞队列接口,BlockingQueue提供了线程安全的队列访问方式:向阻塞队列中插入数据时,如果队列满了,线程将会阻塞等待,直到队列有空闲,从阻塞队列中取数据时,如果队列为空,线程将会阻塞等待,直到队列非空。
BlockingQueue接口
方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞指定时间 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
出队 | remove() | poll() | take() | poll(time,unit) |
获取队首元素 | element() | peek() | 不支持 | 不支持 |
常用方法详细说明
- 入队
- put(e):插入元素时,如果队列已满,进入阻塞,直到队列有空闲
- offer(e):插入元素时,如果队列已满,返回false,不会进入阻塞
- offer(e,time,unit):插入元素时,如果队列已满,进入阻塞,超过阻塞时间返回false
- 出队
- poll():获取元素时,如果队列为空,返回null,不进入阻塞
- poll(time,unit):获取元素时,如果队列为空,进入阻塞,超过阻塞时间返回null
- take():获取元素时,如果队列为空,进入阻塞,直到队列插入数据
应用场景
- 线程池
- 线程池中的任务队列通常是一个阻塞队列,当任务数超出了线程池的容量时,新提交的任务就会被存入任务队列中进行等待,线程池中的工作线程从任务队列中获取出任务进行执行,如果队列为空,那么工作线程就会被阻塞,一直等到队列中有新任务被提交。
- 生产者-消费者
-
生产者-消费者模型下,生产者负责向队列中添加元素,消费者负责从队列中消费元素,阻塞队列可以解决生产者和消费者的并发问题。
-
生产者插入数据时,如果队列中满了,就会进行阻塞等待,直到消费者消费了元素
-
消费者消费元素时,如果队列中为空,就会进行阻塞等待,直到生产者生产了元素
-
- 消息队列
- 消息队列使用了阻塞队列来存储消息,生产者生成出消息存入队列中,消费者从队列中消费消息,消息队列可以实现异步通信,提高了系统的吞吐量和响应性能,并且还可以将不同的组件解耦,提高了系统的可维护性以及可扩展性
- 缓存系统
- 缓存系统使用阻塞队列存储缓存数据,当缓存数据被更新时,它会被存入队列中,其他线程可以从队列中获取更新后的缓存数据进行使用,使用阻塞队列可以避免并发更新缓存数据时的冲突
- 并发任务处理
- 并发处理时,可以将待处理的任务存入队列中,多个工作线程可以从队列中获取任务进行处理,使用阻塞队列可以避免多个线程同时处理同一个任务的问题,并且可以把任务的提交和任务的执行进行解耦。
阻塞队列在实际场景中可以帮我们解决并发问题
三、JUC包下的阻塞队列
队列 | 描述 |
---|---|
ArrayBlockingQueue | 基于数组结构实现的一个有界阻塞队列 |
LinkedBlockingQueue | 基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列 |
PriorityBlockingQueue | 支持根据优先级排序的无界阻塞队列 |
DelayQueue | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列,支持延迟获取元素 |
SynchronousQueue | 不存储元素的队列 |
LinkedTransferQueue | 基于链表结构实现的一个无界阻塞队列 |
LinkedBlockingDeque | 基于链表结构实现的一个双端阻塞队列 |
四、ArrayBlockingQueue
ArrayBlockingQueue内部是用数组存储元素的,初始化时需要指定容量的大小
ArrayBlockingQueue基本使用
/**
* 有界阻塞队列基本使用
*
* @throws InterruptedException
*/
public static void queueHandler00() throws InterruptedException {
BlockingQueue<Object> objects = new ArrayBlockingQueue<>(1024);
objects.put("程云");
Object take = objects.take();
System.out.println(take);
}
在生产者-消费者模型中,生产者生产数据的速度和消费者消费数据的速度匹配下,可以使用ArrayBlockingQueue,如果生产的速度比消费的速度快,会导致队列满,出现生产线程的阻塞。
/**
* 有界阻塞队列 - 生产者与消费者模式
*/
public static void queueHandler01() {
BlockingQueue<String> queue = new ArrayBlockingQueue(5);
// 生产者生产数据
Runnable producer = () -> {
while (true) {
try {
queue.put("程云");
System.out.println("生产者生产了一个元素,队列中元素个数:" + queue.size());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(producer).start();
// 消费者消费数据
Runnable consumer = () -> {
while (true) {
try {
String take = queue.take();
System.out.println("消费者消费了一个元素,队列中元素个数:" + queue.size());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(consumer).start();
}
流量控制
/**
* 流量控制
*/
public static void queueHandler02() {
BlockingQueue queue = new ArrayBlockingQueue(100);
new Thread(new Runnable() {
@Override
public void run() {
// 处理请求
while (true) {
// 获取队列中的请求
Object poll = queue.poll();
if (poll != null) {
// 处理请求
System.out.println("处理请求:" + poll + ",队列元素个数:" + queue.size());
try {
// 模拟处理时间
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}).start();
// 模拟200次请求
for (int i = 0; i < 200; i++) {
// 重试机制
// 判断队列是否已满
if (queue.size() >= 100) {
try {
// 等待一段时间重试
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将请求任务添加到队列中
queue.offer(new Object());
}
}
ArrayBlockingQueue底层
ArrayBlockingQueue使用ReentrantLock实现了线程的安全,入队和出队操作的是同一个锁对象,那么只能有一个线程进行入队或出队,因此生产者和消费者无法并行操作,在高并发下会成为性能瓶颈。
ArrayBlockingQueue双指针
使用双指针可以避免数组的复制操作,如果使用的是单指针,当删除元素时,后面所有的元素都需要向前移动,这样会导致时间复杂度为O(n),但是使用双指针的话,我们只需要将takeIndex指向下一个元素,不需要将其前面的元素向前移动,当插入元素时,我们只需要将元素插入到putIndex指向的位置,不需要将其后面所有的元素向后移动,这样时间复杂度都是O(1)级别,提高了队列的性能。
五、LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的阻塞队列,该阻塞队列的大小默认是Integer.MAX_VALUE,由于这个数值特别大,所以LinkedBlockingQueue被称为无界队列,表示几乎没有界限,并且队列还可以随着元素的添加进行动态的增长,但是如果满了,就会抛出OOM异常,因此为了避免出现异常情况,建议手动设置一个大小。
LinkedBlockingQueue基本使用
package com.cy.example.blockingqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @Description TODO
* @Author 程云
* @Date 2024/8/24 17:54
* @Version 1.0
*/
public class LinkedBlockingQueueClass {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(1024);
// 插入元素
queue2.put("程云");
// 队列长度
System.out.println(queue2.size());
// 获取元素
System.out.println(queue2.take());
}
}
LinkedBlockingQueue基本原理
LinkedBlockingQueue底层由单链表实现,只能从head取元素,从tail添加元素,并且采用了两把锁的锁分离技术实习了入队出队互不阻塞,添加元素和获取元素都有独立的锁,LinkedBlockingQueue是读写分离的,读写操作可以并行执行
六、LinkedBlockingQueue和ArrayBlockingQueue区别
- 队列大小不同
- ArrayBlockingQueue是有界队列,必须指定大小,LinkedBlockingQueue是无界队列,默认有大小,如果指定大小,也可成为有界队列,在无界下,如果添加速度大于移除速度,可能会出现内存溢出等问题
- 数据存储容器不同
-
ArrayBlockingQueue的数据存储容器是数组,LinkedBlockingQueue的数据存储容器是链表
-
ArrayBlockingQueue数组存储容器,在插入或删除元素时不会产生或销毁任何额外的对象实例,LinkedBlockingQueue会生成一个Node对象,这可能会在长时间内需要高并发的处理大数据时,对于GC可能存在影响
-
- 锁不同
- ArrayBlockingQueue队列中的锁是没有分离,添加移除操作都是一个锁,LinkedBlockingQueue队列的锁是分离的,提高队列的吞吐量,在高并发下生产者和消费者可以并行执行,提高并发性能
七、DelayQueue
DelayQueue是一个支持延迟获取元素的阻塞队列,内部采用优先队列PriorityQueue存储元素,同时元素必须实现Delayed接口,在创建元素时可以指定获取元素的延迟时间,只能在指定时间内获取元素。
延迟队列不是先进先出,而是根据延迟时间的长短进行排序,下一个即将执行的任务会排到队列的最前面
DelayQueue存入的元素必须实现Delay接口,Delay接口继承了Comparable接口,因此拥有了排序的能力
package com.cy.example.blockingqueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Description TODO
* @Author 程云
* @Date 2024/8/25 9:19
* @Version 1.0
*/
public class DelayQueueClass {
public static void main(String[] args) {
DelayQueue<Delayed> delayeds = new DelayQueue<>();
}
}
package java.util.concurrent;
/**
* A mix-in style interface for marking objects that should be
* acted upon after a given delay.
*
* <p>An implementation of this interface must define a
* {@code compareTo} method that provides an ordering consistent with
* its {@code getDelay} method.
*
* @since 1.5
* @author Doug Lea
*/
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
* 返回与此对象关联的剩余延迟,在给定时间单位。
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
DelayQueue应用场景
- 商城订单超时关闭
- 异步短信通知功能
- 关闭空闲连接
- 缓存过期清楚
- 任务超时处理
订单延迟消费
package com.cy.example.blockingqueue;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Description 延迟订单业务
* @Author 程云
* @Date 2024/8/25 9:38
* @Version 1.0
*/
public class Order implements Delayed {
private String orderId;
private ZonedDateTime expireTime;
public Order() {
}
public Order(String orderId, ZonedDateTime expireTime) {
this.orderId = orderId;
this.expireTime = expireTime;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public ZonedDateTime getExpireTime() {
return expireTime;
}
public void setExpireTime(ZonedDateTime expireTime) {
this.expireTime = expireTime;
}
/**
* 计算出订单延迟剩余的时间
*
* @param unit 时间单位
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 过期时间转换时间戳 - 当前系统的时间戳
long convert = unit.convert(expireTime.toInstant().toEpochMilli() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return convert;
}
/**
* 排序时间
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<Order> delayeds = new DelayQueue<>();
delayeds.put(new Order("order1001",ZonedDateTime.now(ZoneId.of("UTC")).plus(5,ChronoUnit.SECONDS)));
delayeds.put(new Order("order1002",ZonedDateTime.now(ZoneId.of("UTC")).plus(2,ChronoUnit.SECONDS)));
delayeds.put(new Order("order1003",ZonedDateTime.now(ZoneId.of("UTC")).plus(4,ChronoUnit.SECONDS)));
while (!delayeds.isEmpty()){
Order order = delayeds.take();
System.out.println("处理订单:"+order.getOrderId());
}
}
}
DelayQueue底层原理
- DelayQueue内部使用了PriorityQueue队列来维护元素的排序,元素根据延迟时间来进行排序,延迟时间最短的元素排在队列的头部
- 使用take或poll方法获取元素时,如果队列为空,进入阻塞,直到元素达到指定延迟时间
- 元素的延迟时间通过getDelay方法返回,如果返回值小于等于0,表示该元素已经到期,可以从队列中取出
八、如何选择合适的阻塞队列
- 功能
- 是否需要阻塞队列帮我们排序,比如优先级排序、延迟执行等,如果有这个需要,就选择类似于PriorityBlockingQueue之类的有排序能力的阻塞队列
- 容量
- 是否有存储的要求,还是只需要直接传递,在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的 默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。
- 扩容
- 是能否扩容。因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
- 内存结构
- 我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
- 性能
- 从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑SynchronousQueue。
线程池对于阻塞队列的选择
线程池有很多种,不同种类的线程池会根据自己的特点,选择合适的阻塞队列
Executors类下的线程池类型:
- FixedThreadPool(SingleThreadExecutor同理)
- 选择的是LinkedBlockingQueue
- CachedThreadPool
- 选择的是SynchronousQueue
- ScheduledThreadPool(SingleThreadScheduledExecutor同理)
内存的结构角度去考虑这个问题。 - 性能
- 从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑SynchronousQueue。
线程池对于阻塞队列的选择
线程池有很多种,不同种类的线程池会根据自己的特点,选择合适的阻塞队列
Executors类下的线程池类型:
- FixedThreadPool(SingleThreadExecutor同理)
- 选择的是LinkedBlockingQueue
- CachedThreadPool
- 选择的是SynchronousQueue
- ScheduledThreadPool(SingleThreadScheduledExecutor同理)
- 选择的是延迟队列