RabbitMQ 死信交换机的详述➕应用

news2025/1/14 18:43:11

🥳🥳Welcome 的Huihui's Code World ! !🥳🥳

接下来看看由辉辉所写的关于RabbitMQ的相关操作吧

目录

🥳🥳Welcome 的Huihui's Code World ! !🥳🥳

一.什么是死信交换机

二. 死信队列的应用场景

三.死信队列【TTL】

1.创建主交换机和主队列

2.创建死信交换机和死信队列

3.设置主队列的死信参数

4.将主队列绑定到主交换机

5.将死信队列绑定到死信交换机

6.消费者

7.测试

四. 死信队列【Reject】

1.配置消息确认模式

⭐⭐消息消费者如何通知 Rabbit 消息消费成功?

2.消费者


一.什么是死信交换机

这个在上篇博文已经讲到了,想看的可以点击一下

简单来说,就是生产者将消息投递到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待 条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。

⭐⭐注意:死信交换机本质上就是一个普通的交换机,只是因为队列设置了参数指定了死信交换机,这个普通的交换机才成为了死信的接收者

二. 死信队列的应用场景

  1. 错误处理:当消息无法被成功处理时,可以将其发送到死信队列,以便后续进行错误处理、日志记录或告警。
  2. 延迟消息:通过设置消息的过期时间,可以实现延迟消息的功能。当消息过期时,将被发送到死信队列,可以用于实现定时任务或延迟任务。
  3. 重试机制:当消息处理失败时,可以将消息发送到死信队列,并设置适当的重试策略。例如,可以使用指数退避算法对消息进行重试,以提高消息处理的成功率。
  4. 消息分析:通过监听死信队列,可以对无法被正常消费的消息进行分析和统计,以了解系统中存在的问题或异常情况

三.死信队列【TTL】

这里死信消息是设置了过期的时间【TTL】

1.创建主交换机和主队列

首先,需要创建一个主交换机和一个主队列。这些是正常消息传递的目标,当消息无法被正常消费时,它们将成为死信的来源。

// 创建一个名为queueA的队列
@Bean
public Queue queueA() {
    return new Queue("queueA");
}

// 创建一个名为ExchangeA的直接交换机
@Bean
public DirectExchange ExchangeA() {
    return new DirectExchange("ExchangeA");
}


2.创建死信交换机和死信队列

接下来,需要创建一个死信交换机和一个死信队列。这些将作为死信消息的目标。

// 创建一个名为queueB的队列
@Bean
public Queue queueB() {
    return new Queue("queueB");
}

// 创建一个名为ExchangeB的直接交换机
@Bean
public DirectExchange ExchangeB() {
    return new DirectExchange("ExchangeB");
}


3.设置主队列的死信参数

在创建主队列时,需要为其设置一些参数来定义死信的行为。具体而言,需要设置x-dead-letter-exchange参数为死信交换机的名称,以及x-dead-letter-routing-key参数为死信队列的路由键。

 // 创建一个名为queueA的队列
    @Bean
    public Queue queueA() {
        Map<String, Object> config = new HashMap<>();
        //message在该队列queue的存活时间最大为10秒
        config.put("x-message-ttl", 10000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-exchange", "ExchangeB");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        config.put("x-dead-letter-routing-key", "BB");
        return new Queue("queueA",true,true,false,config);
    }

关于其中的参数:

  • name:队列的唯一标识符,用于在消息中间件中识别特定的队列。每个队列都有一个名称,发送方将消息发送到指定的队列,接收方从队列中获取消息进行处理。
  • durable:指定队列是否需要持久化存储。如果队列被标记为持久化,那么即使在消息中间件重启之后,队列中的消192息也不会丢失。这对于重要的消息和应用场景非常关键。
  • exclusive:用于指定队列是否为独占队列。当一个队列被标记为独占时【exclusive=true】,只有创建该队列的连接或通道可以访问或使用这个队列。
  • autoDelete:用于指定队列是否在没有任何消费者订阅或连接时自动删除。当一个队列中的"autodelete"属性为true时,如果没有消费者订阅该队列或者没有生产者向该队列发送消息,那么该队列将自动被删除。

4.将主队列绑定到主交换机

将主队列与主交换机进行绑定,以确保正常消息能够被正确路由到主队列。

// 将queueA与ExchangeA进行绑定,并设置路由键为"AA"
@Bean
public Binding bindingAA() {
    return BindingBuilder
            .bind(queueA()) // 将queueA与ExchangeA进行绑定
            .to(ExchangeA()) // 指定绑定的目标交换机为ExchangeA
            .with("AA"); // 设置路由键为"AA"
}

5.将死信队列绑定到死信交换机

将死信队列与死信交换机进行绑定,以确保死信消息能够被正确路由到死信队列。

// 将queueB与ExchangeB进行绑定,并设置路由键为"BB"
@Bean
public Binding bindingBB() {
    return BindingBuilder
            .bind(queueB()) // 将queueB与ExchangeB进行绑定
            .to(ExchangeB()) // 指定绑定的目标交换机为ExchangeB
            .with("BB"); // 设置路由键为"BB"
}

6.消费者

这个消费者监听的是queueB,当queueA的消息过期之后就会交到死信交换机中的queueB进行处理

package com.example.consumer.exchange;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component // 声明这是一个Spring组件
@RabbitListener(queues = {"queueB"}) // 监听名为"queueB"的队列
public class DeadLetterReceive {

    @RabbitHandler // 当接收到消息时,调用此方法处理
    public void handler(String msg) { // 参数为接收到的消息,类型为Map<String, Object>
        System.out.println("QB接到消息"+msg); // 打印接收到的消息
    }
}

7.测试

先疯狂的访问queueA队列的方法

现在可以看到queueA这里有六条消息

queueA的消息过期了之后,queueB消费者中就接收到消息了

现在把queueB的消费者停掉

现在再去疯狂的访问queueA的方法,此时就可以发现,queueA的消息都到queueB那里去了

四. 死信队列【Reject】

这里死信消息是设置成了拒绝,【requeue 参数为 false】,还是用的上面所创建的交换机以及队列...

1.配置消息确认模式

  • 消息确认模式有:

    • AcknowledgeMode.NONE:自动确认

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认

server:
  port: 9999
spring:
  rabbitmq:
    host: 192.168.101.129
    password: 123456
    port: 5672
    username: wh
    virtual-host: my_vhost
    listener:
      simple:
        acknowledge-mode: manual

消息接收确认

⭐⭐消息消费者如何通知 Rabbit 消息消费成功?

  • 消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK

  • 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息

  • 如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失

  • 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

2.消费者

这个消费者是queueA的消费者

package com.example.consumer.exchange;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component // 声明这是一个Spring组件
@RabbitListener(queues = "queueA") // 监听名为"queueA"的队列
public class DeadLetterReceiveA {

    @RabbitHandler // 当接收到消息时,调用此方法处理
    public void handler(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("QA接到消息"+msg); // 打印接收到的消息
        channel.basicAck(tag,true); // 确认消息已被消费
    }
}

此时来访问一下queueA

queueA的消费者也已经将消息消费掉了

上面是queueA正常消费了,现在我直接将消息拒绝掉,并且不让它再重新入队了

package com.example.consumer.exchange;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component // 声明这是一个Spring组件
@RabbitListener(queues = "queueA") // 监听名为"queueA"的队列
public class DeadLetterReceiveA {

    @RabbitHandler // 当接收到消息时,调用此方法处理
    public void handler(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("QA接到消息"+msg); // 打印接收到的消息
        channel.basicReject(tag,false); // 拒绝消息,不重新入队
        Thread.sleep(1000); // 等待1秒
    }
}

这时,再访问一下方法

刚开始这个消息会到queueA中,但是queueA会把它拒绝掉,拒绝之后消息就会到queueB 中

此时消息全部都放在了queueB中,但是还没有消费,这是因为我们已经将消息确认的模式变成了手动,所以需要手动确认之后,消息才会被消费

package com.example.consumer.exchange;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component // 声明这是一个Spring组件
@RabbitListener(queues = {"queueB"}) // 监听名为"queueB"的队列
public class DeadLetterReceive {

    @RabbitHandler // 当接收到消息时,调用此方法处理
    public void handler(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { // 参数为接收到的消息,类型为Map<String, Object>
        System.out.println("QB接到消息"+msg); // 打印接收到的消息
        channel.basicAck(tag,true); // 确认消息已被消费
    }
}

好啦,今天的分享就到这了,希望能够帮到你呢!😊😊 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1417164.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

说说你对vue的mixin的理解,有什么应用场景

mixin是什么 Vue中的mixin 局部混入全局混入注意事项: 使用场景源码分析Vue 的几种类型的合并策略 替换型合并型队列性叠加型小结 此文章&#xff0c;来源于印客学院的资料&#xff0c;这里只是分享&#xff0c;便于查漏补缺。 mixin是什么 Mixin 是 面向对象程序设计语言中…

一文理清楚-Docker 容器如何工作

Docker 容器如何工作 集装箱什么是虚拟机&#xff1f;虚拟化如何运作&#xff1f;什么是容器&#xff1f;什么是 Docker&#xff1f;总结 五星上将麦克阿瑟曾经说过&#xff1a;在docker面前&#xff0c;虚拟机就是个弟弟 集装箱 《盒子&#xff1a;集装箱如何让世界变得更小&…

车载电子电器架构 —— 多核处理器刷写策略

车载电子电器架构 —— 多核处理器刷写策略 我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 屏蔽力是信息过载时代一个人的特殊竞争力&#xff0c;任何消…

内存管理(mmu)/内存分配原理/多级页表

1.为什么要做内存管理&#xff1f; 随着进程对内存需求的扩大&#xff0c;和同时调度的进程增加&#xff0c;内存是比较瓶颈的资源&#xff0c;如何更好的高效的利于存储资源是一个重要问题。 这个内存管理的需求也是慢慢发展而来&#xff0c;早期总线上的master是直接使用物…

C++:STL - string

C&#xff1a;STL - string basic_stringstringstring的常见构造string的输入输出operator<<c_stroperator>>getline string访问及遍历operator[ ] & atfont & back迭代器begin & endrbegin & rend 范围for string的容量操作size & lengthmax_…

用甘特图有效管理多个项目进度

当公司或组织同时承担多个项目时,合理规划各项目的时间节点与资源分配对确保高效完成至关重要。采用甘特图可以直观地展示多个项目的时间进程、关键里程碑以及资源分配情况,便于从宏观层面全面把控各项目的动态。 在线甘特图软件 zz-plan.com 提供了非常强大的时间轴规划功能,支…

CSS 多色正方形上升

<template><view class="loop cubes"><view class="item cubes"></view> <!-- 方块1 --><view class="item cubes"></view> <!-- 方块2 --><view class="item cubes"></vie…

C# 将HTML网页、HTML字符串转换为PDF

将HTML转换为PDF可实现格式保留、可靠打印、文档归档等多种用途&#xff0c;满足不同领域和情境下的需求。本文将通过以下两个示例&#xff0c;演示如何使用第三方库Spire.PDF for .NET和QT插件在C# 中将Html 网页&#xff08;URL&#xff09;或HTML字符串转为PDF文件。 HTML转…

【C语言】深入理解指针(3)数组名与函数传参

目录 &#xff08;一&#xff09;数组名的理解 &#xff08;1&#xff09;数组名是数组首元素的地址 &#xff08;2&#xff09;两个例外 &#xff08;二&#xff09;函数内数组传参 &#xff08;1&#xff09;一维数组传参 &#xff08;2&#xff09;二维数组传参 &…

以太网交换基础VLAN原理与配置

目录 7.以太网交换基础 7.1.以太网协议 7.2.以太网帧介绍 7.3.以太网交换机 7.4.同网段数据通信全过程 8.VLAN原理与配置 8.1.VLAN的基本概念 8.2.VLAN的应用 7.以太网交换基础 7.1.以太网协议 以太网是当今现有局域网(Local Area Network,LAN)采用的最通用的通信协议…

git使用指南——以gitlab为例

注册gitlab 自行注册 新建项目 选择新建一个空白的项目 上传项目 clone项目地址到本地 执行完之后&#xff0c;会在目录下生成如下内容&#xff1a;进入里面&#xff0c;选择.git&#xff0c;要上传的内容&#xff08;资料或代码复制到该目录下&#xff09;&#xff1a;…

界面组件DevExpress .NET MAUI中文教程 - 如何优化手机屏幕空间?

DevExpress拥有.NET开发需要的所有平台控件&#xff0c;包含600多个UI控件、报表平台、DevExpress Dashboard eXpressApp 框架、适用于 Visual Studio的CodeRush等一系列辅助工具。 获取DevExpress v23.2正式版下载 Bottom Sheet是一个组件&#xff0c;它显示固定在屏幕底部的…

LeetCode:376.摆动序列

个人主页&#xff1a;仍有未知等待探索-CSDN博客 专题分栏&#xff1a;算法_仍有未知等待探索的博客-CSDN博客 题目链接&#xff1a;376. 摆动序列 - 力扣&#xff08;LeetCode&#xff09; 一、题目 如果连续数字之间的差严格地在正数和负数之间交替&#xff0c;则数字序列称…

【排序5】基数排序:数字的组织与整理艺术

&#x1f3a1;基数排序 &#x1f38a;1、基本思想&#x1f38a;2、基本步骤&#x1f38a;3、代码示例&#x1f38a;4、特性总结 &#x1f38a;1、基本思想 基数排序&#xff08;Radix Sort&#xff09;是一种非比较排序算法&#xff0c;它根据数字的每一位来对元素进行排序。它…

提示unzip :commad not found; 安装unzip教程

1.当使用unzip解压时 提示&#xff1a; unzip :commad not found; 2.安装命令 sudo yum install unzip 3.输入 y 确认 4.提示&#xff1a;完成 5.输入 unzip 文件名 解压-验证 6.完成

34.基于51单片机的智能停车位计时收费系统设计

一、系统功能介绍&#xff1a; 本设计基于 RFID智能识别和高速的视频图像和存储比较相结合&#xff0c;通过计算机的图像处理和自动识别&#xff0c;对车辆进出停车场的收费、车牌识别和车位诱导等&#xff0c;以实现停车场全方位智能管理。 本设计是以AT89C51 型单片机为主控芯…

Kubernetes (十七) 资源监控

一. 资源监控 二. metrics-server资源下载配置 官网:资源下载&#xff1a;http…

搭建Mybatis环境

1.导入依赖 pom.xml <dependencies> <!-- Mybatis依赖--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.5.7</version></dependency> <!-- Mysql依…

Tonka Finance 测试网活动,开启新铭文时代财富之门

Tonka Finance 是铭文赛道首个借贷市场&#xff0c;通过搭建一套铭文资产借贷质押方案&#xff0c;为铭文资产以借贷的形式释放价值、捕获流动性等方面提供了基础。作为铭文赛道最重要的基建设施之一&#xff0c;Tonka Finance 在面向市场后备受关注&#xff0c;并迅速作为铭文…

【UE】在控件蓝图中通过时间轴控制材质参数变化

效果 步骤 1. 新建一个控件蓝图和一个材质 2. 打开材质&#xff0c;设置材质域为用户界面&#xff0c;混合模式设置为“半透明” 在材质图表中添加两个参数来控制材质的颜色和不透明度 3. 对材质创建材质实例 4. 打开控件蓝图&#xff0c;在画布面板中添加一个图像控件 将刚…