消息队列RabbitMQ.03.死信交换机的讲解与使用

news2024/11/16 23:30:21

目录

一、死信队列(延迟队列) 概念讲解

二、确认消息(局部方法处理消息)

三、代码实战

1.编写生产者代码,配置消息、直连交换机、路由键

 1.1代码解析:

2.配置消费者接受类接受直连交换机的路由键

2.1.  String msg,Channel channel ,@Header(AmqpHeaders.DELIVERY_TAG) ,long tag 方法参数解析:

  2.2.channel.basicAck(tag,true); 代码解析

2.3.channel.basicReject(tag,true); 代码解析:

 3.对死信交换机进行测试


 

一、死信队列(延迟队列) 概念讲解


死信,在官网中对应的单词为 “Dead Letter”, 它是 RabbitMQ 的一种消息机制。
一般来说,生产者将消息投递到 broker 或者直接到 queue 里了, consumer queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待
条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。
死信消息来源:
  • 消息 TTL 过期
  • 队列满了,无法再次添加数据
  • 消息被拒绝(reject nack),并且 requeue =false

二、确认消息(局部方法处理消息)


 默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

三、代码实战


1.编写生产者代码,配置消息、直连交换机、路由键

 @Bean
    public Queue queueA() {//正常
        Map<String, Object> config = new HashMap<>();
        //message在该队列queue的存活时间最大为10秒
        config.put("x-message-ttl", 10000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-exchange", "ExchangeB");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        config.put("x-dead-letter-routing-key", "bb");
        return new Queue("queueA",true,false,false,config);
    }

    @Bean
    public DirectExchange ExchangeA(){
        return new DirectExchange("ExchangeA");
    }

    @Bean
    public Binding bindingA(){
        return BindingBuilder
                .bind(queueA())
                .to(ExchangeA())
                .with("aa");
    }

    @Bean
    public Queue queueB() {
        return new Queue("queueB");
    }

    @Bean
    public DirectExchange ExchangeB(){
        return new DirectExchange("ExchangeB");
    }


    @Bean
    public Binding bindingB(){
        return BindingBuilder
                .bind(queueB())
                .to(ExchangeB())
                .with("bb");
    }
 1.1代码解析:

 Map<String, Object> config = new HashMap<>();
        //message在该队列queue的存活时间最大为10秒
        config.put("x-message-ttl", 10000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-exchange", "ExchangeB");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        config.put("x-dead-letter-routing-key", "bb");
        return new Queue("queueA",true,false,false,config);

首先我们需要知道死信产生的原因:

  • 消息过期: 消息在队列中等待的时间超过了指定的过期时间。
  • 消息被拒绝: 消费者拒绝消费消息,并且消息被标记为不可重新投递。
  • 队列满: 队列达到最大容量,无法再接收新的消息。

上述代码的作用是让消息等待10秒,如果10秒内没有进入消费者或者没有被操作那么它就会进入死信;我们就会将它放入消息B也就是当作死信就行处理。

 return new Queue("queueA",true,false,false,config); 中参数的作用:

  • true: 持久化,表示队列在消息代理(例如 RabbitMQ)重启后仍然存在。
  • false: 非独占,表示该队列不会被其他连接独占使用。
  • false: 不自动删除,表示即使没有消费者连接,队列也不会被自动删除。
  • config: 包含了上述配置的 Map 对象,将这些配置应用到队列上。

总体来说,这段代码创建了一个具有消息过期和死信队列功能的队列 "queueA",并配置了过期消息发送到名为 "ExchangeB" 的交换器,并指定了死信的路由键为 "bb"。这样的配置在处理消息的时候能够更加灵活,并且对于消息的生命周期有了额外的控制。


2.配置消费者接受类接受直连交换机的路由键

queueA

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "queueA")
public class ReceiverQA {

    @RabbitHandler
    //手动确认消息
    public void process(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
        log.warn("QA接收到:" + msg);
        //channel.basicAck(tag,true);
        //拒绝 ture是否重新入队 会一直 循环 false 会直接变成死信
        channel.basicReject(tag,true);
        Thread.sleep(1000);
    }
}
2.1.  String msg​​​​​​​Channel channel​​​​​​​ @Header(AmqpHeaders.DELIVERY_TAG) ,long tag 方法参数解析:
  • Channel channel

    • Channel 对象表示与消息代理(例如 RabbitMQ)建立的通信通道。该通道提供了与消息代理进行交互的方法,如确认消息、拒绝消息等。在这个方法中,channel 参数用于与消息代理进行通信。
  • @Header(AmqpHeaders.DELIVERY_TAG) long tag

    • @Header 注解用于提取消息头中的信息。在这里,通过 AmqpHeaders.DELIVERY_TAG 提取了消息的传送标签(delivery tag)。
    • long tag 表示消息的唯一标识符。它是一个标识消息的数字,通常在消息代理传递消息给消费者时分配。这个标识符可以用于在确认、拒绝或重新排队消息时指定特定的消息。

总体来说,这个方法是一个消息处理方法,其中 message 参数表示消息的内容,channel 参数用于与消息代理进行通信,而 tag 参数表示消息的唯一标识符,用于在消息确认、拒绝等操作中指定特定的消息。这样的方法通常用于处理从消息队列中接收到的消息。

  2.2.channel.basicAck(tag,true); 代码解析

 channel.basicAck(tag,true); 用于确认(acknowledge)消息的方法,通常在消费者成功处理消息后调用。以下是该方法的作用和参数含义:

  • channel

    • 这是表示与消息代理建立的通信通道的对象。在很多消息队列系统中,通常通过该通道进行消息的发布、消费等操作。
  • basicAck

    • 这是确认消息的方法。它告诉消息代理,消费者已经成功处理了某个特定的消息。
  • tag

    • 这是消息的唯一标识符或者句柄。在消费者接收到消息时,消息会被分配一个唯一的标识符,这个标识符用于确认或拒绝特定的消息。
  • true

    • 这是一个布尔值,通常表示确认多个消息。如果设置为 true,则表示确认到指定 tag 及其之前的所有消息。如果设置为 false,则仅确认指定的消息。
2.3.channel.basicReject(tag,true); 代码解析:

  channel.basicReject(tag,true);是在使用 AMQP(Advanced Message Queuing Protocol)中的通道(channel)对象时,用于拒绝一条消息的方法。让我们解释这个方法及其参数的意义:

  • tag 参数:

    • tag 表示消息的唯一标识符,通常是通过消费者获取的消息标签。它用于标识要拒绝的特定消息。
  • multiple 参数:

    • multiple 是一个布尔值,用于指定是拒绝单个消息还是多个消息。如果 multiple 为 false,则表示仅拒绝标记为 tag 的单个消息。如果 multiple 为 true,则表示拒绝所有比 tag 小的、未被确认的消息。在这里,true 表示拒绝所有比 tag 小的未确认消息。
  • 作用:

    1. channel.basicReject(tag, true) 的作用是拒绝一条或多条消息。当消费者无法处理接收到的消息时,可以使用该方法将消息标记为拒绝,并根据需要将其重新排队或进入死信队列。

    2. 通过将 multiple 设置为 true,可以一次性拒绝多条消息,这对于批量处理消息的情况很有用。

    3. 拒绝消息会触发相应的处理机制,例如将消息重新排队或将其发送到死信交换器,具体取决于消息代理的配置。

总体来说,channel.basicReject(tag, true) 是用于拒绝一条或多条消息的方法,其中 tag 表示消息的标签,而 multiple 决定了是拒绝单个消息还是多个消息。

queueB 

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "queueB")
public class ReceiverQB {

    @RabbitHandler
    public void process(String msg) {
        log.warn("QB接收到:" + msg);
        //去数据库做修改
        //更改数据库的订单状态为取消
        //update order set status=-1 where id=#{id} and status=1
    }
}

 

 3.对死信交换机进行测试

  @RequestMapping("/send6")
    public String send6(){
        template.convertAndSend("ExchangeA","aa","43249849234");
        return "🤣";
    }

最后页面网址访问方法/send6即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1409701.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

nsenter比docker exec更底层的命令

文章目录 nsenter介绍安装方法简单使用 nsenter介绍 nsenter命令是一个可以在指定进程的命令空间下运行指定程序的命令。它位于util-linux包中。典型的用途就是进入容器的网络命令空间。相当多的容器为了轻量级&#xff0c;是不包含较为基础的命令的&#xff0c;比如说ip addr…

响应式Web开发项目教程(HTML5+CSS3+Bootstrap)第2版 例4-7 datalist

代码 <!doctype html> <html> <head> <meta charset"utf-8"> <title>datalist</title> </head><body> <input id"address" list"addressList"> <datalist id"addressList"…

【数据结构】 链队列的基本操作 (C语言版)

目录 一、链队列 1、链栈的定义&#xff1a; 2、链栈的优缺点&#xff1a; 二、链队列的基本操作算法&#xff08;C语言&#xff09; 1、宏定义 2、创建结构体 3、链栈的初始化 4、链队列的入队 5、链队列的出队 6、取链队列的对头元素 7、链队列的销毁 8、链…

4.列表选择弹窗(CenterListPopup)

愿你出走半生,归来仍是少年&#xff01; 环境&#xff1a;.NET 7、MAUI 在屏幕中间弹窗的列表选择弹窗。 1.布局 <?xml version"1.0" encoding"utf-8" ?> <toolkit:Popup xmlns"http://schemas.microsoft.com/dotnet/2021/maui"x…

【AIGC】Diffusers:扩散模型的开发手册说明2

前言 扩散器被设计成一个用户友好且灵活的工具箱&#xff0c;用于构建适合您用例的扩散系统。工具箱的核心是模型和调度程序。然而 DiffusionPipeline 为方便起见将这些组件捆绑在一起&#xff0c;但您也可以解包管道并分别使用模型和调度程序来创建新的扩散系统。 解构 Stab…

uniapp组件库Modal 模态框 的使用方法

目录 #平台差异说明 #基本使用 #传入富文本内容 #异步关闭 #点击遮罩关闭 #控制模态框宽度 #自定义样式 #缩放效果 #API #Props #Event #Method #Slots 弹出模态框&#xff0c;常用于消息提示、消息确认、在当前页面内完成特定的交互操作。 #平台差异说明 AppH5微…

gin如何实现热更新

什么是热更新&#xff1f; 一种不需要用户关闭应用或重新启动设备就能进行的软件更新技术。它可以快速地在线修复或升级应用程序的错误或功能&#xff0c;从而减少用户的等待时间并提高用户体验。 如何优雅停止服务&#xff1f; Go 1.8版本之后&#xff0c; http.Server 内置…

CentOS使用

1.使用SSH连接操作虚拟机中的CentOS 使用代理软件(MobaX/Xshell)通过ssh连接vmware中的虚拟机,可以摆脱vmware笨重的软件,直接在代理软件中进行操作. 包括使用云虚拟器,其实也只是在本地通过ssh连接别处的云服务商的硬件而已. 1.1 配置静态IP 为什么要配置静态IP? 想要使用…

构建高可用消息队列系统 01

构建高可用消息队列系统 01 引言1. RabbitMQ简介介绍1.1 什么是RabbitMQ1.2 RabbitMQ的核心特性1.3 RabbitMQ与AMQP 2.安装RabbitMQ3.消息队列实践总结 引言 在当今互联网时代&#xff0c;消息队列系统扮演着至关重要的角色&#xff0c;它们被广泛应用于分布式系统、微服务架构…

Linux编辑器vim(含vim的配置)

文章目录 前言vim的基本概念vim基本操作进入vim模式切换退出vim vim指令vim命令模式指令vim底行模式命令 简单vim配置 前言 本篇文章&#xff0c;小编将介绍Linux编辑器–>vim以及vim的配置。 vim的基本概念 正常/普通/命令模式(Normal mode) 控制屏幕光标的移动&#xf…

年夜饭都吃什么菜?年夜饭菜谱保存到手机便签更便捷

农历除夕&#xff0c;是我国一年中最为重要的传统节日之一&#xff0c;而在这一天&#xff0c;全家团圆共进年夜饭是一种重要的仪式感。然而&#xff0c;随着现代生活的繁忙&#xff0c;很多人都在为年夜饭吃什么菜而发愁。年夜饭是一顿团圆、美好的大餐&#xff0c;选择一些好…

基于非缓冲区文件操作(实现cp的功能)

打开文件 -- open open(const char *pathname, int flags); open(const char *pathname, int flags, mode_t mode); 形参&#xff1a; pathname -- 文件的路径 flags&#xff1a;下面的宏定义必须选择一个 O_RDONLY -- 只读 O_WRONLY -- 只写 O_RDWR --…

selenium执行出现异常,SessionNotCreatedException ChromeDriver only supports

问题现状&#xff1a; 运行程序报错&#xff1a; selenium.common.exceptions.SessionNotCreatedException: Message: session not created: This version of ChromeDriver only supports Chrome version 114 Current browser version is 121.0.6167.85 with binary path /App…

8.6跳跃游戏②(LC45-M)

算法&#xff1a; 与上一题一样&#xff0c;还是看最大覆盖范围 要从覆盖范围出发&#xff0c;不管怎么跳&#xff0c;覆盖范围内一定是可以跳到的&#xff0c;以最小的步数增加覆盖范围&#xff0c;覆盖范围一旦覆盖了终点&#xff0c;得到的就是最少步数&#xff01; 这里…

MyBatis 批量插入数据优化

前言 最近在项目上遇到了批量插入的场景问题&#xff0c;由于每次需要插入超过 10w 的数据量并且字段也蛮多的导致如果使用循环单次插入的方式插入数据插入的效率不高。相信读者们在实际开发中也遇到过这样类似的场景&#xff0c;那么批量插入如何实现呢&#xff1f; 其实我也…

本地部署Tomcat开源服务器并结合内网穿透远程访问

文章目录 前言1.本地Tomcat网页搭建1.1 Tomcat安装1.2 配置环境变量1.3 环境配置1.4 Tomcat运行测试1.5 Cpolar安装和注册 2.本地网页发布2.1.Cpolar云端设置2.2 Cpolar本地设置 3.公网访问测试4.结语 前言 Tomcat作为一个轻量级的服务器&#xff0c;不仅名字很有趣&#xff0…

闲人闲谈PS之五十二——虚拟组装部件超过99的问题

惯例闲话&#xff1a;分享 朱龙春老师的一篇随笔《只是做点ERP&#xff0c;没必要跳楼吧 。 多大的事呢&#xff0c;生命面前&#xff0c;一切都是个P》的感想&#xff0c;故事大致是这样&#xff0c;客户不断提需求&#xff0c;技术出身的愣头青项目经理扛不住项目成本压力&am…

初识Docker(架构、安装Docker)

一、什么是Docker Docker 是一个开源的应用容器引擎&#xff0c;它允许开发者将应用程序及其依赖打包到一个轻量级、可移植的容器中。这些容器可以在不同的计算平台上运行&#xff0c;如Linux和Windows&#xff0c;并且可以实现虚拟化。Docker 的设计目标是提供一种快速且轻量…

C++数据结构——红黑树

一&#xff0c;关于红黑树 红黑树也是一种平衡二叉搜索树&#xff0c;但在每个节点上增加一个存储位表示节点的颜色&#xff0c;颜色右两种&#xff0c;红与黑&#xff0c;因此也称为红黑树。 通过对任意一条从根到叶子的路径上各个节点着色方式的限制&#xff0c;红黑树可以…

01 质数筛

一、根据概念进行枚举 1、判断质数的枚举算法 根据概念:除了1和它本身以外没有其他约数的数为质数 //输入一个数n&#xff0c;判断n是不是质数 #include<bits/stdc.h> using namespace std;int main(){int n;cin>>n;//根据概念:除了1和它本身以外没有其他约数的…