文章目录
- 一、消息发送入口
- 二、消息发送流程
- 1、消息验证
- 1)消息主题验证
- 2)消息内容验证
- 2、查找路由
- 3、消息发送
- 1)选择消息队列
- 2)消息发送-内核实现
- sendKernelImpl方法参数
- 获取brokerAddr
- 添加消息全局唯一id
- 设置实例id
- 设置系统标记
- 执行消息前置钩子
- 构建发送消息请求体
- 执行发送消息
- 执行后置钩子
一、消息发送入口
消息发送有三种模式:同步消息、异步消息、单向消息。
同步消息:producer向broker发送消息,等待知道broker反馈结果才结束
异步消息:producer向broker发送消息,同时指定回调方法;执行发送之后立即返回,异步等待broker反馈结果后执行回调方法
当向消息:producer向broker发送消息,执行发送后立即返回,也不等待broker结果
消息默认以同步的方式进行发送,发送入口为org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
,代码如下
/**
* Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>
*
* <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may be potentially
* delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
*
* @param msg Message to send.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
消息发送入口简单,只调用内部默认实现服务(defaultMQProducerImpl)的send方法,最终调用核心实现sendDefaultImpl
方法。
sendDefaultImpl
方法入参如下:
- Message msg:发送的消息对象
- CommunicationMode communicationMode:消息的发送模式,默认同步
- SendCallback sendCallback:发送完后的回调接口
- long timeout:超时时间:默认3000ms
通过sendDefaultImpl
方法的代码可以看出,消息发送的流程主要包含以下几个步骤:消息验证、查找路由、消息发送。让我们接着往下看。
二、消息发送流程
1、消息验证
消息发送之前的验证分为2部分:消息主题验证和消息内容验证。
//org.apache.rocketmq.client.Validators#checkMessage消息验证
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
1)消息主题验证
消息主题验证,规则包含以下几点:
- topic不能为空
- topic长度不能超过最大值127
- topic只能包含大小写字母和数字
- topic不能与部预留的主题相同
rocketmq内部预留的topic有如下几种:
- SCHEDULE_TOPIC_XXXX:延迟消息所使用的topic
- BenchmarkTest:基准测试使用的topic
- RMQ_SYS_TRANS_HALF_TOPIC:事务消息使用的topic
- RMQ_SYS_TRACE_TOPIC:轨迹消息使用的topic
- RMQ_SYS_TRANS_OP_HALF_TOPIC:事务消息使用的topic(二阶段)
- TRANS_CHECK_MAX_TIME_TOPIC:事务检查超时使用的topic
- SELF_TEST_TOPIC:自测使用
- OFFSET_MOVED_EVENT:消息偏移量改变消息使用
public static void checkTopic(String topic) throws MQClientException {
//topic不能为空
if (UtilAll.isBlank(topic)) {
throw new MQClientException("The specified topic is blank", null);
}
//topic长度不能超过最大值127
if (topic.length() > TOPIC_MAX_LENGTH) {
throw new MQClientException(
String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
}
//topic只能包含大小写字母和数字
if (isTopicOrGroupIllegal(topic)) {
throw new MQClientException(String.format(
"The specified topic[%s] contains illegal characters, allowing only %s", topic,
"^[%|a-zA-Z0-9_-]+$"), null);
}
}
2)消息内容验证
消息内容验证包含以下几点:
- 消息内容对象(msg)不能为null
- 消息内容主体(body)不能为null
- 消息内容主题(body)长度不能为0,或超过消息允许发送的最大长度4M(1024 * 1024 * 4)
2、查找路由
消息发送需要根据topic找到对应的路由信息即broker。查找路由在DefaultMQProducerImpl#tryToFindTopicPublishInfo
方法中完成。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//发送消息,获取topic路由信息,需要获取默认值
//是否允许获取默认值,在broker配置文件中配置,配置了默认值的,会注册到nameserver
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
- 1、先从路由缓存表topicPublishInfoTable中获取
- 2、如果取到的路由为null或路由所持有的队列信息信息为空,则向nameSrv查询topic的路由信息并更新到路由缓存表
- 3、如果已经有topic路由信息或持有队列则返回此路由,否则再次向nameSrv查询更新路由信息到缓存表
3、消息发送
消息发送根据消息通信模式(同步、异步、单向)决定消息发送次数。如果是同步,则只发送一次,如果是其他(异步、单向)则发送失败会进行重试(3次)
1)选择消息队列
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//......
//异步失败发3次,同步发1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//查找除lastBrokerName外的broker消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
//......
}
执行消息发送时,选择所发送消息topic的一个队列,这个队列不能是上一次重试时使用的队列(lastBrokerName)
在重试发送时,记录上一次发送失败的broker(lastBrokerName),这样在下一次重试时能避免再次往失败的broker发送。
2)消息发送-内核实现
在做完上述消息验证、路由选取动作之后,调用核心方法DefaultMQProducerImpl#sendKernelImpl
进行发送消息,下面让我们来看看sendKernelImpl
都做了些什么
sendKernelImpl方法参数
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//......
}
参数:
- Message msg:消息实体
- MessageQueue mq:本次发送使用的消息队列
- CommunicationMode communicationMode:消息发送通信模式
- SendCallback sendCallback:回调函数
- TopicPublishInfo topicPublishInfo:路由信息
- long timeout:超时时间
获取brokerAddr
根据mq
队列对象传入的brokerName,从MQClientInstance
本地缓存的brokerAddrTable
中获取主broker地址。如果查找不到主broker,尝试从nameServer更新topic的路由信息再获取。
//从MQClientlnIstance的brokerAddrTable中查找broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
//没找到broker地址,尝试从nameServer更新topic的路由信息再获取
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
添加消息全局唯一id
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
对于单条消息,为其创建一个全局唯一消息id;批量消息则已经提前设置
设置实例id
如果topic设置了命名空间(namespace),则为消息添加对应的INSTANCE_ID
实例id
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
设置系统标记
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
//标记已压缩
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
//标记事务消息
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
如果消息体超过4K,则进行压缩后添加压缩标记;如果消息是事务消息,添加事务标记
执行消息前置钩子
前置的消息钩子有两种:检查禁用钩子和消息发送钩子
//执行禁用钩子函数
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
//执行消息发送钩子
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
构建发送消息请求体
SendMessageRequestHeader
消息发送请求对象,包含发送消息必要的信息(topic、队列等)
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
执行发送消息
根据消息通信模式,调用MQClientAPIImpl#sendMessage
方法进行发送消息。如果是异步消息,发送时会传入回调函数sendCallback
以便发送后进行后续操作
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
执行后置钩子
消息发送完后,执行发送后置钩子函数
if (this.hasSendMessageHook()) {
//注册了消息发送钩子函数,执行after逻辑
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
同样,在发送过程中发生异常也会执行发送后置钩子函数。这里异常主要包括:RemotingException 、MQBrokerException、InterruptedException
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {