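Contents
I. Preface
II. How the producer sends messages
1. Starting the producer
1.1 The RocketMQTemplate message-sending template
1.2 afterPropertiesSet() logic
1.3 DefaultMQProducer#start() logic
2. DefaultMQProducerImpl#start() startup logic
2.1 Updating route information locally
2.2 Collecting topic information from local tables
2.3 Maintaining and updating the data
3. Sending messages from the producer
3.1 Default send implementation and retries
3.2 Asynchronous send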
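I. Preface
Earlier articles covered how RocketMQ is integrated into Spring Boot through the auto-configuration mechanism, and walked through the consumer startup flow, which is similar to the producer startup flow discussed below. This article adds a short introduction to the RocketMQTemplate message-sending template. Its methods are described in detail in my other column, "Java基础及实战" (Java Basics and Practice); the code is simple, so it is not repeated here. The focus of this article is how the producer sends messages: how a failed send is retried and how many times, how timeouts are detected, how asynchronous send works, and how route data is maintained, which data structures hold it, and how it is cached locally. Let's dig in.
II. How the producer sends messages
1. Starting the producer
1.1 The RocketMQTemplate message-sending template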
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
/**
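* The default message producer. RocketMQAutoConfiguration creates it first during Spring Boot auto-configuration,
* and it is then injected through a setter while RocketMQTemplate is being initialized.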
*/
private DefaultMQProducer producer;
/**
* Handles JSON data
*/
private ObjectMapper objectMapper;
private String charset = "UTF-8";
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>();
public RocketMQTemplate() {
}
// ... n lines omitted ...
}
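RocketMQTemplate extends Spring's AbstractMessageSendingTemplate. It also implements InitializingBean, whose afterPropertiesSet() method is analyzed below (and was covered in the previous article), and DisposableBean, whose destroy() method releases resources (interested readers can look at it themselves). The template has only a plain no-argument constructor and a handful of fields. Some fields, such as producer, are injected through setters, as analyzed in the earlier articles; the rest are initialized inline.
1.2 afterPropertiesSet() logic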
@Override
public void afterPropertiesSet() throws Exception {
if (this.producer != null) {
this.producer.start();
}
}
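There is little logic here, far less than on the consumer side. The method null-checks the producer to avoid a NullPointerException and then calls start() on the setter-injected producer to start it.
1.3 DefaultMQProducer#start() logic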
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
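There is not much logic here either, and it mirrors the consumer startup flow: the producer group is wrapped with the namespace, the call is delegated to DefaultMQProducerImpl#start(), and the trace dispatcher is started if one is configured. DefaultMQProducer is the entry point for applications that send messages. Tuning the fields exposed through getters and setters is fine, but for most scenarios they should all work well out of the box. The class aggregates the various send methods for delivering messages to brokers. Each has its pros and cons, so it is best to understand them before writing code. Thread safety: once configured and started, the class can be regarded as thread-safe and used from multiple threads.
2. DefaultMQProducerImpl#start() startup logic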
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
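// Mark the state as failed up front; it only becomes RUNNING if startup completes (ServiceState is a shared, reusable enum)
this.serviceState = ServiceState.START_FAILED;
// Validate the configuration (little logic here)
this.checkConfig();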
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory
.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// Cache route information (an empty TopicPublishInfo under the create-topic key)
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// Send a heartbeat to all brokers, under a lock
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// Start the scheduled task
RequestFutureHolder.getInstance().startScheduledTask(this);
}
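Main logic:
- Mark the state as START_FAILED first, so that a startup that throws partway leaves the producer in a failed state; ServiceState is a shared enum that other services reuse.
- Validate the configuration; there is little logic here.
- Register the producer: it is stored in MQClientInstance's producerTable field (type ConcurrentMap<String/* group */, MQProducerInner>), from which topics and other information can later be obtained.
- Cache route information: a TopicPublishInfo holds, among other fields, messageQueueList (the list of message queues) and the TopicRouteData route data.
- Start the MQClientInstance, the same as in the consumer startup flow; more on this below.
- Send a heartbeat to all brokers, under a lock.
- Start the scheduled task: after a 3-second initial delay it runs every second, scanning for expired requests and removing them.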
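The "mark as failed first, promote to running last" guard from the steps above can be sketched in isolation. This is a minimal illustration, not RocketMQ's code: StartGuardSketch and its init callback are hypothetical names, and the reset to CREATE_JUST on a failed registration is not modeled.

```java
// A minimal sketch of the start-once guard; all names here are hypothetical.
class StartGuardSketch {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;

    /** Starts the service once; 'init' stands in for config checks, registration, etc. */
    synchronized void start(Runnable init) {
        switch (state) {
            case CREATE_JUST:
                // Pessimistically mark as failed; only a clean run reaches RUNNING.
                state = State.START_FAILED;
                init.run();
                state = State.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all refuse a second start.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        StartGuardSketch ok = new StartGuardSketch();
        ok.start(() -> { });
        System.out.println(ok.state()); // RUNNING

        StartGuardSketch bad = new StartGuardSketch();
        try {
            bad.start(() -> { throw new IllegalArgumentException("bad config"); });
        } catch (IllegalArgumentException expected) {
            System.out.println(bad.state()); // START_FAILED
        }
    }
}
```

Because the failed state is written before any work happens, an exception thrown anywhere during startup needs no cleanup code to leave the object in a state that rejects further start() calls.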
2.1 Updating route information locally
// Initial delay of 10 ms, then repeat every pollNameServerInterval (30 seconds by default)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// Update route information locally; it is needed when sending messages
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
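Under the hood, MQClientInstance involves the remoting module and maintains several scheduled tasks. This one starts after 10 milliseconds and then, every 30 seconds by default (pollNameServerInterval), fetches route information from the NameServer and caches it locally, because sending a message requires that route information.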
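The try/catch inside the scheduled Runnable matters: with scheduleAtFixedRate, an exception that escapes one run suppresses all later runs. The sketch below shows the same pattern with only the JDK. RoutePollSketch, pollUntil and the refresh callback are hypothetical names, and the latch exists only so the demo can stop after a few runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// A minimal sketch of a fixed-rate refresh task; all names here are hypothetical.
class RoutePollSketch {
    /** Runs 'refresh' at a fixed rate until it has run 'runs' times; true if that happened in time. */
    static boolean pollUntil(Runnable refresh, int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    refresh.run();
                } catch (RuntimeException e) {
                    // Swallow and log: an exception escaping here would cancel every future run.
                    System.err.println("refresh failed: " + e);
                } finally {
                    done.countDown();
                }
            }, 10, periodMillis, TimeUnit.MILLISECONDS);
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean finished = pollUntil(() -> System.out.println("refreshing routes"), 3, 50);
        System.out.println("finished=" + finished);
    }
}
```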
2.2 Collecting topic information from local tables
public void updateTopicRouteInfoFromNameServer() {
Set<String> topicList = new HashSet<String>();
// Consumer
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
// Iterate over the registered consumers
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
// Get the consumer's subscription data
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
// Collect the topic of each subscription
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}
// Producer
{
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
// Iterate over the registered producers
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// Get the topics from the producer's route table
Set<String> lst = impl.getPublishTopicList();
topicList.addAll(lst);
}
}
}
for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}
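- The method collects topics from the local tables, then iterates over the topic list, pulls each topic's route information from the NameServer and updates it on the client.
- consumerTable (type ConcurrentMap<String/* group */, MQConsumerInner>) holds the registered consumers. MQConsumerInner is the internal consumer interface; its implementations include the push-mode DefaultMQPushConsumerImpl and the pull-mode DefaultLitePullConsumerImpl (as of the 2022 release). A consumer's subscription data lives in the subscriptionInner field (type ConcurrentMap<String /* topic */, SubscriptionData>) of RebalanceImpl, the core class for consumer-side load balancing.
- producerTable (type ConcurrentMap<String/* group */, MQProducerInner>) holds the registered producers. The only current implementation of MQProducerInner is DefaultMQProducerImpl, which maintains the topicPublishInfoTable field (type ConcurrentMap<String/* topic */, TopicPublishInfo>); the topic list here is obtained from that field via keySet().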
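Stripped of the RocketMQ types, the collection step is a set union over two group-keyed tables. The sketch below assumes each table has already been reduced to a map from group name to topic names; TopicCollectSketch and the group and topic names are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// A minimal sketch of the topic collection step; all names here are hypothetical.
class TopicCollectSketch {
    /** Unions the topics known to consumers (subscriptions) and to producers (publish tables). */
    static Set<String> collectTopics(Map<String, Set<String>> subscriptionsByConsumerGroup,
                                     Map<String, Set<String>> publishTopicsByProducerGroup) {
        Set<String> topics = new HashSet<>();
        for (Set<String> subscribed : subscriptionsByConsumerGroup.values()) {
            if (subscribed != null) {
                topics.addAll(subscribed);
            }
        }
        for (Set<String> published : publishTopicsByProducerGroup.values()) {
            if (published != null) {
                topics.addAll(published);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        Set<String> topics = collectTopics(
                Map.of("consumer-group", Set.of("orders", "payments")),
                Map.of("producer-group", Set.of("orders", "demo-topic")));
        System.out.println(topics.size()); // 3
    }
}
```

Using a set means a topic that is both produced and consumed by the same client is refreshed once per cycle rather than twice.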
2.3 Maintaining and updating the data
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
// Try to acquire the lock, waiting at most 3 seconds
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// Fetch the route data of the default (create-topic key) topic from the NameServer
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
// Fetch the route data for this topic from the NameServer
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
// Look up the previous data for this topic in the local route table
TopicRouteData old = this.topicRouteTable.get(topic);
// Check whether it changed: both are copied, sorted and compared with equals() (!old.equals(now))
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
// Mainly checks whether data for this topic exists locally
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update endpoint map: cache the queue-to-broker mapping locally
{
ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
if (mqEndPoints != null && !mqEndPoints.isEmpty()) {
topicEndPointsTable.put(topic, mqEndPoints);
}
}
// Update Pub info: refresh each producer's TopicPublishInfo
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// Update the route information
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info: refresh the consumers' subscription data
if (!consumerTable.isEmpty()) {
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
// Update the client's local route table
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
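- Try to acquire the lock, waiting at most 3 seconds; the bounded wait keeps a thread from blocking indefinitely.
- Fetch the route data for the topic from the NameServer.
- Look up the previous TopicRouteData for the topic in the local topicRouteTable.
- Check whether it changed: both are copied, sorted and compared with equals() (!old.equals(now)).
- If it did not change, check again whether local data for the topic exists: if a producer's topicPublishInfoTable has no entry for the topic, or the entry holds no message queues, return true; otherwise go on to check whether the consumers' rebalanceImpl.topicSubscribeInfoTable contains the topic.
- If it changed, update the local brokerAddrTable field (type ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>>).
- If it changed, cache the endpoint mapping in topicEndPointsTable (type ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>>).
- If it changed, iterate over the registered producers in producerTable and update each producer's TopicPublishInfo.
- If it changed, update the consumers' subscription data, which is kept in rebalanceImpl.topicSubscribeInfoTable.
- Update the local topicRouteTable.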
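The core of these steps is "bounded lock wait, fetch, compare with the cached copy, replace only on change". A minimal sketch of that shape follows, with the route data reduced to a plain String. RouteCacheSketch, refresh and the fetch function are hypothetical names, and the fan-out to producers and consumers is left out.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// A minimal sketch of lock-guarded cache refresh; all names here are hypothetical.
class RouteCacheSketch {
    private static final long LOCK_TIMEOUT_MILLIS = 3000;

    private final Lock lock = new ReentrantLock();
    private final Map<String, String> routeTable = new ConcurrentHashMap<>();

    /** Returns true only when the cached route for 'topic' was actually replaced. */
    boolean refresh(String topic, Function<String, String> fetchRoute) {
        try {
            // Bounded wait: give up instead of blocking forever behind a slow refresh.
            if (!lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            String fresh = fetchRoute.apply(topic);
            if (fresh == null) {
                return false; // nothing known remotely; keep whatever is cached
            }
            if (Objects.equals(routeTable.get(topic), fresh)) {
                return false; // unchanged; skip the update
            }
            routeTable.put(topic, fresh);
            return true;
        } finally {
            lock.unlock(); // reached only after the lock was acquired
        }
    }

    String cached(String topic) {
        return routeTable.get(topic);
    }

    public static void main(String[] args) {
        RouteCacheSketch cache = new RouteCacheSketch();
        System.out.println(cache.refresh("orders", t -> "broker-a")); // true
        System.out.println(cache.refresh("orders", t -> "broker-a")); // false
        System.out.println(cache.refresh("orders", t -> "broker-b")); // true
    }
}
```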
3. Sending messages from the producer
3.1 Default send implementation and retries
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
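// Make sure the service is running
this.makeSureStateOK();
// Validate the message: it must not be null, the body must not be empty, and the size must not exceed 4 MB (default, configurable); otherwise throw
Validators.checkMessage(msg, this.defaultMQProducer);
// Random invocation ID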
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// Find the TopicPublishInfo for the topic: try the local topicPublishInfoTable first, otherwise fetch it from the NameServer and cache it locally
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// Route information is present and has usable message queues
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
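// Number of attempts: only a synchronous send is retried here, with 2 retries by default
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer
.getRetryTimesWhenSendFailed() : 1;
// Counting from 0, a synchronous send is attempted up to 3 times by default if it keeps failing
int times = 0;
String[] brokersSent = new String[timesTotal];
// The retry mechanism is simply this loop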
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// Pick a message queue according to the fault-tolerance strategy; topicPublishInfo holds the message queue list and related data
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
// Timed out before the call: finding the route (possibly a remote NameServer call) and selecting a queue already used part of the budget
if (timeout < costTime) {
callTimeout = true;
break;
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo
, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// Record the latency so producer-side load balancing can use it when selecting a message queue
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev
, false);
// Communication mode
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
// Only a synchronous send needs to return a result
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
// Update the fault item so the next queue selection can avoid this broker
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
// Log a warning, then continue to retry
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker:" +
" %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s"
, invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s"
, invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
// Unlike the cases above: retry only if the response code is one of the retryable codes; otherwise leave the method by returning or throwing
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
continue;
} else {
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
// An interrupt gets no retry: rethrow and leave the method
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s"
, invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
// A send result was obtained: return it
if (sendResult != null) {
return sendResult;
}
// Still failing after the retries: throw
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
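- Make sure the service is running.
- Validate the message: it must not be null, the body must not be empty, and the size must not exceed 4 MB (default, configurable); otherwise an exception is thrown.
- Find the TopicPublishInfo for the topic: try the local topicPublishInfoTable first, otherwise fetch it from the NameServer and cache it locally.
- Continue only if the TopicPublishInfo is present and has usable message queues.
- Compute the number of attempts: only synchronous sends are retried in this loop, with 2 retries by default; counting from 0, a synchronous send is therefore attempted up to 3 times if it keeps failing.
- The retry mechanism is a plain loop bounded by the number of attempts.
- Pick a message queue according to the fault-tolerance strategy; topicPublishInfo holds the message queue list messageQueueList (type List<MessageQueue>) and related data.
- On a retry (times > 0), reset the topic with the namespace.
- Before the remote call, check whether the request has already timed out: finding the route (possibly a remote NameServer call) and selecting a queue have already used part of the budget.
- this.sendKernelImpl() prepares and performs the remote call that delivers the message to the broker, where it is persisted; it contains some logic of its own that interested readers can explore.
- After the remote call, record the latency so that producer-side (client-side) load balancing can use it when selecting a message queue.
- Switch on the communication mode; only the synchronous mode needs to return a result.
- Every exception path updates the fault item for the next queue selection and logs a warning: 1) a remoting (network) exception continues to the next attempt; 2) a client exception also continues; 3) a broker exception differs from the two above: it retries only if the response code is one of the retryable codes, otherwise the method ends by returning or throwing; 4) an interrupt gets no retry and is rethrown immediately.
- If a result was obtained, return it. Otherwise the send failed despite the retries: if the request timed out, throw RemotingTooMuchRequestException; in all other cases wrap the last error in an MQClientException and throw it.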
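The steps above boil down to a bounded loop that avoids the broker that just failed and charges every attempt against one shared time budget. The sketch below shows only that skeleton. RetrySendSketch, Sender and pick are hypothetical names, IOException stands in for the remoting exceptions, and the fault-item bookkeeping and response-code checks are omitted.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// A minimal sketch of retry with a shared time budget; all names here are hypothetical.
class RetrySendSketch {
    /** Stand-in for the kernel send call: returns a result or throws. */
    interface Sender {
        String send(String broker, long remainingMillis) throws IOException;
    }

    /** Makes up to 1 + retries attempts, avoiding the broker used by the previous attempt. */
    static String sendWithRetry(List<String> brokers, int retries, long timeoutMillis,
                                Sender sender, List<String> brokersSent) {
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("no brokers available");
        }
        long begin = System.currentTimeMillis();
        int timesTotal = 1 + retries;
        String last = null;
        IOException lastError = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = pick(brokers, last, times);
            last = broker;
            brokersSent.add(broker);
            long cost = System.currentTimeMillis() - begin;
            if (timeoutMillis < cost) {
                // The budget covers all attempts together, not each attempt separately.
                throw new IllegalStateException("call timeout");
            }
            try {
                return sender.send(broker, timeoutMillis - cost);
            } catch (IOException e) {
                lastError = e; // remember the failure and try the next broker
            }
        }
        throw new IllegalStateException("send failed after " + timesTotal + " attempts", lastError);
    }

    /** Round-robin from 'offset', skipping the last broker when an alternative exists. */
    static String pick(List<String> brokers, String last, int offset) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((offset + i) % brokers.size());
            if (!candidate.equals(last)) {
                return candidate;
            }
        }
        return brokers.get(offset % brokers.size());
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        int[] calls = {0};
        String result = sendWithRetry(List.of("broker-a", "broker-b"), 2, 3000, (broker, remaining) -> {
            if (calls[0]++ < 2) {
                throw new IOException(broker + " unreachable");
            }
            return "SEND_OK@" + broker;
        }, sent);
        System.out.println(result + " after trying " + sent);
    }
}
```

Passing the remaining budget into each attempt is what keeps the total wall-clock time of a retried send within the caller's timeout.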
3.2 Asynchronous send
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
} catch (Exception e) {
sendCallback.onException(e);
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
}
}
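Asynchronous send is implemented with multithreading: the send is submitted to an executor and the remote call runs on a worker thread. The details are the send path described above, except that the loop in sendDefaultImpl makes only one attempt in ASYNC mode (timesTotal is 1). It is worth thinking about why: in ASYNC mode sendKernelImpl returns before the broker has responded, so the loop has no result or failure to react to. Failures of an asynchronous send are instead handled later, in the response callback path, where the separate retryTimesWhenSendAsyncFailed setting applies.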
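The shape of this method (hand the work to an executor, route failures into the callback, and convert a rejected submission into an exception for the caller) can be sketched with the JDK alone. AsyncSendSketch, Callback and sendOnce are hypothetical names. One simplification: here the success callback fires on the worker thread right after the call returns, whereas in RocketMQ it is invoked by the remoting layer when the broker responds.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// A minimal sketch of executor-based async send; all names here are hypothetical.
class AsyncSendSketch {
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    /** Submits one send attempt to the executor and reports its outcome through the callback. */
    static void sendAsync(ExecutorService executor, Callable<String> sendOnce, Callback callback) {
        try {
            executor.execute(() -> {
                try {
                    callback.onSuccess(sendOnce.call());
                } catch (Exception e) {
                    // The caller has already returned, so the callback is the only way to report this.
                    callback.onException(e);
                }
            });
        } catch (RejectedExecutionException e) {
            // Rejection happens on the caller's thread, so it can still be thrown directly.
            throw new IllegalStateException("executor rejected", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> outcome = new CompletableFuture<>();
        sendAsync(executor, () -> "SEND_OK", new Callback() {
            @Override
            public void onSuccess(String result) {
                outcome.complete(result);
            }

            @Override
            public void onException(Throwable e) {
                outcome.completeExceptionally(e);
            }
        });
        System.out.println(outcome.join()); // SEND_OK
        executor.shutdown();
    }
}
```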