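RocketMQ Source Code Reading (7): High Availability
- Overview
- NameServer High Availability
- Broker Registration with NameServer
- Producer and Consumer Access to Namesrv
- Broker High Availability
- Broker Master-Slave Configuration
- Master-Slave Communication Components
- Master-Slave Communication Protocol
- Slave Node Logic
- Master Node Logic
- Master_SYNC Mode
- Producer Sending Messages
- Consumer Consuming Messages
- Summary
This article analyzes how RocketMQ achieves high availability.
Overview
RocketMQ's high availability covers both NameServer and Broker, as well as how Producers and Consumers communicate with them.
NameServer High Availability
NameServer is a lightweight registry that provides a naming service. High availability is achieved by running multiple NameServer instances.
The NameServer instances are fully independent: there is no master/slave relationship among them, and they neither communicate nor synchronize data with each other.
Broker Registration with NameServer
A Broker registers itself with every NameServer in a loop. The code is in BrokerOuterAPI#registerBrokerAll: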
/**
* Registers the Broker with every Namesrv
*
* @param clusterName cluster name
* @param brokerAddr broker address
* @param brokerName broker name
* @param brokerId broker id
* @param haServerAddr HA service address, used by the master to sync data to its slaves
* @param topicConfigWrapper topic configuration
* @param filterServerList list of filtersrv addresses
* @param oneway whether to use oneway communication
* @param timeoutMills request timeout
* @return the registration result
*/
public RegisterBrokerResult registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills) {
RegisterBrokerResult registerBrokerResult = null;
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
for (String namesrvAddr : nameServerAddressList) {
try {
// Register with each Namesrv in turn
RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
if (result != null) {
registerBrokerResult = result;
}
log.info("register broker to name server {} OK", namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
}
}
}
return registerBrokerResult;
}
Producer and Consumer Access to Namesrv
Producers and Consumers pick one available NameServer from the list to communicate with. The code is in NettyRemotingClient#getAndCreateNameserverChannel:
private Channel getAndCreateNameserverChannel() throws InterruptedException {
// Return the already chosen Namesrv if its channel is usable
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
//
final List<String> addrList = this.namesrvAddrList.get();
if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
// Re-check under the lock: return the already chosen Namesrv if its channel is usable
addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
// Otherwise pick the next Namesrv from the list and connect to it
if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);
this.namesrvAddrChoosed.set(newAddr);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null)
return channelNew;
}
}
} catch (Exception e) {
log.error("getAndCreateNameserverChannel: create name server channel exception", e);
} finally {
this.lockNamesrvChannel.unlock();
}
} else {
log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
return null;
}
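The selection loop above boils down to a round-robin picker. The sketch below is a simplified illustration, not RocketMQ code: the class `NamesrvPicker` and its methods are hypothetical names, and channel creation is left out. It uses `Math.floorMod` instead of `Math.abs(index) % size`, an assumption made here because `Math.abs(Integer.MIN_VALUE)` is still negative, so the floor-modulo form stays in range after the counter overflows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, for illustration only; not part of RocketMQ.
final class NamesrvPicker {
    private final AtomicInteger index;

    NamesrvPicker() { this(0); }
    NamesrvPicker(int start) { this.index = new AtomicInteger(start); }

    // Returns the next address in round-robin order, or null if the list is empty.
    String next(List<String> addrList) {
        if (addrList == null || addrList.isEmpty()) {
            return null;
        }
        // floorMod keeps the result in [0, size) even when the counter is negative.
        int i = Math.floorMod(index.incrementAndGet(), addrList.size());
        return addrList.get(i);
    }

    public static void main(String[] args) {
        List<String> addrs = Arrays.asList("ns1:9876", "ns2:9876", "ns3:9876");
        NamesrvPicker picker = new NamesrvPicker();
        for (int n = 0; n < 4; n++) {
            System.out.println(picker.next(addrs));
        }
    }
}
```

Each call advances the shared counter, so successive reconnect attempts move on to the next address instead of retrying the one that just failed.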
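Broker High Availability
Broker high availability is achieved through a combination of grouping and clustering.
A Broker group consists of 1 Master node and N Slave nodes. The Master serves both reads and writes; the Slaves are read-only.
Multiple Broker groups form a cluster.
Broker groups do not exchange data with each other.
Broker Master-Slave Configuration
- Within each group, the Master continuously sends new CommitLog data to the Slaves, and each Slave continuously reports to the Master the position up to which its local CommitLog has been synchronized.
- Broker groups are independent of each other: they neither communicate nor synchronize data.
- Slaves also pull consumer offsets, Topic configuration, and similar metadata from the Master.
Within a cluster, there are two types of Master:
- Master_SYNC: when a Producer sends a message, the Master waits until the Slave has stored it before returning the send result.
- Master_ASYNC: when a Producer sends a message, the Master returns the send result immediately, without waiting for the Slave to store it.
Master-Slave Communication Components
The Master and Slave communicate through the following components:
- Master node
- AcceptSocketService: accepts connections from Slave nodes
- HAConnection:
- ReadSocketService: reads data coming from a Slave node
- WriteSocketService: writes data to a Slave node
- Slave node
- HAClient: connects to the Master node and reads and writes data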
Master-Slave Communication Protocol
To be added (TODO).
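Although this section is still a TODO, the wire format can be read off the code quoted later in this article: the Slave reports its progress as a single 8-byte offset (see reportSlaveMaxOffset), and the Master sends frames consisting of a 12-byte header (8-byte physical offset plus 4-byte body size) followed by the body, with a body size of 0 serving as a heartbeat. The sketch below encodes both messages with `ByteBuffer`. The class `HaFrames` and its methods are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the frame layout; not RocketMQ code.
final class HaFrames {
    static final int HEADER_SIZE = 8 + 4; // phyOffset + bodySize

    // Slave -> Master: the slave's max CommitLog offset as one 8-byte long.
    static ByteBuffer slaveReport(long maxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(maxOffset);
        buf.flip();
        return buf;
    }

    // Master -> Slave: header (offset, size) followed by the body bytes.
    // An empty body yields a header-only frame, which acts as a heartbeat.
    static ByteBuffer masterFrame(long phyOffset, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.putLong(phyOffset);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer frame = masterFrame(1024L, new byte[] {1, 2, 3});
        System.out.println("frame bytes = " + frame.remaining());
        System.out.println("offset = " + frame.getLong(0) + ", size = " + frame.getInt(8));
    }
}
```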
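Slave Node Logic
The Slave's main logic is:
- Receive the CommitLog data sent by the Master
- Report to the Master the physical offset up to which its local CommitLog has been synchronized
This logic lives in the HAClient class:
HAClient implements the Runnable interface; the source of its run method is: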
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
if (this.connectMaster()) {
// Report progress to the Master when the report interval has elapsed
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
// Handle read events
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// Report progress to the Master if it has changed
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
// Close the connection if the Master has sent nothing for too long
long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
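As the source shows, the Slave first connects to the Master. Once connected, it:
- Reports the physical offset up to which its local CommitLog has been synchronized
- Handles read events, that is, processes the CommitLog data the Master sends to it
The first step, reporting the synchronized CommitLog offset, is done by HAClient#reportSlaveMaxOffset: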
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
/**
* Reports progress
*
* @param maxOffset the progress (offset) to report
* @return whether the report succeeded
*/
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
return !this.reportOffset.hasRemaining();
}
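The method puts the 8-byte offset into the reportOffset buffer and writes that buffer to the socket channel, trying at most three times; it returns true only if all 8 bytes were sent.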
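The bounded-retry write in reportSlaveMaxOffset can be isolated as follows. This is a minimal sketch under stated assumptions: `BoundedWrite` and `ChunkedChannel` are hypothetical names, and `ChunkedChannel` is a stand-in that accepts only a few bytes per call, imitating a non-blocking socket whose send buffer is nearly full.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical helper, for illustration only; not part of RocketMQ.
final class BoundedWrite {
    // Calls write() at most maxAttempts times.
    // Returns true only if the whole buffer was written.
    static boolean writeFully(WritableByteChannel ch, ByteBuffer buf, int maxAttempts)
            throws IOException {
        for (int i = 0; i < maxAttempts && buf.hasRemaining(); i++) {
            ch.write(buf);
        }
        return !buf.hasRemaining();
    }

    // Stand-in channel that accepts at most `chunk` bytes per write() call.
    static final class ChunkedChannel implements WritableByteChannel {
        private final int chunk;
        ChunkedChannel(int chunk) { this.chunk = chunk; }
        @Override public int write(ByteBuffer src) {
            int n = Math.min(chunk, src.remaining());
            src.position(src.position() + n); // consume n bytes
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() {}
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer report = ByteBuffer.allocate(8);
        report.putLong(4096L);
        report.flip();
        // 4 bytes per call: two of the three allowed attempts are enough.
        System.out.println(writeFully(new ChunkedChannel(4), report, 3));
    }
}
```

When the channel accepts too little per call, the method returns false, which is the case in which the run loop above closes the connection to the Master.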
The second step, handling read events, is done by HAClient#processReadEvent:
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
readSizeZeroTimes = 0;
// Dispatch the data that was just read
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
// TODO ERROR
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
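This method reads from the socket into the read buffer; whenever new data arrives, it triggers HAClient#dispatchReadRequest:
/**
* Reads the CommitLog data sent by the Master and reports whether processing succeeded.
* Every complete frame read is appended to the CommitLog.
* Failure causes:
* 1. The offset sent by the Master does not equal the Slave's max CommitLog offset
* 2. Reporting progress to the Master failed
*
* @return true if processing succeeded, false on error
*/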
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) {
// A complete header is available
int diff = this.byteBufferRead.position() - this.dispatchPostion;
if (diff >= msgHeaderSize) {
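// Read masterPhyOffset and bodySize. dispatchPostion is tracked separately because TCP is a byte stream: frames can arrive merged or split, so a read may end in the middle of a frame.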
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
// Verify that the offset sent by the Master equals the Slave's max CommitLog offset.
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
// A complete message is available
if (diff >= (msgHeaderSize + bodySize)) {
// Append to the CommitLog
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
this.byteBufferRead.get(bodyData);
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
// Restore the read position and advance the dispatch position
this.byteBufferRead.position(readSocketPos);
this.dispatchPostion += msgHeaderSize + bodySize;
// Report progress to the Master
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
// Keep looping
continue;
}
}
// The buffer is full: reallocate it
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
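The key idea in dispatchReadRequest is to keep a separate dispatch position, so that a frame split across several socket reads is consumed only once it is complete. The sketch below shows just that idea, under simplifying assumptions: `FrameDecoder` is a hypothetical class, the buffer has a fixed 1024-byte capacity with no reallocation, and the offset check and the CommitLog append are omitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical decoder, for illustration only; not part of RocketMQ.
final class FrameDecoder {
    private static final int HEADER_SIZE = 8 + 4; // phyOffset + bodySize
    private final ByteBuffer readBuf = ByteBuffer.allocate(1024); // stays in write mode
    private int dispatchPos = 0; // start of the first frame not yet consumed

    // Appends received bytes and returns the bodies of all frames now complete.
    List<byte[]> feed(byte[] chunk) {
        readBuf.put(chunk);
        List<byte[]> bodies = new ArrayList<>();
        while (true) {
            int available = readBuf.position() - dispatchPos;
            if (available < HEADER_SIZE) {
                break; // header not complete yet
            }
            int bodySize = readBuf.getInt(dispatchPos + 8); // absolute read
            if (available < HEADER_SIZE + bodySize) {
                break; // body not complete yet
            }
            byte[] body = new byte[bodySize];
            // Read through a duplicate so readBuf's write position is untouched.
            ByteBuffer view = readBuf.duplicate();
            view.position(dispatchPos + HEADER_SIZE);
            view.get(body);
            bodies.add(body);
            dispatchPos += HEADER_SIZE + bodySize;
        }
        return bodies;
    }

    public static void main(String[] args) {
        ByteBuffer frame = ByteBuffer.allocate(15);
        frame.putLong(100L).putInt(3).put(new byte[] {7, 8, 9});
        byte[] bytes = frame.array();
        FrameDecoder decoder = new FrameDecoder();
        // First 10 bytes: the header is incomplete, so nothing is dispatched.
        System.out.println(decoder.feed(Arrays.copyOfRange(bytes, 0, 10)).size());
        // The remaining 5 bytes complete the frame.
        System.out.println(decoder.feed(Arrays.copyOfRange(bytes, 10, 15)).size());
    }
}
```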
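Master Node Logic
To recap the Master-side components:
- Master node
- AcceptSocketService: accepts connections from Slave nodes
- HAConnection:
- ReadSocketService: reads data coming from a Slave node
- WriteSocketService: writes data to a Slave node
First, ReadSocketService:
ReadSocketService implements the Runnable interface; its run method is: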
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
// No heartbeat for too long: disconnect
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
This method calls ReadSocketService#processReadEvent:
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// Reset byteBufferRead once it is full
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPostion = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
// Record the last read time
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
// Read the max CommitLog offset reported by the Slave
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
// Record the Slave's max CommitLog offset
HAConnection.this.slaveAckOffset = readOffset;
// Record the offset of the Slave's first request
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// Notify waiters of the Slave's current progress. Mainly used when the Master is of the SYNC type.
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
}
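The main logic reads the max CommitLog offset reported by the Slave and records it as the Slave's acknowledged offset (slaveAckOffset).
Next, the logic of WriteSocketService:
It likewise implements the Runnable interface; the source of its run method is: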
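Because every Slave report is a fixed 8-byte value and only the newest one matters, processReadEvent jumps straight to the last complete 8-byte slot in the buffer. A minimal sketch of that arithmetic follows. The class `AckOffsetReader` is a hypothetical name, and the sketch ignores processPostion, assuming the buffer is read from position 0.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only; not part of RocketMQ.
final class AckOffsetReader {
    // Returns the most recent complete 8-byte offset in buf[0, buf.position()),
    // or -1 if fewer than 8 bytes have been received.
    static long latestOffset(ByteBuffer buf) {
        // Round the write position down to a multiple of 8:
        // this is the end of the last complete long.
        int pos = buf.position() - (buf.position() % 8);
        if (pos < 8) {
            return -1L;
        }
        return buf.getLong(pos - 8); // absolute read, position unchanged
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putLong(10L);
        buf.putLong(20L);
        buf.put(new byte[] {0, 0, 0}); // 3 bytes of a third, incomplete report
        System.out.println(latestOffset(buf));
    }
}
```

In the example the write position is 19, which rounds down to 16, so the value read is the second report and the three trailing bytes are left for the next read.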
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// No progress request from the Slave yet: sleep and wait.
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// Compute the initial nextTransferFromWhere
if (-1 == this.nextTransferFromWhere) {
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
if (this.lastWriteOver) {
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // heartbeat
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else { // the previous transfer has not finished; continue it
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// Select new CommitLog data to transfer
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else { // no new messages; suspend and wait
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// Disconnect, stop the write thread, stop the read thread, and release the CommitLog buffer
if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}
this.makeStop();
readSocketService.makeStop();
haService.removeConnection(HAConnection.this);
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
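The initialization of nextTransferFromWhere in the run method above is worth isolating: a Slave that reports offset 0 is not sent the whole history but starts from the beginning of the Master's last CommitLog file, while any other Slave resumes from the offset it reported. The sketch below reproduces that arithmetic. `TransferStart` and its parameter names are hypothetical, and the small file size in the example is chosen only for readability.

```java
// Hypothetical helper, for illustration only; not part of RocketMQ.
final class TransferStart {
    // slaveRequestOffset: first offset reported by the slave
    // masterMaxOffset:    current max offset of the master's CommitLog
    // mappedFileSize:     size of one CommitLog file
    static long initialOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset; // resume where the slave left off
        }
        // Round down to the start of the file that contains masterMaxOffset.
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0L);
    }

    public static void main(String[] args) {
        // Empty slave, master at 2500 with files of size 1000: start at 2000.
        System.out.println(initialOffset(0L, 2500L, 1000L));
        // Slave already at 700: resume from 700.
        System.out.println(initialOffset(700L, 2500L, 1000L));
    }
}
```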
The data is transferred by WriteSocketService#transferData:
/**
* Transfers data
*/
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write Body
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
return result;
}
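Master_SYNC Mode
In this mode, when a Producer sends a message, the Master waits until the Slave has stored it before returning the send result to the Producer. The source is in CommitLog#putMessage:
/**
* Appends a message and returns the result
*
* @param msg the message
* @return the result
*/
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// ... some code omitted
// Synchronous write double: if this is a SYNC master, replicate to the slave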
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (msg.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
if (null == request) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
}
service.putRequest(request);
// Wake up WriteSocketService
service.getWaitNotifyObject().wakeupAll();
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return putMessageResult;
}
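In the method above, the call service.getWaitNotifyObject().wakeupAll() wakes up WriteSocketService:
- WriteSocketService stops waiting for new messages, and the Master transfers the new CommitLog data to the Slave
- As soon as the Slave receives the data, it reports its latest CommitLog sync progress to the Master. ReadSocketService (through notifyTransferSome) then wakes up the thread blocked in request#waitForFlush(…)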
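The handshake between the sending thread and the replication threads is a classic "wait with timeout until acknowledged" pattern. The sketch below shows one way to build it with a `CountDownLatch`. `SyncRequest` is a hypothetical stand-in for the request object used in putMessage, and how RocketMQ implements its own request class is not shown in this article, so treat the internals here as an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the request object; not RocketMQ code.
final class SyncRequest {
    private final long nextOffset; // offset the slave must reach
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean ok = false;

    SyncRequest(long nextOffset) { this.nextOffset = nextOffset; }

    long getNextOffset() { return nextOffset; }

    // Called by the thread that observes the slave's acknowledged offset.
    void wakeup(boolean transferOk) {
        this.ok = transferOk;
        latch.countDown();
    }

    // Called by the sending thread; returns false on timeout or failure.
    boolean waitForFlush(long timeoutMillis) throws InterruptedException {
        boolean done = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return done && ok;
    }

    public static void main(String[] args) throws InterruptedException {
        SyncRequest request = new SyncRequest(2048L);
        // Stands in for the thread that sees the slave's ack reach nextOffset.
        Thread acker = new Thread(() -> request.wakeup(true));
        acker.start();
        System.out.println("replicated in time: " + request.waitForFlush(1000));
    }
}
```

A timeout maps naturally onto the FLUSH_SLAVE_TIMEOUT status set in putMessage: the message is already stored on the Master, but the Producer is told that the Slave did not confirm in time.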
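Producer Sending Messages
When sending a message, the Producer selects among all queues in the Broker cluster. The relevant code is in DefaultMQProducerImpl#sendDefaultImpl:
/**
* Sends a message.
* 1. Look up the topic's route information
* 2. Select the message queue to send to
* 3. Invoke the core send method
* 4. Wrap and return the send result
*
* @param msg the message
* @param communicationMode communication mode
* @param sendCallback send callback
* @param timeout timeout of the send request
* @return the send result
* @throws MQClientException on client errors
* @throws RemotingException on request errors
* @throws MQBrokerException on Broker errors
* @throws InterruptedException if the thread is interrupted
*/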
private SendResult sendDefaultImpl(//
Message msg, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, //
final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// ... some code omitted
// Look up the Topic's route information
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null; // the queue most recently selected for sending
Exception exception = null;
SendResult sendResult = null; // result of the last send
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // SYNC mode may try several times
int times = 0; // current attempt number
String[] brokersSent = new String[timesTotal]; // broker name chosen on each attempt
// Keep trying to send until it succeeds or the attempts are used up
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
@SuppressWarnings("SpellCheckingInspection")
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // select the queue to send to
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
// Invoke the core send method
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
// Update the Broker availability information
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // retry when the sync send went through but storage failed and the retry-on-store-failure switch is enabled
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) { // log the exception, update Broker availability info, continue the loop
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) { // log the exception, update Broker availability info, continue the loop
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) { // log the exception, update Broker availability info; some response codes return immediately and end the loop
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
// For the following response codes, continue and retry the send
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
// Otherwise return the send result if there is one, or rethrow the exception
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
// Return the send result
if (sendResult != null) {
return sendResult;
}
// Throw a different exception depending on the failure
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
// No Namesrv address is available
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
// No route information was found for the topic
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
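Stripped of error handling, the send loop above is a bounded retry that passes the name of the Broker that just failed into the queue selection, which suggests that the retry is steered toward a different Broker. The sketch below illustrates that idea only. `RetrySender`, `selectOne`, and the `Predicate` standing in for the real send call are all hypothetical, and the actual selection logic inside selectOneMessageQueue is not shown in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of retry-with-broker-avoidance; not RocketMQ code.
final class RetrySender {
    private int cursor = 0;

    // Round-robin over the queues' broker names, skipping lastBrokerName if possible.
    String selectOne(List<String> queueBrokers, String lastBrokerName) {
        for (int i = 0; i < queueBrokers.size(); i++) {
            String candidate = queueBrokers.get(Math.floorMod(cursor++, queueBrokers.size()));
            if (lastBrokerName == null || !candidate.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on the broker that just failed: plain round-robin.
        return queueBrokers.get(Math.floorMod(cursor++, queueBrokers.size()));
    }

    // Returns the broker that accepted the message, or null after maxAttempts failures.
    String send(List<String> queueBrokers, int maxAttempts, Predicate<String> trySend) {
        if (queueBrokers == null || queueBrokers.isEmpty()) {
            return null;
        }
        String last = null;
        for (int times = 0; times < maxAttempts; times++) {
            String broker = selectOne(queueBrokers, last);
            if (trySend.test(broker)) {
                return broker;
            }
            last = broker; // avoid this broker on the next attempt
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("broker-a", "broker-a", "broker-b");
        // Pretend broker-a is down: only broker-b accepts the message.
        String used = new RetrySender().send(queues, 3, b -> b.equals("broker-b"));
        System.out.println("sent via " + used);
    }
}
```

With two Broker groups, a single retry is enough to survive the loss of one Master, which is why the Producer side needs no failover protocol beyond this loop.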
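Consumer Consuming Messages
When consuming messages, the Consumer likewise selects among all queues in the Broker cluster.
Summary
The figure above shows the architecture RocketMQ uses to achieve high availability.
- A Broker group consists of one Master and 0 to N Slaves
- Multiple Broker groups form a cluster; the groups are independent of each other and do not communicate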
- pro