如何保证消息的可靠性投递?
1.保证生产者向broke可靠性投递,开启ack投递成功确认,如果失败的话进行消息补偿
/**
* @author yueF_L
* @date 2023-08-10 01:32
* ConfirmCallback:消息只要被 RabbitMQ broker 接收到就会触发confirm方法。
*/
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("confirm==>发送到broker失败\r\n" +
"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
correlationData, ack, cause);
} else {
log.info("confirm==>发送到broker成功\r\n" +
"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
correlationData, ack, cause);
}
}
2. 保证消息能投敌到目标 queue
/**
* @author yueF_L
* @date 2023-08-10 01:29
* ReturnCallback:如果消息未能投递到目标 queue 里将触发returnedMessage方法。
* 若向 queue 投递消息未成功,可记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
*/
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
"replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
message, replyCode, replyText, exchange, routingKey);
}
}
将配置set到rabbitTemplate
/**
* @author yueF_L
* @date 2023-08-10 01:25
* 消息队列配置
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RabbitMQConfig {
private final ConfirmCallbackService confirmCallbackService;
private final ReturnCallbackService returnCallbackService;
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启失败通知
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnCallback(returnCallbackService);
return rabbitTemplate;
}
}
yml配置
代码中的调用
@RabbitListener(queues = TtlQueueConfig.DEAD_LETTER_QUEUE_TELEPHONE_BILL)
public void receiveD(Message message, Channel channel) {
try {
try {
String msg = new String(message.getBody());
// 模拟异常,测试重试
int a = 1 / 0;
//apiService.doApiHeartChainTelephoneBillOrder(msg);
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("当前时间:{},收到话费死信队列信息:{}", new Date(), msg);
}catch (Exception e){
//参数1:消费消息的index
//参数2:是否批量否定多个消息,设为false就与basicReject功能一样,triue的前提也是在同一个channel,且在该消息否定前存在未确认的消息
//参数3: 对异常消息的处理,true表示重排序,false表示丢弃
// 如果拒绝消息,要求mq重发的话,一直异常会进入死循环
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
log.error(TtlQueueConfig.DEAD_LETTER_QUEUE_TELEPHONE_BILL + " 消息反馈失败,param:{}", message.getBody());
throw e;
}
} catch (Exception e) {
log.error("监听RabbitMq、队列:" + TtlQueueConfig.DEAD_LETTER_QUEUE_TELEPHONE_BILL + "发生异常:"+ e.getMessage());
throw new CustomException("监听RabbitMq、队列:" + TtlQueueConfig.DEAD_LETTER_QUEUE_TELEPHONE_BILL + "发生异常:"+ e.getMessage());
}
}