前言
最近看了下rocketmq的源码,计划针对最近的学习,做一个笔记,先从nameServer启动的逻辑开始记录吧
在rocketmq中,有四个关键的组件
- nameServer
- broker
- producer
- consumer
这四个组件之间的关系是这样的
关于nameSrv
nameserver的作用是:提供类似于注册中心的功能,基于这个前提,我们可以知道,rocketmq需要提供读写的功能,因为在mq中,组件之间的通信是通过netty完成的,所以,nameserver只需要提供nettyServer即可
在nameSrv中,提供了两个功能:
1、broker在启动的时候,会将自己的信息注册到nameSrv中;
2、producer在向topic中发送消息的时候,consumer在去broker中拉取消息的时候,会先去nameSrv上找对应的topic信息,所以这是nameSrv的两大功能
接着我们来看nameSrv启动的源码
启动源码
nameSrv启动的入口是在:org.apache.rocketmq.namesrv.NamesrvStartup#main 从启动脚本中可以证明这个点
NamesrvController controller = createNamesrvController(args);
start(controller);
在nameSrv启动的入口处,有两行代码是需要关注的
代码1:初始化NamesrvController对象,这里主要是解析配置文件、控制台配置信息,根据配置信息,初始化nettyServerConfig和NameSrvConfig;然后根据这两个config对象,初始化namesrvController对象
代码2:初始化nameSrv
代码1的逻辑,没有什么特别重要的,就是解析配置文件的逻辑,来看代码2:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
/**
* 1.初始化了几个定时任务
*/
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
/**
* 2.这里应该是一个钩子方法,在关闭nameserver的时候会被调用
*/
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 3.启动nettyServer,也就是namesrv启动,可以接收客户端的请求(这里的客户端 = 服务发送者和服务消息者)
controller.start();
return controller;
}
在这个方法中,标注了三个关键点:
在第一个方法中,其中有一个定时任务,需要关注
// 这个任务,每10S执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
public void scanNotActiveBroker() {
// nameSrv在内存中存储的broker信息
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
// 获取当前broker最后一次心跳时间
long last = next.getValue().getLastUpdateTimestamp();
// 最后心跳时间 + 120S,超过这个时间,还没有接收到心跳,认为broker出现问题,暂时先清除
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
这个定时任务,是为了去扫描不活跃的broker信息,在nameSrv中,会保存所有的broker信息,这个broker信息是,每个broker在启动的时候,会通过netty请求,向所有nameSrv注册broker信息,同时会启动一个定时任务,每30S执行一次注册的逻辑(broker向nameSrv注册的逻辑,在后面记录broker启动源码的时候,会讲到,这里我们就先知道,broker会通过netty请求,每30S去nameSrv注册一次)
接收注册请求源码
我们来看下,nameSrv在接收到broker的注册请求,是如何处理的
在rocketmq中,发送netty请求的时候,会带上一个code编码,netty服务端会根据code码,路由到不同的类去处理,我们需要知道的是,broker在向nameSrv注册的时候,指定的code是:RequestCode.REGISTER_BROKER(103)
nameSrv在启动时,启动了一个nettyServer,在其
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
在processRequest方法中,会根据code编码,判断当前请求是什么,如果是REGISTER_BROKER,会调用:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker
// 这个方法太长,删除了一些不需要关心的代码
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
// 注册broker信息
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
return response;
}
// 这个方法是nameSrv真正去更新内存中的broker信息
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
// 1、加锁
this.lock.writeLock().lockInterruptibly();
// 2、注册到clusterAddrTable中
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
// 3、如果brokerAddrTable中,根据brokerName,查不到对应的value,表示这个broker是第一次注册
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
// 4、添加到map集合中
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 5、这个map中,存放的是:addr + broker信息,其中包括最后心跳时间 lastUpdateTimestamp
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
在上面方法中,注释5这里的map:brokerLiveTable就是在前面定时扫描不活跃的broker时,所依赖的map
总结
nameSrv启动的源码比较简单,其实就干了两件事情
1、初始化nettyServer服务端,用来接收客户端请求,这里的客户端,包括:producer、consumer、broker
2、启动了一个定时任务,定时去扫描内存中不活跃的broker信息