延时队列的三种实现方案
- 什么是延时队列
- 延时队列的应用场景
- 基于Java DelayQueue的实现
- 源码剖析
- 基于Redis的zset实现
- 实现步骤
- Redis延时队列优势
- Redis延时队列劣势
- 基于RabbitMQ的延时队列实现
- TTL + DXL(死信队列)
- 插件实现
- 总结
- 参考文章
什么是延时队列
在分布式系统中,延时队列(Delay Queue)是一个常见的工具,它 允许程序能够按照预定时间处理任务(类似于定时任务)。延时队列允许我们将任务延时到指定的时间执行,这样就可以将任务按照优先级和执行时间来处理,从而提高系统的可靠性和性能。
延时队列是一种特殊的队列,相比于普通队列(先进先出)最大的区别就体现在其延时属性上。在这种队列中,每个元素都有一个预设的延时时间,只有当这个时间到期后,元素才可以被消费
。这种机制使得延时队列可以用于实现定时任务、消息重试等功能。
延时队列的应用场景
延时队列在实际应用中有很多应用场景,例如:
- 定时任务:使用延时队列可以实现定时任务,例如每隔一段时间执行某个操作,或者在特定的时间点执行某个操作。
注:延时队列和定时任务的区别:
- 定时任务一般是有固定时间周期的,有明确的触发时间。而延时任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内执行,没有执行周期。
- 定时任务一般批处理操作多个任务,延时任务一般是单个任务。
- 消息重试:在分布式系统中,消息可能因为网络原因或其他原因无法成功送达,此时可以使用延时队列实现消息的重试机制。消息发送失败后,将消息存入延时队列,设置一个合适的延时时间,当时间到期后,重新发送消息。
- 缓解并发压力:在高并发场景下,将大量请求先存入延时队列,然后由消费者逐一处理,从而避免瞬间请求对系统造成压力。
- 订单超时自动取消:电商系统中,用户下单后需要在一定时间内付款,否则订单会被自动取消。这种场景下,可以使用延时队列实现订单的超时监控。(类似于上面说的定时任务)
- 消息通知:在很多业务场景中,需要给用户发送消息通知,但是由于某些原因,这些消息不能及时发送。例如:当用户购买一件商品时,需要在3天内发货,如果超时未发货,需要给用户发送一条消息通知。这时候就可以通过延时队列来实现。(服务端主动向客户端发送消息)
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
接下来,我们将学习几种常见的延时队列实现方式,包括Java中的DelayQueue,Redis的zset和RabbitMQ的延时队列。
基于Java DelayQueue的实现
DelayQueue
是Java并发包java.util.concurrent
中提供的一个支持延时获取元素的无界阻塞队列。它的内部实现是基于优先级队列(PriorityQueue),按照元素的过期时间进行排序。
在DelayQueue
中,元素必须实现Delayed
接口,该接口提供了getDelay()
方法,该方法返回元素的剩余延时时间。
DelayQueue
是一个无界队列,它不允许插入null
元素,并且元素必须是可比较的。它的元素会按照剩余延时时间的升序排列。在DelayQueue
中,当调用take()
方法时,如果队列中没有元素,则线程会阻塞等待,直到有元素被添加到队列中。
下面是一个使用Java DelayQueue实现延时队列的示例代码:
import java.util.concurrent.*;
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<Message> queue = new DelayQueue<>();
queue.put(new Message("message 1", 2000));
queue.put(new Message("message 2", 1000));
queue.put(new Message("message 3", 3000));
while (!queue.isEmpty()) {
System.out.println(queue.take());
}
}
}
class Message implements Delayed {
private String message;
private long delayTime;
public Message(String message, long delayTime) {
this.message = message;
this.delayTime = System.currentTimeMillis() + delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = delayTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.delayTime < ((Message) o).delayTime) {
return -1;
} else if (this.delayTime > ((Message) o).delayTime) {
return 1;
}
return 0;
}
@Override
public String toString() {
return "Message{" +
"message='" + message + '\'' +
", delayTime=" + delayTime +
'}';
}
}
在上面的示例中,我们创建了一个DelayQueue
,并向队列中添加了三个元素。每个元素都是Message
对象,该对象实现了Delayed
接口,并实现了getDelay()
和compareTo()
方法。在主函数中,我们使用while
循环不断从队列中取出元素,直到队列为空。
源码剖析
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 一个重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 一个优先级队列,用于存储元素,并维护堆的结构,内部是一个基于数组实现的小顶堆
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 一个线程变量,用于记录当前等待堆顶元素到期的线程
private Thread leader = null;
/**
* 一个条件变量,用于实现阻塞等待和唤醒机制
* 在队列中存在元素的时候,第一个调用take()方法的线程将成为leader线程,
* 它将会在available上等待队列头结点剩余的延迟时间
* 其他的线程将会成为follower线程,它们会一直在available上一直等待
* leader线程苏醒之后会将leader变量置空,在获取到元素之后最后会唤醒一个在available上等待的follower线程
* 被唤醒的follower线程将可能成为新的leader线程
*/
private final Condition available = lock.newCondition();
/**
* Creates a new {@code DelayQueue} that is initially empty.
*/
public DelayQueue() {}
/**
* Creates a {@code DelayQueue} initially containing the elements of the
* given collection of {@link Delayed} instances.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
public boolean add(E e) {
return offer(e);
}
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 获取锁,保证线程同步
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 调用优先级队列的offer方法将元素插入到数组中,并调整堆的结构(根据compareTo方法比较并构建小顶堆)
q.offer(e);
// 判断插入的元素是否是堆顶元素(新加入的元素e是否是延迟时间最短的元素)
if (q.peek() == e) {
/** 如果是,则将leader线程置空,让后来的线程可以当选为leader
* 唤醒一个在available上等待的消费线程,让它和新消费线程重新争夺leader。
**/
leader = null;
available.signal();
}
// 这里不会检查队列是否满了,所以一定会返回true
return true;
} finally {
lock.unlock();
}
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
/**
* Retrieves and removes the head of this queue, or returns {@code null}
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or {@code null} if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 调用优先级队列的peek方法获取堆顶元素
E first = q.peek();
// 判断该元素是否为空或者未到期
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 如果不是,则调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构
return q.poll();
} finally {
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
* 出队,获取并移除此延迟队列已过期的队头,如果此时没有已过期的队头,那么一直等待。
*/
public E take() throws InterruptedException {
// 获取锁,如果被中断则抛出异
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//开启一个死循环
for (;;) {
// 调用优先级队列的peek方法获取堆顶元素
E first = q.peek();
// 如果为空,则阻塞等待在available条件上,直到被唤醒或者中断
if (first == null)
available.await();
else {
// 如果不为空,则获取该元素的剩余延迟时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于等于0,则说明该元素已经到期,
// 调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构
if (delay <= 0)
return q.poll();
// 如果大于0即未到期,则将first置空方便gc回收
first = null; // don't retain ref while waiting
// 判断leader线程是否为空
if (leader != null)
// 如果不为空,则说明有其他线程正在等待堆顶元素到期,
// 当前线程也阻塞等待在available条件上,直到被唤醒或者中断
available.await();
else {
/** 如果为空,则将当前线程设为leader线程,并阻塞等待堆顶元素的剩余延迟时间,
* 在finally块中判断当前线程是否是leader线程,如果是,则将leader线程置空,
* 并唤醒下一个等待在available条件上的线程
**/
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 阻塞等待堆顶元素的剩余延迟时间,直到被唤醒或者中断
available.awaitNanos(delay);
} finally {
// 如果leader还是当前线程就把它置为空,让其他线程有机会获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
*
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
* 在指定时间内等待获得到期的队列头;如果在队头过期之前超过了指定的等待时间,则返回 null。
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
/**
* Retrieves, but does not remove, the head of this queue, or
* returns {@code null} if this queue is empty. Unlike
* {@code poll}, if no expired elements are available in the queue,
* this method returns the element that will expire next,
* if one exists.
*
* @return the head of this queue, or {@code null} if this
* queue is empty
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
/**
* Returns first element only if it is expired.
* Used only by drainTo. Call only when holding lock.
*/
private E peekExpired() {
// assert lock.isHeldByCurrentThread();
E first = q.peek();
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
/**
* Removes a single instance of the specified element from this
* queue, if it is present, whether or not it has expired.
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
/**
* Returns an iterator over all the elements (both expired and
* unexpired) in this queue. The iterator does not return the
* elements in any particular order.
*
* <p>The returned iterator is
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
*
* @return an iterator over the elements in this queue
*/
public Iterator<E> iterator() {
return new Itr(toArray());
}
/**
* Snapshot iterator that works off copy of underlying q array.
*/
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
@SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
}
leader线程是一个线程变量,它记录了当前正在等待堆顶元素到期的线程。它的作用是为了避免多个线程同时等待同一个元素到期,从而造成资源浪费和竞争。如果有多个线程同时调用take方法,只有第一个线程会被设为leader线程,并阻塞等待堆顶元素的剩余延迟时间,其他线程则会阻塞等待在available条件上,直到被唤醒或者中断。当leader线程被唤醒或者中断后,它会尝试获取堆顶元素,如果成功,则将leader线程置空,并唤醒下一个等待在available条件上的线程,让它成为新的leader线程;如果失败,则重新进入循环判断堆顶元素是否到期。这样就保证了每次只有一个线程在等待堆顶元素到期,提高了效率和公平性。
举个例子:
假设有三个线程A、B、C同时调用take方法,队列中有一个元素D,它的延迟时间是10秒,当前时间是0秒。
- 线程A先获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程为空,将自己设为leader线程,并阻塞等待10秒。
- 线程B后获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程不为空,阻塞等待在available条件上。
- 线程C再获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程不为空,阻塞等待在available条件上。
- 10秒后,线程A被唤醒,然后获取堆顶元素D,发现它已经到期,调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构。然后判断自己是否是leader线程,如果是,则将leader线程置空,并唤醒下一个等待在available条件上的线程(假设是线程B)。
- 线程B被唤醒,然后获取堆顶元素D,发现它为空(因为已经被线程A弹出了),阻塞等待在available条件上(直到有新的元素入队或者被中断)。
- 线程C仍然阻塞等待在available条件上(直到有新的元素入队或者被中断)。
基于Redis的zset实现
Redis的zset
(有序集合)也可以用来实现延时队列。我们知道,zset
是一种特殊的集合,其内部成员都是有序排列的,每个元素都关联一个分数值,根据这个分数值对元素进行排序。我们可以将元素的过期时间作为分数值,从而实现延时队列。
有序集合(zset)同样使用了两种不同的存储结构,分别是 zipList(压缩列表)(在Redis7中是listpack)和 skipList(跳跃列表),具体可以看下面这篇博客:Redis底层数据结构分析(二) —— Hash结构_redis哈希表长度_小熊不吃香菜的博客-CSDN博客
实现步骤
- 将任务的到期时间作为分值(
zadd key timestamp task
),将任务的内容作为成员,添加到zset中。 - 使用
zrangebyscore
命令,根据当前时间戳,获取分值小于等于当前时间戳的成员,即到期的任务。 - 使用ZREM命令,删除获取到的成员,防止重复执行。
- 对获取到的任务进行后续处理
下面是一个使用Redis的zset实现延时队列的示例代码:
import redis.clients.jedis.Jedis;
import java.util.Set;
public class RedisDelayQueueDemo {
private static final String QUEUE_NAME = "delay-queue";
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis("localhost", 6379);
// 向队列中添加元素
jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 2000, "message 1");
jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 1000, "message 2");
jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 3000, "message 3");
while (true) {
Set<String> messages = jedis.zrangeByScore(QUEUE_NAME, 0, System.currentTimeMillis(), 0, 1);
if (!messages.isEmpty()) {
String message = messages.iterator().next();
jedis.zrem(QUEUE_NAME, message);
System.out.println(message);
} else {
Thread.sleep(1000);
}
}
}
}
在上面的示例中,我们使用Jedis连接Redis,并向队列中添加了三个元素,每个元素都是一个字符串,它们的score值分别为元素过期时间。然后,我们使用一个while循环不断从zset中取出过期的元素,并将其从zset中移除。
Redis延时队列优势
- Redis zset支持高性能的 score 排序。
- Redis是在内存上进行操作的,速度非常快。
- Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。
- Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性。
Redis延时队列劣势
- 使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
- 没有重试机制 。处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
- 没有 ACK 机制。 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了。
如果对消息可靠性要求较高, 推荐使用 MQ 来实现
基于RabbitMQ的延时队列实现
RabbitMQ是一个广泛使用的消息队列中间件,它也支持延时队列的功能。我们可以通过为队列设置消息的TTL(Time-to-Live)属性 ,或者通过RabbitMQ的插件实现延时队列。
消息的流向:
两种实现方式:
TTL + DXL(死信队列)
- TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间 , 单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这 条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL:
(1) 消息设置TTL ,针对每条消息设置TTL
(2) 队列设置TTL ,在创建队列的时候设置队列的x-message-ttl
属性
两者的区别:如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
-
死信队列
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到
broker
或者直接到queue
里了,consumer 从queue
取出消息进行消费,但某些时候由于特定的原因导致queue
中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
(1) 应用场景
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
(2) 死信的来源- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
(3) 结构图
对于消息TTL过期,即对应我们上面介绍的延时队列。此时生产者和消费者代码:
消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
下面是一个使用TTL+死信队列实现延时队列的示例代码,通过创建一个延时队列和一个死信队列,将消息发送到延时队列中,在延时时间到达后,消息将被转发到死信队列中。
// 导入RabbitMQ相关的依赖
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class DelayQueueDemo {
// 定义交换机、队列、路由键等常量
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
public static final String DELAY_QUEUE_NAME = "delay.queue";
public static final String DELAY_ROUTING_KEY = "delay.routing.key";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂和连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明延时交换机,类型为direct
channel.exchangeDeclare(DELAY_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明死信交换机,类型为direct
channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明延时队列,并设置TTL和死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 10000); // 设置消息的过期时间为10秒
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 设置死信交换机
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); // 设置死信路由键
channel.queueDeclare(DELAY_QUEUE_NAME, true, false, false, args);
// 声明死信队列
channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null);
// 绑定延时队列和延时交换机
channel.queueBind(DELAY_QUEUE_NAME, DELAY_EXCHANGE_NAME, DELAY_ROUTING_KEY);
// 绑定死信队列和死信交换机
channel.queueBind(DEAD_LETTER_QUEUE_NAME, DEAD_LETTER_EXCHANGE, DEAD_LETTER_ROUTING_KEY);
// 创建消费者,监听死信队列
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 获取消息内容并打印
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
// 开始消费消息,自动确认
channel.basicConsume(DEAD_LETTER_QUEUE_NAME, true, consumer);
// 发送一条延时消息到延时队列
String message = "Hello delay queue!";
channel.basicPublish(DELAY_EXCHANGE_NAME, DELAY_ROUTING_KEY,
null, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
}
}
插件实现
在RabbitMQ中,延时队列也可以通过x-delayed-message插件实现的,创建一个类型为x-delayed-message的交换机,发送消息时设置x-delay头部,表示延迟时间。消息到期后会被投递到绑定的队列,消费者监听该队列即可。
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
配置类代码:
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME,
"x-delayed-message", true, false, args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange)
.with(DELAYED_ROUTING_KEY).noargs();
}
}
消息生产者代码 :
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY,
message,correlationData ->{
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列
delayed.queue:{}", new Date(),delayTime, message);
}
消息消费者代码 :
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
第二个消息被先消费掉了,符合预期
总结
延时队列是一种非常有用的消息队列,它可以在一定的延时时间后将消息投递到消费者。延时队列可以应用于多种场景,例如定时任务、订单超时处理等。实现延时队列的方法有多种,可以使用Java中的DelayQueue
、Redis的zset
,也可以使用RabbitMQ
等消息队列。不同的实现方式有各自的优缺点,需要根据实际情况选择合适的实现方式。
- Java的
DelayQueue
实现延时队列,是一种基于JDK自带的类的方案,它是一个存储延时任务的环形队列,可以高效地循环遍历。它的优点是简单易用,不需要依赖其他组件,缺点是不支持持久化和分布式,如果程序崩溃或者重启,延时任务会丢失。 - 使用Redis的
zset
实现延时队列,是一种基于Redis有序集合的方案,它可以利用score来表示延时时间,通过zadd
和zrangebyscore
等命令来入队和出队。它的优点是高性能,支持持久化和高并发,可以通过Redis集群来提高可用性和扩展性,缺点是需要额外的进程来轮询Redis,并且可能存在消息重复消费或者丢失的风险。 - 使用rabbitmq实现延时队列,是一种基于rabbitmq的TTL和死信队列功能的方案,它可以通过设置消息或者队列的过期时间,以及将过期消息转移到死信队列中来实现延时效果。它的优点是利用了rabbitmq本身的消息可靠发送、投递和消费机制,保证了消息至少被消费一次,并且可以通过rabbitmq集群来解决单点故障问题,缺点是需要额外配置TTL和死信队列,并且可能存在消息堆积或者延迟不准确的问题(也可以使用插件方式)。
参考文章
- 延时队列
- 你真的知道怎么实现一个延迟队列吗 ?
文章中所有图片来源于上述文章!!!