RocketMQ5.0消息消费<一> _ PUSH模式的消息拉取
一、消息消费概述
消息消费以组的模式开展,一个消费组内有多个消费者,每一个消费者可订阅多个主题,消费组之间有两种消费模式:集群模式(默认)、广播模式:
- 集群模式(默认):主题下的同一条消息只允许被相同消费组的其中一个消费者消费。消费偏移量存储在Broker端。
- 广播模式:主题下的同一条消息将被集群内的所有消费者消费。消费偏移量存储在消费端。
消息服务器与消费者之间的消息传送有两种模式:推模式(默认)、拉模式。RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
- PUSH推模式(默认):消息到达Broker后,推送给消息消费者。
- PULL拉模式:消费者主动发起拉消息请求。
RocketMQ支持局部顺序消息消费,即保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费, 若实现某一主题的全局顺序消息消费,可以将该主题的消费队列数设置为1 ,牺牲高可用性。
RocketMQ支持两种消息过滤模式:表达式(TAG、SQL92),类过滤模式。
消息拉模式主要是由客户端调用消息拉取API,而消息推模式是消息服务器Broker主动将消息推送到消息消费者,本章节介绍推模式下消息消费原理。
二、消费者启动流程
1. 订阅主题
如下所示,是消费者启动前的订阅主题信息代码。
// consumer.subscribe(TOPIC, "TAG_ORDER || TAG_ALL"); // TAG模式过滤消息,多个TAG用||隔开
consumer.subscribe(TOPIC, MessageSelector.bySql("(orderId > 5)")); // SQL表达式过滤信息
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#subscribe(java.lang.String, org.apache.rocketmq.client.consumer.MessageSelector)根据消息选择器订阅消息的方法,其调用链及代码如下。注意事项:
- 根据订阅主题构建订阅信息{@link SubscriptionData};
- 主题订阅信息添加到负载均衡中,后创建拉取消息任务。
/**
* 订阅topic
* step1:构建订阅信息与消息过滤表达式{@link SubscriptionData}
* step2: 放入订阅信息负载均衡实现
*/
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
try {
if (messageSelector == null) {
subscribe(topic, SubscriptionData.SUB_ALL);
return;
}
// 构建订阅信息
SubscriptionData subscriptionData = FilterAPI.build(topic,
messageSelector.getExpression(), messageSelector.getExpressionType());
// 订阅添加到负载均衡中,后创建拉取消息任务
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
// 加锁定时发送到所有Broker(类过滤源代码发送到过滤服务器上)
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
2. 消费者启动
下图是消费者UML图。org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是PUSH模式消费者实现类,该类的关键属性如下代码所示。
// 消费组
private String consumerGroup;
/**
* 消费组下的不同消费模式:
* 集群模式:默认,topic的同一消息只能被组内的一个消费者消费
* 广播模式:topic的同一消息被组内的所有消费者消费
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* 消费策略
* CONSUME_FROM_LAST_OFFSET:从最新的消息开始消费(默认)
* CONSUME_FROM_FIRST_OFFSET:从最早的消息开始消费
* CONSUME_FROM_TIMESTAMP:从消费者启动时间戳开始消费
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// 集群模式下消费队列的负载均衡策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
// 订阅主题信息
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
// 消息业务监听器
private MessageListener messageListener;
// 消费进度存储器
private OffsetStore offsetStore;
// 最小消费者线程数量
private int consumeThreadMin = 20;
// 最大消费者线程数量
private int consumeThreadMax = 20;
/**
* 消费队列中消息最大跨度:最大消息的偏移量 - 最小消息的偏移量
* 跨度默认2000,超出时延迟50ms再拉取消息
*/
private int consumeConcurrentlyMaxSpan = 2000;
// 消息拉取间隔时间
private long pullInterval = 0;
// 并发消费时一次消费消息条数
private int consumeMessageBatchMaxSize = 1;
// 每次拉取消息所拉取的消息条数,默认32
private int pullBatchSize = 32;
// 每次拉取消息时是否更新订阅信息,默认false
private boolean postSubscriptionWhenPull = false;
// 最大消费重试次数
private int maxReconsumeTimes = -1;
// 消息延迟到消费线程的时间,默认1s
private long suspendCurrentQueueTimeMillis = 1000;
// 消费超时时间,默认15min
private long consumeTimeout = 15;
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start是消费者启动的核心逻辑方法,而且是个synchronized修饰的方法,避免一个MQClientInstance实例下多个消费者同时启动,如下代码所示。注意事项:
-
获取主题订阅信息,即:构建{@link SubscriptionData}。订阅关系的来源:
a. 消费启动时订阅主题,即:DefaultMQPushConsumerImpl#subscribe();
b. 订阅重试主题,即:主题为:%RETRY% + 消费组名称,不是消费者订阅的主题。
-
初始化消费进度offsetStore。广播模式:消费进度存储在消费端;集群模式:消费进度存储在Broker端。
-
创建消息消费线程池并启动consumeMessageService,负责消费消息:顺序消息、并发消息(默认)。
-
启动MQClientInstance实例时,会启动拉取消息服务{@link PullMessageService} +
启动消费者负载均衡{@link RebalanceService}。
/**
* 消费者启动的核心方法
* step1:消费者默认状态CREATE_JUST,查看{@link DefaultMQPushConsumerImpl#serviceState}属性参数
* step2:检查配置信息,如:消费组、消费模式、主题订阅信息、消费偏移量等内容
* step3:获取主题订阅信息:构建{@link SubscriptionData},主题的负载均衡
* 订阅关系的来源:
* a. 消费启动时订阅主题,即:DefaultMQPushConsumerImpl#subscribe()
* b. 订阅重试主题,即:主题为:%RETRY% + 消费组名称,不是消费者订阅的主题
* step4:初始化:MQClientInstance + 消息重新负载实现类 + 消费进度
* 广播模式:消费进度存储在消费端
* 集群模式:消费进度存储在Broker端
* step5:创建消息消费线程池并启动,负责消费消息:顺序消息、并发消息(默认)
* step6:向MQClientInstance注册消费者并启动MQClientInstance
* 注意:同一个JVM所有生产者、消费者共同持有一个相同MQClientInstance,并只会启动一次
* step7:启动MQClientInstance实例,会启动拉取消息服务{@link PullMessageService} + 启动消费者负载均衡{@link RebalanceService}
*/
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
// 检查配置信息,如:消费组、消费模式、主题订阅信息、消费偏移量等内容
this.checkConfig();
/*
* 获取主题订阅信息:构建{@link SubscriptionData},主题的负载均衡
* 订阅关系的来源:
* a. 消费启动时订阅主题,即:DefaultMQPushConsumerImpl#subscribe()
* b. 订阅重试主题,即:主题为:%RETRY% + 消费组名称,不是消费者订阅的主题
*/
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 初始化MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 初始化消息重新负载实现类
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
if (this.pullAPIWrapper == null) {
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
}
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 初始化消费进度
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 广播模式,存储在消费端
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
// 集群
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
// 创建消息消费线程池并启动,负责消费消息
// 顺序消息
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
// 并发消息
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService =
new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// POPTODO
this.consumeMessagePopService.start();
/*
向MQClientInstance注册消费者并启动MQClientInstance
注意:同一个JVM所有生产者、消费者共同持有一个相同MQClientInstance,并启动一次
*/
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
// 注册失败:改变状态、关闭消费线程
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 启动MQClientInstance
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
三、PUSH模式的消息拉取机制
如下图所示,是PUSH模式的消息拉取流程图。RocketMQ没有真正实现推模式,而是消费者主动向Broker拉取消息,RocketMQ推模式是消费者循环向Broker端发送消息拉取请求。
消费端拉取的消息存储在org.apache.rocketmq.client.impl.consumer.ProcessQueue类中,该类是消费端重现Broker的消费队列的快照(拉取消息的存放在msgTreeMap) ,其关键属性如下。
// 重入读写锁,控制并发修改msgTreeMap、consumingMsgOrderlyTreeMap
private final ReadWriteLock treeMapLock = new ReentrantReadWriteLock();
// 消息容器(拉取消息存储属性)
private final TreeMap<Long/* 消息在ConsumeQueue中的偏移量 */, MessageExt /* 消息内容 */> msgTreeMap = new TreeMap<Long, MessageExt>();
// msgTreeMap中的消息数量
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock consumeLock = new ReentrantLock();
/**
* 处理顺序消息使用,从{@link ProcessQueue#msgTreeMap}取出时,将其存入该参数中
* A subset of msgTreeMap, will only be used when orderly consume
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
// msgTreeMap中最大队列偏移量
private volatile long queueOffsetMax = 0L;
// 当前对象是否被丢弃
private volatile boolean dropped = false;
// 上次消息拉取时间戳
private volatile long lastPullTimestamp = System.currentTimeMillis();
// 消息消息消费时间戳
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
// 顺序消息时,待处理队列是否被锁定
private volatile boolean locked = false;
org.apache.rocketmq.client.impl.consumer.PullRequest拉取请求对象,关键属性如下。
// 消费组
private String consumerGroup;
// 待拉取消费队列
private MessageQueue messageQueue;
// 消息处理队列
private ProcessQueue processQueue;
// 待拉取的messageQueue的偏移量
private long nextOffset;
// 是否锁定
private boolean previouslyLocked = false;
1. 消费端拉取消息
MQClientInstance实例启动时,会启动拉取消息服务org.apache.rocketmq.client.impl.consumer.PullMessageService,该类是一个线程,负责从broker拉取消息并提交到线程池消费。下图是run()方法的调用链。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage是拉取消息的核心逻辑方法,代码如下所示。注意事项:
- org.apache.rocketmq.client.impl.consumer.ProcessQueue:该类是Broker的消费队列的重现和快照(消息的存放),
a. 默认从broker每次拉取32条消息,按队列偏移量存入ProcessQueue;
b. PullMessageService维护一个线程池,将拉取请求任务提交到线程池;
c. 消息消费成功后,从ProcessQueue中移除
- org.apache.rocketmq.client.consumer.PullCallback:定义拉取消息成功或失败时的处理逻辑。根据拉取消息结果状态,作不同处理。其中拉取结果为FOUND时的处理:
step1:获取下一次拉取的偏移量,矫正后的偏移量;
step2:本次拉取消息为空,则立即执行下次拉取(TAG具体内容过滤导致可能为空);
step3:拉取的消息,存放到ProcessQueue.msgTreeMap中;
step4:本次拉取消息,提交到ConsumeMessageService,供消费者消费(异步提交),本次拉取消息完成。
/**
* 拉取消息
* step1:消息处理队列是否被丢弃{@link ProcessQueue};
* step2:检查当前消费者状态:消费者是否被挂起;
* step3:拉取消息流控:消息总条数、消息总大小、消息最大/最小间隔等流控,并每1000次打印流控信息;
* step4:构建消息拉取的sysFlag;
* step5:从Broker服务器拉取消息{@link PullAPIWrapper#pullKernelImpl};
* step6:定义拉取成功后处理,即:异步拉取回调函数{@link PullCallback};
* 异步回调函数{@link PullCallback}把拉取的消息提交消费消息{@link ConsumeMessageService#submitConsumeRequest)}
* @param pullRequest 消息拉取请求{@link PullRequest}
*/
public void pullMessage(final PullRequest pullRequest) {
// 获取消息处理队列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 该处理队列被丢弃,则返回
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
// 本次拉取时间设置到上次拉取时间戳
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
// 消费者状态
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
// 消费者被挂起,则本次延迟1s再放入拉取任务队列中
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
long cachedMessageCount = processQueue.getMsgCount().get(); // 消息总条数
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // 消息总大小
// 消息总条数 > 阈值1000条,则流控,忽略本次拉取,延迟50ms下次拉取
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// 流控次数每1000次,打印流控日志
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// 消息总大小 > 阈值100MB,则流控,忽略本次拉取,延迟50ms下次拉取
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// 并发消息
if (!this.consumeOrderly) {
// 消息跨度 > 阈值2000条,则流控,忽略本次拉取,延迟50ms下次拉取
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}
// 顺序消息
else {
// 处理队列被锁住
if (processQueue.isLocked()) {
// 拉取请求是否锁定,默认不锁定false
if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L;
try {
// 获取消费队列的消费进度,若进度<0时,则根据配置矫正消费进度(DefaultMQPushConsumer.consumeFromWhere配置)
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
if (offset < 0) {
throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
}
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset);
}
}
// 处理队列未被锁住,则延迟
else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
// 获取topic订阅信息,为空,则忽略本次拉取,延迟3s下次拉取
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
// 拉取消息成功后处理(内部实现类)
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 消息字节数组填充msgList;TAG具体内容过滤
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
// 根据拉取消息结果状态,作不同处理
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
// 获取下一次拉取的偏移量,矫正后的偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
// 本次拉取消息为空,则立即执行下次拉取(TAG具体内容过滤导致可能为空)
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// 拉取的消息,存放到ProcessQueue.msgTreeMap中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 本次拉取消息,提交到ConsumeMessageService,供消费者消费(异步提交),本次拉取消息完成
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 判断两次拉取时间间隔,执行下一次拉取消息
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG: // 没有新消息
case NO_MATCHED_MSG: // 没有匹配的消息
// 服务器broker矫正的nextBeginOffset作为下一次拉取的偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
// 立即拉取下一轮消息
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL: // offset不合规
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
// 服务器broker矫正的nextBeginOffset作为下一次拉取的偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// 丢弃ProcessQueue,其中里面的消息立即停止消费
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
// 更新消费者消费进度
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
// 持久化消费进度
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
// 当前消费队列从负载均衡RebalanceImpl移除
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
// 延迟拉取消息
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
// 构建消息拉取的sysFlag
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend 没有找到消息时,是否挂起broker标记
subExpression != null, // subscription 表达式过滤
classFilter // class filter 类过滤消息模式
);
try {
// 从broker服务端拉取消息
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
sysFlag,
commitOffsetValue,
// broker未找到消息时,broker配置了开启长轮询模式
BROKER_SUSPEND_MAX_TIME_MILLIS, // broker挂起超时时间,PUSH模式默认15s,PULL模式默认20s
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // broker挂起时,消费者超时时间,默认30s
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl从broker服务端拉取消息执行方法,代码如下。
/**
* 从broker服务端拉取消息
* step1:从本地MQClientInstance中根据消费队列mq获取brokerName、brokerId来查找broker信息;
* step2:本地没有broker信息,则从NameServer注册中心获取;
* step3:组装拉取请求头{@link PullMessageRequestHeader};
* step4:过滤模式是类模式,则从broker获取FilterServer,从FilterServer上拉取消息;
* step5:执行拉取消息,并返回。
* Broker端处理拉取请求:
* {@link org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest}
* @param mq 从哪个消费队列拉取消息
* @param subExpression 消息过滤表达式
* @param expressionType 过滤表达式类型:TAG、SQL92
* @param subVersion 订阅版本
* @param offset 消息拉取偏移量
* @param maxNums 最大拉取消息数量,默认32
* @param maxSizeInBytes 最大字节数
* @param sysFlag 拉取系统标记
* @param commitOffset 当前MessageQueue的消费进度(内存中)
* @param brokerSuspendMaxTimeMillis 允许Broker挂起时间,默认15s
* @param timeoutMillis 拉取超时时间
* @param communicationMode 拉取模式,默认异步拉取
* @param pullCallback 拉取回调函数
* @return 拉取结果
*/
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int maxSizeInBytes,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 从本地MQClientInstance中根据消费队列mq获取brokerName、brokerId来查找broker信息
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
this.recalculatePullFromWhichNode(mq), false);
// 本地没有,则根据topic从NameServer(Broker注册中心)
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
{
// check version
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
// broker是从,则sysFlagInner清除FLAG_COMMIT_OFFSET标记
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
// 组装拉取请求头
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setMaxMsgBytes(maxSizeInBytes);
requestHeader.setExpressionType(expressionType);
requestHeader.setBname(mq.getBrokerName());
// 过滤模式是类模式,则从broker获取FilterServer,从FilterServer上拉取消息
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
// 随机获取Broker上的过滤服务器注册表中的一个过滤服务器
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// 执行拉取消息,并返回结果
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
2. Broker处理拉取请求
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest是Broker处理拉取消息请求的核心逻辑方法,代码如下。注意事项:
-
org.apache.rocketmq.store.MessageStore#getMessage(java.lang.String,
java.lang.String, int, long, int,
org.apache.rocketmq.store.MessageFilter):查找消息,默认一次查找32条记录(消费端定义)。 -
Broker是否开启slaveReadEnable:参数决定下次拉取使用从消息队列(主从Broker切换的逻辑代码)。
-
没有找到消息时,broker是否开启长轮询查找消息(默认true):
org.apache.rocketmq.broker.processor.DefaultPullMessageResultHandler#handle,即:没有找到消息时,挂起拉取请求,监听是否有消息到达,若有则重启挂起的请求。
/**
* 处理客户端拉取消息请求
* 消费者拉取消息入口请求broker:{@link org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage}
* step1:broker是否读权限、订阅信息是否存在、topic存在且允许读
* step2:消息队列是否合规、有表达式过滤消息
* step3:查找消息{@link DefaultMessageStore#getMessage
* step4:设置nextBeginOffset、minOffset、maxOffset等
* step5:查找消息结果 转换成 拉取响应状态码
* step6:建议下次拉取使用从消息队列
* step7:消息消费函数钩子
* step8:更新消息消费进度
* step9:处理拉取请求,并响应,如:没有找到消息时,broker是否开启长轮询查找消息 {@link DefaultPullMessageResultHandler#handle}
* @param channel {@link Channel}
* @param request 客户端消息拉取请求
* @param brokerAllowSuspend broker是否允许挂起
* @return
* @throws RemotingCommandException
*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
// 解码获取拉取消息请求头
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
LOGGER.debug("receive PullMessage request command, {}", request);
// broker是否具有读权限
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
// 简单拉取模式下,是否开启
if (request.getCode() == RequestCode.LITE_PULL_MESSAGE && !this.brokerController.getBrokerConfig().isLitePullMessageEnable()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] for lite pull consumer is forbidden");
return response;
}
// 订阅信息是否存在
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
}
// 订阅信息存在,是否允许消费
if (!subscriptionGroupConfig.isConsumeEnable()) {
response.setCode(ResponseCode.NO_PERMISSION);
responseHeader.setForbiddenType(ForbiddenType.GROUP_FORBIDDEN);
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
return response;
}
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
// topic配置
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
LOGGER.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}
// topic存在,是否允许读
if (!PermName.isReadable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
responseHeader.setForbiddenType(ForbiddenType.TOPIC_FORBIDDEN);
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
return response;
}
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
{
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
}
// 消费队列是否合规
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
LOGGER.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
// 有表达式过滤消息
if (hasSubscriptionFlag) {
try {
subscriptionData = FilterAPI.build(
requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
);
// 不是TAG模式,即:SQL2过滤
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
// 构建过滤数据
consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion()
);
assert consumerFilterData != null;
}
} catch (Exception e) {
LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
}
} else {
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
LOGGER.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
response.setCode(ResponseCode.NO_PERMISSION);
responseHeader.setForbiddenType(ForbiddenType.BROADCASTING_DISABLE_FORBIDDEN);
response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
return response;
}
boolean readForbidden = this.brokerController.getSubscriptionGroupManager().getForbidden(//
subscriptionGroupConfig.getGroupName(), requestHeader.getTopic(), PermName.INDEX_PERM_READ);
if (readForbidden) {
response.setCode(ResponseCode.NO_PERMISSION);
responseHeader.setForbiddenType(ForbiddenType.SUBSCRIPTION_FORBIDDEN);
response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] is forbidden for topic[" + requestHeader.getTopic() + "]");
return response;
}
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
LOGGER.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
LOGGER.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
return response;
}
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
if (consumerFilterData == null) {
response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
return response;
}
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
LOGGER.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
return response;
}
}
}
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
&& !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
}
// 消息过滤
MessageFilter messageFilter;
// 支持重试消息过滤
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 不支持重试消息过滤,即:若是重试消息,则isMatchedByCommitLog直接返回true
else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 查找消息
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
// 设置下次拉取的偏移量
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
// this does not need to be modified since it's not an accurate value under logical queue.
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
responseHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
responseHeader.setGroupSysFlag(subscriptionGroupConfig.getGroupSysFlag());
// 查找消息结果转换成拉取响应状态码
switch (getMessageResult.getStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_MATCHED_LOGIC_QUEUE:
case NO_MESSAGE_IN_QUEUE:
if (0 != requestHeader.getQueueOffset()) {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
LOGGER.info("the broker stores no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case OFFSET_FOUND_NULL:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
LOGGER.info("the request offset: {} over flow badly, fix to {}, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getNextBeginOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOGGER.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
break;
}
// 建议使用从消费队列
if (this.brokerController.getBrokerConfig().isSlaveReadEnable() && !this.brokerController.getBrokerConfig().isInBrokerContainer()) {
// consume too slow ,redirect to another machine
// 主Broker繁忙,建议下一次到从Broker拉取
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
if (this.brokerController.getBrokerConfig().getBrokerId() != MixAll.MASTER_ID && !getMessageResult.isSuggestPullingFromSlave()) {
if (this.brokerController.getMinBrokerIdInGroup() == MixAll.MASTER_ID) {
LOGGER.debug("slave redirect pullRequest to master, topic: {}, queueId: {}, consumer group: {}, next: {}, min: {}, max: {}",
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup(),
responseHeader.getNextBeginOffset(),
responseHeader.getMinOffset(),
responseHeader.getMaxOffset()
);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
if (!getMessageResult.getStatus().equals(GetMessageStatus.FOUND)) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
}
}
}
// 消费消息钩子函数
if (this.hasConsumeMessageHook()) {
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
String authType = request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE);
String ownerParent = request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT);
String ownerSelf = request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF);
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getConsumerGroup());
context.setTopic(requestHeader.getTopic());
context.setQueueId(requestHeader.getQueueId());
context.setAccountAuthType(authType);
context.setAccountOwnerParent(ownerParent);
context.setAccountOwnerSelf(ownerSelf);
context.setNamespace(NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()));
switch (response.getCode()) {
case ResponseCode.SUCCESS:
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
context.setCommercialRcvTimes(incValue);
context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
context.setCommercialOwner(owner);
context.setRcvStat(BrokerStatsManager.StatsType.RCV_SUCCESS);
context.setRcvMsgNum(getMessageResult.getMessageCount());
context.setRcvMsgSize(getMessageResult.getBufferTotalSize());
context.setCommercialRcvMsgNum(getMessageResult.getMsgCount4Commercial());
break;
case ResponseCode.PULL_NOT_FOUND:
if (!brokerAllowSuspend) {
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(owner);
context.setRcvStat(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setRcvMsgNum(0);
context.setRcvMsgSize(0);
context.setCommercialRcvMsgNum(0);
}
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(owner);
context.setRcvStat(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setRcvMsgNum(0);
context.setRcvMsgSize(0);
context.setCommercialRcvMsgNum(0);
break;
default:
assert false;
break;
}
try {
this.executeConsumeMessageHookBefore(context);
} catch (AbortProcessException e) {
response.setCode(e.getResponseCode());
response.setRemark(e.getErrorMessage());
return response;
}
}
//rewrite the response for the
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
if (rewriteResult != null) {
response = rewriteResult;
}
// 处理拉取请求,并响应,如:没有找到消息时,broker是否开启长轮询查找消息
response = this.pullMessageResultHandler.handle(
getMessageResult,
request,
requestHeader,
channel,
subscriptionData,
subscriptionGroupConfig,
brokerAllowSuspend,
messageFilter,
response
);
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
// 允许broker拉取消息挂起 + 有commitlog标记时:更新消息消费进度
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
if (storeOffsetEnable) {
// 更新消息消费进度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}
Broker端处理拉取消息后,响应到消费端,根据消费端发送拉取消息请求(详细见上小节)时,定义的org.apache.rocketmq.client.consumer.PullCallback处理拉取的消息,提交到ConsumeMessageService,供消费者消费(异步提交),本次拉取消息完成。消息消费参考《RocketMQ5.0.0消息消费<三> _ 消息消费》。
四、消息拉取长轮询机制
通过上述两小节介绍,RocketMQ推模式是消费者循环向Broker端发送消息拉取请求,如果消费者向Broker发送消息拉取时,此时消息并未到达消费队列,是否开启长轮询机制做不同处理。长轮询模式使得消息拉取能实现准实时。
- 不启用长轮询模式:会在服务端等待shortPollingTimeMills时间后(挂起)再去判断消息是否已到达消息队列,如果消息未到达则提示消息拉取客户端
PULL_NOT_FOUND(消息不存在); - 开启长轮询模式(默认):RocketMQ一方面会每5s轮询检查一次消息是否到达,与此同时有新消息到达后立马通知挂起线程再次验证新消息是否是自己感兴趣的消息,则从commitlog文件提取消息返回给消息消费者,否则直到挂起超时。
挂起超时时间由消费者在消息拉取时封装在请求参数中,PUSH模式默认为15s,PULL模式DefaultMQPullConsumer#brokerSuspendMaxTimeMillis设置。
RocketMQ长轮询机制由两个线程共同来完成:
-
PullRequestHoldService线程:消费者拉取消息未找到消息时,Broker挂起每个拉取消息的请求,并添加到该线程中,每隔5s重试再次拉取消息;
-
DefaultMessageStore.ReputMessageService线程:新消息执行异步转发(新消息构建消费队列)时,主动唤醒消费者的拉取请求,沉睡1ms继续下一次检查。
1. PullRequestHoldService线程
1):拉取请求添加到PullRequestHoldService线程
参考第二节《PUSH模式的消息拉取机制#Broker处理拉取请求》,org.apache.rocketmq.broker.processor.DefaultPullMessageResultHandler#handle处理拉取请求实现类,其状态ResponseCode.PULL_NOT_FOUND(未找到消息)时的处理逻辑,代码如下。
@Override
public RemotingCommand handle(final GetMessageResult getMessageResult,
final RemotingCommand request,
final PullMessageRequestHeader requestHeader,
final Channel channel,
final SubscriptionData subscriptionData,
final SubscriptionGroupConfig subscriptionGroupConfig,
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
RemotingCommand response) {
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
switch (response.getCode()) {
case ResponseCode.SUCCESS:
......
case ResponseCode.PULL_NOT_FOUND: // 没有找到消息
// 消费者拉取请求的是否有挂起标志
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
// 消费者拉取请求的Broker挂起超时时间,默认15s
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
// Broker是否支持挂起 + 消费者拉取请求的是否有挂起标志
if (brokerAllowSuspend && hasSuspendFlag) {
// Broker挂起超时时间,默认15s
long pollingTimeMills = suspendTimeoutMillisLong;
// Broker没有开启长轮询挂起模式,默认超时1s
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
// 再次组装消息拉取请求
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
// 提交到PullRequestHoldService线程
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
// 响应设置null
return null;
}
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
......
break;
default:
log.warn("[BUG] impossible result code of get message: {}", response.getCode());
assert false;
}
return response;
}
2):PullRequestHoldService线程周期执行
PullRequestHoldService该线程完成消费者拉取消息未找到消息时,Broker挂起每个拉取消息的请求,并添加到该线程中,每隔5s重试再次拉取消息。PullRequestHoldService#run方法5s周期执行消息拉取任务,其调用链如下。
org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving是拉取消息的核心方法,代码如下。注意事项:
- 判断是否有新消息:当前消费队列最大偏移量 > 待拉取偏移量,说明有新消息。
/**
* 处理轮询机制的拉取请求
* step1:获取key下所有挂起的拉取消息请求;
* step2:复制并清空当前key的所有挂起拉取消息任务,
* 采用synchronized,避免并发访问 {@link org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService}也会唤醒,尝试拉取消息
* step3:当前消费队列最大偏移量 > 待拉取偏移量,说明有新消息;
* step4:有匹配消息或拉取请求挂起超时时,拉取请求唤醒并返回给消息拉取的消费者
* {@link PullMessageProcessor#executeRequestWhenWakeup}
* @param topic
* @param queueId
* @param maxOffset 当前消费队列最大偏移量
* @param tagsCode
* @param msgStoreTime 消息存储时间
* @param filterBitMap
* @param properties
*/
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
// 获取key下所有挂起的拉取消息请求
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
// 复制并清空当前key的所有挂起拉取消息任务
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
// 当前消费队列最大偏移量 > 待拉取偏移量,说明有新消息
if (newestOffset > request.getPullFromThisOffset()) {
// 消息是否匹配
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) {
try {
// 匹配,则将消费者拉取请求唤醒,将消息返回给消息拉取的消费者
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error(
"PullRequestHoldService#notifyMessageArriving: failed to execute request when "
+ "message matched, topic={}, queueId={}", topic, queueId, e);
}
continue;
}
}
// 消费者拉取请求挂起超时,则直接返回给消费者,未找到消息
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
// 将消费者拉取请求唤醒,将消息返回给消息拉取的消费者
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error(
"PullRequestHoldService#notifyMessageArriving: failed to execute request when time's "
+ "up, topic={}, queueId={}", topic, queueId, e);
}
continue;
}
replayList.add(request);
}
// 下次循环拉取的消息
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
2. DefaultMessageStore.ReputMessageService线程
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService该线程完成新消息执行异步转发时(新消息构建消费队列),主动唤醒消费者的拉取请求,沉睡1ms继续下一次检查。
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput执行异步转发的核心逻辑(参考《RocketMQ5.0.0消息存储<三>_消息转发与恢复机制》),其如下部分代码是主动唤醒消费者拉取请求。
/**
* 消息提交到Commitlog时消息转发,构建ConsumeQueue、index文件服务 的核心方法
* step1:消息转发偏移量 > 最小偏移量时,则最小偏移量赋值给消息转发偏移量
* step2:消息转发偏移量 <= 最小偏移量时,获取转发偏移量开始的全部有效数据
* step3:循环转发每条消息,获取每条消息的转发请求对象{@link DispatchRequest}
* step4:消息解析成功后,转发消息{@link DefaultMessageStore#doDispatch(DispatchRequest)}
* 注意:转发消息时遍历LinkedList<CommitLogDispatcher> dispatcherList,集合中有消息队列、索引文件执行转发的实现类
* step5:更新消息转发偏移量
*/
private void doReput() {
......
// 消息转发
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 开启长轮询时,唤醒PullRequestHoldService线程,执行被挂起的拉取消息请求
if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
&& DefaultMessageStore.this.messageArrivingListener != null) {
// 唤醒被挂起的拉取消息请求,再次拉取消息
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
// 多个消息队列,唤醒被挂起的拉取消息请求,再次拉取消息
notifyMessageArrive4MultiQueue(dispatchRequest);
}
......
}
五、参考资料
- https://blog.csdn.net/liqiuman180688/article/details/88390667
- https://baijiahao.baidu.com/s?id=1740665653045957262&wfr=spider&for=pc
- https://blog.csdn.net/aideserter/article/details/118713781