RocketMQ5.0–顺序消息
一、顺序消息概览
RocketMQ支持局部消息顺序消费,可以确保同一个消费队列中的消息被顺序消费,如果做到全局顺序消费则可以将主题配置成一个消费队列。并发(默认)消息消费参考 《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》 、 《RocketMQ5.0.0消息消费<二> _ 消息队列负载均衡机制》 。本章主要介绍消费队列的顺序消费。如下图所示,是消费者UML图。其中org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService是顺序消费的实现类。
顺序消息实际上是消费者的负载均衡后的消费队列, 在Broker端给消费队列加锁。同时,消费端给MessageQueue、ProcessQueue加锁来消费消息。
二、顺序消息实现机制
1. 消息队列负载
1):消费端发送加锁请求
消费队列负载均衡参考 《RocketMQ5.0.0消息消费<二> _ 消息队列负载均衡机制》 ,其中RebalanceImpl#rebalanceByTopic(final String topic, final boolean isOrder)是根据消费者订阅主题下消费队列重新负载均衡的核心方法。方法形参isOrder来定义是否是顺序消息,默认false,则是并发消息,其中调用updateProcessQueueTableInRebalance()就使用该参数,部分代码如下所示。
遍历负载均衡后的每一个消费队列,若新增消费队列时,需要判定是不是顺序消息,若是则向Broker端发送锁定该消费队列(避免其他消费者消费),锁定失败后需要延迟重新负载均衡。
/**
* 消费者对应的分配消息队列是否变化
* step1:消费队列缓存表中不在本次均衡分配的消费队列时,则暂停消费并移除,且持久化待移除消费队列的消费进度;
* step2:本次均衡分配的消费队列不在消费队列缓存表中,则新增:
* 1):删除内存中该消费队列的消费进度;
* 2):创建broker的消费队列;
* 3):从磁盘中获取该消费队列的消费进度(若进度<0时,则根据配置矫正消费进度),创建拉取消息请求
* {@link RebalanceImpl#computePullFromWhere}
* step3: 新增消费队列,则创建{@link PullRequest}加入到{@link PullMessageService},唤醒该线程拉取消息
* {@link RebalanceImpl#dispatchPullRequest}
* step4:顺序消息时,则尝试向Broker请求锁定该消费队列,锁定失败延迟重新负载
* @param topic 主题
* @param mqSet 本次均衡分配的消费队列
* @param isOrder 是否顺序
* @return true变化;false未改变
*/
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
......
// 若是顺序消息,则尝试向Broker请求锁定该消费队列,锁定失败延迟重新负载
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
allMQLocked = false;
continue;
}
......
// 锁定消费队列失败,延迟重新负载
if (!allMQLocked) {
mQClientFactory.rebalanceLater(500);
}
......
}
2):Broker处理加锁
org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager是顺序消息Broker端给消费者分配到的消费队列加锁的核心类,请求码是RequestCode.LOCK_BATCH_MQ。该类的关键属性如下。注意事项:
- 锁容器mqLockTable:当前消费队列被消费组中的哪个消费者持有,即持有锁对象LockEntry。
- tryLockBatch()与unlockBatch()方法:给消费队列加锁或释放锁的方法。
// 锁存活时间,默认60s,可配置
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
"rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
private final Lock lock = new ReentrantLock();
// 锁容器:当前消费队列被消费组中的哪个消费者持有
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
2. 消息拉取
消费队列负载均衡后,便从重新负载后的消费队列拉取消息,参考 《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》 ,其中拉取消息DefaultMQPushConsumerImpl#pullMessage方法中,关于顺序消息的代码如下所示。
消费端的ProcessQueue消息处理队列是否被锁定,若是没有锁定则延迟3s再次将拉取消息请求PullRequest放入到拉取任务中再次拉取;若是被锁定,则向Broker拉取消息成功后,提交到消费线程池中供消费者消费。
/**
* 拉取消息
* step1:消息处理队列是否被丢弃{@link ProcessQueue};
* step2:检查当前消费者状态:消费者是否被挂起;
* step3:拉取消息流控:消息总条数、消息总大小、消息最大/最小间隔等流控,并每1000次打印流控信息;
* step4:构建消息拉取的sysFlag;
* step5:从Broker服务器拉取消息{@link PullAPIWrapper#pullKernelImpl};
* step6:定义拉取成功后处理,即:异步拉取回调函数{@link PullCallback};
* 异步回调函数{@link PullCallback}把拉取的消息提交消费消息{@link ConsumeMessageService#submitConsumeRequest)}
* @param pullRequest 消息拉取请求{@link PullRequest}
*/
public void pullMessage(final PullRequest pullRequest) {
......
// 并发消息
if (!this.consumeOrderly) {
......
}
// 顺序消息
else {
// 处理队列被锁住
if (processQueue.isLocked()) {
// 拉取请求是否锁定,默认不锁定false
if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L;
try {
// 获取消费队列的消费进度,若进度<0时,则根据配置矫正消费进度(DefaultMQPushConsumer.consumeFromWhere配置)
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
if (offset < 0) {
throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
}
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset);
}
}
// 处理队列未被锁住,则延迟
else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
......
}
3. 消息消费
消息消费参考 《RocketMQ5.0.0消息消费<三> _ 消息消费》 ,其中PullCallback处理拉取结果处理,拉取成功后,拉取的结果PullResult提交到线程池供消费者消费。根据是否是顺序消息,选择消费实现类,顺序消息的消费实现类是ConsumeMessageOrderlyService。
1):启动ConsumeMessageOrderlyService服务
ConsumeMessageOrderlyService#start是启动方法,其调用链如下所示。启动方法逻辑是,若是集群模式,则20s定时周期执行锁定分配的消费队列。
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lockAll是锁定分配的消费队列的核心逻辑方法,其代码如下。 注意事项:
-
brokerMqs:当前消费者均衡的消息队列缓存表processQueueTable 转换成 按Broker组织的消费队列集合。
-
findBrokerAddressInSubscribe():根据Broker名称获取主节点Broker。
-
lockBatchMQ():向主Broker发送锁定消费队列请求,并返回锁定成功的消息队列:
锁定成功的消费队列:锁定对应的ProcessQueue处理消费队列设置为锁定状态 + 更新加锁时间;
锁定失败的消费队列:对应的ProcessQueue解锁,则暂停拉取消息与消息消费。
/**
* 锁定消费者分配到的消费队列
* step1:当前消费者均衡的消息队列缓存表processQueueTable 转换成 按Broker组织的消费队列集合
* step2:根据Broker名称获取主节点Broker
* step3:向主Broker发送锁定消费队列请求,并返回锁定成功的消息队列
* step4:锁定成功的消费队列对应的ProcessQueue待处理消费队列设置为锁定状态 + 更新加锁时间
* step5:锁定失败的消费队列对应的ProcessQueue待处理消费队列设置为解锁,暂停拉取消息与消息消费
*/
public void lockAll() {
// 当前消费者均衡的消息队列缓存表processQueueTable 转换成 按Broker组织的消费队列集合
HashMap<String/* brokerName */, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty()) {
continue;
}
// 根据Broker名称获取主节点Broker
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId()); // 消费者ID
requestBody.setMqSet(mqs); // 消费者分配到的消息队列
try {
// 向Broker发送锁定消费队列请求,并返回锁定成功的消息队列
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
// 锁定成功的消费队列对应的ProcessQueue待处理消费队列设置为锁定状态 + 更新加锁时间
for (MessageQueue mq : lockOKMQSet) {
// 锁定成功的消费队列对应的ProcessQueue
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
if (!processQueue.isLocked()) {
log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
}
// ProcessQueue设置为锁定状态 + 更新加锁时间
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
// 没有锁定成功的,则相应ProcessQueue解锁,则暂停拉取消息与消息消费
for (MessageQueue mq : mqs) {
if (!lockOKMQSet.contains(mq)) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
processQueue.setLocked(false);
log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
}
}
}
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mqs, e);
}
}
}
}
2):拉取消息提交到线程池
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest是顺序消息拉取后提交到线程池的方法,代码如下。注意事项:
- dispathToConsume:只有顺序消息使用该参数,并发消息忽略该参数。
// 提交消费请求,到消费线程池,供消费者消费
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
// 是否转发到消费线程池中(注意,并发消息忽略该参数)
if (dispathToConsume) {
// 构建消费任务
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
// 提交到消费线程池
this.consumeExecutor.submit(consumeRequest);
}
}
3):消费线程池任务
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest是顺序消息的线程池任务,它是个线程。其run()方法,如下代码所示。注意事项:
- fetchLockObject():获取MessageQueue消费队列的锁对象,并加synchronized锁,这样一个消费队列同一时间被一个消费线程消费消息。
- 根据集群模式消费:广播模式 或 ProcessQueue加锁且锁未过期 时,则直接消费;集群下未加锁 或 ProcessQueue加锁且锁过期 时,延迟100ms重新消费。
- takeMessages():顺序消息不是消费当前拉取消息,而是从ProcessQueue.msgTreeMap消息临时存储在ProcessQueue.consumingMsgOrderlyTreeMap,而后再去消费。
/**
* 运行提交消费任务,即:消费线程消费任务
* step1:待处理队列是否丢弃,若丢弃,则停止消费
* step2:获取指定消费队列的锁对象,目的:一个消费队列同一时间被一个线程消费
* step3:给锁对象加锁,进行消费
* step4:广播模式 或 加锁且锁未过期 时,则直接消费;
* 集群下未加锁 或 加锁且锁过期 时,延迟100ms重新消费
* step5:顺序取出消息(顺序消息时,临时存储在ProcessQueue.consumingMsgOrderlyTreeMap中)
* {@link ProcessQueue#takeMessages(int)}
* step6:获取消费锁,消费监听器调用消费逻辑
* {@link MessageListenerOrderly#consumeMessage}
* step7:根据消费结果,判定是否重试消费:{@link ConsumeMessageOrderlyService#processConsumeResult}
* 检查消费次数,判断是否进入DLQ队列(进入DLQ成功,则认为消费成功)
* {@link ConsumeMessageOrderlyService#checkReconsumeTimes}
*/
@Override
public void run() {
// 待处理队列是否丢弃,若丢弃,则停止消费
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 获取指定消费队列的锁对象,目的:一个消费队列同一时间被一个线程消费
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
// 给消费队列的锁对象加锁,然后才消费消息
synchronized (objLock) {
// 广播模式 或 加锁且锁未过期 时,则直接消费
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
final long beginTime = System.currentTimeMillis();
// continueConsume是否继续消费,不是根据消息条数,而是连续消费最大时间ConsumeMessageOrderlyService.MAX_TIME_CONSUME_CONTINUOUSLY
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 超出连续消费最大时间ConsumeMessageOrderlyService.MAX_TIME_CONSUME_CONTINUOUSLY
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// 从ProcessQueue取出消息条数
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 顺序取出消息(顺序消息时,临时存储在ProcessQueue.consumingMsgOrderlyTreeMap中)
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
// 恢复重试消息主题名(消息重试机制决定)
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
// 执行消费前钩子函数
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false; // 消费是否有异常
try {
// 获取消费锁
this.processQueue.getConsumeLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// 消费监听器调用业务方,具体的消费逻辑
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
} finally {
this.processQueue.getConsumeLock().unlock();
}
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
// 执行消费后(正常或异常)的钩子函数
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 处理消费结果,是否继续消费(注意:进入DLQ认为消费成功,继续消费)
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
}
// 集群下未加锁 或 加锁且锁过期 时,延迟100ms重新消费
else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 延迟100ms重新消费
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}