🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (93平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
目录
- 1. 幂等性保障
- 1.1 幂等性介绍
- 1.2 解决方案
- 2. 顺序性保障
- 2.1 顺序性保障
- 2.2 顺序性保障方案
- 3. 消息积压问题
- 3.1 原因分析
- 3.2 解决方案
该章节中会涉及到RabbitMQ消息处理中可能出现的各种问题以及解决方案,所以这一章也是面试中的重点.
1. 幂等性保障
1.1 幂等性介绍
幂等性是数学和计算机科学中某些运算的性质,它们可以多次被应用,而不会改变初始的结果.
- 应用程序的幂等性
在应用程序中,幂等性就是指对一个系统的资源进行重复调用,不论多少次请求,这些请求对系统的影响都是相同的效果.
比如数据库的select操作,不同时间两次查询的结果不可能不同,但是这个操作是符合幂等性的,需要注意的是,幂等性是对资源的影响,而不是返回结果.查询数据库对数据资源基本不会产生影响.再比如数据库的update操作就不是幂等性的,它会对数据库中的资源造成修改,操作前后的数据资源是不一样的.
再比如我们在之前的网络通信中也提到过 - MQ的幂等性
对于MQ而言,幂等性是指同一条消息多次消费,对系统的影响是相同的.
一般消息中间件的消息传输分为三个层级:- At most once:最多⼀次.消息可能会丢失,但绝不会重复传输.(可靠性比较低)
- At least once:最少⼀次.消息绝不会丢失,但可能会重复传输.(可靠性高)
- Exactly once:恰好⼀次.每条消息肯定会被传输⼀次且仅传输⼀次.
RabbitMQ支持的是"最多⼀次"和"最少⼀次".
对于恰好一次,目前市面上的消息中间件都做不到这一点,这会对消息中间件的性能产生恒大的影响.
在业务场景中,对于消息可靠性要求比较高的场景,建议使用最少一次,以防止消息丢失. "最多⼀次"会因为消息发送过程中,网络问题,消费出现异常等种种原因,导致消息丢失.
以下场景可能会导致消息重复发送:
- 发送时消息重复: 当一条消息被成功发送到服务器并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败.如果此时Producer意识到消息发送失败并尝试再次发送消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息.(发送者确认没有得到应答)
- 投递时消息重复: 消息消费的场景下,消息已投递到Consumer并完成业务处理,当客户端给服务端反馈应答的时候网络闪断. 为了保证消息至少被消费⼀次,云消息队列RabbitMQ版的服务端将在网络恢复后次尝试投递之前已被处理过的消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息.(消息确认没有得到应答)
1.2 解决方案
MQ消费者的幂等性的解决方法,一般有以下几种:
- 全局唯一ID
为每条消息分配一个唯一标识符,比如UUID或者MQ消息中唯一的ID,但是一定要保证唯一性.
消费者接收到消息之后,先用该ID判断该消息是否已经消费过,如果已经消费过,则放弃处理.
如果没有消费过,消费者开始处理消息,业务处理成功之后,把唯一的ID保存起来(数据库或Redis等).
可以使用Redis的原子性操作,比如使用Set操作,在后面加上nx命令(存在的时候不设置)来保证幂等性,将唯一的id设置在Redis中,如果返回1,则说明之前没有设置过,正常消费,如果返回的是0,说明之前这个id被保存过,即证明这条消息已经被消费过了,不再进行消费,自动抛弃.
- 业务逻辑判断
在业务逻辑层面进行判断以处理幂等性.
例如: 通过检查数据库中是否已经存在相关的数据记录,或者使用乐观锁机制来避免更新已经被其他的事务修改的数据,再或者在处理消息之前,先检查业务相关的状态,确保消息对应的操作尚未执行,然后才进行处理,具体根据业务场景来处理.
2. 顺序性保障
2.1 顺序性保障
消息的顺序性是指消费者消费的消息和生产者发送消息的顺序是一致的.很多业务场景下,消息的消费是不用保证顺序的,但有些业务场景,可能存在多个消息顺序处理的情况.比如用户信息修改,对同⼀个用户的同⼀个资料进行修改,需要保证消息的顺序.
哪些情况可能会打破RabbitMQ的顺序性呢?下面介绍几种常见的场景:
- 多个消费者:当一个队列配置了多个消费者的时候,消息可能会被不同的消费者并行处理,而有的消费者处理消息快,有的消费者处理消息慢,从而导致消息处理的顺序性无法保证.
- 网络波动或者异常: 在消息传递的过程中,如果出现网络波动或者异常,可能会导致ACK丢失,从而会让消息重新加入队列中,造成顺序性问题.
- 消息重试: 如果消费者在处理消息之后未能及时发送确认,或者确认消息在传输过程中丢失,那么MQ可能会认为消息未被成功消费而进行重试,这也可能导致消息处理的顺序性问题.
- 消息路由问题: 在复杂的路由场景中,消息可能会根据路由键被发送到不同的队列中,从而无法保证全局的顺序性.
- 死信队列: 消息因为某些原因,进入了死信队列,死信队列被消费之后无法保证消息的顺序和生产者发送消息的顺序一致.
2.2 顺序性保障方案
消息顺序性保障分为: 局部顺序性保障和全局顺序性保障方案
局部顺序性通常指的是在单个队列内部保证消息的顺序.全局顺序性是指在多个队列或多个消费者之间保证消息的顺序.
接下来说一下消息的顺序性保障的常见策略:(注意,下面的处理方式在一些场景之下如果只使用一个是无法保证顺序性的,我们需要把这些方案进行综合运用,才可以保证顺序性)
- 单队列单消费者
最简单的方法是使用单个队列,并有单个消费者进行处理,同⼀个队列中的消息是先进先出的,这是RabbitMQ来帮助我们保证的. - 分区消费
单个消费者的吞吐太低了,当需要多个消费者以提高处理速度时,可以使用分区消费.把⼀个队列分割成多个分区,每个分区由一个消费者处理,以此来保持每个分区内消息的顺序性.(针对一个队列有多个消费者的场景)
但是RabbitMQ本身并不支持单个队列的分区消费,需要业务逻辑实现,或者借助Spring-cloud-Stream来实现.可参考:https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_partitions.html - 消息确认机制
使用手动消息确认机制,消费者在处理完一条消息之后,显示地发送确认,这样RabbitMQ才会移除并继续发送下一条消息. - 业务逻辑控制
在某些情况下,即使消息乱序到达,也可以在业务逻辑层面实现顺序控制.比如通过在消息中嵌入序列号,并在消费时根据这些信息来处理.
RabbitMQ本身并不保证全局的严格顺序性,特别是在分布式系统中,在实际应用开发中,我们需要根据具体的业务需求,可能需要结合多种策略来实现所需要的顺序性保证.
3. 消息积压问题
3.1 原因分析
消息积压指的是在消息队列中,等待处理的消息数量超过了消费者的处理能力,导致消息在队列中不断堆积的现象.
通常有以下的几种原因:
- 消息生产过快: :在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力.
- 消费者处理能力不足: 消费者处理消息的速度跟不上消息生产的速度,也会导致消息在队列中积压.
可能得原因有:
- 消费端业务逻辑复杂,耗时长.
- 消费端代码性能低
- 系统资源限制,如CPU,内存,磁盘等也会限制消费者处理消息的效率.
- 异常处理不当,消费者在处理消息的时候出现异常,消息无法被正确处理和确认导致消息会进行重试,从而导致消息的积压.
- 网络问题: 因为网络延迟或者不稳定,消费者无法及时接收或确认消息,最终导致消息积压.
- RabbitMQ服务器配置偏低
3.2 解决方案
遇到消息积压的时候,首先要分析消息积压造成的原因.根据原因来调整策略.
主要从以下几个方面来提升效率:
- 提高消费者效率
- 增加消费者实例数量,比如新增机器.
- 优化业务逻辑,比如使用多线程并发处理业务.
- 设置prefetchCount,当一个消费者处于繁忙阶段的时候,把消息转发到其他未阻塞的消费者.
- 消息发生异常时,设置合适的重试策略,或者是转入到死信队列.
- 限制生产者速率
- 流量控制: 在消息生产者中实现流量控制,根据消费者处理能力动态调整发送速率.
- 限流: 使用限流工具,为消息发送速率设置一个上限.
- 设置过期时间:,如果消息过期未消费,可以配置死信队列,以免消息丢失,并减少对主队列的压力.
- 资源与配置优化.比如升级RabbitMQ服务器硬件,调整RabbitMQ的配置参数.