惰性队列工作原理
惰性队列通过尽可能多地将消息存储到磁盘上来减少内存的使用。与传统队列相比,惰性队列不会主动将消息加载到内存中,而是尽量让消息停留在磁盘上,从而降低内存占用。尽管如此,它并不保证所有操作都是同步写入磁盘的。这意味着消息可能会先被缓存到操作系统的缓冲区中,然后由操作系统决定何时将其真正写入磁盘。
- 优点:适合处理大量消息且对内存压力敏感的场景。
- 缺点:由于频繁的磁盘I/O操作,性能可能不如传统队列。
同步刷盘的概念
同步刷盘意味着每次写入操作都会等待数据完全写入磁盘后才返回确认信息。虽然这种方式提供了更强的数据持久性保证,但它也显著增加了写入操作的延迟。对于RabbitMQ而言,可以通过设置消息为持久化来增加数据的安全性,但对于极端情况下的数据安全性要求,还需要结合其他策略如调整操作系统参数或使用文件系统级别的同步写入配置。
延迟插件的工作原理
RabbitMQ本身没有内置的延迟队列功能,但可以通过安装rabbitmq_delayed_message_exchange
插件实现这一功能。该插件允许创建一个自定义交换机类型,该交换机能够根据消息头中的延迟时间属性来延迟消息的传递。
在Spring Boot中集成RabbitMQ惰性队列和延迟消息
1. 项目初始化
首先,确保你的Spring Boot项目中包含必要的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
2. 配置RabbitMQ连接
在application.yml
中配置RabbitMQ连接信息:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
3. 定义惰性队列
创建一个配置类来定义惰性队列:
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqConfig {
/**
* 定义惰性模式的队列
* @return 返回惰性队列实例
*/
@Bean
public Queue lazyQueue() {
Map<String, Object> args = new HashMap<>();
// 设置队列为惰性模式
args.put("x-queue-mode", "lazy");
return new Queue("my_lazy_queue", true, false, false, args); // durable=true for queue durability
}
}
4. 发送持久化消息
创建一个服务类用于发送消息,并确保消息是持久化的:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送一条持久化消息到惰性队列
* @param message 要发送的消息内容
*/
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("my_lazy_queue", message);
System.out.println(" [x] Sent '" + message + "'");
}
}
确保消息持久化可以在application.yml
中设置如下:
spring:
rabbitmq:
template:
exchange: ''
routing-key: 'my_lazy_queue'
mandatory: true
publisher-confirms: true
publisher-returns: true
5. 接收消息
创建一个监听器来接收消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageReceiver {
/**
* 监听并接收来自惰性队列的消息
* @param message 接收到的消息内容
*/
@RabbitListener(queues = "my_lazy_queue")
public void receiveMessage(String message) {
System.out.println(" [x] Received '" + message + "'");
}
}
6. 使用延迟插件发送延迟消息
首先,在RabbitMqConfig
中定义延迟交换机:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqConfig {
/**
* 定义延迟交换机
* @return 返回延迟交换机实例
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
}
/**
* 绑定延迟队列到延迟交换机
* @param delayedQueue 延迟队列
* @param delayExchange 延迟交换机
* @return 返回绑定实例
*/
@Bean
public Binding binding(Queue delayedQueue, CustomExchange delayExchange) {
return new Binding("delayed_queue", Binding.DestinationType.QUEUE, "delayed_exchange", "routing.key", Collections.emptyMap());
}
/**
* 定义延迟队列
* @return 返回延迟队列实例
*/
@Bean
public Queue delayedQueue() {
return new Queue("delayed_queue");
}
}
然后,创建一个服务类来发送延迟消息:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class DelayedMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送带有延迟的消息
* @param message 要发送的消息内容
* @param delayTime 延迟时间(毫秒)
*/
public void sendDelayedMessage(String message, int delayTime) {
MessagePostProcessor messagePostProcessor = message -> {
message.getMessageProperties().setHeader("x-delay", delayTime);
return message;
};
rabbitTemplate.convertAndSend("delayed_exchange", "routing.key", message, messagePostProcessor);
System.out.println(" [x] Sent '" + message + "' with delay.");
}
}
最后,创建一个监听器来接收延迟消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DelayedMessageReceiver {
/**
* 监听并接收来自延迟队列的消息
* @param message 接收到的消息内容
*/
@RabbitListener(queues = "delayed_queue")
public void receiveDelayedMessage(String message) {
System.out.println(" [x] Received delayed message '" + message + "'");
}
}
高级特性和最佳实践
-
发布确认机制:为了提高可靠性,可以开启发布确认机制,以确保消息确实被RabbitMQ服务器接受。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("Message acknowledged");
} else {
System.err.println("Message not acknowledged due to: " + cause);
}
});
-
预取计数(Prefetch Count):通过设置预取计数限制每个消费者同时处理的消息数量,有助于防止消费者被过多未处理的消息压垮。
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConnectionConfig {
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setChannelCacheSize(25);
connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(200);
return connectionFactory;
}
}
可以在application.yml
中设置:
spring:
rabbitmq:
listener:
simple:
prefetch: 10