这是本人学习的总结,主要学习资料如下
- 马士兵教育
- rocketMq官方文档
目录
- 1、前置知识
- 1.1、pull和push型消费者
- 1.2、消息CommitLog到ConsumeQueue
- 1.3、自动创建的重试主题
- 1.4、广播型消费和集群型消费中offset的存储位置
- 2、消费中的启动流程
- 2.1、Preview
- 2.2、校验,配置,启动
1、前置知识
1.1、pull和push型消费者
消费者可分为两种,一种是pull
另一种是push
。
pull
是消费者主动到broker
上拿消息,push
则是设置监听等broker
发信息过来。
当然,底层实现其实都是pull
,push
模式下broker
只是发送一个通知,然后消费者到broker
上拉取消息。
1.2、消息CommitLog到ConsumeQueue
消费者读取数据时不会去读取CommitLog
,而是CunsumeQueue
。
生成者发送信息只会到CommitLog
中发送消息,不管ConsumeQueue
。
Broker
专门启动了一个后台线程从CommitLog
中将消息同步到ConsumeQueue
,就是ReputMessageService
。
这种与消息相关的服务基本上都在在DefaultMessageStore
中进行构建和启动。
ReputMessageService
这个类源码没什么特点或者技术亮点,代码一层套一层比较繁琐,没有必要太过深究,只需要知道它的功能即可。
1.3、自动创建的重试主题
消费消息失败时我们可以返回重试枚举return ConsumeConcurrentlyStatus.RECONSUME_LATER;
。
这样隔一段时间后消费者会再次尝试消费消息。
重试机制会为原来的topic
创建一个信息重试topic
。
在dashborad
界面NORMAL就是我们平时创建的一般的topic
,RETRY就是系统自动创建的重试topic
,一般是在原topic
前加上%RETRY%
。
1.4、广播型消费和集群型消费中offset的存储位置
offsetStore
表示消息消费的偏移量的存储位置。
从上图可以看到,广播型和集群型的消费模式有不同的offsetStore
的值。
查看源码会发现,集群型消费的偏移量需要从远程的broker
中获取;而广播型的offsetStore
只在本地叫offsets.json
的文件中维护偏移量,下图是广播型消费创建offsetStore
的源码。
这是因为广播型的消息是给每个messageQueue
都塞了消息,不像集群型一条消息只塞给其中一个messageQueue
,所以广播型消费的特点是不保证每个consumer
都能消费到消息,而集群型需要严格保证这一点。
所以偏移量放在本地可以降低通讯成本,正好广播型不需要保证消息一定被消费,那即使消费者宕机offset
丢失也是可以接受的。
而集群型要保证消息一定被消费,就需要将offset
保存在远程的broker
,这样即使消费者宕机消息在broker
中也是未消费的状态。
2、消费中的启动流程
2.1、Preview
这是一个简单的Consumer
的代码。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic1", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list
, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// todo
}
});
consumer.start();
前面的设置结束后就会通过start()
启动服务拉取消息,接下来会进入start()
查看源码了解消费者的启动过程。
2.2、校验,配置,启动
刚启动时,状态是CREATE_JUST
,我们进入这个分支查看源码。
服务启动前都要做一些校验,然后配置初始化必要的bean,其中就包括为以后可能的消费失败创建一个重试topic
。
这些配置和检验不细说。不过有一个配置offsetStore
比较有意思。
广播型消费的offset
存储在本地的offsets.json
中,集群型消费的offset
存储在远程。下面是相关代码。
各种配置之后开始启动消费者服务。