全网最细RocketMQ源码一:NameSrv

news2025/1/11 18:43:15

一、入口

在这里插入图片描述
NameServer的启动源码在NameStartup,现在开始debug之旅

二、createNamesrcController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();


        Options options = ServerUtil.buildCommandlineOptions(new Options());
        // 启动时的参数信息 有commandLine 管理了。
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }


        // namesrv 配置。
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // netty 服务器配置。
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();

        // namesrv服务器 监听端口 修改为9876
        nettyServerConfig.setListenPort(9876);


        if (commandLine.hasOption('c')) {
            // 读取 -c 选项的值
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                // 读取 config 文件数据 到 properties 内
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                // 如果 config 配置文件 内的配置 涉及到 namesrvConfig 或者 nettyServerConfig 的字段,那么进行复写。
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                // 将读取的 配置文件 路径 保存 到 字段。
                namesrvConfig.setConfigStorePath(file);
                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }


        // 将启动时 命令行 设置的kv 复写到 namesrvConfig内。
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // 创建日志对象。
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);


        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        // 创建 控制器
        // 参数1:namesrvConfig
        // 参数2:网络层配置 nettyServerConfig
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

主要做了几件事:

  1. 创建了NamesrvConfig
  2. 创建了nettyServerConfig
  3. 创建了NamesrvController

NamesrvController详解:

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig;
    private final NettyServerConfig nettyServerConfig;

    // 调度线程池,执行定时任务,两件事:1. 检查存活的broker状态  2. 打印配置
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));

    // 管理kv配置。
    private final KVConfigManager kvConfigManager;
    // 管理路由信息的对象,重要。
    private final RouteInfoManager routeInfoManager;

    // 网络层封装对象,重要。
    private RemotingServer remotingServer;

    // ChannelEventListener ,用于监听channel 状态,当channel状态 发生改变时 close idle... 会向 事件队列发起事件,事件最终由 该service处理。
    private BrokerHousekeepingService brokerHousekeepingService;

    // 业务线程池,netty 线程 主要任务是 解析报文 将 报文 解析成 RemotingCommand 对象,然后 就将 该对象 交给 业务 线程池 再继续处理。
    private ExecutorService remotingExecutor;

    private Configuration configuration;
    private FileWatchService fileWatchService;

    // 参数1:namesrvConfig
    // 参数2:网络层配置 nettyServerConfig
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;

        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();

        this.brokerHousekeepingService = new BrokerHousekeepingService(this);

        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
 }

start(NamesrvController)

  public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        // 初始化方法..
        boolean initResult = controller.initialize();

        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }


        // JVM HOOK ,平滑关闭的逻辑。 当JVM 被关闭时,主动调用 controller.shutdown() 方法,让服务器平滑关机。
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        // 启动服务器。
        controller.start();


        return controller;
    }

主要做了几件事:

  1. controller初始化
   public boolean initialize() {
        // 加载本地kv配置
        this.kvConfigManager.load();
        // 创建网络服务器对象
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        // 创建业务线程池,默认线程数 8
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 注册协议处理器(缺省协议处理器)
        this.registerProcessor();

        // 定时任务1:每10秒钟检查 broker 存活状态,将idle状态的 broker 移除。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);


        // 定时任务2:每10分钟 打印一遍 kv 配置。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
    }
  1. controller启动
    在这里插入图片描述
    会调用到NettyRemotingServer.start方法
 public void start() {
        // 当向channel pipeline 添加 handler 时 指定了 group 时,网络事件传播到 当前handler时,事件处理 由 分配给 handler 的线程执行。
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {

                    private AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });

        // 创建共享的 处理器 handler
        prepareSharableHandlers();

        ServerBootstrap childHandler =
                // 配置服务端 启动对象
                // 配置工作组 boss 和 worker 组
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                        // 设置服务端 ServerSocketChannel类型
                        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        // 设置服务端ch选项
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.SO_KEEPALIVE, false)
                        // 客户端ch选项
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        // 设置服务器端口
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        //
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                // 初始化 客户端ch pipeline 的逻辑
                                ch.pipeline()
                                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                        .addLast(defaultEventExecutorGroup,
                                                encoder,
                                                new NettyDecoder(),
                                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                                connectionManageHandler,
                                                serverHandler
                                        );
                            }
                        });


        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            // 客户端开启 内存池,使用的内存池是  PooledByteBufAllocator.DEFAULT
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 服务器 绑定端口。
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            // 将服务器成功绑定的端口号 赋值给 字段 port。
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        // housekeepingService 不为空,则创建 网络异常事件 处理器
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        // 提交定时任务,每一秒 执行一次。
        // 扫描 responseTable 表,将过期的 responseFuture 移除。
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1374942.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

倍福(Bechhoff) CX8090嵌入式PC控制器开发没有想像中的那么难

笔者曾2023年初曾为云南阜外医院新风系统开发自动控制系统。医院所有新风设备和公区照明全部采用倍福嵌入式PC控制器实现智能控制。其中新风和供水计量通过CX8090实现控制&#xff1b;公区照明通过BC9050实现控制&#xff1b;并采用美国邦纳人机界面(THM035B&#xff09;实现远…

MES系统数据采集的几种方式

生产制造执行MES系统具有能够帮助企业实现生产数据收集与分析、生产计划管理、生产过程监控等的功能板块&#xff0c;在这里小编就不一一介绍了&#xff0c;主要讲讲它的数据采集功能板块&#xff0c;可以说&#xff0c;数据采集是该系统进行数据统计与生产管理等后续工作的基础…

Logstash应用-同步ES(elasticsearch)到HDFS

1.场景分析 现有需求需要将elasticsearch的备份至hdfs存储&#xff0c;根据以上需求&#xff0c;使用logstash按照天级别进行数据的同步 2.重难点 数据采集存在时间漂移问题&#xff0c;数据保存时使用的是采集时间而不是数据生成时间采用webhdfs无法对文件大小进行设置解决…

uniapp 设置底部导航栏

uniapp 设置原生 tabBar 底部导航栏。 设置底部导航栏 一、创建页面&#xff0c;一定要在 pages.json 文件中注册。 二、在 pages.json 文件中&#xff0c;设置 tabBar 配置项。 pages.json 页面 {"pages": [...],"globalStyle": {...},"uniIdRout…

软件测试|探索Python中获取最高数值的几种方法

前言 在数据分析、统计和编程领域&#xff0c;经常会遇到需要从一组数值中找出最高数值的情况。Python 作为一门功能丰富的编程语言&#xff0c;提供了多种方法来实现这一目标。在本文中&#xff0c;我们将探索几种获取最高数值的方法&#xff0c;帮助大家在不同情况下选择最适…

Python 潮流周刊#34:Python 3.13 的 JIT 方案又新又好

△△请给“Python猫”加星标 &#xff0c;以免错过文章推送 你好&#xff0c;我是猫哥。这里每周分享优质的 Python、AI 及通用技术内容&#xff0c;大部分为英文。本周刊开源&#xff0c;欢迎投稿[1]。另有电报频道[2]作为副刊&#xff0c;补充发布更加丰富的资讯&#xff0c;…

滚柱导轨精度等级是如何划分?

滚柱导轨的精度等级主要根据其表面精度、滑块与导轨表面的公差以及定位精度等性能指标来划分。根据不同的标准和应用需求&#xff0c;精度等级的划分存在一定的差异。 1、行走平行度&#xff1a;普通级&#xff08;无标注/C&#xff09;5μm&#xff0c;高级&#xff08;H&…

如何在数学建模竞赛中稳定拿奖

✅作者简介&#xff1a;人工智能专业本科在读&#xff0c;喜欢计算机与编程&#xff0c;写博客记录自己的学习历程。 &#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&…

NIO通信代码示例

NIO通信架构图 1.Client NioClient package nio;import constant.Constant;import java.io.IOException; import java.util.Scanner;public class NioClient {private static NioClientHandle nioClientHandle;public static void start() {nioClientHandle new NioClientHa…

【Spring Cloud】Sentinel流量限流和熔断降级的讲解

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《Spring Cloud》。&#x1f3af;&#x1f3af; &am…

网工内推 | 高级网工,H3C认证优先,朝九晚六,周末双休

01 万德 招聘岗位&#xff1a;高级网络工程师 职责描述&#xff1a; 1、项目交付&#xff1a;项目管理和交付&#xff0c;包括项目前期的规划、实施以及后期的运维支持、项目验收等。 2、技术支持&#xff1a;为客户及合作伙伴提供网上问题远程和现场支持&#xff1b;对公司内…

【C++】STL 算法 ⑥ ( 二元谓词 | std::sort 算法简介 | 为 std::sort 算法设置 二元谓词 排序规则 )

文章目录 一、二元谓词1、二元谓词简介2、 std::sort 算法简介3、 代码示例 - 为 std::sort 算法设置 二元谓词 排序规则 一、二元谓词 1、二元谓词简介 " 谓词 ( Predicate ) " 是一个 返回 布尔 bool 类型值 的 函数对象 / 仿函数 或 Lambda 表达式 / 普通函数 , …

全链路压力测试有哪些主要作用

全链路压力测试是在软件开发和维护过程中不可或缺的一环&#xff0c;尤其在复杂系统和高并发场景下显得尤为重要。下面将详细介绍全链路压力测试的主要作用。 一、全链路压力测试概述 全链路压力测试是指对软件系统的全部组件(包括前端、后端、数据库、网络、中间件等)在高负载…

vue/vue3/js来动态修改我们的界面浏览器上面的文字和图标

前言&#xff1a; 整理vue/vue3项目中修改界面浏览器上面的文字和图标的方法。 效果&#xff1a; vue2/vue3: 默认修改 public/index.html index.html <!DOCTYPE html> <html lang"en"><head><link rel"icon" type"image/sv…

HarmonyOS自定义组件生命周期函数介绍

aboutToAppear 在创建自定义组件的新实例后&#xff0c;在执行其build()函数之前执行。允许在aboutToAppear函数中改变状态变量&#xff0c;更改将在后续执行build()函数中生效。 aboutToDisappear 在自定义组件析构销毁之前执行。不允许在aboutToDisappear函数中改变状态变…

Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位大四、研0学生&#xff0c;正在努力准备大四暑假的实习 &#x1f30c;上期文章&#xff1a;Redis&#xff1a;原理速成项目实战——Redis实战9&#xff08;秒杀优化&#xff09; &#x1f4da;订阅专栏&#xff1a;Redis&…

wxWidgets实战:使用mpWindow绘制阻抗曲线

选择模型时&#xff0c;需要查看model的谐振频率&#xff0c;因此需要根据s2p文件绘制一张阻抗曲线。 如下图所示&#xff1a; mpWindow 左侧使用mpWindow&#xff0c;右侧使用什么&#xff1f; wxFreeChart https://forums.wxwidgets.org/viewtopic.php?t44928 https://…

实战环境搭建-linux下安装tomcat

安装tomcat Index of /dist/tomcat/tomcat-9/v9.0.8/bin 下载apache-tomcat-9.0.8.tar.gz&#xff0c;可以使用wget; 2、将压缩包tar -zxvf apache-tomcat-9.0.8.tar.gz解压到/home/tomcat 3、修改环境变量 vi /etc/profile export JAVA_HOME/home/java/jdk1.8.0_221 expo…

IntelliJ IDEA Java 连接 mysql 配置(附完整 demo)

下载 MySQL 驱动 从MySQL官网下载JDBC驱动的步骤如下&#xff1a; 1&#xff09;访问MySQL的官方网站&#xff1a;MySQL 2&#xff09;点击页面上方的"DOWNLOADS"菜单&#xff1b; 3&#xff09;在下载页面&#xff0c;找到"MySQL Community (GPL) Downloads…

QT第四天

头文件&#xff1a; #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include<QTime>//时间类 #include<QTimerEvent>//定时器事件类 #include<QtTextToSpeech> //语言播报类 #include<QPushButton> namespace Ui { class Widget; }clas…