RocketMQ源码 Broker-PullRequestHoldService 长轮询消息拉取组件源码分析

news2025/1/13 3:38:43

前言

PullRequestHoldService 继承了ServiceThread类,它本身是一个线程,以后台方式无线循环运行,支持长轮询(默认5秒)和短轮询(默认1秒)两种方式(CountDownlatch 方式控制)控制线程执行间隔。5秒钟后,线程内部会检查被挂起的请求(消费者建立连接后,会立即执行一次消息拉取服务,如果能拉到消息直接返回相应,如果拉不到,就会被挂起到 PullMessHoldService 服务),通知消息到达,调用 PullMessageProcessor 消息拉取处理组件,再进行一次拉取处理,如果能拉到消息,就直接返回,如果拉取不到再次挂起。

除了以上方式唤醒被挂起的消息拉取请求,还有一个 NotifyMessageArrivingListener 消息到达监听器,可以监听 topic-queueId-tagCode 维度的消息,进行通知唤醒被挂起的线程。

本次涉及三个核心组件:

  1. PullRequestHoldService 长轮询消息拉取组件;
  2. PullMessageProcessor 拉取消息处理组件;
  3. NotifyMessageArrivingListener 消息到达监听组件;

源码版本:4.9.3

源码架构图

核心数据结构

长轮询消息拉取组件数据结构,维护了一个拉取请求处理列表:Map<topic@queueId, 挂起的请求列表> pullRequestTable 。

public class PullRequestHoldService extends ServiceThread {

    // 系统时钟
    private final SystemClock systemClock = new SystemClock();

    // 长轮询消息拉取请求表,key为topic@queueId,value为PullRequest集合
    protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);
}

pullRequestTable 拉取请求处理列表的 value对象(ManyPullRequest)维护的核心数据结构是被挂起的请求列表。

public class ManyPullRequest {
    // 拉取请求列表
    private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();
}

核心行为流程

唤醒流程一

PullMessageProcessor 接受网络通信请求,处理消息拉取请求。从messageStore 消息存储组件拉取消息,如果拉不到就将请求,挂起到pullRequestHoldService长轮询消息拉取组件。

// pull方式拉取消息处理器组件
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    ....
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
    // 一堆校验...
    // 关键点:从消息存储组件拉取消息MessageStore
    final GetMessageResult getMessageResult =
           this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

        ....
     // 处理查询结果,没有拉到消息时,将请求挂起到hold服务
     case ResponseCode.PULL_NOT_FOUND:

          if (brokerAllowSuspend && hasSuspendFlag) {
                       
         ...
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
              response = null;
              break;
         }

    }
    ...

}

PullRequestHoldService 长轮询消息拉取组件,接受挂起的请求,维护在内存中。且以后台线程方式,轮询pullRequestTable,唤醒所有请求,尝试拉取消息。

// 长轮询消息拉取组件
public class PullRequestHoldService extends ServiceThread {

    // 长轮询消息拉取请求表,key为topic@queueId,value为PullRequest集合
    protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);


    // 如果没有拉取到请求,就挂起拉取请求,等待当前线程长轮询处理
    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if (prev != null) {
                mpr = prev;
            }
        }

        mpr.addPullRequest(pullRequest);
    }

    @Override
    public void run() {
        log.info("{} service started", this.getServiceName());

        // 循环,处理被挂起的拉取请求
        while (!this.isStopped()) {
            try {
                // 长轮询,默认5秒
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    // 短轮询,默认1秒
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                // 处理被挂起的拉取请求
                this.checkHoldRequest();

                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info("{} service end", this.getServiceName());
    }


    protected void checkHoldRequest() {
        // 遍历被挂起的拉取请求列表
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0];
                int queueId = Integer.parseInt(kArray[1]);

                // 从消息存储组件获取当前队列消费的最大偏移量
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    // 通知消息消费者
                    this.notifyMessageArriving(topic, queueId, offset);
                } catch (Throwable e) {
                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                }
            }
        }
    }

    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
        notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
    }

    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        // 获取指定topic-queueId被挂起的拉取请求列表
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            // 克隆被挂起的拉取请求列表
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();

                // 遍历所有请求
                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    // 如果拉取的最大偏移量小于当前拉取的起始偏移量,则从消息存储组件获取最大的偏移量开始拉取
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }

                    // 匹配消息过滤器
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                        // match by bit map, need eval again when properties is not null.
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }

                        if (match) {
                            try {
                                // 重点:如果消息到达filter匹配,则执行挂起请求唤醒服务
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }

                    // 重点:超时处理,如果超时,则执行挂起请求唤醒服务
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }

                    replayList.add(request);
                }

                // 没有执行的请求,重新放回被挂起的拉取请求列表
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }
}

唤醒流程二

NotifyMessageArrivingListener 通知消息到达监听器,会监听指定topic-指定queueId的消息变化,通知 pullRequestHoldService 长轮询请求处理服务,实时唤醒挂起的对应请求,进行消息拉取。

// 通知消息到达监听器
public class NotifyMessageArrivingListener implements MessageArrivingListener {
    private final PullRequestHoldService pullRequestHoldService;

    public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
        this.pullRequestHoldService = pullRequestHoldService;
    }

    @Override
    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {

        // 通知长轮询请求处理服务,消息到达,唤醒挂起的请求
        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
            msgStoreTime, filterBitMap, properties);
    }
}

完整源码文件

见资源包

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1309509.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在HTML中如何设置音频和视频?

目录 一、设置音频二、设置视频 一、设置音频 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Document</title> </head> <body> <!-- controls:控制播放暂停的按钮autop…

【Jmeter】Jmeter基础4-Jmeter元件介绍之监听器

2.4、监听器 监听器主要用于收集、统计、查看和分析结果。 2.4.1、察看结果树 作用&#xff1a;查看取样器请求和响应结果&#xff0c;包括消息头&#xff0c;请求的数据&#xff0c;响应的数据等。一般在调试时才用&#xff0c;在实际运行压测时建议禁用&#xff0c;因为大量…

TikTok科技趋势:平台如何引领数字社交革命?

TikTok作为一款颠覆性的短视频应用&#xff0c;不仅改变了用户的娱乐方式&#xff0c;更在数字社交领域引领了一场革命。本文将深入探讨TikTok在科技趋势方面的引领作用&#xff0c;分析其在数字社交革命中的关键角色&#xff0c;以及通过技术创新如何不断满足用户需求&#xf…

element-ui以服务方式调用loading,自定义修改icon

一、以服务的方式调用Loading 除了常用的v-loading、this.$loading我们还可以以服务的方式调用。主要有以下步骤 引入Loading服务 import { Loading } from element-ui;在需要时调用 Loading.service(options);其中 options 参数为 Loading 的配置项&#xff0c;具体见下表…

2023 亚马逊云科技 re:Invent 大会探秘:Aurora 无限数据库的突破性应用

文章目录 一、前言二、Amazon Aurora 无限数据库2.1 亚马逊云科技数据库产品发展历程2.2 什么是 Amazon Aurora Limitless Database&#xff08;无限数据库&#xff09;2.3 Amazon Aurora Limitless Database 设计架构2.4 Amazon Aurora Limitless Database 分片功能2.5 使用 A…

嵌入式开发板qt gdb调试

1&#xff09; 启动 gdbserver ssh 或者 telnet 登陆扬创平板 192.168.0.253&#xff0c; 进入命令行执行如下&#xff1a; chmod 777 /home/HelloWorld &#xff08;2&#xff09; 打 开 QTcreator->Debug->StartDebugging->Attach to Running Debug Server 进行…

MySQL之DML语句

文章目录 DML语句创建表添加表字段**插入数据**查询数据更新数据替换数据删除数据清除表数据删除表 DML语句 数据操作语言DML&#xff08;Data Manipulation Langua&#xff09; 是SQL语言的一个分类&#xff0c;用于对表的数据进行增&#xff0c;删&#xff0c;改&#xff0c…

xtu oj 1328 数码和

题目描述 一个10进制数n在2∼16进制下可以得到的不同的数码和&#xff0c;求在这些数码和中出现次数最多的数码和。 比如20&#xff0c; 其中数码和2和4分别出现了3次&#xff0c;为最多出现次数。 输入 第一行是一个整数T(1≤T≤1000)&#xff0c;表示样例的个数。 以后每行…

python selenium chrome114版本之后环境配置和携带缓存打开chrome

尽力局 chrome驱动环境配置chrome打开带缓存设置待缓存打开自动关闭浏览器自动关闭浏览器弹窗 最终代码找资料难啊最终效果代码 依赖包和生成依赖包方法关闭谷歌升级 chrome驱动环境配置 网上找到的资料&#xff0c;我现在安装的是120版本的&#xff0c;这个资料是可行的。比较…

猫头虎博主深度探索:Amazon Q——2023 re:Invent大会的AI革新之星

猫头虎博主深度探索&#xff1a;Amazon Q——2023 re:Invent大会的AI革新之星 授权说明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科…

一文带你掌握Spring事务核心:TransactionDefinition详解!

TransactionDefinition是Spring框架中用于定义事务属性的核心接口。在Spring的事务管理中&#xff0c;这个接口扮演着至关重要的角色&#xff0c;它允许开发者定制事务的各种属性&#xff0c;如隔离级别、传播行为、超时时间以及是否只读。 基本介绍 TransactionDefinition 接…

Python创建代理IP池详细教程

一、问题背景 在进行网络爬虫或数据采集时&#xff0c;经常会遇到目标网站对频繁访问的IP进行封禁的情况&#xff0c;为了规避这种封禁&#xff0c;我们需要使用代理IP来隐藏真实IP地址&#xff0c;从而实现对目标网站的持续访问。 二、代理IP池的基本概念 代理IP池是一个包…

《使用ThinkPHP6开发项目》 - 登录接口三【表单验证】

《使用ThinkPHP6开发项目》 - 登录接口一-CSDN博客 https://blog.csdn.net/centaury32/article/details/134974860 在设置用户登录时&#xff0c;由于安全问题会对登录密码进行加密 表单验证这里也可以使用ThinkPHP6自带的验证规则&#xff0c;创建一个验证管理员的文件 ph…

〖大前端 - 基础入门三大核心之JS篇(52)〗- 指定函数上下文 - call和apply

说明&#xff1a;该文属于 大前端全栈架构白宝书专栏&#xff0c;目前阶段免费&#xff0c;如需要项目实战或者是体系化资源&#xff0c;文末名片加V&#xff01;作者&#xff1a;哈哥撩编程&#xff0c;十余年工作经验, 从事过全栈研发、产品经理等工作&#xff0c;目前在公司…

大数据存储技术(1)—— Hadoop简介及安装配置

目录 一、Hadoop简介 &#xff08;一&#xff09;概念 &#xff08;二&#xff09;Hadoop发展历史 &#xff08;三&#xff09;Hadoop三大发行版本 &#xff08;四&#xff09;Hadoop的优势 二、Hadoop的组成 &#xff08;一&#xff09;Hadoop1.x和Hadoop2.x的区别​…

RocketMQ源码 Broker-ConsumerFilterManager 消费者数据过滤管理组件源码分析

前言 ConsumerFilterManager 继承了ConfigManager配置管理组件&#xff0c;拥有将内存数据持久化到磁盘文件consumerFilter.json的能力。它主要负责&#xff0c;对在消费者拉取消息时&#xff0c;进行消息数据过滤&#xff0c;且只针对使用表达式过滤的消费者有效。 源码版本&…

SFP3006-ASEMI大电流快恢复二极管SFP3006

编辑&#xff1a;ll SFP3006-ASEMI大电流快恢复二极管SFP3006 型号&#xff1a;SFP3006 品牌&#xff1a;ASEMI 封装&#xff1a;TO-247 最大平均正向电流&#xff1a;30A 最大重复峰值反向电压&#xff1a;600V 产品引线数量&#xff1a;3 产品内部芯片个数&#xff1…

多项式回归

多项式回归 多项式回归 是非线性回归的一种&#xff0c;之前讨论的线性回归都是直线&#xff0c;而多项式回归则是曲线的回归&#xff0c;它通过引入原始预测变量的高阶项&#xff08;如平方、立方等&#xff09;来拟合数据的非线性模式。在多项式回归中&#xff0c;虽然模型对…

06 python 文件基础操作

6.1 .1文件读取操作 演示对文件的读取 # 打开文件 import timef open(02_word.txt, r, encoding"UTF-8") print(type(f))# #读取文件 - read() # print(f读取10个字节的结果{f.read(10)}) # print(f读取全部字节的结果{f.read()})# #读取文件 - readLines() # lines…

头歌-Python 基础

第1关&#xff1a;建模与仿真 1、 建模过程&#xff0c;通常也称为数学优化建模(Mathematical Optimization Modeling)&#xff0c;不同之处在于它可以确定特定场景的特定的、最优化或最佳的结果。这被称为诊断一个结果&#xff0c;因此命名为▁▁▁。 填空1答案&#xff1a;决…