[RocketMQ] NameServer启动流程源码解析 (一)

news2024/11/23 10:20:45

文章目录

      • 1.NameServer概述
      • 2.NamesrvStartup启动入口
      • 3.createNamesrvController创建NamesrvController
        • 3.1 创建NamesrvController
      • 4.start启动NamesrvController
        • 4.1 初始化NettyServer
          • 4.1.1 创建NettyRemotingServer
          • 4.1.2 registerProcessor注册默认请求处理器
          • 4.1.3 启动定时任务
        • 4.2 注册销毁钩子函数
        • 4.3 start启动NettyServer
      • 5.NameServer启动流程总结

1.NameServer概述

NameServer是一个非常简单的Topic路由注册中心, 其角色类似Dubbo中的zookeeper, 支持Broker的动态注入和发现。

在这里插入图片描述

  1. Broker管理: NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制, 检查Broker是否存活。
  2. 路由信息管理: 每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息, 从而进行消息的投递和消费。

无论是Producer、Conumser、Broker都会直接和NameServer进行通信。

RocketMQ在部署启动时, 先启动NameServer

2.NamesrvStartup启动入口

NamesrvStartup#main

    public static NamesrvController main0(String[] args) {

        try {
        	// 创建NamesrvController
            NamesrvController controller = createNamesrvController(args);
            // 启动NamesrvController
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

在这里插入图片描述

1.createNamesrvController 创建NamesrvController
2. start启动NamesrvController

3.createNamesrvController创建NamesrvController

创建NamesrvController, 该方法主要是解析命令行, 加载NameServer配置和NettyServer各种配置, 并保存起来然后创建一个NamesrvController。NamesrvController相当于NameServer的一个中央控制器类。

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    //设置RocketMQ的版本信息,属性名为rocketmq.remoting.version
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();
    /*jar包启动时,构建命令行操作的指令,使用main方法启动可以忽略*/
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    //mqnamesrv命令文件
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }
    //创建NameServer的配置类,包含NameServer的配置,比如ROCKETMQ_HOME
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //和NettyServer的配置类
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    //netty服务的监听端口设置为9876
    nettyServerConfig.setListenPort(9876);
    //判断命令行中是否包含字符'c',即是否包含通过命令行指定配置文件的命令
    //例如,启动Broker的时候添加的 -c /Volumes/Samsung/Idea/rocketmq/config/conf/broker.conf命令
    if (commandLine.hasOption('c')) {
        /*解析配置文件并且存入NamesrvConfig和NettyServerConfig中,没有的话就不用管*/
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    /*判断命令行中是否包含字符'p',如果存在则打印配置信息并结束jvm运行,没有的话就不用管*/
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    //把命令行的配置解析到namesrvConfig
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    //如果不存在ROCKETMQ_HOME的配置,那么打印异常并退出程序,这就是最开始启动NameServer是抛出异常的位置
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    /*一系列日志的配置*/
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //打印nameServer 服务器配置类和 netty 服务器配置类的配置信息
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    /*
     * 根据namesrvConfig和nettyServerConfig创建NamesrvController
     */
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // 将所有的-c的外部配置信息保存到NamesrvController中的Configuration对象属性的allConfigs属性中
    controller.getConfiguration().registerConfig(properties);

    return controller;
}
  1. 创建NameServer的配置类和NettyServer的配置类, 包含NameServer的配置, 比如ROCKETMQ_HOME。
  2. 判断命令行中是否包含字符’c’, 即是否包含通过命令行指定配置文件的命令。
  3. 判断命令行中是否包含字符’p’, 如果存在则打印配置信息并结束jvm运行。
  4. 把命令行的配置解析到namesrvConfig。
  5. 根据namesrvConfig和nettyServerConfig创建NamesrvController。
  6. 将所有的-c的外部配置信息保存到NamesrvController中的Configuration对象属性的allConfigs属性中。

3.1 创建NamesrvController

在这里插入图片描述

初始化一些属性。

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    //nameserver的配置
    this.namesrvConfig = namesrvConfig;
    //nameserver的netty服务的配置
    this.nettyServerConfig = nettyServerConfig;
    //kv配置管理器
    this.kvConfigManager = new KVConfigManager(this);
    //路由信息管理器
    this.routeInfoManager = new RouteInfoManager();
    //Broker连接的各种事件的处理服务,是处理Broker连接发生变化的服务
    //主要用于监听在Channel通道关闭事件触发时调用RouteInfoManager#onChannelDestroy清除路由信息
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    //配置类,并将namesrvConfig和nettyServerConfig的配置注册到内部的allConfigs集合中
    this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
    //存储路径配置
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

brokerHousekeepingService: ChannelEventListener的实现类, 主要用于监听Broker的Channel通道关闭事件, 并在事件触发时调用RouteInfoManager#onChannelDestroy清除路由信息。

public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }
    //连接事件,不处理
    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }
    //连接关闭事件
    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
    //连接异常事件
    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
    //连接闲置事件
    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

4.start启动NamesrvController

启动NameServer中的NettyServer服务。

  1. 调用initialize方法初始化NettyServer。创建netty远程服务, 初始化netty线程池, 注册请求处理器, 配置定时任务, 扫描并移除不活跃的Broker等操作。
  2. 对JVM添加关闭钩子方法, 在NameServer的JVM关闭之前执行, 关闭NameServerController中的线程池, NettyServer进行关闭进行一些内存清理、对象销毁等操作。
  3. 调用start方法启动NettyServer, 并监听。
public static NamesrvController start(final NamesrvController controller) throws Exception {
    //不能为null
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    /*
     * 1 初始化NettyServer
     * 创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等操作。
     */
    boolean initResult = controller.initialize();
    //初始化失败则退出程序
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    /*
     * 2 添加关闭钩子方法,在NameServer关闭之前执行,进行一些内存清理、对象销毁等操作
     */
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));
    /*
     * 3 启动NettyServer,并进行监听
     */
    controller.start();

    return controller;
}

4.1 初始化NettyServer

在这里插入图片描述

initialize:

  1. 加载KV配置并存储到kvConfigManager内部的configTable属性中。
  2. 创建NameServer的netty远程服务, remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端。
  3. 创建netty远程通信执行器线程池remotingExecutor, 线程数默认8, 线程名以RemotingExecutorThread_为前缀, 用作默认的请求处理线程池。
  4. 注册默认请求处理器DefaultRequestProcessor到remotingServer中。
  5. 启动两个定时任务。其中一个每隔十秒钟检测不活跃的Broker并清理相关路由信息。另一个任务则是每隔十分钟打印kv配置信息。
public boolean initialize() {
    /*
     * 1 加载KV配置并存储到kvConfigManager内部的configTable属性中
     * KVConfig配置文件默认路径是 ${user.home}/namesrv/kvConfig.json
     */
    this.kvConfigManager.load();
    /*
     * 2 创建NameServer的netty远程服务
     * 设置了一个ChannelEventListener,为此前创建brokerHousekeepingService
     * remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端
     */
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    /*
     * 3 创建netty远程通信执行器线程池,用作默认的请求处理线程池,线程名以RemotingExecutorThread_为前缀
     */
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    /*
     * 4 注册默认请求处理器DefaultRequestProcessor
     * 将remotingExecutor绑定到DefaultRequestProcessor上,用作默认的请求处理线程池
     * DefaultRequestProcessor绑定到remotingServer的defaultRequestProcessor属性上
     */
    this.registerProcessor();
    /*
     * 5 启动一个定时任务
     * 首次启动延迟5秒执行,此后每隔10秒执行一次扫描无效的Broker,并清除Broker相关路由信息的任务
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            //扫描notActive的broker
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    /*
     * 6 启动一个定时任务
     * 首次启动延迟1分钟执行,此后每隔10分钟执行一次打印kv配置信息的任务
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            //打印kv配置信息
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    /*
     * Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改
     */
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}
4.1.1 创建NettyRemotingServer

remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端。

在这里插入图片描述

  1. 创建ServerBootstrap, Netty启动服务端。
  2. 创建一个公共线程池publicExecutor, 线程数为4, 线程名以NettyServerPublicExecutor_为前缀。用在registerProcessor方法中,
  3. 根据是否使用epoll模型初始化Boss EventLoopGroup和Worker EventLoopGroup这两个事件循环组, 线程数分别默认1个和3个线程, 线程名分别以NettyEPOLLBoss_和NettyServerEPOLLSelector_为前缀。如果是linux内核, 并且指定开启epoll, 并且系统支持epoll, 才会使用EpollEventLoopGroup类型, 否则使用NioEventLoopGroup。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    //设置服务器单向、异步发送信号量
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    //创建Netty服务端启动类,引导启动服务端
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;
    //服务器回调执行线程数量,默认设置为4
    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
    //创建一个公共线程池,负责处理某些请求业务,例如发送异步消息回调,线程名以NettyServerPublicExecutor_为前缀
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });
    /*
     * 是否使用epoll模型,并且初始化Boss EventLoopGroup和Worker EventLoopGroup这两个事件循环组
     * 如果是linux内核,并且指定开启epoll,并且系统支持epoll,才会使用EpollEventLoopGroup,否则使用NioEventLoopGroup
     */
    if (useEpoll()) {
        /*采用了epoll*/
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        /*未采用epoll*/
        //Boss EventLoopGroup 默认1个线程,线程名以NettyNIOBoss_为前缀
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });
        //Worker EventLoopGroup 默认3个线程,线程名以NettyServerNIOSelector_为前缀
        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }
    //加载ssl信息
    loadSslContext();
}
4.1.2 registerProcessor注册默认请求处理器

注册默认请求处理器DefaultRequestProcessor到remotingServer中。

在这里插入图片描述

当请求来的时候, 先根据请求的code, 获取对应的RequestProcessor, 如果code没有注册RequestProcessor, 采用DefaultRequestProcessor处理。

private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {

        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {
        //将remotingExecutor绑定到DefaultRequestProcessor上,用作默认的请求处理线程池
        //将DefaultRequestProcessor绑定到remotingServer的defaultRequestProcessor属性上
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
}

@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    //defaultRequestProcessor属性
    this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
4.1.3 启动定时任务

在创建了remotingServer并且注册了默认请求处理器之后, 创建两个定时任务

  1. 首次启动延迟5s, 每隔10s执行一次扫描无效的Broker, 并清除Broker相关路由信息的任务。
  2. 首次启动延迟1分钟执行, 此后每隔10分钟执行一次打印kvConfig配置信息的任务。

在这里插入图片描述

/*
 * 5 启动一个定时任务
 * 首次启动延迟5秒执行,此后每隔10秒执行一次扫描无效的Broker,并清除Broker相关路由信息的任务
 */
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        //扫描notActive的broker
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
/*
 * 6 启动一个定时任务
 * 首次启动延迟1分钟执行,此后每隔10分钟执行一次打印kv配置信息的任务
 */
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        //打印kv配置信息
        NamesrvController.this.kvConfigManager.printAllPeriodically();
    }
}, 1, 10, TimeUnit.MINUTES);

4.2 注册销毁钩子函数

在这里插入图片描述

public void shutdown() {
    //关闭nettyserver
    this.remotingServer.shutdown();
    //关闭线程池
    this.remotingExecutor.shutdown();
    //关闭定时任务
    this.scheduledExecutorService.shutdown();

    if (this.fileWatchService != null) {
        this.fileWatchService.shutdown();
    }
}
  1. 关闭nettyserver
  2. 关闭线程池
  3. 关闭定时任务

4.3 start启动NettyServer

在这里插入图片描述

/**
 * NamesrvController的方法
 */
public void start() throws Exception {
    //调用remotingServer的启动方法
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        //监听tts相关文件是否发生变化
        this.fileWatchService.start();
    }
}
  1. 调用remotingServer的启动方法: 将会启动一个Netty服务端, NettyRemotingServer类属于remoting远程通信模块, 是NameServer和Broker共用的进入网络通信类。
  2. 监听tts相关文件是否发生变化。

remotingServer.start():

@Override
public void start() {
    /*
     * 1 创建默认事件处理器组,线程数默认8个线程,线程名以NettyServerCodecThread_为前缀。
     * 主要用于执行在真正执行业务逻辑之前需要进行的SSL验证、编解码、空闲检查、网络连接管理等操作
     * 其工作时间位于IO线程组之后,process线程组之前
     */
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
    /*
     * 2 准备一些共享handler
     * 包括handshakeHandler、encoder、connectionManageHandler、serverHandler
     */
    prepareSharableHandlers();
    /*
     * 3 配置NettyServer的启动参数
     * 包括handshakeHandler、encoder、connectionManageHandler、serverHandler
     */
    ServerBootstrap childHandler =
            //配置bossGroup为此前创建的eventLoopGroupBoss,默认1个线程,用于处理连接时间
            //配置workerGroup为此前创建的eventLoopGroupSelector,默认三个线程,用于处理IO事件
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    //IO模型
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    /*设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel*/
                    /*option主要是针对boss线程组,child主要是针对worker线程组*/
                    //对应的是tcp/ip协议listen函数中的backlog参数
                    .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
                    //对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口
                    .option(ChannelOption.SO_REUSEADDR, true)
                    //对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    //对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //配置本地地址,监听端口为此前设置的9876
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    /*设置用于为 Channel 的请求提供服务的 ChannelHandler*/
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //ChannelPipeline一个ChannelHandler的链表,Netty处理请求基于责任链默认
                            //里面的ChannelHandler就是用于处理请求的
                            ch.pipeline()
                                    //处理TSL协议握手的Handler
                                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                    //为defaultEventExecutorGroup,添加handler
                                    .addLast(defaultEventExecutorGroup,
                                            //RocketMQ自定义的请求解码器
                                            encoder,
                                            //RocketMQ自定义的请求编码器
                                            new NettyDecoder(),
                                            //Netty自带的心跳管理器,主要是用来检测远端是否存活
                                            //即测试端一定时间内未接受到被测试端消息和一定时间内向被测试端发送消息的超时时间为120秒
                                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                            //连接管理器,他负责连接的激活、断开、超时、异常等事件
                                            connectionManageHandler,
                                            //服务请求处理器,处理RemotingCommand消息,即请求和响应的业务处理,并且返回相应的处理结果。这是重点
                                            //例如broker注册、producer/consumer获取Broker、Topic信息等请求都是该处理器处理
                                            //serverHandler最终会将请求根据不同的消息类型code分发到不同的process线程池处理
                                            serverHandler
                                    );
                        }
                    });
    //对应于套接字选项中的SO_SNDBUF,接收缓冲区,默认是65535
    if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
        log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
        childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
    }
    //对应于套接字选项中的SO_SNDBUF,发送缓冲区,默认是65535
    if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
        log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
        childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
    }
    //用于设置写缓冲区的低水位线和高水位线。
    if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
        log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
                nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
        childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
    }
    //分配缓冲区
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        /*
         * 启动Netty服务
         */
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        //设置端口号,默认9876
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }
    //如果channelEventListener不为null,那么启动netty事件执行器
    //这里的listener就是之前初始化的BrokerHousekeepingService
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
    /*
     * 启动定时任务,初始启动3秒后执行,此后每隔1秒执行一次
     * 扫描responseTable,将超时的ResponseFuture直接移除,并且执行这些超时ResponseFuture的回调
     */
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

在这里插入图片描述

Handler:

  1. handshakeHandler: 用来处理TSL协议握手的Handler, 无需过多关注。
  2. NettyEncoder和NettyDecoder: RocketMQ自定义的请求解码和编码器, 处理报文的编解码操作。负责网络传输数据和 RemotingCommand 之间的编解码。
  3. IdleStateHandler: Netty自带的心跳管理器, 用来检测远端是否存活。
  4. connectionManageHandler: 处理连接事件的Handler, 负责连接的激活、断开、超时、异常等事件。
  5. serverHandler: 处理读写事件的handler, 是真正处理业务请求的。

在这里插入图片描述

通过serverBootstrap#sync方法启动netty服务端的, NameServer启动完毕, 然后可以对外提供远程通信服务, 启动Broker。

启动了一个定时任务scanResponseTable, 是用于处理通信时的异常情况, RocketMQ会将请求结果封装为一个ResponseFuture并且存入responseTable中。那么在发送消息时候, 如果出现异常的话, 可能造成responseTable中的ResponseFuture累积, 因此每一秒扫描一次responseTable, 将超时的ResponseFuture直接移除, 执行ResponseFuture的回调。

5.NameServer启动流程总结

  • 进行一些初始化的配置的读取, 然后最重要的是启动一个基于Netty的服务端, 端口为9876。
  • 启动一些定时任务, 比如printAllPeriodically每一分钟打印kv信息, scanResponseTable每一秒钟清除无效ResponseFuture, scanNotActiveBroker每10s执行一次扫描, 检查无效的broker, 并清除相关的路由信息的任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/667680.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Google C++ Style文档及常用代码规范(一):命名约定、格式、注释

文章目录 Google C Style文档及常用代码规范&#xff08;一&#xff09;&#xff1a;命名约定、格式、注释命名约定通用命名规则文件命名类型命名变量命名常量命名函数命名命名空间命名枚举命名宏命名命名规则的特例 格式注释注释风格文件注释类注释函数注释变量注释类数据成员…

flutter getx nested navigation 嵌套路由

flutter getx nested navigation 嵌套路由 视频 https://youtu.be/SXER4JVBFps 前言 嵌套路由可以用在如购物确认向导界面切换。 使用 getx 实现嵌套路由&#xff0c;需要如下步骤&#xff1a; 通过 Navigator 组件的 key 属性 用 Get.nestedKey(1) 进行标记 onGenerateRoute…

delmia msd学习

在默认打开的结构树中添加一个产品tworobts TwoRobots是新建的一个空产品&#xff0c;并将其插入到resoourceslist下面通过 然后创建一个工位的工作区域 插入机器人 把机器人放在工作区域中,即其子物体 先选要移动的对象&#xff0c;然后选移动到什么地方 Keep positions的意思…

栈的概念和结构以及实现

1. 栈 1.1栈的概念及结构 栈:一种特殊的线性表&#xff0c;其只允许在 固定的一端 进行 插入和删除 元素操作。 进行数据插入和删除 操作的一端称为 栈顶 &#xff0c;另一端称为 栈底 。栈中的数据元素遵守 后进先出 LIFO (Last in First Out) 的原则。 压栈:栈的插入操作叫做…

【带你刷《剑指Offer》系列】【每天40分钟,跟我一起用50天刷完 (剑指Offer)】第一天

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示&#…

python:使用Scikit-image对单波段遥感影像进行形状特征提取(morphology)

作者:CSDN @ _养乐多_ 本文将介绍使用Scikit-image对单波段遥感影像做形状特征提取的方法和代码。包括:腐蚀(erosion),膨胀(dilation),开运算(opening),闭运算(closing),形态学梯度(morphological gradient),白帽变换(top hat),黑帽变换(black hat),形…

一、枚举类型——用EnumSet来代替标识

Set 是一种不允许有重复元素存在的集合。enum 要求每个内部成员都是唯一的&#xff0c;因此看起来很像 Set&#xff0c;但是由于无法添加或移除元素&#xff0c;它并不如 Set 那么好用。于是 EnumSet 被引入&#xff0c;用来配合 enum 的使用&#xff0c;以替代传统的基于 int …

计算机启动

按下主机上的 power 键后&#xff0c;第一个运行的软件是 BIOS,BIOS 全称叫 Base Input & Output System&#xff0c;即基本输入输出系统。 &#xff08;8086的1MB内存&#xff09; 地址 0&#xff5e;0x9FFFF 处是 DRAM&#xff0c;顶部的 0xF0000&#xff5e;0xFFFFF&am…

第一章 基础算法(一)—— 快排,归并与二分

文章目录 快排归并排序二分整数二分浮点数二分 快速排序练习题785. 快速排序786. 第k个数 归并排序练习题787. 归并排序788. 逆序对的数量 二分练习题789. 数的范围790. 数的三次方根 有些累了&#xff0c;把这两天做的笔记整理发出 快排 快排的思路&#xff1a; 确定分界点根…

Pandas-DataFrame常用基础知识点总结

注&#xff1a;以下知识点总结是将数据转为DataFrame格式数据的基础之上进行操作的 &#xff08;首先需要做的是将数据转为DataFrame格式&#xff09; DataFrame格式示例&#xff1a; import pandas as pd data {"code": [000008, 000009, 000021, 000027, 00003…

代码随想录二刷 day28 | 回溯 之 93.复原IP地址 78.子集 90.子集II

day28 93.复原IP地址判断子串是否合法 78.子集回溯三部曲 90.子集II 93.复原IP地址 题目链接 解题思路&#xff1a; 切割问题就可以使用回溯搜索法把所有可能性搜出来 回溯三部曲 递归参数 startIndex一定是需要的&#xff0c;因为不能重复分割&#xff0c;记录下一层递归分…

一种数据源切换的实践方案

随着业务的不断深入&#xff0c;我们会碰见很多关于数据源切换的业务场景&#xff0c;数据源切换也是当前最常用的分库后的分流策略方式之一&#xff0c;对于读写职责分离的数据库集群而言&#xff0c;我们在服务层面制定相应的接口与数据库交互的定制化开发&#xff0c;也就是…

云 cloud 高可用系统--在RDS上实现,从原理上不可能保证你100%不丢数据

我写这篇文字&#xff0c;实属无奈&#xff0c;在目前很多企业都依赖云的情况下&#xff0c;数据库的很多事情都是身不由己&#xff0c;发生问题&#xff0c;你查看日志&#xff0c;分析日志可能你连日志都不是全部的&#xff0c;并且想通过程序来过滤这个日志很多情况下都有限…

数据库系统概述——第六章 关系数据理论(知识点复习+练习题)

&#x1f31f;博主&#xff1a;命运之光 &#x1f984;专栏&#xff1a;离散数学考前复习&#xff08;知识点题&#xff09; &#x1f353;专栏&#xff1a;概率论期末速成&#xff08;一套卷&#xff09; &#x1f433;专栏&#xff1a;数字电路考前复习 &#x1f99a;专栏&am…

CMU 15-445 Project #2 - B+Tree(CHECKPOINT #1)

CHECKPOINT #1 一、题目链接二、准备工作三、部分实现1.查找操作2.插入操作3.删除操作 四、评测结果 一、题目链接 二、准备工作 见 CMU 15-445 Project #0 - C Primer 中的准备工作。 三、部分实现 对于B树的节点定义&#xff0c;通过节点类的命名 b_plus_tree_page 不难发现…

linux-centos7操作系统查看系统未挂载的磁盘,挂载磁盘

linux-centos7操作系统查看系统未挂载的磁盘,挂载磁盘 查看当前磁盘空间 根目录 / 下也只有44G,其他目录只有10几G,正式环境肯定不够用 df -h查看硬盘数量和分区情况 fdisk -l查看到/dev/vdb 有500多G了 将/dev/vdb在分出一个区使用 第一步:编辑分区。执行命令fdisk …

pr视频叠加,即原视频右上角添加另外一个视频方法,以及pr导出视频步骤

一、pr视频叠加&#xff0c;即原视频右上角添加另外一个视频方法 在使用pr制作视频时&#xff0c;我们希望在原视频的左上角或右上角同步播放另外一个视频&#xff0c;如下图所示&#xff1a; 具体方法为&#xff1a; 1、导入原视频&#xff0c;第一个放在v1位置&#xff0c;第…

Selenium编写自动化用例的8种技巧

在开始自动化时&#xff0c;您可能会遇到各种可能包含在自动化代码中的方法&#xff0c;技术&#xff0c;框架和工具。有时&#xff0c;与提供更好的灵活性或解决问题的更好方法相比&#xff0c;这种多功能性导致代码更加复杂。在编写自动化代码时&#xff0c;重要的是我们能够…

【序列dp】最长上升子序列(一)

文章目录 最长上升子序列-序列dp概览895 最长上升子序列-O(n^2)1017 怪盗基德的滑翔翼1014 登山482 合唱队形1012 友好城市 最长上升子序列-序列dp 什么是序列相关的 DP &#xff1f;序列相关 DP&#xff0c;顾名思义&#xff0c;就是将动态规划算法用于数组或者字符串上&…

textgen教程(持续更新ing...)

诸神缄默不语-个人CSDN博文目录 官方GitHub项目&#xff1a;shibing624/textgen: TextGen: Implementation of Text Generation models, include LLaMA, BLOOM, GPT2, BART, T5, SongNet and so on. 文本生成模型&#xff0c;实现了包括LLaMA&#xff0c;ChatGLM&#xff0c;B…