在 RocketMQ 这一高性能分布式消息队列系统中,BrokerOuterAPI 组件犹如一座桥梁,连接着 Broker 与外部世界,在系统的运行、管理以及与其他组件交互过程中发挥着极为关键的作用。本文将深入探讨 BrokerOuterAPI 组件的内部机制、核心功能以及其在实际应用场景中的价值。
一、BrokerOuterAPI 组件概述
BrokerOuterAPI 并非一个孤立的模块,而是一组封装了 Broker 对外提供服务接口的集合。它涵盖了与客户端(Producer、Consumer)、其他 Broker 以及 NameServer 等进行通信和交互的关键逻辑。通过这些接口,Broker 能够接收并处理各种请求,实现消息的发送、消费、存储管理以及集群状态同步等核心功能。
二、核心功能剖析
1.主要属性信息
/**
* netty客户端的组件
*/
private final RemotingClient remotingClient;
/**
* 地址
*/
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
/**
* nameServer地址
*/
private String nameSrvAddr = null;
/**
* 固定大小的线程池 4-10个
*/
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
public class TopAddressing {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
/**
* nameServer的地址
*/
private String nsAddr;
/**
* ws地址
*/
private String wsAddr;
/**
* 单元名称
*/
private String unitName;
}
2.核心方法
2.1fetchNameServerAddr
在 RocketMQ 中,BrokerOuterAPI
的fetchNameServerAddr
方法在整个系统的架构和运行中扮演着不可或缺的角色。
方法功能
fetchNameServerAddr
方法主要用于获取 NameServer 的地址信息。NameServer 在 RocketMQ 集群中承担着路由管理的重要职责,它维护着 Broker 的地址、主题与队列的映射关系等关键信息。Broker 需要与 NameServer 保持紧密通信,无论是注册自身信息、获取最新的路由数据,还是进行心跳检测等操作,都依赖于准确的 NameServer 地址。而fetchNameServerAddr
方法正是为 Broker 提供了获取这些关键地址信息的途径。通过调用此方法,Broker 能够知晓 NameServer 的网络位置,进而建立起与 NameServer 之间的有效连接,确保后续各种交互操作得以顺利进行。
方法调用时机
-
Broker 启动阶段:当 Broker 启动时,它首先需要知道 NameServer 的地址,以便能够向其注册自身信息并获取初始的路由数据。此时,Broker 会调用
fetchNameServerAddr
方法来获取 NameServer 的地址。例如,在一个新搭建的 RocketMQ 集群中,各个 Broker 节点在启动过程中,会通过此方法获取到预先配置或动态发现的 NameServer 地址,然后与 NameServer 建立连接,完成注册流程,使得自身能够被纳入到整个集群的管理体系中。 -
地址变更或重连场景:在 RocketMQ 集群运行过程中,可能会出现 NameServer 地址变更的情况,比如由于集群的扩容、网络架构调整等原因,NameServer 的地址发生了改变。或者当 Broker 与 NameServer 之间的连接因为网络故障等原因断开时,Broker 需要重新连接到 NameServer。在这些场景下,Broker 会再次调用
fetchNameServerAddr
方法,以获取最新有效的 NameServer 地址,从而重新建立连接,恢复与 NameServer 之间的通信,保证系统的正常运行。
代码
//获取NameServer的地址
public String fetchNameServerAddr() {
try {
//获取到nameServer的地址
String addrs = this.topAddressing.fetchNSAddr();
if (addrs != null && !UtilAll.isBlank(addrs)) {
if (!addrs.equals(this.nameSrvAddr)) {
log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
this.updateNameServerAddressList(addrs);
this.nameSrvAddr = addrs;
return nameSrvAddr;
}
}
} catch (Exception e) {
log.error("fetchNameServerAddr Exception", e);
}
return nameSrvAddr;
}
//更新nameServer的地址
public void updateNameServerAddressList(final String addrs) {
List<String> lst = new ArrayList<String>();
String[] addrArray = addrs.split(";");
for (String addr : addrArray) {
lst.add(addr);
}
//针对remotingClient 更新nameServer的地址
this.remotingClient.updateNameServerAddressList(lst);
}
2.2 registerBrokerAll
在 RocketMQ 里,BrokerOuterAPI
中的registerBrokerAll
方法是实现 Broker 在集群中注册与信息同步的关键。下面我将详细为你介绍它的功能、工作流程以及在集群管理中的重要性。
方法功能
registerBrokerAll
方法主要用于 Broker 向 NameServer 注册自身信息,并且会同步一些关键的配置与状态数据,以确保 NameServer 掌握整个集群的最新布局与各 Broker 详细信息。这个方法的执行,让 Producer 和 Consumer 能够通过 NameServer 获取到准确的 Broker 地址与相关属性,从而实现消息的发送与消费。
工作流程
-
构建注册请求数据:当 Broker 启动或者检测到自身状态有重大变化(如新增或移除消息队列等)时,会调用
registerBrokerAll
方法。方法内部首先会收集一系列要注册的信息,包括 Broker 的唯一标识符(brokerId
)、所属集群名称(clusterName
)、Broker 地址(brokerAddr
)、Master Broker 地址(如果当前 Broker 是 Slave 角色)、Broker 所支持的消息类型、当前 Broker 存储的消息队列信息(包括每个主题下的队列数量与分布情况)以及 Broker 的配置参数等。 -
向 NameServer 发送请求:将上述构建好的注册信息封装成网络请求,通过与 NameServer 建立的网络连接,发送到 NameServer 集群中的各个节点。RocketMQ 中的 NameServer 通常以集群形式部署,以保证高可用性和负载均衡,所以
registerBrokerAll
方法会确保注册信息同步到所有 NameServer 节点。 -
NameServer 处理注册请求:NameServer 接收到注册请求后,会进行一系列处理。它会检查注册信息的完整性与合法性,比如验证
brokerId
是否唯一、clusterName
是否存在等。若信息合法,NameServer 会将 Broker 的信息更新到其内部维护的路由表中。这个路由表记录了集群中所有 Broker 的详细信息,包括它们的地址、所属集群、负责的主题与队列等,是 Producer 和 Consumer 进行消息路由的重要依据。 -
返回注册结果:NameServer 处理完注册请求后,会向 Broker 返回注册结果。如果注册成功,Broker 会收到确认信息,表明其已经成功在 NameServer 中注册,并且后续可以正常接收来自 Producer 和 Consumer 的请求;若注册失败,NameServer 会返回失败原因,Broker 可能需要根据错误信息进行相应调整后重新尝试注册。
代码:
/**
* 在这里 broker通过netty客户端组件进行向NameSever组件发起注册请求
* @param clusterName 集群名称
* @param brokerAddr broker地址
* @param brokerName broker名字
* @param brokerId brokerid
* @param haServerAddr 高可用地址
* @param topicConfigWrapper topic的元数据
* @param filterServerList 过滤服务器
* @param oneway 是否oneway请求
* @param timeoutMills 超时时间
* @param compressed 是否启用压缩
* @return
*/
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
//初始化了一个List集合 用来存放Broker的注册结果的返回值
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
//获取nameServerAddressList的集合
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//构建注册请求的请求头信息
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
//构建注册请求的请求体信息
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
//这块进行搞了一个CountDownLatch 只有向所有的NameServer注册完成之后才能继续执行
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(() -> {
try {
//真正的执行注册的操作
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
//下面两行代码将请求头和请求体封装到RemotingCommand中
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
//这个oneway为true的时候就是单向发送请求,不需要等待响应 属于一种特殊情况
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
// 真正执行发送请求的代码
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
2.3unregisterBrokerAll
在 RocketMQ 的 BrokerOuterAPI 组件里,unregisterBrokerAll
方法在集群管理方面扮演着特殊且重要的角色。它主要用于从 NameServer 中注销特定 Broker 的所有相关信息,通常在 Broker 节点需要从集群中彻底移除或进行重大变更时被调用。
方法功能
-
信息移除:
unregisterBrokerAll
方法会向 NameServer 发起请求,将该 Broker 在 NameServer 维护的路由表中所有与之相关的记录删除。这包括 Broker 的地址、所属集群名称、Broker 角色(主节点 Master 或从节点 Slave),以及该 Broker 所负责的主题与队列等信息。NameServer 依靠这些信息来引导生产者(Producer)和消费者(Consumer)与正确的 Broker 节点进行通信,当 Broker 通过此方法注销后,NameServer 会更新内部数据结构,使得其他组件不再能通过 NameServer 找到该 Broker 的相关信息。 -
关联资源清理:在 Broker 自身内部,该方法会触发一系列关联资源的清理操作。例如,它会关闭与其他 Broker 节点用于数据同步的网络连接,停止对消息存储相关资源的维护(如关闭一些文件句柄、释放缓存资源等),因为该 Broker 即将不再参与集群的数据处理和存储工作。同时,Broker 也会清理本地维护的与其他组件(如 Producer、Consumer)交互的会话信息等。
方法调用时机
-
Broker 正常下线:当运维人员计划对某个 Broker 节点进行硬件升级、软件版本更新等操作,需要将该 Broker 从集群中暂时移除时,会调用
unregisterBrokerAll
方法。在操作完成且确保 Broker 符合上线条件后,再通过registerBroker
方法重新注册到集群中。这样可以保证在 Broker 下线期间,不会有新的请求被路由到该节点,避免出现服务中断或数据不一致问题。 -
Broker 故障处理:如果某个 Broker 节点出现严重故障,无法正常提供服务,并且短时间内难以修复,为了保障整个集群的稳定性和可用性,会立即调用
unregisterBrokerAll
方法将其从集群中注销。与此同时,集群中的其他 Broker 节点会根据配置和相关机制,接管故障 Broker 原本负责的部分工作,例如从节点(Slave)可能会切换为主节点(Master),继续提供消息存储和服务。
代码:
//broker的下线请求
public void unregisterBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) {
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
for (String namesrvAddr : nameServerAddressList) {
try {
this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
} catch (Exception e) {
log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
}
}
}
}
public void unregisterBroker(
final String namesrvAddr,
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
三、总结与展望
BrokerOuterAPI 组件作为 RocketMQ 中 Broker 对外交互的窗口,承载了消息发送、消费以及集群管理等核心功能,是 RocketMQ 能够高效、可靠运行的重要基石。深入理解 BrokerOuterAPI 的内部机制和功能,有助于开发者更好地优化 RocketMQ 的应用,提升分布式系统的性能和稳定性。随着分布式技术的不断发展和应用场景的日益复杂,相信 BrokerOuterAPI 组件也将持续演进,为 RocketMQ 在更多领域的广泛应用提供有力支撑。