目录
- 确保消息的可靠性
- RabbitMQ 消息发送可靠性
- 分析
- 解决方案
- 开启事务机制
- 发送方确认机制
- 单条消息处理
- 消息批量处理
- 失败重试
- 自带重试机制
- 业务重试
- RabbitMQ 消息消费可靠性
- 如何保证消息在队列
- RabbitMQ 的消息消费,整体上来说有两种不同的思路:
- 确保消费成功两种思路
- 消息确认
- 自动确认
- 手动确认
- 推模式手动确认
- 拉模式手动确认
- 消息拒绝
- 总结:如何保证消息的可靠性。
- 幂等性问题
- 背景
- 解决思路
- 代码
确保消息的可靠性
先确定消息可能在哪些位置丢失—不同的位置可以有不同的解决方案
- 发送过程
- 从生产者到交换机
- 从交换机到队列
- 消费过程
- 消息在队列中
- 消费者消费
RabbitMQ 消息发送可靠性
分析
-
目标
- 消息成功到达 Exchange
- 消息成功到达 Queue
-
如果能确认这两步,那么我们就可以认为消息发送成功了。
-
如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。
-
经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:
- 确认消息到达 Exchange。
- 确认消息到达 Queue。
- 开启定时任务,定时投递那些发送失败的消息
解决方案
-
如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:
- 开启事务机制
- 发送方确认机制
-
这是两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报如下错误
开启事务机制
-
事务管理器
@Configuration public class RabbitConfig { @Bean public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } }
-
消息生产者:添加事务注解并设置通信信道为事务模式
@Service public class MqService { @Resource private RabbitTemplate rabbitTemplate; @Transactional //标记事务 public void send() { rabbitTemplate.setChannelTransacted(true);//开启事务模式 rabbitTemplate.convertAndSend("mq_exchange_name","mq_queue_name","hello rabbitmq!".getBytes()); int i = 1 / 0;//运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功 } }
当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
- 客户端发出请求,将信道设置为事务模式。
- 服务端给出回复,同意将信道设置为事务模式。
- 客户端发送消息。
- 客户端提交事务。
- 服务端给出响应,确认事务提交。
上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式
发送方确认机制
单条消息处理
-
配置文件:开启消息发送方确认机制
server: port: 8888 spring: rabbitmq: host: 192.168.29.200 port: 5672 username: admin password: admin virtual-host: / publisher-confirm-type: correlated # 配置消息到达交换器的确认回调 publisher-returns: true #配置消息到达队列的回调 # publisher-confirm-type有三个值 : # none:表示禁用发布确认模式,默认即此。 # correlated:表示成功发布消息到交换器后会触发的回调方法。 # simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
-
开启两个监听
/** * @author: zjl * @datetime: 2024/5/9 * @desc: * 定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口, * 这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。 * * 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解, * 在该方法中为 rabbitTemplate 分别配置这两个 Callback。 */ @Configuration @Slf4j public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { public static final String MQ_EXCHANGE_NAME = "mq_exchange_name"; public static final String MQ_QUEUE_NAME = "mq_queue_name"; @Resource private RabbitTemplate rabbitTemplate; @Bean public Queue queue() { return new Queue(MQ_QUEUE_NAME); } @Bean public DirectExchange directExchange() { return new DirectExchange(MQ_EXCHANGE_NAME); } @Bean public Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()) .with(MQ_QUEUE_NAME); } @PostConstruct public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("{}:消息成功到达交换器",correlationData.getId()); }else{ log.error("{}:消息发送失败", correlationData.getId()); } } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { log.error("{}:消息未成功路由到队列",message.getMessageProperties().getMessageId()); } }
-
测试
首先尝试将消息发送到一个不存在的交换机中@RestController public class SendController { @Resource private RabbitTemplate rabbitTemplate;; @RequestMapping("/send") public String send() { rabbitTemplate.convertAndSend("RabbitConfig.MQ_EXCHANGE_NAME", RabbitConfig.MQ_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString())); return "send success"; } }
给定一个真实存在的交换器,但是给一个不存在的队列
@RestController public class SendController { @Resource private RabbitTemplate rabbitTemplate;; @RequestMapping("/send") public String send() { //rabbitTemplate.convertAndSend("RabbitConfig.MQ_EXCHANGE_NAME", RabbitConfig.MQ_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString())); rabbitTemplate.convertAndSend(RabbitConfig.MQ_EXCHANGE_NAME,"RabbitConfig.MQ_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString())); return "send success"; } }
可以看到,消息虽然成功达到交换器了,但是没有成功路由到队列(因为队列不存在)
消息批量处理
- 如果是消息批量处理,那么发送成功的回调监听是一样的,这里不再赘述。
- 这就是 publisher-confirm 模式。相比于事务,这种模式下的消息吞吐量会得到极大的提升
失败重试
- 失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了
自带重试机制
- 前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。
- 如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的
-
配置
server: port: 8888 spring: rabbitmq: host: 192.168.29.200 port: 5672 username: admin password: admin virtual-host: / publisher-confirm-type: correlated # 配置消息到达交换器的确认回调 publisher-returns: true #配置消息到达队列的回调 template: retry: enabled: true # 开启重试机制 initial-interval: 1000ms # 重试起始间隔时间 max-attempts: 10 # 最大重试次数 max-interval: 10000ms # 最大重试间隔时间 multiplier: 2 # 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
-
再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试
业务重试
- 业务重试主要是针对消息没有到达交换机的情况
- 如果消息没有成功到达交换机,此时就会触发消息发送失败回调,我们可以利用起来这个回调
- 下面说一下整体思路
-
准备数据库表
DROP TABLE IF EXISTS `service_msg_mq_info`; CREATE TABLE `service_msg_mq_info` ( `msgid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `empid` int(11) NULL DEFAULT NULL, `status` int(11) NULL DEFAULT NULL, `routekey` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, `exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, `count` int(11) NULL DEFAULT NULL, `trytime` datetime NULL DEFAULT NULL, `createtime` datetime NULL DEFAULT NULL, `updatetime` datetime NULL DEFAULT NULL, PRIMARY KEY (`msgid`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。
count:表示消息重试次数。
-
每次发送消息的时候,就往数据库中添加一条记录
-
在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后
-
在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后
-
另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1
当然这种思路有两个弊端:
- 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。
- 按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。
当然,大家也要注意,消息是否要确保 100% 发送成功,也要看具体情况。
RabbitMQ 消息消费可靠性
如何保证消息在队列
- 队列持久化—》创建的时候设置持久化
- 搭建rabbitmq集群–保证高可用
RabbitMQ 的消息消费,整体上来说有两种不同的思路:
-
推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。这种方式通过 @RabbitListener 注解去标记消费者,如以下代码,当监听的队列中有消息时,就会触发该方法
@Component public class ConsumerDemo { @RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME) public void handle(String msg) { System.out.println("msg = " + msg); } }
-
拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式
@Test public void test01() throws UnsupportedEncodingException { Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME); System.out.println("o = " + new String(((byte[]) o),"UTF-8")); }
- 调用 receiveAndConvert 方法,方法参数为队列名称,
- 方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。
- receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。
- 此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0
-
这是消息两种不同的消费模式
-
如果需要从消息队列中持续获得消息,就可以使用推模式;
-
如果只是单纯的消费一条消息,则使用拉模式即可。
-
切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能
确保消费成功两种思路
- 为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。
- 当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式
- 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
- 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。
- 属性解释
- Ready 表示待消费的消息数量。
- Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。
- 当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
- 待消费的消息
- 已经投递给消费者,但是还没有被消费者确认的消息
- 换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。
- 如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。
消息确认
自动确认
- 在 Spring Boot 中,默认情况下,消息消费就是自动确认的
- 通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法
- 默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费
- 如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了
@Component public class ConsumerDemo { @RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME) public void receive1(String msg) { System.out.println("msg = " + msg); int i = 1 / 0; } }
手动确认
-
配置:修改为手动确认模式
server: port: 8888 spring: rabbitmq: host: 192.168.29.200 port: 5672 username: admin password: admin virtual-host: / publisher-confirm-type: correlated # 配置消息到达交换器的确认回调 publisher-returns: true #配置消息到达队列的回调 template: retry: enabled: true initial-interval: 1000ms max-attempts: 10 max-interval: 10000ms multiplier: 2 listener: simple: acknowledge-mode: manual
推模式手动确认
- 将消费者要做的事情放到一个 try…catch 代码块中。
- 如果消息正常消费成功,则执行 basicAck 完成确认。
- 如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。
@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME) public void receive1(Message message,Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //消息消费的代码写到这里 String s = new String(message.getBody()); System.out.println("s = " + s); //消费完成后,手动 ack channel.basicAck(deliveryTag, false); } catch (Exception e) { //手动 nack try { channel.basicNack(deliveryTag, false, true); } catch (IOException ex) { ex.printStackTrace(); } } }
- 这里涉及到两个方法:
- basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:
- 第一个参数表示消息的 id;
- 第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
- basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:
- 第一个参数表示消息的 id;
- 第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;
- 第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。
- 当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题
- basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:
拉模式手动确认
- 拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法
- 这里涉及到的 basicAck 和 basicNack 方法跟前面的一样
public void receive2() {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
long deliveryTag = 0L;
try {
GetResponse getResponse = channel.basicGet(RabbitConfig.MQ_QUEUE_NAME, false);
deliveryTag = getResponse.getEnvelope().getDeliveryTag();
System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
消息拒绝
- 当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void receive2(Channel channel, Message message) {
//获取消息编号
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//拒绝消息
channel.basicReject(deliveryTag, true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
-
消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步
- 获取消息编号 deliveryTag。
- 调用 basicReject 方法拒绝消息。
-
调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。
-
如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;
-
如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。
-
需要注意的是,basicReject 方法一次只能拒绝一条消息
总结:如何保证消息的可靠性。
- 设置confirm和returning机制
- 设置队列和交互机的持久化
- 搭建rabbitMQ服务集群
- 消费者改为手动确认机制。
幂等性问题
背景
- 消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,
- 此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,
- 那么此时 RabbitMQ 并不会将该条消息删除
- 当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。
- 同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次
解决思路
- 采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
- id-0(正在执行业务)
- id-1(执行业务成功)
- 如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。
- 极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId
代码
-
添加redis依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>]
-
添加redis配置
redis: host: localhost port: 6379 password: 123456 timeout: 3000ms database: 0
-
配置类
@Configuration @Slf4j public class RabbitConfig{ public final static String DIRECTNAME = "mq-direct"; @Bean public Queue queue() { return new Queue("hello-queue"); } @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECTNAME, true, false); } @Bean public Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()).with("direct"); } }
-
生产者
@RestController public class SendController { @Resource private RabbitTemplate rabbitTemplate;; @RequestMapping("/send") public String send() { //携带信息发送 CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(RabbitConfig.DIRECTNAME,"direct","message",messageId); return "send success"; } }
-
消费者
package cn.smbms.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; import java.util.concurrent.TimeUnit; /** * @author: zjl * @datetime: 2024/5/9 * @desc: */ @Component public class DirectReceiver { @Resource private StringRedisTemplate stringRedisTemplate; @RabbitListener(queues = "hello-queue") public void getMassage(String msg, Channel channel, Message message) throws IOException { //1、获取messageID String messageID = message.getMessageProperties().getHeader("spring_returned_message_correlation"); //2、用redis的setnx()方法放入值 放入成功返回true 放入失败返回false if (stringRedisTemplate.opsForValue().setIfAbsent(messageID, "0", 10, TimeUnit.SECONDS)) { //3、消费消息 System.out.println("接收到消息:" + msg); //4、设置value值为1 stringRedisTemplate.opsForValue().set(messageID, "1",10,TimeUnit.SECONDS); //5、手动ack channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { //6、如果放入值失败 获取messageID对应的value String s = stringRedisTemplate.opsForValue().get(messageID); //7、value=0 什么都不做 if ("0".equalsIgnoreCase(s)) { return; //8、value=1 手动ack } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } } }