Message Pull Overview
There are two message consumption modes: broadcasting mode and clustering mode.
Broadcasting mode is straightforward: every consumer pulls messages from all queues of the subscribed topic. This article focuses on clustering mode.
In clustering mode, a consumer group contains multiple consumers and a topic has multiple consume queues; the consumers share the load through rebalancing.
The usual approach to queue load balancing is that a message queue may be consumed by only one consumer at a time, while one consumer may consume several message queues simultaneously and locks each queue assigned to it.
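To make the allocation concrete, here is a minimal, self-contained sketch of an averaging-style assignment, similar in spirit to the client's built-in averaging strategies; the class and method here are made up purely for illustration and are not RocketMQ code.
```java
import java.util.ArrayList;
import java.util.List;

public class AverageAllocationSketch {
    // Each queue index goes to exactly one consumer, while one consumer may own several queues.
    static List<Integer> allocate(int queueCount, List<String> consumers, String currentConsumer) {
        List<Integer> mine = new ArrayList<>();
        int index = consumers.indexOf(currentConsumer);
        for (int q = 0; q < queueCount; q++) {
            if (q % consumers.size() == index) { // round-robin style assignment
                mine.add(q);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("consumer-A", "consumer-B");
        // 4 queues shared by 2 consumers: A gets queues [0, 2], B gets queues [1, 3]
        System.out.println(allocate(4, consumers, "consumer-A"));
        System.out.println(allocate(4, consumers, "consumer-B"));
    }
}
```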
As the startup flow of MQClientInstance shows, RocketMQ uses a dedicated thread, PullMessageService, to pull messages, and the pulling runs in an endless loop.
```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
```
pullRequestQueue is a LinkedBlockingQueue, a thread-safe, first-in-first-out blocking queue. LinkedBlockingQueue is the usual first choice for a producer-consumer setup; its capacity may be specified or left unbounded, in which case it defaults to Integer.MAX_VALUE.
The two main methods are put and take, both blocking, and together they implement the producer-consumer threading model. put appends an element to the tail of the queue and blocks while the queue is full, until an element is consumed. take removes an element from the head and blocks while the queue is empty, until an element is added.
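As a standalone illustration of that put/take collaboration (plain JDK code, not RocketMQ; the names are made up for the example):
```java
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // bounded capacity

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take(); // blocks while the queue is empty
                    System.out.println("consumed " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        for (int i = 0; i < 5; i++) {
            queue.put("request-" + i); // blocks while the queue is full
        }
        Thread.sleep(100); // give the consumer a moment before the JVM exits
    }
}
```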
this.pullRequestQueue.take() takes a PullRequest from pullRequestQueue and then uses it to pull messages.
Since there is a take, there must be a put!
Where do the PullRequests in pullRequestQueue come from?
During rebalancing, the queues assigned to the current consumer are wrapped into PullRequests, which are then put into pullRequestQueue.
Let's sort out the relationship between pullRequestQueue, PullRequest, and ProcessQueue:
pullRequestQueue is a LinkedBlockingQueue, a blocking queue implementing the producer-consumer model, and it holds PullRequests.
Each PullRequest wraps a ProcessQueue.
A ProcessQueue is the consumer-side replica, or snapshot, of a MessageQueue.
The Message Pull Flow
1. After each rebalance, the consumer builds PullRequests and puts them into pullRequestQueue (the LinkedBlockingQueue).
```java
//Build the PullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
//the consume offset to pull from
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
//add the newly built PullRequest to pullRequestList
pullRequestList.add(pullRequest);
changed = true;
```
2. A PullRequest wraps a ProcessQueue, the consumer-side replica (snapshot) of a MessageQueue.
3. The PullMessageService thread calls take on the LinkedBlockingQueue to obtain a PullRequest from pullRequestQueue.
4. Using the PullRequest, the PullMessageService thread pulls messages from the broker, 32 at a time by default, and stores them in the ProcessQueue ordered by queue offset.
5. The PullMessageService thread submits the messages to the consumer's consumption thread pool; once a message is consumed successfully it is removed from the ProcessQueue.
That is the overall message pull flow.
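For orientation, here is a minimal push-consumer setup that sets this whole flow in motion; it is only a sketch assuming the standard org.apache.rocketmq.client API, and the group, topic, and name-server address are placeholders.
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class PushConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");   // placeholder name server address
        consumer.subscribe("DemoTopic", "*");        // subscribe to every tag of the topic
        // The listener is what the pulled messages are eventually handed to (step 5 above).
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            msgs.forEach(msg -> System.out.println("got " + msg));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // start() boots MQClientInstance, which starts PullMessageService and the rebalance service.
        consumer.start();
    }
}
```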
Message Pull Source Code
In the previous article we got as far as RebalancePushImpl#dispatchPullRequest.
```java
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    //Iterate over all PullRequests
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
    }
}
```
This article picks up from there.
1. Putting the PullRequest into pullRequestQueue
PullMessageService#executePullRequestImmediately
```java
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

//Once the PullRequest is put into pullRequestQueue, the pull-message service thread
//takes it and performs the actual pull.
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //producer-consumer model: wait while the queue is full
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
```
All this does is enqueue the PullRequest; stepping through from here leads nowhere further, so let's search for the code that reads pullRequestQueue.
2. The pull-message service takes a PullRequest from the queue and starts pulling
By default the pull uses CommunicationMode.ASYNC, i.e. messages are pulled asynchronously.
PullMessageService#run
```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //take a PullRequest from pullRequestQueue
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
```
PullMessageService is started when MQClientInstance#start executes this.pullMessageService.start().
It takes a PullRequest from pullRequestQueue and then calls pullMessage(pullRequest).
PullMessageService#pullMessage
```java
private void pullMessage(final PullRequest pullRequest) {
    //Select the consumer by consumer group.
    //MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    //if prev is not null, only a warning is logged
    final MQConsumerInner consumer =
        this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    //use the consumer to pull messages
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest, drop it");
    }
}
```
mQClientFactory.selectConsumer(group) looks up the consumer by consumer group; let's step into it.
```java
public MQConsumerInner selectConsumer(final String group) {
    return this.consumerTable.get(group);
}
```
consumerTable is a field of MQClientInstance, declared as follows:
```java
ConcurrentMap<String/* group */, MQConsumerInner> consumerTable =
    new ConcurrentHashMap<String, MQConsumerInner>();
```
As you can see, consumerTable is a ConcurrentHashMap whose key is the consumer group and whose value is an MQConsumerInner.
But why is only one consumer selected per group?
When a consumer starts, consumerTable.putIfAbsent(group, consumer) is called to put the consumer group and the consumer into this map.
Suppose a main method starts two consumers of the same group in a for loop; every consumer startup runs the following code:
```java
public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (null == group || null == consumer) {
        return false;
    }

    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
        log.warn("the consumer group[" + group + "] exist already.");
        return false;
    }

    return true;
}
```
Inside registerConsumer, consumerTable.putIfAbsent(group, consumer) does the registration.
So when the first consumer starts, putIfAbsent succeeds.
When the second consumer starts, putIfAbsent fails.
In other words, within a single JVM there is only one consumer instance per consumer group.
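A small sketch of that scenario, assuming the standard client API (the exact exception message may differ between versions):
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;

public class TwoConsumersOneJvm {
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 2; i++) {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.subscribe("DemoTopic", "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) ->
                ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
            try {
                consumer.start(); // second iteration: putIfAbsent finds the group already registered
            } catch (MQClientException e) {
                System.out.println("consumer #" + i + " failed to start: " + e.getMessage());
            }
        }
    }
}
```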
2.1 Flow control: cached message count and size
- If the ProcessQueue holds more than 1000 messages, or more than 100 MB of messages, the next pull is delayed by 50 ms.
- If the span between the largest and smallest offsets in the ProcessQueue exceeds 2000, the next pull is delayed by 50 ms.
- If the subscription data for the topic cannot be found, the pull is retried 3 seconds later.
All of these thresholds are configurable on DefaultMQPushConsumer, as the sketch below shows.
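A hedged sketch of tuning those knobs (the property names match the fields referenced in the source later in this article; the values are arbitrary examples):
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class PullTuningSketch {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setPullThresholdForQueue(2000);      // cached message count per queue, default 1000
        consumer.setPullThresholdSizeForQueue(200);   // cached message size per queue in MiB, default 100
        consumer.setConsumeConcurrentlyMaxSpan(4000); // max offset span for concurrent consumption, default 2000
        consumer.setPullBatchSize(64);                // messages per pull request, default 32
        consumer.setPullInterval(100);                // pause between pulls in milliseconds, default 0
        // ... subscribe, register a listener, and call start() as usual
    }
}
```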
2.3 Ordered consumption must acquire the lock first
Ordered consumption requires acquiring the queue lock first; if the local consume offset is ahead of the offset held by the broker, the broker's offset is used for the pull.
2.4 Register the callback
2.5 Send the pull request
2.6 Run the callback and hand the pulled messages to the consumption thread pool
For ordered consumption this ends up in
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
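Before diving into the source, here is a hedged sketch of how an ordered listener is registered (standard client API; names and addresses are placeholders). Registering a MessageListenerOrderly is what switches DefaultMQPushConsumerImpl into the consumeOrderly branch seen below.
```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderlyConsumerSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_orderly_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoOrderTopic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                msgs.forEach(m -> System.out.println("orderly got " + m.getMsgId()));
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}
```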
DefaultMQPushConsumerImpl#pullMessage
```java
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
//record the last pull time, used to detect and drop inactive message queues
pullRequest.getProcessQueue()
.setLastPullTimestamp(System.currentTimeMillis());

//Check whether the consumer is still RUNNING
//PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000 (3 seconds)
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
//Delayed task: run executePullRequestImmediately after 3 seconds,
//putting the pullRequest back into pullRequestQueue
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
//PULL_TIME_DELAY_MILLS_WHEN_SUSPEND
//If the consumer is paused,
//schedule a delayed task that re-runs executePullRequestImmediately,
//putting the pullRequest back into pullRequestQueue
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later");
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
//private int pullThresholdForQueue = 1000;
//Flow control:
//more than 1000 messages cached in the ProcessQueue
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
//PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50
//pull again after 50 milliseconds
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
//log a warning every 1000th time flow control is triggered
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn("exceeds the threshold {}, so do flow control");
}
return;
}
//private int pullThresholdSizeForQueue = 100;
//Flow control:
//more than 100 MiB of messages cached in the ProcessQueue
if (cachedMessageSizeInMiB >
this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
//pull again after 50 milliseconds
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
//log a warning every 1000th time flow control is triggered
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn("exceeds the threshold {} MiB, so do flow control");
}
return;
}
//If this is not ordered consumption
if (!this.consumeOrderly) {
//check the ProcessQueue's msgTreeMap (a TreeMap<Long, MessageExt>):
//the gap between its first key and its last key
//(the keys are queue offsets) is the offset span.
//If the span exceeds the threshold
//(DefaultMQPushConsumer.consumeConcurrentlyMaxSpan, default 2000),
//call PullMessageService.executePullRequestLater to put the PullRequest
//back into PullMessageService.pullRequestQueue after 50 milliseconds
//and return from this method.
if (processQueue.getMaxSpan() >
this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
//pull again after a 50 ms delay
this.executePullRequestLater
(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn("the queue's messages, span too long, flow control");
}
return;
}
} else {
//Ordered consumption:
//the queue lock acquired during rebalancing
if (processQueue.isLocked()) {
//only on the first pull after the queue was locked (lockedFirst is still false)
if (!pullRequest.isLockedFirst()) {
//compute where to start pulling, i.e. the pull offset
//the default strategy is CONSUME_FROM_LAST_OFFSET
final long offset =
this.rebalanceImpl
.computePullFromWhere(pullRequest.getMessageQueue());
//brokerBusy: the local offset is ahead of the offset computed from the broker
boolean brokerBusy = offset < pullRequest.getNextOffset();
//reset the local next offset to the offset computed from the broker
//(which may lead to some messages being consumed again)
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
}
final SubscriptionData subscriptionData
= this.rebalanceImpl
.getSubscriptionInner()
.get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
//re-enqueue the pullRequest after 3 seconds
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed");
return;
}
final long beginTimestamp = System.currentTimeMillis();
/*
Note that this only registers a callback.
It saves the pulled messages into the ProcessQueue, hands them to the client's
actual business consumption, and finally puts the pullRequest back into the
blocking queue so the PullMessageService thread can pull again.
*/
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//If the pull result is FOUND, i.e. data was pulled,
//decode the binary payload into the actual message list
pullResult= DefaultMQPushConsumerImpl
.this.pullAPIWrapper.processPullResult
(pullRequest.getMessageQueue(),
pullResult,subscriptionData);
//process the pull result
switch (pullResult.getPullStatus()) {
//messages were pulled successfully
case FOUND:
//position of the pulled messages, relative to the consume queue
long prevRequestOffset
= pullRequest.getNextOffset();
//position in the consume queue of the next message to pull
pullRequest.setNextOffset
(pullResult.getNextBeginOffset());
//time spent on this pull
long pullRT =
System.currentTimeMillis() - beginTimestamp;
//accumulate the pull response time
DefaultMQPushConsumerImpl
.this
.getConsumerStatsManager()
.incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
//If the pulled message list is empty,
if (pullResult.getMsgFoundList() == null
||pullResult.getMsgFoundList().isEmpty()) { //re-enqueue the PullRequest and pull again
DefaultMQPushConsumerImpl.this
.executePullRequestImmediately(pullRequest);
//If the pulled message list is not empty
} else {
//offset of the first pulled message in the consume queue
//get the first message's queue offset
firstMsgOffset =
pullResult
.getMsgFoundList()
.get(0)
.getQueueOffset();
//accumulate the pull TPS statistics
DefaultMQPushConsumerImpl
.this.getConsumerStatsManager()
.incPullTPS
(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullResult.getMsgFoundList().size());
//Key code ★:
//put the messages into the ProcessQueue's TreeMap,
//i.e. save the pulled messages into ProcessQueue.msgTreeMap
boolean dispatchToConsume =
processQueue.putMessage
(pullResult.getMsgFoundList());
//Submit to consumeMessageService:
//the client then consumes concurrently via ConsumeRequest.run()
//(the consumption code deserves a close look)
DefaultMQPushConsumerImpl
.this
.consumeMessageService
.submitConsumeRequest
(pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// private long pullInterval = 0;
// If pullInterval is greater than 0, delay the next pull by that many milliseconds.
// The default is 0; it can be configured via setPullInterval
// to wait a while after each pull before pulling again.
if (DefaultMQPushConsumerImpl.this.
defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this
.executePullRequestLater(
pullRequest,
DefaultMQPushConsumerImpl.this
.defaultMQPushConsumer
.getPullInterval());
} else {
//Put the PullRequest back into the
//PullMessageService.pullRequestQueue blocking queue
//so the pull thread keeps pulling
DefaultMQPushConsumerImpl
.this
.executePullRequestImmediately(pullRequest);
}
}
//The next request's offset should not be smaller than the previous one,
//and neither should the first message's offset; otherwise the data looks wrong
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn("pull message result maybe data wrong");
}
break;
//no new messages
case NO_NEW_MSG:
//set the next offset to the value returned by the broker
pullRequest.setNextOffset
(pullResult.getNextBeginOffset());
//nothing was pulled this time, so correct the consumer's local offset
DefaultMQPushConsumerImpl
.this
.correctTagsOffset(pullRequest);
//put the pullRequest back into PullMessageService's blocking queue
//so the pull thread runs it again
DefaultMQPushConsumerImpl
.this.executePullRequestImmediately(pullRequest);
break;
//messages were found but none matched the tag; the broker filtered them out
case NO_MATCHED_MSG:
//set the next offset to the value returned by the broker
pullRequest.setNextOffset
(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl
.this.correctTagsOffset(pullRequest);
//put the pullRequest back into PullMessageService's
//blocking queue so the pull thread runs it again
DefaultMQPushConsumerImpl
.this.executePullRequestImmediately(pullRequest);
break;
//the offset is illegal, so this pullRequest will not be re-enqueued for pulling
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal");
//set the next offset to the value returned by the broker
pullRequest
.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl
.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl
.this.offsetStore
.updateOffset
(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore
.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this
.rebalanceImpl
.removeProcessQueue
(pullRequest.getMessageQueue());
log.warn("fix the pull request offset");
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic()
.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this
.executePullRequestLater
(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset
(pullRequest.getMessageQueue(),
ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner() .get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull()
&& !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
//The actual pull happens here;
//the code above only defined the pullCallback that handles the result.
//private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
//the broker suspends a long-polling pull for at most 15 seconds
//private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
//the client-side timeout for a suspended pull
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
//the callback registered above
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
```
PullAPIWrapper#pullKernelImpl
```java
public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
{
// check version
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
//sysFlag carries the suspend, commit-offset, subscription and class-filter flags
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
//15 seconds: the maximum time the broker may suspend this long-polling request
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr =
computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult =
this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
throw new MQClientException
("The broker[" + mq.getBrokerName() + "] not exist", null);
}
```
3. How the consume offset is computed
RebalancePushImpl#computePullFromWhere
```java
@Override
public long computePullFromWhere(MessageQueue mq) {
    long result = -1;
    //The default strategy is CONSUME_FROM_LAST_OFFSET, i.e. skip the history already consumed
    //private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    final ConsumeFromWhere consumeFromWhere =
        this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
        case CONSUME_FROM_MIN_OFFSET:
        case CONSUME_FROM_MAX_OFFSET:
        //Return the locally stored consume offset, i.e. the offset of the last consumption.
        //In clustering mode offsetStore is a RemoteBrokerOffsetStore;
        //in broadcasting mode it is a LocalFileOffsetStore.
        case CONSUME_FROM_LAST_OFFSET: {
            //Consume from the stored offset by default.
            //Whether it is read locally or remotely depends on the message model:
            //clustering -> remote, broadcasting -> local.
            //The first read goes to the store; later reads come from offsetTable.
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            //>= 0 means a valid consume offset was found
            if (lastOffset >= 0) {
                //return that consume offset
                result = lastOffset;
            }
            // First start, no offset
            else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    result = 0L;
                } else {
                    try {
                        //fetch the queue's maxOffset from the broker
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }
        //Consume from the very beginning of the queue,
        //i.e. re-consume all history still stored on the broker
        case CONSUME_FROM_FIRST_OFFSET: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                result = 0L;
            } else {
                result = -1;
            }
            break;
        }
        //Consume from a point in time, used together with setConsumeTimestamp();
        //the default is half an hour ago
        case CONSUME_FROM_TIMESTAMP: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                } else {
                    try {
                        long timestamp = UtilAll.parseDate(
                            this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                            UtilAll.YYYY_MMDD_HHMMSS).getTime();
                        result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }
        default:
            break;
    }

    return result;
}
```
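As a usage note, the strategy above is chosen on the consumer before start(); a brief sketch (standard client API, placeholder values):
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class ConsumeFromWhereSketch {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        // Only takes effect when no consume offset has been recorded for the group yet
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        // Used together with CONSUME_FROM_TIMESTAMP; format is yyyyMMddHHmmss
        consumer.setConsumeTimestamp("20240101000000");
        // ... subscribe, register a listener, and call start() as usual
    }
}
```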
Calculation logic of CONSUME_FROM_LAST_OFFSET
```java
case CONSUME_FROM_LAST_OFFSET: {
    //read the consume offset from the store (the broker, in clustering mode)
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
    //If the returned offset is >= 0, use it directly: a valid consume offset was found,
    //so consumption resumes from there. Note in particular the case lastOffset == 0,
    //because returning 0 does not actually follow the CONSUME_FROM_LAST_OFFSET semantics.
    if (lastOffset >= 0) {
        result = lastOffset;
    }
    //lastOffset == -1 means no valid offset is stored yet, i.e. this is the first consumption
    else if (-1 == lastOffset) {
        //for a retry topic of the consumer group, start from offset 0
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            result = 0L;
        } else {
            try {
                //for a normal topic, start from the queue's current max offset,
                //which is the real CONSUME_FROM_LAST_OFFSET semantics
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
            } catch (MQClientException e) {
                //if fetching the max offset from the broker fails, use -1 as the first pull offset
                result = -1;
            }
        }
    } else {
        result = -1;
    }
    break;
}

@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
try {
//read the consume offset from the broker
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (MQBrokerException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}
return -1;
}
```
Next, let's look at how the consume offset is queried in clustering mode. The offset store implementation for clustering mode is RemoteBrokerOffsetStore, and on the Broker side the request is ultimately handled by ConsumerManageProcessor.
ConsumerManageProcessor#queryConsumerOffset
```java
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    //Query the consume offset from the consumer offset file
    long offset = this.brokerController.getConsumerOffsetManager().queryOffset(
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

    //If the offset file stores an offset for this queue, it is necessarily >= 0,
    //so return it directly and the client starts consuming from it.
    if (offset >= 0) {
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {
        //No offset was found in the offset file (offset == -1):
        //first get the queue's current minimum offset on this broker.
        long minOffset = this.brokerController.getMessageStore()
            .getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
        //If the minimum offset is <= 0 (meaning the queue's files have never been deleted)
        //and the message at that offset is still in memory rather than on disk, return offset 0.
        //In that case none of the ConsumeFromWhere strategies takes effect:
        //consumption simply starts from 0.
        if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
            requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {
            //Otherwise (e.g. the message at offset 0 is already on disk) return QUERY_NOT_FOUND;
            //RebalancePushImpl#computePullFromWhere will then end up with -1.
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }
    return response;
}
```
Reference: https://blog.csdn.net/prestigeding/article/details/96576932
Querying the offset from the file: if the file records 0:10 for a queue, the next pull for that queue starts from offset 10.
ConsumerOffsetManager#queryOffset
```java
public long queryOffset(final String group, final String topic, final int queueId) {
    // key format: topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }

    return -1;
}
```
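For illustration, the broker persists these offsets in consumerOffset.json under its store config directory; a hedged sample of the layout (topic, group, and values are placeholders):
```json
{
    "offsetTable": {
        "DemoTopic@demo_consumer_group": {
            "0": 10,
            "1": 12
        }
    }
}
```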
4. Assembling the messages
PullMessageProcessor#processRequest
```java
//Build the message filter
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
    messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
} else {
    messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
}
//Call MessageStore.getMessage to look up the messages
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
    requestHeader.getConsumerGroup(), //consumer group name
    requestHeader.getTopic(),         //topic name
    requestHeader.getQueueId(),       //queue id
    requestHeader.getQueueOffset(),   //offset to pull from
    requestHeader.getMaxMsgNums(),    //max number of messages to pull
    messageFilter                     //message filter
);
```
DefaultMessageStore#getMessage
```java
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
    final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
}
if (!this.runningFlags.isReadable()) {
log.warn("message store is not readable, so getMessage is forbidden");
return null;
}
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
//offset is the offset requested for this pull;
//nextBeginOffset will hold the queue offset for the next pull
long nextBeginOffset = offset;
//current minimum offset of this consume queue
long minOffset = 0;
//current maximum offset of this consume queue
long maxOffset = 0;
//pre-build the result object that will be returned
GetMessageResult getResult = new GetMessageResult();
//maximum physical offset of the CommitLog
final long maxOffsetPy = this.commitLog.getMaxOffset();
//find the ConsumeQueue for this topic and queue id
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
//for abnormal offsets, correct the offset of the next pull
if (maxOffset == 0) {
//the consume queue currently holds no messages
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
//the requested offset is smaller than the queue's minimum offset
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
//the requested offset equals the queue's maximum offset
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
//the requested offset is beyond the queue's maximum offset
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
SelectMappedBufferResult bufferConsumeQueue
= consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
//Upper bound on the consume-queue bytes scanned per pull:
//at least 16000 bytes, i.e. up to 800 entries of 20 bytes each.
//maxMsgNums defaults to 32 and each consume-queue entry is 20 bytes,
//so 32 entries would only be 640 bytes.
final int maxFilterMessageCount
= Math.max(16000, maxMsgNums *
ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded
= this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit
= new ConsumeQueueExt.CqExtUnit();
//Loop over the consume-queue entries and fetch the messages from the CommitLog
//(by default up to 32 messages per pull);
//i advances by 20 bytes per entry, bounded by maxFilterMessageCount.
for (; i < bufferConsumeQueue.getSize() &&
i < maxFilterMessageCount;
i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//read the 8-byte CommitLog offset
long offsetPy =
bufferConsumeQueue.getByteBuffer().getLong();
//read the 4-byte message size
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
//read the 8-byte hash code of the message tag
long tagsCode =
bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
boolean isInDisk
= checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums,
getResult.getBufferTotalSize(),
getResult.getMessageCount(),
isInDisk)) {
break;
}
boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
log.error("can't Find Consume queue");
isTagsCodeLegal = false;
}
}
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue
(isTagsCodeLegal ? tagsCode : null, extRet ?
cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
//look up the message's mapped buffer by its physical offset and size
SelectMappedBufferResult selectResult =
this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset =
this.commitLog.rollNextFile(offsetPy);
continue;
}
//If a message filter exists,
//check whether this message matches it
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog
(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
selectResult.release();
continue;
}
this.storeStatsService
.getGetMessageTransferedMsgCount()
.incrementAndGet();
//add the matched message to getResult
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize
(group, topic, queueId, fallBehind);
}
//Compute the offset for the next pull request.
//Suppose this pull started at offset 0 and scanned 32 entries:
//i has accumulated 640 bytes of consume-queue entries,
//so the next pull offset is 0 + 640 / 20 = 32.
nextBeginOffset=offset+(i/ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig
.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection
(offset, consumeQueue.rollNextFile(offset));
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
}
long eclipseTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
//return the result
return getResult;
}
```
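To make the 20-byte consume-queue unit concrete, here is a minimal, self-contained sketch of decoding one entry from a ByteBuffer; it is an illustration only, not RocketMQ's ConsumeQueue code, and the record type is made up for the example.
```java
import java.nio.ByteBuffer;

public class ConsumeQueueUnitSketch {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 + 4 + 8 bytes per entry

    // Illustrative holder for one decoded entry
    record CqUnit(long commitLogOffset, int size, long tagsCode) {}

    static CqUnit decode(ByteBuffer buffer) {
        long offsetPy = buffer.getLong(); // 8 bytes: physical offset in the CommitLog
        int sizePy = buffer.getInt();     // 4 bytes: message size
        long tagsCode = buffer.getLong(); // 8 bytes: hash code of the message tag
        return new CqUnit(offsetPy, sizePy, tagsCode);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(1048576L).putInt(256).putLong("TagA".hashCode());
        buf.flip();
        System.out.println(decode(buf)); // CqUnit[commitLogOffset=1048576, size=256, tagsCode=...]
    }
}
```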
PullMessageProcessor#processRequest
```java
//Fill the responseHeader from the pull result
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

//Decide which broker id to suggest for the next pull
//(fall back to the master when reading from this slave is not allowed)
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}
...
//Map the GetMessageResult status to a response code
switch (getMessageResult.getStatus()) {
case FOUND: //messages found
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING: //the message has been moved to the next CommitLog file
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); //retry the pull immediately
break;
case NO_MATCHED_LOGIC_QUEUE: //no matching consume queue
case NO_MESSAGE_IN_QUEUE: //the queue contains no messages
if (0 != requestHeader.getQueueOffset()) {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
case NO_MATCHED_MESSAGE: //no message matched the filter
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case OFFSET_FOUND_NULL: //the consume-queue entry at this offset is null
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY: //the requested offset is far beyond the queue's max offset
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE: //the requested offset equals the queue's max offset (nothing new yet)
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL: //the requested offset is smaller than the queue's min offset
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
break;
}
...
//Update the consume offset only if the request allows suspension, carries the commit-offset flag, and this broker is not a slave
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
```
5. The Broker returns the response
MQClientAPIImpl#processPullResponse
```java
private PullResult processPullResponse(final RemotingCommand response)
    throws MQBrokerException, RemotingCommandException {
    PullStatus pullStatus = PullStatus.NO_NEW_MSG;
    //map the response code to a PullStatus
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            pullStatus = PullStatus.FOUND;
            break;
        case ResponseCode.PULL_NOT_FOUND:
            pullStatus = PullStatus.NO_NEW_MSG;
            break;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            pullStatus = PullStatus.NO_MATCHED_MSG;
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            pullStatus = PullStatus.OFFSET_ILLEGAL;
            break;
        default:
            throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    //decode the response header
    PullMessageResponseHeader responseHeader =
        (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

    //wrap everything in a PullResultExt and return it
    return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(),
        responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null,
        responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
```