前言
这篇笔记记录broker启动的源码学习
broker主要完成一下几件事情:
1.接收producer的发送请求,并对消息进行持久化、同步其他节点
2.接收consumer读取消息星球
3.定时向nameSrv注册心跳信息,保持连接
在启动的时候,也是分了两个方法
1、创建brokerController
2、启动brokerController
createBrokerController
入口方法:org.apache.rocketmq.broker.BrokerStartup#main
在这里,我们先看创建brokerController的方法
org.apache.rocketmq.broker.BrokerStartup#createBrokerController
在这个方法中,会初始化brokerConfig、nettyServerConfig、nettyClientConfig,然后根绝配置文件中的配置信息,填充这几个配置信息,具体的细节,不再一一阐述,可以debug或者是追下源码看下
/**
* 7.初始化BrokerController对象
*/
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
/**
* 8.调用brokerController的初始化方法
*/
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
在创建brokerController之后,会调用其initialize()方法进行初始化。在这个初始化的方法中,总结来看,就做了两个事情:
1、加载消息文件到内存中:因为broker中负责持久化消息信息,所以在启动的时候,需要从磁盘文件上,加载消息到内存中
2、初始化了一大堆的线程池和定时线程池,暂时没有一一去看这些线程池是为了干什么
定时拉取所有的nameSrv,更新到内存中
在这一批的定时线程池中,有一个需要关注,就是下面截图中的这个,这个定时线程池要做的事情:
- 通过http请求,从一个url请求的返回结果中,获取到所有nameSrv地址(http://jmenv.tbsite.net:8080/rocketmq/nsaddr); 为什么这个地址可以返回nameSrv地址信息,暂时还没有搞明白
- 让后更新所有的nameSrv地址信息到本地内存中的这个集合中namesrvAddrList
- 这里保存到本地内存中,broker在定时向所有nameSrv注册心跳信息时,会用到这个内存中的nameSrv地址
public String fetchNameServerAddr() {
try {
String addrs = this.topAddressing.fetchNSAddr();
if (addrs != null) {
if (!addrs.equals(this.nameSrvAddr)) {
log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
this.updateNameServerAddressList(addrs);
this.nameSrvAddr = addrs;
return nameSrvAddr;
}
}
} catch (Exception e) {
log.error("fetchNameServerAddr Exception", e);
}
return nameSrvAddr;
}
public String fetchNameServerAddr() {
try {
// 获取当前所有的nameSrv地址信息
String addrs = this.topAddressing.fetchNSAddr();
if (addrs != null) {
if (!addrs.equals(this.nameSrvAddr)) {
log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
// 更新nameSrv地址信息到内存中
this.updateNameServerAddressList(addrs);
this.nameSrvAddr = addrs;
return nameSrvAddr;
}
}
} catch (Exception e) {
log.error("fetchNameServerAddr Exception", e);
}
return nameSrvAddr;
}
public void updateNameServerAddressList(final String addrs) {
List<String> lst = new ArrayList<String>();
String[] addrArray = addrs.split(";");
for (String addr : addrArray) {
lst.add(addr);
}
this.remotingClient.updateNameServerAddressList(lst);
}
@Override
public void updateNameServerAddressList(List<String> addrs) {
List<String> old = this.namesrvAddrList.get();
boolean update = false;
if (!addrs.isEmpty()) {
if (null == old) {
update = true;
} else if (addrs.size() != old.size()) {
update = true;
} else {
for (int i = 0; i < addrs.size() && !update; i++) {
if (!old.contains(addrs.get(i))) {
update = true;
}
}
}
if (update) {
Collections.shuffle(addrs);
log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
// 这里保存的所有nameSrv地址,是broker在定时向nameSrv注册服务心跳信息时,会用到
this.namesrvAddrList.set(addrs);
}
}
}
brokerController.start()
org.apache.rocketmq.broker.BrokerController#start
总结来说,在createBrokerController中new出来了一批组件,在start()方法中,分别调用其start()方法进行启动
public void start() throws Exception {
// 1.核心:消息存储组件 下面启动的组件,都是在createBrokerController时创建的
if (this.messageStore != null) {
this.messageStore.start();
}
// 2.核心: netty服务端组件
if (this.remotingServer != null) {
this.remotingServer.start();
}
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
// 3.核心:netty client组件,负责向外发送请求
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
// 4.这个判断,应该是判断是否启用了dleger,dleger是rocketmq在后期版本中引入的一个组件,可以用来主从切换等,默认值是false
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
// 向所有nameSrv注册broker信息
this.registerBrokerAll(true, false, true);
}
/**
* 5.这里启动的定时线程就是 broker定时向nameServer注册的逻辑,默认配置是30S,也就是说默认是每30S
* ,向nameServer注册一下当前broker的信息
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
这里可以看到,启动了N个组件;需要注意的,有几个
- 调用messageStore.start()
- 初始化nettyServer和nettyClient
- 向所有nameSrv注册
- 启动定时线程池,定时向所有nameSrv注册broker信息,更新其心跳时间
messageStore.start()
我们先来看messageStore.start()方法
在messageStore启动的时候,有N多的逻辑,我先记录当前所了解到的,所用到的
/**
* 1、启动reputMessageService,更新consumerQueue文件
* 这里启动的异步线程,是为了监听commitLog文件,然后根据文件中写入的数据,更新consumerQueue和indexFiLe
*/
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
// 2、如果没有启动dleger模式,那就启动haService,这个service看起来是节点间同步msg时用到的
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
}
// 3、这里启动的线程,是去处理延迟消息,将延迟消息的topic修改为真正的topic
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
// 4、这里启动的几个组件,暂时还没细看,但是看起来和消息存储有关系
// 这个应该是和StoreCheckPoint文件有关系
this.flushConsumeQueueService.start();
// 这个看起来和mappedFileQueue数据的持久化有关系
this.commitLog.start();
this.storeStatsService.start();
/**
* 这里添加的定时任务,其中一个就是定时清理过期的文件
* 这里清理过期文件的逻辑暂时没有看明白,但是看内部的处理逻辑,是判断哪些需要删除,然后clear()
*/
this.addScheduleTask();
reputMessageService
在rocketmq中,除了commitLog文件,还有consumerQueue和indexFile文件,这两个文件,都是在commitLog文件写入之后,再根据commitLog文件,去写的,具体的逻辑,就在这个service中
在这个service中,会每隔1ms,执行一次判断,判断当前commitLog是否写入了新的消息,然后将新的消息,通过不同的CommitLogDispatcher写入到不同的文件中,其中最重要的是,就是consumerQueue和indexFile这两个文件
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
这个service,是来处理延迟消息的,对于延迟消息,broker在启动的时候,会针对每个延迟级别,初始化一个timerTask,然后通过Timer去定时轮询调度
最主要的逻辑,就在截图中这个代码中,后面单独起一篇博客记录延迟消息的原理
定时向nameSrv注册心跳
接着我们来看定时向nameSrv注册broker心跳时间的逻辑
这里可以看到,是启动了一个定时执行的线程池,然后去执行注册的逻辑
这里注册的逻辑,中间的代码就不贴了,比较简单,基本上就是一些简单的判断,然后继续往下层调用,来看下相对比较底层的代码
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
在这个方法中,第一点,获取所有nameSrv的逻辑,和前面在brokerController启动时,启动的一个线程有关系,这里是从内存中获取nameSrv地址信息(namesrvAddrList 从这个集合中获取) ,在前面,有一个异步线程在定时的更新这个集合
然后这里通过countDownLatch的机制,在所有节点注册成功之后,返回
总结
总结来看,broker在启动的时候
- 启动nettyServer和nettyClient
- 启动了一些异步定时任务,定时更新一些数据到内存中,同时定时去处理一些逻辑