深入浅出消息队列----【如何保证消息不丢失?】
- 消息流转链路
- 生成且发送消息流程
- 存储流程
- 消费流程
本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】
消息流转链路
消息从生产者产生发送至 Broker,Broker 存储消息等待消费者来拉取,消费者从 Broker 拉取消息。
所以,如果要保证消息不丢失,那么从整个链路来看需要保证三大流程中消息都不丢失,缺一不可。
- 发送
- 存储
- 消费
生产者需要保证消息一定被完整的发送并存储至 Broker 中。
Broker 需要保证已经存储的消息不会丢失,比如 Broker 重启、宕机后还存在。
消费者需要保证拉取的消息一定被消费,比如消费一半重启了,需要确保还未消费的消息后续也能被消费。
生成且发送消息流程
消息从生产者产生,而且消息的生成往往伴随着某个业务,比如下单就加积分这个业务场景。
我们会在代码里先保存订单,然后发送消息让积分系统给对应的用户加积分:
public boolean addOrder(xx) {
// do sth
saveOrder(); // 保存订单
sendMessage(); // 发送加积分消息
}
我们需要确保订单保存成功,积分消息一定要发送成功,不然消息就丢了。
这里就涉及了请求确认机制,即 ack,学过 TCP 协议应该都会知道 ack。
简单来说就是生产者与 Broker 交互通过 ack 来确认消息的成功接收。
当生产者发送消息给 Broker 后,如果 Broker 接收到这条消息,那么就返回 ack 给生产者,一旦生产者收到了 ack,那么就知道消息已经被 Broker 成功接收了,因此保证发送阶段消息不会丢失。
如果 Broker 没有返回 ack,那么咋办?
不管是网络原因还是什么原因,只要这时候生产者超时等待没收到 ack,那么就需要重试。
当然不可能无限重试,这样就会阻塞后续的业务流程,像 RocketMQ 默认重试 3 次失败后就会返回错误或直接抛出异常,这时需要人工介入处理。
因此,在使用的时候需要关注发送结果或者异常情况。
public boolean addOrder(xx) {
// do sth
saveOrder(); // 保存订单
try {
SendResult senResult = senMessage(); // 发送加积分消息
if (sendResult...) {
recordSend(); // 记录失败消息
}
} catch (Exception e) {
log.error("send msg error");
recordSend(); // 记录失败消息
}
}
这种情况下消息发送异常,我们不能影响正常的下单流程,因此记录下错误日志,然后把发送积分的消息保存到数据库中,后续再通过定时任务来补偿这些没有加上的积分。
很多同学的处理可能就是直接抛错,但是抛错意味着 addOrder 这个方法报错了,那么用户下单就报错了,体验就很差,这里需要注意:不能让非主流程的功能影响主流程的功能,下单是主流程,加积分是非主流程。
同步发送可以通过 try-catch 来捕获异常,如果是异步发送,需要记得处理 onException 的逻辑:
// 异步发送消息,发送结果通过 callback 返回给客户端
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// do sth
}
@Override
public void onException(Throwable e) {
// 记得处理发送失败场景
log.error("send msg error");
recordSend();
}
});
小结
在生产阶段,利用请求确认机制,保证消息发送成功,如果没收到 ack 那么需要重试,如果重试失败,那么看场景分析。
可以直接将整个业务方法失败使得流程报错,这样业务没处理成功自然也不存在消息的丢失(好比下单没下成功,自然不需要发送积分)。
可以让业务的处理都正常,然后通过落库等其它手段保存这个消息,后续利用定时任务在尝试发送或其它手段补偿,来确保消息不丢失。
存储流程
当 Broker 返回 ack 给生产者之后,生产者认为消息已经被成功存储至 Broker,因此后续不会再发送这个条消息。
因此 Broker 返回 ack 给生产者之前需要确保消息真的已经被成功存储了。
RocketMQ 的消息默认是异步刷盘,也就是消息先写入到 pageCache 中,即文件系统的缓存,然后等待操作系统或定时刷盘任务再将消息刷到磁盘上。
也就是默认情况下,当消息写入到缓存中,Broker 就给生产者返回 ack 了。
如果 Broker 正常运行肯定是没问题的,但突然断电,那么 pageCache 里还未刷盘的消息就没了,此时消息就真的丢了!
生产者就懵逼了,明明说好的 ack,怎么消息说没就没了呢?
所以如果一定要确保消息不被丢失,那么需要将 Broker 的刷盘配置改成同步刷盘,即 flushDiskType 配置为 SYNC_FLUSH。
这样生产者接到 ack 的消息都是已经被刷到磁盘文件上的,断电了也不影响。
那么磁盘损坏了呢?
默认情况下如果磁盘损坏了确实消息也就丢了,不过现在有很多磁盘阵列可以保证磁盘上内容的可靠性,简单理解就是备份,这里损坏了还能从另一个地方读。
还有其实生产上 Broker 可能是有很多台组成集群使用的。
在保证消息不丢失的场景,需要设置同步复制,这样当 master 挂了需要 salve 顶上的时候,才能保证 salve 的消息是全的。
小结
单台 Broker 需要保证同步刷盘,还有磁盘阵列来保证消息不会丢失。不过这里需要注意,同步刷盘的性能会差一些。
集群架构需要保证同步复制,这样当 salve 被消费的时候提供的消息才是全的。
消费流程
新手在实现消费流程的时候,很容易丢失消息。
消息是有点位提交的,consumer 需要上报给 Broker 已经消费到的设置,这样假设 consumer 重启后,也可以从 Broker 获取之前的消费位置,然后往后消费消息。
因此需要保证,上报给 Broker 的点位必须是已经被消费过的消息。
具体例子,消费者直接从 Broker 拉取了 50 条消息,分别是从 1-50,当消息到达消费者内存后,消费者直接将消息都提交到线程池中进行异步处理,然后直接返回消费成功(消费成功本质就是点位提交,RocketMQ pushConsumer 封装了这个功能)。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
// 将消息提交到线程池
executorPool.execute(xxxx);
// 返回消息消费状态,为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
这样实现的话,消费者客户端会认为这些消息已经被消费了,然后给 Broker 提交了点位,已经消费到 50 了,下一次拉取从 51 开始。
此时,假设消费者程序宕机了,意外重启了,线程池的任务队列可是内存队列,那么线程池里面还未消费的消息是不是就没了?
但重启后消费者从 Broker 拿到的消费点位已经从 51 开始了,前面 50 条认为已经消费完了,所以之前那些在线程池任务队列排队的还未被消费的消息就丢失了。
所以需要确保:只有在对应消息的业务流程处理完毕后,再给 Broker 返回消费确认,提交点位。
小结
消费者消费消息的时候,只有当对应的业务都处理完了,再返回消息的正确消费即消息点位的提交。
注意 MessageListener 里面的一部处理,容易丢失消息。