rabbitmq死信队列详解

news2025/1/13 15:32:45

目录

1 概念

2 成为死信队列的条件

2.1 队列指定长度 

2.2  消息ttl时间

2.3 消费者拒收消息


1 概念

死信队列:死信队列其实和普通的队列一样,只不过里面存放的消息都是普通队列过期没有消费的。所以,接收没有及时被消费消息的队列为死信队列。

2 成为死信队列的条件

以下条件只要满足一条,即可以成为死信队列。

  1. 队列长度满了:排在前面的消息会被拒收或者进入死信交换机
  2. 消息的ttl时间到了:消息超时未被消费
  3. 消息被拒收了:手动拒绝消息

一个队列设置了队列长度或者过期时间或被拒收,并且设置了死信队列的交换机和死信的路由key。那么消息满足条件就会进入死信队列。

例如:

@Bean("queueB")
public Queue queueB(){
    Map<String, Object> arguments  = new HashMap<>();
    //设置死信交换机
    arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_XCHANGE);
    //设置死信RoutingKey
    arguments.put("x-dead-letter-routing-key","YD");
    //设置ttl
    arguments.put("x-message-ttl",40000);

    return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}

以上只是声明了一个普通队列queueB,然后在该队列设置了过期时间40s,和死信交换机和死信路由key。

注意:死信队列就是一个普通的队列,只不过声明普通队列的时候指定了死信交换机,二者才产生了联系

2.1 队列指定长度 

配置相关队列和交换机

注意:声明一个队列分为3步(声明交换机、声明队列、将队列和交换机路由绑定)

package com.liubujun.rabbitmqspringbootdemo.config;

import com.rabbitmq.client.AMQP;
import com.sun.javafx.collections.MappingChange;
import jdk.nashorn.internal.objects.NativeUint8Array;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: liubujun
 * @Date: 2023/6/3 16:15
 */

@Component
public class DeadQueueConfig {

    //队列
    public static final String FORMAL_QUEUE = "formal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    //交换机
    public static final String FORMAL_EXCHANGE = "formal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchaneg";

    //路由key
    public static final String FORMAL_ROUNTE_KEY = "formal_rounte_key";
    public static final String DEAD_ROUNTE_KEY = "dead_route_key";

    /**
     * 普通队列交换机声明
     * 交换机类型:topic:处理路由键,按模式匹配,向符合规则的队列投递消息
     * name 交换机名称
     * durable 是否持久化
     * autoDelete 是否删除
     * arguments 用于设置其他参数
     * @return
     */
    @Bean
    public Exchange getFormalExchange(){
        return new TopicExchange(FORMAL_EXCHANGE,true,false,null);
    }

    /**
     * 声明普通队列,并设置与死信队列联系
     * name 队列名称
     * durable 是否持久化
     * exclusive 是否排外 如果是排外的,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,
     * 类似于加锁,并在连接断开connection.close()时自动删除
     * autoDelete 是否删除
     * arguments 用于设置其他参数
     * @return
     */
    @Bean
    public Queue getFormalQueue(){
        Map<String, Object> map = new HashMap<>();
        //设置队列最大长度
        map.put("x-max-length",5);
        //设置死信队列交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信队列路由key
        map.put("x-dead-letter-routing-key",DEAD_ROUNTE_KEY);
        return new Queue(FORMAL_QUEUE,true,false,false,map);
    }

    /**
     * 将普通队列和交换机绑定
     * destination:目标队列或交换器
     * destinationType:DesdinationType指出目标是交换器还是对列
     * exchange:交换机
     * routingKey:路由key
     * arguments:参数设置
     * @return
     */
    @Bean
    public Binding bingFormalQueue(){
        return new Binding(FORMAL_QUEUE, Binding.DestinationType.QUEUE,FORMAL_EXCHANGE,FORMAL_ROUNTE_KEY,null);
    }


    /**
     * 声明死信队列交换机
     * @return
     */
    @Bean
    public Exchange getDeadExchange(){
        return new TopicExchange(DEAD_EXCHANGE,true,false,null);
    }


    /**
     * 声明死信队列
     * @return
     */
    @Bean
    public Queue getDeadQueue(){
        return new Queue(DEAD_QUEUE,true,false,false, null);
    }

    /**
     * 将死信队列和交换机绑定
     * @return
     */
    @Bean
    public Binding bingDeadQueue(){
        return new Binding(DEAD_QUEUE, Binding.DestinationType.QUEUE,DEAD_EXCHANGE,DEAD_ROUNTE_KEY,null);
    }




}

生产者:发送6条消息,看rabbitmq中队列变化

    @GetMapping("/sendMessageTtl/{message}")
    public void sendMessageTtl(@PathVariable String message){
        log.info("当前时间发送:{},发送5条消息给两个TTL队列:{}",new Date().toString(),message);
        for (int i = 0; i < 6; i++) {
            rabbitTemplate.convertAndSend(DeadQueueConfig.FORMAL_EXCHANGE,DeadQueueConfig.FORMAL_ROUNTE_KEY,message);
        }
    }

rabbitmq控制台:

普通队列有5条消息,而死信队列有1条消息。

因为在声明普通队列的时候,已经说明了队列最大长度为5,那么多余的消息就会根据配置的参数找到对应的交换机进而找到对应的路由,然后路由到对应的队列(死信队列) 。

2.2  消息ttl时间

继续沿用上面的配置,只不过修改下普通队列的参数。

    @Bean
    public Queue getFormalQueue(){
        Map<String, Object> map = new HashMap<>();
        //设置队列超时时间
        map.put("x-message-ttl",5000);
        //设置死信队列交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信队列路由key
        map.put("x-dead-letter-routing-key",DEAD_ROUNTE_KEY);
        return new Queue(FORMAL_QUEUE,true,false,false,map);
    }

发送消息:

可以发现,发送给普通队列的消息,超时没有被消费,都进入到了死信队列中。 

2.3 消费者拒收消息

沿用上面的配置,并在声明普通队列的时候去掉消息的过期时间。

注意:需要在rabbitmq控制台删除队列,不然项目启动会报错。

添加消费者:

@Slf4j
@Component
public class DeadQueueConsumer {

    /**
     * 监听死信队列
     */
//    @RabbitListener(queues = DeadQueueConfig.DEAD_QUEUE)
//    public void listenDeadQueue(Message message, Channel channel){
//    log.info("接收到死信队列消息:{}",message.getBody());
//    }

    /**
     * 监听普通队列
     */
    @RabbitListener(queues = "formal_queue")
    public void listenFormalQueue(Message message, Channel channel) throws IOException {
        log.info("接收到普通队列消息:{}",message.getBody());
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //拒绝消息
        channel.basicReject(deliveryTag,false);
    }
}

rabbitmq控制台结果: 

消息在消费者端被拒收后,直接被放进了死信队列。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/607851.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[SWPUCTF 2021 新生赛] (WEB二)

目录 easyupload1.0 easyupload2.0 easyupload3.0 no_wakeup PseudoProtocols error hardrce pop sql finalrce hardrce_3 easyupload1.0 1.启动环境 2.上传一个图片木马 GIF89a <script languagephp>eval($_POST[a]);</script>3.蚁剑连接&#xff0c;…

VBScript深度学习入门——线性回归

背景 破电脑装不了VS、Py、IDea、Golang等主流编译器或其语言运行环境&#xff0c;但是自带.Net FrameWork 3.5&#xff0c;可以使用VBScript进行脚本编写&#xff0c;无所谓&#xff0c;反正都是了解底层原理学习&#xff0c;大不了手搓机器学习框架。 分析 了解线性回归的…

剑指 Offer 24. 反转链表解题思路

文章目录 题目解题思路 题目 定义一个函数&#xff0c;输入一个链表的头节点&#xff0c;反转该链表并输出反转后链表的头节点。 示例: 输入: 1->2->3->4->5->NULL 输出: 5->4->3->2->1->NULL 限制&#xff1a; 0 < 节点个数 < 5000 …

去过门头沟吗?

门头沟&#xff08;Mt.Gox&#xff09; 1. 成立2. 发展3. 危机4. 后续 1. 成立 MtGox是程序员杰德麦凯莱布&#xff08;Jed MeCaleb&#xff09;在比特币论坛的用户名。2007年&#xff0c;麦凯莱布开发了魔法风云会线上交易平台&#xff1a;门头沟&#xff08;Mt.Gox&#xff…

【数据分析之道-Matplotlib(六)】Matplotlib饼图

文章目录 专栏导读1、Matplotlib饼图pie()语法格式2、设置饼图各个扇形的标签与颜色3、突出显示第二个扇形&#xff0c;并格式化输出百分比4、shadows通过将参数设置为&#xff1a;向饼图添加阴影 True5、使用legend()函数为每个楔形添加解释列表6、将title参数添加到legend 函…

MYSQL实战45讲笔记--基础架构:一条SQL查询语句是如何执行的?

基础架构&#xff1a;一条SQL查询语句是如何执行的&#xff1f; **select** * **from** T **where** ID10mysql架构 MySQL 可以分为 Server 层和存储引擎层两部分。 Server 层&#xff1a;连接器、查询缓存、分析器、优化器、执行器等&#xff0c;涵盖 MySQL 的大多数核心服务…

不同平均值—集合—力扣—Python

一、题目描述&#xff1a; 二、代码解题&#xff1a; 1.不含注释语句的 class Solution:def distinctAverages(self, nums: List[int]) -> int:avg set([])l len(nums)//2for i in range(0,l):mi min(nums)ma max(nums)p (mima)/2avg.add(p)nums.remove(mi)nums.remo…

Python-For-EEG基础代码讲解(1)

Python-For-EEG 我要演示脑电图信号的基本分析。 主题 1、基于时域分析&#xff0c;P300信号数据集 Event-related potentials and 1-dimensional convolution&#xff08;ERP,CNN&#xff09;Long short-term memory(LSTM) 2、基于频域分析&#xff0c;DEAP和SSVEP数据集…

LeetCode_双指针_中等_82.删除排序链表中的重复元素 II

目录 1.题目2.思路3.代码实现&#xff08;Java&#xff09; 1.题目 给定一个已排序的链表的头 head &#xff0c; 删除原始链表中所有重复数字的节点&#xff0c;只留下不同的数字 。返回已排序的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,3,4,4,5] 输出&…

oracle19c SYSAUX表空间使用率高

今早手机收到一个信息&#xff0c;某客户的19c环境sysaux使用率超过了80%告警了。既然有事了还是需要登录查看下的 SYS > SET LINES 120 pagesize 199; SYS > COL OCCUPANT_NAME FORMAT A30; SYS > SELECT * FROM (SELECT OCCUPANT_NAME,SPACE_USAGE_KBYTES FROM V$S…

【逆向基础】JS逆向入门:小白也可以看懂

文章目录 前言一、接口抓包二、逆向分析3. 接口验证总结 前言 出于对数据安全的考虑&#xff0c;现代化的网站/APP通常会对数据接口做加密处理。而分析这些接口的加密算法并实现模拟调用的过程就叫做「逆向」。逆向对于爬虫工程师来说是一个永远绕不开的话题&#xff0c;也逐渐…

《嵌入式存储器架构、电路与应用》----学习记录(四)

第5章 新型嵌入式存储器 在现有主流嵌入式存储器中&#xff0c;SRAM虽然读写速度非常快&#xff0c;但是单元面积太大&#xff0c;无法在片上实现高密度集成&#xff1b;DRAM由于要制造电容&#xff0c;所采用的工艺无法在先进的CMOS工艺中实现&#xff0c;不利于做嵌入式存储…

六一专辑||C++实现动态烟花代码

首先&#xff0c;祝大家儿童节快乐&#xff01; 在这篇文章中&#xff0c;将用烟花致以大家最好的祝福&#xff01; 烟花代码将会用到 Easyx 图形库&#xff0c;可以去官网下载&#xff1a;easyx.cnhttp://easyx.cn/ 代码思路 1 烟花结构体 2 初始化烟花 3 烟花上升 4 烟…

设置主机名和host映射

这里写目录标题 设置主机名设置host映射主机名解析过程分析 设置主机名 为了方便记忆。可以给linux系统主机名&#xff0c;也可以根据需要修改主机名 指令hostname来查看主机名 修改主机名 vim /etc/hostname 进入之后修改就行 修改之后重启生效 设置host映射 如何通过主机…

分布式锁实现原理

为什么需要分布式锁&#xff1f; 本地锁synchronized只能锁住当前服务进程&#xff0c;一个本地锁只能锁一个服务&#xff0c;如果是分布式服务情况下使用本地锁&#xff0c;那么多少服务就会有多少进程同时执行&#xff0c;就是去了锁的效果&#xff0c;为了到达分布式情况下…

3.9 流水作业调度问题

博主简介&#xff1a;一个爱打游戏的计算机专业学生博主主页&#xff1a; 夏驰和徐策所属专栏&#xff1a;算法设计与分析 1.我对流水调度问题的理解 流水作业调度问题是动态规划中的一个经典问题&#xff0c;它涉及将一系列作业分配给多个工作站以最小化总完成时间。该问题的…

go test 包外测试

之前文章有介绍过 go test coverage 单测覆盖率 和Go test基础用法&#xff0c;今天这里主要介绍 go 单测中比较特殊的一种场景&#xff1a;包外测试。初次看到这个名字&#xff0c;我还以为就是单独创建一个新目录&#xff0c;所有的单测用例统一都汇总到这个目录下&#xff0…

【P48】JMeter 断言持续时间(Duration Assertion)

文章目录 一、断言持续时间&#xff08;Duration Assertion&#xff09;参数说明二、测试计划设计 一、断言持续时间&#xff08;Duration Assertion&#xff09;参数说明 可以控制取样器的执行是否超过某个时间&#xff0c;如果超时则报错&#xff0c;持续时间断言器也叫超时…

21天学会C++:Day6----内联函数

CSDN的uu们&#xff0c;大家好。这里是C入门的第六讲。 座右铭&#xff1a;前路坎坷&#xff0c;披荆斩棘&#xff0c;扶摇直上。 博客主页&#xff1a; 姬如祎 收录专栏&#xff1a;C专题 目录 1. 知识引入 2. 知识点讲解 2.1 内联函数的使用 2.2 内联函数的特性 2.2 …

强大Excel 插件 Zbrainsoft Dose for Excel 3.6.2 Crack

强大的 Excel 插件 Zbrainsoft Dose for Excel 3.6.2 如果您厌倦了在Excel中消除重复的行&#xff0c;比较工作表或执行困难的活动&#xff0c;那么Dose for Excel是您需要的强大便捷解决方案&#xff0c;只需单击几下即可将所有这些复杂的杂务简化。它具有 100 多个强大的新功…