使用Spring-RocketMQ
时,只需要引入rocketmq-spring-boot-starter
包,并且定义以下消费者,就可以很简单地实现消息消费
@Component
@RocketMQMessageListener(topic = "first-topic", consumerGroup = "my-producer-group", selectorExpression = "tag1")
public class RocketMQConsumer implements RocketMQListener<String>{
@Override
public void onMessage(String message) {
System.out.println(message);
}
可以看到只需要添加@RocketMQMessageListener
注解,并实现RocketMQListener
接口就可以完成消息的接收、处理逻辑
@RocketMQMessageListener
实现原理
可以看到在ListenerContainerConfiguration
中获取了所有加了RocketMQMessageListener
注解的bean
ListenerContainerConfiguration#afterSingletonsInstantiated
/**
 * Callback from SmartInitializingSingleton (implemented by ListenerContainerConfiguration),
 * triggered after the beans have been instantiated. Looks up every bean annotated with
 * {@code RocketMQMessageListener} and hands each one to {@code registerContainer}.
 */
@Override
public void afterSingletonsInstantiated() {
    // All beans carrying the RocketMQMessageListener annotation.
    Map<String, Object> annotatedBeans =
        this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

    // Exclude entries whose bean name is reported as a scoped-proxy target.
    Map<String, Object> listenerBeans = annotatedBeans.entrySet().stream()
        .filter(candidate -> !ScopedProxyUtils.isScopedTarget(candidate.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    // Register one container per remaining listener bean.
    for (Map.Entry<String, Object> listener : listenerBeans.entrySet()) {
        registerContainer(listener.getKey(), listener.getValue());
    }
}
/**
 * Registers a DefaultRocketMQListenerContainer bean for one annotated listener bean and starts it.
 * Excerpt only: the "......" line marks code omitted by the article (it defines {@code clazz}).
 *
 * @param beanName name of the listener bean
 * @param bean     the listener bean instance
 */
private void registerContainer(String beanName, Object bean) {
......
// Read the RocketMQMessageListener annotation from the listener class
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
// consumerGroup declared on the annotation, with placeholders resolved against the environment
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
// topic declared on the annotation, with placeholders resolved against the environment
String topic = this.environment.resolvePlaceholders(annotation.topic());
// Build the container bean name: container class name + "_" + an incrementing counter
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
// Register the container bean; createRocketMQListenerContainer supplies the instance and initializes its properties
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
// Start the container
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
createRocketMQListenerContainer
里面就是初始化了DefaultRocketMQListenerContainer
这个对象,并且设置了一些消费相关的属性,比如nameServer
、topic
、tags
、consumerGroup
消费者组,rocketMQListener
我们定义的消费监听者等
可以看到这里面并没有定义具体的消费者实例
// DefaultRocketMQListenerContainer declaration: it implements InitializingBean, so afterPropertiesSet
// is called when the bean is initialized. (Class excerpt; the closing brace is not part of the excerpt.)
public class DefaultRocketMQListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
/**
 * Initializes the push consumer, then resolves and stores the message type and method parameter.
 */
@Override
public void afterPropertiesSet() throws Exception {
// As the method name suggests, this initializes the MQ consumer instance
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: {}", messageType);
}
/**
 * Creates the DefaultMQPushConsumer and configures its message model, subscription and listener.
 * Excerpt only: the "......" line marks code omitted by the article.
 *
 * @throws MQClientException declared by the method; NOTE(review): presumably raised by subscribe — confirm
 * @throws IllegalArgumentException if messageModel, selectorType or consumeMode has an unhandled value
 */
private void initRocketMQPushConsumer() throws MQClientException {
......
if (Objects.nonNull(rpcHook)) {
// Create the DefaultMQPushConsumer with the RPC hook
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
// No RPC hook: create the consumer without one
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
// Message model: broadcasting or clustering
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
// Selector type: TAG or SQL92
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
// Consume mode: orderly or concurrently; this decides which listener type is installed
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
}
回到上面registerContainer
,最后拿到DefaultRocketMQListenerContainer
的bean,调用start
方法
DefaultRocketMQListenerContainer#start
/**
 * Starts the underlying push consumer and marks this container as running.
 *
 * @throws IllegalStateException if the container is already running, or if the consumer
 *                               fails to start with an MQClientException
 */
@Override
public void start() {
    final boolean alreadyRunning = this.isRunning();
    if (alreadyRunning) {
        throw new IllegalStateException("container already running. " + this.toString());
    }

    try {
        // consumer is the DefaultMQPushConsumer built during initialization
        consumer.start();
    } catch (MQClientException startFailure) {
        throw new IllegalStateException("Failed to start RocketMQ push consumer", startFailure);
    }

    this.setRunning(true);
    log.info("running container: {}", this.toString());
}
DefaultMQPushConsumer#start
/**
 * Wraps the consumer group with the namespace, starts the internal implementation and,
 * when a trace dispatcher is present, starts it as well.
 *
 * @throws MQClientException if starting the internal implementation fails; a failure to
 *                           start the trace dispatcher is only logged as a warning
 */
@Override
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));

    // The real work is delegated to defaultMQPushConsumerImpl.start()
    this.defaultMQPushConsumerImpl.start();

    if (traceDispatcher == null) {
        return;
    }
    try {
        traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    } catch (MQClientException traceFailure) {
        log.warn("trace dispatcher start failed ", traceFailure);
    }
}
defaultMQPushConsumerImpl
是什么时候初始化的呢
上面说到初始化DefaultMQPushConsumer
对象时,点进去构造方法
/**
 * Creates a push consumer and its internal implementation object.
 *
 * @param namespace                    namespace stored on this consumer
 * @param consumerGroup                consumer group stored on this consumer
 * @param rpcHook                      RPC hook passed to the implementation and the trace dispatcher
 * @param allocateMessageQueueStrategy queue allocation strategy stored on this consumer
 * @param enableMsgTrace               whether to set up the message-trace dispatcher and hook
 * @param customizedTraceTopic         trace topic passed to the trace dispatcher
 */
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    // The implementation is created here and held by composition, not via interface implementation
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
            dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
                new ConsumeMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            // Trace setup failure does not abort construction, but the cause is logged
            // so the failure can be diagnosed.
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
        }
    }
}
接着看DefaultMQPushConsumerImpl
的start
方法
DefaultMQPushConsumerImpl#start
// Excerpt of DefaultMQPushConsumerImpl#start (truncated by the article: the remaining cases
// and closing braces are not shown). In the CREATE_JUST state it selects the consume service
// from the listener type, starts it, and then starts mQClientFactory.
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// Orderly listener: handled by ConsumeMessageOrderlyService
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
// Concurrent listener: handled by ConsumeMessageConcurrentlyService
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// Start the client factory
mQClientFactory.start();
}
MQClientInstance#start
/**
 * Starts the client instance. In the CREATE_JUST state it starts the client API, the scheduled
 * tasks, the pull-message service, the rebalance service and the internal producer, in that
 * order, then marks the state RUNNING. The state is set to START_FAILED first, so an exception
 * part-way through leaves the instance in START_FAILED.
 *
 * @throws MQClientException if the instance is already in the START_FAILED state
 *                           (NOTE(review): the started components may also throw it — confirm)
 */
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service: this is where message pulling is switched on
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
ServiceThread#start
/**
 * Starts the backing thread for this service. Only the call that flips {@code started}
 * from false to true creates and starts the thread; any other call just logs and returns.
 */
public void start() {
    log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);

    boolean firstStart = started.compareAndSet(false, true);
    if (firstStart) {
        stopped = false;
        // This object is itself the Runnable executed by the new thread
        Thread worker = new Thread(this, getServiceName());
        this.thread = worker;
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
}
// ServiceThread implements Runnable and is abstract, so the run() logic lives in its subclasses.
// (Declaration excerpt; the class body is omitted by the article.)
public abstract class ServiceThread implements Runnable {
}
可以看到PullMessageService
和我们找的有关,找到它的run
方法
PullMessageService#run
/**
 * Service loop: until the service is stopped, takes the next pull request from the queue
 * (blocking while it is empty) and processes it. InterruptedException is ignored;
 * any other exception is logged and the loop continues.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // take() blocks until a PullRequest is available, then it is handed to pullMessage
            this.pullMessage(this.pullRequestQueue.take());
        } catch (InterruptedException ignored) {
        } catch (Exception failure) {
            log.error("Pull Message Service Run Method exception", failure);
        }
    }

    log.info(this.getServiceName() + " service end");
}
/**
 * Dispatches a pull request to the consumer selected for the request's consumer group.
 * If no consumer is found, the request is dropped with a warning.
 *
 * @param pullRequest the pull request to process
 */
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner selected = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (selected == null) {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        return;
    }

    // The selected consumer is used as a DefaultMQPushConsumerImpl, which continues the pull
    ((DefaultMQPushConsumerImpl) selected).pullMessage(pullRequest);
}
// Excerpt of DefaultMQPushConsumerImpl#pullMessage. The "......" lines mark code omitted by the
// article (including where processQueue and dispatchToConsume are defined). Only the callback
// that handles the pull result is shown.
public void pullMessage(final PullRequest pullRequest) {
......
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
// Submit the found messages to consumeMessageService.submitConsumeRequest;
// consumeMessageService is the orderly or concurrent service chosen in start()
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
......
}
@Override
public void onException(Throwable e) {
// Log the failure unless the topic starts with the retry-group prefix
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
// Re-schedule the same pull request after the configured delay
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
}
ConsumeMessageOrderlyService#submitConsumeRequest
/**
 * Submits a consume task for the given queue to the consume executor, but only when
 * {@code dispathToConsume} is true. The {@code msgs} argument is not used here.
 *
 * @param msgs             messages found by the pull (unused in this implementation)
 * @param processQueue     process queue passed to the consume task
 * @param messageQueue     message queue passed to the consume task
 * @param dispathToConsume whether a consume task should be submitted
 */
@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (!dispathToConsume) {
        return;
    }
    // ConsumeRequest is a Runnable; it is handed to the thread pool for execution
    this.consumeExecutor.submit(new ConsumeRequest(processQueue, messageQueue));
}
来看ConsumeRequest
的run
方法
ConsumeMessageOrderlyService.ConsumeRequest#run
// Excerpt of ConsumeMessageOrderlyService.ConsumeRequest#run. The "......" line marks code
// omitted by the article (including where status, msgs and context are defined).
@Override
public void run() {
......
// Core step: the messages are handed to the listener as an unmodifiable list
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly#consumeMessage
/**
 * Consumes the messages one by one, in order. If handling any message throws, the context's
 * suspend time is set and the queue is suspended for a moment; messages after the failing
 * one are not handled in this call.
 *
 * @param msgs    the messages to consume
 * @param context the orderly-consume context
 * @return SUCCESS when every message was handled, otherwise SUSPEND_CURRENT_QUEUE_A_MOMENT
 */
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    for (MessageExt current : msgs) {
        log.debug("received msg: {}", current);
        try {
            final long startedAt = System.currentTimeMillis();
            handleMessage(current);
            final long elapsed = System.currentTimeMillis() - startedAt;
            log.info("consume {} cost: {} ms", current.getMsgId(), elapsed);
        } catch (Exception failure) {
            log.warn("consume message failed. messageExt:{}", current, failure);
            context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
    return ConsumeOrderlyStatus.SUCCESS;
}
DefaultRocketMQListenerContainer#handleMessage
/**
 * Delivers one message to the user-defined listener. A plain listener just receives the
 * converted message. Otherwise, if a reply listener is set, its return value is sent back
 * as a reply message and the send outcome is logged by the callback.
 *
 * @param messageExt the received message
 */
private void handleMessage(
    MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
    if (rocketMQListener != null) {
        // This ends up in the onMessage method of the user's RocketMQListener implementation
        rocketMQListener.onMessage(doConvertMessage(messageExt));
        return;
    }
    if (rocketMQReplyListener == null) {
        return;
    }

    Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
    Message<?> wrappedReply = MessageBuilder.withPayload(replyContent).build();
    org.apache.rocketmq.common.message.Message replyMessage =
        MessageUtil.createReplyMessage(messageExt, convertToBytes(wrappedReply));

    SendCallback replyCallback = new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                log.info("Consumer replies message success.");
            } else {
                log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
            }
        }

        @Override
        public void onException(Throwable e) {
            log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
        }
    };
    consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer()
        .send(replyMessage, replyCallback);
}
至此整个流程也就通了
总结
@RocketMQMessageListener
相当于定义一个消费者,topic、consumerGroup、selectorExpression、consumeMode、messageModel
定义了消费者的一些属性
实现RocketMQListener
接口来处理具体消费逻辑
每个消费者初始化了一个DefaultRocketMQListenerContainer
对象,该对象中包含消费实例和消费者的属性
服务启动的时候开启一个线程轮询队列中的拉取请求,队列为空时一直阻塞;拉取到消息后,最终会调用自己实现的onMessage
方法