RabbitMQ之死信队列解读

news2024/10/7 16:21:43

目录

基本介绍

消息进入到死信队列的情况

消息过期

队列过期 

 队列达到最大长度(先入队的消息会被发送到DLX)

消费者拒绝消息不进行重新投递

消费者拒绝消息

springboot代码实战

实战架构

工程概述

RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

MessageService业务类:发送消息及接收消息

主启动类RabbitMq01Application:实现ApplicationRunner接口


基本介绍

什么是死信交换机

在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。

什么是死信队列

死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已

RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。 

要注意的是,DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当这个队列存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

消息进入到死信队列的情况

消息过期

MessageProperties messageProperties=new MessageProperties();
//设置此条消息的过期时间为10秒
messageProperties.setExpiration("10000");

队列过期 

 Map<String, Object> arguments =new HashMap<>();
 //指定死信交换机,通过x-dead-letter-exchange 来设置
 arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);
 //设置死信路由key,value 为死信交换机和死信队列绑定的key
 arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);
 //队列的过期时间
 arguments.put("x-message-ttl",10000);
return  new Queue(QUEUE_NORMAL,true,false,false,arguments);

 TTL: Time to Live的简称,过期时间

队列达到最大长度(先入队的消息会被发送到DLX)

Map<String, Object> arguments = new HashMap<String, Object>();
//设置队列的最大长度 ,对头的消息会被挤出变成死信
arguments.put("x-max-length", 5);

消费者拒绝消息不进行重新投递

从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。

application.yml 启动手动确认

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
 /**
     * 监听正常的那个队列的名字,不是监听那个死信队列
     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
     *
     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
     */
    @RabbitListener(queues = {RabbitConfig.QUEUE})
    public void process(Message message, Channel channel) {
        System.out.println("接收到的消息:" + message);
        //对消息不确认, ack单词是 确认 的意思
       
        try {
            System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
            //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:消息的一个数字标签
  • multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack
  • requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列

消费者拒绝消息

开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列

  /**
     * 监听正常的那个队列的名字,不是监听那个死信队列
     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
     *
     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
     */
    @RabbitListener(queues = {RabbitConfig.QUEUE})
    public void process(Message message, Channel channel) {
        System.out.println("接收到的消息:" + message);
        try {
            System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
            //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

  • basicReject是否定的交付,一般在消费消息时出现异常等的时候执行。可以将该消息丢弃或重排序去重新处理消息
  • 参数1: 消费消息的index
  • 参数2: 对异常消息的处理,true表示重排序,false表示丢弃
  • Reject 在拒绝消息时,可以使用 requeue 标识,告诉 RabbitMQ 是否需要重新发送给别的消费者。如果是 false 则不重新发送,一般这个消息就会被RabbitMQ 丢弃。Reject 一次只能拒绝一条消息。如果是 true 则消息发生了重新投递。
  • Nack 跟 Reject 类似,只是它可以一次性拒绝多个消息。也可以使用 requeue 标识,这是 RabbitMQ 对 AMQP 规范的一个扩展。
  • 通过 RejectRequeuConsumer 可以看到无论是使用 Reject 方式还是 Nack 方式,当 requeue
  • 参数设置为 true 时,消息发生了重新投递。当 requeue 参数设置为 false 时,消息丢失了。

springboot代码实战

实战架构

如上图,消息到达正常的交换机exchange.nomal.a,通过与正常的队列queue.noaml.a绑定,消息会到达正常队列,如果消息变为死消息以后则会转发到与正常队列绑定的死信交换机中,死信交换机会转发到与其绑定的死信队列queue.deal.a。

工程概述

 工程采用springboot架构,主要用到的依赖为:

<!--        rabbit的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

application.yml配置文件如下:

server:
  port: 8080
spring:
  rabbitmq:
    host: 123.249.70.148
    port: 5673
    username: admin
    password: 123456
    virtual-host: /

RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

@Configuration
public class RabbitConfigDeal {


}

创建正常交换机

   @Bean
    public DirectExchange normalExchange(){
        return ExchangeBuilder.directExchange("exchange.normal.a").build();
    }

创建死信交换机

    @Bean
    public DirectExchange deadExchange(){
        return ExchangeBuilder.directExchange("exchange.dead.a").build();
    }

 创建死信队列

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable("queue.dead.a").build();
    }

 创建正常队列,设置他的绑定死信交换机,以及对应绑定的路由key为order

    @Bean
    public Queue normalQueue(){
        Map<String, Object> arguments =new HashMap<>();
        arguments.put("x-message-ttl",20000);
        arguments.put("x-dead-letter-exchange","exchange.dead.a");
        arguments.put("x-dead-letter-routing-key","order");
        return QueueBuilder.durable("queue.normal.a")
                .withArguments(arguments).build();
    }

绑定正常交换机和正常队列

    @Bean
    public Binding bindingNormal(DirectExchange normalExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
    }

 绑定死信交换机和死信队列

    @Bean
    public Binding bindingDeal(DirectExchange deadExchange,Queue deadQueue){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("order");
    }

MessageService业务类:发送消息及接收消息

@Component
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

}

 发送消息方法

    public void sendMsg(){
        //添加消息属性
        Message message = MessageBuilder.withBody("hello word!".getBytes(StandardCharsets.UTF_8))
               .build();
        rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
        log.info("发送消息时间:{}",new Date());
    }

这里用的路由key为info 

MessageConvert

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
  • RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter

  接受消息

    @RabbitListener(queues = {"queue.dead.a"})
    public void receiveMsg(Message message){
        byte[] body = message.getBody();
        String queue = message.getMessageProperties().getConsumerQueue();
        String msg=new String(body);
        log.info("{}接收到消息时间:{},消息为{}",queue,new Date(),msg);
    }

 Message

在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

  1. MessageProperties // 消息属性

  2. byte[] body // 消息内容

@RabbitListener

使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

  • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

    • application/octet-stream:二进制字节数组存储,使用 byte[]
    • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
    • text/plain:文本数据类型存储,使用 String
    • application/json:JSON 格式,使用 Object、相应类型

主启动类RabbitMq01Application:实现ApplicationRunner接口

/**
 * @author 风轻云淡
 */
@SpringBootApplication
public class RabbitMq01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMq01Application.class, args);
    }

    @Resource
    private MessageService messageService;

    /**
     * 程序一启动就会调用该方法
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.sendMsg();

    }
}

在SpringBoot中,提供了一个接口:ApplicationRunner。 该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。

由于该方法是在容器启动完成之后,才执行的,所以,这里可以从spring容器中拿到其他已经注入的bean。

启动主启动类后查看控制台:

2023-09-28 10:46:17.772  INFO 71700 --- [           main]
 c.e.rabbitmq01.service.MessageService    :
 发送消息时间:Thu Sep 28 10:46:17 CST 2023
2023-09-28 10:46:37.824  INFO 71700 --- [ntContainer#0-1] 
c.e.rabbitmq01.service.MessageService    : 
queue.dead.a接收到消息时间:Thu Sep 28 10:46:37 CST 2023,消息为hello word!

我们在这里可以看见17s的时候发送了消息,在经过了20s,即到37s的时候我们在死信队列queue.dead.a接受到了消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1083177.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

极简c++(9)函数模板与类模板

模板概念 函数模板 注意&#xff1a;不允许template与函数模板定义之间有任何语句&#xff1b; 类模板 没有讲的内容 后续需要可以自己查看补充。

C++ 使用getline()从文件中读取一行字符串

我们知道,getline() 方法定义在 istream 类中,而 fstream 和 ifstream 类继承自 istream 类,因此 fstream 和 ifstream 的类对象可以调用 getline() 成员方法。 当文件流对象调用 getline() 方法时,该方法的功能就变成了从指定文件中读取一行字符串。 该方法有以下 2 种语…

战绩查询、作图工具、干货分享,助你成为吃鸡高手!

大家好&#xff01;吃鸡玩家的福音来了&#xff01;欢迎来到闲游盒的世界&#xff0c;这里有一切你需要的&#xff1a;从战绩查询到作图工具&#xff0c;从干货分享到技巧免费领取&#xff01; 首先&#xff0c;让我们谈谈战绩查询。作为一名玩家&#xff0c;当你在吃鸡游戏中取…

C#,数值计算——数据建模Fitmrq(Levenberg-Marquardt nonlinear fitting)的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { /// <summary> /// Levenberg-Marquardt nonlinear fitting /// </summary> public class Fitmrq { private const int NDONE 4; private const int ITMAX 1000; …

【【萌新的SOC学习之GPIO之MIO控制LED实验程序设计】】

萌新的SOC学习之GPIO之MIO控制LED实验程序设计 如何设置完GPIO并且传递数据 我们先了解GPIO引脚的配置 每一个GPIO引脚都可以设置成输入输出 &#xff0c;只有GPIO8 7 只能作为输出 我们现在做一个例子 GPIO 的bank我们知道有4个 bank0 1 2 3 DIRM_0 就是第一个bank 需要写入…

tecplot的使曲线光滑的方法

在对流场模拟结果进行处理时&#xff0c;不可避免的存在些许的噪声导致tecplot的曲线不光滑&#xff0c;故介绍一下tecplot中曲线光滑的方法 1.选择data-alter 2.data中选择smooth,通过调节number of passes&#xff0c;调节曲线光滑程度 大功告成&#xff01; 最近发现写的文…

防反接方案,NMOS PMOS

NMOS电路的缺陷 1.1&#xff0c;由于NMOS具有内阻&#xff0c;我们可以把它设为Rdson, 那么电优先会从低电阻串口处走&#xff0c;这样会有烧毁串口地的风险。 1.2&#xff0c;短期内的方案是&#xff0c;可以在串口处串接1个100ohm电阻。 PMOS电路

基于Springboot实现口腔牙诊所管理平台项目【项目源码+论文说明】

基于Springboot实现口腔牙科诊所管理平台演示 摘要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;口腔管理平台当然也不能排除在外。口腔管理平台是以实际运用为开发背景&am…

oracle connect by详解

1、作用&#xff1a; 用于存在父子&#xff0c;祖孙&#xff0c;上下级等层级关系的数据表进行层级查询。 2、语法 SELECT ... FROM .... START WITH cond1 CONNECT BY cond2 WHERE cond3;2.1、说明 start with: 指定起始节点的条件 connect by: 指定父子行的条件关系 …

geecg-uniapp 路由修改 页面创建 (2)

首先打开 首页面 &#xff08;1&#xff09;我们以home的常用服务 为例 我们修改 usList 数据 &#xff08;2&#xff09;查找对应路径 work.js 我们修改荒石对应的路径 现在跳转 helloword &#xff08;3&#xff09;修改跳转路径 &#xff08;4&#xff09;创建页面 …

Python —— 接口测试之使用requests发起请求实战

1、认识requests模块 1、requests介绍 requests是一个第三方库&#xff0c;因此首先需要安装这个库&#xff0c;安装三步走&#xff1a; 安装&#xff1a;pip install requests在文件中引用这个模块&#xff1a;import requests使用这个库发起一个请求&#xff08;get请求、…

C++——容器适配器

1. 什么是适配器&#xff1f; 容器适配器是C标准库中的一种数据结构&#xff0c;它可以将不同类型的容器&#xff08;如vector、list、deque等&#xff09;转换为另一种类型的容器。容器适配器提供了一种简单的方式来重新组织和访问数据&#xff0c;同时隐藏了底层容器的实现细…

C++ 多维数组

C 支持多维数组。多维数组声明的一般形式如下&#xff1a; type name[size1][size2]...[sizeN];例如&#xff0c;下面的声明创建了一个三维 5 . 10 . 4 整型数组&#xff1a; int threedim[5][10][4];二维数组 多维数组最简单的形式是二维数组。一个二维数组&#xff0c;在本…

vue启用打印机打印-二维码条形码打印

起因 资产、设备管理必备的二维码条形码打印 原理 所需插件 vue-print-nb 本文版本1.7.5 构建所需要打印的内容&#xff0c;利用vue-print-nb进行打印。二维码条形码打印的本质就是图片打印 代码 html部分 <div ref"printDom"id"printDom">//…

Linux内存管理 | 二、虚拟地址空间布局

我的圈子&#xff1a; 高级工程师聚集地 我是董哥&#xff0c;高级嵌入式软件开发工程师&#xff0c;从事嵌入式Linux驱动开发和系统开发&#xff0c;曾就职于世界500强企业&#xff01; 创作理念&#xff1a;专注分享高质量嵌入式文章&#xff0c;让大家读有所得&#xff01; …

SNAP计算哨兵2号的LAI/FVC/FAPAR

SNAP计算LAI 简介 SNAP的计算LAI方法是基于PROSAIL模型&#xff0c;集成的模块&#xff0c;很方便。 首先要下载SNAP软件&#xff0c;下载步骤见博文 打开数据 找到影像的.xml文件 拖入左边的空白框中&#xff0c;发现所有波段会显示如下。这些数据都是已经经过处理完成之后…

Vue之VueX知识探索(一起了解关于VueX的新世界)

目录 前言 一、VueX简介 1. 什么是VueX 2. VueX的作用及重要性 3. VueX的应用场景 二、VueX的使用准备工作 1. 下载安装VueX 2. vuex获取值以及改变值 2.1 创建所需示例 2.2 将创建好的.vue文件页面显示 2.3 创建VueX的相关文件 2.4 配置VueX四个js文件 2.5 加载到vue示…

网络架构介绍

1 网络 7 层架构 7 层模型主要包括&#xff1a; 1. 物理层&#xff1a;主要定义物理设备标准&#xff0c;如网线的接口类型、光纤的接口类型、各种传输介质的传输速率等。它的主要作用是传输比特流&#xff08;就是由 1、0 转化为电流强弱来进行传输,到达目的地后在转化为1、0…

简述电子设计中的EMC、EMI、ESD

简述电子设计中的EMC、EMI、ESD ESD EMI EMC ESD、EMI、EMC 设计是电子工程师在设计中遇到最常见的难题&#xff0c;电磁兼容性&#xff08;EMC&#xff09;是指设备或系统在其电磁环境中符合要求运行并不对其环境中的任何设备产生无法忍受的电磁干扰的能力。 因此&#xff0…

【2023年11月第四版教材】第24章《法律法规与标准规范》(合集篇)

第24章《法律法规与标准规范》(合集篇&#xff09; 1 民法典&#xff08;合同编&#xff09;2 招标投标法2.1 关于时间的总结2.2 内容 3 政府采购法4 专利法5 著作权法6 商标法7 网络安全法8 数据安全法 1 民法典&#xff08;合同编&#xff09; 1、要约是希望和他人订立合同的…