springboot整合rabbitmq死信队列
什么是死信
说道死信,可能大部分观众大姥爷会有懵逼的想法,什么是死信?
死信队列,俗称DLX,翻译过来的名称为Dead Letter Exchange 死信交换机。
当消息限定时间内未被消费,成为 Dead Message后,可以被重新发送到另一个交换机中,发挥其应有的价值!
需要测试死信队列,则需要先梳理整体的思路,如可以采取如下方式进行配置:
从上面的逻辑图中,可以发现大致的思路:
.1. 消息队列分为正常交换机、正常消息队列;以及死信交换机和死信队列。
2. 正常队列针对死信信息,需要将数据 重新 发送至死信交换机中。
死信使用的场景
- 消息被拒绝
- 消息ttl过期
- 队列达到最大长度
这三种场景就会成为死信,然后放入死信交换机
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
//正常交换机的名字
public final static String EXCHANGE\_NAME = "exchange\_name";
//正常队列的名字
public final static String QUEUE\_NAME="queue\_name";
//死信交换机的名字
public final static String EXCHANGE\_DEAD = "exchange\_dead";
//死信队列的名字
public final static String QUEUE\_DEAD="queue\_dead";
//死信路由key
public final static String DEAD\_KEY="dead.key";
//创建正常交换机
@Bean(EXCHANGE\_NAME)
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)
//持久化 mq重启后数据还在
.durable(true)
.build();
}
//创建正常队列
@Bean(QUEUE\_NAME)
public Queue queue(){
//正常队列和死信进行绑定 转发到 死信队列,配置参数
Map<String,Object>map=getMap();
return new Queue(QUEUE\_NAME,true,false,false,map);
}
//正常队列绑定正常交换机 设置规则 执行绑定 定义路由规则 requestmaping映射
@Bean
public Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,
@Qualifier(EXCHANGE\_NAME) Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
//路由规则
.with("app.#")
.noargs();
}
//创建死信队列
@Bean(QUEUE\_DEAD)
public Queue queueDead(){
return new Queue(QUEUE\_DEAD,true,false,false);
}
//创建死信交换机
@Bean(EXCHANGE\_DEAD)
public Exchange exchangeDead(){
return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD)
.durable(true) //持久化 mq重启后数据还在
.build();
}
//绑定死信队列和死信交换机
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(queueDead())
.to(exchangeDead())
//路由规则 正常路由key
.with(DEAD\_KEY)
.noargs();
}
/\*\*
获取死信的配置信息
\*
\*\*/
public Map<String,Object>getMap(){
//3种方式 任选其一,选择其他方式之前,先把交换机和队列删除了,在启动项目,否则报错。
//方式一
Map<String,Object> map=new HashMap<>(16);
//死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);
//死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
map.put("x-dead-letter-routing-key", DEAD\_KEY);
//方式二
//消息的过期时间,单位:毫秒;达到时间 放入死信队列
// map.put("x-message-ttl",5000);
//方式三
//队列最大长度,超过该最大值,则将从队列头部开始删除消息;放入死信队列一条数据
// map.put("x-max-length",3);
return map;
}
}
配置文件信息
spring:
rabbitmq:
host: 192.168.23.135
port: 5672
username: admin
password: admin
#虚拟主机
virtual-host: dmg-a
listener:
simple:
#自动ack
acknowledge-mode: auto
retry:
#最大重试次数
max-attempts: 3
#开启重试
enabled: true
引入 rabbitmq 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生产者
@RestController
@RequestMapping("p")
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/test")
public String test(){
//正常交换机 正常路由键 正常消息内容
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE\_NAME
,"app.test","我是生产者");
return "aa";
}
}
//消费者
@Component
public class Xf {
//监听正常队列名称
@RabbitListener(queues = {RabbitmqConfig.QUEUE\_NAME})
public void normal(String payload, Message message, Channel channel) throws IOException {
System.out.println("正常消息:"+payload);
long tag=message.getMessageProperties().getDeliveryTag();
try{
// int i=1/0;
//手动签收
channel.basicAck(tag,true);
}catch (RuntimeException runtimeException){
//出现异常 删除消息 放入死信队列
channel.basicReject(tag,false);
}
}
监听死信队列名称
@RabbitListener(queues = {RabbitmqConfig.QUEUE\_DEAD})
public void dead(String payload, Message message, Channel channel) throws IOException {
System.out.println("死信队列:"+payload);
//删除消息 放入数据库 人工处理
long deliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,true);
}
}