【初始RabbitMQ】死信队列的实现

news2025/1/23 9:13:17

死信的概念

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的来源

死信的来源主要是有以下几个部分:

  1. 消息 TTL 过期
  2. 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

死信的实战

代码架构图

消息TTL过期

生产者代码:

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        //设置消息TTL的时间
        /**
         * 设置AMQP(高级消息队列协议)消息的属性
         * AMQP.BasicProperties: 这是AMQP协议中用于定义消息属性的类。
         * new AMQP.BasicProperties().builder(): 这里创建了一个新的AMQP.BasicProperties对象,并通过调用其builder()方法来开始构建该对象的属性。
         * .expiration("10000"): 在构建过程中,通过.expiration()方法设置了消息的expiration属性。expiration属性用于定义消息的TTL,即消息在队列中等待消费的时间。如果在这段时间内消息没有被消费,它将被自动丢弃。
         * 在这里,expiration的值被设置为"10000",这意味着消息的TTL是10,000毫秒,也就是10秒。
         * .build(): 最后,通过调用.build()方法来结束构建过程并返回完全配置的AMQP.BasicProperties对象。
         */
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 1; i < 11; i++) {
            String message = "info"+i;
            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的KEY值是哪个 本次是队列的名称
             * 3.其他参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,
                    message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

消费者1的代码:

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //声明死信队列
        String deadQueue = "dead_queue";
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(deadQueue,false,false,false,null);
        //死信队列绑定死信交换机与routingKey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("C1等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("C1 接收到消息"+message);
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者成功才能更改消费的回调
         * 4.消费者取消消费回调
         */
        channel.basicConsume(normalQueue,true,deliverCallback,consumerTag->{
        });
    }
}

此时启动生产者观看RabbitMQ的变化

消费者2的代码(用于消耗死信队列中的信息): 

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信队列消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收死信队列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

运行结果: 

队列达到了最大长度

消息生产者代码去掉 TTL 属性:

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
    }
}

C1 消费者修改以下代码:

 C2 消费者代码不变(启动 C2 消费者),运行结果如下:

如果 报错请登录rabbitMQ管理平台将此队列删除:

 消息被拒

消息生产者代码同上生产者一致

C1 消费者的代码:

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //声明死信队列
        String deadQueue = "dead-queue";
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(deadQueue,false,false,false,null);
        //死信队列绑定死信交换机与routingKey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("C1等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody(), "UTF-8");
            if(message.equals("info5")){
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }else {
                System.out.println("Consumer01 接收到消息"+message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }

        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者成功才能更改消费的回调
         * 4.消费者取消消费回调
         */
        channel.basicConsume(normalQueue,false,deliverCallback,consumerTag->{
        });
    }
}

C2 消费者代码不变

启动消费者 1 然后再启动消费者 2 ,运行结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1460659.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java 那些诗一般的 数据类型 (1)

本篇会加入个人的所谓‘鱼式疯言’ ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人…

私房菜|私房菜定制上门服务系统|基于springboot+vue私房菜定制上门服务系统设计与实现(源码+数据库+文档)

私房菜定制上门服务系统目录 目录 基于springbootvue私房菜定制上门服务系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、管理员功能实现 &#xff08;1&#xff09;菜品管理 &#xff08;2&#xff09;公告管理 &#xff08;3&#xff09; 厨师管理 2、用…

四非保研之旅

大家好&#xff0c;我是工藤学编程&#xff0c;虽有万分感概&#xff0c;但是话不多说&#xff0c;先直接进入正题&#xff0c;抒情环节最后再说&#xff0c;哈哈哈 写在开头 我的分享是来给大家涨信心的&#xff0c;网上的大佬们都太强了&#xff0c;大家拿我涨涨信心&#…

Linux 上安装及卸载JDK(包含yum方式)

一、 删除JDK 1、先输入java -version查看是否安装了JDK [rootiZbp117bkiezirqkean6g3Z java-11-openjdk-11.0.21.0.9-2.0.3.al8.x86_64]# java -version openjdk version "11.0.21" 2023-10-17 LTS OpenJDK Runtime Environment (Red_Hat-11.0.21.0.9-1) (build 1…

新版AI系统ChatGPT源码支持GPT-4/支持AI绘画去授权

源码获取方式 搜一搜&#xff1a;万能工具箱合集 点击资源库直接进去获取源码即可 如果没看到就是待更新&#xff0c;会陆续更新上 新版AI系统ChatGPT网站源码支持GPT-4/支持AI绘画/Prompt应用/MJ绘画源码/PCH5端/免授权&#xff0c;支持关联上下文&#xff0c;意间绘画模型…

面向对象详解,面向对象的三大特征:封装、继承、多态

文章目录 一、面向对象与面向过程1、什么是面向过程&#xff1f;2、什么是面向对象&#xff1f; 二、类与对象1. 初识对象2. 类的成员方法2.1 类的定义和使用2.2 成员方法 3. 类和对象4. 魔法方法1. _ _ inint _ _ 构造方法2. _ _ str _ _ 字符串方法3. _ _ lt _ _ 小于符号比较…

基于python社交网络大数据分析系统的设计与实现

项目&#xff1a;基于python社交网络大数据分析系统的设计与实现 摘 要 社交网络大数据分析系统是一种能自动从网络上收集信息的工具&#xff0c;可根据用户的需求定向采集特定数据信息的工具&#xff0c;本项目通过研究爬取微博网来实现社交网络大数据分析系统功能。对于采集…

十二、通过色彩空间转换进行更换图片背景

项目功能实现&#xff1a;对一张白色背景的图片进行更换成蓝色背景&#xff0c;类似抠图更换背景操作 按照之前的博文结构来&#xff0c;这里就不在赘述了 一、头文件 inrange.h #pragma once#include<opencv2/opencv.hpp>using namespace cv;class INRANGE{ public:v…

leetcode hot100组合综合四

本题中&#xff0c;是要求nums中求的总和为target的排列数&#xff0c;因为题中说了&#xff0c;元素顺序不同&#xff0c;则可以视为不同的结果之一。 所以&#xff0c;根据对背包问题的总结&#xff0c;本题中元素可以重复使用&#xff0c;是完全背包并且需要求排列数&#…

我是怎么用静态IP代理为Google账号保驾护航的

我为何要使用到静态IP代理服务 我是一名IT从业者&#xff0c;在很多年前就加入了一家跨国软件公司&#xff0c;日常需要在全世界各地跟甲方沟通&#xff0c;负责的工作中重要的一块就是Google广告&#xff0c;为此公司还特意给配置了一台笔记本电脑。 目录 我为何要使用到静态…

教学设计与课堂教学的关系是什么

当你走进教室&#xff0c;是否曾思考过这背后的精心策划&#xff1f;每一堂课的呈现&#xff0c;都源于细致的教学设计。那么&#xff0c;教学设计与课堂教学之间&#xff0c;又隐藏着怎样的秘密呢&#xff1f; 教学设计&#xff0c;就像是一部剧本&#xff0c;为演员&#xf…

Puppeteer 使用实战:如何将自己的 CSDN 专栏文章导出并用于 Hexo 博客(一)

文章目录 效果展示说明利用工具整体思路Puppeteer 使用笔记保持登录状态打开新的页面点击 dialog跳转页面设置页面可见窗口大小寻找元素等待元素出现 整体代码 效果展示 说明 看了看网上很少做这个功能&#xff0c;但是我有这个需求&#xff0c;就抽出时间写了个简单的工具目前…

preCICE流固耦合仿真资料整理

preCICE The coupling library for partitioned multi-physics simulations 资料 Tutorials preCICE—多物理场耦合工具介绍 My First Fluid-Solid Interaction Case CalculiX 流固耦合&#xff08;FESIM有限元分析&#xff09; Coupling with preCICE by Gerasimos Chourdak…

k8s-配置与存储-配置管理

文章目录 一、配置存储1.1 ConfigMap1.1.1.基于文件夹的创建方式1.1.2指定文件的创建方式1.1.3 配置文件创建configmap 1.2 Secret1.2.1Secret的应用与Docker仓库 Secret设置1. Kubernetes 中的 Secrets&#xff1a;创建 Secret 示例&#xff1a;将 Secret 挂载到 Pod 中的示例…

URL、DNS过滤,AV---防火墙综合实验

拓扑图 该实验之前的配置请看我的上一篇博客&#xff0c;这里仅配置URL、DNS过滤&#xff0c;AV 需求 8&#xff0c;分公司内部的客户端可以通过域名访问到内部的服务器 这次的拓扑图在外网多增加了一个DNS服务器和HTTP服务器 DNS服务器IP&#xff1a;40.0.0.30 HTTP服务器…

Pandas快问快答16-30题

16. 如何对一个Pandas数据框进行聚合操作? 聚合操作是数据处理中的一种重要方式&#xff0c;主要用于对一组数据进行汇总和计算&#xff0c;以得到单一的结果。在聚合操作中&#xff0c;可以执行诸如求和、平均值、最大值、最小值、计数等统计操作。这些操作通常用于从大量数…

WebGIS开发常用的JS库:VUE/React/Angular对比

Angular: 作用&#xff1a; Angular是一个完整的基于TypeScript的Web应用开发框架&#xff0c;主要用于构建单页Web应用&#xff08;SPA&#xff09;。它适用于大型和复杂的项目&#xff0c;具有强大的组件集合和丰富的文档。 架构&#xff1a; Angular采用组件化的方式&am…

19个Web前端交互式3D JavaScript框架和库

JavaScript &#xff08;JS&#xff09; 是一种轻量级的解释&#xff08;或即时编译&#xff09;编程语言&#xff0c;是世界上最流行的编程语言。JavaScript 是一种基于原型的多范式、单线程的动态语言&#xff0c;支持面向对象、命令式和声明式&#xff08;例如函数式编程&am…

ts快速入门

文章目录 一、运行环境1、线上Playground2、VSCode 编辑器3、Code Runner 插件4、ts-node 二、声明1、变量声明2、常量声明3、类型推断 三、常用数据类型1、number2、string3、boolean4、数组5、对象 四、函数1、函数声明语法2、参数详解&#xff08;1&#xff09;特殊语法&…

物体检测-系列教程8:YOLOV5 项目配置

1、项目配置 yolo的v1、v2、v3、v4这4个都有一篇对应的论文&#xff0c;而v5在算法上没有太大的改变&#xff0c;主要是对v4做了一个更好的工程化实现 1.1 环境配置 深度学习环境安装请参考&#xff1a;PyTorch 深度学习 开发环境搭建 全教程 要求torch版本>1.6&#xf…