RabbitMQ高级特性 - 消息和队列TTL、死信队列

news2024/11/13 9:18:17

文章目录

  • 消息和队列TTL
    • 概述
    • 实战开发
  • 死信队列
    • 概述
    • 实战开发

消息和队列TTL


概述

a)TTL(Time To Live 过期时间),RabbitMQ 可以对消息和队列设置 TTL. 当消息到达存活时间之后,还没有被消费,就会被自动清除.

b)注意:

  • 消息设置TTL:MQ 扫描到队列头部消息(先进先出)过期了,就会剔除消息(这里会存在一个问题:当队列头部的消息还没过期,而队列头部之后的元素过期了却不会被清理,需要等到头部元素过期被清理后,才会依次扫描后面的元素).
  • 队列设置TTL:相当于是给当前队列中的所有消息设置了一个全局过期时间,消息的 ttl 最终取决于 消息 和 队列 设置的 ttl 中的最小值.(队列设置 ttl 并非到期就删除队列,而是作用于其中的消息)
  • 消息和队列都设置TTL:只要队列过期,那么该队列中的所有消息也都会被清理. 假设队列 TTL 是 20s,消息 TTL 是 10s,那么消息的 TTL 会自动取最小值,也就是 10s.

c)消息过期了,但由于排在此消息前面的消息没有过期,,导致过期的消息不能被清理,可能会造成什么问题?
例如想通过 消息TTL + 死信队列 来实现 延迟队列 的效果,就可能存在漏洞:
当前消息过期了,但是前面的消息没有过期,于是此消息依然在队列中,进而导致消息不能在过期的时候被死信队列接收到,那么进一步导致消息不能被消费者处理(如果是实现订单过期删除业务,就要出大问题了 -> 锁库存 -> 商品即使有,也卖不出去).

实战开发

案例:给消息和队列都设置过期时间,队列的过期时间比消息时间短.

a)配置 交换机、队列、绑定

    @Bean
    fun ttlExchange(): DirectExchange = ExchangeBuilder
        .directExchange(MQConst.TTL_EXCHANGE)
        .build()

    /**
     * 消息设置过期时间有很多中方式,本质都是给扩展参数中添加 x-message-ttl
     */
    @Bean
    fun ttlQueue(): Queue = QueueBuilder
        .durable(MQConst.TTL_QUEUE)
        //.withArgument("x-message-ttl", 10 * 1000) //设置过期时间本质都是它
        .ttl(10 * 1000) // 10s 过期
        .build()
    @Bean
    fun ttlBinding(
        @Qualifier("ttlExchange") exchange: DirectExchange,
        @Qualifier("ttlQueue") queue: Queue,
    ): Binding = BindingBuilder
        .bind(queue)
        .to(exchange)
        .with(MQConst.TTL_BINDING)

b)生产者接口

import com.cyk.rabbitmq.constants.MQConst
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/mq")
class MQApi(
    private val rabbitTemplate: RabbitTemplate
) {

    @RequestMapping("ttl")
    fun ttl(): String {
        rabbitTemplate.convertAndSend(MQConst.TTL_EXCHANGE, MQConst.TTL_BINDING, "ttl msg 1") { msg ->
            //给 msg 配置一些属性
            val expire = 20 * 1000
            msg.messageProperties.expiration = expire.toString()
            return@convertAndSend msg
        }
        return "ok"
    }

}

c)效果演示
SpringBoot 程序一启动,就可以看到交换机队列被创建,并且队列是带 ttl 的
在这里插入图片描述
此时向队列中发送一个过期时间是 20s 的消息,由于队列的过期时间是 10s,因此入队后,消息的过期时间变为 10s.

死信队列


概述

在这里插入图片描述
a)概念
死信(dead message):可以简单理解为因为种种原因,无法被消费的信息,就是死信.
死信交换机(DLX -> Dead Letter Exchange):如果普通队列设置了对应的死信交换机,那么队列中的生成的死信就会被路由到死信交换机.
死信队列(DLQ -> Dead Letter Queue):死信交换机会根据对应的普通队列设置的 routingKey ,将死信转发给死信队列,最后交给订阅的消费者消费.

b)消息变成死信的情况:

  1. 消息过期(消息TTL / 队列TTL) .
  2. 消息被拒绝(Basic.Reject / Basic.Nack),并且设置 requeue 参数为 false.
  3. 队列设置最大长度(maxLength).

c)死信队列解决消息积压问题:
消费者再处理消息时,可能会因为以下原因,异常处理消息:

  • 网络短暂故障:通过 nack + requeue=true 可以实现消息重新入队,解决网路短暂故障.
  • 代码逻辑错误:如果还是 nack + requeue=true,就会导致消息不断重复,不仅导致负载飙升,还是导致消息积压. 那么这里就可以使用 nack + requeue=false + 死信队列 的方式,保证了消息可以被正确处理.

d)应用场景:
例如用户下订单,发送订单到普通队列,超过一定时间用户没有支付,消息就会变成死信. 此时就可以用一个消费者来处理死信,修改订单状态为取消.

e)注意事项(和之前 消息 TTL 同理):
例如想通过 消息TTL + 死信队列 来实现 延迟队列 的效果,就可能存在漏洞:
当前消息过期了,但是前面的消息没有过期,于是此消息依然在队列中,进而导致消息不能在过期的时候被死信队列接收到,那么进一步导致消息不能被消费者处理(如果是实现订单过期删除业务,就要出大问题了 -> 锁库存 -> 商品即使有,也卖不出去).

实战开发

a)分别指定 普通 和 死信 交换机、队列、绑定

@Configuration
class DLConfig {

    @Bean
    fun normalExchange(): DirectExchange = ExchangeBuilder
        .directExchange(MQConst.NORMAL_EXCHANGE)
        .build()
    @Bean
    fun normalQueue(): Queue = QueueBuilder
        .durable(MQConst.NORMAL_QUEUE)
        .deadLetterExchange(MQConst.DL_EXCHANGE)
        .deadLetterRoutingKey(MQConst.DL_BINDING)
        .ttl(10 * 1000)
        //.maxLength(10L)  //队列设置最大长度
        .build()
    @Bean
    fun normalBinding(
        @Qualifier("normalExchange") exchange: Exchange,
        @Qualifier("normalQueue") queue: Queue,
    ): Binding = BindingBuilder
        .bind(queue)
        .to(exchange)
        .with(MQConst.NORMAL_BINDING)
        .noargs() //如果 交换机 是顶级接口,这里需要 noargs()

    @Bean
    fun dlExchange(): DirectExchange = ExchangeBuilder
        .directExchange(MQConst.DL_EXCHANGE)
        .build()
    @Bean
    fun dlQueue(): Queue = QueueBuilder
        .durable(MQConst.DL_QUEUE)
        .build()
    @Bean
    fun dlBinding(): Binding = BindingBuilder
        .bind(dlQueue())
        .to(dlExchange())
        .with(MQConst.DL_BINDING)

}

b)生产者接口

    @RequestMapping("dl")
    fun dl(): String {
        rabbitTemplate.convertAndSend(MQConst.NORMAL_EXCHANGE, MQConst.NORMAL_BINDING, "dl msg 1")
        return "ok"
    }

c)演示效果
触发生产者接口,可以看到前 10s,如下
在这里插入图片描述
消息过期之后,可以看到消息变为死信,最终路由到死信队列,如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2037778.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

fmql之Linux移植

先了解以下linux移植的大致流程,以及需要的资料、软件等。 《领航者ZYNQ之嵌入式Linux开发指南_V2.0》第十八章 Linux内核移植 (amobbs.com 阿莫电子技术论坛) 前言 复旦微(他人经验) 复旦微fmql的操作指南来了: 复旦微 FMQL L…

算法:排序(前言)

所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。排序算法,就是如何使得记录按照要求排列的方法。排序算法在很多领域得到相当地重视,尤其是在大量数据的处理方面。一个优秀的…

还在担心Android功能不会用吗?Intro Showcase View助你快速实现功能引导

还在担心Android功能不会用吗?Intro Showcase View助你快速实现功能引导 1. 引言 在现代应用开发中,如何有效引导用户快速上手并掌握应用的核心功能,是提升用户体验的重要一环。功能引导不仅帮助用户理解复杂功能,还能提高用户留存率,减少因操作复杂度带来的用户流失。随…

将Excel数据导入到SQL Server数据库

1.找到SQLServer导入导出工具,有32位和64位 2.打开后点击 下一步 3.选择数据源、Excel文件,自动匹配Excel的版本,根据实际情况,勾选首行是否是列名 4.选择目标,如图 5.点击属性,设置要导入的目标数据库…

zookeeper+kafka群集

一 :消息队列 1:什么是消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回由消息…

【Datawhale X 魔搭 】AI夏令营第四期大模型方向,Task2:头脑风暴会,巧灵脑筋急转弯(持续更新)

队伍名称:巧灵脑筋急转弯 队伍技术栈:python,LLM,RAG,大模型,nlp,Gradio,Vue,java 队友:知唐(队长),我真的敲不动…

RCE绕过技巧

目录 EVAL长度限制突破技巧 1.使用反引号 2.file_put_contents写入文件 3.php5.6变长参数usort回调后门 命令长度限制突破技巧 1.拼接文件名 无字母数字的webshell命令执行 1.取反码 2.上传临时文件 EVAL长度限制突破技巧 分析代码:首先传递一个param参数&…

优思学院|日本制造的质量真的那么好吗?回顾日本的质量发展

时代不停改变,时移势易,没有事情是永恒的。 事实上,在20世纪40年代,日本产品常被认为是廉价、劣质的制品,并不如大家所想像的那么好。日本的工业领导者意识到了这个问题,决心生产出创新的高质量产品。 一…

Docker 网络代理配置及防火墙设置指南

Docker 网络代理配置及防火墙设置指南 背景 在某些环境中,服务器无法直接访问外网,需要通过网络代理进行连接。虽然我们通常会在 /etc/environment 或 /etc/profile 等系统配置文件中直接配置代理,但 Docker 命令无法使用这些配置。例如&am…

Python | Leetcode Python题解之第332题重新安排行程

题目: 题解: class Solution:def findItinerary(self, tickets: List[List[str]]) -> List[str]:def dfs(curr: str):while vec[curr]:tmp heapq.heappop(vec[curr])dfs(tmp)stack.append(curr)vec collections.defaultdict(list)for depart, arri…

KillWxapkg-自动化反编译微信小程序工具(附安装包)

KillWxapkg是一款纯Golang实现,一个用于自动化反编译微信小程序、小程序安全评估工具工具,小程序安全利器, 可以自动解密,解包,可还原工程目录,支持Hook,小程序修改,支持微信开发者工…

Java 实现 B树(通俗易懂)

目录 一.概念 二.节点定义 三.插入操作 1.查找位置 2.插入 3.分裂 四.B树和B*树 1.B树 2.B*树 一.概念 B树是一颗多叉平衡树,空树也是多叉平衡树。 一颗M阶的B树要满足以下条件: 1.根节点至少有两个孩子; 2.每个非根节点至少有(上…

iPhone微信过期图片怎么恢复?4招轻松搞定

微信作为最受欢迎的即时通讯工具之一,经常用来分享生活中的点点滴滴。然而,由于手机存储空间的限制或是误触删除,我们有时会发现那些曾在微信中热聊的照片不见了,给我们的记忆留下了空白。别担心,小编为大家提供了多种…

Java | Leetcode Java题解之第331题验证二叉树的前序序列化

题目&#xff1a; 题解&#xff1a; class Solution {public boolean isValidSerialization(String preorder) {int n preorder.length();int i 0;int slots 1;while (i < n) {if (slots 0) {return false;}if (preorder.charAt(i) ,) {i;} else if (preorder.charAt(…

玩AI第一步——显卡环境配置安装

目录 显卡驱动安装 CUDA环境安装 显卡驱动安装 玩ai&#xff0c;首先第一步是需要安装好显卡驱动 如果是n卡&#xff0c;则需要从官网下载对应的驱动&#xff0c;可点击下方链接去搜索自己显卡对应的驱动。 如果不知道自己是什么显卡&#xff0c;可以下载鲁大师或者驱动精灵…

线程池概述

1.1 线程池概念 在处理大量并发任务的时候&#xff0c;如果按照传统的方式&#xff0c;来一个任务请求&#xff0c;创建一个线程来进行任务的处理&#xff0c;大量线程的创建和销毁&#xff0c;将消耗过多的系统资源&#xff0c;还增加了线程上下文&#xff08;运行环境&#x…

Unity 编写自己的aar库,接收Android广播(broadcastReceiver)并传递到Unity

编写本文是因为找了很多文章&#xff0c;都比较片段&#xff0c;不容易理解&#xff0c;对于Android新手来说理解起来不友好。我这里写了一个针对比较小白的文章&#xff0c;希望有所帮助。 Android端 首先还是先来写Android端&#xff0c;我们新建一个Android空项目&#xf…

AI智能网关 边缘计算 视觉AI

随着人工智能技术的不断发展&#xff0c;AI智能网关正成为连接现实世界和虚拟智能世界的重要桥梁。作为智能化时代的关键设备&#xff0c;AI智能网关在物联网、工业、市政、无人驾驶、农业、环保、水利等领域起到了至关重要的作用。   首先&#xff0c;AI智能网关是物联网的核…

转行大模型成功进字节了!48k*15薪!

以ChatGPT为代表的大模型技术的出现&#xff0c;让算法工程师重新成了炙手可热的岗位。 现在国内各家大小厂都在搞大模型算法&#xff0c;投入了巨量的人力物力财力&#xff0c;都不愿意放弃这个百年难遇的机会&#xff0c;像字节&#xff0c;腾讯&#xff0c;京东等大厂也增加…

数学建模笔记(1):插值法

1.插值法的用途 在对数据进行处理的时候&#xff0c;我们往往会碰到由于数据量比较小的情况&#xff0c;这样的情况不利对数据进行分析。插值法就是是针对这种情况&#xff0c;模拟产生和原来数据相近的数据来为数据分析提供完整可靠的数据。 总结&#xff1a;插值法是一种自己…