RabbitMQ-死信交换机

news2025/1/11 3:56:34

文章目录

  • 1 死信交换机
    • 1.1.什么是死信交换机
    • 1.2 利用死信交换机接收死信
    • ==1.3 总结==
  • 2 TTL
    • 2.1 接收超时死信的死信交换机
    • 2.2 声明一个队列,并且指定TTL
    • 2.3 发送消息时,设定TTL
    • 2.4.总结
  • 3 延迟队列
    • 3.1 DelayExchange原理
    • 3.2 使用DelayExchange
    • 3.3 总结

1 死信交换机

1.1.什么是死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

如图,一个消息被消费者拒绝了,变成了死信:
在这里插入图片描述
因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:
在这里插入图片描述
如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:
在这里插入图片描述

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称
  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
在这里插入图片描述

1.2 利用死信交换机接收死信

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

我们在consumer服务中,定义一组死信交换机、死信队列:

// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

1.3 总结

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack
  • 消息超时未消费
  • 队列满了

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

2 TTL

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间

在这里插入图片描述

2.1 接收超时死信的死信交换机

在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "ttl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}

2.2 声明一个队列,并且指定TTL

要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
        .ttl(10000) // 设置队列的超时时间,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交换机
        .build();
}

注意,这个队列设定了死信交换机为dl.ttl.direct

声明交换机,将ttl与交换机绑定:

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

发送消息,但是不要指定TTL:

@Test
public void testTTLQueue() {
    // 创建消息
    String message = "hello, ttl queue";
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    // 记录日志
    log.debug("发送消息成功");
}

发送消息的日志:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VilczII5-1669978976776)(assets/image-20210718191657478.png)]

查看下接收消息的日志:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rk4jWyng-1669978976777)(assets/image-20210718191738706.png)]

因为队列的TTL值是10000ms,也就是10秒。可以看到消息发送与接收之间的时差刚好是10秒。

2.3 发送消息时,设定TTL

在发送消息时,也可以指定TTL:

@Test
public void testTTLMsg() {
    // 创建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("发送消息成功");
}

查看发送消息日志:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xRKftnIo-1669978976778)(assets/image-20210718191939140.png)]

接收消息日志:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-346X1ExA-1669978976779)(assets/image-20210718192004662.png)]

这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。

2.4.总结

消息超时的两种方式是?

  • 队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
  • 消息设置ttl属性,队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息?

  • 给消息的目标队列指定死信交换机
  • 将消费者监听的队列绑定到死信交换机
  • 发送消息时给消息设置超时时间为20秒

3 延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html

在这里插入图片描述

使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

3.1 DelayExchange原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息
  • 判断消息是否具备x-delay属性
  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,重新投递消息到指定队列

3.2 使用DelayExchange

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。

1)声明DelayExchange交换机

  • 基于注解方式(推荐):

在这里插入图片描述

  • 基于@Bean的方式:

在这里插入图片描述

2)发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间:

在这里插入图片描述

3.3 总结

延迟队列插件的使用步骤包括哪些?

  • 声明一个交换机,添加delayed属性为true

  • 发送消息时,添加x-delay头,值为超时时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/55269.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智云通CRM:如何判断客户忠诚度的高低?

客户忠诚度是一个相对概念,说明了客户在购买同类产品或服务时对某一企业或品牌光顾比重的高低。注意,不能跨产品或服务进行客户忠诚度比较,因为这样比较是没有意义的。客户忠诚度可以通过以下指标来衡量。 一、 客户重复购买的次数 客户重复…

【LeetCode每日一题:1769. 移动所有球到每个盒子所需的最小操作数~~~双重循环遍历模拟】

题目描述 有 n 个盒子。给你一个长度为 n 的二进制字符串 boxes ,其中 boxes[i] 的值为 ‘0’ 表示第 i 个盒子是 空 的,而 boxes[i] 的值为 ‘1’ 表示盒子里有 一个 小球。 在一步操作中,你可以将 一个 小球从某个盒子移动到一个与之相邻…

区块链共识机制 (Consensus)(PoW,PoS,PAXOS,RAFT,PBFT)

文章目录ConsensusProof of Work(PoW)Proof of Stake(PoS)PAXOSPhases in PAXOSPrepare PhaseAccept PhaseReplicated And Fault Tolerant(RAFT)Leader ElectionLog ReplicationPractical Byzantine Fault Tolerance (…

Ubuntu18.04开机自动启动终端并运行脚本

目录 1.创建测试脚本文件 2.添加到开机自启动 1.创建测试脚本文件 CtrlAltT打开终端在终端中输入以下指令,创建.sh文件。 touch “文件名”.sh 双击打开test.sh文件,输入以下测试代码,并保存 #!/bin/bash source /opt/ros/melodic/setu…

用于Python降维的线性判别分析

减少预测模型的输入变量数称为降维。 较少的输入变量可以产生更简单的预测模型,该模型在对新数据进行预测时可能具有更好的性能。 线性判别分析(简称LDA)是一种用于多类分类的预测建模算法。它还可以用作降维技术,提供训练数据集…

Unity 3D 导航系统||Unity 3D 障碍物

Unity 3D 导航系统 过去,游戏开发者必须自己打造寻路系统,特别是在基于节点的寻路系统中,必须手动地在 AI 使用的点之间进行导航,因此基于节点系统的寻路非常烦琐。 Unity 3D 不仅具有导航功能,还使用了导航网格&…

牛客练习赛106 药丸

牛客练习赛106 药丸 2022.12.02 与舍友四排玩了团体对抗的模式,练习赛就只有40分钟的剩余时间了。 题目描述 来源:牛客网 牛牛有 nnn 个属性,第 iii 个属性的初始值为 aia_iai​ ,牛牛想把第 iii 个属性的值变为目标值 bib_ib…

享元模式Flyweight

1.意图:运用共享技术有效地支持大量细粒度的对象。 2.结构 描述一个接口,通过这个接口Flyweight可以接受并作用于外部状态; ConcreteFlyweight实现Flyweight接口,并为内部状态(如果有)增加存储空间。Conr…

超融合时序数据库YMatrixDB与PostGIS案例

目录 什么是PostGIS PostGIS的特点 PostGIS 基础知识 OGC的WKB和WKT格式 插入数据实例 EWKT、EWKB和Canonical格式 插入数据实例 SQL-MM格式 常几何类型和函数 常用操作符 常用操作函数 OGC标准函数 管理函数 几何对象关系函数 几何对象处理函数 几何对象存取函…

Redis中的事务可以满足ACID属性吗?

前言 事务是数据库操作的最小工作单元,由一个有限的数据库操作序列构成。这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。一键获取最先java文档。 事务在执行时,会提供专门的属性保证:原子性、一致性…

[附源码]Python计算机毕业设计Django高校学生摄影作品展示平台

项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等等。 环境需要 1.运行环境:最好是python3.7.7,…

sentinel读取监控文件分析

主要分析的类 com.alibaba.csp.sentinel.dashboard.metric.MetricFetcher在sentinel监控数据db持久化中,更换了MetricController的存储实现,可以发现com.alibaba.csp.sentinel.dashboard.repository.metric.MetricsRepository原来只有com.alibaba.csp.s…

第4季3:Hi3518e的sensor接口引脚复用设置

以下内容源于朱有鹏嵌入式课程的学习与整理,如有侵权请告知删除。 在第2、3季的内容中,在板载系统的配置脚本即/etc/profile文件中,都有如下这句代码: ./load3518e -i -sensor ar0130 -osmem 32 -total 64 在第4季1&#xff1a…

MAC glucuronide linker-1/MAC glucuronide linker-2蛋白降解酶

ERRa_PROTAC(Cpd11,D1oonM40%)蛋白降解活性随着Linker的增长而减弱。Linker -(CHz)s-的化合物13d在浓度30 nM时能够降解58%的蛋白,浓度为100 nM时能够降解78%的ERRα蛋白,其降解活性不如 13c。Linker -(CHz)z-的13f&…

java计算机毕业设计ssm齐市疫苗管理系统w80jw(附源码、数据库)

java计算机毕业设计ssm齐市疫苗管理系统w80jw(附源码、数据库) 项目运行 环境配置: Jdk1.8 Tomcat8.5 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff0…

2023最新SSM计算机毕业设计选题大全(附源码+LW)之java高校心理咨询管理系统0e78p

大部分步骤是 1.确定选题 选题的确定需要查阅大量的资料,要搞清楚自己大概想要研究的方向是什么。可以选择自己感兴趣的学科或者强势的学科进行研究,同时要多和毕业指导老师多交流,征求老师的意见和建议,最后确立选题。计算机专业…

面试官:你知道 Java 中的回调机制吗?

调用和回调机制 在一个应用系统中, 无论使用何种语言开发, 必然存在模块之间的调用, 调用的方式分为几种。 1.同步调用 同步调用是最基本并且最简单的一种调用方式, 类A的方法a()调用类B的方法b(), 一直等待b()方法执行完毕, a()方法继续往下走. 这种调用方式适用于方法b()执…

【论文速读】Scene Text Telescope: Text-Focused Scene Image Super-Resolution

前言 在阅读这篇文章的时候,我花费了近一周的时间在将其基本消化理解,至于为什么花费如此长的时间,我发现主要原因是我对transformer一知半解,所以在transformer中提出的名词,例如:Postion-Wise Feed-Forw…

365天深度学习训练营-第P3周:天气识别

🍨 本文为🔗365天深度学习训练营 内部限免文章(版权归 K同学啊 所有)🍦 参考文章地址: 🔗第P3周:天气识别 | 365天深度学习训练营🍖 作者:K同学啊 | 接辅导、…

视频转文字怎么操作?这三种转换方法你该学会

如今短视频让各种知识传播变得生动形象,但是视频学习对于后期的整理复习不是很便捷,现在教大家一种好用的视频知识整理方法,那就是视频转文字,可以将视频内容轻松转换为文字形式。那么就有人问了,怎样转换才更简单呢&a…