RabbitMQ-死信队列常见用法

news2025/4/9 23:57:02

目录

一、什么是死信

 二、什么是死信队列 

​编辑 三、第一种情景:消息被拒绝时

四、第二种场景:. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

五、第三种场景: 消息的Expiration 过期时长或队列TTL过期时间。

六、 第四种情景:  消息队列达到最大容量


一、什么是死信

    在RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。

死信就是消息在特定场景下的一种表现形式,这些场景包括:

1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject

2. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

3. 消息的Expiration 过期时长或队列TTL过期时间

4. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。

 二、什么是死信队列 

    死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。

   死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预

 三、第一种情景:消息被拒绝时

#设置消费者手动应答模式

spring.rabbitmq.listener.simple.acknowledge-mode = manual

package com.by.consumer;


import com.by.model.OrderingOk;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
@Slf4j
public class DeadConsumer {
    //死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return ExchangeBuilder.directExchange("Dead_E01").build();
    }

    //死信队列
    @Bean
    public Queue deadQueue1() {
        return QueueBuilder.durable("Dead_Q01").build();
    }

    //死信交换机与死信队列的绑定
    @Bean
    public Binding deadBinding1(Queue deadQueue1, DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
    }

    //业务队列
    @Bean
    public Queue queue1() {
        return QueueBuilder
                .durable("Direct_Q01")
                .deadLetterExchange("Dead_E01")
                .deadLetterRoutingKey("RK_DEAD")
                //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
                //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
                .build();
    }

    //业务交换机
    @Bean
    public DirectExchange exchange() {
        return ExchangeBuilder.directExchange("Direct_E01").build();
    }

    //业务交换机与队列的绑定
    @Bean
    public Binding binding1(Queue queue1, DirectExchange exchange) {
        return BindingBuilder.bind(queue1).to(exchange).with("RK01");
    }

    //@RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg) {
//        log.info("消费者1 收到消息:"+ msg );
//        int  i= 5/0;
//    }

    @RabbitListener(queues = "Direct_Q01")
    public void receiveMessage(OrderingOk msg, Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        System.out.println("消费者1 收到消息:" + msg + " tag:" + deliveryTag);

        channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }
    }
}

四、第二种场景:. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

一般要和自动重启一起使用,否则死信队列收不到消息

#设置消费者自动应答模式
spring.rabbitmq.listener.simple.acknowledge-mode = auto
#开启自动应答重试机制
spring.rabbitmq.listener.simple.retry.enabled=true

package com.by.consumer;


import com.by.model.OrderingOk;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
@Slf4j
public class DeadConsumer {
    //死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return ExchangeBuilder.directExchange("Dead_E01").build();
    }

    //死信队列
    @Bean
    public Queue deadQueue1() {
        return QueueBuilder.durable("Dead_Q01").build();
    }

    //死信交换机与死信队列的绑定
    @Bean
    public Binding deadBinding1(Queue deadQueue1, DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
    }

    //业务队列
    @Bean
    public Queue queue1() {
        return QueueBuilder
                .durable("Direct_Q01")
                .deadLetterExchange("Dead_E01")
                .deadLetterRoutingKey("RK_DEAD")
                //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
                //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
                .build();
    }

    //业务交换机
    @Bean
    public DirectExchange exchange() {
        return ExchangeBuilder.directExchange("Direct_E01").build();
    }

    //业务交换机与队列的绑定
    @Bean
    public Binding binding1(Queue queue1, DirectExchange exchange) {
        return BindingBuilder.bind(queue1).to(exchange).with("RK01");
    }

    //@RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg) {
//        log.info("消费者1 收到消息:"+ msg );
//        int  i= 5/0;
//    }

    @RabbitListener(queues = "Direct_Q01")
    public void receiveMessage(OrderingOk msg, Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        System.out.println("消费者1 收到消息:" + msg + " tag:" + deliveryTag);
        int a=10/0;
//        channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }
    }
}

五、第三种场景: 消息的Expiration 过期时长或队列TTL过期时间

    //业务队列
    @Bean
    public Queue queue1() {
        return QueueBuilder
                .durable("Direct_Q01")
                .deadLetterExchange("Dead_E01")
                .deadLetterRoutingKey("RK_DEAD")
                .ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
                //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
                .build();
    }

六、 第四种情景:  消息队列达到最大容量

    //业务队列
    @Bean
    public Queue queue1() {
        return QueueBuilder
                .durable("Direct_Q01")
                .deadLetterExchange("Dead_E01")
                .deadLetterRoutingKey("RK_DEAD")
//                .ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
                .maxLength(5) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过5,会把队列之前的消息依次放进死信队列
                .build();
    }
    /**
     * 测试直联交换机
     *
     * @throws IOException
     * @throws InterruptedException
     */
    @Test
    void contextLoads() throws IOException, InterruptedException {
        for (int i = 0; i < 8; i++) {
            OrderingOk orderingOk = OrderingOk.builder().id(1).name("张三"+i).build();
            directProvide.send(orderingOk);
        }
       
        System.in.read();
    }

id为1,2,3的被装进了死信队列,因为数据太老,业务队列优先要新的数据 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1578829.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

备考ICA----Istio实验17---TCP流量授权

备考ICA----Istio实验17—TCP流量授权 1. 环境准备 1.1 环境部署 kubectl apply -f <(istioctl kube-inject -f istio/samples/tcp-echo/tcp-echo.yaml) -n kim kubectl apply -f <(istioctl kube-inject -f istio/samples/sleep/sleep.yaml) -n kim1.2 测试环境 检测…

面试算法-154-搜索二维矩阵 II

题目 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。 每列的元素从上到下升序排列。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,4,7,11,15],[2,5,8,12,19],[3,6,9,16,22],[10,…

解决 IDEA每次打开新的项目都要重新设置maven问题

目录 一、当前项目设置maven 如下图&#xff1a; 二、设置打开的新项目的maven 如下图&#xff1a;​ 一、当前项目设置maven 对于当前项目我们都知道设置maven的配置要在 File -- Settings -- Build -- Maven 中设置 如下图&#xff1a; 二、设置打开的新项目的maven F…

AWE2024酷开科技智能家居,让生活从此更智能!

随着科技的飞速发展&#xff0c;智能家居已经成为了人们生活中不可或缺的一部分。在这个领域里&#xff0c;酷开科技品类逐渐丰富&#xff0c;在AWE2024展会上展现出耀眼光芒&#xff0c;将全品类智能家电新品集结亮相&#xff01;让人们的生活更加便捷、舒适和智能化。 酷开K…

WPF使用MVVM,将Image中的图片绑定到OpenCVSharp中的Mat类型

看了很多帖子&#xff0c;代码复制过去都是报错的&#xff0c;查看了OpenCVSharp.Extensions的底层&#xff0c;发现用法在WPF中已经进行了更改&#xff0c;原本需要从Mat->Bitmap->BitmapImage&#xff0c;简化成了Mat->BitmapSource这一个过程&#xff0c;所以这也是…

从零开始:一步步学习爬虫技术的实用指南(一)

从零开始&#xff1a;一步步学习爬虫技术的实用指南&#xff08;一&#xff09; Urllib1.什么是互联网爬虫2.爬虫核心3.爬虫的用途4.爬虫的分类4.1 通用爬虫&#xff1a;4.1 聚焦爬虫&#xff1a; 5.反爬手段5.1 User‐Agent&#xff1a;5.2.代理IP5.3.验证码访问5.4.动态加载网…

Triton Server Python 后端优化

接上文 不使用 Docker 构建 Triton 服务器并在 Google Colab 平台上部署 HuggingFace 模型 MultiGPU && Multi Instance Config 追加 instance_group [{count: 4kind: KIND_GPUgpus: [ 0, 1 ]} ]Python Backend Triton 会根据配置信息启动四个实例&#xff0c;…

微服务初始及Eureka注册中心

1&#xff0c;架构演变 单体架构&#xff1a;将所有业务功能集中在一个项目中开发&#xff0c;达成一个包部署 优点&#xff1a;架构简单&#xff0c;部署成本低 缺点&#xff1a;项目耦合度高 分布式架构&#xff1a;根据业务功能对系统进行拆分&#xff0c;每个业务作为独…

2024 年广东省职业院校技能大赛(高职组)“云计算应用”赛项样题 5

#需要资源&#xff08;软件包及镜像&#xff09;或有问题的&#xff0c;可私聊博主&#xff01;&#xff01;&#xff01; #需要资源&#xff08;软件包及镜像&#xff09;或有问题的&#xff0c;可私聊博主&#xff01;&#xff01;&#xff01; #需要资源&#xff08;软件包…

SpringBoot学习笔记二

SpringBoot学习笔记二 1.SpringBoot配置加载顺序1.1 内部配置加载顺序1.2 外部配置加载顺序 2. SpringBoot整合其他框架2.1 SpringBoot整合Test2.2 SpringBoot整合Redis 1.SpringBoot配置加载顺序 1.1 内部配置加载顺序 同理可知&#xff0c;父项目中的confg下的配置优先级最…

web前端框架设计第四课-条件判断与列表渲染

web前端框架设计第四课-条件判断与列表渲染 一.预习笔记 1.条件判断 1-1&#xff1a;v-if指令&#xff1a;根据表达式的值来判断是否输出DOM元素 1-2&#xff1a;template中使用v-if 1-3&#xff1a;v-else 1-4&#xff1a;v-else-if 1-5&#xff1a;v-show&#xff08;不支…

flood_fill 算法|图形渲染

flood fill 算法常常用来找极大连通子图&#xff0c;这是必须掌握的基本算法之一&#xff01; 图形渲染 算法原理 我们可以利用DFS遍历数组把首个数组的值记为color&#xff0c;然后上下左右四个方向遍历二维数组数组如果其他方块的值不等于color 或者越界就剪枝 return 代码…

javaScript手写专题——实现instanceof/call/apply/bind/new的过程/继承方式

目录 原型链相关 手写instanceof 实现一个_instance方法&#xff0c;判断对象obj是否是target的实例 测试 手写new的过程 实现一个myNew方法&#xff0c;接收一个构造函数以及构造函数的参数&#xff0c;返回构造函数创建的实例对象 测试myNew方法 手写类的继承 ES6&…

深入理解JVM后端优化技术-逃逸分析(Escape Analysis)

相关系统 深入理解jvm执行引擎-CSDN博客 深入理解JVM后端优化技术-方法内联-CSDN博客 定义 当一个对象在方法里面被定义后,它可能让外部方法所引用,作为调用参数传递到其它的方法中,这种称为方法逃逸;还有可能被外部线程访问到,赋值给可以在其它线程中访问的实例数量,这…

鼠标灵敏度怎么调,鼠标灵敏度怎么调最稳

鼠标和键盘是操作计算机过程中使用最频繁的设备之一&#xff0c;用电脑的时&#xff0c;我敢说你一定离不开鼠标。有些用户发现鼠标不太好用&#xff0c;尤其是在游戏时&#xff0c;总觉得鼠标移动太慢了。另外&#xff0c;如果你感觉鼠标按键失灵、鼠标单击变双击以及反应迟钝…

JVM的简单介绍

目录 一、JVM的简单介绍 JVM的执行流程 二、JVM中的内存区域划分 1、堆&#xff08;只有一份&#xff09; 2、栈&#xff08;可能有N份&#xff09; 3、程序计数器&#xff08;可能有N份&#xff09; 4、元数据区&#xff08;只有一份&#xff09; 经典笔试题 三、JVM…

C++类与对象中(个人笔记)

类与对象中 类的6个默认成员函数1.构造函数1.1特性 2.析构函数2.1特性 3.拷贝构造函数3.1特性 4.赋值运算符重载4.1特性 5.日期类的实现6.const成员6.1const成员的几个问题 7.取地址及const取地址操作符重载 类的6个默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为…

k8s集群node节点状态为Not Ready

目录 一、Node节点Not Ready状态的可能原因 二、排查node节点状态为Not Ready的原因 一、Node节点Not Ready状态的可能原因 node节点状态为Not Ready可能的原因有&#xff1a; 1.网络插件出问题 有过安装经验的小伙伴应该很熟悉未安装网络插件的情况下node节点在集群中的状…

nacos分布式程序开发实例

1.通过windows docker desktop 完成 nacos 的安装/启动/配置 &#xff08;1&#xff09;先安装docker desktop docker-toolbox-windows-docker-for-windows-stable安装包下载_开源镜像站-阿里云 &#xff08;2&#xff09;配置docker 国内镜像源 Docker 镜像加速 | 菜鸟教程…

【相机方案】智能驾驶的域控采用的“串行器和解串器”方案的总结(持续更新),SerDes,GMSL

SerDes是Serializer/Deserializer的缩写&#xff0c;即串行器和解串器。由于同轴线的传输延迟几乎可以忽略不计&#xff08;ns级别&#xff09;&#xff0c;相当于将原来只能短距离传输的高速并行信号(MIPI/I2C/CLK等)的传输距离延长&#xff0c;真正做到高带宽、低延迟、长距离…