一、消息
消息实体类为org.apache.rocketmq.common.message.Message,其主要属性如下。
// 消息所属topic
private String topic;
// 消息Flag(RocketMQ不作处理),即:用户处理
private int flag;
// 扩展属性
private Map<String, String> properties;
// 消息体
private byte[] body;
// 事务ID
private String transactionId;
二、消息生产者
1.生产者UML
默认消息生产者org.apache.rocketmq.client.producer.DefaultMQProducer。消息生产者两个实现类:默认生产者DefaultMQProducer、事务消息生产者TransactionMQProducer。
2.DefaultMQProducer关键属性
// 封装生产者内部实现方法
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
// 生产组,默认DEFAULT_PRODUCER
private String producerGroup;
// 创建消息队列的数量,默认4
private volatile int defaultTopicQueueNums = 4;
// 发送消息超时时间,默认3s
private int sendMsgTimeout = 3000;
// 消息体大小超出4K则压缩消息体
private int compressMsgBodyOverHowmuch = 1024 * 4;
// 同步模式下最大发送消息次数
private int retryTimesWhenSendFailed = 2;
// 异步模式下最大发送消息次数
private int retryTimesWhenSendAsyncFailed = 2;
// 发送失败时,是否选择任意一个Broker发送,默认否
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// 最大发送消息体的大小,默认4MB
private int maxMessageSize = 1024 * 1024 * 4;
3.发送消息方法
RocketMQ支持3种消息发送方式:
同步(sync):发送消息时,同步等待直到消息服务器返回发送结果。
异步(async):发送消息时,指定消息发送成功后的回调函数,然后发送,立即返回,消息发送者线程不阻塞,消息发送成功或失败的回调任务在一个新的线程中执行。
单向(oneway):发送消息后直接返回,不等待Broker的结果,也不注册回调函数,即:只发送,不在乎消息是否成功存储到Broker。
以下是消息发送方法,其中:
SendCallback sendCallback:含有SendCallback类的,是指定回调方法,属于异步方式发送消息;
MessageQueue messageQueue:指定发送消息存储到哪个消费队列;
MessageQueueSelector selector:消息队列选择器,指定发送哪个消费队列;
long timeout:发送超时时间;
Object arg:传入到MessageQueueSelector的用户参数;
Collection<Message> msgs:批量发送消息;
sendMessageInTransaction()方法:发送事务消息,生产者为TransactionMQProducer;
sendOneway():单向方式发送消息。
三、生产者启动流程
生产者启动方法org.apache.rocketmq.client.producer.DefaultMQProducer#start,其方法调用链如下。
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)是核心方法,如下代码所示。DefaultMQProducerImpl维护生产者示例的ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable(主题映射的发布信息)。
// 启动生产者
public void start(final boolean startFactory) throws MQClientException {
// 默认仅创建,不启动,即:创建DefaultMQProducerImpl.serviceState = CREATE_JUST
switch (this.serviceState) {
// 仅创建,不启动
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 检查生产组名称是否符合规范:最大长度为255,不含特殊字符
this.checkConfig();
// 生产者的instanceName为进程ID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
/*
获取或创建MQClientInstance:
a. 当前生产者获取clientId
b. 根据clientId获取或创建MQClientInstance
*/
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 当前生产者注册到MQClientInstance(维护所有生产者)
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 当前生产者,topic发布信息
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 启动MQClientInstance
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
RequestFutureHolder.getInstance().startScheduledTask(this);
}
注意,整个JVM实例中只存在一个MQClientManager去维护一个MQClientlnstance缓存表ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable,即:同一个clientId只会创建一个MQClientInstance。而clientId组成:当前生产者服务器IP + instanceName(PID#时间戳) + unitName( 可选)。从clientId组成看出,单个JVM有且只有一个MQClientInstance实例。多个生产者和多个消费者共用一个MQClientInstance实例。
如下代码所示MQClientInstance维护的不同容器。示例启动后,定时任务周期维护生产者和消费者的路由信息、Topic发布信息等。
// 生产者容器
private final ConcurrentMap<String/* 生产组 */, MQProducerInner> producerTable = new ConcurrentHashMap<>();
// 消费者容器
private final ConcurrentMap<String/* 消费组 */, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();
// adminExt容器
private final ConcurrentMap<String, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<>();
// 主题的路由信息容器
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<>();
// 主题消费队列容器
private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();
四、消息发送流程
以下代码是生产者发送消息SendResult send(Message msg, long timeout) 方法为入口讲解。其发送消息的核心实现方法是org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl,其调用链如下图所示。发送主要步骤是:验证消息、查找topic路由信息、选择消息队列、消息发送(包括异常处理)。
1.验证消息
org.apache.rocketmq.client.Validators#checkMessage方法检查消息是否符合规范:
主题是否规范:Topic不能为空字符串、长度不能超出127
消息体是否规范:消息体不能null、不能为空字符串、长度不能超出4MB
// 检查消息是否符合规范:topic规范、消息体不能为空、消息体长度默认不能大于4MB
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
2.查找主题路由信息
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo方法是获取主题路由信息。从生产者本地缓存的topicPublishInfoTable表获取,没有找到则尝试创建或更新到NameServer,则再从NameServer查找,若获取不到,则根据isDefault是否采用默认主题路由来获取,再找不到,则抛出异常。
获取主题路由信息TopicPublishInfo的目的是:最终获取topic下该生产组的Broker路由信息TopicRouteData(获取到Broker地址及存储的消费队列)。
/**
* 获取主题路由发布信息
* 获取逻辑:
* step1: 本地生产者有缓存该topic路由信息和消息队列,则直接返回
* step2: 本地生产者没有缓存,则从NameServer查找主题路由信息
* step3: 没有缓存,从NameServer查找不到,则isDefault是否采用默认主题路由(defaultMQProducer.getCreateTopicKey() —— AUTO_CREATE_TOPIC_KEY_TOPIC)
* @param topic 主题
* @return 主题发布信息
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 获取生产者缓存的主题发布信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 生产者没有主题发布信息或没有消息队列,则创建并更新NameServer主题路由信息
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
// 本地生产者创建
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 更新NameServer主题路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 本地有缓存,则直接获取
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// isDefault为true,采用默认主题发送消息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
3.选择消息队列
根据获取的主题发布信息,进而选择消息队列。当选择消息队列时,生产者和NameServer并不能立即知道Broker是否可用,原因是:
NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(5s);
生产者每隔30s更新一次路由信息,NameServer不会检测到Broker岩机后立刻推送消息给生产者;
因此采用发送消息重试机制(同步次数:retryTimesWhenSendFailed,异步次数:retryTimesWhenSendAsyncFailed),同时引入一种机制,在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中。sendLatencyFaultEnable配置是否启用Broker故障延迟机制,默认不启用。
1). 默认机制
没有启用Broker故障延迟机制,即:sendLatencyFaultEnable为false,采用默认机制,其实现方法org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String),其代码如下。
其算法是:没有失败的Broker,则随机获取一个;有失败的Broker,排除上次失败的Broker。该算法主要是获取上次发送失败的Broker,该缺点是:一个Broker可能有多个消费队列,下次选择可能还是失败Broker队列。则引入了Broker故障延迟机制。
/**
* 选择一个消息队列
* @param lastBrokerName 上次发送失败的brokerName
* @return 发送消息的消息队列
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 没有broker发送失败
if (lastBrokerName == null) {
return selectOneMessageQueue();
}
// 有broker发送失败
else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
// sendWhichQueue自增长再获取,原子操作
int index = this.sendWhichQueue.incrementAndGet();
// 取模,获取消息队列索引值
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 跳过失败的broker,该算法缺陷:多次获取到失败的broker(宕机broker中多个消息队列)
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
// sendWhichQueue自增长再获取,原子操作
int index = this.sendWhichQueue.incrementAndGet();
// 取模,获取消息队列索引值
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
2).Broker故障延迟机制
Broker故障延迟机制核心类,如下:
启用Broker故障延迟机制,即:sendLatencyFaultEnable为true,则选择消息队列的核心方法是org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue,其代码如下。
/**
* 选择一个消息队列
* step1:是否启用故障延迟机制sendLatencyFaultEnable,默认false,不启用
* step2:若启用,判断上次Broker是否可用
* @param tpInfo 主题路由信息
* @param lastBrokerName 上次发送失败的brokerName
* @return 发送消息的消息队列
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// sendLatencyFaultEnable为true时,启动Broker故障延迟机制;默认false,不启用
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 判断broker是否可用,判断依据:当前时间是否在不可用持续时间(notAvailableDuration)范围内
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 从失败的broker中选择一个可用的broker,没有则返回null
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
// 移除可用的失败broker
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// 不启用Broker故障延迟机制
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
4.消息发送
1).生产者消息发送
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl是消息发送核心方法,其代码如下。
/**
* 发送消息
* step1:从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常;
* step2:不是批量发送消息,则为消息分配一个全局消息ID;
* step3:设置消息是否压缩,消息体 > 4KB,则压缩;
* step4:是否是事务Prepare消息,若是写入事务prepare标签;
* step5:执行发送前钩子函数;
* step6:根据不同发送方式,发送消息;
* step7:执行发送后钩子函数;
* @param msg 待发送消息
* @param mq 选择的消息队列(消息发送到该队列)
* @param communicationMode 发送消息模式,如:同步、异步、单向
* @param sendCallback 异步发送消息回调函数
* @param topicPublishInfo topic发布路由信息
* @param timeout 发送超时时间
* @return 发送结果
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
// 从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) {
// 从NameServer获取
tryToFindTopicPublishInfo(mq.getTopic());
brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
// 不是批量发送消息,则为消息分配一个全局消息ID
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
// 设置消息instanceId
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
// 设置消息是否压缩,消息体 > 4KB,则压缩
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG; // |= ,是二进制位或运算,如:a = a | b
sysFlag |= compressType.getCompressionFlag();
msgBodyCompressed = true;
}
// 是否是事务Prepare消息
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 事务消息标签
}
// 判断是否注册了禁止消息发送钩子函数,即:判定checkForbiddenHookList不为空
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
// 判断是否注册了消息发送钩子函数,即:判定sendMessageHookList不为空
// 注册钩子函数:DefaultMQProducerImpl.registerSendMessageHook
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
// 创建发送消息请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); // 默认主题在该broker的队列数
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag); // 系统标记
requestHeader.setBornTimestamp(System.currentTimeMillis()); // 消息发送时间
requestHeader.setFlag(msg.getFlag()); // 消息标记,目前不做处理
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0); // 消息重试次数
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch); // 是否批量消息
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
brokerName,
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
// 发送消息是否超时
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 发送消息请求
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
brokerName,
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
// 执行发送后钩子函数
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
// 执行发送后钩子函数
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
// 没有broker地址,则抛出异常
throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}
执行发送消息请求,根据同步、异步、单向发送方式进行网络传输,其发送消息请求码都是RequestCode.SEND_MESSAGE。其实现方法是org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage,其代码如下。
/**
* 发送消息请求
* @param addr broker地址
* @param brokerName broker名称
* @param msg 待发送消息 {@link Message}
* @param requestHeader 发送消息请求头 {@link SendMessageRequestHeader}
* @param timeoutMillis 发送超时时间
* @param communicationMode 发送消息模式,如:同步、异步、单向
* @param sendCallback 异步发送消息回调函数 {@link SendCallback}
* @param topicPublishInfo topic发布路由信息 {@link TopicPublishInfo}
* @param instance {@link MQClientInstance}
* @param retryTimesWhenSendFailed 重试次数
* @param context {@link SendMessageContext}
* @param producer {@link DefaultMQProducerImpl}
* @return
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
// 根据消息类型,判断发送请求码
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
// 批量消息发送,请求码SEND_BATCH_MESSAGE
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
}
// 单个消息发送,请求码SEND_MESSAGE
else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody()); // 请求体,就是消息体
/*
发送消息请求到broker端
broker端处理发送消息总入口:SendMessageProcessor.processRequest
*/
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}
2).Broker接收发送消息
org.apache.rocketmq.broker.processor.SendMessageProcessor是Broker接收消息的实现类,其代码如下。
// broker处理发送消息的请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext traceContext;
switch (request.getCode()) {
// 消费段消费ACK确认发送请求入口MQClientAPIImpl.consumerSendMessageBack
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
// broker的topic映射队列上下文
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
traceContext = buildMsgContext(ctx, requestHeader);
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
traceContext.setCommercialOwner(owner);
// 执行发送前钩子函数
try {
this.executeSendMessageHookBefore(ctx, request, traceContext);
} catch (AbortProcessException e) {
final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
errorResponse.setOpaque(request.getOpaque());
return errorResponse;
}
RemotingCommand response;
// 批量消息
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext,
(ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
}
// 单个消息
else {
response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext,
(ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
}
return response;
}
}
五、参考资料
https://www.cnblogs.com/wzh2010/p/16629876.html
https://blog.csdn.net/m0_37543627/article/details/128542505
https://blog.csdn.net/m0_37543627/article/details/128551723