【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结

news2025/1/16 17:41:18

背景介绍

公司最近年底要对系统做一次大的体检,所以是不测不知道,一测吓一跳啊,出现了很多问题,其中最恶心的问题要数我们的ROCKETMQ消息队列的问题了,大家都知道消息队列是作为流量削峰的主要手段,负责系统健壮性和压力的最佳手段,谁知道,它竟然"生病"了,干不动活了。

问题现象

系统频繁出现:system busy 和 broker busy 解决方案:

com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8

Rocketmq发送控制流程

针对前4种 broker busy ,主要是由于 Broker 在追加消息时持有的锁时间超过了设置的1s,Broker 为了自我保护,会抛出错误,客户端会选择其他 broker 服务器进行重试。

如果对不是金融级服务,建议将 transientStorePoolEnable = true,可以有效避免前面 4 种 broker ,因为开启这个参数,消息首先会存储在堆外内存中,并且 RocketMQ 提供了内存锁定的功能,其追加性能能得到一定的保障,这样可以做到在内存使用层面的读写分离,即写消息是直接写入堆外内存,消费消息直接从 pagecache中读,然后定时将堆外内存的消息写入 pagecache。

但这种方案随之带来的就是可能存在消息丢失,如果对消息非常严谨的话,建议扩容集群,或迁移topic到新的集群。

可以看出来,抛出这种错误,在 broker 还没有发送"严重"的 pagecache 繁忙,即消息追加到内存中的最大时延没有超过 1s,通常追加是很快的,绝大部分都会低于1ms,但可能会由于出现一个超过200ms的追加时间,导致排队中的任务等待时间超过了200ms,则此时会触发broker 端的快速失败,让请求快速失败,便于客户端快速重试。但是这种请求并不是实时的,而是每隔10s 检查一遍。

值得注意的是,一旦出现 TIMEOUT_CLEAN_QUEUE,可能在一个点会有多个这样的错误信息,具体多少与当前积压在待发送队列中的个数有关。

Rocketmq 发送时异常

system busy 和 broker busy 解决方案

  • [REJECTREQUEST]system busy too many requests and system thread pool busy
  • [PC_SYNCHRONIZED]broker busy
  • [PCBUSY_CLEAN_QUEUE]broker busy
  • [TIMEOUT_CLEAN_QUEUE]broker busy

之前写的解决方案,都是基于测试环境测试的.到生产环境之后,正常使用没有问题,生产环境压测时,又出现了system busy异常(简直崩溃)

com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8
For more information, please visit the url, http:
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1030)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:989)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90)
	at
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

报错定位

  • cleanExpiredRequestInQueue会处理发送消息、拉取消息、心跳、事务消息队列中的数据,此次遇到的问题是发送Topic消息报出来的错误,所以接下来针对发送消息流程进行分析。
  • 报出此错误的源码位置为broker快速失败机制BrokerFastFailure.java类(该类在Broker启动时会启动一个定时任务,每10毫秒执行一次),报错位置代码如下:
void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) {
        while (true) {
            try {
                if (!blockingQueue.isEmpty()) {

                    final Runnable runnable = blockingQueue.peek();
                    if (null == runnable) {
                        break;
                    }
                    final RequestTask rt = castRunnable(runnable);
                    if (rt == null || rt.isStopRun()) {
                        break;
                    }

                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();

                    if (behind >= maxWaitTimeMillsInQueue) {
                        if (blockingQueue.remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                        }
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }
    }

这段代码是Broker快速失败机制的核心代码,如果一个等待队列的头元素(也就是第一个要处理或者正在处理的元素)等待时间超过该队列设置的最大等待时间,则丢弃该元素对象的任务,并对这个请求返回[TIMEOUT_CLEAN_QUEUE]broker busy异常信息。

发送Topic消息报该错误

sendThreadPoolQueue取出头元素,转换成对应的任务,判断任务在队列存活时间是否超过了队列设置的最大等待时间,如果超过了则组装处理返回对象response,response的code为RemotingSysResponseCode.SYSTEM_BUSY,内容为:

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: [当前任务在队列存活时间], size of queue: [当前队列的长度]

MQClientAPIImpl.processSendResponse处理返回response,根据response.getCode()的处理分支,最终返回MQBrokerException异常,response分支处理代码如下:


private SendResult processSendResponse(
        final String brokerName,
        final Message msg,
        final RemotingCommand response
    ) throws MQBrokerException, RemotingCommandException {
        switch (response.getCode()) {
            case ResponseCode.FLUSH_DISK_TIMEOUT:
            case ResponseCode.FLUSH_SLAVE_TIMEOUT:
            case ResponseCode.SLAVE_NOT_AVAILABLE: {
            }
            case ResponseCode.SUCCESS: {

                return sendResult;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

消息发送客户端接收到MQBrokerException异常信息,捕获异常处理中不符合消息重试逻辑,直接抛出该异常,也就是用户看到的; // timesTotal为消息生产者设置的发送失败重试次数

for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {

                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {

                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

生产环境各种参数:

  • broker busy异常: 可通过增大 waitTimeMillsInSendQueue 解决
  • system busy异常:可通过增大 osPageCacheBusyTimeOutMills 解决

waitTimeMillsInSendQueue=3000

osPageCacheBusyTimeOutMills=5000

出现问题分析

出现异常的原因是因为我们同一台服务器部署的多个应用造成的。我们一台服务器上部署了 三个ES、八个redis、一个rocketmq ,压力测试时这些都在使用,虽然cpu、内存都还有很大剩余,但是磁盘io和内存频率毕竟只有那么多可能已经占满,或者还有其他都会有影响。

之前测试环境测试其他东西时,发现mq和redis同时大量使用时,redis速度会降低三到四倍,由此可见应用分服务器部署的重要性。以前知道会有影响,没想到影响这么大。

最终结解决方案:应该给rocketmq单独部署性能较高的服务器.

记一次 rocketmq 使用时的异常。

问题分析总结
  1. system busy , start flow control for a while

该异常会造成 消息丢失。

  1. broker busy , start flow control for a while

该异常不会造成消息丢失。

问题解决过程

1、最开始时候 ,测试发现在性能好的服务器上只会出现system busy,也就是说出现异常就会消息丢失。

所以:业务代码进行处理,出现异常就会重发到当前topic的bak队列,当时想的是既然这个topic busy了,就换到另外的topic去发,总不能都 busy吧。也算是临时解决了。

2、发现有消息重复的现象。不用想肯定是报broker busy异常,重发到topic的 bak队列了。又因为broker busy可能不会造成消息丢失,所以消息重复就出现了。

解决方案:

修改rocketmq配置文件:

  • 方案一:sendMessageThreadPoolNums 改成 1 ,没有的话新增一行。sendMessageThreadPoolNums=1
  • 方案二:useReentrantLockWhenPutMessage改成true,没有的话新增一行。
sendMessageThreadPoolNums=32
useReentrantLockWhenPutMessage=true

sendMessageThreadPoolNums这个属性是发送线程池大小, rocketmq4.1版本之后默认为 1,之前版本默认什么不知道但是肯定大于1。这个属性改成1的话,就不用管useReentrantLockWhenPutMessage这个属性了;

如果改成大于1,就需要将useReentrantLockWhenPutMessage这个属性设置为 true;

目前测试 未发现这两个方案有什么区别,sendMessageThreadPoolNums=1 时也支持多线程发送,发送速度感觉和 sendMessageThreadPoolNums大于1没有区别,都能跑满100M的网卡。

感觉如果useReentrantLockWhenPutMessage=true的时候,就是打开锁,然后关键代码其实还是单线程处理;

解决方案
  1. 业务逻辑处理中进行异常捕获,如果捕获到异常为MQBrokerException并且responseCode为2则重发消息;
  2. 修改broker的默认发送消息任务队列等待时长waitTimeMillsInSendQueue(单位: 毫秒);

除此之外,还可以观察报错时磁盘的IO情况,出现这种错误很有可能是当时的磁盘IO很高,导致消息落盘时间变长。

RocketMQ的参数指南

NameServer配置属性


brokerClusterName=rocketmqcluster
brokerName=broker-a

brokerId=0

namesrvAddr=rocketmq-nameserver1:9876

brokerIP1=10.30.51.149

defaultTopicQueueNums=8

autoCreateTopicEnable=false

autoCreateSubscriptionGroup=true

listenPort=10911

deleteWhen=03

fileReservedTime=48

mapedFileSizeCommitLog=1073741824

mapedFileSizeConsumeQueue=1000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000

diskMaxUsedSpaceRatio=88

storePathRootDir=/app/data/rocketmq/data

storePathCommitLog=/app/data/rocketmq/data/commitlog

storePathConsumeQueue=/app/data/rocketmq/data/consumerqueue

storePathIndex=/app/data/rocketmq/data/index

storeCheckpoint=/app/data/rocketmq/data/checkpoint

abortFile=/app/data/rocketmq/data/abort

maxMessageSize=‭16777216‬

waitTimeMillsInSendQueue=3000
osPageCacheBusyTimeOutMills=5000
flushCommitLogLeastPages=12
flushConsumeQueueLeastPages=6
flushCommitLogThoroughInterval=30000
flushConsumeQueueThoroughInterval=180000

brokerRole=ASYNC_MASTER

flushDiskType=ASYNC_FLUSH

sendMessageThreadPoolNums=80

pullMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true

官方资料

  • Rocketmq官网:rocketmq.apache.org/
  • Rocketmq的其它项目:github.com/apache/rock…
  • Rocketmq-console安装:blog.csdn.net/zzzgd_666/a…

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/907759.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++基础Ⅰ编译、链接

目录儿 1 C是如何工作的1.1 预处理语句1.2 include1.3 main()1.4 编译单独编译项目编译 1.5 链接 2 定义和调用函数3 编译器如何工作3.1 编译3.1.1 引入头文件系统头文件自定义头文件 3.1.2 自定义类型3.1.3 条件判断拓展: 汇编 3.2 链接3.2.1 起始函数3.2.2 被调用的函数 3.3 …

C++新经典09--函数新特性、inline内联函数与const详解

函数回顾与后置返回类型 函数定义中如果有形参则形参应该有名字&#xff0c;而不光是只有类型&#xff0c;但是如果并不想使用这个形参&#xff0c;换句话说这个形参并不在这个函数中使用&#xff0c;则不给形参名也可以&#xff0c;但在调用这个函数的时候&#xff0c;该位置…

什么情况下,亚马逊账户会被判滥用?

如果说有对亚马逊跨境电商有所了解的朋友就会知道&#xff0c;现在亚马逊跨境电商的规则是十分严格的&#xff0c;亚马逊开店变得越来越困难&#xff0c;尤其是要想成功的把一个亚马逊店铺给开好。 这几年不少有一些违规的亚马逊卖家都被系统检测到了&#xff0c;如果说被系统…

投资者的秘密武器,代理IP在金融决策中的驱动作用

在如今数据为王的时代&#xff0c;无论从事哪个行业&#xff0c;都需要使用数据分析来指导自己的决策&#xff0c;而这些数据又是从哪里来的呢&#xff1f;很多人都知道&#xff0c;数据采集可以帮助我们将分散在互联网各个网站上的大量数据集中起来。对于金融行业来说&#xf…

重磅丨无人机新规出台,这些红线不能踩!

近年来&#xff0c;随着无人机研发技术逐渐成熟&#xff0c;无人机在各个领域得到了广泛应用&#xff0c;包括VR全景航拍、乡村农业、城市管理、环境监测等领域&#xff0c;其应用场景及使用方式都还在迅速拓展&#xff0c;无人机行业受到社会广泛关注。 但在实践中&#xff0c…

SOLIDWORKS焊件是什么?

SOLIDWORKS是一款广泛应用于机械设计领域的三维计算机辅助设计软件。SOLIDWORKS提供了强大的焊件功能&#xff0c;可以帮助工程师们以更高的效率设计焊接件。本文将介绍SOLIDWORKS焊件的概念、特点以及使用方法&#xff0c;以期帮助读者更好地理解和应用这一关键技术。 SOLIDWO…

国际刑警组织逮捕 14 名涉嫌盗窃 4000 万美元的网络罪犯

Bleeping Computer 网站披露&#xff0c;4 月份&#xff0c;国际刑警组织发动了一起为期四个月&#xff0c;横跨 25 个非洲国家的执法行动 “Africa Cyber Surge II”&#xff0c;共逮捕 14 名网络犯罪嫌疑人&#xff0c;摧毁 20000 多个从事勒索、网络钓鱼、BEC 和在线诈骗的犯…

第七次作业 运维高级 docker容器进级版

1、使用mysql:5.6和 owncloud 镜像&#xff0c;构建一个个人网盘。 (1)拉取相应镜像 docker pull mysq:5.6 docker pull owncloud:latest(2)运行mysql&#xff1a;5.6容器 docker run --name mysql -e MYSQL_ROOT_PASSWORD12345 -d mysql:5.6(3)运行owncloud容器 docker run…

【转】CentOS7安装GUI界面及远程连接的实现

用基于浏览器(webdriver)的selenium技术爬取数据&#xff0c;所以程序需运行在GUI环境下。本文分三个部分简要介绍安装GUI界面及远程连接的步骤。 安装GUI界面 大多数云服务器厂商提供的镜像都无GUI界面&#xff0c;所以要先安装图形环境。本文使用GNOME桌面环境&#xff1a;…

小程序 - 人脸核验遇到的问题 startFacialRecognitionVerifyAndUploadVideo:fail:access denied

目录 报错 - 没有权限代码解决 【补充】接口调用正常页面展示 报错 - 没有权限 {"errno": 102, "errMsg": "startFacialRecognitionVerifyAndUploadVideo:fail:access denied"} 代码 import Taro, { Component } from tarojs/taroclass Home e…

DeepHow首席执行官兼联合创始人郑三博士谈运用人工智能加速视频创作,实现视频界的“降本增效”

原创 |文 BFT机器人 DeepHow的首席执行官兼联合创始人Sam Zheng领导着一家快速发展的初创公司&#xff0c;并得到了受人尊敬的投资者的支持。DeepHow通过创新的、人工智能驱动的、以视频为中心的知识捕获和传输平台&#xff0c;彻底改变了熟练的劳动力培训。 在加入DeepHow之前…

YOLO目标检测——猫狗识别数据集下载分享

猫狗识别数据集是一个常用的用于猫和狗图像分类任务的数据集&#xff0c;包含了大量的猫和狗的图像样本 数据集点击下载&#xff1a;YOLO猫狗识别数据集5000图片.rar

了解AI智能问答的流程之后!使用起来更简单了

AI智能问答流程主要是按照自然语言理解、对话管理、自然语言生成这3个步骤&#xff0c;通过这些步骤之后&#xff0c;就可以将语言进行转换&#xff0c;转换成计算机能够理解的意思&#xff0c;再根据当前对话管理判断应该采取的策略。接下来looklook会详细来讲讲具体是如何实现…

<数据结构与算法>堆的应用二叉树的链式实现

目录 前言 一、堆的应用 1. 堆排序 1.1 排升序&#xff0c;建大堆 1.2 时间复杂度计算 2. Top k问题 二、 二叉树的链式实现 1. 二叉树的遍历 2. 二叉树基础OJ 3.DFS && BFS 总结 前言 学习完堆的数据结构&#xff0c;我们要清楚&#xff0c;它虽然实现了排序功能&am…

Wlan——STA上线流程与802.11MAC帧讲解

目录 802.11MAC帧基本概念 802.11帧结构 802.11MAC帧的分类 管理帧 控制帧 数据帧 STA接入无线网络流程 信号扫描—管理帧 链路认证—管理帧 用户关联—管理帧 用户上线 802.11MAC帧基本概念 802.11协议在802家族中的角色位置 其中802.3标准属于以太网的一种帧格式…

Redis 分布式锁存在什么问题 ?如何解决 ?

目录 1. 如何实现分布式锁 2. Redis 分布式锁存在什么问题 2.1 解决死锁问题 2.2 解决锁误删问题 1. 如何实现分布式锁 Redis 天生就可以作为一个分布式系统来使用&#xff0c;所以它实现的锁都是分布式锁。 Redis 可以通过 setnx&#xff08;set if not exists&#xff09…

vant4 自定义垂直步骤条时间线组件几行css代码改造完成(附效果图)

直接上效果图片 <template><!-- 审批流程 --><div><van-steps direction="vertical" active="-1"><van-step><template #inactive-icon><div class="relative"><img :src="girlIcon" /&…

JavaScript-console:JavaScript控制台(Console)常用方法

一、理解 console JavaScript 控制台&#xff08;console&#xff09;是一个开发人员在编写 JavaScript 代码时常用的工具。它是浏览器提供的一种界面&#xff0c;让开发人员能够追踪代码执行的状态和结果。JavaScript 控制台可以记录代码输出的信息、警告和错误&#xff0c;并…

opencv 进阶13-Fisherfaces 人脸识别-函数cv2.face.FisherFaceRecognizer_create()

Fisherfaces 人脸识别 PCA 方法是 EigenFaces 方法的核心&#xff0c;它找到了最大化数据总方差特征的线性组合。不可否认&#xff0c;EigenFaces 是一种非常有效的方法&#xff0c;但是它的缺点在于在操作过程中会损失许多特征信息。 因此&#xff0c;在一些情况下&#xff0c…

外贸网站怎么做推广优化

外贸网站的推广和优化是关键&#xff0c;因为它可以帮助您扩展国际市场并吸引更多的客户。以下是e6zz seo多年经验总结出一些有效的方法&#xff0c;可以帮助您推广和优化外贸网站&#xff1a; 优化网站内容&#xff1a; 确保您的网站内容对国际客户有吸引力&#xff0c;包括产…