1.死信队列+TTL
-
什么是TTL
- time to live:消息存活时间
- 如果消息在存活时间内未被消费,则会被清除
- RabbitMQ支持两种TTL设置
- 单独消息进行配置TTL
- 整个队列进行配置TTL(使用居多)
-
什么是RabbitMQ的死信队列
- 没有被及时消费的消息存放的队列
-
什么是RabbitMQ的死信交换机
- Dead Letter Exchange(死信交换机,缩写DLX),当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机
-
消息有哪几种情况成为死信
-
消费者拒收消息(basic.reject/basic.nack),并且没有重新入队requeue=false
-
消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time to live)
-
队列的消息长度达到极限
-
结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
-
2.使用管控台测试死信队列
-
RabbitMQ管控台消息TTL测试
-
队列过期时间使用参数,对整个队列消息统一过期
- x-message-ttl:单位ms毫秒
-
消息过期时间使用参数(如果队列头部消息未过期,队列中间消息已经过期,该消息还在队列里面)
- expiration:单位ms毫秒
-
两者都配置的话,时间短的先触发
-
-
RabbitMQ Web控制台测试
-
新建死信交换机(和普通交换机没区别)
-
新建死信队列(和普通队列没区别)
-
死信交换机和死信队列绑定
-
新建普通队列,设置过期时间、指定死信交换机
-
测试:直接在Web控制台往普通队列发送消息即可
-
3.延迟队列
-
什么是延迟队列
- 一种带有延迟功能的消息队列,Producer将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息
-
业界的一些实现延迟方式
- 定时任务高精度轮训
- 采用RocketMQ自带延迟消息功能
- RabbitMQ本身是不支持延迟队列的,怎么办?
- 结合死信队列的特性,就可以做到延迟消息
-
交换机和队列注册代码
package com.gen.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { /** * 死信交换机 */ public static final String DEAD_EXCHANGE = "dead_exchange"; /** * 死信队列 */ public static final String DEAD_QUEUE = "dead_queue"; /** * 死信路由键 */ public static final String DEAD_ROUTING_KEY = "dead_routing_key"; /** * 死信交换机 * * @return */ @Bean public Exchange deadExchange() { return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build(); } /** * 死信队列 * * @return */ @Bean public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); } /** * 死信交换机与死信队列进行绑定 * * @param deadQueue * @param deadExchange * @return */ @Bean public Binding deadBinding(Queue deadQueue, Exchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } /** * 普通交换机 */ public static final String ORDER_EXCHANGE = "order_exchange"; /** * 普通队列 */ public static final String ORDER_QUEUE = "order_queue"; /** * 普通路由键 */ public static final String ORDER_ROUTING_KEY = "order_routing_key"; /** * 普通交换机 * * @return */ @Bean public Exchange orderExchange() { return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build(); } /** * 普通队列 * * @return */ @Bean public Queue orderQueue() { Map<String, Object> args = new HashMap<>(3); // 过期时间,单位毫秒 args.put("x-message-ttl", 10000); // 消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 消息过期后,进入到死信交换机的路由键 args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY); return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build(); } /** * 普通交换机与普通队列进行绑定 * * @param orderQueue * @param orderExchange * @return */ @Bean public Binding orderBinding(Queue orderQueue, Exchange orderExchange) { return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_ROUTING_KEY).noargs(); } }
-
消息生产者
package com.gen; import com.gen.config.RabbitMQConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class GenRabbitmqApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void send() { this.rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, RabbitMQConfig.ORDER_ROUTING_KEY, "测试延迟队列,设置10s"); } }
-
消息消费者(只监听消费死信队列,不监听消费普通队列)
package com.gen.listener; import com.gen.config.RabbitMQConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE) public class DeadMQListener { @RabbitHandler public void deadConsumer(String msg, Message message, Channel channel) throws IOException { System.out.println(msg); // 成功确认,消费成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }