Rocket
- 一、学习目标
- 二、RocketMQ的架构运行图
- 2.1、NameServer
- 2.1.1 为什么需要NameServer
- 2.1.1.1 不可以没有nameserver吗?
- 2.1.2 NameServer需要单独部署吗
- 2.1.3 Nameserver可以动态注册和注销Broker、Topic和Consume 是什么意思
- 2.1.4 可以使用nacos的配置中心替代NameServer吗
- 2.2 broker
- 2.1.1 消息存储模块的设计
- 3. broker的负载均衡算法
- 4. 消费组
- 4.1 多个消费组共同消费一个topic是如何进行维护消费进度。
- 4.2 生产者tag的关系
- 4.3 消费者如何确认消息被消费
- 4.4 消费模式
- 4.5 消费者与消息的拉、推模式
- 4.6 消费者设置多线程
- 4.7 延时消息队列
一、学习目标
RocketMQ 为什么性能高效,到底运用了什么“厉害”的技术?
RocketMQ 如何实现刷盘(可以类比一下数据库方面的刷盘、redo、undo 日志)?
RocketMQ 文件存储设计理念、基于文件的 Hash 索引是怎么实现的?
定时消息、消息过滤等实现原理?
如何进行网络编程(Netty 实战)?
二、RocketMQ的架构运行图
- 生产者
- 消费者
- broker
- nameserver
nameserver、broker、主题、 消费组、消费者、队列负载算法、队列重平衡机制、并发消费、顺序消费、消费进度存储、 定时消息、事务消息、消息过滤等基本概念介绍
2.1、NameServer
RocketMQ的Nameserver是一个元数据管理组件,它负责维护所有Broker、Topic和Consumer的元数据信息,包括Broker的地址、Topic的路由信息以及Consumer的消费进度等。Nameserver通过定期心跳来检测Broker和Consumer的状态,并将最新的元数据信息同步给Broker和Consumer。同时,Nameserver还支持动态注册和注销Broker、Topic和Consumer,使系统具备高可伸缩性和灵活性。
其中nameserver是最终一致性,任何节点都可以进行读写请求,因此在一定的时间内,可能会出现数据不一致的问题,即使出现也无伤大雅,如果对于数据要求必须的话,则可以采用强一致性的容器
2.1.1 为什么需要NameServer
抽出一个Nameserver的目的是为了解耦系统的元数据管理功能与其他业务逻辑,提高系统的可扩展性和灵活性。通过引入Nameserver,RocketMQ可以将Broker、Topic和Consumer的元数据信息集中管理,并对外提供统一的查询和更新接口,从而避免不同组件之间直接相互调用,增加了系统的可维护性和升级性。此外,由于Nameserver可以动态注册和注销Broker、Topic和Consumer,因此RocketMQ的整体拓扑结构可以根据业务需求灵活调整,以适应不同规模和复杂度的应用场景。(如果没有nameserver,集群需要添加一个节点都需要修改其他的节点的配置)
2.1.1.1 不可以没有nameserver吗?
可以啊,比如nacos的配置信息就是在本地啊,因此在nacos的conf文件中需要注明集群的节点进行通信,然后各个服务直接传输信息,但是这样就不够灵活调整了,加节点就需要修改所有的nacos的conf文件。
有些服务需要使用成千上万台rocketMq的broker,你这节点修改不可能的啊。
2.1.2 NameServer需要单独部署吗
是的,Nameserver需要单独部署。RocketMQ的Nameserver是一个独立的元数据管理组件,它需要独立运行,并且与Broker和Consumer分开部署。通常情况下,建议在集群中至少部署两个Nameserver节点,以确保高可用性。此外,为了保证系统的性能和稳定性,建议将Nameserver部署在配置比较高的机器上,并对其进行监控和维护。
2.1.3 Nameserver可以动态注册和注销Broker、Topic和Consume 是什么意思
动态注册和注销是指在运行时动态地向Nameserver注册或注销Broker、Topic和Consumer。具体来说,当一个新的Broker启动时,它会向Nameserver发送注册请求,将自己的元数据信息注册到Nameserver中;而当一个Broker停止运行时,它会向Nameserver发送注销请求,从Nameserver中注销自己的元数据信息。同样地,当一个新的Topic或Consumer被创建时,它们也需要向Nameserver发送注册请求,将自己的元数据信息注册到Nameserver中;而当一个Topic或Consumer被删除时,它们需要向Nameserver发送注销请求,从Nameserver中注销自己的元数据信息。
通过动态注册和注销机制,Nameserver可以动态地管理Broker、Topic和Consumer的元数据信息,使RocketMQ的整体拓扑结构更加灵活和可维护。例如,当需要增加或减少Broker时,只需启动或关闭相应的Broker即可,Nameserver会自动检测并更新系统的元数据信息;当需要增加或减少Topic时,只需创建或删除相应的Topic即可,Nameserver也会自动更新Broker的路由信息。这种机制大大简化了系统的运维和管理工作,并提高了系统的可扩展性和灵活性。
2.1.4 可以使用nacos的配置中心替代NameServer吗
实际上,RocketMQ也提供了与Nacos集成的方案,可以使用Nacos作为元数据管理组件来替代默认的Nameserver。使用Nacos作为元数据管理组件的优势在于可以利用Nacos提供的服务注册和发现、配置管理等功能,简化系统的架构和运维成本。但需要注意的是,使用Nacos作为元数据管理组件需要进行额外的配置和开发工作,同时其性能和稳定性也需要进一步验证和评估。因此,具体是否选择使用Nacos作为元数据管理组件,应该根据实际情况和需求来决定。
2.2 broker
RocketMQ的Broker是消息处理的核心组件,它负责接收和存储生产者发送的消息,并将消息路由给相应的消费者进行消费。具体来说,Broker包括以下几个功能模块:
消息存储模块:负责将生产者发送的消息存储到磁盘上,并支持消息的查询和检索。
消息传输模块:负责与生产者和消费者建立网络连接,接收和发送消息,并实现消息的分发和路由等功能。
索引管理模块:负责维护消息索引,支持快速查找和定位消息。
消息队列管理模块:负责创建和管理消息队列,支持动态扩容和缩容。
订阅关系管理模块:负责维护消费者与消息队列之间的订阅关系,并支持动态调整订阅关系。
在RocketMQ中,Broker可以水平扩展和缩容,即可以通过添加或删除Broker实例来动态调整系统的负载和容量。此外,Broker还支持多种消息存储方式和消息传输协议,可以根据业务需求选择合适的配置和参数。
2.1.1 消息存储模块的设计
- 问题: 一条消息过来怎么存储,怎么保证高效,怎么保证存储后更加容易找到,消息怎么序列化与反序列化?
- RocketMQ的消息存储模块采用了顺序写入、随机读取的方式,将消息存储在磁盘上。具体来说,当一条消息到达Broker时,Broker会首先将其写入内存缓冲区,并异步将缓冲区中的所有消息批量写入磁盘文件中。为了提高写入效率,RocketMQ使用了零拷贝技术和内存映射文件技术,在写入磁盘时避免了数据拷贝和系统调用的开销。
- 为了保证消息存储后更容易找到,RocketMQ采用了基于偏移量和索引的方式进行消息查找。具体来说,每个消息都有一个唯一的偏移量,在存储时将消息的偏移量和所属队列的信息保存到索引文件中。通过索引文件,可以快速定位某个偏移量对应的消息所在的物理地址,并从磁盘中读取出该消息进行消费。
- 在RocketMQ中,消息是以二进制格式进行序列化和反序列化的,支持多种序列化格式,包括JSON、Protobuf等。用户也可以根据自己的需求实现自定义的序列化器。序列化和反序列化操作由客户端和服务端共同完成,RocketMQ只提供序列化和反序列化接口,并不限制具体的实现方式。
- 问题:这里写入到磁盘的时候回先写入page cache ,然后根据对应策略进行刷盘
对的,对于broke而言,消息可以有
- 异步刷盘:等到page cache的数据足够多进行刷盘
- 同步刷盘:消息一到page cache 就刷盘
SYNC_FLUSH, //同步刷盘
ASYNC_FLUSH//异步刷盘(默认)
在broker.config 添加 sync_flush=true
- 问题:broker的零拷贝技术的具体体现
我们的系统是使用java来写的,因此读数据的时候常规会在jvm的堆空间进行开辟空间存储数据,然后刷盘的时候,会从jvm空间拷贝数据到内核空间,内核空间进行刷盘,这样就多了几层拷贝了,因此零拷贝技术的话,可以在生产者提交数据时,直接使用netty框架的零拷贝将数据放置到内核的byteBuffer中。在消费的时候读数据也类似,就是不经过jvm空间。
- 问题:消息是属于对应topic 以及对应queue
topic是一种业务类型,比如发货消息,就属于一个topic,但是如果发货消息过多,导致队列过长,则就新增了topic下属的queue,queue的话不会有对应的名称,类似于queue1\queue2,这样就会将topic的消息进行均分,减轻压力
- 问题:RocketMq为啥要有topic和queue,为啥不能直接一个topic
RocketMQ 之所以设计了 Topic 和 Queue 的概念,是为了更好地管理和处理消息。在 RocketMQ 中,一个 Topic 可以包含多个 Queue,每个 Queue 存储这个 Topic 的一部分消息。这种设计有以下几个优点:
- 提高消息处理效率:将大量消息分散到多个 Queue 中,可以提高消息的处理速度和吞吐量,避免单个队列因为消息积压而导致性能下降。
- 支持负载均衡:通过将消息均匀地分配到不同的 Queue 上,可以实现负载均衡,避免某个 Queue 被过度消耗导致系统不稳定。
- 方便扩展:当需要增加消息的处理能力时,只需要新增 Queue 即可,而不需要改动 Topic 的定义。这种方式既方便了扩展,也不会对已有的业务逻辑产生影响。
- 支持精细化控制:通过设置每个 Queue 的容量、权限等参数,可以实现对不同类型的消息进行精细化的控制,提高系统的可靠性和安全性。
- 问题: commitLog和索引文件的内容
消息到达commitLog都是会生成对应的偏移量,一个commitLog对应其索引文件,在索引文件中记录了commitLog的所有的偏移量。采用了hash槽的模式,可以在索引文件快速找到对应消息。
在 RocketMQ 中,一个 Topic 包含多个消息队列,而每个消息队列对应着一个 CommitLog 文件。具体而言,当创建一个新的 Topic 时,RocketMQ 会自动为该 Topic 创建多个消息队列,并在磁盘上为每个消息队列创建一个对应的 CommitLog 文件和索引文件。这样可以将不同的消息分别存储到不同的 CommitLog 文件中,从而提高消息存储和检索的效率。
在默认情况下,RocketMQ 为每个 Broker 配置了 16 个 Topic,每个 Topic 包含 4 个消息队列,因此总共有 64 个消息队列和 64 个 CommitLog 文件。如果需要增加或减少 Topic 数量或者每个 Topic 的消息队列数量,可以在启动 Broker 时通过修改相应的配置参数来实现。例如,可以通过修改 defaultTopicQueueNums 参数来指定每个 Topic 默认的消息队列数量,也可以通过修改 brokerTopicQueueNums 参数来指定特定 Topic 的消息队列数量。
- 问题:rocketmq如何保证 消息存储commitLog和生成对应的索引文件的事务性
恢复机制:如果发生了关机等意外情况,RocketMQ 提供了恢复机制来防止数据丢失。具体而言,当 Broker 重启时,会检查 CommitLog 文件和索引文件中的消息记录,并对其进行恢复。如果发现 CommitLog 文件中存在消息记录,但是索引文件中没有对应的记录,那么会通过扫描 CommitLog 文件的方式重构索引文件,并重新创建缺失的索引记录。这样可以保证消息的存储和消费的可靠性。
3. broker的负载均衡算法
在 RocketMQ 中,Broker 的负载均衡主要分为两个方面:
-
生产者的负载均衡:RocketMQ 允许将消息发送到多个 Broker 上,并支持自动进行负载均衡。具体而言,当客户端向 NameServer 发送请求时,NameServer 会返回一个包含多个 Broker 地址的列表。客户端收到地址列表后,可以根据特定的负载均衡策略(如随机、轮询等方式)选择一个可用的 Broker 进行消息发送。
-
消费者的负载均衡:RocketMQ 支持消费者集群和广播两种消息消费模式。如果采用消费者集群模式,多个消费者可以共同消费同一主题下的消息,并且每个消费者只能消费部分消息。在这种情况下,RocketMQ 采用 Rebalance 机制来实现消费者的负载均衡。具体而言,当消费者加入或退出消费者集群时,会触发一次 Rebalance 操作,该操作会重新计算主题下所有消息队列的负载情况,并将消息队列分配给各个消费者。在分配消息队列时,可以使用多种负载均衡算法,如轮询、随机、最小负载等方式。
需要注意的是,RocketMQ 的负载均衡机制是基于 NameServer 和 Broker 的协作实现的。当新的 Broker 加入集群或者原有的 Broker 发生宕机等情况时,NameServer 会感知到这种变化,并将相应的 Broker 信息更新到 Broker 地址列表中。消费者在下次 Rebalance 操作时,就可以根据最新的 Broker 列表进行负载均衡了。
综上所述,RocketMQ 的 Broker 负载均衡涉及到生产者和消费者两个方面。通过采用多个 Broker,使用 Rebalance 机制和多种负载均衡算法等技术手段,可以实现消息在多个 Broker 之间的负载均衡,并提高消息发送和消费的效率和可靠性。
4. 消费组
在rocket消息消费的时候,需要有消费者,Rocketmq引入了消费组,消费组可以关联一个Topic。
- subscribe(“topic”,“*”) 订阅topic的所有的queue数据
- subscribe(“topic”:“tag”) 订阅对应tag的数据
在消费组消费数据的时候也有两种模式
3. 集群模式:消费组中的每一个消费者都会有其关联的queue,并且一个queue不可能被多个消费者关联,当消费组的成员发送改变,则会进行rebalance
4. 广播模式: 消费组中的消费者可以消费任何消息,也就是说消息会推送给任何的消费者。
4.1 多个消费组共同消费一个topic是如何进行维护消费进度。
消费者消费一条消息后需要记录消费的位置,这样在消费端重启的时候,继续从上一次 消费的位点开始进行处理新的消息。在 RocketMQ 中,消息消费位点的存储是以消费组为 单位的
4.2 生产者tag的关系
相同的tag会被放在一个queue中,实现顺序队列。
4.3 消费者如何确认消息被消费
消息被确认消费具有手动确认,和自动确认;
-
自动确认模式
在自动确认模式下,当消费者从队列中拉取到一条消息时,该消息将被标记为“已消费”,并且不会再次被消费者接收到。此时,消费者无需显式地确认消息,RocketMQ 会自动将其标记为已处理。 -
手动确认模式
在手动确认模式下,消费者需要通过调用 Acknowledge 接口的方法来显示地确认消息。通常,手动确认模式适用于需要对消息进行重复处理或者跨批次消费等场景,可以提高消息的可靠性和稳定性。
4.4 消费模式
RocketMQ 支持多种消费模式,包括基于 Offset 的消费和基于时间片的消费。这些消费模式可以根据业务需求和系统架构来选择和配置,以满足不同的场景和性能要求。
-
基于 Offset 的消费
基于 Offset 的消费是 RocketMQ 默认的消费模式,它是一种顺序消费模式,在消费者启动时从指定的位置开始消费,并且按照消息的顺序依次消费每个消息。当消费者处理完一条消息后,它的 Offset 会自动增加并保存到本地文件中,以便在下次启动时可以从断点处继续消费。由于 Offset 是在 Consumer 端管理的,因此可以通过修改 Offset 来实现重复消费、跨批次消费等特性。 -
基于时间片的消费(延时队列的实现)
基于时间片的消费是一种并发消费模式,在该模式下,消费者会将消息队列分成若干个时间片,并同时启动多个线程进行并发消费。消息队列中的每个消息都会被投递到一个时间片中,并由该时间片对应的线程进行消费。在消费过程中,RocketMQ 会保证相同的消息只会被一个线程消费,以避免消息的重复消费和丢失。
需要注意的是,在使用基于时间片的消费模式时,需要根据具体的业务场景和系统性能进行调整。同时,还需要考虑消息队列的负载均衡、消费者的并发度、消息处理的异常情况等问题,以确保消息的可靠性和系统的稳定性。
4.5 消费者与消息的拉、推模式
在 RocketMQ 中,消费者可以选择是自动拉取数据(Pull)还是由 Broker 推送数据(Push)。两种模式各具特色,可以根据实际需求选择和配置。
-
Pull 模式
在 Pull 模式下,消费者负责向 Broker 主动发起拉取请求,获取需要消费的消息。具体来说,消费者会定期向 Broker 发送拉取消息的请求,Broker 返回队列中的消息给客户端,然后客户端再进行消费处理。Pull 模式的优点是消费者可以精确控制数据拉取的速度和频率,可以根据系统负载和网络情况等动态调整拉取策略和延迟时间,以避免数据丢失和重复消费。 -
Push 模式
在 Push 模式下,Broker 负责将消息推送到消费者端,不需要消费者向 Broker 发起请求。具体来说,当有新消息到达时,Broker 会立即将其推送给消费者端,消费者只需要处理收到的消息即可。Push 模式的优点是消息传递的实时性更高,能够更快速地响应业务需求和变化,适用于实时性要求较高的场景。
rocketmq.client.consumer.consumeThreadMax = 10
rocketmq.client.consumer.consumeMessageBatchMaxSize = 32
4.6 消费者设置多线程
@Component
@RocketMQMessageListener(
topic = "my-topic",
consumerGroup = "my-group",
selectorExpression = "my-tag"
)
public class MyRocketMQConsumer implements RocketMQListener<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyRocketMQConsumer.class);
// 对对应bean设置了对应的线程数
@Value("${rocketmq.consumer.listener-thread-count}")
private int listenerThreadCount;
@Override
public void onMessage(String message) {
LOGGER.info("Received message: {}", message);
}
// 设置了线程数
@Bean(name = "${rocketmq.consumer.listener-container}")
public DefaultRocketMQListenerContainerConfigurer getListenerContainerConfigurer() {
return new DefaultRocketMQListenerContainerConfigurer() {
@Override
public void configure(AbstractRocketMQListenerContainer container) {
container.setConsumeThreadMax(listenerThreadCount);
container.setConsumeThreadMin(listenerThreadCount);
}
};
}
}
也可以在yml进行配置
spring:
rocketmq:
name-server: localhost:9876
consumer:
group: my-group
topic: my-topic
tags: my-tag
listener-thread-count: 10
4.7 延时消息队列
RocketMQ 的延时消息是通过 Message Store 实现的,具体来说,是通过在消息存储过程中设置消息的消息定时级别(Message Delay Level),并将消息保存到相应的 CommitLog 文件中来实现的。
消息定时级别是 RocketMQ 中用于标识消息的延时时间的参数,它是通过消息属性 DELAY_TIME_LEVEL 来传递的。RocketMQ 预定义了 18 个不同的消息定时级别,分别对应着 1s、5s、10s、30s 等不同的延时时间。当 Producer 发送一个延时消息时,它会将消息的 DELAY_TIME_LEVEL 属性设置为合适的值,并将消息发送给 Broker。
在 Broker 接收到延时消息后,它会根据消息的定时级别将消息保存到不同的 CommitLog 文件中。具体来说,Broker 会将延时消息保存到一个名为 %topic%-%queueId% 的 ConsumeQueue 文件中,其中 %topic% 表示消息的主题名称,%queueId% 是该主题下的队列编号。这个文件中记录了消息的位置和坐标,以便于 Broker 在消息消费时能够快速地找到相关的消息。
当消息的定时时间到达后,Broker 会将该消息从 ConsumeQueue 文件中删除,并将其加入到消息消费队列中,等待被消费者接收。因此,可以看出,在 RocketMQ 的延时消息中,消息是在 Broker 端被消费者接收之前才存储到 CommitLog 文件中的。
需要注意的是,在使用延时消息时,需要合理设置消息的定时级别,并根据业务场景和系统架构选择合适的延时时间。同时,为了保证消息传递的实时性和稳定性,也需要考虑消息消费的可靠性、重复消费等问题,并进行相应的优化和调整。