什么是消息的TTL?
TTL = time to live,消息的TTL = 消息的存活时间或过期时间
什么是死信队列?
当队列中的消息到达存活时间或过期时间后,若未设置死信队列,则该消息将被抛弃,反之则转入死信队列
死信队列
配置类
死信队列仅需配置类,其与消息队列的绑定在消息队列的配置类中完成
// 死信队列
@Configuration
public class DeadLetterConfiguration {
//订单交换机
@Bean
public DirectExchange deadLetterDirectExchange(){
return new DirectExchange("dead_letter_direct_exchange_order", true, false);
}
//消息通知队列
@Bean
public Queue deadLetterQueue(){
return new Queue("dead_letter_queue", true, false, false);
}
//绑定交换机与队列
@Bean
public Binding deadLetterDirectBinding(){
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterDirectExchange()).with("dead_letter_order");
}
}
消息的TTL
消息的TTL分为消息队列的TTL与消息本身的TTL
消息队列的TTL:被转发至该队列中的所有消息均遵循该队列的TTL规则
消息本身的TTL:消息遵循本身携带的TTL规则,不影响所处队列中的其他消息
问:若同时存在消息队列的TTL与消息本身的TTL,优先级如何?
答:取时间短者
消息队列的TTL
配置类
关于示例代码中的参数,可于RabbitMQ Management---Queue---Add a new queue---Arguments中查看
@Configuration
public class QueueTTLConfiguration {
//订单交换机
@Bean
public DirectExchange queueTTLDirectExchange(){
return new DirectExchange("ttl_direct_exchange_order", true, false);
}
//消息通知队列
@Bean
public Queue TTLQueue(){
//TTL参数设置
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000); //time to live 5000ms
args.put("x-max-length", 5); //队列最大消息数(溢出的消息移至死信队列)
args.put("x-dead-letter-exchange", "dead_letter_direct_exchange_order");//死信队列交换机
args.put("x-dead-letter-routing-key", "dead_letter_order"); //死信队列routingKey(当然 fanout交换机无需配置routingKey)
return new Queue("ttl_queue", true, false, false, args);
}
//绑定交换机与队列
@Bean
public Binding TTLDirectBinding(){
return BindingBuilder.bind(TTLQueue()).to(queueTTLDirectExchange()).with("queue_ttl");
}
}
生产者
@Service
public class QueueTTLOrderService {
@Autowired
RabbitTemplate rabbitTemplate;
public void makeOrder(String userID, String producerID, int num){
// 1.根据需求查询仓库 判断是否能满足需求
// 2.若能满足则生成订单
String orderID = UUID.randomUUID().toString();
System.out.println("成功生成订单");
// 3.通过RabbitMQ发送消息
String exchangeName = "ttl_direct_exchange_order";
String routingKey = "queue_ttl";
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderID + " queue_ttl");
System.out.println("订单发送成功");
}
}
消费者
此处仅演示消息队列的TTL与死信队列,故不添加消费者
消息本身的TTL
配置类
此处演示不与死信队列绑定情况下的TTL
@Configuration
public class MessageTTLConfiguration {
//订单交换机
@Bean
public DirectExchange messageTTLDirectExchange(){
return new DirectExchange("ttl_direct_exchange_order", true, false);
}
//消息通知队列
@Bean
public Queue messageTTLQueue(){
return new Queue("message_ttl_queue", true, false, false);
}
//绑定交换机与队列
@Bean
public Binding MessageTTLDirectBinding(){
return BindingBuilder.bind(messageTTLQueue()).to(messageTTLDirectExchange()).with("message_ttl");
}
}
生产者
@Service
public class MessageTTLOrderService {
@Autowired
RabbitTemplate rabbitTemplate;
public void makeOrder(String userID, String producerID, int num){
// 1.根据需求查询仓库 判断是否能满足需求
// 2.若能满足则生成订单
String orderID = UUID.randomUUID().toString();
System.out.println("成功生成订单");
// 3.通过RabbitMQ发送消息
String exchangeName = "ttl_direct_exchange_order";
String routingKey = "message_ttl";
//消息传输加工机 此处用于为消息设置TTL时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");//Expiration 到期
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
// 将messagePostProcessor对象同消息一起传递
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderID + " message_ttl", messagePostProcessor);
System.out.println("订单发送成功");
}
}
消费者
此处仅演示消息本身的TTL,故不添加消费者
测试
消息队列的TTL:此处发送十条消息,溢出的五条会马上被送至死信队列,五秒后余下的五条也会被送至死信队列
@Autowired
QueueTTLOrderService queueTtlOrderService;
@Test
void ttlOrders(){
for (int i = 0; i < 10; i++) {
queueTtlOrderService.makeOrder("1", "1", 1);
}
}
消息本身的TTL:消息队列并未设置上限,也未绑定死信队列,故消息失效后将被直接抛弃
@Autowired
MessageTTLOrderService messageTTLOrderService;
@Test
void messageTTLOrder() {
messageTTLOrderService.makeOrder("1", "1", 1);
}