二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ重启期间生产者消息投递失败,
导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢:
应用 [xxx] 在 [08-1516:36:04] 发生 [ 错误日志异常 ] , alertId=[xxx] 。由 [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620] 触 发
应用 xxx 可能原因如下 服
务 名 为 :
异常为: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620, 产 生 原 因 如 下 :1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn’t exist or the broker will not allow us to use it.||Consumer received fatal=false exception on startup:
发布确认
配置文件 application.yml
在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
-
NONE 禁用发布确认模式,是默认值
-
CORRELATED 发布消息成功到交换器后会触发回调方法
-
SIMPLE 经测试有两种效果,其一效果和 CORRELATED值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
消息交换机队列配置 ConfirmConfig
@Component
public class ConfirmConfig {
@Value("${rabbit.confirm.queue}")
private String queue;
@Value("${rabbit.confirm.exchange}")
private String exchange;
@Value("${rabbit.confirm.routing-key}")
private String routingKey;
@Bean(value="confirm_direct_exchange")
DirectExchange confirmDirectExchange(){
return new DirectExchange(exchange);
}
@Bean(value = "confirm_queue")
public Queue confirmQueue() {
return new Queue(queue);
}
//进行绑定
@Bean
Binding confirmBindingTopicExchange(@Qualifier("confirm_queue") Queue queue,
@Qualifier("confirm_direct_exchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(routingKey);
}
}
消息消费者 ConfirmConsumer
@Component
@Slf4j
public class ConfirmConsumer {
/**
* 确认队列
*/
@RabbitListener(queues = {"${rabbit.confirm.queue}"})
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
log.info("接受到队列 confirm.queue 消息:{}", msg);
}
设置确认 ack 回调接口 MyCallBack
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = Optional.ofNullable(correlationData).map(n -> n.getId()).orElse("0");
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:", id, cause);
}
}
}
生产者生产消息 ConfirmProduer
@RestController
@Slf4j
public class ConfirmProduer {
@Value("${rabbit.confirm.exchange}")
private String exchange;
@Value("${rabbit.confirm.routing-key}")
private String routingKey;
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MyCallBack myCallBack;
@PostConstruct
public void initCallBack() {
rabbitTemplate.setConfirmCallback(myCallBack);
}
@GetMapping("/confirmSendMessage/{message}")
public String sendMessage(@PathVariable("message") String message) {
log.info("接收到消息:{}", message);
CorrelationData correlationData1 = new CorrelationData();
correlationData1.setId("1");
// 发送到 A 队列
rabbitTemplate.convertAndSend(exchange, routingKey, "编号为1:" + message, correlationData1);
CorrelationData correlationData2 = new CorrelationData("2");
correlationData2.setId("2");
//发送到B队列
rabbitTemplate.convertAndSend(exchange, "12222", "编号为2:" + message, correlationData2);
return message;
}
}
验证
输入网址: http://localhost:8088/Server/confirmSendMessage/你好啊,亲
可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为
“12222”,两条消息都成功被交换机接收,也收到了交换机的确认回调,
但消费者只收到了一条消息,因为 第二条消息的 RoutingKey 与队列的 BindingKey 不一致,
也没有其它队列能接收这个消息,所有第二条 消息被直接丢弃了。
回退消息
Mandatory 参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如
果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。
通过设置mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者.
回调接口 MyCallBack
实现 RabbitTemplate.ReturnCallback 接口, returnedMessage 方法
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = Optional.ofNullable(correlationData).map(n -> n.getId()).orElse("0");
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:", id, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消费 {} 被退回,退回原因是:{}, 交换机是:{},路由key是:{}", new String(message.getBody()), replyText, exchange, routingKey);
}
}
生产者 ConfirmProduer2
先将 com.yjl.amqp.config.confirm.ConfirmProduer#initCallBack 方法内容 注释掉
// @PostConstruct
// public void initCallBack() {
// rabbitTemplate.setConfirmCallback(myCallBack);
// }
@RestController
@Slf4j
public class ConfirmProduer2 implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Value("${rabbit.confirm.exchange}")
private String exchange;
@Value("${rabbit.confirm.routing-key}")
private String routingKey;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initCallBack() {
rabbitTemplate.setConfirmCallback(this);
// 设置开启
/**
* true:
* 交换机无法将消息进行路由时,会将该消息返回给生产者
* false:
* 如果发现消息无法进行路由,则直接丢弃
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
@GetMapping("/confirmSendMessage2/{message}")
public String sendMessage(@PathVariable("message") String message) {
log.info("接收到消息:{}", message);
CorrelationData correlationData1 = new CorrelationData();
correlationData1.setId("1");
// 发送到 A 队列
rabbitTemplate.convertAndSend(exchange, routingKey, "编号为1:" + message, correlationData1);
CorrelationData correlationData2 = new CorrelationData("2");
correlationData2.setId("2");
//发送到B队列
rabbitTemplate.convertAndSend(exchange, "12222", "编号为2:" + message, correlationData2);
return message;
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = Optional.ofNullable(correlationData).map(n -> n.getId()).orElse("0");
if (ack) {
log.info(" 生产者处理, 交换机已经收到 id 为:{}的消息", id);
} else {
log.info("生产者处理, 交换机还未收到 id 为:{}消息,由于原因:", id, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("生产者处理, 消费 {} 被退回,退回原因是:{}, 交换机是:{},路由key是:{}", new String(message.getBody()), replyText, exchange, routingKey);
}
}
相当于 生产者 弄个事件监听
验证
请求信息: http://localhost:8088/Server/confirmSendMessage2/生产者处理消息
备份交换机
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息
无法被投递时发现并处理。
但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然 后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者 所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增 加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的 复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些 处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。 在 RabbitMQ中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份 交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由 备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑 定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都 进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
配置 备份 ConfirmConfig
将之前的 交换机 confirm_exchange_1 和 队列 confirm_queue 均删除
@Component
public class ConfirmConfig {
@Value("${rabbit.confirm.queue}")
private String queue;
@Value("${rabbit.confirm.exchange}")
private String exchange;
@Value("${rabbit.confirm.routing-key}")
private String routingKey;
// 配置未达的那些
@Value("${rabbit.confirm.backup_queue}")
private String backupQueue;
@Value("${rabbit.confirm.backup_exchange}")
private String backupExchange;
@Value("${rabbit.confirm.warn_queue}")
private String warnQueue;
// @Bean(value="confirm_direct_exchange")
// DirectExchange confirmDirectExchange(){
// return new DirectExchange(exchange);
// }
@Bean(value = "confirm_direct_exchange")
DirectExchange confirmDirectExchange() {
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(exchange)
.durable(true)
//设置备份
.withArgument("alternate-exchange", backupExchange);
return exchangeBuilder.build();
}
@Bean(value = "confirm_queue")
public Queue confirmQueue() {
return new Queue(queue);
}
//进行绑定
@Bean
Binding confirmBindingTopicExchange(@Qualifier("confirm_queue") Queue queue,
@Qualifier("confirm_direct_exchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(routingKey);
}
@Bean(value = "backup_confirm_direct_exchange")
FanoutExchange backupConfirmDirectExchange() {
return new FanoutExchange(backupExchange);
}
@Bean(value = "backup_confirm_queue")
public Queue backupConfirmQueue() {
return new Queue(backupQueue);
}
@Bean(value = "warn_confirm_queue")
public Queue warnConfirmQueue() {
return new Queue(warnQueue);
}
//进行绑定
@Bean
Binding backupConfirmBindingTopicExchange(@Qualifier("backup_confirm_queue") Queue queue,
@Qualifier("backup_confirm_direct_exchange") FanoutExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange);
}
//进行绑定
@Bean
Binding warnConfirmBindingTopicExchange(@Qualifier("warn_confirm_queue") Queue queue,
@Qualifier("backup_confirm_direct_exchange") FanoutExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange);
}
}
备份队列消息消费 ConfirmConsumer
/**
* 备份队列
* 备份交换机优先级高
*/
@RabbitListener(queues = {"${rabbit.confirm.warn_queue}"})
public void warnMsg(Message message) {
String msg = new String(message.getBody());
log.info("报警接收到不可接收的消息:{}", msg);
}
生产者 还是之前的 ConfirmProduer2 不变
验证
访问请求: http://localhost:8088/Server/confirmSendMessage2/生产者备份处理消息
mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?
谁优先 级高,经过上面结果显示答案是备份交换机优先级高。
RabbitMQ 其他知识点
幂等性
概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常, 此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱 了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误 立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
消息重复消费
消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回 ack 时网络中断,
故MQ未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但 实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
解决思路
MQ消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者UUID 或者订单消费
者消费MQ中的消息也可利用MQ的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消 息时用该 id 先判断该消息是否已消费过。
消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,
这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:a. 唯一 ID+指纹码机制,利用数据库主键去重, b.利用 redis 的原子性去实现
唯一 ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基
本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存 在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数 据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式
优先级队列
使用场景
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如
果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall 商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创 造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存 放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,
所以订单量大了后采用 RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。
如何添加
要让队列实现优先级需要做的事情有如下事情:
队列需要设置为优先级队列,
消息需要设置消息的优先级,
消费者需要等待消息已经发送到队列中才去消费因为,
这样才有机会对消息进行排序
惰性队列
使用场景
RabbitMQ从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消
费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持 更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致 使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,
这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的 时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。
两种模式
队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy
模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示
例中演示了一个惰性队列的声明细节:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);