rocketmq源码学习-broker启动

news2025/1/17 18:04:30

前言

这篇笔记记录broker启动的源码学习

broker主要完成一下几件事情:
1.接收producer的发送请求,并对消息进行持久化、同步其他节点
2.接收consumer读取消息星球
3.定时向nameSrv注册心跳信息,保持连接

在启动的时候,也是分了两个方法
1、创建brokerController
2、启动brokerController

createBrokerController

入口方法:org.apache.rocketmq.broker.BrokerStartup#main
在这里,我们先看创建brokerController的方法
org.apache.rocketmq.broker.BrokerStartup#createBrokerController

在这个方法中,会初始化brokerConfig、nettyServerConfig、nettyClientConfig,然后根绝配置文件中的配置信息,填充这几个配置信息,具体的细节,不再一一阐述,可以debug或者是追下源码看下

/**
 * 7.初始化BrokerController对象
 */
final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

/**
 * 8.调用brokerController的初始化方法
 */
boolean initResult = controller.initialize();
if (!initResult) {
    controller.shutdown();
    System.exit(-3);
}

在创建brokerController之后,会调用其initialize()方法进行初始化。在这个初始化的方法中,总结来看,就做了两个事情:
1、加载消息文件到内存中:因为broker中负责持久化消息信息,所以在启动的时候,需要从磁盘文件上,加载消息到内存中
2、初始化了一大堆的线程池和定时线程池,暂时没有一一去看这些线程池是为了干什么

在这里插入图片描述

定时拉取所有的nameSrv,更新到内存中

在这一批的定时线程池中,有一个需要关注,就是下面截图中的这个,这个定时线程池要做的事情

  1. 通过http请求,从一个url请求的返回结果中,获取到所有nameSrv地址(http://jmenv.tbsite.net:8080/rocketmq/nsaddr); 为什么这个地址可以返回nameSrv地址信息,暂时还没有搞明白
  2. 让后更新所有的nameSrv地址信息到本地内存中的这个集合中namesrvAddrList
  3. 这里保存到本地内存中,broker在定时向所有nameSrv注册心跳信息时,会用到这个内存中的nameSrv地址

在这里插入图片描述

public String fetchNameServerAddr() {
    try {
        String addrs = this.topAddressing.fetchNSAddr();
        if (addrs != null) {
            if (!addrs.equals(this.nameSrvAddr)) {
                log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
                this.updateNameServerAddressList(addrs);
                this.nameSrvAddr = addrs;
                return nameSrvAddr;
            }
        }
    } catch (Exception e) {
        log.error("fetchNameServerAddr Exception", e);
    }
    return nameSrvAddr;
}

public String fetchNameServerAddr() {
    try {
        // 获取当前所有的nameSrv地址信息
        String addrs = this.topAddressing.fetchNSAddr();
        if (addrs != null) {
            if (!addrs.equals(this.nameSrvAddr)) {
                log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
                // 更新nameSrv地址信息到内存中
                this.updateNameServerAddressList(addrs);
                this.nameSrvAddr = addrs;
                return nameSrvAddr;
            }
        }
    } catch (Exception e) {
        log.error("fetchNameServerAddr Exception", e);
    }
    return nameSrvAddr;
}

public void updateNameServerAddressList(final String addrs) {
    List<String> lst = new ArrayList<String>();
    String[] addrArray = addrs.split(";");
    for (String addr : addrArray) {
        lst.add(addr);
    }

    this.remotingClient.updateNameServerAddressList(lst);
}


@Override
public void updateNameServerAddressList(List<String> addrs) {
    List<String> old = this.namesrvAddrList.get();
    boolean update = false;

    if (!addrs.isEmpty()) {
        if (null == old) {
            update = true;
        } else if (addrs.size() != old.size()) {
            update = true;
        } else {
            for (int i = 0; i < addrs.size() && !update; i++) {
                if (!old.contains(addrs.get(i))) {
                    update = true;
                }
            }
        }

        if (update) {
            Collections.shuffle(addrs);
            log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
            // 这里保存的所有nameSrv地址,是broker在定时向nameSrv注册服务心跳信息时,会用到
            this.namesrvAddrList.set(addrs);
        }
    }
}

brokerController.start()

org.apache.rocketmq.broker.BrokerController#start
总结来说,在createBrokerController中new出来了一批组件,在start()方法中,分别调用其start()方法进行启动

public void start() throws Exception {
    // 1.核心:消息存储组件 下面启动的组件,都是在createBrokerController时创建的
    if (this.messageStore != null) {
        this.messageStore.start();
    }

    // 2.核心: netty服务端组件
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }

    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    // 3.核心:netty client组件,负责向外发送请求
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }

    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }

    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }

    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }

    // 4.这个判断,应该是判断是否启用了dleger,dleger是rocketmq在后期版本中引入的一个组件,可以用来主从切换等,默认值是false
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        // 向所有nameSrv注册broker信息
        this.registerBrokerAll(true, false, true);
    }

    /**
     * 5.这里启动的定时线程就是 broker定时向nameServer注册的逻辑,默认配置是30S,也就是说默认是每30S
     * ,向nameServer注册一下当前broker的信息
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }

    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}

这里可以看到,启动了N个组件;需要注意的,有几个

  1. 调用messageStore.start()
  2. 初始化nettyServer和nettyClient
  3. 向所有nameSrv注册
  4. 启动定时线程池,定时向所有nameSrv注册broker信息,更新其心跳时间

messageStore.start()

我们先来看messageStore.start()方法

在messageStore启动的时候,有N多的逻辑,我先记录当前所了解到的,所用到的

/**
* 1、启动reputMessageService,更新consumerQueue文件
 * 这里启动的异步线程,是为了监听commitLog文件,然后根据文件中写入的数据,更新consumerQueue和indexFiLe
 */
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();


// 2、如果没有启动dleger模式,那就启动haService,这个service看起来是节点间同步msg时用到的
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    this.haService.start();
}

// 3、这里启动的线程,是去处理延迟消息,将延迟消息的topic修改为真正的topic
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());

// 4、这里启动的几个组件,暂时还没细看,但是看起来和消息存储有关系
// 这个应该是和StoreCheckPoint文件有关系
this.flushConsumeQueueService.start();
// 这个看起来和mappedFileQueue数据的持久化有关系
this.commitLog.start();
this.storeStatsService.start();


/**
* 这里添加的定时任务,其中一个就是定时清理过期的文件
* 这里清理过期文件的逻辑暂时没有看明白,但是看内部的处理逻辑,是判断哪些需要删除,然后clear()
*/
this.addScheduleTask();

reputMessageService

在rocketmq中,除了commitLog文件,还有consumerQueue和indexFile文件,这两个文件,都是在commitLog文件写入之后,再根据commitLog文件,去写的,具体的逻辑,就在这个service中
在这个service中,会每隔1ms,执行一次判断,判断当前commitLog是否写入了新的消息,然后将新的消息,通过不同的CommitLogDispatcher写入到不同的文件中,其中最重要的是,就是consumerQueue和indexFile这两个文件

this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());

这个service,是来处理延迟消息的,对于延迟消息,broker在启动的时候,会针对每个延迟级别,初始化一个timerTask,然后通过Timer去定时轮询调度
最主要的逻辑,就在截图中这个代码中,后面单独起一篇博客记录延迟消息的原理
在这里插入图片描述

定时向nameSrv注册心跳

接着我们来看定时向nameSrv注册broker心跳时间的逻辑

在这里插入图片描述
这里可以看到,是启动了一个定时执行的线程池,然后去执行注册的逻辑

这里注册的逻辑,中间的代码就不贴了,比较简单,基本上就是一些简单的判断,然后继续往下层调用,来看下相对比较底层的代码

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

在这里插入图片描述
在这个方法中,第一点,获取所有nameSrv的逻辑,和前面在brokerController启动时,启动的一个线程有关系,这里是从内存中获取nameSrv地址信息(namesrvAddrList 从这个集合中获取) ,在前面,有一个异步线程在定时的更新这个集合

然后这里通过countDownLatch的机制,在所有节点注册成功之后,返回

总结

总结来看,broker在启动的时候

  1. 启动nettyServer和nettyClient
  2. 启动了一些异步定时任务,定时更新一些数据到内存中,同时定时去处理一些逻辑

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/85167.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ant Design 6.0.0 实践集合

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 使用的6.0.0 beta版本 文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结前言 Ant Design 简称为 Antd antd 为 Web 应用提供了丰富的基础 U…

操作指南|通过JumpServer实现Kubernetes运维安全审计

本文重点介绍如何通过JumpServer实现Kubernetes的运维安全审计。此前&#xff0c;我们专门介绍过在Kubernetes集群上快速部署JumpServer的方法步骤&#xff0c;可参见《操作指南&#xff5c;在Kubernetes集群上快速部署JumpServer开源堡垒机》一文。 一、Kubernetes运维审计现…

ABP Vnext 学习03-授权中心微信小程序登录

前言 小程序开发的 前置条件 1 需要服务端是https 和域名 Ip 是不可以的 2 需要申请appid 小程序的官方流程图 个人理解 对于上面的流程图 步骤一 客户端 小程序调用wx.login 方法 获取用户的code 这个code 是限时的五分钟就会过期 拿到code 就可以向服务端发起登录请求了 …

vue3中ref的作用及ref和reactive之间的转化

ref的作用&#xff1a; &#xff08;1&#xff09;第一个作用&#xff1a;和vue一样&#xff1a; 绑在dom节点上拿到的是dom节点&#xff1b;绑在组件上拿到的是组件对象&#xff1b; 定义方式&#xff1a; <template><div><input type"text" ref&…

java运行数据区域分布

Java在运行程序过程中&#xff0c;会将自己的内存划分为若干个不同的数据数据区域&#xff0c;这些若干个区域&#xff0c;每个区域都有自己的用途&#xff0c;具体看下图 java是面向对象的语言&#xff0c;那么虚拟机中的数据&#xff08;对象&#xff09;是怎么被创建出来的呢…

加密 笔记

文章目录简单异或加密对称加密DES加密AES加密1.简单的加密解密逻辑2.填充方式**noPadding**3.加密模式1、**ECB模式&#xff08;默认&#xff09;**2、**CBC模式**3、CFB模式4、OFB模式5、CTR模式代码案例ECB加密和CBC加密测试非对称加密RSA加密AES和RSA混合加密哈希散列算法什…

[附源码]Python计算机毕业设计动物保护资讯推荐网站Django(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

RabbitMQ[3]-RabbitMQ如何保证消息的可靠性投递与消费?

上篇文章&#xff1a;RabbitMQ的核心概念有哪些&#xff1f;它们的职责是什么&#xff1f;中我们详细介绍了RabbitMQ的工作模式&#xff0c;根据它的工作模式&#xff0c;一条消息从生产者发出&#xff0c;到消费者消费&#xff0c;需要经历以下4个步骤&#xff1a; 生产者将消…

Java笔记——String类各种方法的使用总结(附带实例)

String类的获取方法 String类实现获取功能的方法有 int length() —— 获取字符串长度 char charAt(int index) —— 获取指定索引处的字符值 int indexOf(int ch) —— 获取指定字符第一次出现的索引位置 int indexOf(String str) —— 获取指定字符串第一次出现的索引位…

ArrayDeque源码解析

ArrayDeque源码解析 问题 &#xff08;1&#xff09;什么是双端队列&#xff1f; &#xff08;2&#xff09;ArrayDeque 是怎么实现双端队列的&#xff1f; &#xff08;3&#xff09;ArrayDeque 是线程安全的吗&#xff1f; &#xff08;4&#xff09;ArrayDeque 是有界的…

【正点原子FPGA连载】 第三十五章双目OV5640摄像头HDMI显示实验 摘自【正点原子】DFZU2EG/4EV MPSoC 之FPGA开发指南V1.0

1&#xff09;实验平台&#xff1a;正点原子MPSoC开发板 2&#xff09;平台购买地址&#xff1a;https://detail.tmall.com/item.htm?id692450874670 3&#xff09;全套实验源码手册视频下载地址&#xff1a; http://www.openedv.com/thread-340252-1-1.html 第三十五章双目O…

基于jsp+mysql+ssm二手书交易管理系统-计算机毕业设计

项目介绍 这样一个二手书交易网站为用户提供了一个可以在网上买卖图书的平台&#xff0c;用户可以通过二手书交易管理系统进行注册或登录操作&#xff0c;登录成功后可以查看自己已发布的售书信息或者求购信息。同时&#xff0c;用户可以浏览其他用户发布的售书信息和求购信息…

基于51单片机的多层电梯(1-16层)运行系统仿真设计_层数可改

基于51单片机的多层电梯(1-16层)运行系统仿真设计_层数可改 仿真图proteus 8.9 程序编译器&#xff1a;keil 4/5 编程语言&#xff1a;C语言 设计编号&#xff1a;S0027 视频演示 基于51单片机的多层电梯(1-16层)运行系统仿真设计演示视频主要功能&#xff1a; 结合实际情…

[附源码]Python计算机毕业设计SSM基于web的学生社团管理系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

[附源码]Python计算机毕业设计SSM基于的二手房交易系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 Ma…

基于Java+SQL Server2008开发的(WinForm)个人财物管理系统【100010036】

一、需求分析 个人财务管理系统是智能化简单化个人管理的重要的组成部分。并且随着计算机技术的飞速发展&#xff0c;计算机在管理方面应用的旁及&#xff0c;利用计算机来实现个人财务管理势在必行。本文首先介绍了个人财务管理系统的开发目的&#xff0c;其次对个人财务管理…

2022年12月中国数据库排行榜:OceanBase立足创新登榜首,华为腾讯排名上升树雄心

不经一番寒彻骨&#xff0c;怎得梅花扑鼻香。 2022年12月的 墨天轮中国数据库流行度排行榜 火热出炉&#xff0c;本月共有249个数据库参与排名&#xff0c;相比上月新增3个数据库。本月排行榜前十用一句话可以概括为&#xff1a;榜单前十一片红&#xff0c;TODO 格局重洗牌&…

[附源码]Python计算机毕业设计SSM基于web技术的米其林轮胎管理系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

SpringBoot中使用Spring-Retry重试框架 - 第454篇

悟纤&#xff1a;最近我看到自己之前的try/catch、while代码进行请求的重试&#xff0c;看着很不舒服。 师傅&#xff1a;确实了&#xff0c;为师以前也是写出过这样的一堆难看的代码。 悟纤&#xff1a;那师傅这个事情有解吗&#xff1f; ​师傅&#xff1a;徒儿&#xff0c;…

博客网页制作基础大二dw作业 web课程设计网页制作 个人网页设计与实现 我的个人博客网页开发

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…