一、解耦思想
在 RabbitMQ 在设计的时候,特意让生产者和消费者分离,也就是消息的发布和消息的消费之间是解耦的。
生产者与消费者之间的直连,少了很多的灵活性和策略的制定。还有一种解耦的思想存在。
二、消息的可靠性保证与性能关系
消息的可靠性保证:
1.靠事务机制(存在事务的几阶段过程,性能下降严重)
2.确定机制--异步回复机制(轻量级处理机制)
相比之下,发送方确认机制最大的好处在于它提供异步回复机制,一旦发布一条消息生产者程序可以在等待信道返回确认的同时继续发布下一条消息,当消息得到最终确认之后,生产者程序可以通过回调 方法来处理该确认消息,
confirm的三种实现方式
方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回 true。
方式二:channel.waitForConfirmsOrDie()批量confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但只要有一个消息未到达交换器就会抛出 IOException 异常。客户端往往需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话(网络不稳定),效率也会不升反降;
方式三:channel.addConfirmListener()异步监听发送方确认模式; 通过监听器的返回效果和批量confirm模式类似,都是每隔一段时间确认一批消息,区别就是不会造成生产者阻塞,但依旧会导致重复消息
在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎么使用需要根据你的应用程序来定,所以说没有最好的方式,只有最合适的方式。只有把你的项目和技术相结合,才能找到适合你的平衡。
三、消息推拉模式
Consumer消费消息,并向服务器进行应答。表明这个消息已经消费完了还是可以继续让别人消费。主要收集了两种消费方式
推模式(push)
1:推模式接收消息是最有效的一种消息处理方式。channel.basicConsume(queneName,consumer)方法将信道(channel)设置成投递模式,直到取消队列的订阅为止;在投递模式期间,当消息到达RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配的消费者,而不需要消费端手动来拉取,当然投递消息的个数还是会受到channel.basicQos的限制。
2:推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率很高。缺点就是缓冲区可能会溢出。
3:由于推模式是信息到达RabbitMQ后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。
4:Consumer等待rabbitMQ 服务器将message推送过来再消费。一般是启一个一直挂起的线程来等待。
拉模式(pull)
1:如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息。
2:拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。
3:由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差;消费者难以获取实时消息,具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息。
结论
1:不能在循环中使用拉模式来模拟推模式,因为拉模式每次都需要去消息中间件中拉取消息来消费,所以会严重影响RabbitMQ性能。
2:要想实现高吞吐量,消费者需要使用推模式。
- 其次,官方特意提到的message的持久性。关键的message不能因为服务出现问题而被忽略。还要注意,官方特意提到,所有的queue是不能被多次定义的。如果一个queue在开始时被声明为durable,那在后面再次声明这个queue时,即使声明为 not durable,那这个queue的结果也还是durable的。
- 然后,是中间件最为关键的分发方式。这里,RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而他们改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。
这种模式,官方也指出还是有问题的,消息有可能全部阻塞,所有consumer节点都超过了能力值,那消息就阻塞在服务器上,这时需要自己及时发现这个问题,采取措施,比如增加consumer节点或者其他策略
*Note about queue size
If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.*
另外 官网上没有深入提到的,就是还是没有考虑到message处理的复杂程度。有的message处理可能很简单,有的可能很复杂,现在还是将所有message的处理程度当成一样的。还是有缺陷的,但是目前也只看到dubbo里对单个服务有权重值的概念,涉及到了这个问题。
四、消息的丢失
1、producer生产者丢失消息
原因:生产者发送消息由于网络等原因并没有发送到RabbitMq
解决方案:
1.1、开启RabbitMq事务机制
生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消 息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit,类似我们数据库数据库事务机制。
1.2、开启 confirm 模式
在生产者端设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 ID,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息已经收到。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且可以结合这个机制在自己业务里维护每个消息 ID 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以业务主动重发。
事务机制和 confirm 机制优劣:
事务机制是同步的,提交一个事务之后会阻塞,吞吐量会下来,耗性能。
confirm 机制是异步的,流程不会阻塞,吞吐量较高,性能较好。
2、broker消息中间件自身丢失消息
原因:RabbitMq收到生产者的消息后还没有来得及持久化到磁盘,又或者创建队列没有持久化以及消息并没有设置为持久化,在Mq故障宕机后都会有消息丢失的情况。
解决方案:
2.1、创建队列queue的时候设置队列持久化
2.2、mq配置deliveryMode == 2 消息持久化
重点:必须同时设置队列持久化和消息持久化,再结合生产者的confrim模式,才能保证消息准确投递到broker并保证进入磁盘。
3、consumer消费者丢失消息
原因:消费者自动ack配置情况下,业务代码异常或者其他故障消息并没有处理完成也会自动ack。RabbitMq消息ack后就会丢弃,这就导致异常情况下的消息丢失了。
解决方案:
3.1 关闭RabbitMq自动ack,业务代码成功消费了消息手动调用Mq ack,让Mq丢弃消息;如果业务代码异常则直接nack,让Mq重新推送消息进行处理。当然,在要求比较高的情况下也可以异常数据进入死信队列,保证数据的完整性。
五、消息的重复消费/消费顺序
什么情况下会造成消息重复呢?
假设我们的消费者在执行完任务后,准备显式调用 basicAck给RabbitMQ删除消息时。此时消费者宕机了。
此时我们的RabbitMQ检测到自己队列所对应的消费者掉线了,就会将这个消息重新入队并等待下一个消费者连接进来后继续投递消费。那么问题就很明显了,上一个消费者已经消费了消息,而新连接的消费者再次消费了该消息,导致了重复消费。
无论如何,手动提交的方式也已经解决了最大的麻烦(消息丢失的问题)。
QoS 预取模式(重复消息)
在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息之前崩溃,则所有在RabbitMQ中未确认的消息将被重新发送给其他消费者。很明显,这样也会导致重复消息的产生。
我们不难发现,无论是生产者还是消费者,但凡是批量操作,总会让重复消息的概率大大提升。奈何批量操作的速度快啊,因此还是要用。
我们发现在一个消息从生产者到消费者的过程中,至少有三种情况导致重复消息的产生。那么重复消息该如何处理呢?
消费者的事务
当然,我们的消费者端也可以采用事务的方式来确保消息的准确性。但是依旧是效率问题导致这个方式非常鸡肋,不再赘述了。
消息幂等性处理重复消息
让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:
消费者获取到消息后先根据id去查询redis/db是否存在该消息(或者放到内存容器中)。
如果不存在,则正常消费,消费完毕后写入redis/db
如果存在,则证明消息被消费过,直接丢弃。
原来说的那么高大上,无非就是交给我们消费者自己去做去重校验。