整体架构图
从部署架构图可知NameServer与所有的broker通讯,NameServer集群之间互不通信。
主要功能包括
1、Broker管理
1.1 维护Broker集群 clusterAddrTable
1.2 Broker信息 接收Broker注册信息并保存作为路由信息的基本数据
brokerAddrTable
1.3 Topic消息路由信息,消息发送时根据路由表进行负载均衡
topicQueueTable
1.4 Broker状态信息,NameServer每次收到心跳包后会更新心跳时间
brokerLiveTable
1.5 Broker 的消息过滤订阅信息
filterServerTable
1 NamesrvStartup 启动入口
NamesrvController 通过main方法调用createNamesrvController方法创建NamesrvController实例。
NamesrvController 主要属性 NamesrvConfig namesrvConfig 注册中心配置文件封装
//rocketmq主目录
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//*NameServer存储KV配置属性的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//nameServer默认配置文件路径
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
//是否支持顺序消息
private boolean orderMessageEnable = false;
NettyServerConfig nettyServerConfig netty服务端配置信息
/**
* 端口
*/
private int listenPort = 8888;
/**
* remotingExecutor 通信线程池中,固定服务工作线程
* 处理如broker注册,topic路由信息查询、topic删除等与producer、broker交互request
*/
private int serverWorkerThreads = 8;
/**
*事件处理器注册时如果没指定线程池时,使用serverCallbackExecutorThreads指定的公用publicExecutor来处理特定业务交互命令
*/
private int serverCallbackExecutorThreads = 0;
/**
* Netty Selector线程数量
*/
private int serverSelectorThreads = 3;
/**
* 同步发送支持的异常发送最大消息数
*/
private int serverOnewaySemaphoreValue = 256;
/**
* 异步发送支持的异常发送的最大消息数
*/
private int serverAsyncSemaphoreValue = 64;
/**
* 通道的最大空闲时间
*/
private int serverChannelMaxIdleTimeSeconds = 120;
/**
* 网络发送区域的缓存区大小
*/
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
/**
* 网络接收区域的缓存区大小
*/
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
/**
* 是否开启缓存
*/
private boolean serverPooledByteBufAllocatorEnable = true;
main 方法启动
createNamesrvController 创建 NamesrvController
启动调用controler initialize 方法
//主要就是将文件里的kv配置加载到内存里
this.kvConfigManager.load();
//创建NettyServer网络处理对象,创建远程服务器,这里使用的是netty框架,接着就是创建一个默认是8个线程的线程池
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//通信线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册processor,这个processor其实就是服务器收到请求后,看看执行的是什么命令,然后什么命令交给对应的那个processor
this.registerProcessor();
//开启定时任务:每隔10min打印一次KV配置
//开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);