java中Queue、BlockingQueue以及DelayQueue的用法
- 一 Queue 的用法
- Java中Queue的api
- 二 BlockingQueue 的用法
- 阻塞队列的边界
- 三 DelayQueue使用
- DelayQueue常见的应用场景
一 Queue 的用法
Queue(队列):其特性是
先进先出
。只允许在表的一端进行插入,而在表的另一端进行删除。插入叫做入队,删除叫做出队。
Java中Queue的api
- int size():获取队列长度;
- boolean add(E) / boolean offer(E):添加元素到队尾;
- E remove() / E poll():获取队首元素并从队列中删除;
- E element() / E peek():获取队首元素但并不从队列中删除。
例子demo:
import java.util.LinkedList;
import java.util.Queue;
public class Main {
public static void main(String[] args) {
//add()和remove()方法在失败的时候会抛出异常(不推荐)
// 多态 LinkedList 是 Queue 的子类
Queue<String> queue = new LinkedList<String>();
//添加元素
queue.offer("a");
queue.offer("b");
queue.offer("c");
queue.offer("d");
queue.offer("e");
for(String q : queue){
System.out.println(q);
}
System.out.println("===");
System.out.println("poll="+queue.poll()); //返回第一个元素,并在队列中删除
for(String q : queue){
System.out.println(q);
}
System.out.println("===");
System.out.println("element="+queue.element()); //返回第一个元素
for(String q : queue){
System.out.println(q);
}
System.out.println("===");
System.out.println("peek="+queue.peek()); //返回第一个元素
for(String q : queue){
System.out.println(q);
}
}
}
// 结果
a
b
c
d
e
===
poll=a
b
c
d
e
===
element=b
b
c
d
e
===
peek=b
b
c
d
e
二 BlockingQueue 的用法
BlockingQueue 继承了 Queue 接口,是一种阻塞队列
何为阻塞队列?
- 支持阻塞的
插入
方法put
:队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的
移除
方法take
:队列空时,获取元素的线程会等待队列变为非空。
阻塞队列的边界
所谓的边界其实就是
容量
问题。边界分为有界(有固定容量)
和无界(容量特别大)
两种
无界队列意味着里面
可以容纳非常多的元素
,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。
但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。
三 DelayQueue使用
DelayQueue一个
延迟队列
。在指定时间
才能获取队列元素,队列头元素是即将过期的元素。且 DelayQueue实现了BlockingQueue是阻塞队列
我们使用的话,需要实现两个方法:
- getDelay(TimeUnit.NANOSECONDS)
- compareTo。返回元素在队列中的排序方式,一般根据getDelay来计算
DelayQueue是一个没有边界BlockingQueue实现,加入其中的元素必需实现Delayed接口。
-
当生产者线程
调用put之类
的方法加入元素
时,会触发Delayed接口中的compareTo
方法进行排序
,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序
。排在队列头部的元素是最早到期的,越往后到期时间越晚。 -
消费者线程查看队列头部的元素,注意是查看不是取出。然后调用元素的
getDelay
方法,如果此方法返回的值小0或者等于0,则消费者线程会从队列中取出此元素,并进行处理
。如果getDelay方法返回的值大于0,则消费者线程wait返回的时间值后,再从队列头部取出元素
,此时元素应该已经到期。
DelayQueue常见的应用场景
- 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。
- 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
- 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等。
demo: 用户下订单,到指定时间不支付处理订单过期
/**
一:定义延时队列的队列元素
* 延时队列。
* 每次下订单的时候,把订单数据放入队列。
* 自定义消费者,消费队列。
*/
public class MyDelayMessage implements Delayed{
/**
* 默认延迟30秒
*/
private static final long DELAY_MS = 1000L * 30;
/**
* 订单id
*/
private final String orderId;
/**
* 消息创建的时间戳
*/
private final long createTime;
/**
* 过期时间
*/
private final long expire;
public MyDelayMessage(String orderId){
this.orderId = orderId;
this.createTime = System.currentTimeMillis();
this.expire = this.createTime + DELAY_MS;//计算出过期时长
}
public MyDelayMessage(String orderId,long expireSec){
this.orderId = orderId;
this.createTime = System.currentTimeMillis();
this.expire = this.createTime + expireSec * 1000L;//计算出过期时长
}
/**
* 返回延迟时长
*/
@Override
public long getDelay(TimeUnit unit) {
//根据当前时间计算
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/*
对插入队列的数据进行排序。
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
/**
二:创建延时队列。并插入队列元素或者移除元素
1. 单例模式创建延时队列
2. 写插入元素与移除元素的方法
*/
public class MyDelayQueue {
private static DelayQueue<MyDelayMessage> queue = new DelayQueue<>();
private MyDelayQueue(){}
private static class SingletonHolder{
private static MyDelayQueue singleton = new MyDelayQueue();
}
//单例队列
public static MyDelayQueue getQueue(){
return SingletonHolder.singleton;
}
public Boolean produce(MyDelayMessage message){
return queue.add(message);
}
/**
* 延迟消费队列,取不到的话会阻塞一直到队列有消息再被唤醒。之后再取消息
*/
public MyDelayMessage consume() throws InterruptedException {
return queue.take();
}
}
/*
三:插入队列。
前端下订单,后端插入队列。
*/
@Controller
@RequestMapping("order")
public class OrderController extends BaseController {
/**
* 下订单
*/
@PostMapping()
@ResponseBody
public WSResponseVO doOrder(){
//调用生成订单逻辑,并返回订单id
String orderId = UUID.randomUUID().toString();
//此处设置过期时间,10s
MyDelayQueue.getQueue().produce(new MyDelayMessage(orderId,10));
return operateSuccess();
}
}
/**
* 四:创建消费者,用于消费队列中的元素
*/
@Component
public class OrderCancelTask implements ApplicationRunner {
/**
* 在容器启动完成的时候,会执行一个线程。从MyDelayQueue中取消息
*/
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("========开启处理过期订单线程=========");
new Thread(()->{
while (true){
MyDelayMessage msg = null;
try {
msg = MyDelayQueue.getQueue().consume();
if(msg != null){
//订单过期的业务逻辑。。。。
System.out.println("有订单 " + msg.getOrderId() + " 过期");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}