NameServer:作为注册中心,提供路由注册、路由踢出、路由发现功能,舍弃强一致,保证高可用,集群中各个节点不会实时通讯,其中一个节点下线之后,会提供另外一个节点保证路由功能。
Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:
Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue
NameServer架构
启动入口:
org.apache.rocketmq.namesrv.NamesrvController#initialize
public boolean initialize() {
loadConfig();
initiateNetworkComponents();
initiateThreadExecutors();
registerProcessor();
startScheduleService();
initiateSslContext();
initiateRpcHooks();
return true;
}
路由注册
Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。
Broker服务发送心跳包,对外提供Topic配置
org.apache.rocketmq.broker.BrokerController#start
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public void start() throws Exception {
this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {
isIsolated = true;
}
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
startBasicService();
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run2() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
return;
}
if (isIsolated) {
BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run2() {
try {
BrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
}
}
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}
if (this.brokerConfig.isEnableControllerMode()) {
scheduleSendHeartbeat();
}
if (brokerConfig.isSkipPreOnline()) {
startServiceWithoutCondition();
}
}
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean enableActingMaster,
final boolean compressed,
final Long heartbeatTimeoutMillis,
final BrokerIdentity brokerIdentity) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setEnableActingMaster(enableActingMaster);
requestHeader.setCompressed(false);
if (heartbeatTimeoutMillis != null) {
requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
}
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
@Override
public void run2() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
} catch (Exception e) {
LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
}
} catch (InterruptedException ignore) {
}
}
return registerBrokerResultList;
}
NameServer注册broker信息,集群名称、broker名称、ha注册中心地址、超时时间
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final String zoneName,
final Long timeoutMillis,
final Boolean enableActingMaster,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
this.lock.writeLock().lockInterruptibly();
//init or update the cluster info
Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
boolean isOldVersionBroker = enableActingMaster == null;
brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
brokerData.setZoneName(zoneName);
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
boolean isMinBrokerIdChanged = false;
long prevMinBrokerId = 0;
if (!brokerAddrsMap.isEmpty()) {
prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
}
if (brokerId < prevMinBrokerId) {
isMinBrokerIdChanged = true;
}
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
//If Local brokerId stateVersion bigger than the registering one,
String oldBrokerAddr = brokerAddrsMap.get(brokerId);
if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));
if (null != oldBrokerInfo) {
long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
if (oldStateVersion > newStateVersion) {
log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
//Remove the rejected brokerAddr from brokerLiveTable.
brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
return result;
}
}
}
if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
return null;
}
String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
&& brokerId == Collections.min(brokerAddrsMap.keySet());
if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion(), brokerName,
entry.getValue().getTopicName())) {
final TopicConfig topicConfig = entry.getValue();
if (isPrimeSlave) {
// Wipe write perm for prime slave
topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
}
this.createAndUpdateQueueData(brokerName, topicConfig);
}
}
}
if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
//the topicQueueMappingInfoMap should never be null, but can be empty
for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
}
//Note asset brokerName equal entry.getValue().getBname()
//here use the mappingDetail.bname
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
}
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
new BrokerLiveInfo(
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddrInfo);
} else {
this.filterServerTable.put(brokerAddrInfo, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
if (masterLiveInfo != null) {
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(brokerAddrsMap, null,
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
} finally {
this.lock.writeLock().unlock();
}
return result;
}
路由剔除:NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。
路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。