// consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量
channel.basicAck(deliveryTag, multiple);
// 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列,该方法reject后,该消费者还是会消费到该条被reject的消息
channel.basicReject(deliveryTag, requeue);
// 不确认 deliveryTag 对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。
channel.basicNack(deliveryTag, multiple, requeue);
// 是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
channel.basicRecover(false);
搭建项目
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
@Configuration
public class RabbitMQConfig {
// 正常业务
public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
public static final String NORMAL_QUEUE_A = "normal-queue-a";
public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
// 死信队列
public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
public static final String DEAD_QUEUE_A = "dead-queue-a";
public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
// 声明交换机
@Bean("businessExchange")
public TopicExchange normalExchangeA() {
return new TopicExchange(NORMAL_EXCHANGE_A);
}
@Bean("deadExchange")
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE_A);
}
// 声明队列
@Bean("businessQueueA")
public Queue businessQueueA() {
HashMap<String, Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_A);
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A);
return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(args).build();
}
@Bean("deadQueueA")
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE_A).build();
}
// 声明绑定关系
@Bean
public Binding bindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_A);
}
@Bean
public Binding bindingDead(@Qualifier("deadQueueA") Queue queue, @Qualifier("deadExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING_KEY_A);
}
}
@Component
public class SmsListener {
@RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
public void smsListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("收到消息:" + body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (body.contains("dead")) {
channel.basicNack(deliveryTag, false, false);
}
channel.basicAck(deliveryTag, false);
}
}
@Component
public class DeadListener {
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE_A)
public void deadListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("dead listener: " + body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@RestController
public class HelloController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/hello")
public Boolean hello(String msg) {
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE_A, RabbitMQConfig.NORMAL_ROUTING_KEY_A, msg);
return true;
}
}
使用注解
@Configuration
public class RabbitMQConfig {
// 正常业务
public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
public static final String NORMAL_QUEUE_A = "normal-queue-a";
public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
// 死信队列
public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
public static final String DEAD_QUEUE_A = "dead-queue-a";
public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
// 声明交换机
@Bean("businessExchange")
public TopicExchange normalExchangeA() {
return new TopicExchange(NORMAL_EXCHANGE_A);
}
// 声明队列
@Bean()
public Queue businessQueueA() {
HashMap<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_A);
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A);
return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(args).build();
}
// 声明绑定关系
@Bean
public Binding bindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_A);
}
}
死信队列使用注解实现
@Component
public class DeadListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A),
exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, type = ExchangeTypes.DIRECT),
key = RabbitMQConfig.DEAD_ROUTING_KEY_A
))
public void deadListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("死信队列消费消息: " + body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@Component
public class SmsListener {
@RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
// exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
// key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
// ))
public void smsListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("正常消费消息:" + body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (body.contains("dead")) {
channel.basicNack(deliveryTag, false, false);
// return;
}
channel.basicAck(deliveryTag, false);
}
}
报错:
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
// 由于程序编写不严谨,在 basicNack 执行后没有退出方法,导致最后还执行了 basicAck,出现了上述错误
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
))
public void smsListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("正常消费消息:" + body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (body.contains("dead")) {
channel.basicNack(deliveryTag, false, false);
return;
}
channel.basicAck(deliveryTag, false);
}
// 问题二: 控制台报错,但是也能正常消费mq消息,这里与第一种唯一的区别是在于 @RabbitListener, 我的推测是 自定义 bean 和注解生成的 bean 重复导致,看能不能使用注解绑定死信队列
2023-04-23 22:03:25.630 ERROR 8580 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'normal-queue-a' in vhost '/': received none but current is the value 'dead-exchange-a' of type 'longstr', class-id=50, method-id=10)
Broker not available; cannot force queue declarations during start: java.io.IOException
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {
String TRUE = "true";
String FALSE = "false";
@AliasFor("name")
String value() default "";
@AliasFor("value")
String name() default "";
String type() default "direct";
String durable() default "true";
String autoDelete() default "false";
String internal() default "false";
String ignoreDeclarationExceptions() default "false";
String delayed() default "false";
Argument[] arguments() default {};
String declare() default "true";
String[] admins() default {};
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC, arguments = {
@Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
@Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
}),
key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
))
public void smsListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("正常消费消息:" + body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (body.contains("dead")) {
channel.basicNack(deliveryTag, false, false);
return;
}
channel.basicAck(deliveryTag, false);
}
可以使用注解的方式来绑定 死信队列,但是还是会报上面的错误,继续修改 参数试试
java - How to set x-dead-letter-exchange in Rabbit? - Stack Overflow
但是使用注解绑定的话好像又不生效了,问题原因,tmd将死信参数绑到交换机上了,c
修改代码
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
@Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
@Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
}),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
))
public void smsListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("正常消费消息:" + body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (body.contains("dead")) {
channel.basicNack(deliveryTag, false, false);
return;
}
channel.basicAck(deliveryTag, false);
}
至于问题二是由于队列和交换机默认持久化,这样就导第二次启动项目时重复
Springboot纯注解版的RabbitMq 死信队列_注解声明私信队列_lopo呀的博客-CSDN博客
全注解版
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
// 正常业务
public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
public static final String NORMAL_QUEUE_A = "normal-queue-a";
public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
// 死信队列
public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
public static final String DEAD_QUEUE_A = "dead-queue-a";
public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
@Component
public class SmsListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
@Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
@Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
}),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
))
public void smsListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("正常消费消息:" + body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (body.contains("dead")) {
channel.basicNack(deliveryTag, false, false);
return;
}
channel.basicAck(deliveryTag, false);
}
}
@Component
public class DeadListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A, durable = "false"),
exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, durable = "false"),
key = RabbitMQConfig.DEAD_ROUTING_KEY_A
))
public void deadListener(Message message, Channel channel) throws IOException {
String body = new String(message.getBody());
System.out.println("死信队列消费消息: " + body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@GetMapping("/hello")
public Boolean hello(String msg) {
System.out.println("发送消息:" + msg);
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE_A, RabbitMQConfig.NORMAL_ROUTING_KEY_A, msg);
return true;
}
// conslog
发送消息:dead
正常消费消息:dead
死信队列消费消息: dead
明天在研究下回调啥的
springboot整合rabbitMQ confirm 确认模式 return 退回模式_weixin_44318244的博客-CSDN博客
回调
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
-
confirm 确认模式
-
return 退回模式
rabbitmq 整个消息投递的路径为:producer—>rabbitmq broker—>exchange—>queue—>consumer
-
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
-
消息从 exchange–>queue 投递失败则会返回一个 returnCallback
我们将利用这两个 callback 控制消息的可靠性投递
消息的可靠投递小结 ➢ 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。 ➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
➢ 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。 ➢ 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到 queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
确认模式
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
publisher-confirm-type: correlated # 发布确认属性配置
publisher-returns: true # 开启 退回模式
public enum ConfirmType {
/**
* Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
* within scoped operations.
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或 waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会 关闭channel,则接下来无法发送消息到broker;
*/
SIMPLE,
/**
* Use with {@code CorrelationData} to correlate confirmations with sent 发布消息成功到交换器后会触发回调方法
* messsages.
*/
CORRELATED,
/**
* Publisher confirms are disabled (default).
*/
NONE
}
@Configuration
public class PublisherConfirmHandler implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("发送消息到交换机成功!MessageId: " + correlationData.getId());
}else {
System.out.println("发送消息到交换机失败!MessageId: " + correlationData.getId() + ", 退回原因:" + cause);
}
}
}
@Resource
private PublisherConfirmHandler publisherConfirmHandler;
rabbitTemplate.setConfirmCallback(publisherConfirmHandler);
回退模式
@Configuration
public class ReturnsCallbackHandler implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("return 执行了!" + returned);
}
}
//
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallbackHandler);