前言
RocketMQ消息消费者是如何启动的,有一个步骤是非常重要的,就是启动消息的监听,通过不断的拉取消息,来实现消息的监听,
那具体怎么做,让我们我们跟着源码来学习一下~
流程地图
源码跟踪
这一块的代码比较多,我自己对关键点的一些整理,这个图我画的不是很OK
核心模块(消息拉取)
入口:this.pullMessageService.start();
- 执行线程池run方法,轮流从pullRequestQueue中获取PullRequest
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
声明一个阻塞队列用来存放 PullRequest 对象
PullRequest 用于消息拉取任务,如果 pullRequestQueue 为空则会阻塞,直到拉取任务被放入
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
复制代码
将 stopped 用volatile来修饰,每次执行的时候都检测stopped的状态,线程只要修改了这个状态,其余线程就会马上知道
protected volatile boolean stopped = false;
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// 判断启动状态
while (!this.isStopped()) {
try {
// 取出一个PullRequest对象
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
复制代码
- 获取消费队列快照,判断状态是否正常,同时更新最后一次拉取时间
PullMessageService 从消息服务器默认拉取32条消息,按消息的偏移量顺序存放在 ProcessQueue 队列
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
复制代码
入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// 获取消费队列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
// 设置最后一次拉取时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
复制代码
- 校验客户端运行状态
// 校验状态
this.makeSureStateOK();
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
}
复制代码
如果消费者状态不正确,则抛出异常,启动定时线程池过段时间回收 PullRequest 对象,以便pullMessageService能及时唤醒并再次执行消息拉取,这个逻辑在多个地方使用到了
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
复制代码
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
// 最后将pullRequest放入pullRequestQueue中
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
复制代码
- 校验消费队列中的消息数量和大小是否符合设置
如果触发流量控制,则延迟拉取消息,先将 PullRequest 对象进行回收,以便pullMessageService能及时唤醒并再次执行消息拉取
// 缓存消息条数
long cachedMessageCount = processQueue.getMsgCount().get();
// 缓存消息的大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 当队列中的消息跳过,超过设置 则延迟拉取消息
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
复制代码
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
复制代码
- 根据主题获取配置的订阅关系
这里通过查询 subscriptionInner Map容器,利用主题来获取对应的订阅关系,如果没有找到对应的订阅关系,则延迟拉取消息,先将 PullRequest 对象进行回收以便 pullMessageService 能及时唤醒并再次执行消息拉取
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
复制代码
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
复制代码
- 如果为集群模式,则从内存中读取位置
通过消费者启动的模块中,我们知道RocketMQ是根据不同模式,将消息进度存储在不同的地方
广播模式:消息进度存储在本地文件
集群模式:消息进度存储在Broker 服务器上
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 从内存中读取位置
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
复制代码
- 内核中拉取消息(最重要的模块)
入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
复制代码
我们看到他有非常多的参数
拉取流程
- 通过BrokerName找到对应的Broker
// step 1 通过BrokerName找到对应的Broker
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
复制代码
- 如果没有找到对应的,则更新路由信息
// step 2 如果没有找到对应的,则更新路由信息
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
复制代码
- 检查Broker版本和Tag信息
// check version
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
复制代码
- 设置PullMessageRequestHeader
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
复制代码
- 调用pullMessage方法拉取消息,返回拉取结果
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
复制代码
因为 CommunicationMode 传递的是ASYNC,我们着重来看一下这个方法
入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
调用 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()
这里我们就先不细看了
拉取消息处理
- 如果PullCallback回调成功,则对结果进行处理
// 处理pullResult数据
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
复制代码
主要做了三件事,转换消息格式、设置消息信息、放入msgFoundList
将pullResult 转成 PullResultExt,转换消息格式为List
PullResultExt pullResultExt = (PullResultExt) pullResult;
// 转换消息格式为List
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
复制代码
执行消息过滤,匹配符合的tag
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
复制代码
设置消息的transactionId、扩展属性、BrokerName名称,放入List中
for (MessageExt msg : msgListFilterAgain) {
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(traFlag)) {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
msg.setBrokerName(mq.getBrokerName());
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
复制代码
当pullStatus为FOUND,消息进行提交消费的请求
- 获取第一条消息的offset(偏移量)
// 获取第一条消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
复制代码
- 将读取消息List,更新到processQueue的TreeMap里面
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
复制代码
主要做了两件事,循环读取消息list,存入msgTreeMap和计算此次读取信息偏移量
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
// 上锁
this.treeMapLock.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
// 循环读取消息list,存入msgTreeMap
for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;
this.consuming = true;
}
if (!msgs.isEmpty()) {
// 获取最后一条消息
MessageExt messageExt = msgs.get(msgs.size() - 1);
// 获取最大偏移量
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
...
}
} finally {
this.treeMapLock.writeLock().unlock();
}
}
...
}
复制代码
- 提交消费请求,消息提交到内部的线程池
// 提交消费请求,消息提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
复制代码
入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest
获取 ConsumeRequest对象,拿到当前主题的监听器
这里拿到的监听器,就是我们在启动消费者的时候所注册的,监听到消息后执行相关的业务逻辑
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
...
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
复制代码
在这里触发我们在一开始重写的consumeMessage方法,这里msgs用Collections.unmodifiableList进行包装,意思就是不可以修改的,是一个只读的List
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
复制代码
- ProcessQueue中移除已经处理的消息,同时更新Offset位置
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
复制代码
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
...
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
...
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// 如果存在失败消息,则过5秒在定时执行
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
...
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
// 更新Offset位置
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
复制代码
- 最后pullRequest放入pullRequestQueue中
入口:org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
复制代码
消息消费进度提交
- 成功消费一条消息后,更新本地缓存表
- 每5s向Broker提交消息消费进度
- Broker每5s将进度持久化到consumerOffset.json
总结
目前只是将整体的一个消费端监听消息的流程了解清楚,里面还有许多细节需要去推敲~