10.1 模块人口代码的功能
10.1.1 入口函数
首先看一下 NameServer 的源码目录(见图 10-1 ) 。NamesrvStartup 是模块的启动入 口, NamesrvController 是用来协块各个调模功能的代码。
我们从启动代码开始分析,找到 NamesrvStartup.java 里的 main 函数 public static void main(String[] args) {mainO(args);},发现它又把逻辑转到 main。这个函数里。
10.1.2 解析命令行参数
main0 函数主要完成两个功能,第一个功能是解析命令行参数,我们通过源码来看一看,重点是解析 - c 和 - p 参数。
public static void parseCommandlineAndConfigFile(String[] args) throws Exception {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());
if (null == commandLine) {
System.exit(-1);
return;
}
namesrvConfig = new NamesrvConfig();
nettyServerConfig = new NettyServerConfig();
nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
if (namesrvConfig.isEnableControllerInNamesrv()) {
controllerConfig = new ControllerConfig();
MixAll.properties2Object(properties, controllerConfig);
}
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(logConsole, namesrvConfig);
MixAll.printObjectProperties(logConsole, nettyServerConfig);
MixAll.printObjectProperties(logConsole, nettyClientConfig);
if (namesrvConfig.isEnableControllerInNamesrv()) {
MixAll.printObjectProperties(logConsole, controllerConfig);
}
System.exit(0);
}
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
}
-c 命令行参数用来指定配置文件的位置; - p 命令行参数用来打印所有配置项的值。注意 ,用 - p 参数打印配置项的值之后程序就退出了,这是一个帮助调试的选项 。
10.1.3 初始化 NameServer 的 Controller
main0 函数的另外一个功能是初始化 Controller
根据解析出的配置参数, 调用 controller.initialize()来初始化,然后调用controIler.start() 让 NameServer 开始服务。还有 一个逻辑是注册 ShutdownHookThread ,当程序退出的时候会调用controller.shutdown 来做退出前的清理工作 。
10.2 NameServer 的总控逻辑
NameServer 的总控逻辑在 NamesrvController.java 代码中 。 NameServer 是集群 的协调者 ,它 只是简单地接收其他角色报上来的状态,然后根据请求返回相应的状态 。 首先, NameserverController 把执行线程池初始化好。
org.apache.rocketmq.namesrv.NamesrvController#initialize L103
org.apache.rocketmq.namesrv.NamesrvController#initiateThreadExecutors
private void startScheduleService() {
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
1, 10, TimeUnit.MINUTES);
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
NamesrvController.this.printWaterMark();
} catch (Throwable e) {
LOGGER.error("printWaterMark error.", e);
}
}, 10, 1, TimeUnit.SECONDS);
}
启动 了一个默认是 8 个线程的线程池 ( private int serverWorkerThreads = 8),还有两个定时执行的线程,一个用来扫描失效的 Broker (scanNotActiveBroker ) , 另一个用来打印配置信息( printAllPeriodically)。
然后启动负责通信的服务 remotingServer, remotingServer 监听一些端口 ,收到 Broker 、 Client 等发过来的请求后,根据请求的命令,调用不同的 Processor 来处理。 这些不同的处理逻辑被放到上面初始化的线程池中执行。
org.apache.rocketmq.namesrv.NamesrvController#initiateNetworkComponents
org.apache.rocketmq.namesrv.NamesrvController#registerProcessor
remotingServer 是基于Netty 封装的一个网络通信服务,要了解 remotingServer 需要先对 Netty 有个基本的认知。
10.3 核心业务逻辑处理
NameServer 的核心业务逻辑 , 在 DefaultRequestProcessor.java 中可以 一目了然地看出 。 网络通信服务模块收到请求后,就调用这个 Processor 来处理,
逻辑主体是个 switch 语句 ,根据 RequestCode 调用不同的函数来处理 ,从 RequestCode 可以了解到 NameServer 的主要功能,比如 : REGISTER_BROKER 是在集群中新加入一个 Broker 机器; GET_ROUTEINTO_BY_TOPIC 是请求获取一个 Topic 的路由信息 ; WIPE_WRITE_PERM OF BROKER 是删除一个 Broker 的写权限。
10.4 集群状态存储
NameServer 作为集群的协调者,需要保存和维护集群的各种元数据 , 这是通过 RoutelnfoManager 类来实现的。
private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
5 张图告诉你 RocketMQ 为什么不使用 Zookeeper 做注册中心 - 腾讯云开发者社区-腾讯云RocketMQ 选择了自己写 NameServer 做注册中心而没有选择 Zookeeper,这是为什么呢?https://cloud.tencent.com/developer/article/2118883?shareByChannel=link#3.1每个结构存储着一类集群信息,具体含义在第 5 章有介绍。了解 RocketMQ各个角色的功能后,对每个结构的处理逻辑就好理解了 。 下面重点看一下控制访问这些结构的锁机制 。
锁分为互斥锁、读写锁; 也可分为可重入锁、不可重入锁。 在 NameServer的场景中 ,读取操作多,更改操作少, 所以选择读写锁能大大提高效率。 RoutelnfoManager 中使用的是可重人的读写锁 ( private final ReadWriteLock lock = new ReentrantReadWriteLock()),我们以 deleteTopic 函数为例,看一下锁的使用方式。
public void deleteTopic(final String topic) {
try {
this.lock.writeLock().lockInterruptibly();
this.topicQueueTable.remove(topic);
} catch (Exception e) {
log.error("deleteTopic Exception", e);
} finally {
this.lock.writeLock().unlock();
}
}
首先锁的获取和执行逻辑要放到一个 try {}里,然后在 finally {}中释放 。这是一种典型的使用方式,我们可以参考这种方式实现自己的代码。