1.消息可靠性
三种丢失的情形:
1.1 生产者确认机制
启动MQ
创建Queues:
两种Callback:
1.ReturnCallback:全局callback
2.ComfirmCallback: 发送信息时候设置
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello, spring amqp!";
// 2.准备CorrelationData
// 2.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.准备ConfirmCallback
correlationData.getFuture().addCallback(result -> {
// 判断结果
if (result.isAck()) {
// ACK
log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
// 重发消息
}
}, ex -> {
// 记录日志
log.error("消息发送失败!", ex);
// 重发消息
});
// 3.发送消息
rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
}
执行成功:
监控页面:
模拟失败:
1.投递到交互机失败
2.投递到交换机了,但是没有进入队列
1.2 消息持久化
注意: 生产者确认只能保证数据放到队列当中,但是无法保证数据不丢失(比如所在的机器宕机了),
所以还需要保证数据的持久化
@Configuration
public class CommonConfig {
@Bean
public DirectExchange simpleDirect(){
return new DirectExchange("simple.direct");
}
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
}
@Test
public void testDurableMessage() {
// 1.准备消息 消息持久化
Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("simple.queue", message);
}
注意:
//交换机不传值默认就是持久化
//交换机、队列、消息默认都是持久化
@Bean
public DirectExchange simpleDirect(){
return new DirectExchange("simple.direct");
}
public AbstractExchange(String name) {
this(name, true, false);
}
演示数据是否默认持久化:
重启mq:
1. 交互机、队列、消息都做持久化
2.消费者端关闭防止被消费
3.重启mq后看队列中数据是否还在(是否持久化)
1.3 消费者消息确认
生产者确认:能确定消息投递到队列
消息持久化:能避免MQ宕机造成的消息丢失
生产者确认和消息持久化能保证消息能投递到消费者,但是无法保证消息被消费者消费(比如投递消费者的
同时,消费者所在机器宕机了)
1.manual:不推荐 代码侵入
try{
//业务逻辑
ack
} catch(ex){
nack
}
2.auto:推荐 spring全权完成,不需要手动写代码
3.none:不推荐 投递完成立马删除消息,是否成功都不管
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
//模拟出现异常情况
System.out.println(1 / 0);
log.info("消费者处理消息成功!");
}
}
默认为none:抛出异常后消息立即被删除:
修改为auto模式:
队列返回nack会再去发送信息:
1.4 失败重试机制
演示失败重试机制:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true
initial-interval: 1000
multiplier: 3
max-attempts: 4
默认重试到达最大次数后消息就丢弃:
但是对于一些比较重要不能丢弃的消息需要使用以下策略:
推荐使用第三种方案:将失败的消息发送到失败的交换机和失败的队列中,后面可以告知管理员然后重新
人工去处理
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
演示:
发送消息:
面试题:最后一分钟的总结