Consumer Ack
ack 指的是acknowledge 确认, 指的是消费端收到消息后的确认方式。
有三种确认方式:
- 自动确认
- 手动确认 (根据业务情况 手动确认是否成功发送)
- 根据异常情况确认
我们在消费端用代码实践一下:
首先我们定义监听器来获取消息:
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println(Arrays.toString(message.getBody()));
}
}
你只需要指定 queues 队列 就可以了 这是最简答的一种方式。
有些新手可能会问? 拿到消息 不是要实现MessageListener接口并重写onMessage方法吗???
这种也可以 不过这是一种 比较传统的方法:
这里解释一下:
当你不使用@RabbitListener注解时,需要实现MessageListener接口并重写onMessage方法,是因为这是一种传统的方式来定义 RabbitMQ 的消息监听器。
MessageListener接口定义了一个用于处理消息的方法onMessage,当消息到达监听容器时,容器会调用该方法,并将消息作为参数传递给方法。
当你使用SimpleMessageListenerContainer或其他类似的监听容器来配置 RabbitMQ 的消息监听器时,需要显式地指定一个实现了MessageListener接口的对象作为消息监听器。容器会负责接收消息,并将其传递给监听器的onMessage方法进行处理。
而使用@RabbitListener注解的方式,是Spring AMQP提供的一种更简化和方便的方式,用于声明消息监听器的方法。通过注解标记的方法会被框架自动识别为消息监听器,并处理消息的接收和转换。
使用@RabbitListener注解的方法不需要显式实现MessageListener接口,是因为框架会自动处理消息的转换,并将消息内容作为方法的参数传递进去。
总结起来,使用@RabbitListener注解的方式是Spring AMQP提供的一种便捷方式,可以更轻松地定义和配置消息监听器。而不使用注解的情况下,需要显式实现MessageListener接口并重写onMessage方法,是一种传统的方式来定义 RabbitMQ 的消息监听器。
在yml中设置手动ack:
spring:
rabbitmq:
host: ***********
username: guest
password: guest
virtual-host: /
port: 5672
listener:
simple:
acknowledge-mode: manual
代码:
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
Channel channel) throws IOException {
try {
System.out.println("收到消息为"+new String(message.getBody()));
System.out.println("处理业务逻辑。。");
channel.basicAck(deliveryTag,true);
}catch (IOException e){
//拒绝签收
//这里的第三个参数表示 如果为true 消息重回到queue broker会重新发送该消息给消费端
channel.basicNack(deliveryTag,true,true);
//从系统解耦的角度上讲这个地方 应该为false 因为你业务失败了 上游是不会再给你发一次的
}
}
}
然后我们把原来的代码扩充一下 在这里实现 手动的ack。
这里有一个大坑 卡了我半天。 网上好多教程里面 这个ackmode设置直接可以在@RabbitListener注解里面设置,但是我的spring版本里面 RabbitListener接口就没有这个 ack这个属性 西巴
最后发现在yml里面配置就好了