RabbitMQ学习-死信队列

news2025/1/11 5:16:32

死信队列

背:就是三种情况导致消息无法消费就是死信,然后就会转到死信交换机中,死信交换机发送到死信队列中,然后创建个消费者消费死信队列中的东西,再没什么哈哈

死信,顾名思义就是无法被消费的信息,字面意思就是这样理解,一般来说,消息投递到队列中,消费者从队列取出消息进行消费,但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信就有了死信队列

消息成为死信的三种情况:

1.队列消息数量到达限制:比如队列最大只能存储10条消息,而发了11条数据,根据先进先出,最先发的消息会进入死信队列

2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

3.原队列存在消息过期设置,消息达到超时时间为被消费

通常情况下,消费者是能正常消费的,但是出现上面说的三种情况之一,就无法正常消费信息,消息就会进入死信交换机,死信交换机和死信队列进行绑定,最后由其他消费者来消费死信消息

死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

① 丢弃,如果不是很重要,可以选择丢弃

② 记录死信入库,然后做后续的业务分析或处理

③ 通过死信队列,由负责监听死信的应用程序进行处理

综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理。

队列绑定死信交换机:

给队列设置参数:x-dead-letter-exchange 和x-dead-letter-routing-key

消息TTl过期

设置TTL的两种方式:

1.队列设置TTL

在创建对别的收设置队列的x-message-ttl属性,例如

Map<String, Object> map = new HashMap<>();
//设置队列有效期为10秒
map.put("x-message-ttl",10000);
channel.queueDeclare(queueName,durable,exclusive,autoDelete,map);

消息设置TTL
对每条消息设置TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
 channel.basicPublish(exchangeName,routingKey,mandatory,properties,"msg body".getBytes());
两者之间的区别:
1.如果设置了队列的TTL属性,那么一旦消息过期,就会被队列的丢弃
2.如果是消息设置了TTL属性,那么即使消息过期,也不一定会马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息挤压情况,那么已
经过期的消息也许还能存活较长时间
3.如果我们没有设置TTL,就表示消息永远不会过期,如果TTL设置为0,则表示除非此时可以直接投递到消费者,否则该消息会被丢弃
生产者代码:
public class Producer {
 private static final String NORMAL_EXCHANGE = "normal_exchange";
 public static void main(String[] argv) throws Exception {
 try (Channel channel = RabbitMqUtils.getChannel()) {
 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 //设置消息的 TTL 时间
 AMQP.BasicProperties properties = new 
AMQP.BasicProperties().builder().expiration("10000").build();
 //该信息是用作演示队列个数限制
 for (int i = 1; i <11 ; i++) {
 String message="info"+i;
 channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, 
message.getBytes());
 System.out.println("生产者发送消息:"+message);
 }
 }
 }
}

消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
 //普通交换机名称
 private static final String NORMAL_EXCHANGE = "normal_exchange";
 //死信交换机名称
 private static final String DEAD_EXCHANGE = "dead_exchange";
 public static void main(String[] argv) throws Exception {
 Channel channel = RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 direct
 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 //声明死信队列
 String deadQueue = "dead-queue";
 channel.queueDeclare(deadQueue, false, false, false, null);
 //死信队列绑定死信交换机与 routingkey
 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
 //正常队列绑定死信队列信息
 Map<String, Object> params = new HashMap<>();
 //正常队列设置死信交换机 参数 key 是固定值
 params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
 //正常队列设置死信 routing-key 参数 key 是固定值
 params.put("x-dead-letter-routing-key", "lisi");
 
 String normalQueue = "normal-queue";
 channel.queueDeclare(normalQueue, false, false, false, params);
 channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
 System.out.println("等待接收消息.....");
 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 String message = new String(delivery.getBody(), "UTF-8");
 System.out.println("Consumer01 接收到消息"+message);
 };
 channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
 });
 }
}

C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)  

public class Consumer02 {
 private static final String DEAD_EXCHANGE = "dead_exchange";
 public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 String deadQueue = "dead-queue";
 channel.queueDeclare(deadQueue, false, false, false, null);
 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
 System.out.println("等待接收死信队列消息.....");
 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 String message = new String(delivery.getBody(), "UTF-8");
 System.out.println("Consumer02 接收死信队列的消息" + message);
 };
 channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
 });
 }
}

 队列达到最大长度

1. 消息生产者代码去掉 TTL 属性
public class Producer {
 private static final String NORMAL_EXCHANGE = "normal_exchange";
 public static void main(String[] argv) throws Exception {
 try (Channel channel = RabbitMqUtils.getChannel()) {
 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 //该信息是用作演示队列个数限制
 for (int i = 1; i <11 ; i++) {
 String message="info"+i;
 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
 System.out.println("生产者发送消息:"+message);
 }
 }
 }
}
2. C1 消费者修改以下代码 ( 启动之后关闭该消费者 模拟其接收不到消息 )

注意此时需要把原先队列删除 因为参数改变了 

3. C2 消费者代码不变 ( 启动 C2 消费者)

消息被拒

1.消息生产者代码同上生产者一致

2.C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)

 //普通交换机名称
 private static final String NORMAL_EXCHANGE = "normal_exchange";
 //死信交换机名称
 private static final String DEAD_EXCHANGE = "dead_exchange";
 public static void main(String[] argv) throws Exception {
 Channel channel = RabbitUtils.getChannel();
 //声明死信和普通交换机 类型为 direct
 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 //声明死信队列
 String deadQueue = "dead-queue";
 channel.queueDeclare(deadQueue, false, false, false, null);
 //死信队列绑定死信交换机与 routingkey
 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
 Map<String, Object> params = new HashMap<>();
 //正常队列设置死信交换机 参数 key 是固定值
 params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
 //正常队列设置死信 routing-key 参数 key 是固定值
 params.put("x-dead-letter-routing-key", "lisi");
 String normalQueue = "normal-queue";
 channel.queueDeclare(normalQueue, false, false, false, params);
 channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
 System.out.println("等待接收消息.....");
 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 String message = new String(delivery.getBody(), "UTF-8");
 if(message.equals("info5")){
 System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
 //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
 }else {
 System.out.println("Consumer01 接收到消息"+message);
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 }
 };
 boolean autoAck = false;
 channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
 });
 }
}

 

3. C2 消费者代码不变

启动消费者 1 然后再启动消费者 2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/578690.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot接口返回的json字符串如何不显示null值字段

springboot接口返回的json字符串如何不显示null值字段 POSTMAN 测试接口时&#xff0c;默认字段值即使是null也显示出来&#xff0c;如何去掉更加简洁&#xff1f;这个跟POSTMAN无关&#xff0c;POSTMAN仅仅是展示response的body而已 思考&#xff1a;为什么要去掉null值的字…

Copilot插件:时时陪伴的AI助手 | Obsidian实践

这段时间&#xff0c;有点儿沉迷于AIGC实践不可自拔&#xff0c;也因此懈怠了Obsidian实践。回过头来猛然发觉&#xff0c;其实Obsidian也“上架”了很多与ChatGPT有关的插件。 赶紧体验下&#xff0c;看看有没有什么&#xff0c;是一下子就能用起来的。不得不说&#xff0c;自…

mysql错误码1045解决方案

用数据库连接工具访问提示 1045的错误码&#xff0c;在命令行输入mysql -u root –p&#xff0c;输入密码&#xff0c;经常出现下面的错误信息&#xff0c;相信该错误信息很多人在使用mysql时都遇到过。 ERROR 1045 (28000): Access denied for user rootlocalhost (using pas…

如何在百度百科里创建一个百科词条,百度百科官网创建词条步骤方法

很多朋友表示特别希望能在百度百科里创建一个百科词条&#xff0c;但是在百度百科官网却不知如何操作创建百科词条&#xff0c;连最基本的百度百科操作步骤可能都不清楚&#xff0c;下面洛希爱做百科网为大家分享如何在百度百科里创建一个百科词条&#xff0c;百度百科官网创建…

编码,Part 1:ASCII、汉字及 Unicode 标准

个人博客 编码的历史由来就懒得介绍了&#xff0c;只需要知道人类处理文本信息是以字符为基本单位&#xff0c;而计算机在最底层只认识 0/1&#xff0c;所以当计算机要为人类存储/呈现字符时&#xff0c;就需要有一个规则&#xff0c;在字符和 0/1 序列之间建立映射关系&#…

Mybatis generator

文章目录 使用引入依赖配置文件设置生成使用中出现的异常 Mybatis中javaType和jdbcType对应关系int、bigint、smallint 和 tinyint是使用整数数据的精确数字数据类型。 使用 引入依赖 <!-- mysql --><dependency><groupId>mysql</groupId><artifa…

(转载)基于遗传算法的TSP算法(matlab实现)

1 理论基础 TSP(traveling salesman problem,旅行商问题)是典型的NP完全问题&#xff0c;即其最坏情况下的时间复杂度随着问题规模的增大按指数方式增长&#xff0c;到目前为止还未找到一个多项式时间的有效算法。 TSP问题可描述为&#xff1a;已知n个城市相互之间的距离&…

5月份读书学习好文记录

学好C可以采取以下几个步骤&#xff1a; 掌握基本语法&#xff1a;C的语法对于初学者来说可能是一件比较难的事情&#xff0c;所以需要花时间掌握C的语言基础和语法规则&#xff0c;例如数据类型、流程控制、函数等。 学会面向对象编程(OOP)&#xff1a;C是一种面向对象的编程…

RNN Seq2Seq

Feedforward v.s. Recurrent Feedforward network does not have input at each stepFeedforward network has different parameters for each layer 双向RNN 双向递归层可以提供更好的识别预测效果&#xff0c;但却不能实时预测&#xff0c;由于反向递归的计算需要从最末时刻…

第18章 JQuery DataTables初始化渲染显示与排序

1 System.Linq.AsyncIEnumerableExtensions (Data\Extensions\AsyncIEnumerableExtensions.cs) namespace System.Linq { /// <summary> /// 【异步枚举数扩展--类】 /// <remarks> /// 摘要&#xff1a; /// 该类通过对System.Linq.Async中方法的自定义扩展…

开启php8的JIT及时编译,超级详细 照抄即可

JIT时php8的重要功能之一&#xff0c;可以极大的提高性能&#xff1b; JIT编译器集成在了Opcache插件中&#xff0c;仅在启动Opcache插件才有效 Opcache将 PHP 脚本编译后的字节码存储到内存中&#xff0c;以避免每次执行脚本时重新解析和编译&#xff0c;从而提高 PHP 应用程…

English Learning - L3 综合练习 4 VOA-Food 2023.05.24 周三

English Learning - L3 综合练习 4 VOA-Food 2023.05.24 周三 句 1句 2Support 拓展养家&#xff0c;养家之人 句 3mustard 芥末expect 扩展 句 4句 5句 6句 7颁奖句 8句 9句 10句 11句 12句 13句 14好声音比赛 句 1 句 2 Support 拓展 Support 作动词时&#xff1a; Support …

Loki 日志收集系统

一.系统架构 二.组成部分 Loki 的日志堆栈由 3 个组件组成&#xff1a; promtail&#xff1a;用于采集日志、并给每条日志流打标签&#xff0c;每个节点部署&#xff0c;k8s部署模式下使用daemonset管理。 loki&#xff1a;用于存储采集的日志&#xff0c; 并根据标签查询日志流…

Windows 10搭建SFTP服务器【公网远程访问】

相较比高效率的FTP协议而言&#xff0c;SFTP默认只占用一个TCP端口 22端口&#xff0c;采用的是SSH加密隧道&#xff0c;理论上会比FTP更安全&#xff0c;更稳定些。 搭建SFTP服务器&#xff0c;这里我们用freesshd来实现&#xff1b;而在服务器搭建成功后&#xff0c;要实现公…

mysql详细优化建议(谈谈你的SQL优化经验)

sql语句规范 MySQL在Linux系统下数据库名&#xff0c;表名&#xff0c;存储过程名&#xff0c;函数名称&#xff0c;触发器名称等区分大小写&#xff0c;列名不区分大小写&#xff0c;原因是这些操作系统下文件名称区分大小写。 MySQL在Windows系统下全部不区分大小写&#x…

Jenkins使用Maven构建Java应用程序

本教程将向你展示如何使用Jenkins编排并构建一个使用Maven管理的简单Java应用程序。 如果你是使用Maven的Java开发人员&#xff0c;并且对CI/CD概念不熟悉&#xff0c;或者你可能熟悉这些概念&#xff0c;但不知道如何使用Jenkins实现构建应用程序&#xff0c;那么本教程适合你…

C语言数据存储 — 整型篇

C语言数据存储 — 整型篇 前言1. 数据类型介绍1.1 类型的基本分类 2. 整型在内存中的存储2.1 原码、反码、补码2.1.1 为什么数据存放在内存中存放的是补码 2.2 大小端介绍2.2.1 什么是大小端&#xff1f;2.2.2 为什么有大端和小端&#xff1f;2.2.3 一道百度系统工程师笔试题 3…

Linux之环境变量

文章目录 前言一、环境变量1.概念2.运行程序3.windows下的环境变量4.常见的环境变量 二、系统调用获取环境变量1.getenv2.演示1.标识当前的Linux用户2. 判断当前用户是否为root 三、设置环境变量1.关于变量的命令1.echo2.export3.env4.unset5.set 2.子进程继承3.PWD1. 概念2.实…

手摸手教你用AI生成PPT(本文不卖课)

今天再和大家分享一个AI实践&#xff1a; 如何借力AI帮我制作PPT&#xff1f; 上篇和大家安利了目前不用魔法上网&#xff0c;且不用翻墙的最强AI工具&#xff0c;假设我今天要给大家做一个分享&#xff0c;来介绍Claude&#xff0c;如何搞定PPT呢&#xff1f; 当然是直接问Cla…

如何在华为OD机试中获得满分?Java实现【放苹果】一文详解!

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: Java华为OD机试真题&#xff08;2022&2023) 文章目录 1. 题目描述2. 输入描述3. 输出描述…