问题:在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?
解决方案就是缓存,比如当生产者发送消息到交换机时,但交换机不存在,我们应该将消息放入缓存中;或者交换机存在,队列不存在了,当交换机发送不到队列中也应该将消息放入缓存。然后在缓存中配置一个定时任务,对没有发送成功的消息重新进行投递。这样就避免了消息丢失的情况。
回调接口——消息确认
接下来我们通过代码实现以上机制,架构图如下所示:我们要解决问题就是如果图中的交换机或者队列出现问题,应该将消息进行缓存处理,防止消息丢失,具体的实现就是通过生产者的回调接口ConfirmCallback
来实现。
1️⃣ 修改配置文件
在配置文件当中需要添加配置表示开启发布消息成功到交换器后会触发回调方法
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换器后会触发回调方法
SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法;其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
👂这个层次是在交换机层次做的工作,保证消息被正确发送到了交换机。
通过实现一个RabbitTemplate.ConfirmCallback接口,将接口注入到RabbitTemplate中,当消息发送到交换机后就会触发这个回调。如果失败了可以考虑进入死信队列或者重新发送。但是做不到队列层面的工作。
/**
* 交换机确认回调方法
*
* @param correlationData 保存回调消息的ID以及相关信息
* @param ack 表示交换机是否收到消息(true表示收到)
* @param cause 表示消息接收失败的原因(收到消息为null)
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到ID为:{}的消息", id);
} else {
log.info("交换机还未收到ID为:{}的消息,原因为:{}", id, cause);
}
}
回调接口——消息回退
我们知道在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,但此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息能够让生产者感知并做出处理呢
我们可以通过设置 mandatory
参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
在配置文件当中需要添加配置表示开启消息路由失败后会触发消息回退回调方法
👂通过实现 RabbitTemplate.ReturnsCallback
接口
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息{}:,被交换机{}退回,退回原因:{},路由key:{}",
new String(returned.getMessage().getBody()),
returned.getExchange(),
returned.getReplyText(),
returned.getRoutingKey());
}
备份交换机
前面我们提到交换机如果出现了问题接受不到消息,我们就让交换机进行消息确认,让生产者重新发消息。如果队列出问题收不到消息,我们就进行消息回退,也是让生产者重新发消息。此外,还有一种解决方法就是给交换机添加一个备份交换机,有了备份交换机之后可以不用讲消息回退给生产者,而是将无法投递的消息交给备份交换机,让备份交换机通过自己的路由以及自己的队列发送给消费者,这样也能达到一个消息不丢失的目的。并且这种方式还能建立一个报警队列,用独立的消费者进行监测和报警。
当回调函数和备用交换机一起使用的时候,备份交换机优先级高。