目录
1.消息确认机制
1.1.消息确认机制介绍
2.1.Spring-AMQP的三种消息确认(重点)
2.持久性
2.1.交换机持久性
2.2.队列持久性
2.3.消息持久性
3.发送方确认
3.1.confirm确认模式
3.2.return退回模式
4.重试机制
4.1.重试机制定义
4.2.重试机制代码的准备工作
4.3.重试机制演示
1.消息确认机制
1.1.消息确认机制介绍
这里的消息确认机制,指的是消费者对消息的确认,而不是生产者。
(1)背景缘由
当消费者把消息发送出去后,就会把消息删除。如果消费者这边处理消息成功,则相安无事;但是如果处理异常,消息也就会丢失。所以就需要设置消费者的消息确认模式
(2)消息确认的机制
消息确认机制分为两个大类:自动确认和手动确认
手动确认又分为三种模式:肯定确认、否定确认、否定批量确认
对于rabbitmq的操作有两种,我们主要介绍第二种
- 直接使用amqb提供的包(RabbitMQ Java Client库)
- 使用spring集成进来的
第一种介绍:
对于第一种主要把autoAck参数设置成false即为手动确认,改成true即为自动确认。收到确认需要调用Channel.basicAck函数
//自动确认,第二个参数为true
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);
//收到确认,第二个参数为false
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body,"UTF-8");
System.out.println("接收到请求:"+ request);
String response = "针对request:"+ request +", 响应成功";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());//返回响应
channel.basicAck(envelope.getDeliveryTag(), false);//手动确认
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);//false
- 自动确认:当autoAck为true时,RabbitMQ会自动把发出去的消息设置为确认,然后从内存(硬盘)删除,而不会去管消费者是否真正的消费到该消息。适用对消息可靠性要求不高的场景
- 收到确认:当autoAck为false时,RabbiMQ会等待消费者显示地调用Basic.Ack命令,回复确认信号之后才会删除该消息。适用对消息可靠性要求比较高的场景
2.1.Spring-AMQP的三种消息确认(重点)
关于消息确认机制的问题:none自动确认(不管消息是否收到)、auto成功收到删除,异常收到不确认、manual手动确认
none:
(1)成功处理:不考虑
(2)消费异常:队列也会直接将消息删除
auto:
(1)成功处理--不考虑
(2)消费异常:队列不会把消息删除
- 没有进行异常捕获 --- 会一直尝试消费消息并一直报异常,队列不会把该消息删除,也就是不会确认消息
- 进行了异常捕获 --- 消费逻辑只会执行一次并,队列会删除消息
manual:手动确认模式(肯定确认和否定确认)
(1)成功处理
- 进行了肯定确认,队列将消息进行删除
- 没有进行肯定确认,队列会将消息从ready至为unacked
(2)消费异常
下面的情况是进行了异常的捕获
- 进行否定确认,但不重新入队 --- 队列会直接将消息删除
- 进行了否定确认,并且重新入队 --- 消息会重新入队,也就是队列不会将消息删除
以上是消息确认机制的情况总结。
下面展示代码:
配置文件:除了manual模式,其他模式只需要配置了就会生效
mamual模式的确认机制:
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ackQueue2(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("消息是:"+new String(message.getBody()));
System.out.println("deliveryTag下标是:"+deliveryTag);
try {
System.out.println("消费异常前");
int a=10/0;
System.out.println("消费异常后");
channel.basicAck(deliveryTag, false);
}catch (Exception e) {
System.out.println("消费异常");
channel.basicNack(deliveryTag, false, true);
}
}
下面这一段写的无需观看,都是一些学习时写的混乱思路
Spring-AMQP提供了三种消息确认机制
public enum AcknowledgeMode {
NONE ,
MANUAL ,
AUTO ;
}
- AcknowledgeMode.NONE:这种也就是对应自动确认模式。消息一旦投递给消费者,不关消费者是否成功处理,RabbitMQ都会自动确认消息,就会把消息删除。
- AcknowledgeMode.AUTO(默认):这种模式下,消费者处理成功会自动确认消息,但是如果处理失败就会抛出异常,不会确认消息。
- AcknowlegeMode.MANUAL:手动确认模式。消费者必须在成处理消息后显示的调用basicAck方法来确认消息。如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会重新投递消息。
(1)收到确认的两个方法
(2)先准备好下面的代码
1)常量类
public class Constant {
//消息确认机制
public static final String ACK_QUEUE = "ack_queue";
public static final String ACK_EXCHANGE = "ack_exchange";
}
2)配置类和配置文件
@Configuration
public class AckConfig {
//消息确认机制
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
@Bean("ackDirectExchange")
public DirectExchange ackDirectExchange() {
return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).build();
}
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackDirectExchange") DirectExchange ackDirectExchange,
@Qualifier("ackQueue") Queue ackQueue) {
return BindingBuilder.bind(ackQueue).to(ackDirectExchange).with("ack");
}
}
spring:
rabbitmq:
addresses: amqp://study:study@8.138.121.41:5672/extension1
listener:
simple:
acknowledge-mode: none #消息确认模式
3)生产者
@RequestMapping("/produce")
@RestController
public class ProduceController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE,"ack","这是一条ack消息");
return "消息发送成功";
}
}
4)消费者
@Component
public class AckListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ackQueue(Message message, Channel channel) {
System.out.println("接受到消息:"+ new String(message.getBody()));
}
}
程序正常运行:
(3)消息确认模式
1)第一种:none
正常运行:会直接确认消息并且从队列中删除
程序抛出异常:也会自动确认消息并且从队列中删除
小结:只要消费者把消息成功发送出去,不管消费者如何处理,都会自动确认并且删除消息
2)第二种:auto
正常运行:会直接确认消息并且从队列中删除
异常处理:消息不会确认,并且消息会不停地尝试重新入队
小结: 消费者成功处理消息就会确认消息,发生异常不会确认并尝试重新发送消息。
3)第三种:manual
正常情况并确认:需要手动确认消息
@Component
public class AckListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ackQueue(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息的序号
try {
System.out.println("异常情况前");
int a=10/0;
System.out.println("异常情况后");
System.out.println("接受到消息:"+ new String(message.getBody()));
channel.basicAck(deliveryTag, false);//收到确认消息
}catch (Exception e) {
System.out.println("尝试重新入队列");
}
}
}
这种就是正常的情况,不演示
正确情况不确认:会在队列中一直存在,不确认就会一直存在,消费者也会一直拿到该消息。如果下一个消费者进行了确认才会删除
异常情况并确认:在捕获异常中进行否定确认(一样也会确认消息)
@Component
public class AckListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ackQueue(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息的序号
try {
System.out.println("异常情况前");
int a=10/0;
System.out.println("异常情况后");
System.out.println("接受到消息:"+ new String(message.getBody()));
channel.basicAck(deliveryTag, false);//收到确认消息
}catch (Exception e) {
channel.basicNack(deliveryTag,false,false);//b:非批量确认,b1:不进行重新入队
//System.out.println("尝试重新入队列");
}
}
}
异常情况确认并重新入队:消息重新入队,并且一直尝试重新消费
2.持久性
背景:RabbitMQ的持久性也是保证可靠性的一种机制。RabbitMQ持久性分为交换机持久性、队列持久性和消息持久性。
2.1.交换机持久性
(1)持久性定义
这里交换机的持久性是指在RabbitMQ重启前后,交换机是否还存在。如果设置为持久性,则会一直存在,反之重启后交换机就会被删除。
RabbitMQ重启的定义:在服务器上进行重启启动。比如在linux上使用systemctl restart rabbitmq-server.service进行重新启动
(2)Spring设置交换机持久化
spring集成进来的交换机在创建的时候是默认为持久化的
ExchangeBuilder. topicExchange (Constant. ACK_EXCHANGE_NAME ).durable(true).build()
如果要设置成非持久化,只需要设置成false即可。
ExchangeBuilder.directExchange(Constant.PRE_EXCHANGE).durable(false).build();
(3)持久化与非持久化演示
持久化:交换机仍然存在
非持久化:重启后框框中的队列已经不见了
2.2.队列持久性
(1)持久性定义
定义:队列持久化和交换机一样,指的都是RabbitMQ服务器重启前后是否还存在。持久化,服务器重启后仍然存在,队列中的消息也会存在;非持久化,当服务器重启后队列就会被删除,队列中的消息也会被删除。
(2)设置持久化
创建的队列默认是持久化的
设置的方法:
QueueBuilder.durable(Constant.ACK_QUEUE).build();//持久化(默认)
QueueBuilder.nonDurable(Constant.PRE_QUEUE).build();//非持久化队列
(3)持久化与非持久化演示
持久化:重启前后消息队列存在,如果里面有消息,也仍然存在
非持久化:重启后队列消失,消息也消失
(4)可靠性的保证
如果只是队列持久性,但是消息不持久性,MQ重启之后,消息仍然会消失,也就是不可靠。所以要想保证消息可靠性,就需要同时设置消息持久性和队列持久化。
2.3.消息持久性
(1)定义
消息的持久化,放入队列中如果是持久化,那个mq服务器重启后消息也仍然会存在(队列也要是持久化);反之如果消息是非持久化,那么重启服务后消息将会被删除。
(2)设置消息持久性
对于消息的持久性属性,需要额外设置
消息默认是持久化的。如果队列是非持久化,消息也会被修改成非持久化。
rabbitmq-client设置消息持久化:设置第三个属性
//⾮持久化信息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
//持久化信息
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN ,msg.getBytes());
spring内置:需要设置new MessageProperties()中的MessageProperties()属性
Message message = new Message("".getBytes(),new MessageProperties());
//1.消息非持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
//2.消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
小结:想要消息做到持久化,必须要求队列和消息都为持久化才可以,其他情况都不可以。
3.发送方确认
要想保证消息持久化,前提是消息已经到达了服务器,那如果保证消息可以发送到服务器呢?RabbitMQ就给我们提供了两种方式:事务机制和发送方确认机制,我们介绍后者。
发送方确认机制也就是:publisher confirm 机制,该机制也分为两种:confirm确认模式和return退回模式
confirm确认模式运用在生产者和交换机之间;而return退回模式运用在交换机和队列之间。
3.1.confirm确认模式
(1)介绍confirm
1)工作的地方
2)定义
生产者发送消息后,会对发送端设置一个监听,无论是否到达Exchange,这个监听都会被执行。如果Exchange成功收到,ACK为true,如果没收到,为false
(2)使用confirm模式
1)第一步:配置确认模式
spring:
rabbitmq:
addresses: amqp://study:study@8.138.121.41:5672/extension1
listener:
simple:
#acknowledge-mode: none #消息确认模式
#acknowledge-mode: auto #消息确认模式
acknowledge-mode: manual #消息确认模式
publisher-confirm-type: correlated #消息发送确认
2)编写准备代码
已准备好的队列和交换机:
//发送方确认机制
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();
}
@Bean("confirmDirectExchange")
public DirectExchange confirmDirectExchange() {
return ExchangeBuilder.directExchange(Constant.CONFIRM_EXCHANGE).build();
}
@Bean("confirmBinding")
public Binding confirmBinding(@Qualifier("confirmDirectExchange")Exchange exchange,@Qualifier("confirmQueue")Queue queue ) {
return BindingBuilder.bind(queue).to(exchange).with("comfrim").noargs();
}
3)编写生产者代码
因为需要在交换机收到消息时回复确认,所以就需要回调方法,这个需要在rabbitmq客户端对象上进行设置。
设置客户端的属性:
@Configuration
public class RabbitTemplateConfig {
//1.返回原装的rabbitmqTemplate
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
//2.返回confirm模式的
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置confirm回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
/**
* b:是否收到ack
* correlationData: 消息id
* s: 原因
*/
if(b) {
System.out.printf("接收到消息,消息id: %s \n",correlationData==null?"null":correlationData.getId());
}else {
System.out.printf("未接收到消息,消息id: %s,原因: %s \n",correlationData==null?"null":correlationData.getId(),s);
//下面可以做相应的消息重发
}
}
});
return rabbitTemplate;
}
}
一个客户端修改了属性,后续的所有操作都会生效,并且只能设置一次,多次设置会报错。
在生产者代码中注入属性并发送消息:
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
//confirm确认模式
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData =new CorrelationData("1");//设置消息的id
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"confirm11","我是一条confirm test",correlationData);
return "confirm";
}
如果消息没有成功到达指定的交换机,ack就为false。这里不管理会消息是否到达队列。
(3)confirm模式小结
- 需要配置为确认模式
- 设置rabbitmq客户端的属性
- 消息只要成功发送到指定的交换机,ack就为true,也就是说明成功收到消息。
3.2.return退回模式
confirm模式和return模式可以一起使用并不互斥
(1)模式定义
定义:消息到达Exchange后,就会根据路由规则将消息放入指定的队列中。在这个过程中开启了return模式,只要存在一条消息无法到达队列(比如路由规则不匹配或者队列不存在),就可以选择把消息退回给发送者。选择是否发送回给发送者,就需要设置一个回调方法(和confirm模式一样)
(2)return退回模式代码编写
1)配置文件
这里上述的confirm已经配置好,这里无需再进行配置。
spring:
rabbitmq:
addresses: amqp://study:study@8.138.121.41:5672/extension1
listener:
simple:
#acknowledge-mode: none #消息确认模式
#acknowledge-mode: auto #消息确认模式
acknowledge-mode: manual #消息确认模式
publisher-confirm-type: correlated #消息发送确认(包含confirm和return)
2)编写准备代码
这里的队列和交换机使用上面的。
3)设置return回调方法
//2.返回confirm模式的
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//1.设置confirm回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
/**
* b:是否收到ack
* correlationData: 消息id
* s: 原因
*/
if(b) {
System.out.printf("接收到消息,消息id: %s \n",correlationData==null?"null":correlationData.getId());
}else {
System.out.printf("未接收到消息,消息id: %s,原因: %s \n",correlationData==null?"null":correlationData.getId(),s);
//下面可以做相应的消息重发
}
}
});
//2.设置return回调
rabbitTemplate.setMandatory(true);//开启
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息已被退回:"+returnedMessage);
//具体退回逻辑省略
}
});
return rabbitTemplate;
}
4.重试机制
4.1.重试机制定义
重试机制指的是在消费消息时发送异常情况,进行对消息重新入队的机制。如果入队成功,就相当于没有消费;如果入队失败,队列就把消息删除。
(1)在消息发送时,可能会存在很多的问题导致消息发送失败,RabbitMQ的重试机制就运行消息可以重新发送
(2)一些网络问题比如:网络故障、服务器不可以、资源等问题,是有可能重新发送成功的;但是如果是由于代码的问题导致,就无法重新发送成功
(3)为了防止无休止的重试,就可以设置重试的次数
(4)重试机制需要配合消息确认机制。如果消息为自动确认,消息到达就会被自动删除,这个时候重试机制也就没有了用武之地。
4.2.重试机制代码的准备工作
(1)配置重试机制
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true #开启消费者失败重试
initial-interval: 5000ms #初始失败等待时长为5秒
max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)
(2)队列和交换机的编写
生产者:
//retry模式
@RequestMapping("/retry")
public String retry() {
//System.out.println("我进来了");
rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE,"retry","我是retry ……");
return "retry";
}
队列和交换机:
//重试机制
@Bean("retryQueue")
public Queue retryQueue() {
return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
@Bean("retryDirectExchange")
public DirectExchange retryDirectExchange() {
return ExchangeBuilder.directExchange(Constant.RETRY_EXCHANGE).build();
}
public Binding retryBinding(@Qualifier("retryDirectExchange")Exchange exchange,@Qualifier("retryQueue")Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();
}
(3)消费者代码编写
@Configuration
public class RetryListener {
//自动确认的消费者
//@RabbitListener(queues = Constant.RETRY_QUEUE)
public void handleMessage(Message message) throws Exception {
System.out.println("消费消息:"+new String(message.getBody()));
System.out.println("异常前");
int a=9/0;
System.out.println("异常后");
}
//自动确认的消费者
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void handlerMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constant.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
try {
int num = 3/0;
System.out.println("业务处理完成");
channel.basicAck(deliveryTag, false);
}catch (Exception e){
channel.basicNack(deliveryTag, false, true);
}
}
}
(4)回忆消息确认机制
1)NONE:消息一旦投递给消费者,不关消费者是否成功处理,RabbitMQ都会自动确认消息,就会把消息删除。
2)MANUAL:消费者必须在成处理消息后显示的调用basicAck方法来确认消息。如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会重新投递消息。
3)AUTO:消费者处理成功会自动确认消息,但是如果处理失败就会抛出异常,不会确认消息。
4.3.重试机制演示
演示三种消息确认模式下的重试机制(正常消费情况无法触发重试,所以都是在异常情况下)
(1)none模式下重试
代码发送异常并进行重试:
重试次数完成还是无法入队,就进行了报错。
(2)auto模式下重试
重试次数完成还是无法入队,就进行了报错。
以上两种统称为自动确认,都是可以搭配重试机制
(3)manual模式下重试
- 发送异常否定确认并重新入队
使用的是否定确认的重复入队机制,并不会采取重试机制,最后队列不会把消息删除
- 发送异常否定确认,不进行重新入队
队列删除消息,不会进行重试