RabbitMQ 是如何做延迟消息的 ?——Java全栈知识(15)

news2024/12/29 8:46:04

RabbitMQ 是如何做延迟消息的 ?

1、什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递
    如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
    死信交换机有什么作用呢?
  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因 TTL(有效期)到期的消息

2、死信队列

架构:
image.png
由于第一个队列没有消费者,所以可以在第一个队列中设置 TTL,当消息过期的时候,这个消息就变成了死信,被丢掉私信交换机中,以此实现延迟任务功能。

3、延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,大家设想一下这样的场景: 如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒: image.png注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。
消息肯定会被投递到 ttl.queue 之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信: image.png 死信被再次投递到死信交换机 hmall.direct,并沿用之前的 RoutingKey,也就是 blueimage.png 由于 direct.queue1hmall.direct 绑定的 key 是 blue,因此最终消息被成功路由到 direct.queue1,如果此时有消费者与 direct.queue1 绑定,也就能成功消费消息了。但此时已经是5秒钟以后了: image.png 也就是说,publisher 发送了一条消息,但最终 consumer 在5秒后才收到消息。我们成功实现了延迟消息

[!info]
而且,RabbitMQ 中的这个 TTL 是可以设置任意时长的,这相比于 RocketMQ 只支持一些固定的时长而显得更加灵活一些。

死信队列消息堆积问题

[!danger] 死信队列消息堆积问题
但是,死信队列的实现方式存在一个问题,那就是可能造成队头阻塞。RabbitMQ 会定期扫描队列的头部检查队首的消息是否过期。如果队首消息过期了,它会被放到死信队列中。然而,RabbitMQ 不会逐个检查队列中的所有消息是否过期,而是仅检查队首消息。这样,如果队列的队头消息未过期,而它后面的消息已过期,这些后续消息将无法被单独移除,直到队头的消息被消费或过期。
因为队列是先进先出的,在普通队列中的消息,每次只会判断邢队头的消息是否过期,那么,如果队头的消息时间很长,一直都不过期,那么就会阻塞整个队列,这时候即使排在他后面的消息过期了,那么也会被一直阻塞。

基于 RabbitMQ 的死信队列,可以实现延迟消息,非常灵活的实现定时关单,并且借助 RabbitMQ 的集群扩展性,可以实现高可用,以及处理大并发量。他的缺点第一是可能存在消息阻塞的问题,还有就是方案比较复杂,不仅要依赖 RabbitMQ, 而目还需要声明很多队列出来,增加系统的复杂度

3、DelayExchange 插件

前面我们提到的基于死信队列的方式,是消息先会投递到一个正常队列,在 TTL 过期后进入死信队列。但是基于插件的这种方式,消息并不会立即进入队列,而是先把他们保存在一个基于 Erlang 开发的 Mnesia 数据库中,然后通过一个定时器去查询需要被投递的消息,再把他们投递到 x-delayed-message 交换机中。
基于 RabbitMQ 插件的方式可以实现延迟消息,并且不存在消息阻塞的问题,但是因为是基于插件的,而这个插件支持的最大延长时间是 (2^32)-1 毫秒,大约 49 天,超过这个时间就会被立即消费。

插件下载地址: GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
由于我们安装的 MQ 是 3.8 版本,因此这里下载 3.8.17 版本:
image.png|600px
附件:![[rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez]]

4.2.2. 安装

因为我们是基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:

[  
    {  
        "CreatedAt": "2024-06-19T09:22:59+08:00",  
        "Driver": "local",  
        "Labels": null,  
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",  
        "Name": "mq-plugins",  
        "Options": null,  
        "Scope": "local"  
    }  
]  

插件目录被挂载到了 /var/lib/docker/volumes/mq-plugins/_data 这个目录,我们上传插件到该目录下

注意上传插件

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:
image.png

4.2.3. 声明延迟交换机

image.png

根据

1、创建交换机:
image.png
2、创建队列
image.png
3、根据 bandingKey 绑定队列:
image.png|500

基于注解方式:

@RabbitListener(bindings = @QueueBinding(  
        value = @Queue(name = "delay.queue", durable = "true"),  
        exchange = @Exchange(name = "delay.direct", delayed = "true"),  
        key = "delay"  
))  
public void listenDelayMessage(String msg){  
    log.info("接收到delay.queue的延迟消息:{}", msg);  
}

基于 @Bean 的方式:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;  
import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;@Slf4j  
@Configuration  
public class DelayExchangeConfig {@Bean  
    public DirectExchange delayExchange(){  
        return ExchangeBuilder  
                .directExchange("delay.direct") // 指定交换机类型和名称  
                .delayed() // 设置delay的属性为true  
                .durable(true) // 持久化  
                .build();  
    }@Bean  
    public Queue delayedQueue(){  
        return new Queue("delay.queue");  
    }  
      
    @Bean  
    public Binding delayQueueBinding(){  
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");  
    }  
}  

4.2.4. 发送延迟消息

发送消息时,必须通过 x-delay 属性设定延迟时间:

@Test  
void testPublisherDelayMessage() {  
    // 1.创建消息  
    String message = "hello, delayed message";  
    // 2.发送消息,利用消息后置处理器添加消息头  
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {  
        @Override  
        public Message postProcessMessage(Message message) throws AmqpException {  
            // 添加延迟消息属性  
            message.getMessageProperties().setDelay(5000);  
            return message;  
        }  
    });  
}

warning 注意: 延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1645969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(论文阅读-优化器)Selectivity Estimation using Probabilistic Models

目录 摘要 一、简介 二、单表估计 2.1 条件独立Condition Independence 2.2 贝叶斯网络Bayesian Networks 2.3 查询评估中的贝叶斯网络 三、Join选择性估计 3.1 两表Join 3.2 概率关系模型 3.3 使用PRMs的选择性估计 四、PRM构建 4.1 评分标准 4.2 参数估计 4.3 结…

9.媒体元素

视频元素 视频标签基本代码结构&#xff1a; <video src"" controls></video>其中src是视频资源的路径&#xff0c;这个路径有绝对路径和相对路径这里推荐用相对路径。&#xff08;这里可以回顾我html系列的第四篇图片标签&#xff09;&#xff0c;我们…

【数据结构】--- 深入剖析二叉树(中篇)--- 认识堆堆排序Topk

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; 数据结构之旅 文章目录 &#x1f3e0; 初识堆 &#x1f4d2; 堆的概念 &#x1f4d2; 堆的性质 &#x1f3e0; 向上调整算法 && 向下调整算…

第一天学习(GPT)

1.图片和语义是如何映射的&#xff1f; **Dalle2&#xff1a;**首先会对图片和语义进行预训练&#xff0c;将二者向量存储起来&#xff0c;然后将语义的vector向量转成图片的向量&#xff0c;然后基于这个图片往回反向映射&#xff08;Diffusion&#xff09;——>根据这段描…

Junit 测试中如何对异常进行断言

本文对在 Junit 测试中如何对异常进行断言的几种方法进行说明。 使用 Junit 5 如果你使用 Junit 5 的话,你可以直接使用 assertThrows 方法来对异常进行断言。 代码如下: Exception exception = assertThrows(NumberFormatException.class, () -> {new Integer("on…

基于springboot+vue+Mysql的点餐平台网站

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

第四百九十二回

文章目录 1. 概念介绍2. 使用方法2.1 SegmentedButton2.2 ButtonSegment 3. 代码与效果3.1 示例代码3.2 运行效果 4. 内容总结 我们在上一章回中介绍了"SearchBar组件"相关的内容&#xff0c;本章回中将介绍SegmentedButton组件.闲话休提&#xff0c;让我们一起Talk …

引领农业新质生产力,鸿道(Intewell®)操作系统助力农业机器人创新发展

4月27日至29日&#xff0c;2024耒耜国际会议在江苏大学召开。科东软件作为特邀嘉宾出席此次盛会&#xff0c;并为江苏大学-科东软件“农业机器人操作系统”联合实验室揭牌。 校企联合实验室揭牌 在开幕式上&#xff0c;江苏大学、科东软件、上交碳中和动力研究院、遨博智能研究…

【c1】数据类型,运算符/循环,数组/指针,结构体,main参数,static/extern,typedef

文章目录 1.数据类型&#xff1a;编译器&#xff08;compiler&#xff09;与解释器&#xff08;interpreter&#xff09;&#xff0c;中文里的汉字和标点符号是两个字节&#xff0c;不能算一个字符&#xff08;单引号&#xff09;2.运算符/循环&#xff1a;sizeof/size_t3.数组…

顶管机种类多样 国内产量不断增长

顶管机种类多样 国内产量不断增长 顶管机是一种用于非开挖管道铺设的机械设备&#xff0c;能够通过非开挖施工技术降低对地面活动的影响&#xff0c;具有工作效率高、安全性好、受地质条件限制小、环保性强等优点&#xff0c;在隧道修建、城市管网建设、地下管线敷设等场景中发…

《QT实用小工具·五十八》模仿VSCode的可任意拖拽的Tab标签组

1、概述 源码放在文章末尾 该项目实现了模仿VSCode的可任意拖拽的Tab标签组&#xff0c;包含如下功能&#xff1a; 拖拽标签页至新窗口 拖拽标签页合并控件 无限嵌套的横纵分割布局&#xff08;类似Qt Creator的编辑框&#xff09; 获取当前使用的标签组、标签页 自动向上合并…

测径仪视窗镜片的维护和保养步骤

关键字:测径仪镜片,测径仪保养,测径仪维护,视窗镜片维护,视窗镜片擦拭保养,视窗镜片的检查, 视窗镜片定期保养 视窗镜片是保护光学镜头免受污染和损伤的光学平镜片&#xff0c;它的污染和破损会直接影响光学系统的测量结果。 视窗镜片一般在受到轻微污染&#xff08;指镜片上…

项目管理-项目采购管理2/2

项目管理&#xff1a;每天进步一点点~ 活到老&#xff0c;学到老 ヾ(◍∇◍)&#xff89;&#xff9e; 何时学习都不晚&#xff0c;加油 本文承接 项目采购管理第二部分&#xff0c;详细讲解项目合同管理。 项目采购管理过程--重点&#xff1a; ①ITTO 输入&#xff0c;输出…

测试环境搭建:JDK+Tomcat+Mysql+Redis

基础的测试环境搭建&#xff1a; LAMPLinux(CentOS、ubuntu、redhat)ApacheMysqlPHP LTMJLinux(CentOS、ubuntu、redhat)TomcatMysql(Oracle)RedisJava 真实的测试环境搭建&#xff1a;&#xff08;企业真实的运维&#xff09; 基于SpringBoot&#xff08;SpringCloud分布式微…

分析:Palo Alto在从SASE向SASO演进中定位不佳

摘要 我们通过上一篇文章&#xff08;Fortinet的愿景——超越SASE&#xff09;中应用于Fortinet的相同框架来回顾Palo Alto Network在网络和网络安全方面的前景。 SASE涉及数据传输的第一英里。不过&#xff0c;随着SASE的发展&#xff0c;投资者还需要考虑中间和最后一英里。…

javaweb学习week7

javaweb学习 十四.Springboot 1.配置优先级 Springboot中支持三种格式的配置文件&#xff1a; 注意&#xff1a;虽然Springboot支持多种格式配置文件&#xff0c;但是在项目开发时&#xff0c;推荐使用一种格式的配置&#xff08;yml是主流&#xff09; Springboot除了支持…

【Osek网络管理测试】[TG3_TC3]tSleepRequestMin_L

&#x1f64b;‍♂️ 【Osek网络管理测试】系列&#x1f481;‍♂️点击跳转 文章目录 1.环境搭建2.测试目的3.测试步骤4.预期结果5.测试结果 1.环境搭建 硬件&#xff1a;VN1630 软件&#xff1a;CANoe 2.测试目的 验证DUT进入NMLimpHome状态后请求睡眠的最短时间是否正确…

周刊是聪明人筛选优质知识的聪明手段!

这是一个信息过载的时代&#xff0c;也是一个信息匮乏的时代。 这种矛盾的现象在 Python 编程语言上的表现非常明显。 它是常年高居编程语言排行榜的最流行语言之一&#xff0c;在国外发展得如火如荼&#xff0c;开发者、项目、文章、播客、会议活动等相关信息如海如潮。 但…

【LeetCode刷题记录】105. 从前序与中序遍历序列构造二叉树 106. 从中序与后序遍历序列构造二叉树

105 从前序与中序遍历序列构造二叉树 给定两个整数数组 preorder 和 inorder &#xff0c;其中 preorder 是二叉树的先序遍历&#xff0c; inorder 是同一棵树的中序遍历&#xff0c;请构造二叉树并返回其根节点。 示例 1: 输入: preorder [3,9,20,15,7], inorder [9,3,1…

近50亿元国资助阵,全球最大量子独角兽登场!

4月30日&#xff0c;澳大利亚与PsiQuantum公司宣布签订一项近10亿澳元&#xff08;约6.2亿美元、47.24亿人民币&#xff09;的协议&#xff0c;旨在建造世界上第一台商业上“有用”的量子计算机。 仅在一天前&#xff0c;澳大利亚还投资了1840万澳元&#xff0c;在悉尼大学成立…