前提介绍
在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的Push操作,接下来的章节会介绍Pull操作方式的消费机制体系。
DefaultMQPullConsumer
DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下DefaultMQPullConsumer的内部原理。
总体流程执行
DefaultMQPullConsumer使用例子
public class MQPullConsumer {
private static final Map OFFSE_TABLE = new HashMap();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
consumer.start();
Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");
for(MessageQueue mq:mqs){
try {
long offset = consumer.fetchConsumeOffset(mq,true);
while(true){
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()){
case FOUND:
List messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if(offset != null){
return offset;
}
return 0l;
}
}
- 消费者启动:consumer.start();
- 获取主题下所有的消息队列:这里是根据topic从nameserver获取的这里我们可以修改为从其他位置获取队列信息
Set mqs = consumer.fetchSubscribeMessageQueues("topicTest");
for(MessageQueue mq:mqs){
try {
long offset = consumer.fetchConsumeOffset(mq,true);
while(true){
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
}
DefaultMQPullConsumer的总体流程
启动DefaultMQPullConsumer是通过调用start()方法完成的
DefaultMQPullConsumer拉取源码分析
分析下DefaultMQPullConsumer拉取消息的流程
consumer.fetchSubscribeMessageQueues("order-topic")
从指定topic中拉取所有消息队列
Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");
核心源码分析
fetchSubscribeMessageQueues()
- 通过调用fetchSubscribeMessageQueues()方法可以获取指定topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向nameserver发送GetRouteInfoRequest请求,请求内容为GET_ROUTEINTO_BY_TOPIC,nameserver将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的MessageQueue对象。
- 每个MessageQueue对象里面记录了topic、broker名和读队列号。最后fetchSubscribeMessageQueues()将MessageQueue对象集合返回给调用者。
- 向NameServer发送请求获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象。
public Set fetchSubscribeMessageQueues(String topic) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
if (topicRouteData != null) {
Set mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
if (!mqList.isEmpty()) {
return mqList;
} else {
throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
}
}
} catch (Exception e) {
throw new MQClientException(
"Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
e);
}
throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
遍历过程TopicRouteData
遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限, 若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合; 最后返回给MessageQueue集合
public static Set topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set mqList = new HashSet();
List qds = route.getQueueDatas();
for (QueueData qd : qds) {
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
consumer.fetchConsumeOffset
通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置。
DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从broker获取该队列的消费位移,true表示从broker获取,false表示从本地记录获取,如果本地获取不到再从broker获取。 这里说的从本地获取是指从RemoteBrokerOffsetStore.offsetTable属性中获取,该属性记录了每个队列的消费位移。当从broker获取位移后会更新offsetTable。
pullBlockIfNotFound拉取信息
rocketmq提供了多个拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound会等待30s返回一个空结果,pull是等待10s返回空结果。
不过pull方法的入参可以调整超时时间,而pullBlockIfNotFound则需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis参数。不过两个方法调用的底层逻辑都是一样的,都是调用DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下pullSyncImpl()方法。
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用pullSyncImpl,可以获取相关的结果数据。
- 参数1:消息队列(通过调用消费者的fetchSubscibeMessageQueue(topic)可以得到相应topic的所需要消息队列) ;
- 参数2:需要过滤用的表达式 ;
- 参数3:偏移量即消费队列的进度 ;
- 参数4:一次取消息的最大值 ;
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
DefaultMQPullConsumerImpl.pullSyncImpl的实现过程
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.isRunning();
if (null == mq) {
throw new MQClientException("mq is null", null);
}
if (offset < 0) {
throw new MQClientException("offset < 0", null);
}
if (maxNums 0) {
throw new MQClientException("maxNums , null);
}
this.subscriptionAutomatically(mq.getTopic());
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
subscriptionData.getExpressionType(),
isTagType ? 0L : subscriptionData.getSubVersion(),
offset,
maxNums,
sysFlag,
0,
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,
CommunicationMode.SYNC,
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
this.resetTopic(pullResult.getMsgFoundList());
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap
SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
0L,
offset,
maxNums,
sysFlag,
0,
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,
CommunicationMode.SYNC,
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
Push和Pull的操作对比
- push-优点:及时性、服务端统一处理实现方便
- push-缺点:容易造成堆积、负载性能不可控
- pull-优点:获得消息状态方便、负载均衡性能可控
- pull-缺点:及时性差
使用DefaultMQPullConsumer拉取消息,发送到broker的提交位移永远都是0,所以broker无法记录有效位移,需要程序自己记录和控制提交位移。
分享资源
获取以上资源请访问开源项目 点击跳转