RabbitMQ的DLX
- 1、RabbitMQ死信队列
- 2、代码示例
- 2.1、队列过期
- 2.1.1、配置类RabbitConfig(关键代码)
- 2.1.2、业务类MessageService
- 2.1.3、配置文件application.yml
- 2.1.4、启动类
- 2.1.5、配置文件
- 2.1.6、测试
- 2.2、消息过期
- 2.2.1、配置类RabbitConfig
- 2.2.2、业务类MessageService(关键代码)
- 2.2.3、配置文件application.yml
- 2.2.4、启动类同上
- 2.2.5、配置文件同上
- 2.2.6、测试
- 2.3、队列达到最大长度(先入队的消息会被发送到DLX)
- 2.3.1、配置类RabbitConfig(关键代码)
- 2.3.2、业务类MessageService(关键代码)
- 2.3.3、配置文件application.yml
- 2.3.4、启动类同上
- 2.3.5、配置文件pom.xml同上
- 2.3.6、测试
- 2.4、消费者拒绝消息不进行重新投递
- 2.4.1、生产者
- 2.4.1.1、生产者application.yml
- 2.4.1.2、生产者发送消息
- 2.4.1.3、生产者配置类
- 2.4.2、消费者
- 2.4.2.1、消费者application.yml 启动手动确认
- 关键配置
- 2.4.2.2、消费者接收消息
- 关键代码
- 2.4.3、测试
1、RabbitMQ死信队列
RabbitMQ死信队列也有叫 死信交换机、死信邮箱等说法。
DLX: Dead-Letter-Exchange 死信交换器,死信邮箱。
1-2、生产者发送一个消息到正常交换机
2-4、正常交换机接收到消息发送到正常队列
4、正常队列设置了队列过期时间,超时消息会自动删除
4-6、原本过期自动删除的消息发送到了死信交换机
6-8、死信交换机将消息发送到了死信队列
如上情况下一个消息会进入DLX(Dead Letter Exchange)死信交换机。
2、代码示例
2.1、队列过期
2.1.1、配置类RabbitConfig(关键代码)
package com.power.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Value("${my.exchangeNormalName}")
private String exchangeNormalName;
@Value("${my.queueNormalName}")
private String queueNormalName;
@Value("${my.exchangeDlxName}")
private String exchangeDlxName;
@Value("${my.queueDlxName}")
private String queueDlxName;
/**
* 正常交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
/**
* 正常队列
* @return
*/
@Bean
public Queue normalQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl",20000);//设置队列的过期时间为20秒
//重点:设置这两个参数
arguments.put("x-dead-letter-exchange",exchangeDlxName); //设置队列的死信交换机
arguments.put("x-dead-letter-routing-key","error");//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致
return QueueBuilder.durable(queueNormalName)
.withArguments(arguments) //设置队列的过期时间
.build();
}
/**
* 正常交换机和正常队列绑定
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
/**
* 死信交换机
* @return
*/
@Bean
public DirectExchange dlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
/**
* 死信队列
* @return
*/
@Bean
public Queue dlxQueue(){
return QueueBuilder.durable(queueDlxName).build();
}
/**
* 死信交换机和死信队列绑定
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
2.1.2、业务类MessageService
package com.power.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Bean
public void sendMsg(){
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
log.info("消息发送完毕,发送时间是:"+new Date());
}
}
2.1.3、配置文件application.yml
server:
port: 8080
spring:
application:
name: dlx-test01
rabbitmq:
host: 你的服务器IP
port: 5672
username: 你的账号
password: 你的密码
virtual-host: power
my:
exchangeNormalName: exchange.normal.a #正常交换机
queueNormalName: queue.normal.a #正常队列,没有消费组,设置过期时间
exchangeDlxName: exchange.dlx.a #死信交换机
queueDlxName: queue.dlx.a #死信队列
2.1.4、启动类
package com.power;
import com.power.service.MessageService;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
@SpringBootApplication
public class Application implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
2.1.5、配置文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.power</groupId>
<artifactId>rabbit_06_dlx01</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rabbit_06_dlx01</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.13</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.1.6、测试
启动程序,发送消息:
消息会先被发送到正常队列queue.normal.a中,超时未被消费,
则消息会被发送到死信队列queue.dlx.a 中
2.2、消息过期
2.2.1、配置类RabbitConfig
package com.power.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Value("${my.exchangeNormalName}")
private String exchangeNormalName;
@Value("${my.queueNormalName}")
private String queueNormalName;
@Value("${my.exchangeDlxName}")
private String exchangeDlxName;
@Value("${my.queueDlxName}")
private String queueDlxName;
/**
* 正常交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
/**
* 正常队列
* @return
*/
@Bean
public Queue normalQueue(){
Map<String, Object> arguments = new HashMap<>();
//重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange",exchangeDlxName);
//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致
arguments.put("x-dead-letter-routing-key","error");
return QueueBuilder.durable(queueNormalName)
.withArguments(arguments) //设置队列的过期时间
.build();
}
/**
* 正常交换机和正常队列绑定
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
/**
* 死信交换机
* @return
*/
@Bean
public DirectExchange dlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
/**
* 死信队列
* @return
*/
@Bean
public Queue dlxQueue(){
return QueueBuilder.durable(queueDlxName).build();
}
/**
* 死信交换机和死信队列绑定
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
2.2.2、业务类MessageService(关键代码)
package com.power.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Bean
public void sendMsg(){
try {
MessageProperties messageProperties = new MessageProperties();
//设置单条消息的过期时间,单位为毫秒,数据类型为字符串
messageProperties.setExpiration("20000");
Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();
rabbitTemplate.convertAndSend("exchange.normal.02","order",message);
}catch (Exception e){
e.printStackTrace();
log.info("消息发送失败:"+new Date());
}
log.info("消息发送完毕,发送时间是:"+new Date());
}
}
2.2.3、配置文件application.yml
server:
port: 8080
spring:
application:
name: dlx-test01
rabbitmq:
host: 你的服务器IP
port: 5672
username: 你的账号
password: 你的密码
virtual-host: power
my:
exchangeNormalName: exchange.normal.02 #正常交换机
queueNormalName: queue.normal.02 #正常队列,没有消费组,设置过期时间
exchangeDlxName: exchange.dlx.02 #死信交换机
queueDlxName: queue.dlx.02 #死信队列
2.2.4、启动类同上
2.2.5、配置文件同上
2.2.6、测试
启动程序发送消息
登录rabbitmq后台:
消息先进入正常队列queue.normal.02中,超时未消费,
消息超过过期时间,则进入queue.dlx.02死信队列
2.3、队列达到最大长度(先入队的消息会被发送到DLX)
2.3.1、配置类RabbitConfig(关键代码)
package com.power.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Value("${my.exchangeNormalName}")
private String exchangeNormalName;
@Value("${my.queueNormalName}")
private String queueNormalName;
@Value("${my.exchangeDlxName}")
private String exchangeDlxName;
@Value("${my.queueDlxName}")
private String queueDlxName;
/**
* 正常交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
/**
* 正常队列
* @return
*/
@Bean
public Queue normalQueue(){
Map<String, Object> arguments = new HashMap<>();
//设置队列的最大长度
arguments.put("x-max-length",5);
//重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange",exchangeDlxName);
//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致
arguments.put("x-dead-letter-routing-key","error");
return QueueBuilder.durable(queueNormalName)
.withArguments(arguments) //设置队列的参数
.build();
}
/**
* 正常交换机和正常队列绑定
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
/**
* 死信交换机
* @return
*/
@Bean
public DirectExchange dlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
/**
* 死信队列
* @return
*/
@Bean
public Queue dlxQueue(){
return QueueBuilder.durable(queueDlxName).build();
}
/**
* 死信交换机和死信队列绑定
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
2.3.2、业务类MessageService(关键代码)
package com.power.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Bean
public void sendMsg(){
for (int i = 1; i < 8; i++) {
String msg = "hello world "+ i;
Message message = MessageBuilder.withBody(msg.getBytes()).build();
rabbitTemplate.convertAndSend("exchange.normal.03","order",message);
log.info("消息发送完毕,发送时间是:"+new Date());
}
}
}
2.3.3、配置文件application.yml
server:
port: 8080
spring:
application:
name: dlx-test01
rabbitmq:
host: 你的服务器IP
port: 5672
username: 你的账号
password: 你的密码
virtual-host: power
my:
exchangeNormalName: exchange.normal.03 #正常交换机
queueNormalName: queue.normal.03 #正常队列,没有消费组,设置过期时间
exchangeDlxName: exchange.dlx.03 #死信交换机
queueDlxName: queue.dlx.03 #死信队列
2.3.4、启动类同上
2.3.5、配置文件pom.xml同上
2.3.6、测试
启动项目,发送消息
登录rabbitmq后台:
两条消息进入死信队列
查看消息发现,是前两条消息进入了死信队列,
2.4、消费者拒绝消息不进行重新投递
消费者从正常的队列接收消息,但是消费者对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。
2.4.1、生产者
2.4.1.1、生产者application.yml
server:
port: 8080
spring:
application:
name: dlx-test04
rabbitmq:
host: 你的服务器IP
port: 5672
username: 你的账号
password: 你的密码
virtual-host: power
my:
exchangeNormalName: exchange.normal.04 #正常交换机
queueNormalName: queue.normal.04 #正常队列,没有消费组,设置过期时间
exchangeDlxName: exchange.dlx.04 #死信交换机
queueDlxName: queue.dlx.04 #死信队列
2.4.1.2、生产者发送消息
package com.power.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Bean
public void sendMsg(){
String msg = "hello world";
Message message = MessageBuilder.withBody(msg.getBytes()).build();
rabbitTemplate.convertAndSend("exchange.normal.04","order",message);
log.info("消息发送完毕,发送时间是:"+new Date());
}
}
2.4.1.3、生产者配置类
package com.power.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Value("${my.exchangeNormalName}")
private String exchangeNormalName;
@Value("${my.queueNormalName}")
private String queueNormalName;
@Value("${my.exchangeDlxName}")
private String exchangeDlxName;
@Value("${my.queueDlxName}")
private String queueDlxName;
/**
* 正常交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
/**
* 正常队列
* @return
*/
@Bean
public Queue normalQueue(){
Map<String, Object> arguments = new HashMap<>();
//重点:设置这两个参数
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange",exchangeDlxName);
//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致
arguments.put("x-dead-letter-routing-key","error");
return QueueBuilder.durable(queueNormalName)
.withArguments(arguments) //设置队列的参数
.build();
}
/**
* 正常交换机和正常队列绑定
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
/**
* 死信交换机
* @return
*/
@Bean
public DirectExchange dlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
/**
* 死信队列
* @return
*/
@Bean
public Queue dlxQueue(){
return QueueBuilder.durable(queueDlxName).build();
}
/**
* 死信交换机和死信队列绑定
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
}
}
2.4.2、消费者
2.4.2.1、消费者application.yml 启动手动确认
server:
port: 9090
spring:
application:
name: dlx04-receiver
rabbitmq:
host: 你的服务器IP
port: 5672
username: 你的账号
password: 你的密码
virtual-host: power
listener:
simple:
acknowledge-mode: manual
关键配置
2.4.2.2、消费者接收消息
package com.power.service;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Component
@Slf4j
public class MessageReceive {
@RabbitListener(queues={"queue.normal.04"})
public void receiveMsg(Message message, Channel channel){
//获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
//获取消息的唯一标识,类似学号和身份证号
long deliveryTag = messageProperties.getDeliveryTag();
try{
byte[] body = message.getBody();
String msg = new String(body);
log.info("监听到的消息是:"+msg+",接收的时间是:"+new Date());
//TODO 业务逻辑处理
int a=1/0;
//消费者的手动确认,false:只确认当前消息,true:批量确认
channel.basicAck(deliveryTag,false);
}catch (Exception e){
log.error("接收者出现问题:{}",e.getMessage());
try {
//消费者的手动不确认,参数3:是重新入队
//不会进入死信队列
// channel.basicNack(deliveryTag,false,true);
//消费者的手动不确认,参数3:false 不重新入队(不重新投递),就会变成死信
channel.basicNack(deliveryTag,false,false);
}catch (IOException ex){
throw new RuntimeException(ex);
}
}
}
}
关键代码
2.4.3、测试
启动生产者:发送消息
启动消费者:
因业务代码出错,程序处理异常,消息进入死信队列