文章目录
- 1.RabbitMQ如何防止消息堆积
- 2.RabbitMQ如何保证消息顺序消费
- 3.RabbitMQ如何防止消息重复消费
- 4.RabbitMQ如何保证消息可靠性
- 4.1 消息持久化
- 4.2 生产者确认
- 2.2.1 application.yml
- 2.2.2 Config
- 2.2.3 Test
- 4.3 消费者确认
- 4.3.1 application.yml
- 4.3.2 Test
1.RabbitMQ如何防止消息堆积
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积有两种思路:一是队列上绑定多个消费者,提高消费速度,也就是work工作模式;二是扩大队列容积,提高堆积上限
要提升队列容积,把消息保存在内存中显然是不行的,需要将消息保存到本地磁盘中,这时候就需要用到惰性队列了。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
要设置一个队列为惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可。
大量消息突然涌入导致积压临时处理方法
方案一:
- 新建一个或多个topic交换机,partition是原来的10倍,临时建立好原先10倍的queue数量。
- 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询将消费的消息临时写入建立好的10倍数量的queue中。
- 接着临时征用10倍或足够用的机器来部署consumer,每一批consumer消费一个临时queue的数据。这种做法相当于是临时将 queue 资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
- 修复原来的consumer问题,确保其恢复消费速度,重新用原先consumer机器来消费消息。
方案二:
前提
:MQ中消息失效:假设你用的是RabbitMQ,RabbtiMQ是可以设置过期时间的,也就是 TTL。如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了,这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
解决方法
:批量重导。队列消息大量积压的时候,在consumer接收到消息标记后直接丢弃数据,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。假设1万个订单积压在mq里面,没有处理,其中 1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。
2.RabbitMQ如何保证消息顺序消费
之所以要保证消息消费的顺序性是因为使用中间件消息队列后,假设当前是购物场景,用户下订单和支付订单是两种不同的业务数据,当放到两个队列中时,要保证下订单消息要在支付订单消息之前处理才不会出现异常,这就需要保证消费者消费消息的顺序性。
方案一
一个queue (消息队列)但是对应一个consumer(消费者),然后这个consumer(消费者)内部用内存队列做排队,然后分发给底层不同的worker来处理。
方案二
拆分多个queue(消息队列),每个queue(消息队列) 一个consumer(消费者),就是多一些queue(消息队列)而已。
不同队列中的消息消费顺序是没有保证的,例如:火车站检票的的时候,排了三个队伍,不同队伍之间不同确保谁先进站。
3.RabbitMQ如何防止消息重复消费
什么是消息的重复消费?
两个消费者消费了相同的数据。
为什么会重复消费?
消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除,但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者
解决方法
保证消息的唯一性,在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;
其实MQ内部已经为我们做出了一些保障,在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID 等)作为去重的依据,避免同一条消息被重复消费。
4.RabbitMQ如何保证消息可靠性
消息的可靠性是指从 生产者发送消息 --》 消息队列存储消息 --》消费者消费消息 的整个过程中消息的安全性及可控性。
4.1 消息持久化
生产者确认可以确保消息投递到 RabbitMQ 的队列中,但是消息发送到 RabbitMQ 以后,如果宕机,也可能导致消息丢失。
要想确保消息在 RabbitMQ 中安全保存,必须开启消息持久化机制。
- 交换机持久化
- 队列持久化
- 1.将queue的持久化标识durable设置为true,则代表是一个持久的队列
- 2.发送消息的时候将deliveryMode=2这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据
- 消息持久化
默认情况下,由SpringAMQP声明的交换机、队列、消息都是持久化的。
4.2 生产者确认
生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;
transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作
2.2.1 application.yml
spring:
rabbitmq:
publisher-confirm-type: correlated #开启消息确认异步回调
publisher-returns: true #开启消息发送失败回调
template:
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
mandatory: true
-
publish-confirm-type:开启 publisher-confirm,支持两种类型:
-
- simple:同步等待 confirm 结果,直到超时
- correlated:异步回调, 定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
-
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
-
template.mandatory:定义消息发送到交换机时失败的策略。true,则调用ReturnCallback;false:则直接丢弃消息
每个RabbitTemplate只能配置一个 ReturnCallback,因此需要在项目加载时配置:
2.2.2 Config
package com.bjpowernode.product.config;
import com.bjpowernode.product.entity.LocalMessage;
import com.bjpowernode.product.mapper.LocalMessageMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 确认消息是否投递到交换机
*
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息成功投递到交换机...");
} else {
log.error("消息未成功投递到交换机...");
}
}
/**
* 消息成功投递到交换机,向队列投递失败时调用
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息未成功投递到队列...");
}
}
2.2.3 Test
package com.bjpowernode.mq;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringAmqpTest02 {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testDirectExchange() {
rabbitTemplate
.convertAndSend("amq.direct", "product.saveOrUpdate2", "新增或更新商品通知");
}
}
4.3 消费者确认
消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;
如果这时处理消息失败,就会丢失该消息;
解决方案:处理消息成功后,手动回复确认消息。
4.3.1 application.yml
spring:
rabbitmq:
listener:
simple:
# manual:手动ack,需要在业务代码结束后,调用 api发送ack。
# auto:自动ack,由 Spring 监测 Listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回 nack
# none:关闭 ack,消息投递给消费者后立即从队列删除
acknowledge-mode: manual #手动确认
4.3.2 Test
@RabbitListener(queues = "simple.queue")
public void basicQueueListener(String message, Channel channel, Message msg) throws IOException {
try {
int i = 100 / 0;
System.out.println("消费者接收到消息:" + message);
// 手动ACK
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
// TODO 添加额外的处理逻辑
//b:是否允许多条处理 b1:是否重新回到队列
// 返回 nack,从队列删除该消息
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
}
}