文章目录
- 前言
- 消息确认机制
- SpringBoot 实现消息确认
- NONE
- AUTO
- MANUAL
前言
前面我们学习了 SpringBoot 整合 RabbitMQ,并且使用 RabbitMQ 实现了几种工作模式,接下来我们将学习关于 RabbitMQ 的高级特性——消息确认机制,持久化和发送方确认。
消息确认机制
大家应该学习过了计算机网络吧,那么 TCP 连接大家也一定不陌生吧,TCP 三次握手的时候,当服务器接收到建立连接的请求的时候,服务器就会返回一个 ACK 数据包,来告诉客户端我收到了你发送的请求。前面我们讲了一种确认机制,publisher/confirm 发布确认模式,那么这种模式是针对 RabbitMQ Broker 响应生产者发送的消息,而消费者消费消息的时候,也是需要告诉我们的队列是否接收到了这个消息的,以便队列能够选择删除这条消息还是重发这条消息。
对于消费者的消息确认有两种方式——自动确认和手动确认。
- 自动确认:当 autoAck 等于 true 的时候,RabbitMQ 会自动把发送出去的消息置为确认,让后从内存(或者磁盘)中删除,而不会管消费者是否正确的处理完成这条消息,自动确认适用于对于消息可靠性要求不高的场景。
- 手动确认:当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式的调用 Basic.Ack 命令,回复确认后才从内存(或者磁盘)中移去消息。这种模式适用于对消息可靠性要求比较高的场景。
自动确认:
public class Constants {
public static final String CONFIRM_QUEUE= "confirm.queue";
}
生产者代码:
public class ConfirmProducer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.57.1.114");
factory.setPort(5672);
factory.setVirtualHost("test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
//开启信道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(Constants.CONFIRM_QUEUE,true,false,false,null);
//发送消息
String msg = "rabbitmq confirm";
channel.basicPublish("",Constants.CONFIRM_QUEUE,null,msg.getBytes());
System.out.println("消息发送成功");
channel.close();
connection.close();
}
}
消费者代码:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.57.1.114");
factory.setPort(5672);
factory.setVirtualHost("test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
//开启信道
Channel channel = connection.createChannel();
//消费消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
}
};
//true参数表示自动确认
channel.basicConsume(Constants.CONFIRM_QUEUE,true,consumer);
channel.close();
connection.close();
}
}
运行生产者代码,并且观察管理页面:
然后运行消费者代码:
消费者处理了这条消息并且自动的进行了确认。
如果我们在消费者消费完成消息之前创建一个异常,看看结果会怎样:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
int ret = 3/0;
System.out.println("接收到消息:" + new String(body));
}
};
可以看到,就算我们的消费者没有正确的处理完消息,队列也会将这条消息从队列中删除掉,那么如果我们的消费者需要重新获取到这条消息的话,就无法获得了。
手动确认:
//消费消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.CONFIRM_QUEUE,false,consumer);
- deliveryTag:这是一个由RabbitMQ服务器分配给每条消息的唯一标识符。客户端使用这个标识符来确认它已经成功处理(或消费)了消息。
- mutiple:是否批量确认,这是一个布尔值,用于指示是否确认单个消息(当为false时)还是确认deliveryTag之前的所有消息(当为true时)。
如果在消费者手动确认之前出现了异常,那么队列中的消息就不会被删除:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
int ret = 3/0;
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
当 autoAck 被设置为 false,对于 RabbitMQ 而言,队列中的消息被分成了两个部分:
一是等待投递给消费者的消息,二是已经投递给消费者,但是还没有收到消费者确认信号的消息。
如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也可能还是原来的那个消费者。
否定确认:
手动确认可以分为上面的肯定确认,也可以否定确认:
Channel.basicReject(long deliveryTag, boolean requeue)
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
channel.basicReject(envelope.getDeliveryTag(), false);
}
};
不管是肯定确认还是否定确认,都是确认消息,所以队列中 Unacked 消息的个数就是 0。
basicReject 命令一次只能拒绝一次消息,如果想要批量拒绝消息,则可以使用 basicNack()
方法:
hannel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
- long deliveryTag:这是一个唯一标识符,用于标识从RabbitMQ服务器接收到的消息。每个消息在被发送到消费者时都会被分配一个唯一的deliveryTag。通过这个标识符,消费者可以明确告诉RabbitMQ服务器它正在拒绝哪个具体的消息。
- boolean multiple:这个参数指定了是否拒绝deliveryTag之前(包括deliveryTag本身)的所有未确认的消息。如果设置为true,则RabbitMQ会拒绝从当前channel上接收到的、且尚未被确认的、所有小于或等于该deliveryTag的消息。如果设置为false,则仅拒绝具有该特定deliveryTag的消息。
- boolean requeue:这个参数决定了被拒绝的消息是否应该被重新放回队列中,以便可以被其他消费者重新处理。如果设置为true,消息将被重新放回队列的末尾(或根据队列的设置可能放到其他队列中,如果使用了死信队列等高级特性)。如果设置为false,消息将被丢弃(或根据服务器的配置可能进入死信队列)。
我们先将上面的手动确认代码的这一行注释掉,然后启动生产者和消费者,制造几个 Unacked 的消息,然后使用批量确认的方法,看前面的消息是否会被确认:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
//这里设置被拒绝的消息会重新入列,fasle 表示被拒绝的消息不会被重新放入队列
channel.basicNack(envelope.getDeliveryTag(), true,true);
}
};
如果我们设置的被拒绝的消息会被重新放入队列的,并且我们的消费者没有及时关闭资源的话,就可能会导致我们消费者死循环的消费,因为当队列中存在 Ready 消息的时候,我们的消费者拒绝了这些消息然后将这些拒绝的消息重新放回队列,因为消费者的资源没有关闭,所以会继续监听这个队列,那么此时被拒绝的消息又加入到队列当中了,所以就又会重新执行上面的操作,最终导致一直消费-放入。
SpringBoot 实现消息确认
Spring AMQP 对消息确认机制提供了三种策略:
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
- AcknowledgeMode.NONE
-
- 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 会自动确认消息,从 RabbitMQ 队列中删除该消息,如果消费者处理消息失败,消息可能会丢失
- AcknowledgeMode.AUTO(默认)
-
- 消费者在消息处理成功时会自动确认消息,如果处理过程中抛出了异常,则不会确认消息
- AcknowledgeMode.MANUAL
-
- 手动确认模式,消费者必须在处理完成消息之后显式的调用
basicAck
方法来确认消息,如果消息未被确认,RabbitMQ 会认为该消息未被成功处理,并且在消费者可用的时候重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理
- 手动确认模式,消费者必须在处理完成消息之后显式的调用
那么在 SpringBoot 中如何配置消息确认的策略呢,同样还是在 application 配置文件中:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none
//acknowledge-mode: auto
//acknowledge-mode: manual
NONE
我们先来看看 none 策略的消息确认:
public static final String ACK_QUEUE = "ack.queue";
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
@RequestMapping("/ack")
public String confirm() {
rabbitTemplate.convertAndSend("", Constants.ACK_QUEUE,"rabbit ack");
return "消息发送成功";
}
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void queueListener(String message, Channel channel) {
System.out.println("接收到消息:" + message + channel);
}
}
如果消费者在处理的过程中抛出了异常:
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void queueListener(String message, Channel channel) {
System.out.println("接收到消息:" + message + channel);
int ret = 3/0;
}
}
通过 RabbitMQ 管理页面观察队列情况:
可以发现消息被自动确认了,但是消费者并没有正确处理这个消息。
AUTO
AUTO 策略当消费者消费完成之后会自动确认,如果处理过程中抛出了异常则不会自动确认:
消费者正确处理消息的代码我就不演示了,我们直接来看处理异常的情况
而且运行代码我们还可以发现,代码会陷入死循环,这是因为,当我们的消费过程中抛出了异常之后,那么这个消息就会被设置为 Unacked,因为我们的消费者与 RabbitMQ Server 并没有断开连接,所以消费者会继续监听队列,读取队列中 Ready 和 Unacked 部分的消息,那么当这个消费者读取消息的时候,这个消息又会被设置为 Unacked,所以就会一直重复这个过程。
而当我们手动关闭这个程序的时候,这条消息又会被设置为 Ready:
MANUAL
MANUAL 策略需要我们手动确认消息,在这种模式下如果消费未被正确处理,那么这个消息会被重新投递给消费者,保证了可靠性:
我们还是看消息未被正确处理的情况:
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void queueListener(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到消息:" + message + channel);
int ret = 3/0;
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
try {
//第二个参数表示是否批量确认,第三个参数表示当拒绝消息的时候是否将该消息重新进入队列
channel.basicNack(deliveryTag,false,true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
启动项目之后,程序还是会重复执行,跟 auto 的原因是一样的,结束程序之后也是:
那么这三种策略就是我们的 Spring 提供的三种消息确认策略。