RabbitMQ5-死信队列

news2025/3/7 9:07:57

目录

死信的概念

死信的来源

死信实战

死信之TTl

死信之最大长度

死信之消息被拒


死信的概念

死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信的来源

  • 消息 TTL 过期

    TTL是Time To Live的缩写, 也就是生存时间

  • 队列达到最大长度

    队列满了,无法再添加数据到 mq 中

  • 消息被拒绝

    (basic.reject 或 basic.nack) 并且 requeue=false

死信实战

死信之TTl

消费者 C1:

package com.qcby.sixin;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 - 消费者1
 *
 */
public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机,参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key,参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        //正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }

}

生产者:

package com.qcby.sixin;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //设置消息的 TTL 时间 10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }

    }
}

消费者 C2:

package com.qcby.sixin;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer02 {
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

启动 C1 ,之后关闭消费者,模拟其接收不到消息,再启动 Producer:

启动 C2 消费者,它消费死信队列里面的消息:

死信之最大长度

消费者 C1:

package com.qcby.sixin.two;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 - 消费者1
 *
 */
public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机,参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key,参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        //设置正常队列的长度限制
        params.put("x-max-length",6);
        
        //正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }

}

生产者:

package com.qcby.sixin.two;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
    }
}

消费者 C2:

package com.qcby.sixin.two;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer02 {
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

死信之消息被拒

拒收消息 "info5"

消费者 C1:

package com.qcby.sixin.three;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 - 消费者1
 *
 */
public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机,参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key,参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");


        //正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("info5")) {
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        //开启手动应答
        channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {
        });
    }

}

生产者:

package com.qcby.sixin.three;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
    }
}

消费者 C2:

package com.qcby.sixin.three;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer02 {
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2283473.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[JavaScript] 面向对象编程

JavaScript 是一种多范式语言&#xff0c;既支持函数式编程&#xff0c;也支持面向对象编程。在 ES6 引入 class 语法后&#xff0c;面向对象编程在 JavaScript 中变得更加易于理解和使用。以下将详细讲解 JavaScript 中的类&#xff08;class&#xff09;、构造函数&#xff0…

Windows上通过Git Bash激活Anaconda

在Windows上配置完Anaconda后&#xff0c;普遍通过Anaconda Prompt激活虚拟环境并执行Python&#xff0c;如下图所示&#xff1a; 有时需要连续执行多个python脚本时&#xff0c;直接在Anaconda Prompt下可以通过在以下方式&#xff0c;即命令间通过&&连接&#xff0c;…

主机监控软件WGCLOUD使用指南 - 如何设置主题背景色

WGCLOUD运维监控系统&#xff0c;从v3.5.7版本开始支持设置不同的主题背景色&#xff0c;如下 更多主题查看说明 如何设置主题背景色 - WGCLOUD

C语言教程——文件处理(2)

目录 前言 一、顺序读写函数&#xff08;续&#xff09; 1.1fprintf 1.2fscanf 1.3fwrite 1.4fread 二、流和标准流 2.1流 2.2标准流 2.3示例 三、sscanf和sprintf 3.1sprintf 3.2sscanf 四、文件的随机读写 4.1fseek 4.2ftell 4.3rewind 五、文件读取结束的…

ios打包:uuid与udid

ios的uuid与udid混乱的网上信息 新人开发ios&#xff0c;发现uuid和udid在网上有很多帖子里是混淆的&#xff0c;比如百度下&#xff0c;就会说&#xff1a; 在iOS中使用UUID&#xff08;通用唯一识别码&#xff09;作为永久签名&#xff0c;通常是指生成一个唯一标识&#xf…

.NET9增强OpenAPI规范,不再内置swagger

ASP.NETCore in .NET 9.0 OpenAPI官方文档ASP.NET Core API 应用中的 OpenAPI 支持概述 | Microsoft Learnhttps://learn.microsoft.com/zh-cn/aspnet/core/fundamentals/openapi/overview?viewaspnetcore-9.0https://learn.microsoft.com/zh-cn/aspnet/core/fundamentals/ope…

hot100_234. 回文链表

给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true 示例 2&#xff1a; 输入&#xff1a;head …

【超详细】ELK实现日志采集(日志文件、springboot服务项目)进行实时日志采集上报

本文章介绍&#xff0c;Logstash进行自动采集服务器日志文件&#xff0c;并手把手教你如何在springboot项目中配置logstash进行日志自动上报与日志自定义格式输出给logstash。kibana如何进行配置索引模式&#xff0c;可以在kibana中看到采集到的日志 日志流程 logfile-> l…

IPoIB(IP over InfiniBand)数据接收与发送机制详解

IPoIB&#xff08;IP over InfiniBand&#xff09;是一种在InfiniBand网络上实现IP协议的技术&#xff0c;它允许在InfiniBand网络上传输IP数据包。IPoIB通过将IP数据包封装在InfiniBand的数据包中&#xff0c;实现了在InfiniBand网络上的高效通信。本文将详细分析IPoIB如何接收…

Spring Boot - 数据库集成04 - 集成Redis

Spring boot集成Redis 文章目录 Spring boot集成Redis一&#xff1a;redis基本集成1&#xff1a;RedisTemplate Jedis1.1&#xff1a;RedisTemplate1.2&#xff1a;实现案例1.2.1&#xff1a;依赖引入和属性配置1.2.2&#xff1a;redisConfig配置1.2.3&#xff1a;基础使用 2&…

JAVAweb学习日记(八) 请数据库模型MySQL

一、MySQL数据模型 二、SQL语言 三、DDL 详细见SQL学习日记内容 四、DQL-条件查询 五、DQL-分组查询 聚合函数&#xff1a; 分组查询&#xff1a; 六、DQL-分组查询 七、分页查询 八、多表设计-一对多&一对一&多对多 一对多-外键&#xff1a; 一对一&#xff1a; 多…

Scratch游戏作品 | 僵尸来袭——生存大战,保卫你的领地!

今天为大家推荐一款刺激十足的Scratch射击生存游戏——《僵尸来袭》&#xff01;在这个充满危机的世界里&#xff0c;僵尸不断向你袭来&#xff0c;利用你的战斗技能与策略道具生存下来&#xff0c;成为最后的幸存者&#xff01;✨ 源码现已在小虎鲸Scratch资源站免费下载&…

C# 自定义随机字符串生成

目录 简介 调用方式&#xff1a; 详细说明 1.外层方法封装 2.定义字符组合 3.按长度生成 简介 随机字符串&#xff0c;包含数字&#xff0c;字母&#xff0c;特殊符号等&#xff0c;随机拼凑&#xff0c;长度可输入 为了生成效率&#xff0c;长度大于1024的&#xff0c…

【C++探索之路】STL---string

走进C的世界&#xff0c;也意味着我们对编程世界的认知达到另一个维度&#xff0c;如果你学习过C语言&#xff0c;那你绝对会有不一般的收获&#xff0c;感受到C所带来的码云风暴~ ---------------------------------------begin--------------------------------------- 什么是…

rust 发包到crates.io/ 操作流程 (十)

第一步github登录 https://crates.io/ 在项目里面login&#xff1a; cargo login ciol4sMwaR61YvzWniodRlssk6RfS4HcZTU --registry crates-io如果不想每次带 这个&#xff0c;就执行 vim ~/.cargo/config.toml 添加下面 [registry] default "crates-io"git a…

MongoDB部署模式

目录 单节点模式&#xff08;Standalone&#xff09; 副本集模式&#xff08;Replica Set&#xff09; 分片集群模式&#xff08;Sharded Cluster&#xff09; MongoDB有多种部署模式&#xff0c;可以根据业务需求选择适合的架构和部署方式。 单节点模式&#xff08;Standa…

国自然重点项目|代谢影像组学只能预测肺癌靶向耐药的关键技术与应用|基金申请·25-01-25

小罗碎碎念 今天和大家分享一个国自然重点项目&#xff0c;项目执行年限为2019.01 - 2023.12&#xff0c;直接费用为294万。 项目聚焦肺癌靶向治疗中药物疗效预测难题&#xff0c;整合多组学与代谢影像数据展开研究。 在研究过程中&#xff0c;团队建立动物模型获取多维数据&am…

NFT Insider #166:Nifty Island 推出 AI Agent Playground;Ronin 推出1000万美元资助计划

引言&#xff1a;NFT Insider 由 NFT 收藏组织 WHALE Members、BeepCrypto 联合出品&#xff0c; 浓缩每周 NFT 新闻&#xff0c;为大家带来关于 NFT 最全面、最新鲜、最有价值的讯息。每期周报将从 NFT 市场数据&#xff0c;艺术新闻类&#xff0c;游戏新闻类&#xff0c;虚拟…

Word 中实现方框内点击自动打 √ ☑

注&#xff1a; 本文为 “Word 中方框内点击打 √ ☑ / 打 ☒” 相关文章合辑。 对第一篇增加了打叉部分&#xff0c;第二篇为第一篇中方法 5 “控件” 实现的详解。 在 Word 方框内打 √ 的 6 种技巧 2020-03-09 12:38 使用 Word 制作一些调查表、检查表等&#xff0c;通常…

Cpp::静态 动态的类型转换全解析(36)

文章目录 前言一、C语言中的类型转换二、为什么C会有四种类型转换&#xff1f;内置类型 -> 自定义类型自定义类型 -> 内置类型自定义类型 -> 自定义类型隐式类型转换的坑 三、C强制类型转换static_castreinterpret_castconst_castdynamic_cast 四、RTTI总结 前言 Hell…