深度解析RocketMq源码-消费者索引ConsumeQueue

news2025/1/17 3:06:04

1.绪论

rocketmq的broker中关于消息持久化的组件主要包含三个,分别是:持久化消息到文件中的组件commitLog;根据消息key索引commitLog日志的indexFile;消费者根据topic和queueId查询commitLog日志的consumeQueue。前面已经介绍commitLog和indexFile,这节我们就来学习一下consumeQueue。

2. consumeQueue

4.1.1 consumeQueue的中每条消息的组成

消费者在消息的时候,只知道它在哪个topic的哪个queue里面消费哪个tags的拿条消息,那我们怎么由此定位到一条消息呢?所以我们需要为commitLog建立一条索引。

其实每个topic+queueId都会建立一个ConsumeQueue文件,而这个映射关系存储在broker中consumeQueueTable中,我们查询消息的时候,过consumeQueueTable根据queueId和topic快速定位到我们需要的ConsumeQueue,然后我们再根据消费者所提交的consumeQueueOffset*每条consumequeue索引的大小便能找到我们所以要的consume索引文件的位置,再根据里面存储的commitLog的物理偏移量便能在commitLog中定位到具体的消息的位置。

commitLog的存储结构可以如下所示:

└── TopicTest
    ├── 0
            └── 00000000000000000000
    ├── 1
            └── 00000000000000000000
    ├── 2
            └── 00000000000000000000
    └── 3
            └── 00000000000000000000

可以通过下图来说明,consumequeue是如何组成的,并且和commitLog的关系:

3.consumeQueue中消息的创建和获取

consumequeue其实底层和commitLog是一样的,其实由多个mappedFile来构成的,这里我们就不在讨论consumequeue的具体存储逻辑。有兴趣的小伙伴可以看这篇文章:

深度解析RocketMq源码-持久化组件(四) CommitLog_rocketmq一主一备commitlog-CSDN博客

接下来我们主要看一看consumequeue是什么时候创建的,并且在消费者知道它需要消费的topic和queueId过后,如何找到它具体要消费的哪条消息的,这其实也是rocketmq的核心之一。

3.1. consumequeue建立消息

其实consumequeue是一个采用mappedFile持久化数据的组件,它写入数据其实发分两步:

1.根据topic和queueId在topic和queueid与consumequeue的映射表(consumeQueueTable)中找到Consumequeue。

2.构造一条consumequeue记录,包括8字节的8字节的commitLog的offset + 4字节的消息大小+8自己的tagsCode,然后顺序写入到consumequeue中。

3.1.1 根据topic和queueId找到需要写入的consumequeue

  public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        //根据topic和queueId找到对应的consumequeue
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        //根据consumequeue顺序写入consumemequeue的索引数据
        cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
    }
 //根据queueId和topic检索到这个topic和queueId是属于哪个topic的
    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        //根据consumeQueueTable和queueId获取到具体的ConsumeQueue
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            //如果consumeQueue为空,便新建一个Consumequeue
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
            //并且设置到consumeQueueTable中
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                if (MixAll.isLmq(topic)) {
                    lmqConsumeQueueNum.getAndIncrement();
                }
                logic = newLogic;
            }
        }

        return logic;
    }

3.1.2 向consumequeue中写入一条记录

//commitLog的真正组成8字节的commitLog的offset + 4字节的消息大小+8自己的tagsCode
    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {
        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        this.byteBufferIndex.flip();
        //consumqueue的每条数据占20个字节
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        //8字节的commotLog的物理偏移量
        this.byteBufferIndex.putLong(offset);
        //4字节消息大小
        this.byteBufferIndex.putInt(size);
        //8字节的tagsCode
        this.byteBufferIndex.putLong(tagsCode);
        //写入到哪个offset
        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        //获取到offset的最后一条数据
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                //将跟新mappedFilde1写指针为expectLogicOffset
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                //
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            //实际的物理地址为offset+size大小
            this.maxPhysicOffset = offset + size;
            //将数据写入到bytebuffer中
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

3.2 根据topic和messagequeue检索消息

先根据topic和queueId找到以及consumeoffset查询一条消息分三步:

1.根据topic和queueId查询到对应的consumequeue;

2.根据consumeoffset在consumequeue中找到一条consumequeue的记录,里面包含一个属性就是实际消息在commitLog中的物理偏移量和大小;

3.根据物理偏移量和消息大小在commitLog中获取到实际消息内容。

    //根据queueId和topic检索到这个topic和queueId是属于哪个topic的
    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        //根据consumeQueueTable和queueId获取到具体的ConsumeQueue
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            //如果consumeQueue为空,便新建一个Consumequeue
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
            //并且设置到consumeQueueTable中
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                if (MixAll.isLmq(topic)) {
                    lmqConsumeQueueNum.getAndIncrement();
                }
                logic = newLogic;
            }
        }

        return logic;
    }

再根据consumeoffset在consumequeue中获取到具体consumequeue的索引数据。

public boolean get(final long address, final CqExtUnit cqExtUnit) {
        if (!isExtAddr(address)) {
            return false;
        }

        final int mappedFileSize = this.mappedFileSize;
        final long realOffset = unDecorate(address);

        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(realOffset, realOffset == 0);
        if (mappedFile == null) {
            return false;
        }

        int pos = (int) (realOffset % mappedFileSize);

        SelectMappedBufferResult bufferResult = mappedFile.selectMappedBuffer(pos);
        if (bufferResult == null) {
            log.warn("[BUG] Consume queue extend unit({}) is not found!", realOffset);
            return false;
        }
        boolean ret = false;
        try {
            ret = cqExtUnit.read(bufferResult.getByteBuffer());
        } finally {
            bufferResult.release();
        }

        return ret;
    }

4.总结

至此,我们已经大概了解消息在进入到broker过后做了什么。在生产者推送消息到broker过后,为了保证数据的能够快速的持久化,是直接按照到达顺序写入到commitLog中的,然后就会给主线程返回生产消息成功的通知。但是消费者需要根据topic和queueId获取到一条消息,并且需要根据消息的key检索一条消息。为了满足上述两个需求,rocketmq会启动一个线程,扫描commitLog,如果有新的消息写入,便会构建IndexFile和consumequeue两个文件,其实相当于两个索引文件。这一步骤在我们后面章节会详细介绍。

其实先持久化文件,然后启动线程对消息做其他处理,这一思想的本质就是为了增大吞吐量。在其他框架中也会应用到这种思想,比如elasticsearch中,在写消息的时候,会同时写入到transLog和memory buffer中后便会返回成功,后续单独启动线程根据memory buffer中的数据来进行其他操作,比如分词,建立倒排索引等,可以看出translog其实就类似于rokcetmq的commitLog。所以万变不离其中,只要有一份持久化数据过后,便可以跟客户端返回成功了,然后再单独的启动线程根据这份持久化数据做定制化处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1874649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Logback-打印方法名及代码行号

背景 公司产品使用了logback作为日志输出框架&#xff0c;日志输出的pattern里配置了打印调用方法名及代码行号的配置&#xff0c;但是实际输出的日志方法名总是显示? 在强迫症的驱使下&#xff0c;开启了探秘之旅 Logback版本 1.2.3 项目中Logging.pattern配置如下&#xff1…

51单片机STC89C52RC——11.1 蜂鸣器播放音乐

目录 目的/效果 一&#xff0c;STC单片机模块 二&#xff0c;蜂鸣器 2.1 介绍 2.2 板子位置电路图 2.3 发声原理 2.4 音符和频率 三&#xff0c;创建Keil项目 四&#xff0c;代码 4.1 乐谱代码 4.1.1 《义勇军进行曲》 4.1.2 《天空之城》 4.1.3 《小美满》 4.1.…

6.26.4.3 条件生成对抗和卷积网络用于x射线乳房质量分割和形状分类

一种基于条件生成对抗网络(conditional Generative Adversarial Networks, cGAN)的乳房肿块分割方法。假设cGAN结构非常适合准确地勾勒出质量区域&#xff0c;特别是当训练数据有限时。生成网络学习肿瘤的内在特征&#xff0c;而对抗网络强制分割与基础事实相似。从公开DDSM数据…

【移动应用开发期末复习】第五/六章例题

系列文章 第一章——Android平台概述 第一章例题 第二章——Android开发环境 第二章例题 第三章 第三章例题 第四章 第五/六章 系列文章RadioGroup 是一个Android特有的布局容器,用于包含多个RadioButton组件。当用户选择其中一个RadioButton时,RadioGroup会自动取消其他Rad…

CS-隐藏防朔源-数据转发-iptables(Linux自带的防火墙)

免责声明:本文仅做技术交流与学习... 目录 准备环境: 1-iptables转发机设置转发: 2-CS服务器配置iptables服务器的IP 准备环境: 两台外网服务器. --iptables服务器就是做一个中转...封了中转就没了... 1-iptables转发机设置转发: iptables -I INPUT -p tcp -m tcp --dport 8…

【FFmpeg】avio_open2函数

【FFmpeg】avio_open2函数 1.avio_open21.1 创建URLContext&#xff08;ffurl_open_whitelist&#xff09;1.1.1 创建URLContext&#xff08;ffurl_alloc&#xff09;1.1.1.1 查找合适的protocol&#xff08;url_find_protocol&#xff09;1.1.1.2 为查找到的URLProtocol创建UR…

【前端项目笔记】6 参数管理

参数管理 效果展示&#xff1a; 在开发功能之前先创建分支goods_params cls 清空终端 git branch 查看所有分支 git checkout -b goods_params 新建分支goods_params git push -u origin goods_params 把本地的新分支推送到云端origin并命名为goods_params 参数管理需要维…

报餐小程序可以运用在饭堂的哪方面

随着科技的快速发展&#xff0c;智能化、信息化的管理方式逐渐渗透到我们日常生活的方方面面。在饭堂管理中&#xff0c;报餐小程序的应用为传统的餐饮管理方式带来了革命性的变革。本文将探讨报餐小程序在饭堂管理中的应用及其带来的优势。 一、报餐小程序的基本功能 报餐小程…

GIT 基于master分支创建hotfix分支的操作

基于master分支创建hotfix分支的操作通常遵循以下步骤&#xff1a; 切换到master分支&#xff1a; 首先&#xff0c;确保你的工作区是最新的&#xff0c;并且你在master分支上。如果不在master分支&#xff0c;你需要先切换过去。 Bash git checkout master 拉取最新的master…

鸿蒙开发设备管理:【@ohos.distributedHardware.deviceManager (设备管理)】

设备管理 本模块提供分布式设备管理能力。 系统应用可调用接口实现如下功能&#xff1a; 注册和解除注册设备上下线变化监听发现周边不可信设备认证和取消认证设备查询可信设备列表查询本地设备信息&#xff0c;包括设备名称&#xff0c;设备类型和设备标识 说明&#xff1a…

ATFX汇市:美国5月PCE数据来袭,EURUSD或迎剧烈波动

ATFX汇市&#xff1a;今日20:30&#xff0c;美国商务部将公布5月核心PCE物价指数年率&#xff0c;前值为2.8%&#xff0c;预期值2.6%&#xff0c;预期下降0.2个百分点。PCE数据是美联储进行货币政策决策的重要依据&#xff0c;尤其是核心PCE年率&#xff0c;向下波动会增加降息…

【LeetCode】一、数组相关:双指针算法 + 置换

文章目录 1、算法复杂度1.1 时间复杂度1.2 空间复杂度 2、数组3、leetcode485&#xff1a;最大连续1的个数4、leetcode283&#xff1a;移动05、leetcode27&#xff1a;移除元素 1、算法复杂度 1.1 时间复杂度 算法的执行时间与输入值之间的关系&#xff08;看代码实际总行数的…

J2EE框架之mybatis学习——连接数据库实现查询操作

J2EE框架之mybatis学习——连接数据库实现查询操作 作业要求&#xff1a; 作者&#xff1a;杨建东 关于具体内容我正准备更新至我的CSDN【被瞧不起的神】也可移步我的公众号【猿小馆】 结合老师的课件和黑马程序员的课程学习。 因为我上课老师已经讲过了基本的概念和理解&a…

Swagger与RESTful API

1. Swagger简介 在现代软件开发中&#xff0c;RESTful API已成为应用程序间通信的一个标准。这种架构风格通过使用标准的HTTP方法来执行网络上的操作&#xff0c;简化了不同系统之间的交互。API&#xff08;应用程序编程接口&#xff09;允许不同的软件系统以一种预定义的方式…

西安电子科技大学微电子/集成电路801考研第一名学长经验分享

西安电子科技大学801考研经验贴 24 届上岸杭研微电子&#xff0c;以下是我的初试成绩&#xff0c;在这里给学弟学妹们分享一下初试复习经验&#xff0c;希望对大家有帮助&#xff0c;有疑问可以在会员群私聊我&#xff1b; 专业课杭研第一名&#xff0c;当时跟的研梦考研小孙学…

流水线作业模拟程序

目录 一 设计原型 二 后台源码 一 设计原型 二 后台源码 namespace 流水线作业模拟 {public partial class Form1 : Form{public Form1(){InitializeComponent();}private int Count 0;private bool IsStop false;private void uiLight1_Click(object sender, EventArgs e…

Jenkins容器的部署

本文主要是记录如何在Centos7上安装docker,以及在docker里面配置tomcat、mysql、jenkins等环境。 一、安装docker 1.1 准备工作 centos7、VMware17Pro 1.2 通过yum在线安装dokcer yum -y install docker1.3 启动docker服务 systemctl start docker.service1.4 查看docke…

Python 中的抽象语法树

Abstract Syntax Trees in Python 注&#xff1a;机翻&#xff0c;未校对。 Requirement: All examples are compatible with at least Python v3.6, except for using ast.dump() with the attribute indent which has been added in Python v3.9. 要求&#xff1a;所有示例至…

黑马点评的程序登录界面点击了发送验证码之后弹出红色异常框之后又返回登录页面

分析原因是因为token为null&#xff0c;然后执行了response.setStatus 401然后就出现这个问题&#xff0c;只要注释了这行就行了&#xff0c;就能正常登录&#xff01;~

3d模型怎么一缩放模型都散了?---模大狮模型网

在3D建模和渲染中&#xff0c;缩放是常见的操作&#xff0c;用来调整模型的大小以适应不同场景或视角需求。然而&#xff0c;有时在进行缩放操作时&#xff0c;模型可能会出现不希望的散乱现象&#xff0c;这可能导致模型的外观和结构受到影响。模大狮将探讨为何会出现这种问题…