配置文件
spring:
rabbitmq:
publisher-confirm-type: correlated #开启确认回调
publisher-returns: true #开启返回回调
listener:
simple:
acknowledge-mode: manual #设置手动接受消息
消息从生产者到交换机
无论消息是否到交换机ConfirmCallback都会触发。
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//构造方法执行之后执行,用于初始化一些信息
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息成功到达交换机");
return;
}
//未到达交换机可以采取一系列措施保证消息不会丢失
log.error("消息未发送到交换机{}", cause);
}
});
}
消息从交换机到队列
只有消息没到达队列才会触发ReturnsCallback
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息没有从交换机到达队列{}", returned.getReplyText());
}
});
}
消息从队列到消费者(ACK)
消息默认是自动确认的(手动确认需配置文件开启),无论消息是否被成功消费都会被确认,确认后消息就会自动删除
Channel
接口里有三个方法
// deliveryTag消息的唯一表示 multiple 为true可以批量处理这条消息之前的所有消息,假设你的消费者从 RabbitMQ 中获取了一批消息,然后在处理完这批消息后,你可以一次性确认所有消息,而不需要一个一个地确认。requeue 是否重新入队,不重新入队就会变成死信,如果配置了死信交换机和死信队列就会进入死信队列,没有配置消息就直接删除
void basicAck(long deliveryTag, boolean multiple) //确认消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue)//不确认消息
void basicReject(long deliveryTag, boolean requeue)// 拒绝消息
示例代码
@RabbitListener(queues = {"queue.direct.i"})
public void receiveMessage2(Message message, Channel channel) {
MessageProperties messageProperties = message.getMessageProperties();
//消息的唯一标识,发消息时自动添加,消息的身份证
long deliveryTag = messageProperties.getDeliveryTag();
try {
byte[] body = message.getBody();
log.info("接收到的消息为{}", new String(body));
//multiple false 表示只确认当前消息 true 确认所有消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理过程出错{}", e.getMessage());
try {
//requeue true 重新入队 false 进入死信队列,如果没有死信队列则直接删除
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}