消息发送确认机制
消息发送确认机制:消息由producer发送后,确认其是否到达broker,又是否被exchange转发至对应queue的机制
该机制分为两部分:producer---broker,exchange---queue
前者的实现依靠ConfirmCallback机制,后者的实现依靠ReturrnsCallback机制
ConfirmCallback:
实现ConfirmCallback接口,并重写confirm方法
confirm方法参数含义:
correlationData:CorrelationData类只有一个 id 属性 用于唯一标识该消息 public CorrelationData() { this.id = UUID.randomUUID().toString(); }
ack:消息是否成功传输到 broker (true表示成功传输 false表示传输失败)
cause:传输失败的原因
ReturrnsCallback:
实现ReturnsCallback接口,并重写returnedMessage方法
当消息转发失败后就会触发ReturrnsCallback,会将消息返回给生产者,同时会返回与消息转发失败的相关信息(包含在参数returned内),可对此采取后续处理
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
public void sendMessage(String exchange, String routingKey, Object msg) {
/**
* 确保消息发送失败后可以重新返回到队列中
* 注意:yml需要配置 publisher-returns: true
*/
rabbitTemplate.setMandatory(true);
/**
* 消费者确认收到消息后,手动ack回执回调处理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/**
* 消息投递到队列失败回调处理
*/
rabbitTemplate.setReturnCallback(returnCallbackService);
/**
* 发送消息
*/
rabbitTemplate.convertAndSend(exchange, routingKey, msg,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
}