Rabbitmq业务难点
- 1.消息生产者发送的消息无法路由到任何一个队列怎么处理?
- 2.聊聊Rabbitmq的七种工作模式
- 3.Rabbitmq的消息确认机制
- 4.Rabbitmq的消息持久化
- 5.发布确认模式如何确保生产者能够成功将消息投递到消息队列
- 6. Rabbitmq基于队列设置消息过期时间和单独针对消息设置过期时间的不同之处?
- 7.聊聊死信队列
- 8. 优先级队列有啥问题
- 9.备份交换机有啥用
- 10.惰性队列
- 10.Rabbitmq如何实现延迟队列功能
- 综合问题
- 使用消息队列的优势和劣势有哪些,劣势问题又如何解决呢?
- 消息顺序性如何保证 ?
- 如何避免消息重复消费?
- 如何确保消息的可靠传输?
- 如何确保消息正确发送到消息队列?
- 如何确保消费方正确消费了消息?
- 如何确保消息队列重启后不会丢失消息?
- Rabbitmq主备集群和镜像集群
- 消息积压怎么处理
1.消息生产者发送的消息无法路由到任何一个队列怎么处理?
消息生产者如果向交换机发送了一个无法被路由到任何队列上的消息,那么此时交换机会判断消息的mandatory属性值:
- false(默认值): 设置了兜底交换机,那么消息转交给该交换机,否则只简单记录警告日志。
- true: 将消息返回给对应的消息生产者,这一过程是通过回调消息生产者提供的处理回退消息接口完成的,如果生产者没有提供相关回调接口,消息则会被丢弃。
//声明一个直连交换机--向test交换机发送一条消息,路由key为123,此时我们有没有提供对应的队列绑定的路由值123
//将mandatory参数设置为true
channel.exchangeDeclare("test", BuiltinExchangeType.DIRECT,true,false,null);
//生产者提供一个消息回退接口,当前出现当前情况下,会调用该接口,处理发送失败的方法
channel.addReturnListener(new RouteFailListener());
channel.basicPublish("test","123",true,null,"你好".getBytes(StandardCharsets.UTF_8));
2.聊聊Rabbitmq的七种工作模式
- 简单队列模式: 默认交换机(直连交换机)+队列
- 工作队列模式: 默认交换机(直连交换机)+队列+多消费者: 默认采用轮询派发机制,可以通过设置预取值,让消费能力更强的消费者获得更多的消息。
消费端程序调用了 channel.basicQos(5) ,之后订阅了某个队列进行消费。 RabbitMq 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,计数达到5后,那么RabbitMQ就不会向这个消费者再发消息。消费者确认了某条消息处理完后,RabbitMQ 将相应的计数减1之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP IP中的"滑动窗口"
- 发布订阅模式: 把交换机里的消息发送给所有绑定该交换机的队列,忽略路由key,此时声明的交换机类型为扇形交换机。
- 路由模式: 采用直连交换机类型实现,交换机可以同时绑定多个队列,并且路由key和队列之间是多对多关系
- 主题模式: 在路由模式基础上,提供路由key模式匹配机制,此时需要采用对应的主题交换机实现
- RPC模式: 实现生产者和消费者之间的双向通信–通过生产者在消息头中携带的回调队列名完成双向通信
3.Rabbitmq的消息确认机制
- 自动应答: 消息发送成功后,立即被认为已经消费成功 — 该模式存在很大的消息丢失隐患,并且由于不限制投递给消费者的消息数量,流量大的情况下,消费者端会积压大量消息,最终可能导致消费者段内存耗尽。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D7J0T7vk-1676088488565)(C:/Users/zdh/AppData/Roaming/Typora/typora-user-images/image-20230210153005421.png)]
- 手动应答: 只有在消费者发送的ack成功被broker接收到后,broker才会将消息删除。
//第一个参数:确认哪一个消息
//第二个参数:是否开启消息批量应答
channel.basicAck(envelope.getDeliveryTag(),false);
//第一个参数:拒绝哪一个消息
//第二个参数:是否将拒绝的消息重新入队
channel.basicReject(envelope.getDeliveryTag(),true);
//第一个参数:拒绝哪一个消息
//第二个参数:是否批量拒绝
//第三个参数:是否将拒绝的消息重新入队
//basic.nack 方法可以一次拒绝或重新排队多条消息。这就是它与 basic.reject 的区别。
channel.basicNack(envelope.getDeliveryTag(),true,true);
注意:
- 消费者拿到了消息,但是断开连接前,都没有对消息进行应答,那么消息会重新入队。
- 如果消费者没有在指定时间内对某个消息做出应答,那么会强制关闭当前通道,并抛出PRECONDITION_FAILED通道级别异常,默认时间为30分钟。
- 消费者拒绝某个消息时,如果将requeue重新入队设置为false,那么会将消息路由到死信交换机,如果没配置,则直接丢弃消息。
4.Rabbitmq的消息持久化
rabbitmq消息持久化前,需要先将对应的队列先进行持久化,然后在发布消息时,将消息标记为持久化。
channel.basicPublish("",QUEUE_NAME,true,
//消息添加持久化属性
MessageProperties.PERSISTENT_TEXT_PLAIN,("序号"+i).getBytes(StandardCharsets.UTF_8));
5.发布确认模式如何确保生产者能够成功将消息投递到消息队列
生产者一旦将信道设置为confirm模式,所有在该信道上发布的消息都会被指派一个唯一的ID,一旦消息被成功投递到所有匹配的队列后,broker就会发送一个确认给生产者(包含消息的唯一ID),此时生产者就知道消息已经成功到达目的队列了。
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘后再发出。
confirm模式本身是异步的,一旦发送一条消息,生产者应用程序就可以在等待信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法处理该确认消息。如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
Rabbitmq提供三种发布确认实现方式:
- 单个发布确认: 发送一条消息,同步阻塞等待该条消息被确认后,继续发送下一条消息
- 批量发布确认: 批量发送一批消息后,同步阻塞等待这批消息的确认,同时必须将发送的这批消息进行保存,在发生故障的场景下,需要利用先前的副本信息,将消息重新进行发送。
- 异步确认: 生产者提供ack和nack回调接口,分别实现消息成功投递和消息投递失败的两种逻辑, 此模式需要保存所有已经发送的消息副本,在消息发送失败时,可以利用副本重新发送消息。
6. Rabbitmq基于队列设置消息过期时间和单独针对消息设置过期时间的不同之处?
- 基于队列粒度设置消息TTL:
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl",1000);
channel.queueDeclare(Q2_QUEUE,false,false,false,arguments);
由于整个队列中消息的过期时间是一致的,所以过期的消息势必出现在队列头部,那么每次只需要判断队列头部消息是是否过期即可,如果过期就丢弃或者死信。
- 基于单个消息粒度设置TTL:
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
由于Rabbitmq只会通过每次判断队列头部消息是否过期,进行丢弃或死信,因此如果基于消息粒度设置过期时间,那么队列中靠前的消息未必是最早过期的, 那么已经过期的消息所持有的资源就不会被释放,直到过期消息来到了队列头部。
同时指定单个消息TTL和单队列TTL情况下,取较小者。
7.聊聊死信队列
死信来源:
- 消息TTL过期了
- 队列满了(队列默认没有消息个数限制,可以通过给队列增加x-max-length参数,这是队列可容纳消息最大个数,当队列满时,会将队列头部最旧的消息进行丢弃)
- 消息被消费拒绝(basic.reject或者basic.nack)并且requeue=false
队列过期不会对其中的消息进行死信
死信怎么处理:
- 不重要就丢弃
- 记录死信入库,做后续业务的分析或处理
- 通过死信队列,由负责监听死信的应用程序进行处理
我们通过设置队列的x-dead-letter-exchange属性,将某个交换机设置为绑定到当前队列上的死信交换机,当出现死信消息时,就交给死信交换机处理:
//声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE,DIRECT,false,true,null);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,true,null);
//绑定死信交换机和死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,DEAD_KEY);
//普通队列属性设置
HashMap<String, Object> arguments = new HashMap<>();
//设置当前普通队列关联的死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key",DEAD_KEY);
//设置队列中消息的存活时间--5s
arguments.put("x-message-ttl",5000);
//声明普通交换机
channel.exchangeDeclare(EXCHANGE_NAME,DIRECT,false,true,null);
//声明普通队列
channel.queueDeclare(QUEUE_NAME,false,false,true,arguments);
//绑定普通交换机和普通队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
8. 优先级队列有啥问题
优先级队列中会对队列中的消息按照优先级进行排序,但是为了支持消息在队列中按照优先级排序,需要付出相应的内存,磁盘和CPU成本。
如果想要优先级队列有机会对队列中的消息进行排队,通常需要配合消费端在手动确认模式下采用basic.qos方法,每次预取指定数量消息,从而给消息在队列中停留提供时间。
9.备份交换机有啥用
前面在设置死信队列时我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
在RabbitMQ.中,有一种备份交换机的机制存在,可以很好的应对这个问题。
备份交换机可以理解为 RabbitMQ中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。
如果我们没有给某个交换机设置关联的备份交换机,那么会判断交换机对应mandatory参数是否被设置为true,如果为true,会尝试调用生产者提供的消息回退接口。
10.惰性队列
Rabbitmq在3.6.0版本中引入的惰性队列会将队列中的消息存入磁盘,当消费者消费到对应消息时,才会将消息从内存中加载出来。
当消费者由于各种原因下线,长时间无法消费消息造成消息队列中消息堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
10.Rabbitmq如何实现延迟队列功能
思路1: 利用单队列消息TTL,在队列粒度,指定队列中消息的过期时间,由于队列中靠头部的消息一定是越早过期的,所以不用担心消息不会按时死亡。但是缺点时,单个队列只能处理过期时间一样的消息,每增加一个新的时间需求,都需要新增一个队列。
思路2: 利用rabbitmq提供的延迟交换机插件, 此时我们就可以基于消息粒度指定消息TTL了,延迟交换机拿到这些消息后,不会立刻将其路由到某个队列,而是先保存起来,然后等待消息的延迟时间结束后,再将消息发送到指定的队列中去。
延迟交换机的劣势:
1.将消息持久化到磁盘保存,性能偏低
2.只发送一次消息,存在消息发送失败的可能,并且不支持mandatory属性
综合问题
利用上面已经提供的关于Rabbitmq相关问题的解决方案,我们来综合利用解决下面场景中存在的问题:
使用消息队列的优势和劣势有哪些,劣势问题又如何解决呢?
优势: 通常使用消息队列完成异步处理;各个微服务通过消息总线进行通信,完成应用解耦;利用消息队列缓存用户请求,完成流量削锋。
缺点: 系统可用性降低,因为需要保证消息队列服务的可用性。系统复杂度提高,引入消息队列中需要考虑数据一致性问题和消息幂等性问题。一致性问题,ABCD四个系统基于消息队列总线进行通信,如果A发布消息到消息总线,BCD三个系统系统中BD写库成功,C失败了,咋整?
消息顺序性如何保证 ?
什么是消息顺序性问题?
- 生产者发送两条消息M1和M2到队列中,并且希望M1优先于M2被消息,此时如果队列存在多个消费者,那么由于默认采用的是轮询派发机制,无法确保M1一定优先于M2被消费。
如何解决:
- 一个queue一个consumer, 在consumer内部可以使用内存队列对消息进行排队,然后将消息派发给底层的worker处理
如何避免消息重复消费?
- 全局唯一ID
- 上述方案,底层保存消息ID的数据源可以采用redis进行优化
setnx具有天然的互斥性,如果key已经存在那么设置失败,返回0
- 乐观锁
在业务中也是同样的处理思路:
- 利用唯一键确保数据不会插入多条
- …
如何确保消息的可靠传输?
这个问题需要拆分为三个子问题进行分析:
- 如何确保消息正确发送到消息队列?
- 如何确保消息队列重启后不会丢失消息?
- 如何确保消费方正确消费了消息?
如何确保消息正确发送到消息队列?
开启生产端的发布确认模式,即将生产方的信道设置为confirm模式,所有在该信道内发布的消息都会被指派一个唯一ID。
如何消息被成功投送到指定交换机,那么broker会给生产者发送一个ack确认消息。如何rabbitmq发生内部错误导致消息丢失,broker会给生产者发送一个nack消息。
如果开启了发布确认的异步模式,那么上述两种场景会分别回调生产者的ack和nack回调接口,生产者可以在nack回调接口中决定是否重新发送消息。
如果设置了消息持久化属性,那么消息会在持久化到硬盘后,再发送ack响应。
如何确保消费方正确消费了消息?
开启消费者端的手动应答机制,每条消息必须等待消费者成功发送ack响应到broker时,broker才会把消息从消息队列中删除。
- 如果消费者消费消息过程中断开了连接,那么消息会被重新入队,尝试分发给其他消费者,又或者消费者迟迟没有发出ack响应,如果超过了默认的30分钟,则消息也会被重新入队处理。(此处存在消息重复消费的可能性)
如何确保消息队列重启后不会丢失消息?
1.声明队列时,将durable参数设置为true,表明当前队列是一个持久化队列
2.发送消息时,将deliverMode设置为2,表示当前消息是一个持久化消息。
Rabbitmq主备集群和镜像集群
实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模式非常的好且简单。主备模式也称为Warren模式
- 主备模式:主节点提供读写,从节点不提供读写服务,只是负责提供备份服务,备份节点的主要功能是在主节点宕机时,完成自动切换 从–>主
- 主从模式:主节点提供读写,从节点只读
镜像模式:集群模式非常经典的就是Mirror镜像模式,保证100%数据不丢失,在实际工作中用的最多的。并且实现集群非常的简单,一般互联网大厂都会构建这种镜像集群模式。
Mirror镜像队列,目的是为了保证rabbitmq数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是2-3个实现数据同步(对于100%数据可靠性解决方案一般是3个节点)集群架构如下:
RabbitMQ集群架构模式
消息积压怎么处理
- 排查是否是由于消息队列服务器硬件原因导致,磁盘太小或者内存太小
- 增加消费者实例数量,将每次获取消息数量的预取值调大
- 给消息设置时间过期时间(存在消息丢失可能,可以配合死信队列使用,记录下被丢弃的消息)
- 如果还是不行,可以考虑开启一个消费者,将mq中的消息全部记录到数据库,然后发送ack, 后面慢慢排查处理。