在 Spring Boot 项目中整合 RabbitMQ 并使用死信队列(Dead Letter Exchange, DLX)实现延迟队列功能,可以通过配置 RabbitMQ 的交换机、队列和绑定关系来实现。以下是一个完整的示例,展示如何在 Spring Boot 项目中使用死信队列实现延迟队列。
1. 添加依赖
在
pom.xml
文件中添加 RabbitMQ 的依赖:xml
复制
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>运行 HTML
2. 配置 RabbitMQ
在
application.yml
文件中配置 RabbitMQ 的连接信息:yaml
复制
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
3. 创建 RabbitMQ 配置类
创建一个配置类,用于定义交换机、队列和绑定关系:
java
复制
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; public static final String DELAY_QUEUE_NAME = "delay.queue"; public static final String DLX_EXCHANGE_NAME = "dlx.exchange"; public static final String DLX_QUEUE_NAME = "dlx.queue"; // 创建死信交换机 @Bean public DirectExchange dlxExchange() { return new DirectExchange(DLX_EXCHANGE_NAME); } // 创建死信队列 @Bean public Queue dlxQueue() { return new Queue(DLX_QUEUE_NAME); } // 绑定死信队列到死信交换机 @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(""); } // 创建延迟交换机 @Bean public DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } // 创建延迟队列,并设置死信交换机 @Bean public Queue delayQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); return new Queue(DELAY_QUEUE_NAME, false, false, false, args); } // 绑定延迟队列到延迟交换机 @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(""); } }
4. 发送延迟消息
创建一个消息发送服务,用于发送延迟消息:
java
复制
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitMQSender { @Autowired private AmqpTemplate rabbitTemplate; public void sendDelayMessage(String message, long delayTime) { rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, "", message, msg -> { msg.getMessageProperties().setExpiration(String.valueOf(delayTime)); return msg; }); System.out.println(" [x] Sent '" + message + "' with delay " + delayTime + " ms"); } }
5. 消费延迟消息
创建一个消息消费者,用于消费延迟消息:
java
复制
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMQReceiver { @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE_NAME) public void receiveDelayMessage(String message) { System.out.println(" [x] Received '" + message + "'"); } }
6. 测试延迟队列
创建一个测试类,用于测试延迟队列的功能:
java
复制
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class RabbitMQTest implements CommandLineRunner { @Autowired private RabbitMQSender rabbitMQSender; @Override public void run(String... args) throws Exception { String message = "Hello, RabbitMQ Delay Queue!"; long delayTime = 5000; // 延迟 5 秒 rabbitMQSender.sendDelayMessage(message, delayTime); } }
7. 运行项目
启动 Spring Boot 项目,观察控制台输出,可以看到消息在延迟 5 秒后被消费。
总结
配置 RabbitMQ:在
application.yml
文件中配置 RabbitMQ 的连接信息。创建 RabbitMQ 配置类:定义交换机、队列和绑定关系,设置死信交换机。
发送延迟消息:通过
AmqpTemplate
发送带有 TTL 的消息。消费延迟消息:通过
@RabbitListener
注解消费死信队列中的消息。
通过这种方式,可以在 Spring Boot 项目中使用死信队列实现延迟队列功能。需要注意的是,消息的 TTL 是以毫秒为单位的,且消息的延迟时间不能超过 RabbitMQ 的最大消息大小限制。
上面代码没看懂的,看这里,以下是详细的流程:
发送延迟消息:
生产者将消息发送到延迟交换机(
delay.exchange
),并设置消息的 TTL。消息被路由到延迟队列(
delay.queue
)。延迟队列处理:
延迟队列没有消费者,消息会在队列中等待,直到 TTL 到期。
TTL 到期后,消息被转发到死信交换机(
dlx.exchange
)。死信队列处理:
死信交换机将消息路由到死信队列(
dlx.queue
)。死信队列的消费者消费这些消息。