8、Broker进一步了解

news2025/1/24 4:49:58

1、Broker消息分发服务以及构建ConsumeQueue和IndexFile与消息清除

前面分析如何进行刷盘,本章分析Broker的消息分发以及构建ConsumerQueue和IndexFile,两者构建是为了能够提高效率,减少消息查找时间以及减少网络带宽与存储空间。

ConsumeQueue 可以快速定位消息,IndexFile 可以快速定位 ConsumeQueue 中的位置,并随时实时更新,以保证消息的消费速度。

ConsumeQueue 和 IndexFile 采用了压缩存储和分片存储的方式,消费者可以仅仅消费需要的消息,从而节省存储空间。

ConsumeQueue 可以快速定位消息,减少消息传递所消耗的网络带宽,提高消息传递效率。

消息分发

在执行Broker的启动方法会去启动消息分发服务,简单理解就是为消息建立一种索引更加快速有效的消费查询消息。
broker处理消息分发的类是ReputMessageService,启动一个线程不断地将commitLong分到到对应的consumerQueue与indexFile,消息分发流程处理完毕;


public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 休眠1s,进行分发ConsumeQueue和IndexFile
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

消息每隔1s进行消息的分发,尝试去构建ConsumerQueue索引与

IndexFile索引。
private void doReput() {
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    // 如果分发偏移量小于commitlog的最大物理偏移量,那么循环分发
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
      // 从commitLog中获取需要分发的消息
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // CommitLog日志分发到ConsumeQueue和IndexFile
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            // 长轮询:如果有消息到了主节点,并且开启了长轮询。(消息拉取逻辑后面单独讲)
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                    && DefaultMessageStore.this.messageArrivingListener != null) {
                                //messageArrivingListener实例是NotifyMessageArrivingListener
                                //通知被hold住的消费者拉取消息的请求
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                notifyMessageArrive4MultiQueue(dispatchRequest);
                            }

                            this.reputFromOffset += size;
                            readSize += size;
                            
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .add(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                             // 如果等于0,表示读取到MappedFile文件尾
                            // 获取下一个文件的起始索引
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {

                        if (size > 0) {
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

通过getData()方法来获取需要进行分发的消息,根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile,然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer。
依次进行消息的分发调用doDispatch方法,把消息写入ConsumerQueue与IndexFile中。
如果当前节点是主节点且开启长轮询,则调用messageArrivingListener.arriving方法进行消息的投递,具体长轮询的方案后期介绍。
构建ConsumerQueue与IndexFile


public void doDispatch(DispatchRequest req) {
    //dispatchList中包含CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex,
    // 分别用来构建ConsumeQueue文件和IndexFile文件。
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

调用dispatch进行消息的分发,其中包括三个实现类。

CommitLogDispatcherBuildConsumeQueue类:写ConsumeQueue文件,构建ConsumeQueue索引。

CommitLogDispatcherBuildIndex类:写IndexFile文件,构建IndexFile索引。

CommitLogDispatcherCalcBitMap类:构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql。

消息清理

当消息到达一定程度就会被RocketMQ丢弃,而在Broker启动会去添加定时删除文件的任务,60s之后执行第一个后面每隔10s执行一次。

// 过期文件删除
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        // 清除方法
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}

private void cleanFilesPeriodically() {
    // 删除CommitLog文件
    this.cleanCommitLogService.run();
    // 清除ConsumerQueue文件和IndexFile文件
    this.cleanConsumeQueueService.run();
}

清除的主要逻辑:首先清除CommitLog文件,然后在清除ConsumerQueue文件和IndexFile文件。

CommitLog文件的清除
// 定期删除CommitLog文件的地方
public void run() {
    try {
        // 删除文件
        this.deleteExpiredFiles();
        // 删除挂起文件
        this.redeleteHangedFile();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

private void deleteExpiredFiles() {
    int deleteCount = 0;
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    //K2 Broker触发文件删除的三个条件
    boolean timeup = this.isTimeToDelete(); // deleteWhen是否达到,配置项
    boolean spacefull = this.isSpaceToDelete(); // 判断磁盘是否达到删除,diskmax配置项
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; // 手动删除次数
    // 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。
    if (timeup || spacefull || manualDelete) {

        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
            fileReservedTime,
            timeup,
            spacefull,
            manualDeleteFileSeveralTimes,
            cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;

        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

首先删除文件需要满足的要求是其一deleteWhen是否达到默认是04(可配置),其二判断磁盘是否达到删除默认75(可配置),其三手动删除。

触发过期⽂件删除时,有两个检查的纬度,⼀个是,是否到了触发删除的时间,就是broker.conf⾥配置的deleteWhen属性。另外还会检查磁盘利⽤率,达到阈值也会触发过期⽂件删除。这个阈值默认是75%,可以在broker.conf⽂件当中定制。但是最⼤值为95,最⼩值为10。

注意:观察源码可得 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。

如果删除的时候我们的文件正在被使用这就导致删除不会生效此时就需要redeleteHangedFile方法区删除挂起文件。

ConsumerQueueIndexFile清理
public void run() {
    try {
        // 删除过期的file
        this.deleteExpiredFiles();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

private void deleteExpiredFiles() {
    // 间隔100
    int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();

    long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    if (minOffset > this.lastPhysicalMinOffset) {
        this.lastPhysicalMinOffset = minOffset;

        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue logic : maps.values()) {
                // 进行consumerQueue删除
                int deleteCount = logic.deleteExpiredFile(minOffset);

                if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                    try {
                        Thread.sleep(deleteLogicsFilesInterval);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        }
    // 进行IndexFile删除
        DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    }
}

在删除ConsumeQueue和IndexFile⽂件时,会去检查CommitLog当前的最⼩Offset,然后在删除时进⾏对⻬。

2、Broker端整体流程以及文件索引结构

至此我们的Producer发送消息以及Broker如何处理存储消息以及介绍完了,首先我们需要去整理一下整个流程,然后后面再来分析Consumer端的消费,以及各种消息类型的案例源码等。

发送端流程总结

生产者发送消息写入内存推送给Broker进行刷盘入CommitLog文件,同时处理分发将消息索引存放到ConsumerQueue与IndexFile文件当中。

如果是主从架构就会去进行主从之间的同步。

接下来当消息达到一定条件就会触发删除机制,程序注册了一个定时任务首次60s执行后面每隔10s去执行一次清理工作。注意:删除文件并没有判断是否被消费过了只要达到要求就会删除继而导致消息的丢失,所以我们得去避免消息的积压。
在这里插入图片描述
索引结构

CommitLog⽂件的⼤⼩是固定的,但是其中每个存储的每个消息单元长度是不固定的,在源码CommitLog类的calMsgLength方法有计算消息长度的方法。

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + bornhostLength //BORNHOST
        + 8 //STORETIMESTAMP
        + storehostAddressLength //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}

正因为消息的记录⼤⼩不固定,所以RocketMQ在每次存CommitLog⽂件时,都会去检查当前CommitLog⽂件空间是否⾜够,如果不够的话,就重新创建⼀个CommitLog⽂件,⽂件名为当前消息的偏移量。

ConsumeQueue⽂件主要是加速消费者的消息索引。他的每个⽂件夹对应RocketMQ中的⼀个MessageQueue,⽂件夹下的⽂件记录了每个MessageQueue中的消息在CommitLog⽂件当中的偏移量。这样,消费者通过ComsumeQueue⽂件,就可以快速找到CommitLog⽂件中感兴趣的消息记录。⽽消费者在ConsumeQueue⽂件当中的消费进度会保存在config/consumerOffset.json⽂件当中。

ConsumerQueue=30w固定大小20byte的数据块组成(8字节msgPhyOffset表示起始位置+4字节msgSize文件占用长度+8字节magTagCode消息的tag的Hash值)。

IndexFile⽂件主要是辅助消息检索。他的作⽤主要是⽤来⽀持根据key和timestamp检索消息。它的⽂件名⽐较特殊不以消息偏移量命名⽽是⽤的时间命名,但是其实它也是⼀个固定⼤⼩的⽂件。

IndexFile=固定40字节indexHeader+固定20字节的个数500w的slot+固定20字节最多个数500W*4的index。

indexHeader表示文件头,slots表示一系列的槽位,index表示真正的索引,槽位里面存放index,槽位紧挨着indexHeader而每个槽位里面最多存放4个index。

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1289733.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软著项目推荐 深度学习图像风格迁移 - opencv python

文章目录 0 前言1 VGG网络2 风格迁移3 内容损失4 风格损失5 主代码实现6 迁移模型实现7 效果展示8 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习图像风格迁移 - opencv python 该项目较为新颖&#xff0c;适合作为竞赛课题…

鸿蒙4.0开发笔记之ArkTS语法基础之条件渲染和循环渲染的使用(十五)

文章目录 一、条件渲染&#xff08;if&#xff09;二、循环渲染&#xff08;ForEach&#xff09; 一、条件渲染&#xff08;if&#xff09; 1、定义 正如其他语言中的if…else…语句&#xff0c;ArkTS提供了渲染控制的能力&#xff0c;条件渲染可根据应用的不同状态&#xff0…

数独训练APP -->>穿山甲SDK接入收益·android广告接入·app变现·广告千展收益·eCPM收益(2023.11.01 -2023.11.30)

接入穿山甲SDK的app 全屏文字滚动APP 数独训练APP 广告接入示例: Android 个人开发者如何接入广告SDK&#xff0c;实现app流量变现 接入穿山甲SDK app示例&#xff1a; android 数独小游戏 经典数独休闲益智 2023 11月份收益总结 – 数独训练APP app接入上架有一段时间了&a…

SQL server 根据已有数据库创建相同的数据库

文章目录 用导出的脚本创建相同的数据库导出建表脚本再次建表 一些sql语句 用导出的脚本创建相同的数据库 导出建表脚本 首先&#xff0c;右击要导出的数据库名&#xff0c;依次选择任务-生成脚本。 简介&#xff08;第一页&#xff09;处选择下一步&#xff0c;然后来到选择…

TCP协议实现一对一聊天

服务端代码&#xff1a; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.Scanner; /** * 发送消息线程 */ class…

【开源】基于Vue和SpringBoot的音乐偏好度推荐系统

项目编号&#xff1a; S 012 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S012&#xff0c;文末获取源码。} 项目编号&#xff1a;S012&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、系统设计2.1 功能模块设计2.1.1 音乐档案模块2.1…

2023中医药国际传承传播大会暨中医药图片和非遗艺术展隆重揭幕

由世界针灸学会联合会、中新社国际传播集团、中国新闻图片网、中国民族医药学会、中国针灸学会联合主办的“2023中医药国际传承传播大会”3日在广东省深圳市举办&#xff0c;“中医药国际传承传播图片展”与“非遗艺术展”在大会举办期间开展迎客。会议聚焦非遗健康、非遗传承等…

python 源码阅读

在 python 源码阅读 过程中发现的一些很有意思的书写习惯&#xff0c;学习靠拢一下&#xff1a; 1. Python 函数的文档字符串&#xff08;docstring&#xff09;的使用&#xff1a; 文档字符串是放置在函数定义内部顶部的字符串&#xff0c;用于描述函数的作用、参数、返回值…

深度探索 Python Pyramid 框架

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com Pyramid是一个灵活且强大的Python web框架&#xff0c;广泛用于构建各种规模的Web应用程序。本文将深度探索Pyramid框架&#xff0c;介绍其核心概念、应用场景以及一些高级特性。 安装与基础用法 首先&#xf…

JRT打印预览实现

JRT客户端部分已经实现了打印、导出Excel部分&#xff0c;之前没实现打印预览部分&#xff0c;因为要自己写打印预览界面&#xff0c;所以留到最后做&#xff0c;经过两晚的努力&#xff0c;实现了打印预览。 效果: 打印预览界面代码 package Monitor.Print;import javafx.a…

【Linux】telnet命令使用

telnet命令 telnet命令用于使用telnet协议与另一台主机进行通信。如果在没有主机参数的情况下调用telnet&#xff0c;它将进入命令模式&#xff0c;由其提示&#xff08;telnet>&#xff09;指示。在这种模式下&#xff0c;它接受并执行下面列出的命令。如果使用参数调用它…

相交链表(LeetCode 160)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路方法一&#xff1a;暴力法方法二&#xff1a;哈希表方法三&#xff1a;双栈方法四&#xff1a;双指针&#xff1a;记录链表长度方法五&#xff1a;双指针&#xff1a;互换遍历 5.实现示例参考文献 1.问题描述 给你两个单链表…

ai智能客服的发展是怎么样的?

随着科技的不断发展&#xff0c;人工智能&#xff08;AI&#xff09;技术已经逐渐进入了我们的生活&#xff0c;其中最为突出的应用之一就是智能客服。智能客服是指利用人工智能技术来实现客户服务的一种方式&#xff0c;它可以通过自然语言处理、机器学习等技术&#xff0c;来…

go语言 | etcd源码导读(一)

参考 本文参考https://zhuanlan.zhihu.com/p/600893553 https://www.topgoer.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E6%93%8D%E4%BD%9C/go%E6%93%8D%E4%BD%9Cetcd/etcd%E4%BB%8B%E7%BB%8D.html 前沿etcd 与 raft etcd是使用Go语言开发的一个开源的、高可用的分布式key-value存储系…

【MySQL数据类型】

目录&#xff1a; 前言数据类型分类整数类型tinyintbit 小数类型floatdecimal 字符串类型charvarchar日期和时间enum & set在集合中查找find_in_set 前言 剑指offer&#xff1a;一年又4天 数据类型分类 整数类型 tinyint 整数类型都分为有符号和无符号两种&#xff0c;默…

【7】PyQt布局layout

目录 1. 布局简介 2. 水平布局QHBoxLayout 3. 竖直布局QVBoxLayout 4. 表单布局QFormLayout 5. 布局嵌套 1. 布局简介 一个pyqt窗口中可以有多个控件。所谓布局,指的就是多个控件在窗口中的展示方式 布局方式大致分为: 水平布局竖直布局网格布局表单布局 2. 水平布局Q…

zookeeper集群 +kafka集群

1.zookeeper kafka3.0之前依赖于zookeeper zookeeper是一个开源&#xff0c;分布式的架构&#xff0c;提供协调服务&#xff08;Apache项目&#xff09; 基于观察者模式涉及的分布式服务管理架构 存储和管理数据&#xff0c;分布式节点上的服务接受观察者的注册&#xff0c…

【EI会议征稿中,IEEE出版】第三届计算机科学、电子信息工程和智能控制技术国际会议(CEI 2023)

第三届计算机科学、电子信息工程和智能控制技术国际会议&#xff08;CEI 2023&#xff09; 2023 3rd International Conference on Computer Science, Electronic Information Engineering and Intelligent Control Technology 第三届计算机科学、电子信息工程和智能控制技术…

Auto4D:从时序点云标注4D物体

文章&#xff1a;Auto4D: Learning to Label 4D Objects from Sequential Point Clouds 作者&#xff1a;Bin Yang&#xff0c; Min Bai&#xff0c; Ming Liang&#xff0c; Wenyuan Zeng&#xff0c; Raquel Urtasun 编辑&#xff1a;点云PCL 来源&#xff1a;arXiv 2021 …

180天Java从入门到就业-Day04-01Java程序流程控制介绍、Java分支结构if语句

1.程序流程控制介绍 1.1 流程控制结构介绍 流程控制语句是用来控制程序中各语句执行顺序的语句,可以将语句组合成完成一定功能的逻辑模块。 一个程序会包含三种流程控制结构:顺序结构、分支结构、循环结构 顺序结构在没有使用程序流程控制语句(if-else语句、switch-case语…