什么是MQ消息积压?
MQ消息积压是指消息队列中的消息无法及时处理和消费,导致队列中消息累积过多的情况。
消息积压后果:
①:消息不能及时消费,导致任务不能及时处理
②:下游消费者处理大量的消息任务,导致系统性能下降、延迟增加以及资源消耗过高
如何思考这个问题?反映的是候选者在高并发场景下的消费能力问题。
如果出现积压,那一定是性能问题,想要解决消息从生产到消费上的性能问题,就首先要知道哪些环节可能出现消息积压,然后在考虑如何解决。
因为消息发送之后才会出现积压的问题,所以和消息生产端没有关系,又因为绝大部分的消息队列单节点都能达到每秒钟几万的处理能力,相对于业务逻辑来说,性能不会出现在中间件的消息存储上面。毫无疑问,出问题的肯定是消息消费阶段,那么从消费端入手,如何回答呢?
如果是线上突发问题,要临时扩容,增加消费端的数量,与此同时,降级一些非核心的业务。通过扩容和降级承担流量,这是为了表明你对应急问题的处理能力。
其次,才是排查解决异常问题,如通过监控,日志等手段分析是否消费端的业务逻辑代码出现了问题,优化消费端的业务处理逻辑。
最后,如果是消费端的处理能力不足,可以通过水平扩容来提供消费端的并发处理能力,但这里有一个考点需要特别注意, 那就是在扩容消费者的实例数的同时,必须同步扩容主题 Topic 的分区数量,确保消费者的实例数和分区数相等。如果消费者的实例数超过了分区数,由于分区是单线程消费,所以这样的扩容就没有效果。
消息积压可能的问题
- 生产者:
- 消息冗余下发
- 消息队列
- 分区设置不合理
- 消费者
- 消费服务宕机
- 消费能力不足
- 消费线程卡死
MQ消息积压解决方法:
1、消费端:
①:检查消费服务是否在正常消费
消费服务是否宕机、消费线程是否卡死,可使用jstack导出堆栈信息排查消费卡死原因
②:增加消费者数量。
若消费者数量小于积压topic分区的数量,通过增加消费者的数量来提高消息的处理速度。可以动态调整消费者的数量,根据积压的数量和消费速度来决定是否增加或减少消费者的数量
③:优化消费逻辑,提高消费者的处理能力
优化消费端的代码逻辑和处理过程,提高消费端的处理能力。可以使用多线程或多进程来并发处理消息,或者采用分布式处理方式,将消息分配给多个消费者处理
④:消息过滤
在消息处理之前先通过业务逻辑对消息进行过滤,如果是无效的消息,则直接提交offset,跳过业务处理,避免占用资源
⑤:设置超时机制
可以设置超时时间,并在超时后对消息进行重新处理或者进行补偿操作
这种时候只能操作临时扩容,以更快的速度去消费数据了。
具体线上的操作步骤和思路如下: ①先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
②临时建立好原先10倍或者20倍的queue数量(新建一个topic,partition是原来的10倍)。
③然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。
④紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息。
⑤这种做法相当于临时将queue资源和consumer资源扩大10倍,以正常速度的10倍来消费消息。
⑥等快速消费完了之后,恢复原来的部署架构,重新用原来的consumer机器来消费消息
2、消息队列
①:扩容MQ服务器
如果MQ服务器性能达到瓶颈,可以考虑增加MQ服务器的数量或者升级硬件配置,以提高MQ的吞吐量和处理能力
②:增加topic分区(和下游增加消费者结合使用)
如果topic分区数较少(下游消费组中消费者数量大于分区数量),可以通过增加分区的数量,使下游消费组中的每个消费者都能够消费到分区,以此来提高下游的消费能力
③:数据清理机制
定期清理过期和无效的消息。避免队列中存在大量无效的消息占用资源
④:性能优化和调优
对MQ的性能进行优化和调优,包括调整MQ的参数配置、网络优化、硬件优化等,以提高MQ的吞吐量和稳定性
设置了过期时间:
假设你用的是rabbitmq,rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。 解决方案: 这种情况下,实际上没有什么消息挤压,而是丢了大量的消息。所以第一种增加consumer肯定不适用。 这种情况可以采取 “批量重导” 的方案来进行解决。 在流量低峰期(比如夜深人静时),写一个程序,手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。
3、生产者
①:避免冗余下发消息
- 如果消费者的过滤规则,会过滤掉这条消息不进行处理,则在生产者端就应该判断不进行下发
- 避免一个消息重复下发多次
②:根据消息的优先级,使用多个topic
根据消息的重要性和紧急程度,调整消息的优先级。优先处理重要的消息,确保关键业务的及时性,而对于非关键的消息可以进行降级处理或延后处理。避免大量非关键消息写入队列topic影响关键消息的消费
如使用高优队列、普通队列、慢速队列,来处理不同优先级的消息
③:监控和报警
实时监控MQ的消息积压情况,设置阈值并触发报警机制。当消息积压超过一定阈值时,及时发出报警通知,以便及时采取措施解决问题
④:逃生机制(兜底方案)
通过监控如果发现消息一直未到达下游,启用逃生机制,如直接调用下游的接口推送消息(只推送关键消息)
怎么解决消息被重复消费的问题?”之外,面试官还会问到你“消息积压”。 原因在于消息积压反映的是性能问题,解决消息积压问题,可以说明候选者有能力处理高并发场景下的消费能力问题。
你在解答这个问题时,依旧要传递给面试官一个这样的思考过程:
比如在 Kafka 中,一个 Topic 可以配置多个 Partition(分区),数据会被写入到多个分区中,但在消费的时候,Kafka 约定一个分区只能被一个消费者消费,Topic 的分区数量决定了消费的能力,所以,可以通过增加分区来提高消费者的处理能力。
https://www.cnblogs.com/yangyongjie/p/17644874.html
https://juejin.cn/post/6844903849107406856
MQ 消息积压问题与解决方案-CSDN博客