描述:在消息投递的过程中可能会存在消息丢失的行为产生,生产者到交换机,交换机到队列的过程都有可能出现这个现象。所以我们要有个发布确认的操作来防止消息丢西。
确认机制方案:
配置文件配置交换机发布确认模式:
publisher-confirm-type=correlated
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换器后会触发回调方法
SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
交换机发布确认和路由回退分别需要实现RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback接口
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Resource
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
//将类注入接口
rabbitTemplate.setConfirmCallback(this); //交换机发布确认
//路由回退配置
rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) { //id ,ack ,结果描述
if (b){
log.info("收到回调id:{}",correlationData.getId());
}else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}",correlationData.getId(),s);
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("路由:{}---回退的信息:{}",returnedMessage.getRoutingKey(),returnedMessage.getMessage());
}
}
测试:
路由和交换机声明与绑定
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
@Bean
public DirectExchange confirmExchange(){
Map<String, Object> map = new HashMap<>();
map.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)//持久化
.withArguments(map);//设置备份交换机
return (DirectExchange) exchangeBuilder.build();
}
@Bean
public Binding bindingConfirmExchange(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("key");
}
//声明备份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
// 声明警告队列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
// 声明报警队列绑定关系
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange
backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}
// 声明备份队列
@Bean("backQueue")
public Queue backQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
// 声明备份队列绑定关系
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}
生产者:
@GetMapping("sendConfirmMsg/{message}")
public void sendConfirmMsg(@PathVariable String message) {
CorrelationData correlationData1 = new CorrelationData("1");
rabbitTemplate.convertAndSend(TtlQueueConfig.CONFIRM_EXCHANGE_NAME, "key", message,correlationData1);
log.info(" 当 前 时 间 : {}, 发送一条发布确认信息给队列 confirm.queue:{}", new Date(), message);
CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(TtlQueueConfig.CONFIRM_EXCHANGE_NAME, "key2", message,correlationData2);
log.info(" 当 前 时 间 : {}, 发送一条发布确认信息给路由不存在的队列 confirm.queue:{}", new Date(), message);
}
队列监听消费者:
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveCONFIRMQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到发布确认队列的消息:{}", new Date().toString(), msg);
}
@RabbitListener(queues = "warning.queue")
public void receiveWarningQueue(Message message){
String msg = new String(message.getBody());
//log.info("当前时间:{},收到备份交换机的警告队列的消息:{}", new Date().toString(), msg);
log.error("报警发现不可路由消息:{}", msg);
}
总结:监听者收到的ack为false则消息没有投递成功,交换机配置了备份交换机,优先级比路由回退的高,如果交换机到队列的消息没有投递成功,则可以通过备份交换机的监听队列再次去投递消息。