目录
一、死信队列介绍
1.死信
2.死信的来源
2.1 TTL
2.2 死信的来源
3.死信队列
4.死信队列的用途
二、死信队列的实现
1.导入依赖 pom.xml
2.application.properties
3.配置类
4.生产者
5.业务消费者(正常消费者)
6.死信队列消费者
一、死信队列介绍
1.死信
死信顾名思义就是没办法被消费的消息;
2.死信的来源
2.1 TTL
什么是TTL?
TTL(Time To Live)翻译为生存时间,是指消息在队列中可以存活的时间,如果消息在队列中存活的时间超过了TTL,那么消息就会被标记为死信,然后进入死信队列;
2.2 死信的来源
- 消息TTL过期;
- 队列达到最大长度: 队列满了无法再添加消息,就会成为死信,然后进入死信队列;
- 消息被拒绝,比如我们设置了消息的应答模式为手动应但是没有调用ack方法,那么消息就会被标记为死信,然后进入死信队列;
3.死信队列
我们能了解到,消息生产者生产消息,消费者消费(处理消息),消息生产者发送消息到队列,消费者从队列中获取消息,某些消息会无法被消费就会成为死信,自然而然的,我们需要一个队列来存储死信,而这个队列就被成为死信队列;
4.死信队列的用途
首先呢一个事物能够存在就说明他有存在的理由,死信队列其实一般来做一个定时的作用 例如:
- 在保证订单业务中的消息数据不丢失,当消息没有被处理或者是超出了TTL时间,那么我们就可以将他放在死信队列中,然后定时去消费死信队列中的消息,然后进行相应的处理;
- 如果这个消息是被动的,就是说我们想让他被消费但是没有被消费那么其实就是保证了消息的不丢失;
- 如果是一个主动的,我们设置了我们需要的TTL,那么就可以成为一个定时功能。比如取消支付功能;
1.2.1 延迟队列:
如果是这个消息使我们故意的想让发到死信队列中,其实我们可以将他叫做为延时队列,我们可以设置一个时间,比如我们想让这个消息延迟10分钟再发送到死信队列中,那么我们就可以将这个消息发送到延迟队列中,然后定时去消费延迟队列中的消息,然后进行相应的处理;
二、死信队列的实现
1.导入依赖 pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.application.properties
spring.application.name=springboot-rabbitmq
server.port=8080
#默认地址就是127.0.0.1:5672,如果是服务器的rabbitmq就改下
spring.rabbitmq.host=192.168.174.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.type=simple
#设置为false,会丢弃消息或者重新发步到死信队列
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#虚拟主机目录
spring.rabbitmq.virtual-host=/
3.配置类
@Configuration
public class rabbitMQConf {
//普通交换机的名字
public static final String NORMAL_EXCHANGE = "normalExchange";
//普通队列的名字
public static final String NORMAL_QUEUE = "normalQueue";
//死信交换机的名字
public static final String DEAD_EXCHANGE = "deadExchange";
//死信队列的名字
public static final String DEAD_QUEUE = "deadQueue";
/**
* 普通交换机
*/
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
/*
普通队列
*/
@Bean
public Queue normalQueue() {
return new Queue(NORMAL_QUEUE);
}
/**
* 死信交换机 死信队列
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
/**
* 死信队列
*/
@Bean
public Queue deadQueue() {
return new Queue(DEAD_QUEUE);
}
/**
* 绑定正常队列
*/
@Bean
public Binding normalBinding() {
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal");
}
/**
* 死信队列绑定
* @return
*/
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
}
4.生产者
@Slf4j
@RestController
@RequestMapping("/test")
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}")
public String sendMsg(@PathVariable(value = "msg") String msg) {
log.info("send msg:" + msg);
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, "normal", msg);
return "success";
}
}
5.业务消费者(正常消费者)
@Service
@Slf4j
public class NormalMessageReceiver {
/**
* 消费消息
*/
@RabbitListener(queues = NORMAL_QUEUE)
@SneakyThrows
public void receive(Message msg, Channel channel) {
String s = msg.getBody().toString();
String s1 = new String(msg.getBody());
log.info("这个是toString方式得出来的s:{}", s);
log.info("这个是new String方式得出来的s:{}", s1);
boolean ack=true;
Exception exception=null;
try {
if (s1.contains("dead")){
throw new RuntimeException("dead letter exception");
}
} catch (RuntimeException e) {
ack=false;
exception=e;
}
if (!ack){
System.out.println("error msg{ }"+exception.getMessage());
//设置死信消息
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);
}else {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
}
System.out.println("正常消息消费者收到消息:" + msg);
}
}
6.死信队列消费者
@Component
public class DeadMessageReceiver {
/**
* 死信队列
*/
@RabbitListener(queues = rabbitMQConf.DEAD_QUEUE)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("DeadMessageA{}" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}