【RabbitMQ】死信(延迟队列)的使用

news2025/1/15 21:08:20

目录

一、介绍

1、什么是死信队列(延迟队列)

2、应用场景

3、死信队列(延迟队列)的使用

4、死信消息来源

二、案例实践

1、案例一

2、案例二(消息接收确认 )

3、总结


一、介绍

1、什么是死信队列(延迟队列)

  •         死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
  •         死信队列(Dead Letter Queue)延迟队列(Delay Queue)是两种不同的队列类型,但在实际应用中它们可以结合使用。
  •         死信队列是当消息在队列中因为过期、被拒绝等原因无法正常处理时,会被重新发送到另一个交换机上,这个交换机就是死信交换机。死信队列可以用于实现重试机制、日志审计等特殊应用逻辑。

        延迟队列则是一种特殊的队列类型,它允许将消息延迟指定的时间后才能被消费者消费。这种队列通常用于处理那些需要在特定时间点被处理的任务,例如定时任务、限时优惠等。在RabbitMQ中,可以通过设置消息的TTL(生存时间)来实现延迟队列的功能。当消息在队列中超过了TTL,它就会被移除并被发送到指定的死信交换机,进而被路由到死信队列中。

结合使用死信队列和延迟队列可以实现一些复杂的应用逻辑。

例如:

        可以将某个需要延迟处理的消息发送到延迟队列中,并在消息过期之前将其存储在死信队列中。这样,当消息从延迟队列中移除时,它会被自动发送到死信队列中,然后由消费者消费并执行相应的操作。这种结合使用的方式可以提供更高的灵活性和可靠性,使得系统能够更好地应对各种异常情况。

2、应用场景

死信队列(Dead Letter Queue)和延迟队列(Delay Queue)在以下应用场景中表现优异:

        这些场景都是通过结合使用死信队列和延迟队列来提高系统的可靠性、鲁棒性和灵活性。通过合理地设置死信队列和延迟队列的参数,可以实现各种复杂的业务逻辑和异常处理机制。

  1. 库存解锁服务:例如,当一个商品被锁定且无法被购买时,可以将其放入死信队列中,并在一定时间后重新发送到原始队列进行处理,从而解锁库存。
  2. 定时关单功能:例如,用户在商城下单成功并点击去支付后,如果在指定时间内未支付,系统可以自动将订单放入死信队列中,并在一定时间后重新发送到原始队列进行处理。
  3. 保证订单业务的消息数据不丢失:当消息消费发生异常时,可以将消息投入死信队列中,以便后续等到环境好了之后,再消费死信队列中的消息。
  4. 实现重试机制:当某个消息处理失败时,可以将它放入死信队列中,并在一段时间后重新发送到交换机进行重试。这样可以提高系统的鲁棒性,确保消息能够被正确处理。
  5. 处理定时任务和秒杀活动:使用延迟队列可以将消息延迟到特定的时间后进行处理,例如定时任务、秒杀活动等。这样可以确保在特定的时间点执行相应的操作。
  6. 日志处理和审计:将日志消息发送到死信队列中,可以在日志发生异常时进行记录和审计,以便分析和排查问题。

3、死信队列(延迟队列)的使用

使用死信队列(Dead Letter Queue)和延迟队列(Delay Queue)可以提高系统的可靠性和灵活性,特别是在处理异常情况、重试机制和定时任务等方面。

需要注意的是,在使用死信队列和延迟队列时,需要考虑系统的可用性和性能。如果大量的消息被放入死信队列中,可能会导致系统资源的过度消耗。因此,需要合理配置死信队列和延迟队列的大小和数量,以及监控和管理系统的性能和资源使用情况。

  1. 定义死信队列和延迟队列:在RabbitMQ中,可以在定义队列时指定队列类型为死信队列或延迟队列。例如,使用RabbitMQ的命令行工具或管理界面创建死信队列或延迟队列。
  2. 配置交换机和队列属性:在绑定交换机和队列时,可以设置一些属性来控制消息的路由和消费。例如,可以设置消息的TTL(生存时间)来实现延迟队列的功能,或者设置消息的优先级和延迟时间等属性。
  3. 发送消息到队列:使用RabbitMQ的生产者将消息发送到定义的队列中。如果需要将消息放入延迟队列中,可以在发送消息时设置相应的延迟时间。
  4. 处理消息:消费者从队列中获取消息并进行处理。如果消息无法正常处理,可以将它放入死信队列中。在处理消息时,可以根据需要设置重试机制,以便在消息处理失败时重新发送消息到队列中进行重试。
  5. 监控和管理:使用RabbitMQ的管理界面或命令行工具监控和管理死信队列和延迟队列的状态和消息。可以查看队列的大小、消息的延迟时间、消费者数量等信息,并根据需要进行调整和管理。

4、死信消息来源

  •  消息 TTL 过期
  • 队列满了,无法再次添加数据
  • 消息被拒绝(reject 或 nack),并且 requeue =false

二、案例实践

1、案例一

生产者

在生产者的Config里面添加队列 。

 //    =============死信队列===============
    // 定义QueueA Bean,返回QueueA实例
    @Bean
    public Queue QueueA() {
        Map<String, Object> config = new HashMap<>();
        //message在该队列queue的存活时间最大为5秒
        config.put("x-message-ttl", 5000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-exchange", "deadExchange");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        config.put("x-dead-letter-routing-key", "deadQueue");
        // 返回QueueA实例
        return new Queue("QueueA", true, true, false, config);
    }

    // 定义DirectExchangeA Bean,返回DirectExchangeA实例
    @Bean
    public DirectExchange DirectExchangeA() {
        // 返回DirectExchangeA实例
        return new DirectExchange("DirectExchangeA");
    }

    // 定义bindingA Bean,将QueueA绑定到DirectExchangeA,并设置路由键为A.A
    @Bean
    public Binding bindingA() {
        // 将QueueA绑定到DirectExchangeA
        return BindingBuilder
                // 设置路由键为A.A
                .bind(QueueA())
                .to(DirectExchangeA())
                .with("A.A");
    }

    // 定义QueueB Bean,返回QueueB实例
    @Bean
    public Queue QueueB() {
        // 返回QueueB实例
        return new Queue("QueueB");
    }

    // 定义DirectExchangeB Bean,返回DirectExchangeB实例
    @Bean
    public DirectExchange DirectExchangeB() {
        // 返回DirectExchangeB实例
        return new DirectExchange("DirectExchangeB");
    }

    // 定义bindingB Bean,将QueueB绑定到DirectExchangeB,并设置路由键为B.B
    @Bean
    public Binding bindingB() {
        // 将QueueB绑定到DirectExchangeB
        return BindingBuilder
                // 设置路由键为B.B
                .bind(QueueB())
                .to(DirectExchangeB())
                .with("B.B");
    }

消费者

在消费者里面添加QueueAQueueB 

QueueA

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {
 
        // 接收directExchange01交换机中Queue02队列消息的方法
        @RabbitHandler
        public void Queue02(String msg) {
            log.warn("QueueA,接收到信息:" + msg);
        }
 
}

QueueB

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueB")
public class ReceiverQB {

    // 接收directExchange01交换机中Queue02队列消息的方法
    @RabbitHandler
    public void Queue02(String msg) {
        log.warn("QueueB,接收到信息:" + msg);
    }

}

编写Controller层

    @RequestMapping("send5")
    public String send5() {
        rabbitTemplate.convertAndSend("DirectExchangeA", "A.A", "Hello,A");
        return "🐉";
    }

结果:

2、案例二(消息接收确认 )

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

  • 消息确认模式有:

    • AcknowledgeMode.NONE:自动确认

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认

配置文件
        默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

在消费者里的更改ReceiverQA 

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {

    // 接收directExchange01交换机中Queue02队列消息的方法
    @RabbitHandler
    public void QueueA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.warn("QueueA,接收到信息:" + msg);
        channel.basicAck(tag, false);
    }

}
  • 需要注意的 basicAck 方法需要传递两个参数

    • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

    • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

3、总结

  • 持久化

    • exchange要持久化

    • queue要持久化

    • message要持久化

  • 消息确认

    • 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)

    • 生产者和Server(broker)之间的消息确认

    • 消费者和Server(broker)之间的消息确认

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1411218.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

脉脉爆料,鹅厂IEG,T11大佬,年终奖94w。。。

今年行情确实是非常艰难&#xff0c;很多朋友表示年终奖减半或者没了 &#xff0c;不过来看看互联网大厂&#xff0c;互联网大厂年终奖确实比较给力&#xff0c;这是我个人亲身感受&#xff0c;运气好的时候年终奖让人吃惊。 比如&#xff0c;一个来自鹅厂IEG T11大佬再脉脉爆n…

【深度优先搜索】【C++算法】834 树中距离之和

作者推荐 【动态规划】【map】【C算法】1289. 下降路径最小和 II 本文涉及知识点 深度优先搜索 树 图论 LeetCode834 树中距离之和 给定一个无向、连通的树。树中有 n 个标记为 0…n-1 的节点以及 n-1 条边 。 给定整数 n 和数组 edges &#xff0c; edges[i] [ai, bi]表…

核心类库ArrayList、hashMap等

八. 核心类库 1. ArrayList 数组缺点 ArrayList&#xff0c;它常常被用来替代数组 数组的缺点&#xff1a;不能自动扩容&#xff0c;比如已经创建了大小为 5 的数组&#xff0c;再想放入一个元素&#xff0c;就放不下了&#xff0c;需要创建更大的数组&#xff0c;还得把旧…

python 学习之 re库的基本使用(正则匹配)上

目录 一、基本用法 二、函数介绍 1、match函数 2、search 函数 3、compile 函数 4、findall 和 finditer 函数 5、sub 函数和 subn 函数 6、split 函数 一、基本用法 首先我们需要引入 re 库 代码基本框架使用两行代码实现 测试代码&#xff1a; import reret re.m…

Pandas设置图像宽高、分辨率、背景色、显示中文、增加子区域、图题、坐标名称、网格线

figure方法 figure方法可以设置绘图对象的长、宽、分辨率及背景颜色等。 import pandas as pd import matplotlib.pyplot as pltdata pd.read_csv(data/iris.csv) plt.figure(figsize(12, 3), dpi120, facecolorred) x data[sepal_length] plt.plot(x) plt.show()一个平面多…

【K8S 云原生】K8S的安全机制

目录 一、K8S安全机制概述 1、概念 2、请求apiserver资源的三个步骤&#xff1a; 一、认证&#xff1a;Anthentcation 1、认证的方式&#xff1a; 1、HTTP TOKEN&#xff1a; 2、http base&#xff1a; 3、http证书&#xff1a; 2、认证的访问类型&#xff1a; 3、签发…

【并发编程】指令集并行原理

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;并发编程 ⛺️稳重求进&#xff0c;晒太阳 指令集并行原理 名词&#xff1a; Clock Cycle Time CPU的Clock Cycle Time(时钟周期时间)&#xff0c;等于主频的倒数。意思是CPU能识别的最…

VR数字展厅,平面静态跨越到3D立体化时代

近些年&#xff0c;VR的概念被越来越多的人提起&#xff0c;较为常见的形式就是VR数字展厅。VR数字展厅的出现&#xff0c;让各地以及各行业的展厅展馆的呈现和宣传都发生了很大的改变和革新&#xff0c;同时也意味着展览传播的方式不再局限于原来的图文、视频&#xff0c;而是…

redis的备份原理

1 Redis持久化之RDB 1RDB是什么 在指定的时间间隔内将内存中的数据集快照写入磁盘&#xff0c; 也就是行话讲的Snapshot快照&#xff0c;它恢复时是将快照文件直接读到内存里 2备份是如何执行的 Redis会单独创建&#xff08;fork&#xff09;一个子进程来进行持久化&#x…

星环科技基于第五代英特尔®至强®可扩展处理器的分布式向量数据库解决方案重磅发布

12月15日&#xff0c;2023 英特尔新品发布会暨 AI 技术创新派对上&#xff0c;星环科技基于第五代英特尔至强可扩展处理器的Transwarp Hippo分布式向量数据库解决方案重磅发布。该方案利用第五代英特尔至强可扩展处理器带来的强大算力&#xff0c;实现了约 2 倍的代际性能提升&…

2021 Google Chrome RCE漏洞分析

一、复现环境&#xff1a; Win10 Google Chrome 86.0.4240.75 二、利用复现&#xff1a; 关闭沙箱安全使用命令进行关闭 &#xff0c;在正常情况下&#xff0c;浏览器沙箱提供了一个受限制的执行环境&#xff0c;以防止恶意代码对用户系统的损害。关闭沙箱可能会导致浏览器执…

银行数据仓库体系实践(7)--数据模型设计及流程

数据仓库作为全行或全公司的数据中心和总线&#xff0c;汇集了全行各系统以及外部数据&#xff0c;通过良好的系统架构可以保证系统稳定性和处理高效性&#xff0c;那如何保障系统数据的完备性、规范性和统一性呢&#xff1f;这里就需要有良好的数据分区和数据模型&#xff0c;…

「JavaSE」抽象类接口3

&#x1f387;个人主页&#xff1a;Ice_Sugar_7 &#x1f387;所属专栏&#xff1a;快来卷Java啦 &#x1f387;欢迎点赞收藏加关注哦&#xff01; 抽象类&接口3 &#x1f349;Clonable 接口和深拷贝&#x1f34c;浅拷贝和深拷贝 &#x1f349;Object类&#x1f349;抽象类…

Effective C++ 学习

Effective C浅浅学习&#xff0c;很多不太理解 尽量用const, enum, inline 替换#define尽可能使用const确认对象在使用前就已经被初始化构造&#xff0c;析构&#xff0c;赋值运算&#xff0c;拷贝构造为多态基类声明virtual析构函数不要让析构函数抛出异常不在构造和析构过程中…

javaWebssh宠物基地管理系统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 java ssh宠物基地管理系统是一套完善的web设计系统&#xff08;系统采用ssh框架进行设计开发&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用 B/S模式开发。开发环境为TOMCAT7.…

跨平台Recorder录音插件:支持多种格式、音频可视化、实时上传、语音识别

视频教程地址&#xff1a;【跨平台Recorder录音插件&#xff1a;支持多种格式、音频可视化、实时上传、语音识别】 https://www.bilibili.com/video/BV1jQ4y1c7e4/?share_sourcecopy_web&vd_sourcee66c0e33402a09ca7ae1f0ed3d5ecf7c /** 先引入Recorder &#xff08; 需先…

幻兽帕鲁服务器数据备份

搭建幻兽帕鲁个人服务器&#xff0c;最近不少用户碰到内存不足、游戏坏档之类的问题。做好定时备份&#xff0c;才能轻松快速恢复游戏进度 这里讲一下如何定时将服务器数据备份到腾讯云轻量对象存储服务&#xff0c;以及如何在有需要的时候进行数据恢复。服务器中间的数据迁移…

CI/CD

介绍一下CI/CD CI/CD的出现改变了开发人员和测试人员发布软件的方式,从最初的瀑布模型,到最后的敏捷开发(Agile Development),再到今天的DevOps,这是现代开发人员构建出色产品的技术路线 随着DevOps的兴起,出现了持续集成,持续交付和持续部署的新方法,传统的软件开发和交付方…

一文学习Thrift RPC

Thrift RPC引言 Thrift RPC的特点 Thrift 是一个RPC的框架&#xff0c;和Hessian RPC有什么区别&#xff0c;最重要的区别是Thrift可以做异构系统开发。 什么是异构系统&#xff0c;服务的提供者和服务的调用者是用不同语言开发的。 为什么会当前系统会有异构系统的调用&…

Vue3 Teleport 将组件传送到外层DOM位置

✨ 专栏介绍 在当今Web开发领域中&#xff0c;构建交互性强、可复用且易于维护的用户界面是至关重要的。而Vue.js作为一款现代化且流行的JavaScript框架&#xff0c;正是为了满足这些需求而诞生。它采用了MVVM架构模式&#xff0c;并通过数据驱动和组件化的方式&#xff0c;使…