一、测试流程
1.添加POM
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
2.配置属性参数
# 默认的消息组
rocketmq.producer.group=springBootProduceGroup
rocketmq.consumer.topic=DEVICE_CENTER_MSG_DEV
rocketmq.consumer.group=springBootConsumeGroup33
3.添加监听代码
@Service
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}",consumerGroup ="${rocketmq.consumer.group}",
selectorExpression = "*",maxReconsumeTimes=2)
public class MsgRecvServiceImpl implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt messageExt) {
System.out.printf("messageExt: %s,body:%s \n",messageExt,new String(messageExt.getBody()));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeThreadMin(8);
consumer.setConsumeThreadMax(32);
consumer.setPullBatchSize(32);
// consumer.setInstanceName("10.6.33.23@10.6.14.3:9876@33404");
// consumer.setAllocateMessageQueueStrategy(new );
System.out.printf(" consumer:%s \n",consumer);
}
}
二、测试现象
1.问题描述
监听者有时收不到消息。
然后查看消费组配置
发现此主题有4个队列,但是有两个队列没有消息客户端订阅。
三、原因分析
1.原因详细分析
查看消费端负载均衡策略,看到本进程竟然有两个CID客户端在订阅这个TOPIC.
然后我们重启服务,在生成消费客户端实例名的函数断点。
可以看到在自动配置时创建了一个默认的PullConsumer
然后在创建自定义的监听实例时也实例了一个DefaultMQPushConsumer
2.原因总结
是由于springboot-rocketmq-start自动配置里面创建了一个默认的CONSUMER,导致生成了两个CONSUMER,
3.解决方案
因为自动配置是根据rocketmq.consumer.topic,rocketmq.consumer.group来生成的,所以只要在配置文件修改这两个属性的命名即可。
# NameServer地址
rocketmq.name-server=10.6.14.3:9876
# 默认的消息组
rocketmq.producer.group=springBootProduceGroup
rocketmq.device_center_dev_consumer.topic=DEVICE_CENTER_MSG_DEV
rocketmq.device_center_dev_consumer.group=springBootConsumeGroup33
修改完后,消费端负载均衡只找到了一个消费端,
然后查看ROCKETMQ控制台,这个消费客户端也同时消费了4个队列的数据,接收数据正常。
查看消息消费状态,也正常
程序也收到了消息