消息生产者的代码都在client模块中,相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。
启动流程
代码:DefaultMQProducerImpl#start
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.checkConfig();
// ...
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// ...
//注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
// ...
// 启动生产者
mQClientFactory.start();
// ...
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
RequestFutureHolder.getInstance().startScheduledTask(this);
}
}
整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
同一个clientId只会创建一个MQClientInstance。
MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
消息发送
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
- 验证消息
// 消息验证 Validators.checkMessage(msg, this.defaultMQProducer); public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
- 查找路由
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
public class TopicPublishInfo { private boolean orderTopic = false; //是否是顺序消息 private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); //该主题消息队列 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();//每选择一次消息队列,该值+1 private TopicRouteData topicRouteData;//关联Topic路由元信息 }
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
TopicRouteData topicRouteData; //使用默认主题从NameServer获取路由信息 if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(clientConfig.getMqClientApiTimeout()); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { //使用指定主题从NameServer获取路由信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); }
if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteData.topicRouteDataChanged(old); //判断路由是否需要更改 if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); }
if (changed) { // Update endpoint map { ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData); if (!mqEndPoints.isEmpty()) { topicEndPointsTable.put(topic, mqEndPoints); } } // Update Pub info { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) { MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info if (!consumerTable.isEmpty()) { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } }
选择队列
- 默认不启用Broker故障延迟机制
代码:org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//第一次选择队列
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
//遍历消息队列集合
for (int i = 0; i < this.messageQueueList.size(); i++) {
//sendWhichQueue自增后取模
int index = this.sendWhichQueue.incrementAndGet();
int pos = index % this.messageQueueList.size();
MessageQueue mq = this.messageQueueList.get(pos);
//规避上次Broker队列
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
//如果以上情况都不满足,返回sendWhichQueue取模后的队列
return selectOneMessageQueue();
}
}
- 启用Broker故障延迟机制
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//Broker故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
//对sendWhichQueue自增
int index = tpInfo.getSendWhichQueue().getAndIncrement();
//对消息队列轮询获取一个队列
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//验证该队列是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
//可用
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
//从规避的Broker中选择一个可用的Broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
//获得Broker的写队列集合
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
//获得一个队列,指定broker和队列ID并返回
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
原理分析
代码:DefaultMQProducerImpl#sendDefaultImpl
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
//计算broker规避的时长
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
//更新该FaultItem规避时长
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
//获得原FaultItem
FaultItem old = this.faultItemTable.get(name);
//为空新建faultItem对象,设置规避时长和开始时间
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
//更新规避时长和开始时间
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
4.发送消息
消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(
final Message msg, //待发送消息
final MessageQueue mq, //消息发送队列
final CommunicationMode communicationMode, //消息发送内模式
final SendCallback sendCallback, pp //异步消息回调函数
final TopicPublishInfo topicPublishInfo, //主题路由信息
final long timeout //超时时间
)
代码:DefaultMQProducerImpl#sendKernelImpl
long beginStartTime = System.currentTimeMillis();
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
//获得broker网络地址信息
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
//没有找到从NameServer更新broker网络地址信息
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
//为消息分类唯一ID
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
//消息大小超过4K,启用消息压缩
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
//如果是事务消息,设置消息标记MessageSysFlag.TRANSACTION_PREPARED_TYPE
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//如果注册了消息发送钩子函数,在执行消息发送前的增强逻辑
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
代码:SendMessageHook
public interface SendMessageHook {
String hookName();
void sendMessageBefore(final SendMessageContext context);
void sendMessageAfter(final SendMessageContext context);
}
代码:DefaultMQProducerImpl#sendKernelImpl
//构建消息发送请求包
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
//主题
requestHeader.setTopic(msg.getTopic());
//默认创建主题Key
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//该主题在单个Broker默认队列树
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
//队列ID
requestHeader.setQueueId(mq.getQueueId());
//消息系统标记
requestHeader.setSysFlag(sysFlag);
//消息发送时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息标记
requestHeader.setFlag(msg.getFlag());
//消息扩展信息
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
//消息重试次数
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
//是否是批量消息等
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
//如果注册了钩子函数,则发送完毕后执行钩子函数
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
2.3.4 批量消息发送
2.4.7 刷盘机制