二十五 RocketMQ
1 消息队列介绍
消息队列,简称 MQ(Message Queue),它其实就指消息中间件,当前业界比较流行的开源消息中间件包括:RabbitMQ、RocketMQ、Kafka。(一个使用队列来通信的组件)它的本质,就是个转发器,包含发消息、存消息、消费消息的过程。最简单的消息队列模型如下:
2 消息队列的使用场景
消息队列(Message Queue)是一种在分布式系统中进行异步通信的方法,它允许应用程序之间发送和接收消息,而无需立即进行同步处理。消息队列的使用场景非常广泛,以下是一些常见的使用场景:
① 异步处理:
当一个应用需要执行耗时操作时(如发送邮件、处理图片、进行数据分析等),可以将这些操作放入消息队列中,由后台任务异步处理,从而避免阻塞主线程,提高应用的响应速度。
② 解耦:
在微服务架构中,不同的服务之间可能需要相互通信。通过消息队列,可以将这些服务解耦,使得它们可以独立地进行开发和部署。当一个服务需要调用另一个服务的功能时,只需将消息发送到队列中,然后由另一个服务从队列中取出消息并进行处理。
③ 流量削峰:
在高并发场景下,应用可能会在短时间内接收到大量的请求。如果直接将这些请求交给后台处理,可能会导致系统崩溃。通过将请求放入消息队列中,可以平滑地处理这些请求,避免系统过载。同时,后台可以根据处理能力,从队列中逐步取出消息进行处理。
④ 日志处理:
在分布式系统中,日志的收集和处理是一个重要的问题。通过将日志消息发送到消息队列中,可以集中地对这些日志进行存储、分析和处理。
⑤ 任务调度:
消息队列可以用于实现任务调度系统。例如,可以使用消息队列来触发定时任务、延迟任务或周期性任务。
⑥ 数据同步:
在分布式系统中,不同的服务或数据库之间可能需要保持数据的一致性。通过消息队列,可以实现数据的异步同步。当一个服务的数据发生变化时,可以将变化的信息发送到消息队列中,然后由其他服务从队列中取出信息并更新自己的数据。
⑦ 系统解耦和容错:
消息队列可以作为一个中间件,将不同的系统或组件连接起来。当某个系统或组件出现故障时,其他系统或组件可以继续从消息队列中取出消息进行处理,从而实现系统的容错性。
⑧ 消息广播和订阅:
消息队列支持发布/订阅模式,允许一个或多个消费者订阅某个主题(Topic),当有新消息发布到该主题时,所有订阅了该主题的消费者都会收到该消息。这种模式适用于需要广播消息的场景,如实时推送、新闻发布等。
⑨ 分布式事务:
在分布式系统中实现事务性操作是一个挑战。通过使用消息队列和补偿事务等机制,可以实现分布式事务的可靠执行。当一个操作失败时,可以通过发送补偿消息来撤销之前的操作,确保数据的一致性。
⑩ 数据收集和分析:
消息队列可以用于收集各种类型的数据(如用户行为数据、系统日志等),并将这些数据发送到大数据平台或数据仓库中进行进一步的分析和处理。
3 如何解决消息丢失问题
一个消息从生产者产生,到被消费者消费,主要经过这 3 个过程:生产者(生产消息),队列(存储消息),消费者(消费消息),保证 MQ 不丢失消息,可以从这三个阶段阐述:
① 生产者保证不丢消息(确保生产的消息能到达存储端)
如果是 RocketMQ 消息中间件,Producer 生产者提供了三种发送消息的方式,分别是:同步发送;异步发送;单向发送。生产者要想发消息时保证消息不丢失,可以:采用同步方式发送,send 消息方法返回成功状态,就表示消息正常到达了存储端 Broker。如果 send 消息异常或者返回非成功状态,可以重试。可以使用事务消息,RocketMQ 的事务消息机制就是为了保证零丢失来设计的
② 存储端不丢消息(确保消息持久化到磁盘 – 刷盘机制)
刷盘机制分同步刷盘和异步刷盘:生产者消息发过来时,只有持久化到磁盘,RocketMQ 的存储端 Broker 才返回一个成功的 ACK 响应,这就是同步刷盘。它保证消息不丢失,但是影响了性能。异步刷盘的话,只要消息写入 PageCache 缓存,就返回一个成功的 ACK 响应。这样提高了 MQ 的性能,但是如果这时候机器断电了,就会丢失消息。
Broker 一般是集群部署的,有 master 主节点和 slave 从节点。消息到 Broker存储端,只有主节点和从节点都写入成功,才反馈成功的 ack 给生产者。这就是同步复制,它保证了消息不丢失,但是降低了系统的吞吐量。与之对应的就是异步复制,只要消息写入主节点成功,就返回成功的 ack,它速度快,但是会有性能问题。
③ 消费阶段不丢消息
消费者执行完业务逻辑,再反馈会 Broker 说消费成功,这样才可以保证消费阶段不丢消息。
4 消息队列如何保证消息的顺序性
① 单一生产者-单一消费者模式:
在这种模式下,只有一个生产者将消息发送到队列,而只有一个消费者从队列中获取消息。这样可以自然保证消息按照发送的顺序被消费。
② 分区顺序消息:
一些消息队列系统(如Kafka)支持分区的概念。虽然不同分区之间的消息是并行处理的,但在同一个分区内的消息是有序的。如果业务允许,可以将需要保持顺序的消息发送到同一个分区中。
③ 消息序列号:
生产者在发送消息时给每条消息加上序列号,消费者在接收消息后根据序列号进行排序处理。这种方法在消息被打乱的情况下仍然可以恢复消息的顺序。
④消息分组:
一些消息队列支持消息分组的概念,同一分组中的消息会按照发送的顺序被消费。通过将需要保持顺序的消息放入同一个分组,可以确保它们被按顺序处理。
⑤ 延迟处理:
即使消息乱序到达,消费者在处理消息时也可以根据消息中的时间戳或其他信息来重新排序。但这需要消费者具有额外的处理能力,并且可能会增加处理的复杂性。
⑥ 队列设计:
在消息队列中创建多个队列,将同一规则的数据(例如基于唯一标识的哈希)放入同一个队列中。然后确保消费者只从一个队列中取出并消费数据,从而保持顺序性。
⑦ 分布式系统中的分区和路由:
对于像Kafka这样的系统,可以通过精心设计的分区键(Partition Key)来确保相关消息被路由到同一个分区,从而保持顺序。
⑧ 使用特定的消息队列系统:
一些消息队列系统(如RabbitMQ)提供了更高级的特性来支持消息的顺序性。例如,RabbitMQ中的消息可以具有优先级,或者通过特定的队列和交换机配置来确保顺序。
⑨ 在消费者端保证顺序:
如果消息队列系统本身不支持严格的顺序保证,消费者可以在接收到消息后根据业务逻辑在本地进行排序。但这种方法可能会增加消费者的复杂性和性能开销。
5 消息队列如何避免重复消费(幂等)
① 避免消息重复消费的策略:
消息去重标识:在消息中添加唯一标识(如UUID、时间戳和序列号等)。消费者在处理消息时,会记录已处理的消息ID。当再次收到相同的消息ID时,消费者会忽略它,从而避免重复处理。
消息消费确认机制:消费者成功处理消息后,应及时向消息队列发送确认消息(如RabbitMQ中的ack)。这样,消息队列可以删除或标记已消费的消息,避免再次推送。
消息过期设置:为消息设置TTL(Time To Live,生存时间),确保消息在一定时间内被消费。过期的消息将被消息队列自动删除,从而避免过期消息的重复消费。
分布式锁:在多个消费者共享同一个队列的情况下,可以使用分布式锁来确保同一时间只有一个消费者能够处理某条消息。
②实现幂等性的策略:
唯一标识符:在消息中添加唯一标识符,并在处理消息时使用该标识符来检查操作是否已经执行过。如果已经执行过,则忽略该消息;否则,执行相应的操作。
数据库唯一键约束:在数据库中为需要保证幂等性的数据设置唯一键约束。当尝试插入重复数据时,数据库将拒绝该操作,从而确保操作的幂等性。
状态机:使用状态机来管理消息的处理流程。在消息处理过程中,根据当前状态和目标状态来判断是否需要执行相应的操作。如果操作已经执行过,则状态机将跳过该操作,确保幂等性。
去重表:在数据库中创建一个去重表,用于记录已经处理过的消息ID。在处理消息时,首先检查去重表中是否存在该消息的ID。如果存在,则忽略该消息;否则,执行相应的操作并将消息ID添加到去重表中。
6 如何处理消息队列的消息积压问题
① 增加消费者数量:
增加消费者数量可以提高消息的消费速度,从而减少积压现象。但是,增加消费者数量也会增加系统的负载,因此需要权衡性能和资源消耗之间的关系。
② 调整消息队列参数:
根据实际情况调整消息队列的参数,如增加队列容量、调整消息过期时间等,以减少积压现象。但是,过度调整参数可能会导致系统的不稳定性,因此需要谨慎操作。
③ 优化消费者处理逻辑:
优化消费者端的代码逻辑和处理过程,提高消费端的处理能力。这包括使用多线程或多进程来并发处理消息,或者采用分布式处理方式,将消息分配给多个消费者进行处理。
④ 调整消息处理的优先级:
根据消息的重要性和紧急程度,调整消息处理的优先级。优先处理重要的消息,确保关键业务的及时性,而对于非关键的消息可以进行降级处理或延后处理。
⑤ 增加队列分区:
如果消息队列支持分区,可以将消息分散到多个队列中,避免单个队列出现积压。这样可以通过增加队列数量来提高消息的并发处理能力。
⑥ 设置合理的超时机制:
在消费者端设置合理的超时机制,避免因为处理时间过长而导致消息积压。可以设置超时时间,并在超时后对消息进行重新处理或者进行补偿操作。
⑦ 消息队列组件重选型:
如果当前使用的消息队列组件在性能上无法满足需求,可以考虑更换为更成熟、高性能、高吞吐量的消息队列组件,如RabbitMQ、RocketMQ、Kafka等。
⑧ 异步处理:
在消息队列积压的情况下,可以采用异步处理的方式,将消息存储到数据库中,然后通过异步任务来处理。这样可以减少对消息队列的压力,从而降低积压的风险。
⑨ 监控和告警:
实时监控消息队列的状态和性能指标,如队列长度、消费速度、处理时间等。当发现消息积压问题时,及时触发告警并采取相应的处理措施。
⑩ 资源扩容:
如果消息积压问题是由于系统资源不足导致的,可以考虑增加硬件资源或进行扩容操作,以提高系统的处理能力。
处理消息队列的消息积压问题需要综合考虑多个方面,包括调整消费者数量、优化消费者处理逻辑、调整消息处理优先级、增加队列分区、设置合理的超时机制、更换消息队列组件、异步处理、监控和告警以及资源扩容等。通过综合使用这些方法,可以有效解决消息队列的消息积压问题。
7 消息中间件如何做到高可用
① 集群部署:
在多台服务器上部署消息中间件实例,构成一个集群。当一台机器出现故障时,其他机器可以继续提供服务,从而提高整体系统的可用性。
② 主从复制(Master-Slave Replication):
设置主从节点,消息中间件会将主节点上的数据同步到从节点上。当主节点出现故障时,可以快速切换到从节点提供服务,保证系统的连续性。
③ 负载均衡:
在消息中间件前面使用负载均衡设备或软件,将请求分发到多个消息中间件实例上,避免单点故障,并充分利用集群中的资源。
④ 容错机制:
设计容错机制来处理节点故障、网络中断等异常情况。例如,通过消息确认机制确保消息被正确处理,利用消息持久化确保在节点故障时不会丢失消息。
⑤ 分布式消息队列:
使用分布式消息队列来实现消息的存储和发送。分布式消息队列可以将消息存储在网络中,并通过网络进行发送,从而提高了系统的可扩展性和容错性。
⑥ RabbitMQ的高可用机制:
对于RabbitMQ这样的消息中间件,可以采用普通集群、镜像集群或仲裁队列等机制来实现高可用。普通集群(或标准集群)会在集群各个节点间共享部分数据(如交换机、队列元信息),但不包含队列中的消息。当队列所在节点宕机时,队列中的消息会丢失。镜像集群是主从模式,交换机、队列和队列中的消息会在各个RabbitMQ的镜像节点之间同步备份。这提高了消息的可用性,但也可能存在数据丢失的风险。仲裁队列是RabbitMQ 3.8版本以后的新功能,用来替代镜像队列,同样支持主从数据同步。
⑦ 消息持久化:
将消息持久化到磁盘或其他存储介质中,以确保在节点故障或系统重启时不会丢失消息。
⑧监控和告警:
实时监控消息中间件的状态和性能指标,如队列长度、消息延迟、处理速度等。当检测到异常或潜在问题时,及时触发告警并采取相应的处理措施。
⑨ 容灾恢复:
制定容灾恢复计划,定期备份数据和配置信息。在发生严重故障时,能够快速地恢复系统和数据。
实现消息中间件的高可用性需要综合考虑多个方面的因素,包括集群部署、主从复制、负载均衡、容错机制、分布式消息队列、消息持久化、监控和告警以及容灾恢复等。通过合理的设计和配置,可以确保消息中间件在出现故障时仍能提供可靠的服务。
8 如何保证数据一致性(事务消息)
数据一致性是分布式系统中一个非常重要的概念,它指的是在多个节点或组件上存储的数据副本应该保持一致。以下是一些保证数据一致性的常见方法,以及事务消息的实现方式:
一、保证数据一致性的方法:
①分布式事务:
分布式事务是在多个节点上的数据操作中,提供事务的ACID特性(原子性、一致性、隔离性和持久性)的方法。它可以确保所有节点上的操作要么全部成功,要么全部失败。常见的分布式事务实现方式包括使用基于日志的方法和使用两阶段提交协议。
② 副本同步:
副本同步可以通过多种方式实现,如同步复制、异步复制和半同步复制等。在这些复制过程中,保证副本的同步性即为保证数据一致性的一项重要措施。
③ 基于版本的控制:
通过为每个数据操作分配唯一的时间戳或版本号,可以准确地追踪和管理数据的变化。当数据发生冲突时,基于版本的控制可以自动解决冲突,保证数据的一致性。
④ 负载均衡:
在分布式系统中,负载均衡是提高系统性能和可伸缩性的重要手段。然而,负载均衡的实现需要注意数据一致性的问题,例如通过一致性哈希算法等机制来确保数据访问的正确性。
二、事务消息的实现:
事务消息是确保在分布式系统中数据一致性的重要手段之一。以下是事务消息的一般实现步骤:
①发送半事务消息:
发送方将半事务消息发送至消息中间件(如RocketMQ、RabbitMQ等)。消息中间件在接收到消息后,会将其持久化存储,并向发送方返回确认消息已经发送成功。
② 执行本地事务:
发送方在发送半事务消息后,开始执行本地事务逻辑。这个事务逻辑可能涉及数据库操作或其他业务逻辑。
③ 提交或回滚事务:
根据本地事务的执行结果,发送方向消息中间件提交二次确认(Commit或Rollback)。如果事务执行成功,则提交Commit;如果事务执行失败或需要回滚,则提交Rollback。
④ 消息中间件处理二次确认:
消息中间件在接收到二次确认后,根据确认的类型(Commit或Rollback)对半事务消息进行相应的处理。如果收到Commit确认,则将半事务消息标记为可投递状态,并将其发送给订阅方;如果收到Rollback确认,则删除半事务消息,订阅方将不会收到该消息。
⑤ 消息回查:
在特殊情况下(如网络故障、应用重启等),发送方可能无法及时提交二次确认。此时,消息中间件会定期发起消息回查,要求发送方再次确认本地事务的最终状态。发送方根据检查得到的本地事务的最终状态再次提交二次确认,消息中间件仍按照上述步骤对半事务消息进行操作。
通过以上步骤,事务消息可以确保在分布式系统中数据的一致性。当发送方执行本地事务时,它可以确保数据的正确性;当消息中间件处理二次确认时,它可以确保所有相关的操作都按照预期的顺序和结果执行。这样,即使在复杂的分布式系统中,也可以保证数据的一致性和正确性。
9 RocketMQ 的角色组成
角色 | 作用 |
---|---|
Nameserver | 无状态,动态列表;这也是和 zookeeper 的重要区别之一。zookeeper 是有状态的。 |
Producer | 消息生产者,负责发消息到 Broker。 |
Broker | 就是 MQ 本身,负责收发消息、持久化消息等。 |
Consumer | 消息消费者,负责从 Broker 上拉取消息进行消费,消费完进行 ack。 |
10 RocketMQ Broker 中的消息被消费后会立即删除吗
不会,每条消息都会持久化到 CommitLog 中,每个 Consumer 连接到 Broker后会维持消费进度信息,当有消息消费后只是当前 Consumer 的消费进度(CommitLog 的 offset)更新了。
默认 48 小时后会删除不再使用的 CommitLog 文件