延时队列的三种实现方案

news2024/12/23 11:51:41

延时队列的三种实现方案

    • 什么是延时队列
    • 延时队列的应用场景
    • 基于Java DelayQueue的实现
      • 源码剖析
    • 基于Redis的zset实现
      • 实现步骤
      • Redis延时队列优势
      • Redis延时队列劣势
    • 基于RabbitMQ的延时队列实现
      • TTL + DXL(死信队列)
      • 插件实现
    • 总结
    • 参考文章

什么是延时队列

在分布式系统中,延时队列(Delay Queue)是一个常见的工具,它 允许程序能够按照预定时间处理任务(类似于定时任务)。延时队列允许我们将任务延时到指定的时间执行,这样就可以将任务按照优先级和执行时间来处理,从而提高系统的可靠性和性能。
延时队列是一种特殊的队列相比于普通队列(先进先出)最大的区别就体现在其延时属性上。在这种队列中,每个元素都有一个预设的延时时间,只有当这个时间到期后,元素才可以被消费。这种机制使得延时队列可以用于实现定时任务、消息重试等功能。
在这里插入图片描述

延时队列的应用场景

延时队列在实际应用中有很多应用场景,例如:

  1. 定时任务:使用延时队列可以实现定时任务,例如每隔一段时间执行某个操作,或者在特定的时间点执行某个操作。

注:延时队列和定时任务的区别

  1. 定时任务一般是有固定时间周期的,有明确的触发时间。而延时任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内执行,没有执行周期。
  2. 定时任务一般批处理操作多个任务,延时任务一般是单个任务。
  1. 消息重试:在分布式系统中,消息可能因为网络原因或其他原因无法成功送达,此时可以使用延时队列实现消息的重试机制。消息发送失败后,将消息存入延时队列设置一个合适的延时时间,当时间到期后,重新发送消息
  2. 缓解并发压力:在高并发场景下,将大量请求先存入延时队列,然后由消费者逐一处理,从而避免瞬间请求对系统造成压力。
  3. 订单超时自动取消:电商系统中,用户下单后需要在一定时间内付款,否则订单会被自动取消。这种场景下,可以使用延时队列实现订单的超时监控。(类似于上面说的定时任务)
  4. 消息通知:在很多业务场景中,需要给用户发送消息通知,但是由于某些原因,这些消息不能及时发送。例如:当用户购买一件商品时,需要在3天内发货,如果超时未发货,需要给用户发送一条消息通知。这时候就可以通过延时队列来实现。(服务端主动向客户端发送消息)

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

在这里插入图片描述


接下来,我们将学习几种常见的延时队列实现方式,包括Java中的DelayQueue,Redis的zset和RabbitMQ的延时队列。

基于Java DelayQueue的实现

DelayQueue是Java并发包java.util.concurrent中提供的一个支持延时获取元素的无界阻塞队列。它的内部实现是基于优先级队列(PriorityQueue)按照元素的过期时间进行排序
DelayQueue中,元素必须实现Delayed接口,该接口提供了getDelay()方法,该方法返回元素的剩余延时时间。
DelayQueue是一个无界队列,它不允许插入null元素,并且元素必须是可比较的。它的元素会按照剩余延时时间的升序排列。在DelayQueue中,当调用take()方法时,如果队列中没有元素,则线程会阻塞等待,直到有元素被添加到队列中。

在这里插入图片描述

下面是一个使用Java DelayQueue实现延时队列的示例代码:

import java.util.concurrent.*;

public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Message> queue = new DelayQueue<>();

        queue.put(new Message("message 1", 2000));
        queue.put(new Message("message 2", 1000));
        queue.put(new Message("message 3", 3000));

        while (!queue.isEmpty()) {
            System.out.println(queue.take());
        }
    }
}

class Message implements Delayed {
    private String message;
    private long delayTime;

    public Message(String message, long delayTime) {
        this.message = message;
        this.delayTime = System.currentTimeMillis() + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = delayTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.delayTime < ((Message) o).delayTime) {
            return -1;
        } else if (this.delayTime > ((Message) o).delayTime) {
            return 1;
        }
        return 0;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                ", delayTime=" + delayTime +
                '}';
    }
}

在上面的示例中,我们创建了一个DelayQueue,并向队列中添加了三个元素。每个元素都是Message对象,该对象实现了Delayed接口,并实现了getDelay()compareTo()方法。在主函数中,我们使用while循环不断从队列中取出元素,直到队列为空。

源码剖析

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    // 一个重入锁,用于保证线程安全
    private final transient ReentrantLock lock = new ReentrantLock();
    // 一个优先级队列,用于存储元素,并维护堆的结构,内部是一个基于数组实现的小顶堆
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    // 一个线程变量,用于记录当前等待堆顶元素到期的线程
    private Thread leader = null;

     /**
     * 一个条件变量,用于实现阻塞等待和唤醒机制
     * 在队列中存在元素的时候,第一个调用take()方法的线程将成为leader线程,
	 * 它将会在available上等待队列头结点剩余的延迟时间
     * 其他的线程将会成为follower线程,它们会一直在available上一直等待
     * leader线程苏醒之后会将leader变量置空,在获取到元素之后最后会唤醒一个在available上等待的follower线程
     * 被唤醒的follower线程将可能成为新的leader线程
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

   
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        // 获取锁,保证线程同步
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 调用优先级队列的offer方法将元素插入到数组中,并调整堆的结构(根据compareTo方法比较并构建小顶堆)
            q.offer(e);
            // 判断插入的元素是否是堆顶元素(新加入的元素e是否是延迟时间最短的元素)
            if (q.peek() == e) {
                /** 如果是,则将leader线程置空,让后来的线程可以当选为leader
				* 唤醒一个在available上等待的消费线程,让它和新消费线程重新争夺leader。
            	**/
                leader = null;
                available.signal();
            }
            // 这里不会检查队列是否满了,所以一定会返回true
            return true;
        } finally {
            lock.unlock();
        }
    }

    
    public void put(E e) {
        offer(e);
    }

    
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 调用优先级队列的peek方法获取堆顶元素
            E first = q.peek();
            // 判断该元素是否为空或者未到期
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                // 如果不是,则调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     * 出队,获取并移除此延迟队列已过期的队头,如果此时没有已过期的队头,那么一直等待。
     */
    public E take() throws InterruptedException {
         // 获取锁,如果被中断则抛出异
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //开启一个死循环
            for (;;) {
                // 调用优先级队列的peek方法获取堆顶元素
                E first = q.peek();
                // 如果为空,则阻塞等待在available条件上,直到被唤醒或者中断
                if (first == null)
                    available.await();
                else {
                    // 如果不为空,则获取该元素的剩余延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 如果小于等于0,则说明该元素已经到期,
                    // 调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构
                    if (delay <= 0)
                        return q.poll();
                    // 如果大于0即未到期,则将first置空方便gc回收
                    first = null; // don't retain ref while waiting
                    // 判断leader线程是否为空
                    if (leader != null)
                        // 如果不为空,则说明有其他线程正在等待堆顶元素到期,
                        // 当前线程也阻塞等待在available条件上,直到被唤醒或者中断
                        available.await();
                    else {
                        /** 如果为空,则将当前线程设为leader线程,并阻塞等待堆顶元素的剩余延迟时间,
                        * 在finally块中判断当前线程是否是leader线程,如果是,则将leader线程置空,
                    	* 并唤醒下一个等待在available条件上的线程
                        **/ 
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 阻塞等待堆顶元素的剩余延迟时间,直到被唤醒或者中断
                            available.awaitNanos(delay);
                        } finally {
                            // 如果leader还是当前线程就把它置为空,让其他线程有机会获取元素
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
	 * 在指定时间内等待获得到期的队列头;如果在队头过期之前超过了指定的等待时间,则返回 null。
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns first element only if it is expired.
     * Used only by drainTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }



    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }



    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}

leader线程是一个线程变量,它记录了当前正在等待堆顶元素到期的线程。它的作用是为了避免多个线程同时等待同一个元素到期,从而造成资源浪费和竞争。如果有多个线程同时调用take方法,只有第一个线程会被设为leader线程,并阻塞等待堆顶元素的剩余延迟时间,其他线程则会阻塞等待在available条件上,直到被唤醒或者中断。当leader线程被唤醒或者中断后,它会尝试获取堆顶元素,如果成功,则将leader线程置空,并唤醒下一个等待在available条件上的线程,让它成为新的leader线程;如果失败,则重新进入循环判断堆顶元素是否到期。这样就保证了每次只有一个线程在等待堆顶元素到期,提高了效率和公平性。
举个例子:
假设有三个线程A、B、C同时调用take方法,队列中有一个元素D,它的延迟时间是10秒,当前时间是0秒。

  • 线程A先获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程为空,将自己设为leader线程,并阻塞等待10秒。
  • 线程B后获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程不为空,阻塞等待在available条件上。
  • 线程C再获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程不为空,阻塞等待在available条件上。
  • 10秒后,线程A被唤醒,然后获取堆顶元素D,发现它已经到期,调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构。然后判断自己是否是leader线程,如果是,则将leader线程置空,并唤醒下一个等待在available条件上的线程(假设是线程B)。
  • 线程B被唤醒,然后获取堆顶元素D,发现它为空(因为已经被线程A弹出了),阻塞等待在available条件上(直到有新的元素入队或者被中断)。
  • 线程C仍然阻塞等待在available条件上(直到有新的元素入队或者被中断)。

基于Redis的zset实现

Redis的zset(有序集合)也可以用来实现延时队列。我们知道,zset是一种特殊的集合,其内部成员都是有序排列的,每个元素都关联一个分数值,根据这个分数值对元素进行排序。我们可以将元素的过期时间作为分数值,从而实现延时队列

在这里插入图片描述

有序集合(zset)同样使用了两种不同的存储结构,分别是 zipList(压缩列表)(在Redis7中是listpack)和 skipList(跳跃列表),具体可以看下面这篇博客:Redis底层数据结构分析(二) —— Hash结构_redis哈希表长度_小熊不吃香菜的博客-CSDN博客

实现步骤

  1. 将任务的到期时间作为分值(zadd key timestamp task),将任务的内容作为成员,添加到zset中。
  2. 使用zrangebyscore命令,根据当前时间戳,获取分值小于等于当前时间戳的成员,即到期的任务。
  3. 使用ZREM命令,删除获取到的成员,防止重复执行。
  4. 对获取到的任务进行后续处理
    在这里插入图片描述

下面是一个使用Redis的zset实现延时队列的示例代码:

import redis.clients.jedis.Jedis;

import java.util.Set;

public class RedisDelayQueueDemo {
    private static final String QUEUE_NAME = "delay-queue";

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis("localhost", 6379);

        // 向队列中添加元素
        jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 2000, "message 1");
        jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 1000, "message 2");
        jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 3000, "message 3");

        while (true) {
            Set<String> messages = jedis.zrangeByScore(QUEUE_NAME, 0, System.currentTimeMillis(), 0, 1);
            if (!messages.isEmpty()) {
                String message = messages.iterator().next();
                jedis.zrem(QUEUE_NAME, message);
                System.out.println(message);
            } else {
                Thread.sleep(1000);
            }
        }
    }
}

在上面的示例中,我们使用Jedis连接Redis,并向队列中添加了三个元素,每个元素都是一个字符串,它们的score值分别为元素过期时间。然后,我们使用一个while循环不断从zset中取出过期的元素,并将其从zset中移除。

Redis延时队列优势

  1. Redis zset支持高性能的 score 排序。
  2. Redis是在内存上进行操作的,速度非常快。
  3. Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。
  4. Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性。

Redis延时队列劣势

  1. 使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
  2. 没有重试机制 。处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
  3. 没有 ACK 机制。 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了。

如果对消息可靠性要求较高, 推荐使用 MQ 来实现

基于RabbitMQ的延时队列实现

RabbitMQ是一个广泛使用的消息队列中间件,它也支持延时队列的功能。我们可以通过为队列设置消息的TTL(Time-to-Live)属性 ,或者通过RabbitMQ的插件实现延时队列。

消息的流向:

在这里插入图片描述


两种实现方式:

TTL + DXL(死信队列)

在这里插入图片描述

  1. TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间 , 单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这 条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL:

(1) 消息设置TTL ,针对每条消息设置TTL
在这里插入图片描述

(2) 队列设置TTL ,在创建队列的时候设置队列的x-message-ttl属性
在这里插入图片描述

两者的区别:如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

  1. 死信队列

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
    (1) 应用场景
    为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

    (2) 死信的来源

    • 消息 TTL 过期
    • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
    • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

(3) 结构图
在这里插入图片描述

对于消息TTL过期,即对应我们上面介绍的延时队列。此时生产者和消费者代码:

在这里插入图片描述


消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
        	{String message = new String(delivery.getBody(), "UTF-8");
        	System.out.println("Consumer01 接收到消息"+message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
	}
}

在这里插入图片描述


消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信队列消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
        {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("Consumer02 接收死信队列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}    

在这里插入图片描述

下面是一个使用TTL+死信队列实现延时队列的示例代码,通过创建一个延时队列和一个死信队列,将消息发送到延时队列中,在延时时间到达后,消息将被转发到死信队列中。

// 导入RabbitMQ相关的依赖
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DelayQueueDemo {

    // 定义交换机、队列、路由键等常量
    public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    public static final String DELAY_QUEUE_NAME = "delay.queue";
    public static final String DELAY_ROUTING_KEY = "delay.routing.key";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
    public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂和连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();

        // 创建通道
        Channel channel = connection.createChannel();

        // 声明延时交换机,类型为direct
        channel.exchangeDeclare(DELAY_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 声明死信交换机,类型为direct
        channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明延时队列,并设置TTL和死信交换机
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 10000); // 设置消息的过期时间为10秒
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 设置死信交换机
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); // 设置死信路由键
        channel.queueDeclare(DELAY_QUEUE_NAME, true, false, false, args);

        // 声明死信队列
        channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null);

        // 绑定延时队列和延时交换机
        channel.queueBind(DELAY_QUEUE_NAME, DELAY_EXCHANGE_NAME, DELAY_ROUTING_KEY);

        // 绑定死信队列和死信交换机
        channel.queueBind(DEAD_LETTER_QUEUE_NAME, DEAD_LETTER_EXCHANGE, DEAD_LETTER_ROUTING_KEY);

        // 创建消费者,监听死信队列
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 获取消息内容并打印
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };

        // 开始消费消息,自动确认
        channel.basicConsume(DEAD_LETTER_QUEUE_NAME, true, consumer);

        // 发送一条延时消息到延时队列
        String message = "Hello delay queue!";
        channel.basicPublish(DELAY_EXCHANGE_NAME, DELAY_ROUTING_KEY,
                null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);

    }
}

插件实现

在RabbitMQ中,延时队列也可以通过x-delayed-message插件实现的,创建一个类型为x-delayed-message的交换机,发送消息时设置x-delay头部,表示延迟时间。消息到期后会被投递到绑定的队列,消费者监听该队列即可。

在这里插入图片描述


在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

配置类代码:

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    @Bean
    public Queue delayedQueue() {
   	 	return new Queue(DELAYED_QUEUE_NAME);
    }
    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() { 
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, 
                                  "x-delayed-message", true, false, args);
    }
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
        		@Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange)
            	.with(DELAYED_ROUTING_KEY).noargs();
    }
}

消息生产者代码 :

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
	rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, 
                                  message,correlationData ->{
    	correlationData.getMessageProperties().setDelay(delayTime);
    	return correlationData;
	});
	log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 
             delayed.queue:{}", new Date(),delayTime, message);
}                          

消息消费者代码 :

public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
    String msg = new String(message.getBody());
	log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}

在这里插入图片描述

第二个消息被先消费掉了,符合预期

总结

延时队列是一种非常有用的消息队列,它可以在一定的延时时间后将消息投递到消费者。延时队列可以应用于多种场景,例如定时任务、订单超时处理等。实现延时队列的方法有多种,可以使用Java中的DelayQueue、Redis的zset,也可以使用RabbitMQ等消息队列。不同的实现方式有各自的优缺点,需要根据实际情况选择合适的实现方式。

  • Java的DelayQueue实现延时队列,是一种基于JDK自带的类的方案,它是一个存储延时任务的环形队列,可以高效地循环遍历。它的优点是简单易用,不需要依赖其他组件,缺点是不支持持久化和分布式,如果程序崩溃或者重启,延时任务会丢失。
  • 使用Redis的zset实现延时队列,是一种基于Redis有序集合的方案,它可以利用score来表示延时时间,通过zaddzrangebyscore等命令来入队和出队。它的优点是高性能,支持持久化和高并发,可以通过Redis集群来提高可用性和扩展性,缺点是需要额外的进程来轮询Redis,并且可能存在消息重复消费或者丢失的风险。
  • 使用rabbitmq实现延时队列,是一种基于rabbitmq的TTL和死信队列功能的方案,它可以通过设置消息或者队列的过期时间,以及将过期消息转移到死信队列中来实现延时效果。它的优点是利用了rabbitmq本身的消息可靠发送、投递和消费机制,保证了消息至少被消费一次,并且可以通过rabbitmq集群来解决单点故障问题,缺点是需要额外配置TTL和死信队列,并且可能存在消息堆积或者延迟不准确的问题(也可以使用插件方式)。

参考文章

  1. 延时队列
  2. 你真的知道怎么实现一个延迟队列吗 ?

文章中所有图片来源于上述文章!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/483715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Go(二):包管理、通道、协程并发、互斥锁基础

包管理、协程并发基础 生成包管理文件go-mod第一步&#xff08;初始化创建包管理文件&#xff09;第二步&#xff08;导入包&#xff09; 常用命令导入远程包&#xff08;示例&#xff1a;gin&#xff09;第一步&#xff08;导入包&#xff09;第二步&#xff08;安装包&#x…

操作系统之进程同异步、互斥

引入 异步性是指&#xff0c;各并发执行的进程以各自独立的、不可预知的速度向前推进。 但是在一定的条件之下&#xff0c;需要进程按照一定的顺序去执行相关进程&#xff1a; 举例说明1&#xff1a; 举例说明2: 读进程和写进程并发地运行&#xff0c;由于并发必然导致异步性…

【Python】如何在Python中绘制带有连接线的双饼图?

文章目录 一、导入所需的库二、准备数据三、绘制双饼图3.1 创建画布和子图对象3.2 绘制大饼图3.3 绘制小饼图3.4 连接线1&#xff0c;连接大饼图的上边缘和小饼图的饼块3.5 连接线2&#xff0c;连接大饼图的下边缘和小饼图的饼块3.6 添加连接线3.7 调整子图布局 四、源代码 在 …

Linux 内核组织(kernel.org)将关闭 FTP 服务

Linux 内核组织&#xff08;kernel.org&#xff09;是一家建立于 2002 年的加利福尼亚公共福利公司&#xff0c;其目的是公开地免费分发 Linux 内核和其它开源软件。它接受 Linux 基金会的管理&#xff0c;包括技术、资金和人员支持&#xff0c;用以维护kernel.org 的运营。 Li…

2 ROS2话题通讯基础(1)

2 ROS2话题通讯基础 2.1 ROS2话题通讯介绍2.2 ROS2常用的消息类型介绍2.2.1 std_msgs消息类型2.2.2 geometry_msgs消息类型 2.3 使用C/C创建基础消息类型的话题通讯2.3.1 创建C/C发布话题信息的功能包并配置VSCode环境2.3.2 编写ROS2发布话题节点CPP文件2.3.3 配置C/C发布话题功…

【Elasticsearch】SQL操作相关

文章目录 SQL操作数据准备查询索引下的数据SQL转化为DSL(本质)SQL与DSL混合使用查看所有索引查询指定索引查看索引(表)结构where条件过滤group by分组having 对分组后的数据进行过滤order by 排序limit 限制查询数量cursor 游标->为缓存设计聚合操作支持的函数和运算比较运算…

虚拟机和Docker有什么区别?

虚拟机 对于虚拟机&#xff0c;抽象层或抽象软件成为管理程序。管理程序就是帮助虚拟机模拟物理计算机的东西。在管理程序下面&#xff0c;我们有些硬件。管理程序管理单个物理主机上不同虚拟机之间的资源分配。管理程序管理单个物理主机上不同虚拟机之间的资源分配。也就是管…

微信小程序学习实录3(环境部署、百度地图微信小程序、单击更换图标、弹窗信息、导航、支持腾讯百度高德地图调起)

百度地图微信小程序 一、环境部署1.need to be declared in the requiredPrivateInfos2.api.map.baidu.com 不在以下 request 合法域名3.width and heigth of marker id 9 are required 二、核心代码&#xff08;一&#xff09;逻辑层index.js&#xff08;二&#xff09;渲染层…

vue diff算法与虚拟dom知识整理(2) snabbdom简介并搭建开发环境

snabbdom算是diff算法 和 虚拟dom 的一个鼻租了 vue源码借鉴了snabbdom 这个单词翻译出来叫速度 命名还是用了点心的 后面是 dom 这个 我们大概去猜作者的意思 大概想表示的就是 一个比较快的dom操作 snabbdom的get地址如下 https://github.com/snabbdom/snabbdom 这里的简…

「OceanBase 4.1 体验」|快速安装部署[OBD方式]

文章目录 一、Oceanbase数据库简介1.1 核心特性1.2 系统架构1.2.1 存储层1.2.2 复制层1.2.3 均衡层1.2.4 事务层1.2.4.1 原子性1.2.4.2 隔离性 1.2.5 SQL 层1.2.5.1 SQL 层组件1.2.5.2 多种计划 1.2.6 接入层 二、OceanBase 数据库社区版部署2.1 部署方式2.2 基础环境配置2.3 通…

【华为OD机试真题】信号发射和接收(javaC++python)100%通过率 超详细代码注释 代码深度解读

信号发射和接收 知识点数组栈 单调栈时间限制: 1s 空间限制: 256MB 限定语言:不限 题目描述: 有一个二维的天线矩阵&#xff0c;每根天线可以向其他天线发射信号也能接收其他天线的信号&#xff0c;为了简化起见&#xff0c;我们约定每根天线只能向东和向南发射信号&#xf…

【ROS仿真实战】获取机器人在gazebo位置真值的三种方法(三)

文章目录 前言一. 使用ROS tf库二、 使用Gazebo Model Plugin三、 使用libgazebo_ros_p3d插件四、总结 前言 在ROS和Gazebo中&#xff0c;获取机器人的位置信息通常通过ROS消息传递进行。在这篇文章中&#xff0c;我们将介绍三种获取机器人在Gazebo中位置真值的方法&#xff1…

CTF ASCII码 密码解密题 简单

1. 题目 这次的CTF题目就是一张图片如下&#xff0c;并且说有几个蛋被打乱过。明显是一个密码学的解码题。 2. 解题思路 左边表格给出10种颜色&#xff0c;特别是第二列给出了数字0&#xff0c;种种迹象都指向了10进制。每一个蛋都有三种颜色&#xff0c;代表每个蛋都是三位…

【GORM框架】一文学会用gorm实现对单表的增删改查操作

博主简介&#xff1a;努力学习的大一在校计算机专业学生&#xff0c;热爱学习和创作。目前在学习和分享&#xff1a;数据结构、Go&#xff0c;Java等相关知识。博主主页&#xff1a; 是瑶瑶子啦所属专栏: GORM框架学习 近期目标&#xff1a;写好专栏的每一篇文章 文章目录 一、…

M1 Mac配置JAVA环境

1、下载JDK 目前JDK有Oracle的JDK还有zulu的Open JDK可供选择&#xff0c;因为需要JAVA1.8所以下文以zulu的JDK为例。 Zulu官网&#xff1a;https://www.azul.com/downloads/?packagejdk 选择所需的JDK版本&#xff08;注意选择ARM架构&#xff09;> 下载.dmg包 > 安装 …

DAY 47 Ngnix优化与防盗链

Ngnix优化主要有两种&#xff0c;一种是配置上的优化&#xff0c;一种是内核上的优化 隐藏响应头中的版本号 方法一&#xff1a;curl命令 网页查看 隐藏版本信息 修改nginx的运行用户和组 方法一&#xff1a;在编译安装时&#xff0c;指定运行用户和组 [root nginx-1.12.2]#…

【英语】100个句子记完7000个雅思单词

其实主要的7000词其实是在主题归纳里面&#xff0c;不过过一遍100个句子也挺好的&#xff0c;反正也不多。 文章目录 Sentence 01Sentence 02Sentence 03Sentence 04Sentence 05Sentence 06Sentence 07Sentence 08Sentence 09Sentence 10Sentence 11Sentence 12Sentence 13Sent…

Linux常用的压缩、解压缩以及scp远程传输命令的使用

Linux常用的压缩、解压缩以及scp远程传输命令的使用 1.压缩命令2 解压命令3. 大文件压缩分割为多个压缩文件4. 远程传输命令scp4.1 将本地文件复制到远程主机目录4.2 将本地目录复制到远程主机目录4.3 将远程主机的文件复制到本机4.4 复制远程主机目录到本机 1.压缩命令 tar -…

Packet Tracer - 综合技能练习(配置新交换机的初始设置、SSH 和端口安全)

Packet Tracer - 综合技能练习 地址分配表 设备 接口 IP 地址 子网掩码 S1 VLAN 1 10.10.10.2 255.255.255.0 PC1 NIC 10.10.10.10 255.255.255.0 PC2 NIC 10.10.10.11 255.255.255.0 场景 网络管理员要求您配置新交换机。 在本练习中&#xff0c;您将使用一…

二分搜索算法通解框架

文章介绍了二分搜索最常见的几个场景的使用&#xff1a;寻找一个数、寻找左侧边界以及寻找右侧边界。阅读本文只需读者了解二分搜索的使用限制和基本原理即可。 我相信&#xff0c;友好的讨论交流会让彼此快速进步&#xff01;文章难免有疏漏之处&#xff0c;十分欢迎大家在评…