RocketMQ Broker 中的消息被消费后会立即删除吗?
不会,每条消息都会持久化到 CommitLog 中,每个 Consumer 连接到 Broker 后会维持消费进度信息,当有消息消费后只是当前Consumer 的消费进度(CommitLog 的 offset)更新了。
当然也可以配置消息被消费后是否立即删除。当消费者成功消费一条消息,并完成确认后,Broker会将这条消息的状态更新为已消费,并根据配置的策略来决定是否立即删除这条消息。
RocketMQ支持两种消息删除策略:
-
删除策略为"立即删除":当Broker接到消费者的确认消息时,立刻删除该消息。这种删除策略可以在Broker的配置文件中设置,但是一旦设置为立即删除,就无法保证消息的可靠性。
-
删除策略为"延迟删除"(默认删除策略):当Broker接到消费者的确认消息时,将该消息的状态标记为"已消费",但并未立即删除,而是将该消息的删除时间延迟到一定的时间之后,再进行删除。这种删除策略可以使消息在删除前保留一段时间,以便于后续的重试或者其他操作。
如何设置删除策略
可以通过在 RocketMQ Broker 的配置文件 broker.conf 中进行设置。
设置立即删除策略:
broker.autoDeleteMsgInterval=0
设置延迟删除策略:
broker.autoDeleteMsgInterval=1200000
其中,broker.autoDeleteMsgInterval参数表示自动删除过期消息的时间间隔,单位为毫秒。如果该参数设置为0,那么Broker将会采用立即删除策略;如果该参数设置为大于0的数值,那么Broker将会采用延迟删除策略,并在指定的时间间隔之后自动删除已经被标记为“已消费”的消息。
需要注意的是,自动删除功能只适用于非持久化的消息,对于持久化的消息,需要手动删除。同时,对于存在事务相关的消息,在事务完成之前不会进行删除操作。
介绍一下RocketMQ Commitlog
RocketMQ 的消息存储是基于 CommitLog 实现的。CommitLog 是 RocketMQ 中最重要的组件之一,它是一个类似日志的存储系统,所有的消息在被存储之前都要写入到 CommitLog 中。
具体来说,当一个生产者生产一条消息时,该消息会首先被写入到内存缓冲区中,然后再被异步刷写到磁盘上的CommitLog文件中。同样的,当消费者消费一条消息时,该消息的偏移量也会被记录在CommitLog文件中,以便于Broker在后续的消费中能够准确地判断该消息是否已经被消费过。
RocketMQ 的 CommitLog 可以支持高并发的写入和读取,对于不同的 Topic 和 Queue 会生成不同的文件,预设大小为1G,每个文件可以存储约30~40万条消息。在一定时间间隔内,由 RocketM Q的清理线程定期清理已经被消费的消息,以保证 CommitLog 文件的使用效率和文件大小的合理性。
总之,CommitLog 是 RocketMQ 的核心存储组件,保证了 RocketMQ 的高可靠性、高性能和高扩展性。
RocketMQ默认什么时候删除已经消费的消息
在RocketMQ中,已经被消费的消息的清理时间取决于消息消费进度的存储方式。RocketMQ提供了两种消息进度存储方式,分别是文件方式存储和数据库方式存储。
文件方式存储
RocketMQ默认采用文件方式存储消息消费进度。在该方式下,RocketMQ会定时地将消费进度信息保存到本地文件中,文件名以“消费组名-offset”为格式。
对于已经被消费的消息,RocketMQ在下面两种情况中会将它从CommitLog中删除:
- 所有的消费者已经确认消费该消息;
- 存在至少一个消费者没有确认消费该消息,并且距离该消息在CommitLog中的存储时间已经超过了消息保留时间。**在RocketMQ的默认配置下,消息的保留时间为72小时(fileReservedTime)。**这意味着在这个时间之后,即使有消费者没有消费该消息,RocketMQ也会将其从CommitLog中删除。
数据库方式存储
当RocketMQ采用数据库方式存储消费进度时,所有的消费进度信息会被存储到指定的数据库中,包括消费者的消费进度、消费者组的订阅关系以及广播模式下的消息进度等。
对于已经被消费的消息,RocketMQ在下面两种情况中会将它从CommitLog中删除:
- 所有的消费者已经确认最大进度,在本地数据库中都进行了标记;
- 存在至少一个消费者没有确认该消息,并且超过了消息在CommitLog中的保留时间。
RocketMQ默认的消息保留时间和CommitLog文件存储的数据保留时间相同,为72小时。当消息的存储时间超过该时间时,RocketMQ会将其删除。
总之,RocketMQ在默认情况下会在消息的保留时间到期之后自动删除已经被消费的消息,以保证存储空间的有效利用。需要特别注意的是,在使用自定义配置时,删除消息的策略可能会有所不同。
我们也可以指定消息删除时间,deleteWhen = 04,表示几点做消息删除动作,默认是凌晨4点。
RocketMQ消费模式有几种?
RocketMQ消费模式有以下三种:
-
集群消费模式(Clustering mode):集群消费模式下,一个消费者集群多台机器共同消费一个 topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费,如下图所示。
-
广播消费模式(Broadcasting mode):多个消费者各自消费一遍消息,广播消费模式不需要消费者进行分组,每个消费者都能消费到消息。
-
定序消费模式(Orderly mode):确保同一个消费者消费同一队列的消息时,消息的顺序是有序的。
消费消息是push还是pull?
pull 其实就是消费者主动从 MQ 中去拉消息,而 push 则像 rabbitMQ 一样,是 MQ 给消费者推送消息。但是 Rocket MQ的 push 其实是基于 pull 来实现的。它会先由一个业务代码从 MQ 中 pull 消息,然后再由业务代码 push 给特定的应用/消费者。其实底层就是一个 pull 模式。
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式。
在Push Consumer中,消费者订阅某个topic下的消息后,Broker会主动将消息推送到Consumer消费。但是,为了保证消息的可靠性,当Broker将消息发送给Consumer的时候,如果Consumer没有正确消费该条消息,Broker需要重新将消息发送给Consumer,直到Consumer正确消费后才会进行下一条消息的推送。这样就会增加Broker的负担,同时也会增加消费者接收消息的延迟。
因此,为了保证消息的可靠性和避免资源浪费,RocketMQ采用了长轮询的方式,通过在客户端发起拉取请求,服务器返回当前可用的消息列表,如果没有新消息,则等待一段时间后返回一个空列表,从而达到了推送的效果。
介绍一下 RocketMQ 长轮询机制
RocketMQ中的长轮询机制主要是指PullConsumer的长轮询。PullConsumer会在客户端发起消息拉取请求后,如果Broker没有可用的消息,就会等待一段时间(pullInterval,默认为0),直到Broker有新的消息可供拉取或者等待超时才返回空列表。如果有新的消息,Broker会在返回可用消息列表的同时返回下一次消息拉取的建议时间(pullNextDelayTime,默认为0),以便客户端在下一次拉取时能够更加高效地拉取消息。
RocketMQ的PullConsumer还支持批量拉取,即一次拉取多条消息,以提高消费效率。同时,可以通过设置拉取线程数、拉取队列数、拉取批次数等参数,进一步提升消息的拉取效率和处理能力。
通过长轮询机制,RocketMQ能够在保证消息可靠性的前提下,实现快速、高效的消息拉取,并提供了多种参数和策略,满足不同用户对于消费性能和延迟的需求。
PullConsumer 和 PushConsumer 是什么
在RocketMQ中,PullConsumer和PushConsumer是两种消息消费的方式。
- PullConsumer
PullConsumer是一种主动拉取消息的消费方式,它主动向Broker发起拉取消息的请求,并进行消费。与PushConsumer不同,PullConsumer不需要注册回调函数,使用更加灵活,适合对消费实时性要求不那么高的场景。但是需要注意的是,由于PullConsumer需要主动从Broker中拉取消息,因此如果拉取频率过高,会增加Broker的负担。在使用PullConsumer时需要合理地设置拉取参数,避免对Broker和系统造成不必要的压力。
- PushConsumer
PushConsumer是一种被动的消费方式,它需要注册一个回调函数,当有消息到达时,Broker会主动推送消息给Consumer,Consumer在回调函数中对消息进行消费。PushConsumer适用于对实时性要求较高的业务场景,但是对于消息的可靠性和消费速率等方面相对于PullConsumer有所劣势。同时,PushConsumer需要保持与Broker的长连接,消耗资源较多,如果Broker上的推送消息较多,也会增加客户端的负担。
两种消费方式在实际使用中,选择合适的方式要根据实际情况以及业务需求综合考虑。
这里的客户端指的是什么
这里的客户端指的是RocketMQ消费者(Consumer),既包括PullConsumer,也包括PushConsumer。RocketMQ消费者通过向Broker发起消费请求,从而获取消息并进行消费。在这个过程中,消费者充当了应用程序的角色,负责从RocketMQ中获取消息,并进行业务处理。
对于PushConsumer,客户端需要保持和Broker之间的长连接,主动接收和消费Broker推送的消息。而对于PullConsumer,客户端则需要主动发起消息拉取请求,并根据系统返回的消息列表进行消费。
无论是PullConsumer还是PushConsumer,都要维护与Broker之间的通信,并保持消费进度的一致性,以确保消息能够在系统内可靠地传输和消费。
服务端呢
服务端指的是RocketMQ的消息服务组件(Broker),它是负责存储和传输消息的中心组件。Broker管理着所有的Topic,将消息存储在对应的Queue中,等待消费者进行消费。
当消费者(客户端)向Broker发起消费请求时,Broker会检查该消费者对应的消费组是否有订阅该Topic的权限,如果有则根据消费模式推送消息或返回可拉取消息列表。同时,Broker还负责对消息进行存储、持久化、发送等操作,并提供了丰富的消息过滤、消息重发等功能,保证消息能够被可靠地传输和消费。
RocketMQ的Broker采用分布式部署形式,支持主从同步复制和异步复制,从而保证系统的高可用性和数据安全性。通过Broker的集群或者分布式部署,可以扩展RocketMQ的消息处理能力,应对高并发、高吞吐量的业务场景。
回调函数又是什么
回调函数(Callback function)是指在程序运行过程中,将一个函数作为变量传给另一个函数,并且这个函数可以在另一个函数执行完成后被调用或执行的机制。简单来说,回调函数就是一个在某个时间点回调(被调用)执行的函数。
在消息处理中,回调函数用于处理消费者接收到的消息,通常会将消息传递给业务逻辑模块进行处理。在RocketMQ中,PushConsumer通过注册一个回调函数来接收并处理队列中的消息。RocketMQ将已经消费的消息偏移量、消息消费状态(成功或失败)等信息传递给回调函数进行处理,并根据不同的消息处理结果对消息进行重发或进入死信队列等操作。
使用回调函数可以让代码结构更加清晰,降低代码耦合度,提高代码的可维护性和复用性。另外,回调函数还能够实现异步处理,提高程序性能和响应速度,增强用户体验。