RabbitMQ Dead Letter Queues & Delayed Exchanges
1. What is a dead letter
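Dead letters & dead letter queues |
---|
A message becomes a dead letter when a consumer rejects or nacks it without requeueing, when its time-to-live expires, or when its queue exceeds the maximum length. The queue that receives such messages through the dead letter exchange is the dead letter queue. |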
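Uses of dead letter queues:
- When a queue is full, overflow messages are routed to the dead letter queue instead of being lost
- Delayed consumption. For example, when an order is placed, the customer has 15 minutes to pay
2. Implementing a dead letter queue
2.1 Prepare the Exchange & Queue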
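To make the 15-minute payment example more concrete: delayed consumption with a dead letter queue usually means publishing to a waiting queue that has no consumer, letting the message expire there, and consuming it from the dead letter queue. The sketch below only builds the queue arguments such a waiting queue would carry. `x-message-ttl`, `x-dead-letter-exchange` and `x-dead-letter-routing-key` are RabbitMQ's standard argument names; the class name, the exchange name `order-timeout-exchange` and the routing key `order.timeout` are made up for illustration.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: builds the arguments for a "waiting" queue. */
final class PaymentTimeoutArgsSketch {

    static Map<String, Object> waitingQueueArguments(Duration paymentWindow) {
        Map<String, Object> args = new LinkedHashMap<>();
        // Messages that sit in the queue longer than this are dead-lettered
        args.put("x-message-ttl", paymentWindow.toMillis());
        // Where expired messages are republished (names are assumptions)
        args.put("x-dead-letter-exchange", "order-timeout-exchange");
        args.put("x-dead-letter-routing-key", "order.timeout");
        return args;
    }
}
```

Calling `waitingQueueArguments(Duration.ofMinutes(15))` yields a TTL of 900000 ms. A consumer on the dead letter queue would then check whether the order has been paid and cancel it if not.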
package com.llp.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Dead letter queue configuration
 */
@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE = "normal-exchange";
    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";
    public static final String DEAD_EXCHANGE = "dead-exchange";
    public static final String DEAD_QUEUE = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";

    @Bean
    public Exchange normalExchange() {
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }

    @Bean
    public Queue normalQueue() {
        // Normal queue; its dead letters are republished to the dead letter exchange
        return QueueBuilder.durable(NORMAL_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead.abc")
                .build();
    }

    @Bean
    public Binding normalBinding(Queue normalQueue, Exchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Queue deadQueue, Exchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}
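In the configuration above, dead letters are republished with the routing key `dead.abc` and reach `dead-queue` because its binding pattern `dead.#` matches that key. As a rough illustration of topic matching (`*` stands for exactly one word, `#` for zero or more words), here is a small self-contained matcher. `TopicPatternSketch` is a hypothetical helper written for these notes; it is not part of Spring AMQP or RabbitMQ, and the broker's real implementation works differently.

```java
/** Illustrative model of topic-exchange pattern matching. */
final class TopicPatternSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // Words are separated by dots; -1 keeps trailing empty words
        String[] p = pattern.split("\\.", -1);
        String[] k = routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: key must be too
        }
        String word = p[i];
        if (word.equals("#")) {
            // '#' may consume zero or more words: try every split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // key used up, pattern still has words
        }
        if (word.equals("*") || word.equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

With this model, `matches("dead.#", "dead.abc")` and `matches("normal.#", "normal.abc")` are true, while `matches("normal.#", "dead.abc")` is false, which is why a dead letter never lands back in `normal-queue`.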
2.2 Producing dead letters
- Dead-lettering by having the consumer reject or nack the message

    package com.llp.rabbitmq.topic;

    import com.llp.rabbitmq.config.DeadLetterConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.io.IOException;

    @Component
    public class DeadListener {

        // Assumes manual acknowledgements are enabled, e.g.
        // spring.rabbitmq.listener.simple.acknowledge-mode=manual
        @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
        public void consume(String msg, Channel channel, Message message) throws IOException {
            System.out.println("Received message from the normal queue: " + msg);
            // Reject the message without requeueing it, so it is dead-lettered
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            // Alternatively:
            // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
- Message time-to-live (TTL)
  - Set a TTL on an individual message

      @Test
      public void publishExpire() {
          String msg = "dead letter expire";
          rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                  // Per-message TTL in milliseconds, passed as a string
                  message.getMessageProperties().setExpiration("5000");
                  return message;
              }
          });
      }
  - Set a message TTL on the queue

      @Bean
      public Queue normalQueue() {
          return QueueBuilder.durable(NORMAL_QUEUE)
                  .deadLetterExchange(DEAD_EXCHANGE)
                  .deadLetterRoutingKey("dead.abc")
                  .ttl(10000) // every message in this queue expires after 10 seconds
                  .build();
      }
- Limit the maximum number of messages in the Queue

    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable(NORMAL_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead.abc")
                .maxLength(1) // the queue holds at most one message
                .build();
    }
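With this limit, once the Queue already holds one message, publishing another one causes a message to be dead-lettered. Under RabbitMQ's default overflow behavior (drop-head), it is the older message at the head of the queue that becomes the dead letter; the newly published message stays in the queue.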
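The overflow case can be pictured with a small in-memory model. This is only a sketch under the assumption that the queue uses the default drop-head overflow behavior; `BoundedQueueSketch` is a hypothetical class for illustration and does not talk to RabbitMQ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of a length-limited queue with drop-head overflow. */
final class BoundedQueueSketch {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    BoundedQueueSketch(int maxLength) {
        if (maxLength < 0) {
            throw new IllegalArgumentException("maxLength must not be negative");
        }
        this.maxLength = maxLength;
    }

    /** Enqueues a message, dead-lettering from the head while over the limit. */
    void publish(String message) {
        queue.addLast(message);
        while (queue.size() > maxLength) {
            deadLetters.add(queue.removeFirst()); // oldest message is dropped
        }
    }

    /** Messages still waiting in the queue, oldest first. */
    List<String> ready() {
        return new ArrayList<>(queue);
    }

    /** Messages that were pushed out, in the order they were dead-lettered. */
    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }
}
```

With `maxLength` set to 1, publishing "first" and then "second" leaves "second" in the queue and moves "first" to the dead letters, mirroring the `maxLength(1)` configuration above.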
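3. Delayed exchange
Download: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
When dead letter queues are used for delayed consumption and many different delay durations are needed, each duration requires its own queue, so the number of queues grows quickly. A delayed exchange solves this problem.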
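One reason a single queue cannot simply mix different per-message TTLs: as far as the RabbitMQ TTL documentation describes it, an expired message is only dead-lettered once it reaches the head of the queue, so a short delay queued behind a long one has to wait. The sketch below models that behavior under the assumptions that the queue has no consumer and that messages are listed in queue order; `PerMessageTtlSketch` is a hypothetical helper, not broker code.

```java
/** Models when messages with per-message TTLs leave a single queue. */
final class PerMessageTtlSketch {

    /**
     * publishedAt[i] and ttl[i] describe the i-th message in queue order,
     * both in milliseconds. Returns the time each message is dead-lettered,
     * assuming expiry only takes effect at the head of the queue.
     */
    static long[] deadLetterTimes(long[] publishedAt, long[] ttl) {
        if (publishedAt.length != ttl.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        long[] result = new long[publishedAt.length];
        long headFreeAt = Long.MIN_VALUE; // when the previous message left
        for (int i = 0; i < publishedAt.length; i++) {
            long ownExpiry = publishedAt[i] + ttl[i];
            // A message cannot leave before the one in front of it
            result[i] = Math.max(ownExpiry, headFreeAt);
            headFreeAt = result[i];
        }
        return result;
    }
}
```

For two messages published at time 0 with TTLs of 30000 ms and 5000 ms (in that order), the model returns 30000 for both: the 5-second message is held back by the 30-second one. Using one queue per delay duration avoids this, at the cost of many queues.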
Upload the downloaded file to the Linux server, then use the following command to copy it into the plugins directory of the rabbitmq container
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 9da57c5038ba:/opt/rabbitmq/plugins
In the /opt/rabbitmq/sbin directory of the rabbitmq container, run
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Restart the container for the change to take effect
docker restart 9da57c5038ba
Once the plugin is enabled, an additional delayed exchange option (x-delayed-message) becomes available
- Declare the delayed exchange

    package com.llp.rabbitmq.config;

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Delayed exchange configuration
     */
    @Configuration
    public class DelayedConfig {

        public static final String DELAYED_EXCHANGE = "delayed-exchange";
        public static final String DELAYED_QUEUE = "delayed-queue";
        public static final String DELAYED_ROUTING_KEY = "delayed.#";

        @Bean
        public Exchange delayedExchange() {
            Map<String, Object> arguments = new HashMap<>();
            // Once the delay has elapsed, route like a topic exchange
            arguments.put("x-delayed-type", "topic");
            // Arguments: name, type, durable, autoDelete, arguments
            return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, arguments);
        }

        @Bean
        public Queue delayedQueue() {
            return QueueBuilder.durable(DELAYED_QUEUE).build();
        }

        @Bean
        public Binding delayedBinding(Queue delayedQueue, Exchange delayedExchange) {
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
- Send a message

    package com.llp.rabbitmq;

    import com.llp.rabbitmq.config.DelayedConfig;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    public class DelayedPublisherTest {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Test
        public void publish() {
            rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // How long to hold the message before it can be consumed, in milliseconds
                    message.getMessageProperties().setDelay(30000);
                    return message;
                }
            });
        }
    }
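In practice the delay is rarely a constant like 30000; it is usually derived from a deadline, such as the 15-minute payment window mentioned earlier. The following is a minimal sketch of that calculation, returning an `int` so the result can be passed to `setDelay`. `DelaySketch` and its method are hypothetical names introduced here, and the clamping to zero for deadlines already in the past is an assumption about the desired behavior.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: turns a deadline into a delay in milliseconds. */
final class DelaySketch {

    /**
     * Milliseconds from now until dueAt, never negative.
     * Throws ArithmeticException if the delay does not fit into an int.
     */
    static int delayMillis(Instant now, Instant dueAt) {
        long millis = Duration.between(now, dueAt).toMillis();
        return Math.toIntExact(Math.max(0L, millis));
    }
}
```

For an order placed at `now` with a deadline of `now.plus(Duration.ofMinutes(15))`, this gives 900000, which could replace the hard-coded value in the post-processor above.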
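**Problem with the delayed exchange:** a message waiting out its delay is held only on the node that received it. If RabbitMQ goes down or the server crashes during the delay and that node's data cannot be recovered, the message is lost.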