rocketmq源码学习-nameServer

news2025/1/19 2:30:32

前言

最近看了下rocketmq的源码,计划针对最近的学习,做一个笔记,先从nameServer启动的逻辑开始记录吧

在rocketmq中,有四个关键的组件

  1. nameServer
  2. broker
  3. producer
  4. consumer

这四个组件之间的关系是这样的
在这里插入图片描述

关于nameSrv

nameserver的作用是:提供类似于注册中心的功能,基于这个前提,我们可以知道,rocketmq需要提供读写的功能,因为在mq中,组件之间的通信是通过netty完成的,所以,nameserver只需要提供nettyServer即可

在nameSrv中,提供了两个功能:
1、broker在启动的时候,会将自己的信息注册到nameSrv中;
2、producer在向topic中发送消息的时候,consumer在去broker中拉取消息的时候,会先去nameSrv上找对应的topic信息,所以这是nameSrv的两大功能

接着我们来看nameSrv启动的源码

启动源码

nameSrv启动的入口是在:org.apache.rocketmq.namesrv.NamesrvStartup#main 从启动脚本中可以证明这个点

NamesrvController controller = createNamesrvController(args);
start(controller);

在nameSrv启动的入口处,有两行代码是需要关注的

代码1:初始化NamesrvController对象,这里主要是解析配置文件、控制台配置信息,根据配置信息,初始化nettyServerConfig和NameSrvConfig;然后根据这两个config对象,初始化namesrvController对象

代码2:初始化nameSrv

代码1的逻辑,没有什么特别重要的,就是解析配置文件的逻辑,来看代码2:

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    /**
     * 1.初始化了几个定时任务
     */
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    /**
     * 2.这里应该是一个钩子方法,在关闭nameserver的时候会被调用
     */
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));

    // 3.启动nettyServer,也就是namesrv启动,可以接收客户端的请求(这里的客户端 = 服务发送者和服务消息者)
    controller.start();
    return controller;
}

在这个方法中,标注了三个关键点:
在第一个方法中,其中有一个定时任务,需要关注

// 这个任务,每10S执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
   @Override
       public void run() {
           NamesrvController.this.routeInfoManager.scanNotActiveBroker();
       }
   }, 5, 10, TimeUnit.SECONDS);

public void scanNotActiveBroker() {
    // nameSrv在内存中存储的broker信息
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        // 获取当前broker最后一次心跳时间
        long last = next.getValue().getLastUpdateTimestamp();
        // 最后心跳时间 + 120S,超过这个时间,还没有接收到心跳,认为broker出现问题,暂时先清除
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

这个定时任务,是为了去扫描不活跃的broker信息,在nameSrv中,会保存所有的broker信息,这个broker信息是,每个broker在启动的时候,会通过netty请求,向所有nameSrv注册broker信息,同时会启动一个定时任务,每30S执行一次注册的逻辑(broker向nameSrv注册的逻辑,在后面记录broker启动源码的时候,会讲到,这里我们就先知道,broker会通过netty请求,每30S去nameSrv注册一次)

接收注册请求源码

我们来看下,nameSrv在接收到broker的注册请求,是如何处理的
在rocketmq中,发送netty请求的时候,会带上一个code编码,netty服务端会根据code码,路由到不同的类去处理,我们需要知道的是,broker在向nameSrv注册的时候,指定的code是:RequestCode.REGISTER_BROKER(103)

nameSrv在启动时,启动了一个nettyServer,在其

org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0
	org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
		org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
			org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

在processRequest方法中,会根据code编码,判断当前请求是什么,如果是REGISTER_BROKER,会调用:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker

// 这个方法太长,删除了一些不需要关心的代码
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

 
    // 注册broker信息 
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );

    return response;
}

// 这个方法是nameSrv真正去更新内存中的broker信息
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // 1、加锁
            this.lock.writeLock().lockInterruptibly();
            // 2、注册到clusterAddrTable中
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            // 3、如果brokerAddrTable中,根据brokerName,查不到对应的value,表示这个broker是第一次注册
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                // 4、添加到map集合中
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }

            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // 5、这个map中,存放的是:addr + broker信息,其中包括最后心跳时间 lastUpdateTimestamp
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

在上面方法中,注释5这里的map:brokerLiveTable就是在前面定时扫描不活跃的broker时,所依赖的map

总结

nameSrv启动的源码比较简单,其实就干了两件事情
1、初始化nettyServer服务端,用来接收客户端请求,这里的客户端,包括:producer、consumer、broker
2、启动了一个定时任务,定时去扫描内存中不活跃的broker信息

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/84926.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]Python计算机毕业设计钓鱼爱好者交流平台Django(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

老照片修复清晰?父母以前的老照片还能修复吗?

父母结婚时拍摄的结婚照片&#xff0c;现在大概快四十年了&#xff0c;因为保存不善&#xff0c;导致照片泛黄&#xff0c;严重模糊。因为这是父母年轻的时候唯一保留下来的&#xff0c;对我们来说意义重大&#xff0c;所以想要修复照片可以实现吗&#xff1f; 有些照相馆是提…

论文投稿指南——中国(中文EI)期刊推荐(第6期)

&#x1f680; EI是国际知名三大检索系统之一&#xff0c;在学术界的知名度和认可度仅次于SCI&#xff01;&#x1f384;&#x1f388; 【前言】 想发论文怎么办&#xff1f;手把手教你论文如何投稿&#xff01;那么&#xff0c;首先要搞懂投稿目标——论文期刊。其中&#xf…

ADI Blackfin DSP处理器-BF533的开发详解51:Bin_Conver (图像二值变换处理)(含源码)

硬件准备 ADSP-EDU-BF533&#xff1a;BF533开发板 AD-HP530ICE&#xff1a;ADI DSP仿真器 软件准备 Visual DSP软件 硬件链接 功能介绍 代码实现了图像二值变换处理&#xff0c;代码运行时&#xff0c;会通过文件系统打开工程文件根目下" …/ImageView"路径中的…

阿里云DataWorks荣获DAMA中国数据治理优秀产品奖

DAMA&#xff08;国际数据管理协会&#xff09;是一个全球性的专业组织&#xff0c;协会自1980年成立以来&#xff0c;一直致力于数据管理和数字化的研究、实践及相关知识体系的建设&#xff0c;先后发行了《DAMA 数据管理字典》和《DAMA数据管理的知识体系》等&#xff0c;该知…

C/C++程序的断点调试 - Visual Studio Code

本文以Visual Studio Code为例&#xff0c;简述C/C程序断点调试的基本方法和过程。其它的IDE环境&#xff0c;大同小异。 本文引用自作者编写的下述图书; 本文允许以个人学习、教学等目的引用、讲授或转载&#xff0c;但需要注明原作者"海洋饼干叔 叔"&#xff1b;本…

视频特效如何制作?快把这些方法收好

小伙伴们平时刷短视频的时候&#xff0c;有没有发现一些短视频的效果很惊艳。这些惊艳的效果&#xff0c;大部分都是在视频中添加的一些动画特效。那你们知道手机视频怎么添加特效吗&#xff1f;为了帮助大家解决这个问题&#xff0c;接下来我就将为大家分享几种添加特效的方法…

架构高可用之限流-抽刀断水水更流

上图中是一个水坝泄洪的图&#xff0c;那么&#xff0c;对于软件系统&#xff0c;如何使用最方便的可编程的方式增加服务限流能力呢&#xff1f; 下面我结合一个常规的springCloud项目实践了一把&#xff0c;希望他山之石可以攻玉。 背景 简单使用jmeter&#xff0c;压20个并…

FL Studio21.0.0完整版最高版本升级功能有哪些?

支持苹果 Silicon 芯片 – 对苹果 Silicon 芯片&#xff08;M1 芯片以及相关 CPU&#xff09;的原生 ARM 代码支持&#xff0c;但请注意&#xff1a; NewTime、NewTone 和一些 DirectWave 采样格式的导入功能尚未完全重构可能会有问题。 FL Studio-win21中文更新下载如下: htt…

新通药物被暂缓审议:科创属性遭质疑,招股书“数据打架”

12月12日&#xff0c;上海证券交易所披露的信息显示&#xff0c;西安新通药物研究股份有限公司&#xff08;下称“新通药物”&#xff09;的首发申请被暂缓审议。据贝多财经了解&#xff0c;新通药物于2021年12月6日在科创板递交招股书&#xff0c;计划募资12.79亿元。 科创板上…

识破贷后资金归集——关联网络

近几年&#xff0c;金融机构为了扩大信贷规模&#xff0c;抢占市场份额&#xff0c;通过贷款将贷款发放给无法直接通过金融机构获得贷款的个人或者企业&#xff0c;但这也给金融机构带来了多重风险。 首先&#xff0c;我们来看下资金归集是什么。所谓资金归集&#xff0c;是银…

GCSE英语语言考试-语言和结构

Language语言 Example of a simile from The Hunger Games, Suzanne Collins 《饥饿游戏》中的比喻例子&#xff0c;苏珊娜-柯林斯的作品 When talking about language in prose fiction, there are a number of things you could look for: 在谈论散文小说的语言时&#xff0c…

如何在XMLMap端口修改字段映射?

在使用知行EDI系统的过程中&#xff0c;我们经常会用到XMLMap端口进行数据转化&#xff0c;XMLMap端口可以通过拖拽方式进行字段取值映射&#xff0c;同时也可以写代码添加字段对应的取值及判断条件。有时在完成映射后&#xff0c;发现源文件/目标文件待映射的字段和段落需要添…

安卓玩机搞机技巧综合资源-----闲置手机当摄像头 当监控 上网课必备 多软件评测【十四】

接上篇 安卓玩机搞机技巧综合资源------如何提取手机分区 小米机型代码分享等等 【一】 安卓玩机搞机技巧综合资源------开机英文提示解决dm-verity corruption your device is corrupt. 设备内部报错 AB分区等等【二】 安卓玩机搞机技巧综合资源------EROFS分区格式 小米红…

为什么不能使用bigdecimal的equals比较大小

BigDecimal&#xff0c;相信对于很多人来说都不陌生&#xff0c;很多人都知道他的用法&#xff0c;这是一种 java.math 包中提供的一种可以用来进行精确运算的类型。 很多人都知道&#xff0c;在进行金额表示、金额计算等场景&#xff0c;不能使用 double、float 等类型&#…

一键登陆了解一下

我们先来看一下目前的一些登录方式&#xff1a; 账号、密码登陆 使用账号加密码是最传统的登录方式&#xff0c;可以说是简单粗暴的&#xff0c;一般也不会出现什么问题。 缺点 但这种方式要求用户要记住自己的账号和密码&#xff0c;也就是有一个记忆成本。用户为了降低记忆…

【Tryhackme】KoTH Food CTF(前端验证绕过,图片隐写,SUID提权:vim.basic)

免责声明 本文渗透的主机经过合法授权。本文使用的工具和方法仅限学习交流使用&#xff0c;请不要将文中使用的工具和渗透思路用于任何非法用途&#xff0c;对此产生的一切后果&#xff0c;本人不承担任何责任&#xff0c;也不对造成的任何误用或损害负责。 服务发现 ┌──(r…

学习管理系统五大好处

正如我们先前提到过的&#xff0c;对于公司来说&#xff0c;建立“学习型文化”可以带来许许多多的好处。然而&#xff0c;企业规模会越来越大&#xff0c;员工的培训学习需求并不会减少&#xff0c;这也会为企业的员工培训带来压力。学习管理系统&#xff08;LMS&#xff09;可…

GCSE英语语言考试-对虚构小说的问题作答

How to analyse a fiction extract 如何分析虚构小说节选 In an analytical response, you should show how language and structure create meaning. You could also explore the effect on the reader. An analytical response uses evidence from the text to make clear po…

教育培训机构教学课程内容视频加密是如何做的?

阿酷TONY / 2022-12-13 / 长沙 / 原创组图 / 内容含实测链接可测效果 教育培训机构教学课程内容视频加密是如何做的&#xff1f;教育机构的web课程视频加密是如何实现的&#xff1f;主要通过以下的一些方式来实现&#xff1a; 目录&#xff1a; 1、VRM加密 2、播放器加密…