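The main job of the NameServer is to provide message producers and consumers with routing information for each topic. To do that, the NameServer has to store the basic routing data and manage Broker nodes, including route registration and route removal.
I. Route metadata
Route metadata mainly holds topic information, broker information, and so on.
Code: RouteInfoManager
Let's first look at the five maps in RouteInfoManager and the metadata each one stores.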
private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
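- topicQueueTable: queue routing information for each topic; message sending load-balances according to this route table.
- brokerAddrTable: basic Broker information, including brokerName, the name of the owning cluster, and the master and slave Broker addresses.
- clusterAddrTable: Broker cluster information; stores the names of all Brokers in a cluster.
- brokerLiveTable: Broker status information; the NameServer replaces this entry every time it receives a heartbeat.
- filterServerTable: the list of FilterServers on a Broker, used for class-mode message filtering.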
The JSON structure of topicQueueTable looks like this:
topicQueueTable : {
"topic1": [
{
"brokerName":"broker-a",
"readQueueNums":4,
"writeQueueNums":4,
"perm":6,
"topicSynFlag":0
},
{
"brokerName":"broker-b",
"readQueueNums":4,
"writeQueueNums":4,
"perm":6,
"topicSynFlag":0
}],
"topic other":[]
}
This table maintains the relationship between topics and brokers. It only contains broker names, though, so how do we locate the broker itself? That is what brokerAddrTable is for. Its basic structure is:
brokerAddrTable : {
"broker-a": {
"cluster":"c1",
"brokerName":"broker-a",
"brokerAddrs":{0:"192.168.56.1:10000",1:"192.168.56.2:10000"}
},
"broker-b": {
"cluster":"c1",
"brokerName":"broker-b",
"brokerAddrs":{0:"192.168.56.3:10000",1:"192.168.56.4:10000"}
}
}
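This structure maintains each broker's address and port. Next is clusterAddrTable, the cluster information. It is simple enough that it needs no detailed explanation:
clusterAddrTable : { "c1": ["broker-a", "broker-b"] }
Then comes the broker status information, that is, whether a broker is available or not. The structure of brokerLiveTable is: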
brokerLiveTable :{
"192.168.56.1:10000":
{
"lastUpdateTimestamp":1518270318980,
"dataVersion":versionObj,
"channel":channelObj,
"haServerAddr":"192.168.56.2:10000"
},
"192.168.56.2:10000":{...},
...
}
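This table maintains the heartbeat information of each broker, including the timestamp of the last heartbeat and the Netty channel.
Summary
RocketMQ is built on a publish-subscribe model. A topic has multiple message queues, and by default a Broker creates 4 read queues and 4 write queues for each topic. Multiple Brokers form a cluster, and Brokers that share the same brokerName form a Master-Slave group: a brokerId of 0 denotes the Master and a value greater than 0 denotes a Slave. The lastUpdateTimestamp field of BrokerLiveInfo stores the time at which the last heartbeat from the Broker was received.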
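To make the relationship between the tables concrete, here is a minimal sketch of how a topic could be resolved to Master addresses by joining a topic table with a broker address table. The class RouteLookupSketch and its methods are hypothetical simplifications for illustration, not RocketMQ classes; only the idea that brokerId 0 is the Master comes from the text above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the NameServer route tables.
class RouteLookupSketch {
    static final long MASTER_ID = 0L; // brokerId 0 denotes the Master

    // topic -> names of the brokers that host queues for the topic
    final Map<String, List<String>> topicToBrokerNames = new HashMap<>();
    // brokerName -> (brokerId -> address)
    final Map<String, Map<Long, String>> brokerAddrs = new HashMap<>();

    void addBroker(String brokerName, long brokerId, String addr) {
        brokerAddrs.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
    }

    void addTopicRoute(String topic, String brokerName) {
        topicToBrokerNames.computeIfAbsent(topic, k -> new ArrayList<>()).add(brokerName);
    }

    // Resolve the Master address of every broker that serves the topic.
    List<String> masterAddressesFor(String topic) {
        List<String> result = new ArrayList<>();
        List<String> names = topicToBrokerNames.getOrDefault(topic, Collections.emptyList());
        for (String brokerName : names) {
            Map<Long, String> addrs = brokerAddrs.get(brokerName);
            if (addrs != null && addrs.containsKey(MASTER_ID)) {
                result.add(addrs.get(MASTER_ID));
            }
        }
        return result;
    }
}
```

The first lookup yields broker names only; the second turns each name into an address, which is why the two tables are kept separately.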
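II. Route registration
2.1 Broker startup flow
Route registration is performed by the broker. Let's read the source to follow the broker's startup logic. As with NameServer startup, the broker creates a BrokerController when it starts; the overall flow is shown in the figure below.
Here is part of the source: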
public static BrokerController createBrokerController(String[] args) {
......
try {
// Create the three config objects: BrokerConfig, NettyServerConfig, NettyClientConfig
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// Populate the config objects
// Set the service listen port
......
nettyServerConfig.setListenPort(10911);
// Create the BrokerController instance through its constructor
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
// Call the initialization method
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// Shutdown hook, invoked when the JVM is stopping
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
We will cover initialize() in detail later. At this point the BrokerController has been created, so let's look at what the source calls next.
public static void main(String[] args) {
start(createBrokerController(args));
}
public static BrokerController start(BrokerController controller) {
try {
controller.start();
...
}
return null;
}
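As the code above shows, the broker then calls the start method of BrokerController. Registration with the NameServer happens inside this method.
2.2 Route registration
The start method of BrokerController contains the following code: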
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
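The code above sets up a scheduled task. brokerConfig.getRegisterNameServerPeriod() defaults to 30s, so by default the broker re-registers its information every 30 seconds; the Math.max/Math.min expression keeps the interval between 10s and 60s, and the first run happens after a 10s delay. Let's step into registerBrokerAll.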
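The period expression passed to scheduleAtFixedRate is easy to misread, so here is a small sketch that isolates it. The class and method names (RegisterPeriodSketch, effectivePeriodMillis) are made up for illustration; the arithmetic is copied from the snippet above.

```java
// Hypothetical helper that isolates the period clamp used by the scheduled task.
class RegisterPeriodSketch {
    // Clamp the configured period into the range [10000 ms, 60000 ms].
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }
}
```

With the default of 30000 ms the value passes through unchanged; a configured 1000 ms is raised to 10000 ms, and 120000 ms is capped at 60000 ms.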
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
After a series of parameter checks and wrapping, the code above finally calls doRegisterBrokerAll.
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// Call registerBrokerAll on brokerOuterAPI to perform the registration
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
...
}
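Stepping into brokerOuterAPI.registerBrokerAll, which doRegisterBrokerAll calls, there are four main steps:
1. Get the list of NameServer addresses
2. Set the request header
3. Iterate over all NameServers
4. Register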
public List<RegisterBrokerResult> registerBrokerAll(...) {
// Get the list of NameServer addresses
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// Set the request header
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
......
// Iterate over all NameServers
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(() -> {
try {
// Register
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
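The fan-out pattern in registerBrokerAll (one task per NameServer, a CountDownLatch to wait for all of them, a thread-safe list for the results) can be tried in isolation. The following is a minimal sketch under stated assumptions: FanOutRegisterSketch, registerAll, and the String results are hypothetical, and the register function stands in for the real remote call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical, self-contained version of the fan-out used for registration.
class FanOutRegisterSketch {
    // Calls `register` once per address in parallel and collects the non-null results.
    static List<String> registerAll(List<String> addrs,
                                    Function<String, String> register,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        if (addrs == null || addrs.isEmpty()) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(addrs.size(), 4));
        CountDownLatch latch = new CountDownLatch(addrs.size());
        for (String addr : addrs) {
            pool.execute(() -> {
                try {
                    String result = register.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // One failing server must not prevent registration with the others.
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            // Wait until every task has finished or the timeout elapses.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Counting down in finally is the important detail: a failed registration still releases the latch, so the caller waits at most for the timeout rather than forever.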
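Summary
The basic flow of a broker from startup to registration is:
1. Initialize the BrokerController
- Build three config objects, BrokerConfig, NettyServerConfig and NettyClientConfig, from the configuration file or command-line arguments;
- Create the BrokerController instance through its constructor;
- Initialize the BrokerController instance (this starts a large number of scheduled tasks);
- Install a shutdown hook that calls the BrokerController instance's shutdown method when the JVM exits;
2. Call the start method of the BrokerController instance to register routes
Internally this registers a scheduled task that, by default, registers with the NameServer cluster every 30s. This is the heartbeat.
2.3 Handling the request
The broker side of the heartbeat is now done, so let's look at how the NameServer handles the request. The class in the NameServer module that handles heartbeats is DefaultRequestProcessor.class; its location is shown in the figure below:
Its processRequest(ChannelHandlerContext ctx, RemotingCommand request) method dispatches to different handlers according to the request's RequestCode. The branch that matches registration is: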
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
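There is a check here on whether the broker version is at least 3.0.11 (the comparison is >=), which is a neat way to stay compatible with older versions. Both registerBrokerWithFilterServer and registerBroker end up calling the final registration method, namesrvController.getRouteInfoManager().registerBroker, as follows: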
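The version check relies on enum declaration order: a later constant has a larger ordinal. Here is a minimal sketch of the same idea; the three-value Version enum and the chooseHandler method are hypothetical and much smaller than the real MQVersion.Version list.

```java
// Hypothetical miniature of version-based dispatch via enum ordinals.
class VersionDispatchSketch {
    // Declaration order defines the ordinal, so newer versions must be appended at the end.
    enum Version { V3_0_10, V3_0_11, V3_0_12 }

    // Pick the handler name for a numeric version code received on the wire.
    static String chooseHandler(int versionCode) {
        Version version = Version.values()[versionCode];
        return version.ordinal() >= Version.V3_0_11.ordinal()
            ? "registerBrokerWithFilterServer"
            : "registerBroker";
    }
}
```

The scheme only works as long as new versions are appended and existing constants are never reordered, since the ordinal is what travels in the request.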
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
The registerBroker method of RouteInfoManager.class is the core method. Let's see what it actually does.
{
...
this.lock.writeLock().lockInterruptibly();
// Update clusterAddrTable
Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
boolean registerFirst = false;
// Update brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
// Update topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// Update brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
// Update filterServerTable
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
}
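As the code above shows, the method updates the five basic metadata tables described in the route metadata chapter, in this order: clusterAddrTable, brokerAddrTable, topicQueueTable, brokerLiveTable and filterServerTable.
At this point route registration is complete.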
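For readers who want to experiment with the update order, here is a reduced sketch of registration under a write lock. RegistrySketch is hypothetical: it keeps only three of the five tables, stores a plain timestamp instead of BrokerLiveInfo, and takes the current time as a parameter so it can be tested deterministically.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, reduced model of route registration (three tables only).
class RegistrySketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    final Map<String, Long> brokerLiveTable = new HashMap<>(); // addr -> last heartbeat time

    // Returns true when this brokerName is registered for the first time.
    boolean registerBroker(String cluster, String brokerName, long brokerId,
                           String addr, long nowMillis) {
        lock.writeLock().lock();
        try {
            // 1. cluster -> broker names
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            // 2. broker name -> (brokerId -> address)
            boolean firstRegistration = !brokerAddrTable.containsKey(brokerName);
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
            // 3. address -> time of the latest heartbeat (replaced on every call)
            brokerLiveTable.put(addr, nowMillis);
            return firstRegistration;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Plain HashMaps are sufficient here because every mutation happens while the write lock is held, which mirrors why the tables in the excerpt are HashMap rather than ConcurrentHashMap.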
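2.4 Route removal
The Broker sends a heartbeat to the NameServer every 30s. The heartbeat carries the brokerId, the broker address, the broker name, the name of the cluster the broker belongs to, and the list of FilterServers associated with the broker.
But if a Broker goes down, the NameServer stops receiving heartbeats. How does the NameServer evict such failed Brokers? It scans brokerLiveTable every 10s; if the lastUpdateTimestamp of a BrokerLiveInfo is more than 120s behind the current time, the Broker is considered failed. The NameServer removes the Broker, closes the connection to it, and updates topicQueueTable, brokerAddrTable, brokerLiveTable and filterServerTable.
RocketMQ has two triggers for removing route information:
1. The NameServer periodically scans brokerLiveTable and compares the time of the last heartbeat with the current system time; if the difference exceeds 120s, the broker is removed.
2. When a Broker shuts down normally, it executes the unregisterBroker command.
Both paths remove routes in the same way: they delete everything related to that broker from the relevant route tables.
As described in the previous chapter, after startup the NameServer runs a scheduled task that scans brokerLiveTable. The core code is: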
public int scanNotActiveBroker() {
int removeCount = 0;
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
removeCount++;
}
}
return removeCount;
}
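The method takes the time of the last report and adds 120s; if the result is earlier than the current time, it first closes the channel and then removes the broker's entry from brokerLiveTable. Finally it calls onChannelDestroy, which looks up and deletes the related data in topicQueueTable, brokerAddrTable and filterServerTable. We will not go through that method in detail here.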
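The expiry rule can be exercised without a running NameServer. The sketch below is a simplified model, assuming a plain map of address to last heartbeat time; LivenessScanSketch and its methods are hypothetical, and the current time is passed in so the boundary case is easy to check.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, simplified model of the broker liveness scan.
class LivenessScanSketch {
    static final long EXPIRED_MILLIS = 1000 * 60 * 2; // 120s, as described in the text
    final Map<String, Long> lastHeartbeat = new HashMap<>();

    void heartbeat(String addr, long nowMillis) {
        lastHeartbeat.put(addr, nowMillis);
    }

    // Remove every broker whose last heartbeat is more than 120s old; return how many were removed.
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MILLIS < nowMillis) {
                it.remove(); // removing through the iterator avoids ConcurrentModificationException
                removed++;
            }
        }
        return removed;
    }
}
```

Because the comparison is a strict less-than, a broker whose last heartbeat is exactly 120s old survives the scan and is only evicted on a later pass.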
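2.5 Route discovery
Route discovery in RocketMQ is not real-time. When a topic's route changes, the NameServer does not push the change to clients; instead, clients periodically pull the latest route for the topic.
Code: NameServer: DefaultRequestProcessor#getRouteInfoByTopic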
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
Stepping into getRouteInfoByTopic, we can see that it essentially looks up the route information, re-encodes it, and returns it to the client.
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content;
Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();
if (request.getVersion() >= Version.V4_9_4.ordinal() || (null != standardJsonOnly && standardJsonOnly)) {
content = topicRouteData.encode(SerializerFeature.BrowserCompatible,
SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
SerializerFeature.MapSortField);
} else {
content = RemotingSerializable.encode(topicRouteData);
}
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
III. Summary
The whole flow is summarized in the figure below.