死信队列

news2025/1/23 4:09:18

死信队列

死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列.

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

造成死信的原因

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 MQ 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

死信架构图

在这里插入图片描述

代码实战

  • TTL过期

    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    import java.util.HashMap;
    
    public class Consumer {
        //普通交换机
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机
        private static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列
        private static final String NORMAL_QUEUE = "normal_queue";
        //死信队列
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //创建普通队列死信分发参数
            HashMap<String,Object> arguments= new HashMap<>();
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key","lisi");
            //创建普通交换机与队列并绑定
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    
            //创建死信交换机和队列并绑定
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
    
            DeliverCallback deliverCallback= (tag,msg)->{
                String message = new String(msg.getBody());
                System.out.println("接收到消息:"+message);
                }
            };
    
            //创建消费者
            channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});
        }
    }
    
    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    public class Producer {
        private static final String EXCHANGE_NAME = "normal_exchange";
        private static final String ROUTING_KEY = "zhangsan";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            System.out.println("已连接到RabbitMQ服务器....");
            Channel channel = connection.createChannel();
            //设置超时为10秒
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build();
            for (int i = 0; i < 10; i++) {
                String message = "msg" + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, basicProperties, message.getBytes());
                System.out.println("消息:"+message+"发送成功!");
            }
        }
    }
    
  • 队列达到最大长度

    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    import java.util.HashMap;
    
    public class Consumer {
        //普通交换机
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机
        private static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列
        private static final String NORMAL_QUEUE = "normal_queue";
        //死信队列
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //创建普通队列死信分发参数
            HashMap<String,Object> arguments= new HashMap<>();
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key","lisi");
            //设置队列最大长度
            arguments.put("x-max-length",5);
            //创建普通交换机与队列并绑定
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    
            //创建死信交换机和队列并绑定
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
    
            DeliverCallback deliverCallback= (tag,msg)->{
                String message = new String(msg.getBody());
           		System.out.println("消息:"+message+"被拒绝");
            };
    
            //创建消费者
            channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});
        }
    }
    
    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    public class Producer {
        private static final String EXCHANGE_NAME = "normal_exchange";
        private static final String ROUTING_KEY = "zhangsan";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            System.out.println("已连接到RabbitMQ服务器....");
            Channel channel = connection.createChannel();
            for (int i = 0; i < 10; i++) {
                String message = "msg" + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
                System.out.println("消息:"+message+"发送成功!");
            }
        }
    }
    
  • 消息被拒

    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    import java.util.HashMap;
    
    public class Consumer {
        //普通交换机
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机
        private static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列
        private static final String NORMAL_QUEUE = "normal_queue";
        //死信队列
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //创建普通队列死信分发参数
            HashMap<String,Object> arguments= new HashMap<>();
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key","lisi");
            //创建普通交换机与队列并绑定
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    
            //创建死信交换机和队列并绑定
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
    
            DeliverCallback deliverCallback= (tag,msg)->{
                String message = new String(msg.getBody());
                if (message.equals("msg5")){
                    System.out.println("消息:"+message+"被拒绝");
                    channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);
                }else {
                    System.out.println("接收到消息:"+message);
                    channel.basicAck(msg.getEnvelope().getDeliveryTag(),false);
                }
            };
    
            //创建消费者
            channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});
        }
    }
    
    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    public class Consumer2 {
        private static final String DEAD_QUEUE_NAME = "dead_queue";
    
        public static void main(String[] args)throws Exception {
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            DeliverCallback deliverCallback=(tag,msg)->{
                String message= new String(msg.getBody());
                System.out.println("队列:"+DEAD_QUEUE_NAME+"\t收到消息:"+message);
            };
            channel.basicConsume(DEAD_QUEUE_NAME,true,deliverCallback,(tag)->{});
        }
    }
    
    package com.vmware.rabbit.demo8;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;
    
    public class Producer {
        private static final String EXCHANGE_NAME = "normal_exchange";
        private static final String ROUTING_KEY = "zhangsan";
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitUtil.getConnection();
            System.out.println("已连接到RabbitMQ服务器....");
            Channel channel = connection.createChannel();
            for (int i = 0; i < 10; i++) {
                String message = "msg" + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
                System.out.println("消息:"+message+"发送成功!");
            }
        }
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/483617.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Pandas] 构建DataFrame数据框

DataFrame是二维数据结构&#xff0c;数据以行和列的形式排列 构建DataFrame最基本的定义格式如下 df pd.DataFrame(dataNone, indexNone, columnsNone) 参数说明 data: 具体数据 index: 行索引&#xff0c;如果没有指定&#xff0c;会自动生成RangeIndex(0,1,2,...,n) colu…

mongoose使用详细 -- 如何通过mongoose搭建服务器

前言 授人以鱼不如授人以渔&#xff0c;这篇文章详细介绍了&#xff0c;对于一个从来没有听说过mongoose的小菜鸟如何快速了解和上手mongoose 其他一些开源库可以借助类似的方法进行学习 提前需要准备的工具 1.官网文档 Mongoose :: Documentation 官网提供了很多例子讲解&am…

[Pandas] 查看DataFrame的常用属性

导入数据 import pandas as pddf pd.DataFrame([[L123,A,0,123],[L456,A,1,456],[L437,C,0,789],[L112,B,1,741],[L211,A,0,852],[L985,B,1,963]],columns[Material,Level,Passing,LT]) df 1.dtypes: 查看DataFrame中各列的数据类型 df.dtypes会返回每个字段的数据类型及Da…

C++练级之初级:第六篇

类和对象入门级&#xff1a;第六篇 1.类的引入2.类的定义2.1类的访问限定符2.2类的封装2.3类的实例化 3.如何计算类或者对象的大小4.this指针 总结 我们知道&#xff0c;C在C语言的基础上引入了对象的概念&#xff0c;那么从本篇开始进入类和对象&#xff1b; 1.类的引入 &…

【JavaEE】_1.多线程(1)

目录 1.操作系统 2. 进程 3. CPU分配——进程调度 3.1 操作系统对进程的管理 3.2 PCB的属性 3.2.1 基础属性 3.2.2 实现进程调度的属性 4. 内存分配——内存管理 4.1 虚拟地址空间 4.2 进程间通信 5. 线程 5.1 线程的概念 5.2 创建与使用多线程 5.2.1 方式1&a…

【数据结构】八大排序(一)

&#x1f61b;作者&#xff1a;日出等日落 &#x1f4d8; 专栏&#xff1a;数据结构 珍惜自己的时间&#xff0c;利用好每一份每一秒。做事不放过没一个细节&#xff0c;小心谨慎&#xff0c;细致&#xff0c;能够做到这些&#xff0c;还有什么是不可能的呢? 目录 ​编辑 ✔…

【刷题之路Ⅱ】LeetCode 61. 旋转链表

【刷题之路Ⅱ】LeetCode 61. 旋转链表 一、题目描述二、解题1、方法1——移动部分链表1.1、思路分析1.2、代码实现 2、方法1——闭合为环2.1、思路分析2.2、代码实现 一、题目描述 原题连接&#xff1a; 61. 旋转链表 题目描述&#xff1a; 给你一个链表的头节点 head &#x…

【Python | matplotlib】matplotlib.cm的理解以及举例说明

文章目录 一、模块介绍二、颜色举例 一、模块介绍 matplotlib.cm是Matplotlib中的一个模块&#xff0c;它提供了一组用于处理颜色映射&#xff08;colormap&#xff09;的函数和类。颜色映射是一种将数值映射到颜色的方法&#xff0c;常用于制作热力图、等值线图、散点图等。 …

软件工程实验:原型设计

目录 前言实验目的实验要求实验过程系统原型绘制生成html代码 总结 前言 本次实验的主题是原型设计&#xff0c;即根据用户需求和系统功能&#xff0c;设计一个简单的软件原型&#xff0c;展示系统的界面和交互方式。原型设计是软件工程中的一种重要技术&#xff0c;它可以帮助…

深入探索PyTorch中的自动微分原理及梯度计算方法

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

如何完全卸载linux下通过rpm安装的mysql

卸载linux下通过rpm安装的mysql 1.关闭MySQL服务2.使用 rpm 命令的方式查看已安装的mysql3. 使用rpm -ev 命令移除安装4. 查询是否还存在遗漏文件5. 删除MySQL数据库内容 1.关闭MySQL服务 如果之前安装过并已经启动&#xff0c;则需要卸载前请先关闭MySQL服务 systemctl stop…

Tomcat整体架构解析

一、Tomcat整体架构介绍 Tomcat是一个开源的轻量级web应用服务器。整体架构如下&#xff1a; Tomcat中最顶层的容器是Server&#xff0c;即代表一个Tomcat服务器&#xff0c;一个Server中可以有多个Service&#xff0c;对外提供不同的web服务。Service是对Connector和Contain…

电话号码的字母组合

题目&#xff1a;17. 电话号码的字母组合 - 力扣&#xff08;Leetcode&#xff09; 思路&#xff1a; 给定一个电话号码字符串 digits&#xff0c;须输出它所能表示的所有字母组合。我们可以先定义一个数字字符到字母表的映射表 numToStr&#xff0c;然后再用 Combine 函数递归…

【Linux专区】 环境搭建 | 带你白嫖七个月阿里云服务器

&#x1f49e;&#x1f49e;欢迎来到 Claffic 的博客&#x1f49e;&#x1f49e; &#x1f449; 专栏&#xff1a;《Linux专区》&#x1f448; 前言&#xff1a; 工欲善其事必先利其器&#xff0c;没个Linux环境怎么愉快地学Linux&#xff1f;这期就先带大家把环境搞好&#xf…

物联网系统中常见的通信协议分析

物联网&#xff08;Internet of Things, 简称IoT&#xff09;是指将各种传感器、设备等通过互联网连接起来&#xff0c;形成一个庞大的网络&#xff0c;实现物与物之间的互联互通。在实现这个过程中&#xff0c;各种不同的通信协议被广泛应用。本文将为大家介绍物联网中常见的通…

[架构之路-185]-《软考-系统分析师》-3-操作系统基本原理 - 文件索引表

目录 一、文件的索引块。 二、索引分配表 三、索引表的链接方案 四、多层索引 五、混合索引分配 一、文件的索引块。 存放在目录中的文件&#xff0c;并非是文件的真实内容。 目录中记录了文件的索引块是几号磁盘块。 文件对应的索引表是存放在指定的磁盘块中的&#x…

CSI指纹预处理(中值、均值、Hampel、小波滤波)

目录 1、前言 2、中值滤波器 3、均值滤波器 4、Hampel滤波器 5、小波变换滤波器 1、前言 因为设备、温度和实验室物品摆设等因素的影响&#xff0c;未经处理的CSI数据不能直接使用&#xff0c;需要对数据进行异常值处理以保证数据的稳定性&#xff0c;同时减少环境中人的…

云原生Istio架构和组件介绍

目录 1 Istio 架构2 Istio组件介绍2.1 Pilot2.2 Mixer2.3 Citadel2.4 Galley2.5 Sidecar-injector2.6 Proxy(Envoy)2.7 Ingressgateway2.8 其他组件 1 Istio 架构 Istio的架构&#xff0c;分为控制平面和数据面平两部分。 - 数据平面&#xff1a;由一组智能代理&#xff08;[En…

Eclipse改SSH项目,修改java代码无效

遇到了一个大坑&#xff0c;记录一下… 坑1&#xff1a;修改后台代码总是没用… 1.背景&#xff1a; Eclipse运行SSH项目&#xff08;StrutsSpringHibernate&#xff09;&#xff0c;修改SQL语句&#xff0c;但是前端查询的结果没变化…(例如&#xff0c;在sql里加上 where …

LeetCode279之完全平方数(相关话题:动态规划,四平方和定理)

题目描述 给你一个整数 n ,返回 和为 n 的完全平方数的最少数量 。 完全平方数 是一个整数,其值等于另一个整数的平方;换句话说,其值等于一个整数自乘的积。例如,1、4、9 和 16 都是完全平方数,而 3 和 11 不是。 示例 1: 输入:n = 12 输出:3 解释:12 = 4 + 4 +…