RocketMQ 5.0.0 Message Consumption (Part 3) | Message Consumption
I. Message Consumption
1. Consumption UML Diagram
For the PUSH-mode pull mechanism, see "RocketMQ 5.0.0 Message Consumption (Part 1) | Message Pulling in PUSH Mode". PullMessageService pulls messages for each message queue; messages pulled from the Broker are stored in the ProcessQueue (the message processing queue), and ConsumeMessageService#submitConsumeRequest is then called to submit them to the consume thread pool. Using a dedicated consume thread pool decouples message pulling from message consumption. RocketMQ implements the consumption logic in ConsumeMessageService.
RocketMQ supports both orderly and concurrent consumption; this chapter covers the concurrent consumption flow.
The UML diagram below shows message consumption; org.apache.rocketmq.client.impl.consumer.ConsumeMessageService maintains a consume thread pool.
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService is the PUSH-mode concurrent consumption implementation; its key fields are listed below.
// Push-mode consumer implementation class
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// Default push-mode consumer (facade)
private final DefaultMQPushConsumer defaultMQPushConsumer;
// Concurrent message listener
private final MessageListenerConcurrently messageListener;
// Task queue of the consume thread pool
private final BlockingQueue<Runnable> consumeRequestQueue;
// Consume thread pool
private final ThreadPoolExecutor consumeExecutor;
// Consumer group
private final String consumerGroup;
// Scheduler for delayed consume-request resubmission
private final ScheduledExecutorService scheduledExecutorService;
// Scheduled thread pool that removes expired messages
private final ScheduledExecutorService cleanExpireMsgExecutors;
2. Submitting Messages
As described in "RocketMQ 5.0.0 Message Consumption (Part 1) | Message Pulling in PUSH Mode", after a successful pull the org.apache.rocketmq.client.consumer.PullCallback#onSuccess callback submits the messages (asynchronously) to the ConsumeMessageService thread pool for consumption, which completes the pull cycle.
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest is the core method that submits pulled messages to the consume thread pool, as shown below. Notes:
- consumeMessageBatchMaxSize: the number of messages consumed per listener call under concurrent consumption, 1 by default.
- The number of pulled messages (at most 32 by default) is compared with consumeBatchSize:
a. msgs.size() <= consumeBatchSize: assemble a ConsumeRequest and submit the consume task to the consume thread pool;
b. msgs.size() > consumeBatchSize: split the messages into pages and submit them page by page.
- submitConsumeRequestLater(): if the saturated consume thread pool rejects a task, the consume request is resubmitted after a 5s delay.
/**
 * Submits messages for consumption
 * Entry point for concurrent consumption: the {@link org.apache.rocketmq.client.consumer.PullCallback} in {@link DefaultMQPushConsumerImpl#pullMessage}
 * step1: get the batch size for one consume call, 1 by default (DefaultMQPushConsumer.consumeMessageBatchMaxSize)
 * step2: when msgs.size() (messages per pull, at most 32) <= consumeBatchSize:
 * a. assemble a consume request and submit the consume task to the consume thread pool
 * b. if the saturated pool throws, resubmit after 5s via {@link ConsumeMessageConcurrentlyService#submitConsumeRequestLater(ConsumeRequest)}
 * step3: when msgs.size() (messages per pull, at most 32) > consumeBatchSize, split into pages and submit page by page
 * @param msgs messages pulled in one pull, at most 32 by default {@link DefaultMQPushConsumer#pullBatchSize}
 * @param processQueue processing queue of the messages awaiting consumption
 * @param messageQueue message queue the messages belong to
 * @param dispatchToConsume whether to dispatch to the consume thread pool; ignored for concurrent consumption
 */
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
// Batch size per consume call under concurrent consumption, 1 by default (DefaultMQPushConsumer.consumeMessageBatchMaxSize)
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// msgs.size() is the number of messages in one pull, at most 32
if (msgs.size() <= consumeBatchSize) {
// Assemble the consume request
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
// Submit the consume task directly to the consume thread pool; the actual consumption logic is in ConsumeMessageConcurrentlyService.ConsumeRequest.run
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
// Resubmit to the consume thread pool after a 5s delay
this.submitConsumeRequestLater(consumeRequest);
}
}
// msgs.size() > consumeBatchSize: page the pulled messages, consumeBatchSize messages per page
else {
for (int total = 0; total < msgs.size(); ) {
// consumeBatchSize messages per page
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
3. Consuming Messages
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest is the consume task submitted to the thread pool; it is a Runnable. Its key fields are listed below.
// Messages of this consume request
private final List<MessageExt> msgs;
// PullRequest.processQueue: the processing queue that stores the pulled messages
private final ProcessQueue processQueue;
// PullRequest.messageQueue: the message queue to pull from (assigned by rebalancing)
private final MessageQueue messageQueue;
ConsumeMessageConcurrentlyService.ConsumeRequest#run executes the consumption logic, as shown below. Notes:
- processQueue.isDropped(): checks whether the processing queue has been dropped (rebalanced away or removed) to avoid duplicate consumption; the same check is repeated at ACK time.
- MessageListenerConcurrently#consumeMessage: the concrete business consumption logic; returns the consume result of the batch.
- ConsumeMessageConcurrentlyService#processConsumeResult(): performs the message ACK after consumption, covered in the next subsection.
- ConsumeConcurrentlyStatus status: the consume result of the batch (CONSUME_SUCCESS on success, RECONSUME_LATER on failure).
/**
 * Concrete logic of concurrent message consumption
 * Entry: {@link ConsumeMessageConcurrentlyService#submitConsumeRequest}
 * step1: check whether this consumer's ProcessQueue has been dropped; true means the queue was taken over by another consumer in the group after rebalancing
 * step2: restore the retry message's original topic name (required by the retry mechanism)
 * step3: execute the before-consumption hooks
 * step4: run the business consumption logic and return the consume result of the batch
 * {@link MessageListenerConcurrently#consumeMessage(List, ConsumeConcurrentlyContext)}
 * step5: check whether consumption timed out
 * step6: execute the after-consumption hooks
 * step7: check again whether this consumer's ProcessQueue has been dropped (it may change during consumption) to prevent duplicate consumption;
 * if not dropped, process the consume result, i.e. the ACK mechanism {@link ConsumeMessageConcurrentlyService#processConsumeResult(ConsumeConcurrentlyStatus, ConsumeConcurrentlyContext, ConsumeRequest)}
 */
@Override
public void run() {
// Check whether this consumer's ProcessQueue has been dropped; true means the queue was taken over by another consumer in the group after rebalancing
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// Concurrent message listener
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
// Restore the retry message's original topic name (required by the retry mechanism)
defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
// Execute the before-consumption hooks
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// Execute the before-consumption hook
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
// Set the consumption start time
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// Business consumption logic; returns the consume result of the batch
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
}
// Did consumption time out?
else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// Execute the after-consumption hooks
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// Check again whether this consumer's ProcessQueue has been dropped (it may change during consumption) to prevent duplicate consumption
if (!processQueue.isDropped()) {
// Process the consume result: the ACK mechanism
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
II. Message Acknowledgement (ACK)
As introduced in the previous section, in PUSH mode the pulled messages are submitted to the consume thread pool, where the consumer consumes them. org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus is the consume result of the batch; only when the result is a failure (RECONSUME_LATER) are the affected messages sent back to the Broker one by one as consume ACK requests.
When the Broker handles such an ACK request, it rewrites the message topic to the retry topic (%RETRY% + consumer group name) and stores the original topic in the message's extended properties. It also checks whether the reconsume count exceeds the maximum retry count (16 by default); if so, the message enters the DLQ. The new message is appended to the Commitlog again rather than modified in place, because RocketMQ writes the Commitlog sequentially and reads it randomly, which maximizes throughput. When the consumer reconsumes the message, the original topic is restored.
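As a small illustration of the naming scheme (the group name below is made up), MixAll exposes helpers for both topics:
import org.apache.rocketmq.common.MixAll;

public class RetryTopicNames {
    public static void main(String[] args) {
        String group = "demo_consumer_group";
        System.out.println(MixAll.getRetryTopic(group)); // %RETRY%demo_consumer_group
        System.out.println(MixAll.getDLQTopic(group));   // %DLQ%demo_consumer_group
    }
}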
1. Handling the Consume Result on the Consumer Side
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult contains the core logic for handling the consume result, as shown below. Notes:
- ackIndex is computed from the consume result (a partial-acknowledgement example follows this list):
The whole batch succeeded (CONSUME_SUCCESS): ackIndex = msgs.size() - 1
The batch failed (RECONSUME_LATER): ackIndex = -1
- Messages marked RECONSUME_LATER are handled according to the message model:
Broadcasting: only a warning is logged.
Clustering: the failed messages (from ackIndex + 1 onward) are sent back to the Broker one by one as synchronous ACK requests; for messages whose send-back fails, the reconsume count is incremented (+1), the messages are wrapped into a new ConsumeRequest, and consumption is retried after a 5s delay.
- On success, the consumption progress is updated (after removing the batch, the smallest offset remaining in msgTreeMap is used, avoiding duplicate consumption); progress management is covered in the next section.
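The ackIndex also enables partial acknowledgement: a listener consuming batches larger than one (consumeMessageBatchMaxSize > 1) can set it on the context before returning, so that only the trailing, unprocessed messages are sent back. A minimal sketch, with process() standing in for hypothetical business logic:
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class PartialAckListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        int handled = 0;
        for (MessageExt msg : msgs) {
            try {
                process(msg); // hypothetical business method
                handled++;
            } catch (Exception e) {
                break; // stop at the first failure
            }
        }
        // Messages 0..handled-1 are acknowledged; processConsumeResult sends
        // the messages after ackIndex back to the Broker for retry
        context.setAckIndex(handled - 1);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private void process(MessageExt msg) { /* business logic */ }
}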
/**
 * Processes the consume result
 * step1: compute ackIndex in preparation for the consume ACK:
 * CONSUME_SUCCESS: ackIndex = msgs.size() - 1
 * RECONSUME_LATER: ackIndex = -1
 * step2: handle messages the business returned RECONSUME_LATER for, per message model:
 * BROADCASTING: RECONSUME_LATER messages are only logged
 * CLUSTERING: if the batch returns RECONSUME_LATER, the failed messages are sent back (ACK); if the send-back fails, increment the reconsume count
 * step3: update the consumption progress:
 * after removing the batch, the smallest offset left in msgTreeMap is used, avoiding duplicate consumption
 */
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
// Compute ackIndex in preparation for the consume ACK
switch (status) {
// Success: ackIndex = msgs.size() - 1
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// Failure: ackIndex = -1
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
// Handle messages the business returned RECONSUME_LATER for, per message model
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// RECONSUME_LATER messages are only logged
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
// If the batch returned RECONSUME_LATER, the messages after ackIndex are sent back (ACK)
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// Send an ACK to the Broker for each message
boolean result = this.sendMessageBack(msg, context);
// For messages whose ACK failed, increment the reconsume count
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// Wrap the messages whose ACK failed into a new ConsumeRequest and reconsume after a 5s delay
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// After removing the batch, take the smallest offset left in msgTreeMap to avoid duplicate consumption
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
// Update the local consumption progress of the message queue
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack is the core method that synchronously sends the consume ACK, with request code RequestCode.CONSUMER_SEND_MSG_BACK, as shown below. Notes:
- delayLevel (delay level): taken from ConsumeConcurrentlyContext#delayLevelWhenNextConsume (the retry-delay policy); possible values (see the example after this list):
-1: do not retry, send the message straight to the dead-letter queue (DLQ)
0 (default): the Broker controls the retry
>0: the consumer controls the retry
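For illustration, a listener can steer this policy per batch through the context. The sketch below (the three-attempt cutoff is a made-up business rule) short-circuits retries for poison messages:
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class PoisonMessageListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        MessageExt msg = msgs.get(0); // consumeMessageBatchMaxSize is 1 by default
        if (msg.getReconsumeTimes() >= 3) {
            // Give up after 3 attempts: -1 sends the message straight to the DLQ
            context.setDelayLevelWhenNextConsume(-1);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        // Leaving the default 0 lets the Broker pick the delay (delayLevel = 3 + reconsumeTimes)
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}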
/**
 * Synchronously sends the consume ACK request to the Broker
 * step1: build the request header for the consume ACK {@link ConsumerSendMsgBackRequestHeader};
 * step2: send the ACK synchronously
 * Note: delayLevel is taken from {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume} (the retry-delay policy); values:
 * -1: do not retry, go straight to the DLQ; 0 (default): the Broker controls the retry; >0: the consumer controls the retry
 * Broker entry point: {@link org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest}
 * @param addr Broker address
 * @param msg the message being sent back
 * @param consumerGroup consumer group name
 * @param delayLevel delay level
 * @param timeoutMillis timeout
 * @param maxConsumeRetryTimes maximum reconsume count
 */
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
// Build the request header for the consume ACK
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
// Request code CONSUMER_SEND_MSG_BACK
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
2. Handling the Consume ACK Request on the Broker Side
SendMessageProcessor#processRequest is where the Broker receives the consume ACK request, with request code RequestCode.CONSUMER_SEND_MSG_BACK.
org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack is the core ACK-handling method, shown below. Notes:
- The topic is rewritten to the retry topic (%RETRY% + consumer group name) and a queue is picked at random from the retry queues; the original topic is stored in the message's extended properties.
- The delay level delayLevel is evaluated as described in the previous subsection ("Handling the Consume Result on the Consumer Side"); the default 0 means the Broker controls the retry.
- The reconsume count is checked against the maximum retry count:
a. reconsume count >= maximum retry count, or delayLevel < 0: the message enters the DLQ, is no longer consumed, and requires manual intervention;
b. reconsume count < maximum retry count: if delayLevel = 0 (the default), delayLevel is recomputed and the message is consumed later with that delay level (see the sketch after this list).
- A new message object is created and appended to the Commitlog (in memory) rather than modifying the original message (e.g. its reconsume count). The reason: RocketMQ writes the Commitlog sequentially and reads it randomly, which maximizes throughput.
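To make the recomputed delay concrete: assuming the Broker's default messageDelayLevel table (it is configurable via MessageStoreConfig), delayLevel = 3 + reconsumeTimes means the first retry is delayed 10s, the second 30s, and so on. A small sketch:
public class RetryDelayTable {
    // Default Broker delay-level table (MessageStoreConfig.messageDelayLevel):
    // level 1 -> 1s, level 2 -> 5s, level 3 -> 10s, ...
    private static final String[] DELAY_LEVELS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m", "6m",
        "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    public static void main(String[] args) {
        for (int reconsumeTimes = 0; reconsumeTimes < 16; reconsumeTimes++) {
            int delayLevel = 3 + reconsumeTimes; // what the Broker sets when delayLevel == 0
            System.out.printf("retry #%d -> delayLevel %d (%s)%n",
                reconsumeTimes + 1, delayLevel, DELAY_LEVELS[delayLevel - 1]);
        }
    }
}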
/**
 * Broker handling of the consume ACK
 * Consumer-side ACK entry: {@link MQClientAPIImpl#consumerSendMessageBack}
 * step1: get the retry topic (%RETRY% + consumer group name) and pick a queue at random from the retry queues
 * step2: build the retry topic's configuration
 * {@link TopicConfigManager#createTopicInSendMessageBackMethod}
 * step3: check the write permission
 * step4: look up the message from the Commitlog by offset
 * step5: store the message's original topic in its properties
 * step6: get the maximum reconsume count, 16 by default
 * {@link SubscriptionGroupConfig#retryMaxTimes}
 * step7: if reconsume count >= maximum reconsume count, or delayLevel < 0, the message enters the DLQ, is no longer consumed, and requires manual intervention;
 * if reconsume count < maximum reconsume count and delayLevel = 0 (the default), delayLevel is recomputed and the message is consumed later with that delay level
 * step8: create a new message object and append it to the Commitlog (in memory):
 * the topic is the retry topic and the msgId is new, while other properties match the original message; the original topic and msgId are stored in the new message's properties.
 */
protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
// The send back requests sent to SlaveBroker will be forwarded to the master broker beside
final BrokerController masterBroker = this.brokerController.peekMasterBroker();
if (null == masterBroker) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("no master available along with " + brokerController.getBrokerConfig().getBrokerIP1());
return response;
}
// The broker that received the request.
// It may be a master broker or a slave broker
final BrokerController currentBroker = this.brokerController;
SubscriptionGroupConfig subscriptionGroupConfig =
masterBroker.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
BrokerConfig masterBrokerConfig = masterBroker.getBrokerConfig();
if (!PermName.isWriteable(masterBrokerConfig.getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + masterBrokerConfig.getBrokerIP1() + "] sending message is forbidden");
return response;
}
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
// Get the retry topic: %RETRY% + consumer group name
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// Pick a queue at random from the retry queues
int queueIdInt = this.random.nextInt(subscriptionGroupConfig.getRetryQueueNums());
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// Create retry topic on the master broker (build the retry topic config)
TopicConfig topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
// Check the write permission
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}
// Look up the message from the origin message store (Commitlog) by offset
MessageExt msgExt = currentBroker.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}
//for logic queue
if (requestHeader.getOriginTopic() != null
&& !msgExt.getTopic().equals(requestHeader.getOriginTopic())) {
//here just do some fence in case of some unexpected offset is income
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed to check the topic name" + requestHeader.getOffset());
return response;
}
// Store the message's original topic in its properties
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
// Delay level
int delayLevel = requestHeader.getDelayLevel();
// Get the maximum reconsume count, 16 by default
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
Integer times = requestHeader.getMaxReconsumeTimes();
if (times != null) {
maxReconsumeTimes = times;
}
}
// reconsume count >= maximum reconsume count: the message enters the DLQ, is no longer consumed, and requires manual intervention
boolean isDLQ = false;
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// Enter the DLQ: reset newTopic to %DLQ% + consumer group
isDLQ = true;
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
// Create DLQ topic to master broker
topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
msgExt.setDelayTimeLevel(0);
}
// reconsume count < maximum reconsume count: delay the message's next consumption
else {
// Set the delay level based on the reconsume count
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
/*
Create the new message object:
a. the topic is the retry topic and the msgId is new; other properties match the original message;
b. the new message object is stored in the Commitlog file;
c. the original message's topic and msgId are stored in the new message's properties.
*/
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); // reconsume count + 1
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
boolean succeeded = false;
// Store into the Commitlog
// Put retry topic to master message store
PutMessageResult putMessageResult = masterBroker.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
String commercialOwner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
masterBroker.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
masterBroker.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
masterBroker.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
masterBroker.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
}
masterBroker.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
if (isDLQ) {
masterBroker.getBrokerStatsManager().incDLQStatValue(
BrokerStatsManager.SNDBCK2DLQ_TIMES,
commercialOwner,
requestHeader.getGroup(),
requestHeader.getOriginTopic(),
BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ.name(),
1);
String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
DLQ_LOG.info("send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, storeTimestamp={}",
newTopic,
commercialOwner,
requestHeader.getOriginTopic(),
requestHeader.getGroup(),
uniqKey,
putMessageResult.getAppendMessageResult().getStoreTimestamp());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
succeeded = true;
break;
default:
break;
}
if (!succeeded) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
}
} else {
if (isDLQ) {
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
DLQ_LOG.info("failed to send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, result={}",
newTopic,
owner,
requestHeader.getOriginTopic(),
requestHeader.getGroup(),
uniqKey,
"null");
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
}
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setTopic(requestHeader.getOriginTopic());
context.setConsumerGroup(requestHeader.getGroup());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
context.setAccountAuthType(request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE));
context.setAccountOwnerParent(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT));
context.setAccountOwnerSelf(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF));
context.setRcvStat(isDLQ ? BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ : BrokerStatsManager.StatsType.SEND_BACK);
context.setSuccess(succeeded);
context.setRcvMsgNum(1);
//Set msg body size 0 when sent back by consumer.
context.setRcvMsgSize(0);
context.setCommercialRcvMsgNum(succeeded ? 1 : 0);
try {
this.executeConsumeMessageHookAfter(context);
} catch (AbortProcessException e) {
response.setCode(e.getResponseCode());
response.setRemark(e.getErrorMessage());
}
}
return response;
}
Once the retry message's delay elapses, it can be pulled again and thus reconsumed. When the pulled messages are handed to the consume threads, the retry topic is restored to the original topic before reconsumption. This happens in the ConsumeRequest#run consume task covered in the previous section, as the snippet below shows.
@Override
public void run() {
......
// Restore the retry message's original topic name (required by the retry mechanism)
defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
......
}
III. Consumption Progress Management
As the previous sections showed, after a batch is consumed successfully the ProcessQueue removes it, takes the smallest offset left in msgTreeMap (avoiding duplicate consumption), and updates the consumption offset. So where is the consumption progress stored?
- Broadcasting mode: every consumer in a group must consume all messages of the topic, i.e. consumers in the same group consume independently of each other, so the progress is stored locally on each consumer.
- Clustering mode: all consumers in a group share the topic's messages; a message queue is consumed by only one consumer in the group at a time, and assignments change as rebalancing redistributes the queues, so the progress is stored on the Broker.
org.apache.rocketmq.client.consumer.store.OffsetStore is the consumption-progress interface; its UML diagram and methods are shown below.
When a consumer starts, it initializes the consumption progress: it creates the concrete OffsetStore implementation according to the message model, calls load() to load the progress into memory (kept in offsetTable), and schedules a 5s periodic task that persists the progress.
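In case the UML image does not render, the interface's core methods look roughly like the abridged excerpt below (signatures taken from the 5.0.0 client source; the comments are mine):
public interface OffsetStore {
    // Load offsets into memory (reads the local file in broadcasting mode; a no-op in clustering mode)
    void load() throws MQClientException;
    // Update the in-memory offset of a queue after a batch is consumed
    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
    // Read the offset of a queue from memory and/or the store, per ReadOffsetType
    long readOffset(final MessageQueue mq, final ReadOffsetType type);
    // Persist the offsets of the given queues (to the local file, or to the Broker)
    void persistAll(final Set<MessageQueue> mqs);
    // Persist the offset of a single queue
    void persist(final MessageQueue mq);
    // Remove the offset of a queue that is no longer assigned to this consumer
    void removeOffset(MessageQueue mq);
    // ... plus cloneOffsetTable, updateConsumeOffsetToBroker, etc.
}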
1. Offset Storage in Broadcasting Mode
org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore is the broadcasting-mode offset store implementation; its key fields are listed below.
// Local storage directory, defaults to {user.home}/.rocketmq_offsets, configurable via -Drocketmq.client.localOffsetStoreDir
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
// Storage file path: LOCAL_OFFSET_STORE_DIR/{mQClientFactory.getClientId()}/{groupName}/offsets.json
private final String storePath;
// In-memory consumption progress per message queue
private ConcurrentMap<MessageQueue/* message queue */, AtomicLong/* consume offset */> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();
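For illustration, a persisted offsets.json looks roughly like the following (broker and topic names are made up; note that the serializer uses the MessageQueue object as the map key, so the file is not strict JSON):
{
    "offsetTable":{{
            "brokerName":"broker-a",
            "queueId":0,
            "topic":"DemoTopic"
        }:1742
    }
}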
When a consumer starts, it initializes the consumption progress. In broadcasting mode the progress is loaded from the local file by LocalFileOffsetStore#load, as shown below.
/**
 * Loads the consumption progress file in broadcasting mode (done when the consumer starts)
 * Consumer start entry: {@link DefaultMQPushConsumerImpl#start()}
 * step1: read the per-queue consumption progress from local disk into memory
 * {@link LocalFileOffsetStore#readLocalOffset()}
 * step2: iterate and store the progress into the in-memory table {@link LocalFileOffsetStore#offsetTable}
 */
@Override
public void load() throws MQClientException {
// Read the per-queue consumption progress from local disk into memory
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
// Log the loaded offsets
for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
AtomicLong offset = mqEntry.getValue();
log.info("load consumer's offset, {} {} {}",
this.groupName,
mqEntry.getKey(),
offset.get());
}
}
}
A scheduled task on the consumer persists the consumption progress to disk every 5s via LocalFileOffsetStore#persistAll, as shown below.
/**
 * Persists all consumption progress to disk
 * When does this happen? The MQ client instance starts a scheduled task that persists every 5s:
 * {@link MQClientInstance#startScheduledTask()} invokes persistAllConsumerOffset()
 */
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
// Build the wrapper, adding the specified MessageQueues from offsetTable for persistence
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}
// Serialize to a JSON string
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persistAll consumer offset Exception, " + this.storePath, e);
}
}
}
2. Offset Storage in Clustering Mode
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore is the clustering-mode offset store implementation; its key field is listed below.
// In-memory consumption progress per message queue
private ConcurrentMap<MessageQueue/* message queue */, AtomicLong/* consume offset */> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();
When a consumer starts, it initializes the consumption progress; in clustering mode load() is empty, i.e. there is nothing to load. After a batch is consumed successfully, OffsetStore#updateOffset updates the local (in-memory) progress of the queue. A 5s periodic task then calls RemoteBrokerOffsetStore#persistAll, which sends the offsets to the Broker to update its in-memory progress. The Broker in turn periodically persists the progress to disk (every 5s by default, controlled by flushConsumerOffsetInterval). The figure below shows offset management in clustering mode.
org.apache.rocketmq.broker.offset.ConsumerOffsetManager is the Broker's consumption-progress manager; its key field is listed below. The storage file is ${RocketMQ_HOME}/store/config/consumerOffset.json.
// Consumption progress table
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer/* queue ID */, Long/* consume offset */>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
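For illustration, a consumerOffset.json might look like this (topic, group, and offsets are made up); the key is topic@group and the value maps queue ID to consume offset:
{
    "offsetTable":{
        "DemoTopic@demo_consumer_group":{0:1742,1:1738,2:1740,3:1735}
    }
}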
RemoteBrokerOffsetStore#persist persists the progress of the specified message queue; its core method is updateConsumeOffsetToBroker(), shown below.
/**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
// Find the Broker of the message queue
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, false);
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
requestHeader.setBname(mq.getBrokerName());
// One-way update: no response is returned
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
// Not one-way: the call reports whether the update succeeded
else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#readOffset is the core method for reading the consumption progress from the Broker, as shown below.
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
// Read from memory first, then from the store (note the deliberate fall-through)
case MEMORY_FIRST_THEN_STORE:
// Read from memory
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
// Read from the store
case READ_FROM_STORE: {
try {
// Fetch the consume offset of the queue from the Broker
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (OffsetNotFoundException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}
return -3;
}