RabbitMQ深入 —— 死信队列

news2024/12/27 14:00:32

前言

        前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~


文章目录

前言

死信队列

1 基本概念 

2 设置消息时间TTL过期的死信队列

3 队列达到最大长度发生死信 

4 消息被拒引发死信

总结


死信队列

1 基本概念 

     死信就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信具有一定的延迟性,它可以作为延迟消息来处理。

死信出现的原因:

  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.I 

2 设置消息时间TTL过期的死信队列

首先我们在消费者Consumer1中声明普通交换机、死信交换机、普通队列和死信队列之间的关系,同时在声明之后令Consumer1拒收消息,在RabbitMQ中观察消息生产者发出消息的流转情况。

设置死信队列的消费者1

        在死信队列中我们设置了普通交换机、死信交换机、普通队列和死信队列。同时在正常队列中通过channel信道对象中的queueDeclare方法中的一个Map类型的参数,设置了死信交换机和普通交换机之间的关系,配置好TTL、RoutingKey并声明其死信交换机。

package com.crj.rabbitmq.deadQueue;

import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列
 * 消费者1:需要声明死信队列和普通队列
 */
public class Consumer {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE = "normal";
    //死信交换机名称
    public static final String DEAD_EXCHANGE = "dead";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normalQueue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        //声明通道
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        /**
         * 声明普通队列和死信队列
         */
        //创建一个hashmap对象来配置连接死信队列的参数
        Map<String, Object> arguments = new HashMap<>();
        //设置过期时间
        arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","dead1");
        //声明普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定队列和交换机
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
        channel.queueBind(DEAD_QUEUE,DEAD_QUEUE,"dead");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));
            System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());
        };
        //消费者开始消费消息
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});
    }
}

需要注意的是,这里在正常队列中设置过期时间TTL一般不太常用,我们通常会在publish处设置消息的TTL,因此这里arguments对象有关 "x-message-ttl" 参数的配置可以注释掉。

实际处理消息的消费者2

在处理死信队列消息的消费者处,我们只需要设置消费者接收消息是来自死信队列即可。 

package com.crj.rabbitmq.deadQueue;

import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列
 * 消费者1:需要声明死信队列和普通队列
 */
public class Consumer2 {
    //死信队列的名称
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        //声明通道
        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("等待接收消息");
        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("Consumer2接收到的信息:"+new String(message.getBody(),"UTF-8"));
            System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());
        };
        //消费者开始消费消息
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});
    }
}

​​​​生产者

在这里我们借助AMQP. BasicProperties对象的build方法来设置相应的死信TTL。

package com.crj.rabbitmq.deadQueue;

import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

public class Publish {
    public static final String NORMAL_EXCHANGE = "normal";
    public static final String NORMAL_QUEUE = "normalQueue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //在Consumer已经声明过交换机了,所以在这里不能声明
        //死信消息,设置TTL
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 0; i < 11; i++) {
            String message = "info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,message.getBytes());
        }
    }
}

未运行Consumer2前我们看到普通队列在我们设置的TTL:10s之后将消息流转到死信队列中。

最后启动Consumer2后确实也收到了死信队列中的消息

3 队列达到最大长度发生死信 

在这一部分中我们需要注释掉之前在生产者中设置的消息的TTL,同时在消费者1中开启正常队列的最大消息堆积容量。 

arguments.put("x-max-length",6);

 这样子我们就可以模拟队列达到最大长度后产生死信的情况了。

4 消息被拒引发死信

        要想开启消费者拒收消息的功能,首先需要在消息接收的basicConsumer方法中关闭自动应答,同时自行设置手动应答的逻辑。在下面接收消息的回调函数中,在basicAck中设置应答,在basicReject实现消息拒收。

package com.crj.rabbitmq.deadQueue;

import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列
 * 消费者1:需要声明死信队列和普通队列
 */
public class Consumer {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE = "normal";
    //死信交换机名称
    public static final String DEAD_EXCHANGE = "dead";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normalQueue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        //声明通道
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        /**
         * 声明普通队列和死信队列
         */
        //创建一个hashmap对象来配置连接死信队列的参数
        Map<String, Object> arguments = new HashMap<>();

        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","dead1");

        //声明普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定队列和交换机
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead1");

        System.out.println("等待接收消息》》》》》》》》》》》");
        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            String msg = new String(message.getBody(),"UTF-8");
            if (msg.equals("info5")){
                System.out.println("Consumer1接收的消息是:"+msg+":此消息是被拒绝的");
                //这里第二个参数设置了是否要将拒收的消息塞回原队列
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            }else {
                System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));
                //成功应答,这里设置不批量操作
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
        //开启手动应答
        //消费者开始消费消息
        channel.basicConsume(DEAD_QUEUE,false,deliverCallback,(consumerTag)->{});
    }
}

总结

        时间过期、消息被拒、队列容量限制这三个机制会引发消息被转发死信队列,那么死信队列除了在这三种情况下继续保存消息之外,还有什么作用呢?下一篇文章荔枝会梳理延时队列,相信看完下一篇文章大家能有所收获~

今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~

如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!

如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1020744.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++——构造函数

定义 构造函数是一个特殊的成员函数&#xff0c;名字和类名相同&#xff0c;创建类类型对象时由编译器自动调节&#xff0c;保证每个数据成员都有一个合适的初始值&#xff0c;并且在对象的声明周期内只调用一次。 特性 1.函数名和类名相同 2.无返回值 3.对象实例化时编译…

自动化测试工具slelnium的初体验

1.slelnium介绍 1.1 一个Web的自动化测试工具&#xff0c;最初是为网站自动化测试而开发的。 1.2 可以直接运行在浏览器上&#xff0c;它支持所有主流的浏览器&#xff08;包括PhantomJS这些无界面的浏览器&#xff09;&#xff0c;可以接收指令&#xff0c;让浏览器自动加载页…

锐捷交换机vlan隔离(wifi段仅能访问外网,和内网隔离)

因为公司的wifi段&#xff0c;未做隔离&#xff0c;无意间上了网&#xff0c;发现能访问内网网段&#xff0c;这里内网是10、20段&#xff0c;管理网段是100段&#xff0c;于是做了和内网的vlan隔离。 拓朴如下&#xff0c;所有vlan的网关都起在核心上&#xff0c;核心上起了DH…

23062QTday2

完善登录框 点击登录按钮后&#xff0c;判断账号&#xff08;admin&#xff09;和密码&#xff08;123456&#xff09;是否一致&#xff0c;如果匹配失败&#xff0c;则弹出错误对话框&#xff0c;文本内容“账号密码不匹配&#xff0c;是否重新登录”&#xff0c;给定两个按钮…

华为数通方向HCIP-DataCom H12-831题库(单选题:41-60)

第41题 除了虚连接之外,OSPFV3的Hello报文源IPv6地址是哪种类型的IPv6地址? A、IPv6任播地址 B、唯一本地地址 C、全球单播地址 D、链路本地地址 答案: D 解析: 这里题目是源IPv6,不是目的IPv6,与另一题类似 第42题 下列描述中关于MPLS网络中配置静态LSP正确的是? A、…

three.js——模型对象的使用材质和方法

模型对象的使用材质和方法 前言效果图1、旋转、缩放、平移&#xff0c;居中的使用1.1 旋转rotation&#xff08;.rotateX()、.rotateY()、.rotateZ()&#xff09;1.2缩放.scale()1.3平移.translate()1.4居中.center() 2、材质属性.wireframe 前言 BufferGeometry通过.scale()、…

uniapp h5网页打开白屏

修改了默认基本运行路径&#xff0c;然后直接打开index.html的情况下是会这样&#xff0c;放在nginx服务器上运行就ok了。 把默认的./ 路径修改了&#xff1a;/cloudh5 nginx html目录下放子网站 &#xff1a;/cloudh5&#xff1a;

【深度学习实验】线性模型(二):使用NumPy实现线性模型:梯度下降法

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入库 1. 初始化参数 2. 线性模型 linear_model 3. 损失函数loss_function 4. 梯度计算函数compute_gradients 5. 梯度下降函数gradient_descent 6. 调用函数 一、实验介绍 使用Nu…

Lombok依赖

一.介绍 Project Lombok 是一个 Java 库&#xff0c;它会自动插入编辑器和构建工具&#xff0c;为您的 Java 增添趣味。永远不要再写另一个 getter 或 equals 方法&#xff0c;使用一个注释&#xff0c;您的类有一个功能齐全的构建器&#xff0c;自动化您的日志记录变量等等。…

2023陇剑杯

2023陇剑杯初赛WP HW hard_web_1 ​ 首先判断哪个是服务器地址 ​ 从响应包看&#xff0c;给客户端返回数据包的就是服务器 所以确定服务器地址是192.168.162.188​ 再从开放端口来看&#xff0c;长期开放的端口 客户端发送一个TCP SYN包&#xff08;同步请求&#xff…

VisualBox QA

出现提示注册表错误&#xff0c;或者之前正常&#xff0c;重启VisualBox后&#xff0c;VM运行失败时&#xff0c;可通过正确卸载VisualBox&#xff0c;然后使用注册表清理软件(CCleaner)清理注册表后&#xff0c;重装VisualBox&#xff0c;即会正常。&#xff08;一般用这个能解…

CSS Id和Class选择器

文章目录 CSS id 选择器示例 CSS class 选择器CSS id和class的区别和相同点 CSS id 选择器 CSS的id选择器是以“#”开头的&#xff0c;用于选择具有特定id属性的HTML元素。 在HTML文档中&#xff0c;每个id应该是全局唯一的&#xff0c;也就是说&#xff0c;每个id只能用于一…

复杂场景:揭秘新生代光伏独角兽企业的数据管理秘诀

项目背景 最新一个光伏独角兽诞生了。 投资界获悉&#xff0c;一道新能源科技股份有限公司&#xff08;以下简称“一道新能”&#xff09;完成Pre-IPO融资。经多家投资方核实&#xff0c;此轮投后估值近80亿元。 一道新能源科技股份有限公司&#xff0c;成立于2018年8月&…

就业创业证查询

这里写目录标题 问题描述结果 问题描述 全国就业创业证查询系统自改版本后不支持根据姓名身份证号查询了&#xff0c;从社保局查又需要证书编号。 结果 经过不谢努力找到了解决办法&#xff0c;可以根据身份证姓名批量查询人员是否有就业证。

【Flowable】使用UEL整合Springboot从0到1(四)

前言 在前面我们介绍了Springboot简单使用了foleable以及flowableUI的安装和使用&#xff0c;在之前我们分配任务的处理人的时候都是通过Assignee去指定固定的人的。这在实际业务中是不合适的&#xff0c;我们希望在流程中动态的去解析每个节点的处理人&#xff0c;当前flowab…

家里电脑怎样远程办公室电脑?快解析映射域名实现内网穿透

远程电脑怎么操作是大家比较关注的问题&#xff0c;特别是涉及内外网&#xff0c;不在同一个局域网内不同计算机间的远程连接访问&#xff0c;如家里电脑怎样远程办公室电脑&#xff1f;这里提供一种简便的异地远程方法&#xff1a;用快解析。通过快解析映射域名软件&#xff0…

【漏洞复现】Smanga未授权远程代码执行漏洞(CVE-2023-36076) 附加SQL注入+任意文件读取

文章目录 前言声明一、产品简介一、漏洞描述二、漏洞等级三、影响范围四、漏洞复现五、修复建议六、附加漏洞漏洞一、SQL注入漏洞二、任意文件读取 前言 Smanga存在未授权远程代码执行漏洞,攻击者可在目标主机执行任意命令,获取服务器权限。 声明 请勿利用文章内的相关技术从…

【面试题】——Java基础篇(33题)

文章目录 1. 八大基本数据类型分类2. 重写和重载的区别3. int和integer区别4. Java的关键字5. 什么是自动装箱和拆箱&#xff1f;6. 什么是Java的多态性&#xff1f;7. 接口和抽象类的区别&#xff1f;8. Java中如何处理异常&#xff1f;9. Java中的final关键字有什么作用&…

Java文字描边效果实现

效果&#xff1a; FontUtil工具类的完整代码如下&#xff1a; 其中实现描边效果的函数为&#xff1a;generateAdaptiveStrokeFontImage() package com.ncarzone.data.contentcenter.biz.img.util;import org.springframework.core.io.ClassPathResource; import org.springfr…

爱思唯尔——利用AI来改善医疗决策和科研

爱思唯尔(Elsevier)是一家全球性的多媒体出版公司&#xff0c;为教育、专业科学和医疗社区提供20,000多种产品&#xff0c;其中包括《柳叶刀》和《细胞》等领先的研究出版物。 该公司正处于数字化转型的第一阶段&#xff0c;将公司140年中发表在报告和期刊上的大量数据数字化。…