RabbitMQ实现延时队列

news2024/11/15 23:38:31

目录

  • 什么是延时队列
  • 延时队列的使用场景
  • 前提准备
  • 利用RabbitMQ实现延时队列
  • 延时队列优化
  • 利用RabbitMQ插件实现延迟队列

什么是延时队列

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延时队列的使用场景

那么什么时候需要用延时队列呢?考虑一下以下场景:

  • 订单在十分钟之内未支付则自动取消。
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 账单在一周内未支付,则自动结算。
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;发生店铺创建事件,十天后检查该店铺上新商品数,然后通知上新数为0的商户;发生账单生成事件,检查账单支付状态,然后自动结算未支付的账单;发生新用户注册事件,三天后检查新注册用户的活动数据,然后通知没有任何活动记录的用户;发生退款事件,在三天之后检查该订单是否已被处理,如仍未被处理,则发送消息给相关运营人员;发生预定会议事件,判断离会议开始是否只有十分钟了,如果是,则通知各个与会人员等等情况。

前提准备

在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)

TTL是什么呢?TTLRabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”(至于什么是死信,可以点击查看)。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

设置TTL的方式有两种:

第一种,在创建队列的时候设置队列的“x-message-ttl”属性:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

第二种,针对每条消息设置TTL:


AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

通过以上两种方式,可以将消息延迟6秒在被消费。

这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

利用RabbitMQ实现延时队列

想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。

从下图可以大致看出消息的流向:

在这里插入图片描述
生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

接下来直接上代码

首先申明队列以及交换机的绑定关系,即添加一个RabbitmqConfig文件:

package com.miaosha.study.tet;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:50
 * @Version: 1.0
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 延时交换机
     */
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.business.exchange";
    /**
     * 延时队列a
     */
    public static final String DELAY_QUEUEA_NAME = "delay.queue.business.queuea";
    /**
     *延时队列b
     */
    public static final String DELAY_QUEUEB_NAME = "delay.queue.business.queueb";
    /**
     *延时队列a路由键
     */
    public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.business.queuea.routingkey";
    /**
     *延时队列b路由键
     */
    public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.business.queueb.routingkey";
    /**
     *死信交换机
     */
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadletter.exchange";
    /**
     *死信队列a路由键
     */
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.deadletter.queuea.routingkey";
    /**
     *死信队列b路由键
     */
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.deadletter.queueb.routingkey";
    /**
     *死信队列a
     */
    public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.deadletter.queuea";
    /**
     *死信队列b
     */
    public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.deadletter.queueb";

    /**
     * 声明延时Exchange
     * @return
     */
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    /**
     * 声明死信Exchange
     * @return
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明延时队列A 延时2s, 并绑定到对应的死信交换机
     * @return
     */
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>();
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        //声明队列的TTL
        args.put("x-message-ttl", 2000);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
    }

    /**
     * 声明延时队列B 延时20s, 并绑定到对应的死信交换机
     * @return
     */
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String, Object> args = new HashMap<>(2);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        //声明队列的TTL
        args.put("x-message-ttl", 20000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
    }

    /**
     * 声明死信队列A
     * @return
     */
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    /**
     * 声明死信队列B
     * @return
     */
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    /**
     * 声明延时队列A绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
    }

    /**
     * 声明业务队列B绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                    @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
    }

    /**
     * 声明死信队列A绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    /**
     * 声明死信队列B绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }


  
}

添加消费者:

package com.miaosha.study.tet;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

import static com.miaosha.study.tet.RabbitMQConfig.*;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:58
 * @Version: 1.0
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

添加消息发送者:

package com.miaosha.study.tet;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

import static com.miaosha.study.tet.RabbitMQConfig.*;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:52
 * @Version: 1.0
 */
@Slf4j
@Component
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg, Integer type){
        switch (type){
            case 1:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case 2:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
        }
    }
 }

添加一个测试接口:

package com.miaosha.study.tet;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.Objects;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  10:58
 * @Version: 1.0
 */
@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private DelayMessageSender sender;

    @RequestMapping("sendmsg")
    public void sendMsg(String msg, Integer delayType){
        log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayType);
        sender.sendMsg(msg, delayType);
    }
 }

然后启动项目,访问本接口

浏览器分别请求:
http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1
http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2

观察控制台:

在这里插入图片描述
第一条消息在2s后变成了死信消息,然后被消费者消费掉,第二条消息在20s之后变成了死信消息,然后被消费掉,这样,一个延时队列就打造完成了。

不过,等等,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有2s和20s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求??

显然,需要一种更通用的方案才能满足需求,那么就只能将TTL设置在消息属性里了

延时队列优化

接下来,在添加一个延时队列:

package com.miaosha.study.tet;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:50
 * @Version: 1.0
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 延时交换机
     */
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.business.exchange";
    /**
     *延时队列c
     */
    public static final String DELAY_QUEUEC_NAME = "delay.queue.business.queuec";
    /**
     *延时队列c路由键
     */
    public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.business.queuec.routingkey";
    /**
     *死信交换机
     */
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadletter.exchange";
    /**
     *死信队列c路由键
     */
    public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.deadletter.queuec.routingkey";
    /**
     *死信队列c
     */
    public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.deadletter.queuec";

    /**
     * 声明延时Exchange
     * @return
     */
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    /**
     * 声明死信Exchange
     * @return
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }


    /**
     * 声明延时队列C 不设置TTL(发送信息的时候设置),并绑定到对应的死信交换机
     *
     * @return
     */
    @Bean("delayQueueC")
    public Queue delayQueueC(){
        Map<String, Object> args = new HashMap<>(3);
        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
    }

    /**
     * 声明死信队列C 用于接收延时任意时长处理的消息
     * @return
     */
    @Bean("deadLetterQueueC")
    public Queue deadLetterQueueC(){
        return new Queue(DEAD_LETTER_QUEUEC_NAME);
    }

    /**
     * 声明延时列C绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    }

    /**
     * 声明死信队列C绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
    }


}

添加对应消费者:

package com.miaosha.study.tet;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

import static com.miaosha.study.tet.RabbitMQConfig.*;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:58
 * @Version: 1.0
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {



    @RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
    public void receiveC(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},死信队列C收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

添加消息生产者:

package com.miaosha.study.tet;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

import static com.miaosha.study.tet.RabbitMQConfig.*;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:52
 * @Version: 1.0
 */
@Slf4j
@Component
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void delayMsg(String msg,Integer delayTime){
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEC_ROUTING_KEY, msg, a ->{
            a.getMessageProperties().setExpiration(String.valueOf(delayTime));
            return a;
        });
    }


}

编写测试接口:

package com.miaosha.study.tet;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.Objects;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  10:58
 * @Version: 1.0
 */
@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private DelayMessageSender sender;


    @RequestMapping("delayMsg")
    public void delayMsg(String msg, Integer delayTime){
        log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayTime);
        sender.delayMsg(msg, delayTime);
    }

}

重启项目,访问接口,分别请求:
http://localhost:8081/rabbitmq/delayMsg?msg=msg1&delayTime=20000
http://localhost:8081/rabbitmq/delayMsg?msg=msg2&delayTime=2000

观察控制台:

在这里插入图片描述
通过观察时间可以看到,延时队列确实生效了,不过有个问题,第一个20s后被消费,第二个本来应该在2s后被消费,但结果确是跟着第一个后面消费的。也就是说,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。

这个结果显然不是我们想要的结果,那么,我们还得再次优化!

利用RabbitMQ插件实现延迟队列

这里需要用到RabbitMQ的rabbitmq_delayed_message_exchange插件,如果没有安装插件,可以点击查看安装教程

安装完成之后,在添加一个延时队列配置:

package com.miaosha.study.tet;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:50
 * @Version: 1.0
 */
@Configuration
public class RabbitMQConfig {



    public static final String DELAYED_QUEUE_NAME = "delay.queue.delay.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delay.queue.delay.exchange";
    public static final String DELAYED_ROUTING_KEY = "delay.queue.delay.routingkey";

    @Bean
    public Queue immediateQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
                                 @Qualifier("customExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

添加消费者:

package com.miaosha.study.tet;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

import static com.miaosha.study.tet.RabbitMQConfig.*;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:58
 * @Version: 1.0
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {


    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},延时队列收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

添加消息生产者:

package com.miaosha.study.tet;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

import static com.miaosha.study.tet.RabbitMQConfig.*;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:52
 * @Version: 1.0
 */
@Slf4j
@Component
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendDelayMsg(String msg, Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
            a.getMessageProperties().setDelay(delayTime);
            return a;
        });
    }
}

在写一个测试接口:

package com.miaosha.study.tet;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.Objects;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  10:58
 * @Version: 1.0
 */
@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private DelayMessageSender sender;



    @RequestMapping("delayMsg2")
    public void delayMsg2(String msg, Integer delayTime) {
        log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), msg, delayTime);
        sender.sendDelayMsg(msg, delayTime);
    }
}

然后重启项目,再次访问:
http://localhost:8081/rabbitmq/delayMsg2?msg=msg1&delayTime=20000
http://localhost:8081/rabbitmq/delayMsg2?msg=msg2&delayTime=2000

再次观察控制台:

在这里插入图片描述

可以看到,已然达到我们的要求,符合预期结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/376380.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

沃尔玛采购退款大额过不了的原因是什么?

市面上有很多伪装工具&#xff0c;但不是针对沃尔玛&#xff0c;很多软件方只是针对大众平台伪装&#xff0c;主要是方便程序开发人员调测系统程序&#xff0c;并不是能用于沃尔玛行业。而且每一个平台的风控是不—样的&#xff0c;我们技术团队从底层硬件环境开始配合软件控制…

【论文精读】MVSNet系列(2018-2022)总结

MVSNet系列总结1.MVSNet ECCV20182.RMVSNet CVPR20193.P-MVSNet ICCV20194.MVSCRF ICCV20195.PointMVSNet ICCV20192019年的这四篇文章各有特点&#xff0c;其中RMVSNet、PointMVSNet更是打开了可以继续沿着往下做的思路&#xff1a;6.cascade MVSNet CVPR20207.UCSNet CVPR202…

初识go变量,使用var和:=来声明变量,声明变量的三种方式

初识go变量,使用var和:来声明变量&#xff0c;声明变量的三种方式 Go语言的变量名由字母、数字、下划线组成&#xff0c;其中首个字符不能为数字。 tip:(Go语言中的变量的规范&#xff0c;也和其他高级语言相同) 声明变量的一般形式是使用 var 关键字&#xff1a; var 变量…

SpringCloud源码探析(二)-Nacos注册中心

1.概述 Nacos是Spring Cloud Alibaba中的核心组件之一&#xff0c;它提供了服务发现、服务配置和服务管理等功能&#xff0c;可以作为注册中心和配置中心使用。注册中心是微服务架构体系中的核心组件之一&#xff0c;Nacos比Eureka有着更强大的功能&#xff0c;它们都能提供服…

Java多线程系列-- ForkJoin框架,分治的艺术

前言 本篇是多线程系列中的一篇&#xff0c;我们在先前的一篇文章中回顾了线程池的主要知识 Java多线程基础–线程的创建与线程池管理 过去了很长时间&#xff0c;我们简单提要一下&#xff1a; 设计目的&#xff1a;简化线程的使用&#xff0c;管理与复用&#xff0c;避免…

Windows 远程桌面安全吗?电脑远程桌面的安全如何保障?

远程桌面会话在加密通道上运行&#xff0c;防止任何人通过监听网络来查看您的会话。 但是&#xff0c;在早期版本的 RDP 中用于加密会话的方法存在漏洞。 此漏洞可能允许使用中间人攻击 (link is external) 未经授权访问您的会话。 我们可以在 Windows 10、Windows 11 和 Wind…

学好数据结构与算法其实一点也不难

一. 初识算法 1.1 什么是算法&#xff1f; 定义 在数学和计算机科学领域&#xff0c;算法是一系列有限的严谨指令&#xff0c;通常用于解决一类特定问题或执行计算 In mathematics and computer science, an algorithm (/ˈlɡərɪəm/) is a finite sequence of rigorous …

MSVCP140.dll下载及安装教程,dll修复方法

MSVCP140.dll是Windows操作系统中的一个DLL文件&#xff0c;许多程序依赖于它来正常运行。如果您尝试运行某个程序时&#xff0c;发现缺少MSVCP140.dll文件&#xff0c;那么您需要下载并安装它才能解决问题。本文将介绍如何MSVCP140.dll下载和安装MSVCP140.dll。 第一步&#x…

解决树莓派 bullseye (11) 系统无法通过 xrdp 远程连接的问题

我手上有一台树莓派 4B&#xff0c;使用官方镜像烧录器烧录老版本操作系统 buster (10) 时可以正常通过 Windows 远程桌面连接上&#xff0c;但换成最新的 bullseye (11) 系统后却无法正常连接远程桌面。 问题复现&#xff1a; 使用官方镜像烧录器烧录&#xff0c;配置用户名为…

CSDN 竞赛 32 期

CSDN 竞赛 32 期1、题目名称&#xff1a;传奇霸业2、题目名称&#xff1a;严查枪火3、题目名称&#xff1a;蚂蚁家族4、题目名称&#xff1a;运输石油小结1、题目名称&#xff1a;传奇霸业 传奇霸业&#xff0c;是兄弟就来干。 小春(HP a)遇到了一只黄金哥布林(HP x)。 小春每…

20个让你效率更高的CSS代码技巧

在本文中&#xff0c;我们想与您分享一个由各大css网站总结推荐的20个有用的规则和实践经验集合。有一些是面向CSS初学者的&#xff0c;有一些知识点是进阶型的。希望每个人通过这篇文章都能学到对自己有用的知识。好了&#xff0c;我们开始。1.注意外边距折叠与其他大多数属性…

从工地转行软件测试,拿下13k+年终奖是种什么体验?

最近&#xff0c;一则名为《我&#xff1a;毕业五年&#xff0c;存款5000。她:中传硕士&#xff0c;火锅店保洁》的视频走红网络&#xff0c;两位名校毕业生看似高开低走的就业经历&#xff0c;引起了很多人的共鸣。她们所传达的并不是所谓的躺平、摆烂&#xff0c;而是希望更多…

spark性能调优(一):Shuffle

Shuffle 一、配置项调优二、减少shuffle数据量三、避免shuffle何为shuffle? 集群中跨进程、跨节点的数据分发(Map的输出文件写到本地磁盘,Reducer把Map的输出文件拉到本地)为什么要shuffle? 准确的说,shuffle是刚需(业务场景决定的),分布式环境中,不同节点不能进行内存交换,只…

加快发展先进制造业势在必行!

众所周知&#xff0c;我国是制造大国&#xff0c;但并非制造强国。而我们在持续发展制造业的进程中也面临着诸多实际问题&#xff0c;如产业发展后续乏力&#xff0c;环境制约异常突出&#xff0c;技术创新能力薄弱&#xff0c;结构调整任务艰巨等等。故而要实现由制造大国向制…

数据结构与算法之二叉树大全

目录二叉树的定义二叉树的性质(特性)满二叉树与完全二叉树链式存储的二叉树顺序存储的二叉树线索二叉树(Threaded BinaryTree)二叉排序树&#xff08;Binary Sort Tree&#xff09;平衡二叉树&#xff08; Balanced Binary Tree&#xff09;为什么使用平衡二叉树&#xff1f;如…

Vue中如何利用websocket实现实时通讯

首先我们可以先做一个简单的例子来学习一下简单的websocket模拟聊天对话的功能 原理很简单&#xff0c;有点像VUE中的EventBus&#xff0c;用emit和on传来传去 首先我们可以先去自己去用node搭建一个本地服务器 步骤如下 1.新建一个app.js&#xff0c;然后创建pagejson.js文…

最简单的代码生成器,netcore平台,EF架构,smartsofthelp

1.dbhelper 原生SQL操作类2.Model 实体层3.EF dbfirst 生成entites 实体操作类4.EF实体接口增删改查操作方法成员5.UI 展示层SQL数据脚本Model/// <summary>/// Model实体层 /// </summary>namespace Smart.Model{/// <summary>/// 数据实体层 T_Eventsmart …

珠海数据智能监控器+SaaS平台 轻松实现SMT生产管控

数据智能监控器 兼容市面上99%的SMT设备 直接读取设备生产数据与状态&#xff0c;如&#xff1a;计划产出、实际产出、累计产出、停机、节拍、线利用率、直通率、停产时间、工单状态、OEE…… 产品功能价值 ◎ OEE不达标报警&#xff0c;一手掌握生产效能 ◎ 首检/巡检/成…

研发人员最希望项目经理和PMO能够做什么?看完不要惊讶

作为项目经理和PMO你考虑过自己在其他人眼中的形象么&#xff1f;知道各个环节最希望你做什么吗&#xff1f;对于最常打交道的研发人员&#xff0c;你知道他们最希望你做什么吗&#xff1f; 如果不能了解其他环节对你的期望&#xff0c;你往往很难获得认同&#xff0c;为此&am…

Golang实现RabbitMQ中死信队列各个情况

下面这段教程针对是你已经有一些基本的MQ的知识&#xff0c;比如说能够很清楚的理解queue、exchange等概念&#xff0c;如果你还不是很理解&#xff0c;我建议你先访问官网查看基本的教程。 文章目录1、造成死信队列的主要原因2、操作逻辑图3、代码实战3.1 针对原因1&#xff1…