下载RabbitMq:本地安装rabbitmq_王胖胖1112的博客-CSDN博客
1、pom文件引入
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2、设置配置文件
spring:
rabbitmq:
host: IP地址 // 若在本地:127.0.0.1
username: guest //默认的账号密码
password: guest
port: 5672
virtual-host: /
3、使用延迟队列需要安装插件
一、进行地址下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
二、将下载的文件放入rabbitMq目录下的plugins文件夹中
三、进入工作台,进入rabbitMq目录下的plugins目录下,执行:rabbitmq-plugins enable rabbitmq_delayed_message_exchange,开启插件支持
四、重启rabbitMq
net start RabbitMQ 启动
net stop RabbitMQ 停止
五、到http://localhost:15672,查看交换机类型是否有x-delayed-message,有则成功了
4、配置文件类代码
@Configuration public class MqConf { /** *死信交换机 * */ public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; /** *死信队列 * */ public static final String DELAYED_QUEUE_NAME = "delayed.queue"; /** *死信路由 * */ public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";/**
*定义死信队列
* */
@Bean("delayedQueue") public Queue delayedQueue(){ return new Queue(DELAYED_QUEUE_NAME); }/**
*定义死信交换机
* */
@Bean("delayedExchange") public CustomExchange delayedExchange(){ Map<String, Object> args = new HashMap<>(1); // 自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); }/**
*绑定
* */
@Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); }}
5、消息生产者代码
@Slf4j @RestController @RequestMapping("/ttl") public class TestController { @GetMapping("/sendDelayMsg/{message}/{delayTime}") public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){ rabbitTemplate.convertAndSend(TtlQueueConfig.DELAYED_EXCHANGE_NAME, TtlQueueConfig.DELAYED_ROUTING_KEYA, message, messagePostProcessor ->{ messagePostProcessor.getMessageProperties().setDelay(delayTime); return messagePostProcessor; }); log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列delay.queue:{}", new Date(), delayTime, message); return "发送成功"; } }
6、消息消费者代码
@Slf4j @Component public class DeadLetterConsumer { @RabbitListener(queues = TtlQueueConfig.DELAYED_QUEUE_NAMEA) public void receiveD(String msg){ log.info("当前时间{},队列收到死信队列的消息:{}", new Date(), msg); } }