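1. Causes of dead-letter messages
A message in a queue becomes a dead-letter message when any of the following happens:
- A consumer negatively acknowledges it with channel.basicNack or channel.basicReject, and requeue is set to false.
- The message stays in the queue longer than its configured time to live (TTL).
- The number of messages in the queue exceeds the configured maximum queue length.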
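As a rough illustration of these three conditions, the plain-Java sketch below classifies a message the way the broker would. It is a toy model, not RabbitMQ code: the class DeadLetterRules, its parameters, and the use of -1 for "not configured" are assumptions made for the example. The returned strings are the reason values RabbitMQ records in the x-death header.

```java
import java.util.Optional;

/** Toy model of the three dead-letter conditions; illustrative only. */
final class DeadLetterRules {

    private DeadLetterRules() {}

    /**
     * Returns the dead-letter reason, or empty if the message stays in the queue.
     *
     * @param nacked    the consumer called basicNack or basicReject
     * @param requeue   the requeue flag passed with the nack
     * @param ageMs     how long the message has waited in the queue
     * @param ttlMs     configured TTL in milliseconds, or -1 if none (assumption)
     * @param queueSize number of messages in the queue, including this one
     * @param maxLength configured maximum length, or -1 if none (assumption)
     */
    static Optional<String> classify(boolean nacked, boolean requeue,
                                     long ageMs, long ttlMs,
                                     int queueSize, int maxLength) {
        if (nacked && !requeue) {
            return Optional.of("rejected");
        }
        if (ttlMs >= 0 && ageMs > ttlMs) {
            return Optional.of("expired");
        }
        if (maxLength >= 0 && queueSize > maxLength) {
            return Optional.of("maxlen");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Nack with requeue=false
        System.out.println(classify(true, false, 0, -1, 1, -1));
        // 31 s in a queue with a 30 s TTL
        System.out.println(classify(false, false, 31_000, 30_000, 1, -1));
        // Fourth message in a queue limited to three
        System.out.println(classify(false, false, 0, -1, 4, 3));
    }
}
```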
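A dead-letter queue (DLQ) is just an ordinary queue that is bound to a dead-letter exchange (DLX). When declaring the business queue, set the x-dead-letter-exchange and x-dead-letter-routing-key arguments to name the dead-letter exchange and the dead-letter routing key.
- x-dead-letter-exchange: the exchange that dead-lettered messages are republished to.
- x-dead-letter-routing-key: the routing key used when a message is republished to the dead-letter exchange. It must match the binding between the dead-letter exchange and the dead-letter queue. If it is omitted, the message keeps its original routing key.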
2. Implementation
build.gradle:
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.1.1'
    id 'io.spring.dependency-management' version '1.1.0'
}
group = 'com.kexuexiong'
version = '0.0.1-SNAPSHOT'
java {
    sourceCompatibility = '17'
}
configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}
repositories {
    // mavenCentral()
    maven {
        url 'https://maven.aliyun.com/repository/public'
    }
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-jdbc'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.2'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.mysql:mysql-connector-j'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter-test:3.0.2'
    // https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'cn.hutool:hutool-all:5.8.16'
}
tasks.named('test') {
    useJUnitPlatform()
}
YAML configuration:
This setup uses a RabbitMQ cluster: 192.168.49.10:5672, 192.168.49.9:5672, 192.168.49.11:5672
For the detailed setup, see the earlier post on building a RabbitMQ cluster.
server:
  port: 8014
spring:
  rabbitmq:
    username: admin
    password: 123456
    dynamic: true
    # port: 5672
    # host: 192.168.49.9
    addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672
    publisher-confirm-type: correlated
    publisher-returns: true
  application:
    name: shushan
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://ip/shushan
    username: root
    password:
    hikari:
      minimum-idle: 10
      maximum-pool-size: 20
      idle-timeout: 50000
      max-lifetime: 540000
      connection-test-query: select 1
      connection-timeout: 600000
Constants:
package com.kexuexiong.shushan.common.mq;

public class MqConstant {
    public final static String demoDirectQueue = "demoDirectQueue";
    public final static String demoDirectExchange = "demoDirectExchange";
    public final static String demoDirectRouting = "demoDirectRouting";
    public final static String lonelyDirectExchange = "lonelyDirectExchange";
    public final static String topicExchange = "topicExchange";
    public final static String BIG_CAR_TOPIC = "topic.big_car";
    public final static String SMALL_CAR_TOPIC = "topic.small_car";
    public final static String TOPIC_ALL = "topic.#";
    public final static String FANOUT_A = "fanout.A";
    public final static String FANOUT_B = "fanout_B";
    public final static String FANOUT_C = "fanout_c";
    public final static String FANOUT_EXCHANGE = "fanoutExchange";
    // Names used by the dead-letter example
    public final static String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public final static String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public final static String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";
    public final static String BUSINESS_QUEUE = "business.queue";
    public final static String BUSINESS_ROUTING_KEY = "business.routing.key";
    public final static String BUSINESS_EXCHANGE = "business.exchange";
}
RabbitMQ configuration:
package com.kexuexiong.shushan.common.mq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;

@Configuration
public class DirectDLXRabbitConfig {
    // Business queue: dead-lettered messages are republished to the dead-letter exchange
    @Bean
    public Queue businessDirectQueue() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", MqConstant.DEAD_LETTER_EXCHANGE); // dead-letter exchange
        args.put("x-dead-letter-routing-key", MqConstant.DEAD_LETTER_ROUTING_KEY); // dead-letter routing key
        return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build();
    }

    @Bean
    DirectExchange businessDirectExchange() {
        return new DirectExchange(MqConstant.BUSINESS_EXCHANGE, true, false);
    }

    @Bean
    DirectExchange deadLetterDirectExchange() {
        return new DirectExchange(MqConstant.DEAD_LETTER_EXCHANGE, true, false);
    }

    // The dead-letter queue is an ordinary durable queue
    @Bean
    public Queue deadLetterDirectQueue() {
        return new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false);
    }

    @Bean
    Binding deadLetterBingingDirect() {
        return BindingBuilder.bind(deadLetterDirectQueue()).to(deadLetterDirectExchange()).with(MqConstant.DEAD_LETTER_ROUTING_KEY);
    }

    @Bean
    Binding businessBingingDirect() {
        return BindingBuilder.bind(businessDirectQueue()).to(businessDirectExchange()).with(MqConstant.BUSINESS_ROUTING_KEY);
    }
}
[Figure: the business queue bound to the dead-letter exchange]
Producer:
package com.kexuexiong.shushan.controller.mq;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/deadLetterSendDirectMessage")
    public String deadLetterSendDirectMessage() {
        String msgId = UUID.randomUUID().toString();
        String msg = "demo msg ,dead letter!!";
        // Use lowercase "yyyy" (calendar year); uppercase "YYYY" is the week-based year
        String createTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("msg", msg);
        map.put("createTime", createTime);
        rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE, MqConstant.BUSINESS_ROUTING_KEY, map);
        return "ok";
    }

    @GetMapping("/deadLetterTimeOutSendDirectMessage")
    public String deadLetterTimeOutSendDirectMessage() {
        String msgId = UUID.randomUUID().toString();
        String msg = "demo msg ,dead letter!!";
        String createTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        log.info("msg time :" + createTime);
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("msg", msg);
        map.put("createTime", createTime);
        Integer randomInt = RandomUtil.randomInt(30000);
        map.put("randomTime", randomInt);
        MessagePostProcessor messagePostProcessor = message -> {
            // Per-message TTL in milliseconds, passed as a string: 30000 ms = 30 seconds
            message.getMessageProperties().setExpiration("30000");
            return message;
        };
        rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE, MqConstant.BUSINESS_ROUTING_KEY, map, messagePostProcessor);
        return "ok";
    }
}
Consumer:
package com.kexuexiong.shushan.common.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageListenerConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private AckReceiver ackReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.setConcurrentConsumers(2);
        simpleMessageListenerContainer.setMaxConcurrentConsumers(2);
        // Manual acknowledgement, so AckReceiver decides whether to ack or nack
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // For the rejection test, MqConstant.BUSINESS_QUEUE must also be listed here
        //,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPIC
        simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);
        simpleMessageListenerContainer.setMessageListener(ackReceiver);
        return simpleMessageListenerContainer;
    }
}
AckReceiver:
package com.kexuexiong.shushan.common.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        byte[] messageBody = message.getBody();
        // The producer sends a HashMap, which arrives as a Java-serialized object
        try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody))) {
            // Values are not all strings (randomTime is an Integer), so use Object
            @SuppressWarnings("unchecked")
            Map<String, Object> msg = (Map<String, Object>) inputStream.readObject();
            String queue = message.getMessageProperties().getConsumerQueue();
            log.info(queue + "-ack Receiver :" + msg);
            log.info("header msg :" + message.getMessageProperties().getHeaders());
            if (Objects.equals(queue, MqConstant.BUSINESS_QUEUE)) {
                // Reject without requeueing, so the message is dead-lettered
                channel.basicNack(deliveryTag, false, false);
            } else if (Objects.equals(queue, MqConstant.DEAD_LETTER_QUEUE)) {
                channel.basicAck(deliveryTag, true);
            } else {
                channel.basicAck(deliveryTag, true);
            }
        } catch (Exception e) {
            // Reject without requeueing and keep the stack trace in the log
            channel.basicReject(deliveryTag, false);
            log.error(e.getMessage(), e);
        }
    }
}
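Test:
1. A message is negatively acknowledged with channel.basicNack or channel.basicReject, and requeue is false
For this case the listener container must also listen on MqConstant.BUSINESS_QUEUE, so that AckReceiver receives the message from the business queue and rejects it.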
2023-10-11T16:18:13.124+08:00 INFO 28104 --- [enerContainer-2] c.k.shushan.common.mq.AckReceiver : business.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:18:13, msgId=9b31b4b3-c58f-47bd-8b27-cac4f53ae120}
2023-10-11T16:18:13.125+08:00 INFO 28104 --- [enerContainer-2] c.k.shushan.common.mq.AckReceiver : header msg :{spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d}
2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true
2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null
2023-10-11T16:18:13.127+08:00 INFO 28104 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:18:13, msgId=9b31b4b3-c58f-47bd-8b27-cac4f53ae120}
2023-10-11T16:18:13.127+08:00 INFO 28104 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d, x-first-death-exchange=business.exchange, x-death=[{reason=rejected, count=1, exchange=business.exchange, time=Wed Oct 11 16:18:14 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=rejected, x-first-death-queue=business.queue}
if (Objects.equals(message.getMessageProperties().getConsumerQueue(), MqConstant.BUSINESS_QUEUE)) {
    // Reject without requeueing
    channel.basicNack(deliveryTag, false, false);
}
The consumer rejects the message, and the broker then routes it to the dead-letter queue.
{spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d, x-first-death-exchange=business.exchange, x-death=[{reason=rejected, count=1, exchange=business.exchange, time=Wed Oct 11 16:18:14 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=rejected, x-first-death-queue=business.queue}
The reason is rejected, and the header records the original exchange: exchange=business.exchange.
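To act on this header in code, a consumer can read x-death from the message headers. The sketch below is plain Java over a Map shaped like the logged header (a list of maps with reason, count, and queue keys). The class name XDeathHeaders and its method names are hypothetical; in the consumer above, the map would come from message.getMessageProperties().getHeaders().

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper for reading dead-letter headers; illustrative only. */
final class XDeathHeaders {

    private XDeathHeaders() {}

    /** Returns x-first-death-reason, or null if the message was never dead-lettered. */
    static String firstDeathReason(Map<String, Object> headers) {
        Object reason = headers.get("x-first-death-reason");
        return reason == null ? null : reason.toString();
    }

    /** Returns how often the message was dead-lettered from the queue for the reason. */
    static long deathCount(Map<String, Object> headers, String queue, String reason) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List)) {
            return 0L;
        }
        for (Object item : (List<?>) xDeath) {
            if (!(item instanceof Map)) {
                continue;
            }
            Map<?, ?> entry = (Map<?, ?>) item;
            boolean sameQueue = queue.equals(String.valueOf(entry.get("queue")));
            boolean sameReason = reason.equals(String.valueOf(entry.get("reason")));
            if (sameQueue && sameReason) {
                Object count = entry.get("count");
                return count instanceof Number ? ((Number) count).longValue() : 0L;
            }
        }
        return 0L;
    }
}
```

A count check like this is one way to stop retrying a message after a fixed number of attempts.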
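2. The message stays in the queue longer than its TTL
Remove MqConstant.BUSINESS_QUEUE from the consumer's queue list so that nothing consumes the message before it expires, then test.
A TTL can be set for the whole queue by adding the x-message-ttl argument: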
@Bean("businessQueue")
public Queue businessQueue() {
    Map<String, Object> args = new HashMap<>(16);
    // Dead-letter exchange for this queue
    args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    // Dead-letter routing key for this queue
    args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
    // Message TTL for the queue, in milliseconds
    args.put("x-message-ttl", 5000);
    return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
}
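Spring AMQP's QueueBuilder also has named methods for these arguments, which avoids typos in the string keys. The fragment below is a sketch, assuming the Spring AMQP version in use provides deadLetterExchange, deadLetterRoutingKey, and ttl on QueueBuilder (the 3.x line pulled in by Spring Boot 3.1 should). The class name FluentBusinessQueueConfig and the bean name are made up for the example.

```java
package com.kexuexiong.shushan.common.mq;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical alternative to the map-based declaration; use one or the other,
// since both would declare the same queue.
@Configuration
public class FluentBusinessQueueConfig {

    @Bean
    public Queue businessQueueFluent() {
        return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE)
                .deadLetterExchange(MqConstant.DEAD_LETTER_EXCHANGE)       // x-dead-letter-exchange
                .deadLetterRoutingKey(MqConstant.DEAD_LETTER_ROUTING_KEY)  // x-dead-letter-routing-key
                .ttl(5000)                                                 // x-message-ttl, in milliseconds
                .build();
    }
}
```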
A TTL can also be set on each individual message:
@GetMapping("/deadLetterTimeOutSendDirectMessage")
public String deadLetterTimeOutSendDirectMessage() {
    String msgId = UUID.randomUUID().toString();
    String msg = "demo msg ,dead letter!!";
    // Use lowercase "yyyy" (calendar year); uppercase "YYYY" is the week-based year
    String createTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
    log.info("msg time :" + createTime);
    Map<String, Object> map = new HashMap<>();
    map.put("msgId", msgId);
    map.put("msg", msg);
    map.put("createTime", createTime);
    Integer randomInt = RandomUtil.randomInt(30000);
    map.put("randomTime", randomInt);
    MessagePostProcessor messagePostProcessor = message -> {
        // Per-message TTL in milliseconds, passed as a string: 30000 ms = 30 seconds
        message.getMessageProperties().setExpiration("30000");
        return message;
    };
    rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE, MqConstant.BUSINESS_ROUTING_KEY, map, messagePostProcessor);
    return "ok";
}
Test:
2023-10-11T16:28:49.052+08:00 INFO 25848 --- [nio-8014-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-10-11T16:28:49.052+08:00 INFO 25848 --- [nio-8014-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2023-10-11T16:28:49.053+08:00 INFO 25848 --- [nio-8014-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2023-10-11T16:28:49.099+08:00 INFO 25848 --- [nio-8014-exec-1] c.k.shushan.controller.mq.MqController : msg time :2023-10-11 16:28:49
2023-10-11T16:28:49.120+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-11T16:28:49.121+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true
2023-10-11T16:28:49.121+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null
2023-10-11T16:29:19.123+08:00 INFO 25848 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, randomTime=22235, createTime=2023-10-11 16:28:49, msgId=4793dd5c-70c2-4c2f-a6ae-cdc6663e0b05}
2023-10-11T16:29:19.123+08:00 INFO 25848 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{spring_listener_return_correlation=22fdc49f-30da-4524-9fc8-d4f69eb4c2c3, x-first-death-exchange=business.exchange, x-death=[{reason=expired, original-expiration=30000, count=1, exchange=business.exchange, time=Wed Oct 11 16:29:20 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=expired, x-first-death-queue=business.queue}
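The message expires after 30 seconds (sent at 16:28:49, received at 16:29:19), enters the dead-letter queue, and is then consumed.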
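setExpiration takes the TTL as a string of milliseconds, so the unit is easy to misread (the value used above is 30000, that is, 30 seconds). A small helper can make the unit explicit. This is only a sketch: the class Expirations and the method toExpiration are hypothetical names, not part of Spring AMQP.

```java
import java.time.Duration;

/** Hypothetical helper that turns a Duration into an AMQP expiration string. */
final class Expirations {

    private Expirations() {}

    /** Converts a TTL to the millisecond string expected by the expiration property. */
    static String toExpiration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        return Long.toString(ttl.toMillis());
    }

    public static void main(String[] args) {
        // The 30-second TTL used in this post
        System.out.println(toExpiration(Duration.ofSeconds(30)));
    }
}
```

In the producer, the call would then read message.getMessageProperties().setExpiration(Expirations.toExpiration(Duration.ofSeconds(30))).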
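3. The number of messages in the queue exceeds the maximum queue length
Modify the RabbitMQ configuration: when creating businessDirectQueue, add the x-max-length argument to cap the queue's capacity. An existing queue cannot be redeclared with different arguments, so delete the old business.queue on the broker first.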
package com.kexuexiong.shushan.common.mq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;

@Configuration
public class DirectDLXRabbitConfig {
    @Bean
    public Queue businessDirectQueue() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", MqConstant.DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", MqConstant.DEAD_LETTER_ROUTING_KEY);
        // The queue holds at most 3 messages
        args.put("x-max-length", 3);
        return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build();
    }

    @Bean
    DirectExchange businessDirectExchange() {
        return new DirectExchange(MqConstant.BUSINESS_EXCHANGE, true, false);
    }

    @Bean
    DirectExchange deadLetterDirectExchange() {
        return new DirectExchange(MqConstant.DEAD_LETTER_EXCHANGE, true, false);
    }

    @Bean
    public Queue deadLetterDirectQueue() {
        return new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false);
    }

    @Bean
    Binding deadLetterBingingDirect() {
        return BindingBuilder.bind(deadLetterDirectQueue()).to(deadLetterDirectExchange()).with(MqConstant.DEAD_LETTER_ROUTING_KEY);
    }

    @Bean
    Binding businessBingingDirect() {
        return BindingBuilder.bind(businessDirectQueue()).to(businessDirectExchange()).with(MqConstant.BUSINESS_ROUTING_KEY);
    }
}
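Test:
[Figure: result of the third request]
[Figure: result of the fourth request]
Console output: the dead-letter queue receives a message.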
2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true
2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null
2023-10-11T16:46:54.787+08:00 INFO 20444 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:44:35, msgId=db7d2d7f-12a0-4ca4-9e92-81758cda436e}
2023-10-11T16:46:54.787+08:00 INFO 20444 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{spring_listener_return_correlation=e2a1871a-7765-43ee-b44c-d80d0ac6323c, x-first-death-exchange=business.exchange, x-death=[{reason=maxlen, count=1, exchange=business.exchange, time=Wed Oct 11 16:46:56 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=maxlen, x-first-death-queue=business.queue}
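In this log the dead-lettered message was created at 16:44:35 but arrived in the dead-letter queue at 16:46:54, so it is the oldest message that was pushed out, not the newly published one. That matches RabbitMQ's default overflow behaviour (drop-head). The in-memory sketch below models it in plain Java; BoundedQueueModel is a hypothetical class written for this illustration and does not talk to a broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a max-length queue with drop-head overflow; illustrative only. */
final class BoundedQueueModel {

    private final int maxLength;
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> deadLettered = new ArrayList<>();

    BoundedQueueModel(int maxLength) {
        if (maxLength < 0) {
            throw new IllegalArgumentException("maxLength must not be negative");
        }
        this.maxLength = maxLength;
    }

    /** Appends a message; if the queue is over its limit, the oldest messages are dead-lettered. */
    public void publish(String message) {
        ready.addLast(message);
        while (ready.size() > maxLength) {
            deadLettered.add(ready.removeFirst());
        }
    }

    /** Messages still waiting in the queue, oldest first. */
    public List<String> ready() {
        return new ArrayList<>(ready);
    }

    /** Messages pushed out of the queue, in the order they were dropped. */
    public List<String> deadLettered() {
        return new ArrayList<>(deadLettered);
    }

    public static void main(String[] args) {
        BoundedQueueModel queue = new BoundedQueueModel(3);
        for (int i = 1; i <= 4; i++) {
            queue.publish("msg-" + i);
        }
        System.out.println("ready: " + queue.ready());
        System.out.println("dead-lettered: " + queue.deadLettered());
    }
}
```

With a limit of 3, the first three requests only fill the queue; the fourth pushes the first message out to the dead-letter exchange.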
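That covers all three cases. Overall, RabbitMQ dead-letter queues are simple to set up, yet they are quite powerful: they can be used to implement delayed messages, for example cancelling an order when its time limit is reached.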