消息堆积常见于以下几种情况:
(1)新上线的消费者功能有BUG,消息无法被消费。
(2)消费者实例宕机或因网络问题暂时无法同Broker建立连接。
(3)生产者短时间内推送大量消息至Broker,消费者消费能力不足。
(4)生产者未感知Broker消费堆积持续向Broker推送消息
处理方式:
1. 扩容消费者:
增加消费者数量可以提高消息的消费速度,从而减少消息堆积。您可以根据实际情况增加消费者的数量,确保消费者能够及时处理消息。
2. 调整消费者配置:
检查消费者的配置参数,如消费线程数、消费批量大小等,确保其能够充分利用系统资源,提高消息的消费效率。
4. 调整消息队列数量:
根据消息堆积的情况,可以考虑增加消息队列的数量。通过增加消息队列,可以提高消息的并发处理能力,加快消息的消费速度。
5. 增加消息消费的并发度:
如果消费者的处理逻辑允许并行处理消息,可以增加消息消费的并发度。通过增加并发度,可以提高消息的处理速度,减少消息堆积。
6.优化消费者代码
调整代码逻辑,减低复杂度,或者先存入redis或者数据库,后续在处理
7. 扩容RocketMQ集群:
如果以上方法无法解决消息堆积问题,可以考虑扩容RocketMQ集群,增加消息的处理能力和存储容量。
云消息队列 RocketMQ 版的消息发送至Broker节点后,配置了Group ID的客户端根据当前的消费位点,从Broker节点拉取部分消息到本地进行消费。一般情况下,客户端从Broker节点拉取消息的过程不会导致消息堆积,主要是客户端本地消费过程中,由于消费耗时过长或消费并发度较小等原因,导致客户端消费能力不足,出现消息堆积的问题
解决方案
若出现消息堆积,可参考以下措施进行定位和处理。
-
判断消息堆积在云消息队列 RocketMQ 版服务端还是客户端。
查看客户端本地日志文件
ons.log
,搜索是否出现如下信息:the cached message count exceeds the threshold
-
出现相关日志信息,说明客户端本地缓冲队列已满,消息堆积在客户端
-
若未出现相关日志,说明消息堆积不在客户端,若出现这种特殊情况,请直接联系阿里云技术支持。
-
-
确认消息的消费耗时是否合理。
-
若查看到消费耗时较长,则需要查看客户端堆栈信息排查具体业务逻辑
-
若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决。
消息的消费耗时可以通过以下方式查看:
-
登录云消息队列 RocketMQ 版控制台查看消息的消费轨迹,在消费者区域中可以看到单条消息的消费耗时。具体操作,请参见查询消息轨迹。
-
登录云消息队列 RocketMQ 版控制台查看消费者状态,在客户端连接信息中查看业务处理时间,获取消费耗时的平均值。具体操作,请参见查看消费者状态。
-
使用阿里云ARMS等其他监控产品做业务埋点采集消息的消费耗时。
-
-
查看客户端堆栈信息。只需要关注线程名为ConsumeMessageThread的线程,这些都是业务消费消息的逻辑。可参见Java官方文档判断线程的状态并根据具体问题修改业务逻辑。
客户端堆栈信息可以通过以下方式获取:
-
登录云消息队列 RocketMQ 版控制台查看消费者状态,在客户端连接信息中查看Java客户端堆栈信息。具体操作,请参见查看消费者状态。
-
使用Jstack工具打印堆栈信息。
-
请参见查看消费者状态获取消息堆积的消费者实例所对应的宿主机IP地址,并登录该宿主机。
-
执行以下任意命令,查看并记录Java进程的PID。
ps -ef |grep javajps -lm
-
执行以下命令,查看堆栈信息。
jstack -l pid > /tmp/pid.jstack
-
执行以下命令,查看
ConsumeMessageThread
的信息。cat /tmp/pid.jstack|grep ConsumeMessageThread -A 10 --color
-
常见的异常堆栈信息如下:
-
示例一:空闲无堆积的堆栈。
消费空闲情况下消费线程都会处于WAITING状态等待从消费任务队里中获取消息。
-
示例二:消费逻辑有抢锁休眠等待等情况。
消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢。
-
示例三:消费逻辑操作数据库等外部存储卡住。
消费线程阻塞在外部的HTTP调用上,导致消费缓慢。
-
-
针对某些特殊业务场景,如果消息堆积已经影响到业务运行,且堆积的消息本身可以跳过不消费,您可以通过重置消费位点跳过这些堆积的消息从最新位点开始消费,快速恢复业务。具体操作,请参见重置消费位点。