Rocket面试(五)Rocketmq发生流量控制的情况有哪些?

news2025/1/23 7:10:37

在使用rocketmq过程中总能看见一下异常

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

这是因为Rocketmq出发了流量控制。

 

触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控,Consumer流控

1.Broker流控 

Rocketmq默认采取的是异步刷盘方式,Producer把消息发送到broker后,Broker会把消息暂放在Page Cache中刷盘线程定时的把数据刷到磁盘中

1.1 broker busy 

Broker是开启快速失败的,处理逻辑类是BrokerFastFailure,这个类中有一个定时任务用来清理过期的请求,每 10 ms 执行一次,代码如下:

public void start() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                cleanExpiredRequest();
            }
        }
    }, 1000, 10, TimeUnit.MILLISECONDS);
}

 1.1.1 Page Cache繁忙

清理过期请求之前会先判断一下Page Cache是否繁忙,如果繁忙就会给Producer返回一个系统繁忙的状态码(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),那怎么判断Page Cache繁忙呢?

当Broker收到消息后,会放到Page Cache中,这个过程,首先会取一个CommitLog写入锁,如果持有锁的时间超过1s就认为Page Cache繁忙具体代码见 DefaultMessageStore 类 isOSPageCacheBusy 方法。

1.1.2清理过期请求

清理过期请求时,如果请求线程创建的时间与当前系统时间的间隔大于200ms.然后给 Producer 返回一个系统繁忙的状态码(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")

1.2 system busy

这个异常在 NettyRemotingAbstract#processRequestCommand 方法.

1.拒绝请求

如果 NettyRequestProcessor 拒绝了请求,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")

那什么情况下请求会被拒绝呢?看下面这段代码:

//SendMessageProcessor类
public boolean rejectRequest() {
    return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
        this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}

从代码中可以看到,请求被拒绝的情况有两种可能,一个是 Page Cache 繁忙,另一个是 TransientStorePoolDeficient。跟踪 isTransientStorePoolDeficient 方法,发现判断依据是在开启 transientStorePoolEnable 配置的情况下,是否还有可用的 ByteBuffer。

注意:在开启 transientStorePoolEnable 的情况下,写入消息时会先写入堆外内存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盘。而读取消息是从 Page Cache,这样可以实现读写分离,避免读写都在 Page Cache 带来的问题

2.线程拒绝

Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000,可以通过参数 sendThreadPoolQueueCapacity 进行配置),线程池拒绝后会抛出异常 RejectedExecutionException,程序捕获到异常后,会判断是不是单向请求(OnewayRPC),如果不是,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[OVERLOAD]system busy, start flow control for a while")

判断 OnewayRPC 的代码如下,flag = 2 或者 3 时是单向请求:

public boolean isOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    return (this.flag & bits) == bits;
}

 1.3消息重试

Broker 发生流量控制的情况下,返回给 Producer 系统繁忙的状态码(code=2),Producer 收到这个状态码是不会进行重试的。下面是会进行重试的响应码:

//DefaultMQProducer类
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
    ResponseCode.TOPIC_NOT_EXIST,
    ResponseCode.SERVICE_NOT_AVAILABLE,
    ResponseCode.SYSTEM_ERROR,
    ResponseCode.NO_PERMISSION,
    ResponseCode.NO_BUYER_ID,
    ResponseCode.NOT_IN_CURRENT_UNIT
));

2.Consumer流控

        DefaultMQPushConsumerImpl 类中有 Consumer 流控的逻辑 。

2.1 缓存消息数量超过阈值

ProcessQueue 保存的消息数量超过阈值(默认 1000,可以配置),源码如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}

2.2缓存消息大小超过阈值

ProcessQueue 保存的消息大小超过阈值(默认 100M,可以配置),源码如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}

2.3缓存消息跨度超过阈值

对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值(默认 2000,可以配置)。源代码如下:

if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                pullRequest, queueMaxSpanFlowControlTimes);
        }
        return;
    }
}

2.4获取锁失败

对于顺序消费的情况,ProcessQueue加锁失败,也会延迟拉取,这个延迟时间默认是 3s,可以配置。

3.总结

本文介绍了 RocketMQ 发生流量控制的 8 个场景,其中 Broker 4 个场景,Consumer 4 个场景。Broker 的流量控制,本质是对 Producer 的流量控制,最好的解决方法就是给 Broker 扩容,增加 Broker 写入能力。而对于 Consumer 端的流量控制,需要解决 Consumer 端消费慢的问题,比如有第三方接口响应慢或者有慢 SQL。

Broker4种场景:

page cache 繁忙:获取commitlog写入锁超过1s 

清理过期请求 如果请求过期请求的时间到当前系统时间超过了200ms 

请求拒绝  一种是page cache繁忙 一种是 transientStorePoolEnable模式看是否可用buffer

线程池拒绝 Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000

Consumer4种场景

消息数量超过阈值1000

消息大小超过阈值100m

缓存消息跨度超过阈值 对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值2000

ProcessQueue加锁失败 也会延迟加载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/630889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

全志V3S嵌入式驱动开发(开发环境再升级)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面我们陆陆续续开发了差不多有10个驱动&#xff0c;涉及到网口、串口、音频和视频等几个方面。但是整个开发的效率还是比较低的。每次开发调试的…

【MySQL 数据库】8、视图

目录 一、什么是视图二、视图语法三、检查选项(1) cascaded&#xff08;级联&#xff09;(2) local 四、视图的作用五、视图案例 一、什么是视图 视图&#xff08;View&#xff09;是一种虚拟存在的表视图中的数据并不在数据库中真实存在行和列数据来自定义视图的查询中使用的…

一棵有点自律的树——搜索二叉树

文章目录 &#x1f490;专栏导读&#x1f490;文章导读&#x1f337;搜索二叉树概念&#x1f337;二叉搜索树的构建&#x1f33a;查找操作&#x1f33a;插入操作&#x1f33a;删除操作&#x1f33a;遍历操作☘️测试 &#x1f3f5;️拓展——递归实现&#x1f343;递归查找&…

数据结构与算法·第9章【查找】

概念 关键字&#xff1a; 是数据元素&#xff08;或记录&#xff09;中某个数据项的值&#xff0c;用以标识&#xff08;识别&#xff09;一个数据元素&#xff08;或记录&#xff09;。若此关键字可以识别唯一的一个记录&#xff0c;则称之谓“主关键字”。若此关键字能识别…

2.3 利用MyBatis实现关联查询

一、创建数据库表 1、创建教师表 执行SQL语句&#xff0c;创建教师表t_teacher CREATE TABLE t_teacher ( t_id int(11) NOT NULL AUTO_INCREMENT, t_name varchar(20) DEFAULT NULL, PRIMARY KEY (t_id) ) ENGINEInnoDB AUTO_INCREMENT4 DEFAULT CHARSETutf8mb4;执行SQL语句…

使用OpenFlow和Ryu控制器实现网络交换机的软件定义网络(SDN)控制

使用OpenFlow和Ryu控制器实现网络交换机的软件定义网络&#xff08;SDN&#xff09;控制 &#xff08;1&#xff09;环境介绍 硬件环境&#xff1a;系统最低要求为2个CPU 、2 GB内存。 拓扑介绍&#xff1a;云平台具体安装拓扑如图5-4所示。 图5-4 云平台安装拓扑 搭建云平…

使用pipreqs生成requirements文件,并在服务器(矩池云)上通过requirements文件安装环境采坑记录

目录 问题描述问题1&#xff1a;问题2&#xff1a;发现问题问题解决 问题3&#xff1a;问题4&#xff1a;问题5&#xff1a;解决方案 关键&#xff01;&#xff01;&#xff01;正常安装成功的操作流程备注1.我为何不在vscode的终端中装pipreqs包&#xff1f;2.在vscode终端中输…

Spring Cloud构建微服务架构:服务注册与发现

Spring Cloud简介 Spring Cloud是一个基于Spring Boot实现的云应用开发工具&#xff0c;它为基于JVM的云应用开发中的配置管理、服务发现、断路器、智能路由、微代理、控制总线、全局锁、决策竞选、分布式会话和集群状态管理等操作提供了一种简单的开发方式。 Spring Cloud包…

面了一个来华为要22K的人,啥都不会,还不如找个应届生来代替···

最近有个在华为的朋友跟我分享了他面试招人的过程&#xff0c;感觉华为还是挺难进的。面试前后进行了20天左右&#xff0c;包含4轮电话面试、1轮笔试、1轮主管视频面试、1轮hr视频面试。 据他所说&#xff0c;80%的人都会栽在第一轮面试。 其实&#xff0c;第一轮的电话面试除…

ASEMI代理英飞凌TLE7244SL功率电子开关,TLE7244SL参数

编辑-Z TLE7244SL参数描述&#xff1a; 型号&#xff1a;TLE7244SL 数字电源电压VDD&#xff1a;3.0 V ~ 5.5 V 模拟电源电压VDDA&#xff1a;4.5 V ~ 5.5 V 每个通道在Tj150C时的最大导通状态电阻RDS(ON,max)&#xff1a;1.7 Ω 额定负载电流IL (nom)&#xff1a;290 mA…

Nginx【反向代理负载均衡动静分离】--中

Nginx【反向代理负载均衡动静分离】–中 负载均衡-配置实例 示意图 负载均衡配置-思路分析/图解 示意图 负载均衡配置规则 负载均衡就是将负载分摊到不同的服务单元&#xff0c;既保证服务的可用性&#xff0c;又保证响应足够快 linux 下有Nginx、LVS、Haproxy 等等服务可…

在Apifox中,使用后置脚本显示响应结果reponse中的base64图片

背景 在使用Apifox去请求有图片的接口时&#xff0c;我想要请求成功的同时&#xff0c;可以显示出来图片&#xff0c;这个时候就开始百度找官方文档。最终发现可以使用后置脚本显示reponse中的图片。 方案 如下图所示&#xff0c;接口请求成功后&#xff0c;返回的json结构为…

【Spring Boot 初识丨四】主应用类

上一篇讲了 Spring Boot 的启动器 本篇来讲一讲 主程序类 Main Application Class 及注解 Spring Boot 初识&#xff1a; 【Spring Boot 初识丨一】入门实战 【Spring Boot 初识丨二】maven 【Spring Boot 初识丨三】starter 主程序类 一、定义二、注解2.1 SpringBootApplicati…

秋招指南(菜狗版)-Java前/后端开发方向

期末考试结束&#xff0c;菜的人还在享受假期&#xff0c;即将进大厂的已经在学习了&#xff08;狗头&#xff09; 作为经受去年秋招摧残的老学姐&#xff0c;给大家带来一些秋招学习的小经验&#xff0c;希望可以帮助大家避免一些求职路上的坑&#xff0c;能快速顺利地找到心仪…

论文笔记与实战:对比学习方法MOCO

目录 1. 什么是MOCO2. MOCO是干吗用的3. MOCO的工作原理3.1 一些概念1. 无监督与有监督的区别2. 什么是对比学习3. 动量是什么 3.2 MOCO工作原理1. 字典查找2. 如何构建一个好的字典3. 工作流程 3.3 &#xff08;伪&#xff09;代码分析 4. 其他一些问题5. MOCO v2和MOCO v35.1…

Nginx【反向代理负载均衡动静分离】--下

Nginx【反向代理负载均衡动静分离】–下 Nginx 工作机制&参数设置 master-worker 机制 示意图 图解 一个master 管理多个worker 一说master-worker 机制 ● 争抢机制示意图 图解 一个master Process 管理多个worker process, 也就是说Nginx 采用的是多进程结构, 而…

字节8年经验总结:13 条自动化测试框架设计原则(建议收藏)

1.代码规范 测试框架随着业务推进&#xff0c;必然会涉及代码的二次开发&#xff0c;所以代码编写应符合通用规范&#xff0c;代码命名符合业界标准&#xff0c;并且代码层次清晰。特别在大型项目、多人协作型项目中&#xff0c;如果代码没有良好的规范&#xff0c;那么整个框…

leetcode109. 有序链表转换二叉搜索树(java)

有序链表转换二叉平衡搜索树 leetcode109. 有序链表转换二叉搜索树题目描述 解题思路代码演示链表和二叉树专题 leetcode109. 有序链表转换二叉搜索树 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;https://leetcode.cn/problems/convert-sorted-lis…

QCameraViewfinder的使用

目录 引言核心代码完整代码 引言 本文是简单使用Qt快速使用摄像头完成截图等工作&#xff0c;主要涉及QCamera、QCameraViewfinder、QCameraImageCapture这三个类。QCamera通过相机的信息创建&#xff0c;用于控制开始接收图形、停止接收图像。QCameraViewfinder则是图像的展示…

(1Gbit)MT28EW01GABA1LPC-0SIT、MT28EW01GABA1HPC-0SIT FLASH - NOR 存储器

MT28EW01GABA1LPC-0SIT、MT28EW01GABA1HPC-0SIT 1Gbit并行NOR闪存器件具有较高的密度、就地执行 (XiP) 性能和架构灵活性&#xff0c;可满足汽车、消费类和移动产品的设计要求。该器件非常适合用于GPS/导航、汽车后视摄像头、手机、智能手机和电子阅读器。该器件还具有较宽的温…