SpringBoot RabbitMQ 死信队列

news2025/1/18 16:53:54

1. 死信定义

无法被消费的消息,称为死信

如果死信一直留在队列中,会导致一直被消费,却从不消费成功,专门有一个存放死信的队列,称为死信队列(DDX, dead-letter-exchange)。

死信队列

DLX,Dead Letter Exchange的缩写,又死信邮箱、死信交换机。其实DLX就是一个普通的交换机,和一般的交换机没有任何区别。当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,RabbitMQ会自动发送)。

死信的几种来源:

  • 消息TTL过期(time to live,存活时间,可以用在限时支付消息)

  • 队列达到最大长度(队列满了,无法路由到该队列)

  • 消息被拒绝(basic.reject/basic.nack),并且requeue = false

e6fbecb89282f17480d6ed7048178fe3.png

2. 创建项目

  • pom.xml配置如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.olive</groupId>
 <artifactId>rabbitmq-spring-demo</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.7.7</version>
  <relativePath />
 </parent>
 <dependencies>
  <!--rabbitmq-->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.8</source>
     <target>1.8</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
  • application.yml

server:
  port: 8080
spring:
  #给项目来个名字
  application:
    name: rabbitmq-spring-demo
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin123
    #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的
    virtual-host: /
  • 准备MQ的队列和环境场景

正常交换机

  1. 正常队列(最长队列5);正常消费者,拒绝消息

  2. tt队列(过期时间60秒);没有消费者

死信交换机

  1. 死信队列

package com.olive.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadConfig {

 /************ 正常配置 ******************/
 /**
  * 正常交换机,开启持久化
  */
 @Bean
 DirectExchange normalExchange() {
  return new DirectExchange("normalExchange", true, false);
 }

 @Bean
 public Queue normalQueue() {
  // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  // exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  // autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  Map<String, Object> args = deadQueueArgs();
  // 队列设置最大长度
  args.put("x-max-length", 5);
  return new Queue("normalQueue", true, false, false, args);
 }

 @Bean
 public Queue ttlQueue() {
  Map<String, Object> args = deadQueueArgs();
  // 队列设置消息过期时间 60 秒
  args.put("x-message-ttl", 60 * 1000);
  return new Queue("ttlQueue", true, false, false, args);
 }

 @Bean
 Binding normalRouteBinding() {
  return BindingBuilder.bind(normalQueue())
    .to(normalExchange())
    .with("normalRouting");
 }

 @Bean
 Binding ttlRouteBinding() {
  return BindingBuilder.bind(ttlQueue())
    .to(normalExchange())
    .with("ttlRouting");
 }

 /**************** 死信配置 *****************/
 /**
  * 死信交换机
  */
 @Bean
 DirectExchange deadExchange() {
  return new DirectExchange("deadExchange", true, false);
 }

 /**
  * 死信队列
  */
 @Bean
 public Queue deadQueue() {
  return new Queue("deadQueue", true, false, false);
 }

 @Bean
 Binding deadRouteBinding() {
  return BindingBuilder.bind(deadQueue())
    .to(deadExchange())
    .with("deadRouting");
 }

 /**
  * 转发到 死信队列,配置参数
  */
 private Map<String, Object> deadQueueArgs() {
  Map<String, Object> map = new HashMap<>();
  // 绑定该队列到死信交换机
  map.put("x-dead-letter-exchange", "deadExchange");
  map.put("x-dead-letter-routing-key", "deadRouting");
  return map;
 }

}

arguments参数说明:

80a0b2363e4f3b6f1acf1a7cbb0ee004.png
  1. Auto expire: 队列在被自动删除之前可以使用多长时间(毫秒)。(x-expires参数)

  2. Message TTL: 发布到队列的消息在被丢弃之前可以存在多长时间(毫秒)。(x-message-ttl参数)

  3. Overflow behaviour: 设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值为drop-head(删除queue头部的消息)、reject-publish(最近发来的消息将被丢弃)或reject-publish-dlx(拒绝发送消息到死信交换器)。仲裁队列类型只支持drop-head和拒绝-发布。(x-overflow参数)

  4. Single active consumer: 如果设置,确保每次只从队列中使用一个使用者,并在活动使用者被取消或死x-dead-letter-exchange亡的情况下故障转移到另一个注册使用者。(x-single-active-consumer参数)

  5. Dead letter exchange: 一个可选的死信交换机,如果消息被拒绝或过期,将重新发布到死信交换机。(x-dead-letter-exchange参数)

  6. Dead letter routing key: 当消息是死信时使用的可选替换路由键。如果未设置此值,则将使用消息的原始路由键。(x-dead-letter-routing-key参数)

  7. Max length: 一个队列在开始从头中丢弃消息之前可以包含多少(准备好的)消息。(x-max-length参数)

  8. Max length bytes: 队列在开始从头部丢弃消息之前所能包含的就绪消息的总正文大小。(x-max-length-bytes参数)

  9. Leader locator:将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。(x-queue-leader-locator参数)

3. 队列达到的最大长度

消息没有消费者;调用6次正常队列的消息生产方法;消息数量超过队列长度。

package com.olive.controller;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

 @Autowired
 private RabbitTemplate rabbitTemplate;

 /**
  * 正常消息队列,队列最大长度5
  */
 @GetMapping("/normalQueue")
 public String normalQueue() {
  Map<String, Object> map = new HashMap<>();
  map.put("messageId", String.valueOf(UUID.randomUUID()));
  map.put("data", System.currentTimeMillis() + ", 正常队列消息,最大长度5");
  rabbitTemplate.convertAndSend("normalExchange", "normalRouting", map, new CorrelationData());
  return "success";
 }
}

访问6次,发送6条消息

http://127.0.0.1:8080/normalQueue

从RabbitMQ管理后台查看结果:

8acfb18d434e1b8b5c8fb80dd4ecfcce.png

4. 消息TTL过期

消息的TTL指的是消息存活时间,可以通过设置消息TTL或者队列的TTL来实现。

  • 消息的TTL: 对于设置了过期时间属性(expiration)的消息,消息如果在过期时间内没被消费,会过期。

  • 队列的TTL: 对于设置了过期时间属性(x-message-ttl)的队列,所有路由到这个队列的消息,都会设置上这个过期时间。

使用者两种配置都可以,一般都用在定时任务,限时支付这种场景。

/**
 * 消息 TTL, time to live
 */
 @GetMapping("/ttlToDead")
 public String ttlToDead() {
  Map<String, Object> map = new HashMap<>();
  map.put("messageId", String.valueOf(UUID.randomUUID()));
  map.put("data", System.currentTimeMillis() + ", ttl队列消息");
  rabbitTemplate.convertAndSend("normalExchange", "ttlRouting", map, new CorrelationData());
  return "success";
 }

访问6次,发送6条消息

http://127.0.0.1:8080/ttlToDead

从RabbitMQ管理后台查看结果,发送消息后

38723c8a652ff88eacc56175469a20e2.png

从RabbitMQ管理后台查看结果,等待消息过期后

699992d81b48166a3a1447d1ebe803d3.png

建议在项目中尽量使用消息TTL,不使用队列TTL

5.拒绝消息

正常队列消费后拒绝消息,并且不进行重新入队

package com.olive.config;

import java.io.IOException;
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues = "normalQueue")
public class NormalConsumer {
 
 @RabbitHandler
 public void process(Map<String, Object> message, Channel channel, Message mqMsg) 
   throws IOException {
  System.out.println("收到消息,并拒绝重新入队:" + message.toString());
  channel.basicReject(mqMsg.getMessageProperties().getDeliveryTag(), false);
 }
 
}

从RabbitMQ管理后台查看结果

133ecf5bc545c3036c838a94d632de59.png

6.死信队列消息消费

package com.olive.config;

import java.io.IOException;
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues = "deadQueue")
public class DeadConsumer {
 
 @RabbitHandler
 public void process(Map<String, Object> message, Channel channel, Message mqMsg) 
   throws IOException {
  System.out.println("死信队列收到消息:" + message.toString());
  channel.basicAck(mqMsg.getMessageProperties().getDeliveryTag(), false);
 }
}

从RabbitMQ管理后台查看结果,死信队列消息被完全消费

187abb4bd86c3a318929ab37bce764b1.png

c8d288295f0d6c0ccd9395f2cdff50c9.gif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/470359.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

火山引擎 BVE 视频图片硬件编码器演进之路

动手点关注 干货不迷路 前言 近日&#xff0c;第 17 届世界编码器大赛 MSU 2022 公布硬件编码器比赛结果&#xff0c;在 60 fps&#xff08;帧率&#xff09;的超快视频编码赛道上&#xff0c;火山引擎多媒体实验室自主研发的 BVE 1.1 编码器表现突出&#xff0c;荣获最佳 FPGA…

计算机网络学习06(HTTP1.0 vs HTTP1.1)

1、响应状态码 HTTP/1.0仅定义了16种状态码。HTTP/1.1中新加入了大量的状态码&#xff0c;光是错误响应状态码就新增了24种。比如说&#xff0c;100 (Continue)——在请求大资源前的预热请求&#xff0c;206 (Partial Content)——范围请求的标识码&#xff0c;409 (Conflict)…

【C++】priority_queue使用和模拟实现——仿函数

文章目录 1. priority_queue的使用1.priority_queue的介绍2.priority_queue的结构3. 主要接口4. 使用示例 2. 仿函数1. 仿函数的概念2.尝试实现仿函数 3.priority_queue的模拟实现1.priority_queue的结构2. 接口实现1.向下调整算法2. 向上调整算法3.构造函数4.修改数据5.获取数…

机器学习 -Statsmodels

机器学习记录 Statsmodels 用于探索数据, 估计模型, 并运行统计检验. conda install -y statsmodels线性回归 import numpy as np import pandas as pd import matplotlib.pyplot as plt import statsmodels.api as sm import statsmodels.datasets.utils as du import sea…

数据结构【二】:霍夫曼编码

霍夫曼编码&#xff08;Huffman Coding&#xff09;是可变长编码&#xff08;VLC&#xff09;的一种。本质上使用变长编码表对源符号进行编码&#xff0c;通过评估源符号出现概率的方法进行分类&#xff0c;将出现几率较高的源字符使用较短的编码&#xff0c;出现几率较低的源字…

Hive优化补充

目录 一、表设计优化 1.通过设计分区表&#xff0c;增加动态分区&#xff0c;查询时避免全表扫描 2.设计分桶表&#xff1a;适用于大表join大表的情况 最后&#xff0c;两张大表进行join转为两张分桶表进行join&#xff1a; 二、文件存储 1.文件格式-概述 2.文件格式——…

学系统集成项目管理工程师(中项)系列13b_人力资源管理(下)

1. 项目团队建设 1.1. 塔克曼(Tuckman)阶梯理论 1.2. 理论基础 1.2.1. 激励理论 1.2.1.1. 马斯洛需要层次理论 1.2.1.1.1. 生理需要 1.2.1.1.2. 安全需要 1.2.1.1.3. 社会交往的需要 1.2.1.1.4. 自尊的需要 1.2.1.1.5. 自我实现的需要 1.2.1.2. 赫茨伯格的双因素理论…

Leetcode力扣秋招刷题路-0802

从0开始的秋招刷题路&#xff0c;记录下所刷每道题的题解&#xff0c;帮助自己回顾总结 802. 找到最终的安全状态 有一个有 n 个节点的有向图&#xff0c;节点按 0 到 n - 1 编号。图由一个 索引从 0 开始 的 2D 整数数组 graph表示&#xff0c; graph[i]是与节点 i 相邻的节…

Git HEAD及detached head

背景&#xff1a;最近在使用git checkout重置HEAD指向&#xff0c;偶尔会出现Detached HEAD提示&#xff0c;于是想探究一下具体的原理及过程&#xff0c;遂写下了这篇文章。一般checkout用于切换分支和检出历史的某个节点&#xff0c;或恢复工作区的文件&#xff0c;这三个功能…

OpenHarmony JS Demo开发讲解

项目结构 打开entry→src→main→js&#xff0c;工程的开发目录如图所示 其中&#xff0c; i18n文件夹&#xff1a;用于存放配置不同语言场景的资源&#xff0c;比如应用文本词条&#xff0c;图片路径等资源。en-US.json文件定义了在英文模式下页面显示的变量内容&#xff0c…

Java 泛型为什么设计成是可以擦除的

Java 泛型是 Java 5 引入的一种类型安全的编程机制&#xff0c;它允许在编译时指定泛型类型参数&#xff0c;从而提高代码的类型安全性和可读性。然而&#xff0c;Java 泛型的实现方式是通过类型擦除来实现的&#xff0c;这也引发了一些争议。本文将介绍 Java 泛型为什么设计成…

2023年某科技公司前端开发初级岗的面试笔试真题(含选择题答案、问答题解析、机试题源码)

📚关于该专栏: 该专栏的发布内容是前端面试中笔试部分真题、答卷类、机试等等的题目,题目类型包括逻辑题、算法题、选择题、问答题等等,除了内容的分享,还有解析和答案。真实来自某些互联网公司,坐标广东广州。 🔥🔥🔥 持 续 更 新 🔥🔥🔥 😉专栏博主: 黛…

HCIP-7.1交换机ARP、VLAN之间的三层通信技术学习

交换机ARP、VLAN之间的三层通信技术学习 1、ARP1.1、 地址解析过程1.2、ARP报文格式1.3、ARP表项1.4、免费ARP1.5、 VLAN间ARP代理1.5.1、解决同网段&#xff0c;不同广播域内主机互通问题&#xff1b;1.5.2、解决同网段&#xff0c;不同VLAN之间主机互通问题。1.5.3、解决同网…

Ignore insecure directories and continue [y] or abort compinit [n]?

问题&#xff1a; 在Mac终端中使用Zsh作为默认shell时&#xff0c;有时会弹出以下提示信息&#xff1a; Ignore insecure directories and continue [y] or abort compinit [n]? 这个提示出现的原因是因为Zsh在加载时会检查所有的目录是否安全&#xff0c;并拒绝加载不安全的…

【LeetCode: 62. 不同路径 | 暴力递归=>记忆化搜索=>动态规划 】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

卡尔曼滤波器简介——概述

关于卡尔曼滤波器 大多数现代系统都有许多传感器&#xff0c;可以根据一系列测量来估计隐藏&#xff08;未知&#xff09;状态。例如&#xff0c;GPS接收器提供位置和速度估计&#xff0c;其中位置和速度是隐藏状态&#xff0c;卫星信号到达的差分时间是测量值。 跟踪和控制系统…

ChatGPT的进化版?AutoGPT怎么用

AutoGPT是什么 首选给大家介绍&#xff0c;ChatGPT与AutoGPT的区别 目前AutoGPT被称为最接近AGI的人工智能&#xff0c;它是ChatGPT的进化版&#xff1f; “ChatGPT” 只能提供2021年9月之前的信息&#xff0c;所以你问它告诉我今天的天气&#xff0c;它回答不了 “AutoGPT” …

AutoGPT不靠谱,微软推出升级版!可编辑自主规划过程

夕小瑶科技说 原创 作者 | iven 火遍全网的AutoGPT[1]在Github收藏量突破十万。这种自我规划、自我执行的智能体首次关注人工智能模型内部的自我调整与优化。 但是有不少网友发现&#xff0c;AutoGPT的表现不稳定&#xff0c;死循环是最常见的现象。此外&#xff0c;AutoGPT执…

输入指令为±10V或4~20mA型伺服阀控制器

工作电压 19~35 VDC&#xff08;常规24VDC&#xff09; 最大功率消耗 &#xff1c;25VA 空载电流 ≤100mA&#xff08;24V&#xff09; 差分信号输入 0~10 V&#xff0c;输入阻抗≥100KΩ 4~20 mA&#xff0c;输入阻抗100Ω &#xff08;出厂前需指定&#xff0c;现场不可…

免费的ERP系统哪个好?这款让管理更高效

阅读本文你将了解&#xff1a;ERP是什么&#xff1f;解决什么问题&#xff1f;ERP选型的参考维度&#xff1f;零代码ERP系统解决哪些场景问题&#xff1f; 题目提到“免费”&#xff0c;其实很难有软件可以真正做到。 商业化市场决定了没有一家厂商可以不落俗套。因而我们要探…