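RocketMQ supports four ways of sending messages: reliable synchronous, reliable asynchronous, one-way, and batch.
Message sending in RocketMQ raises three questions:
1) How is load balanced across message queues?
2) How is high availability achieved when sending?
3) How is consistency guaranteed for batch sends?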
org.apache.rocketmq.common.message.Message
public class Message implements Serializable {
private String topic; // topic
private int flag; // message flag (not processed by RocketMQ)
private Map<String, String> properties; // extended properties
private byte[] body; // message body
private String transactionId; // transaction id
public Message() { }
}
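Message extended properties are kept in the properties map. The system flag bits that accompany a message are defined in:
org.apache.rocketmq.common.sysflag.MessageSysFlag
Each constant is a bit, or a small group of bits, that is OR-ed into a message's sysFlag to record, among other things, whether the body is compressed, the transaction state, the address family of the hosts, batch handling, and the compression algorithm.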
public class MessageSysFlag {
public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1;
public final static int TRANSACTION_NOT_TYPE = 0;
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
public final static int BORNHOST_V6_FLAG = 0x1 << 4;
public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;
//Mark the flag for batch to avoid conflict
public final static int NEED_UNWRAP_FLAG = 0x1 << 6;
public final static int INNER_BATCH_FLAG = 0x1 << 7;
// COMPRESSION_TYPE
public final static int COMPRESSION_LZ4_TYPE = 0x1 << 8;
public final static int COMPRESSION_ZSTD_TYPE = 0x2 << 8;
public final static int COMPRESSION_ZLIB_TYPE = 0x3 << 8;
public final static int COMPRESSION_TYPE_COMPARATOR = 0x7 << 8;
}
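To make the bit layout in the MessageSysFlag listing concrete, here is a minimal sketch of how such flags can be combined and read back. The constants are copied from the listing; the class name and the helper methods (isCompressed, transactionType, withTransactionType) are hypothetical names used only for illustration.

```java
// Sketch only: constants copied from the MessageSysFlag listing; helper names are hypothetical.
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // binary 0100
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // binary 1000
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // binary 1100, also usable as the 2-bit mask

    // True when the compression bit is set.
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }

    // Extracts the two transaction bits.
    static int transactionType(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clears the two transaction bits, then sets the requested type.
    static int withTransactionType(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    public static void main(String[] args) {
        int sysFlag = COMPRESSED_FLAG | TRANSACTION_PREPARED_TYPE;
        System.out.println(isCompressed(sysFlag));
        System.out.println(transactionType(sysFlag) == TRANSACTION_PREPARED_TYPE);
        int committed = withTransactionType(sysFlag, TRANSACTION_COMMIT_TYPE);
        System.out.println(transactionType(committed) == TRANSACTION_COMMIT_TYPE);
    }
}
```

Because the transaction state occupies two bits, it has to be read with a mask rather than tested bit by bit; the single-bit flags can be tested directly.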
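Commonly used extended properties:
tags: the message tag, used for message filtering.
keys: the message index keys, separated by spaces; RocketMQ can use these keys to look up messages quickly.
waitStoreMsgOK: whether the send waits until the message has been stored before returning.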
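As a rough illustration of how these three properties can live in the properties map, here is a minimal sketch. The class is hypothetical, and the map keys "TAGS", "KEYS", and "WAIT" as well as the default of true for waitStoreMsgOK are assumptions modeled on RocketMQ's MessageConst; check them against your version.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Sketch only: a stand-in for Message that stores tags, keys, and waitStoreMsgOK as properties.
final class MessagePropsSketch {
    private final Map<String, String> properties = new HashMap<>();

    void setTags(String tags) {
        properties.put("TAGS", tags); // assumed property name
    }

    // Keys are joined with a single space, matching the "separated by spaces" rule.
    void setKeys(Collection<String> keys) {
        properties.put("KEYS", String.join(" ", keys)); // assumed property name
    }

    void setWaitStoreMsgOK(boolean wait) {
        properties.put("WAIT", Boolean.toString(wait)); // assumed property name
    }

    // Assumed default: wait for the store result when the property is absent.
    boolean isWaitStoreMsgOK() {
        String value = properties.get("WAIT");
        return value == null || Boolean.parseBoolean(value);
    }

    String getProperty(String name) {
        return properties.get(name);
    }
}
```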
DefaultMQProducer
DefaultMQProducer implements MQProducer, which in turn extends the MQAdmin interface.
public interface MQAdmin {
// Create a topic
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException;
// Look up the offset for a given timestamp
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
// Maximum offset
long maxOffset(final MessageQueue mq) throws MQClientException;
// Minimum offset
long minOffset(final MessageQueue mq) throws MQClientException;
// Store time of the earliest message
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
// Look up a message by its offset msgId
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
// Query messages by key within a time range
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;
}
org.apache.rocketmq.client.producer.MQProducer
Its main methods are:
public interface MQProducer extends MQAdmin {
// Fetch the message queues a topic can publish to
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
// Synchronous send
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
// Asynchronous send
void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
// One-way send
void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException;
// Synchronous send to a specified message queue
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
// Asynchronous send to a specified message queue
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
// One-way send to a specified message queue
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;
// Synchronous send, with the queue chosen by a MessageQueueSelector
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
// Asynchronous send, with the queue chosen by a MessageQueueSelector
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
// One-way send, with the queue chosen by a MessageQueueSelector
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException;
// Send a transactional message
TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
// Synchronous batch send
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
Core properties of DefaultMQProducer
public class DefaultMQProducer extends ClientConfig implements MQProducer {
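protected final transient DefaultMQProducerImpl defaultMQProducerImpl; // the implementation that actually does the producer's work
private String producerGroup; // producer group
private volatile int defaultTopicQueueNums = 4; // default number of queues per topic: 4
private int sendMsgTimeout = 3000; // send timeout in milliseconds
private int compressMsgBodyOverHowmuch = 1024 * 4; // bodies of at least this size (4 KB) are compressed
private int retryTimesWhenSendFailed = 2; // retry count for synchronous sends
private int retryTimesWhenSendAsyncFailed = 2; // retry count for asynchronous sends
private boolean retryAnotherBrokerWhenNotStoreOK = false; // whether to retry on another broker when the store result is not OK
private int maxMessageSize = 1024 * 1024 * 4; // maximum message size: 4 MB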
}
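The compressMsgBodyOverHowmuch threshold can be illustrated with a small sketch. It assumes that a body is compressed once its length reaches the threshold, and it uses the JDK's Deflater and Inflater as a stand-in for whatever compressor the client actually selects; the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Sketch only: threshold-based body compression with java.util.zip.
final class CompressSketch {
    static final int COMPRESS_OVER = 1024 * 4; // mirrors compressMsgBodyOverHowmuch

    // Assumption: compression kicks in when the body length reaches the threshold.
    static boolean shouldCompress(byte[] body) {
        return body != null && body.length >= COMPRESS_OVER;
    }

    static byte[] compress(byte[] body) {
        Deflater deflater = new Deflater(5);
        deflater.setInput(body);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(body.length);
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] uncompress(byte[] data) {
        Inflater inflater = new Inflater();
        inflater.setInput(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("truncated or unsupported input");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt input", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

Small bodies are left alone because the CPU cost of compressing them tends to outweigh the bytes saved on the wire.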
Producer startup flow
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
// Check the configuration
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// Change the instance name to the process ID
this.defaultMQProducer.changeInstanceNameToPID();
}
// Get or create the MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// Register this producer under its producer group
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// On success, add an entry to the topic publish-info table for later management
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// Start the client instance
if (startFactory) {
mQClientFactory.start();
}
org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// Client ID
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
// Create a new instance if none exists yet
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
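The get-then-putIfAbsent pattern in getOrCreateMQClientInstance can be isolated in a few lines. This is a minimal sketch with a hypothetical Client class standing in for MQClientInstance; it shows only why two callers with the same client ID end up sharing one instance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a registry that hands out one shared instance per client ID.
final class ClientRegistrySketch {
    static final class Client {
        final int index;
        final String clientId;

        Client(int index, String clientId) {
            this.index = index;
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();
    private final AtomicInteger indexGenerator = new AtomicInteger();

    Client getOrCreate(String clientId) {
        Client instance = table.get(clientId);
        if (instance == null) {
            instance = new Client(indexGenerator.getAndIncrement(), clientId);
            // If another thread won the race, keep its instance and drop ours.
            Client prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
            }
        }
        return instance;
    }
}
```

The consequence for the producer is that every producer and consumer in one process with the same client ID shares a single client instance and its network resources.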
Basic message sending flow
Send flow diagram: the full RocketMQ send, store, and consume flow.
Main RocketMQ sending code:
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// Validate the producer state and the message
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
// Look up the topic's publish info
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
...
// Main send logic
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
}
// No route found: validate the NameServer address setting, then fail
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
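Looking up route information
tryToFindTopicPublishInfo looks up the route information for a topic.
The route information determines which Broker the message is sent to.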
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// Look up route info in the local cache
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// Fall back to the default topic's route info from the NameServer
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
Class structure of TopicPublishInfo and TopicRouteData:
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
}
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//It could be null or empty
private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}
MQClientInstance#updateTopicRouteInfoFromNameServer fetches route information from the NameServer.
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
// Fetch route info from the NameServer
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
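// Get the cached route info
TopicRouteData old = this.topicRouteTable.get(topic);
// Compare it with the newly fetched route info
boolean changed = topicRouteData.topicRouteDataChanged(old);
if (!changed) {
// No difference found: check whether any producer or consumer still needs an update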
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
// Update the Broker address cache table
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update endpoint map
{
ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
if (!mqEndPoints.isEmpty()) {
topicEndPointsTable.put(topic, mqEndPoints);
}
}
// Update Pub info
{
// Convert topicRouteData into a TopicPublishInfo
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
if (!consumerTable.isEmpty()) {
// Update the consumers' subscribe info
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
}
// Locking, logging, and exception handling are omitted from this excerpt
return false;
}
Selecting a message queue
The queues are first arranged in order, by broker and then by queue ID:
[{"brokerName": "broker-a", "queueId": 0},
{"brokerName": "broker-a", "queueId": 1},
{"brokerName": "broker-a", "queueId": 2},
{"brokerName": "broker-a", "queueId": 3},
{"brokerName": "broker-b", "queueId": 0},
…]
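The ordered list above can be reproduced with a short sketch. It assumes each broker contributes one queue per write queue number and that the result is sorted by broker name and then queue ID; the class names and the input map are hypothetical, not the client's real data structures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Sketch only: expand per-broker write queue counts into an ordered queue list.
final class QueueListSketch {
    static final class Mq implements Comparable<Mq> {
        final String brokerName;
        final int queueId;

        Mq(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public int compareTo(Mq other) {
            int byBroker = brokerName.compareTo(other.brokerName);
            return byBroker != 0 ? byBroker : Integer.compare(queueId, other.queueId);
        }

        @Override
        public String toString() {
            return brokerName + ":" + queueId;
        }
    }

    static List<Mq> expand(Map<String, Integer> writeQueueNumsByBroker) {
        List<Mq> queues = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : writeQueueNumsByBroker.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                queues.add(new Mq(entry.getKey(), i));
            }
        }
        Collections.sort(queues); // broker name first, then queue ID
        return queues;
    }
}
```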
Q: Retry mechanism
retryTimesWhenSendFailed sets the number of retries for synchronous sends.
retryTimesWhenSendAsyncFailed sets the number of retries for asynchronous sends.
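A minimal sketch of the synchronous retry loop follows. It assumes a synchronous send is attempted at most 1 + retryTimesWhenSendFailed times and that each attempt only gets the time left in the overall timeout; the Attempt interface, the injected clock, and the exception messages are hypothetical.

```java
import java.util.function.LongSupplier;

// Sketch only: bounded retries that share one timeout budget.
final class SyncRetrySketch {
    interface Attempt {
        // Returns true when the send succeeded.
        boolean send(int attemptNo, long remainingMillis);
    }

    // Returns the number of the attempt that succeeded, or throws when all attempts fail.
    static int send(int retryTimesWhenSendFailed, long timeoutMillis, LongSupplier clock, Attempt attempt) {
        int timesTotal = 1 + retryTimesWhenSendFailed; // first try plus retries
        long begin = clock.getAsLong();
        for (int times = 0; times < timesTotal; times++) {
            long cost = clock.getAsLong() - begin;
            if (timeoutMillis < cost) {
                throw new IllegalStateException("send timed out after " + times + " attempt(s)");
            }
            if (attempt.send(times + 1, timeoutMillis - cost)) {
                return times + 1;
            }
        }
        throw new IllegalStateException("send failed after " + timesTotal + " attempt(s)");
    }
}
```

With the default retryTimesWhenSendFailed = 2, a send that keeps failing is tried three times in total, provided the timeout has not run out first.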
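There are two ways to select a message queue:
1) sendLatencyFaultEnable=false: the Broker fault-delay mechanism is disabled (the default).
2) sendLatencyFaultEnable=true: the Broker fault-delay mechanism is enabled.
When sendLatencyFaultEnable=false, the fault-delay mechanism is skipped and TopicPublishInfo#selectOneMessageQueue is called.
The method below takes the fault-delay path only when the flag is true: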
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
// 1) Walk the message queues round-robin.
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = index++ % tpInfo.getMessageQueueList().size();
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 2) Check that the queue is usable; latencyFaultTolerance.isAvailable(mq.getBrokerName()) is the key call.
if (!StringUtils.equals(lastBrokerName, mq.getBrokerName()) && latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
return mq;
}
}
// No queue passed the check: pick a relatively good broker from the fault list.
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
// Take the next round-robin queue and redirect it to the picked broker
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
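} else {
// The picked broker has no write queues: drop its entry from the fault list
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}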
// Fault-delay mechanism disabled
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
// Plain round-robin: move on to the next queue
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = index % this.messageQueueList.size();
return this.messageQueueList.get(pos);
}
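The variant that takes lastBrokerName is not listed here, so the following is a minimal sketch of what such a selection can look like: round-robin over the queue list while skipping queues on the broker that just failed. The class and its Mq type are hypothetical, and Math.floorMod is used so that the index stays valid if the counter overflows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: round-robin queue selection that avoids the last failed broker.
final class RoundRobinSketch {
    static final class Mq {
        final String brokerName;
        final int queueId;

        Mq(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Mq> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    RoundRobinSketch(List<Mq> queues) {
        this.queues = new ArrayList<>(queues);
    }

    Mq selectOne() {
        int index = sendWhichQueue.incrementAndGet();
        return queues.get(Math.floorMod(index, queues.size()));
    }

    Mq selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        // Try each queue once, skipping the broker that failed last time.
        for (int i = 0; i < queues.size(); i++) {
            Mq mq = selectOne();
            if (!mq.brokerName.equals(lastBrokerName)) {
                return mq;
            }
        }
        // Every queue lives on the failed broker: fall back to plain round-robin.
        return selectOne();
    }
}
```

Skipping only the last failed broker is cheap, but it has no memory: a broker that failed two attempts ago can be picked again, which is the gap the fault-delay mechanism is meant to close.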
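Core classes of the RocketMQ fault-delay mechanism
Overview of the core classes:
1. LatencyFaultTolerance: the interface
2. LatencyFaultToleranceImpl: the fault-tolerance implementation
3. FaultItem: the basic unit that records one broker's latency
4. MQFaultStrategy: the wrapper class that operates on the LatencyFaultTolerance interface
Details
TODO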
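Until the details are filled in, the idea behind these classes can be sketched as follows: each send reports its latency, the latency is mapped to a period during which the broker is avoided, and isAvailable answers whether that period has passed. The two threshold tables and the isolation latency are assumed values modeled on MQFaultStrategy's defaults and should be checked against your version; the class itself and the injected clock are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Sketch only: latency-based broker avoidance with an injectable clock.
final class FaultStrategySketch {
    // Assumed thresholds in milliseconds; LATENCY_MAX[i] maps to NOT_AVAILABLE_DURATION[i].
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    private static final long ISOLATION_LATENCY = 30000L; // assumed penalty latency for failed sends

    private final Map<String, Long> availableFrom = new ConcurrentHashMap<>();
    private final LongSupplier clock;

    FaultStrategySketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Finds the highest threshold the latency reaches and returns its avoidance period.
    static long notAvailableDuration(long latency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    // isolation = true means the send failed outright, so the penalty latency is used.
    void updateFaultItem(String brokerName, long currentLatency, boolean isolation) {
        long duration = notAvailableDuration(isolation ? ISOLATION_LATENCY : currentLatency);
        availableFrom.put(brokerName, clock.getAsLong() + duration);
    }

    boolean isAvailable(String brokerName) {
        Long from = availableFrom.get(brokerName);
        return from == null || clock.getAsLong() >= from;
    }
}
```

In this sketch a broker that answered in 600 ms is avoided for 30 seconds, while a broker whose send failed outright is avoided for 10 minutes.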
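Sending the message
DefaultMQProducerImpl#sendKernelImpl
Parameters:
1) Message msg: the message to send.
2) MessageQueue mq: the queue the message is sent to.
3) CommunicationMode communicationMode: the send mode, one of SYNC, ASYNC, or ONEWAY.
4) SendCallback sendCallback: the callback for asynchronous sends.
5) TopicPublishInfo topicPublishInfo: the topic's route information.
6) long timeout: the send timeout.
Main send logic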
private SendResult sendKernelImpl() {
// Broker address
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
// For a MessageBatch, the ID has already been set during generation
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
// Set the compression flag
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
msgBodyCompressed = true;
}
// Mark transactional (prepared) messages
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// Run the check-forbidden hook, if one is registered
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
...
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
...
// Hook logic that runs before the send
this.executeSendMessageHookBefore(context);
}
// Build the send request header
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
...
// Send the message: sync, async, or one-way
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
break;
case ONEWAY:
break;
case SYNC:
break;
}
// Hook logic that runs after the send
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException | InterruptedException | MQBrokerException e) {
// If a send hook is registered, run its after-logic with the exception
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}
Three send modes:
1. Synchronous
2. Asynchronous
3. One-way
// Send mode
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
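The difference between the three modes can be shown without any networking. In this minimal sketch a hypothetical broker function stands in for the remoting client: the synchronous call returns the response, the asynchronous call delivers it to a callback, and the one-way call ignores it. Returning a CompletableFuture from sendAsync is a convenience of the sketch; the real asynchronous send returns void.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Sketch only: sync, async, and one-way semantics over a fake transport.
final class SendModesSketch {
    private final Function<String, String> broker; // hypothetical transport: request -> response

    SendModesSketch(Function<String, String> broker) {
        this.broker = broker;
    }

    // SYNC: the caller blocks until the response is available.
    String sendSync(String msg) {
        return broker.apply(msg);
    }

    // ASYNC: the caller returns at once; the outcome arrives through a callback.
    // A failure may arrive wrapped in a CompletionException.
    CompletableFuture<Void> sendAsync(String msg, Consumer<String> onSuccess, Consumer<Throwable> onException) {
        return CompletableFuture.supplyAsync(() -> broker.apply(msg))
            .<Void>handle((response, error) -> {
                if (error != null) {
                    onException.accept(error);
                } else {
                    onSuccess.accept(response);
                }
                return null;
            });
    }

    // ONEWAY: fire and forget; neither the response nor a failure is reported.
    void sendOneway(String msg) {
        CompletableFuture.runAsync(() -> broker.apply(msg));
    }
}
```

One-way is the fastest of the three but gives no delivery signal at all, which is why it carries no retry or callback arguments in the switch above.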
The core retry logic first records the outcome in the broker's fault item and then resends from within the callback.
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync
try {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
return;
}
if (response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
assert sendResult != null;
if (context != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
sendCallback.onSuccess(sendResult);
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
// Retry
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
// Retry
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause());
// Retry
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
// Retry
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}
}
});
} catch (Exception ex) {
long cost = System.currentTimeMillis() - beginStartTime;
producer.updateFaultItem(brokerName, cost, true);
// Retry
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
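The retry counting behind onExceptionImpl is not shown in the listing, so here is a minimal sketch of the idea under stated assumptions: a shared counter is incremented on every failure, the send is repeated while the counter stays within the retry limit, and the user's callback only sees the exception once the limit is exhausted. The Transport and Callback interfaces are hypothetical, and the sketch runs synchronously to stay deterministic.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: callback-driven retries bounded by a shared counter.
final class AsyncRetrySketch {
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    interface Transport { // hypothetical stand-in for the remoting client
        String invoke(String msg) throws Exception;
    }

    static void sendAsync(Transport transport, String msg, int retryTimes, AtomicInteger curTimes, Callback callback) {
        String result;
        try {
            result = transport.invoke(msg);
        } catch (Exception e) {
            onExceptionImpl(transport, msg, retryTimes, curTimes, callback, e);
            return;
        }
        callback.onSuccess(result);
    }

    private static void onExceptionImpl(Transport transport, String msg, int retryTimes,
                                        AtomicInteger curTimes, Callback callback, Exception e) {
        int tmp = curTimes.incrementAndGet();
        if (tmp <= retryTimes) {
            // Still within the limit: send again through the same path.
            sendAsync(transport, msg, retryTimes, curTimes, callback);
        } else {
            // Limit exhausted: surface the failure to the user's callback.
            callback.onException(e);
        }
    }
}
```

With a retry limit of 2, a transport that always fails is invoked three times before the callback's onException is called.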
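Batch sending
DefaultMQProducer#send(Collection<Message> msgs)
Batch sending works much like sending a single message: the collection is wrapped into one MessageBatch and sent through the same path.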
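Since a batch travels as one message, it has to respect maxMessageSize, so large collections are usually split before sending. Here is a minimal sketch of such a splitter. The Msg class is a hypothetical stand-in for Message, the size estimate (topic length plus body length) is a simplification, and the same-topic check reflects an assumption about what a batch requires.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: split a message list into batches that stay under a size limit.
final class BatchSketch {
    static final class Msg {
        final String topic;
        final byte[] body;

        Msg(String topic, byte[] body) {
            this.topic = topic;
            this.body = body;
        }
    }

    // Assumption: all messages in one batch must share a topic.
    static void checkSameTopic(List<Msg> msgs) {
        for (Msg m : msgs) {
            if (!m.topic.equals(msgs.get(0).topic)) {
                throw new UnsupportedOperationException("messages in one batch must share a topic");
            }
        }
    }

    static List<List<Msg>> split(List<Msg> msgs, int maxBatchBytes) {
        List<List<Msg>> batches = new ArrayList<>();
        List<Msg> current = new ArrayList<>();
        int currentSize = 0;
        for (Msg m : msgs) {
            int size = m.topic.length() + m.body.length; // rough estimate, ignores properties
            if (size > maxBatchBytes) {
                throw new IllegalArgumentException("single message exceeds the batch limit");
            }
            if (!current.isEmpty() && currentSize + size > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(m);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each resulting sublist can then be passed to send(Collection<Message>) on its own.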
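Summary
Key points
1. The latency fault-tolerance mechanism
2. The retry mechanism
Questions:
- How is load balanced across message queues?
- How is high availability achieved when sending?
- How is consistency guaranteed for batch sends?
- The message sending flow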