RabbitMQ初步到精通-第七章-RabbitMQ之延迟队列

news2024/11/23 14:57:19

目录

第七章-RabbitMQ之延迟队列

1. 延迟队列概念

2. 应用场景

3. 架构模式

3.1 队列TTL实现

3.2 消息TTL实现

3.3 插件实现

4. 代码验证

5. 总结


第七章-RabbitMQ之延迟队列

1. 延迟队列概念

延迟-意即 非实时,之前我们讨论大部分的案例都是生产者将消息发送至Broker,消费者立即进行了消费,若消费者消费能力小于生产者生产能力,产生了消息堆积,也会产生延迟。但这种延迟不是我们主观要求的延迟。

此次涉及到的延迟-是在生产者发消息时,就明确预知的,会产生延迟消费,而且延迟的时间也是设定好的。消息会暂存到queue中,等待预设时间到达后,再即时触发消费。

2. 应用场景

所有发出的消息不想立即消费的场景,例如:

2.1 用户下单后30M未支付会取消订单

2.2 用户下单后2M消息提醒其支付

2.3 用户注册后2天内未登录,进行提醒

.....

当然我们可以通过定时扫描的方式,来实现,但定时扫描会存在控制不精准,若扫描的数据量大对性能有影响。

优雅的方式可以引入延迟队列。

3. 架构模式

实现延迟队列的话,我们考虑两种方式,第一种是通过TTL ,第二种通过rabbitmq的插件,TTL这部分已经在死信队列的篇章介绍过了,与之前死信队列的TTL实现是一致的:

RabbitMQ初步到精通-第六章-RabbitMQ之死信队列_Mr-昊哥的博客-CSDN博客

3.1 队列TTL实现

 如上图,我们将队列1 TTL 设置为10s,且队列1不再有消费者进行消费,那生产者生产的消息都会在队列1中暂存10S,然后投递到死信交换机,再路由到队列2,最后被消费者2成功消费,这样就实现了 消息的延迟消费。而且通过队列设置TTL,消息的延迟都是准确的。

问题:

若我们的业务,并非单一的失效时间,存在多种失效时间,或失效的时间不是固定的,这样就会比较麻烦,我们不可能为每一种失效时间再去增加一个队列吧,队列3-TTL20s,队列4-TTL30s,队列5-TTL50s ... 

 那我们试着从消息入手,在消息发出的时候就设定好TTL。

3.2 消息TTL实现

 这次是不是万无一失呢,队列1不再设置失效时间,发送的四条消息设置不同的时间,通过时间到期,自动转到队列2,消费者2成功去消费。

问题:

若按消息设置失效期,则会存在失效时间不准的情况。例如msg1 TTL 60s ,msg2 ttl 10s ,理论上是msg2先失效,结果是 msg1 60s失效后,再msg2失效。

结论呢,RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。原因是,rabbitmq在等到消息投递给消费者的时候判断当前消息是否过期。

那还有没有更好的办法,装插件,使用延迟的交换机,来实现。

3.3 插件实现

3.3.1 插件的安装

rabbitmq-delayed-message-exchange

官网下载插件:Community Plugins — RabbitMQ


安装完成后:面板展示:

3.3.2 使用延迟插件架构

 这次又回到最经典的模式了,生产者->交换机->队列->消费者,

只是交换机是一个特殊的 延迟交换机而已。

延迟交换机面板:

 核心的不同是声明交换机的时候:

1. 绑定direct类型 与 延迟交换机参数:

Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-delayed-type", "direct");

2. 声明交换机的时候:

channel.exchangeDeclare(PLUGINS_EXCHANGE, "x-delayed-message", true, false, argMap );

4. 代码验证

代码验证不再对TTL进行验证,可以参考前一章节 死信队列的内容

针对插件验证:

生产者:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description -
 * @createTime 2022/07/27 19:34:00
 */
public class PluginsDelayProducer {

    public static String PLUGINS_EXCHANGE = "plugins.exchange";
    public static String PLUGINS_ROUTING_KEY = "plugins";
    public static String PLUGINS_QUEUE = "plugins.queue";

    //生产者
    public static void main(String[] args) throws Exception {
        //1、获取connection
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();
        List<Integer> delayedTimes = Arrays.asList(5, 2, 3, 4, 1);
        for (Integer delayedTime : delayedTimes) {
            sendMsg(channel, delayedTime);
        }
        //4、关闭管道和连接
        channel.close();
        connection.close();
    }

    private static void sendMsg(Channel channel, Integer delayedTime) throws IOException, InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String content = String.format("消息时间:[%s],延时[%d]s", sdf.format(new Date()), delayedTime);
        byte[] msg = content.getBytes(StandardCharsets.UTF_8);
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayedTime * 1000);
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
        channel.basicPublish(PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY, properties, msg);
        System.out.println("消息发送完成:" + content);
    }

}

消费者:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description
 * @createTime 2022/11/17 16:53:00
 */
public class PluginsDelayConsumer {

    public static String PLUGINS_EXCHANGE = "plugins.exchange";
    public static String PLUGINS_ROUTING_KEY = "plugins";
    public static String PLUGINS_QUEUE = "plugins.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        // 延迟交换机参数
        Map<String, Object> delayParams = getNormalAndDeadParams();

        // 4.声明一个队列与交换机及绑定关系
        handleQueueAndBinding(channel, PLUGINS_QUEUE, delayParams, PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY);

        channel.basicQos(1);

        //5.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String content = sdf.format(new Date());
                System.out.println("延迟消费者接收消息: " + new String(body, "UTF-8") + "当前时间: " + content);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(PLUGINS_QUEUE, false, consumer);

        System.out.println("延迟消费者启动接收消息......");

        //5、键盘录入,让程序不结束!
        System.in.read();

        //6、释放资源
        channel.close();
        connection.close();

    }

    private static Map<String, Object> getNormalAndDeadParams() {
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-delayed-type", "direct");
        return argMap;
    }

    /**
     * 处理队列与绑定关系
     *
     * @param channel
     * @param deadQueueName
     * @param o
     * @param exchangeName
     * @param routingKey
     * @throws IOException
     */
    private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map<String, Object> o, String exchangeName, String routingKey) throws IOException {
        channel.exchangeDeclare(PLUGINS_EXCHANGE, "x-delayed-message", true, false, o);
        channel.queueDeclare(PLUGINS_QUEUE, true, false, false, new HashMap<>());
        channel.queueBind(PLUGINS_QUEUE, PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY);
    }

}

5. 总结

总的来说实现延迟队列有3种形式:

1. 队列TTL

2.消息TTL

3. 安装延迟插件

使用TTL会有限制且不通用,架构也相对复杂,但也有一些业务失效时间是明确的也可以使用。

使用插件会相对简单,但有些公司,中间件是独立管理的,安装插件还需要沟通,也不一定能够同意安装。

所已,还是因地制宜。适合的就是最好的!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/20411.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【毕业设计】61-基于单片机的超声波测距仪设计(原理图、仿真工程、低重复率参考设计文档、PPT、开题报告、任务书)

【毕业设计】61-基于单片机的超声波测距仪设计&#xff08;原理图、仿真工程、低重复率参考设计文档、PPT、开题报告、任务书&#xff09;[toc] 资料下载链接 资料下载链接 资料链接&#xff1a;https://www.cirmall.com/circuit/33762/ 包含此题目毕业设计全套资料&#xff…

UDS应用场景

诊断协议那些事儿 本文为诊断协议那些事儿专栏文章&#xff0c;旨在介绍诊断的应用场景&#xff0c;其本质就是一个用于汽车行业通信的需求规范&#xff0c;用于诊断功能数据的解析&#xff01;让读者对诊断有一个深入的认识。 关联文章&#xff1a;UDS协议发展历史 文章目录…

idea如何设置代理实现管理突破呢

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是「奇点」&#xff0c;江湖人称 singularity。刚工作几年&#xff0c;想和大家一同进步&#x1f91d;&#x1f91d; 一位上进心十足的【Java ToB端大厂…

抗锯齿渲染

&#xff08;逻辑绘图&#xff09;图像在坐标图中的显示&#xff1a; 笔的默认宽度为1&#xff0c;当笔的宽度大于1时&#xff0c;相当于在则条线的两边均匀加厚&#xff0c;确定坐标的还是这条宽度为1的线段&#xff0c;相当于宽度的中心位置。 物理绘图 &#xff08;默认情况…

集成学习、装袋法、提升法、GBDT、随机森林(机器学习)

集成学习 集成学习(Ensemble learning)是机器学习中近年来的一大热门领域。其中的 集成方法是用多种学习方法的组合来获取比原方法更优的结果 使用于组合的算法是弱学习算法 即分类正确率仅比随机猜测略高的学习算法 但是组合之后的效果仍可能高于强学习算法 即集成之后的…

总抱怨Mac运行速度又卡又慢?这些方法你用得上

通常大家处理Mac运行速度慢的方法不是重启就是清空废纸篓&#xff0c;但是这两种方法对于Mac提速性能的效果是微之甚微的&#xff0c;想要彻底解决Mac运行速度慢&#xff0c;你应该试试一下三种方法~ 1、清理磁盘空间 硬盘空间过少是Mac运行变慢很大的一个因素&#xff0c;各种…

第03章_基本的SELECT语句

第03章_基本的SELECT语句 1. SQL概述 1.1 SQL背景知识 1946 年&#xff0c;世界上第一台电脑诞生&#xff0c;如今&#xff0c;借由这台电脑发展起来的互联网已经自成江湖。在这几十年里&#xff0c;无数的技术、产业在这片江湖里沉浮&#xff0c;有的方兴未艾&#xff0c;有…

【用户画像】Redis的常用五大数据类型和配置文件介绍

文章目录一 常用五大数据类型简介1 Redis键(key)2 Redis字符串(String)3 Redis列表(List)4 Redis集合(Set)5 Redis哈希(Hash)6 Redis有序集合Zset(sorted set)二 Redis配置文件介绍1 UNITS2 INCLUDES3 NETWORK4 MEMORY MANAGEMENT一 常用五大数据类型简介 常用命令 1 Redis键…

攻防世界碎纸机11

碎纸机11 题目描述&#xff1a;我们从碎纸机里抢救回来了某个关键图片资料&#xff0c;你能帮我们修复它吗&#xff1f; 题目环境&#xff1a;https://download.csdn.net/download/m0_59188912/87094757 打开文件&#xff0c;发现是让我们拼图。 可以用python脚本进行拼接。 脚…

pytorch初学笔记(七):神经网络基本骨架 torch.nn.Module

目录 一、 torch.nn模块 二、module模块 三、自定义搭建神经网络 一、 torch.nn模块 torch.nn — PyTorch 1.13 documentation 二、module模块 Module — PyTorch 1.13 documentation 我们自己定义的神经网络需要继承nn.Module类&#xff0c;需要重写以下两个方法&#xf…

智慧林业解决方案-最新全套文件

智慧林业解决方案-最新全套文件一、建设背景二、建设架构1、火险预警2、AI林火分析3、应急指挥4、森林资源GIS5、林业巡检6、林业OA三、建设方案四、获取 - 智慧林业全套最新解决方案合集一、建设背景 森林资源是林地及其所生长的森林有机体的总称&#xff0c;以林木资源为主&…

设计模式之设计原则

程序设计的要遵循的一些理论,也可以理解为程序设计的一种要求和目标,是面向对象程序设计的基石,也是面向对象程序设计的质量保障和依据。设计模式&#xff08;Design pattern&#xff09;是一套被反复使用、多数人知晓的、经过分类编目的、代码设计经验的总结。使用设计模式是为…

Python基础语法入门

14天学习训练营导师课程&#xff1a; 李宁《Python Pygame游戏开发入门与实战》 李宁《计算机视觉OpenCV Python项目实战》1 李宁《计算机视觉OpenCV Python项目实战》2 李宁《计算机视觉OpenCV Python项目实战》3 上一节课&#xff0c;我们了解了基础环境如何配置&#xff0c…

【深度学习】图像分类数据集Fashion-MNIST

今天在手撸深度学习代码的时候&#xff0c;遇到了这个数据集&#xff0c;但是调用的函数的参数和功能不是很明白&#xff0c;因此选择写篇博客总结一下。 一、介绍 Fashion-MNIST是⼀个10类服饰分类数据集。 torchvision 包&#xff1a;它是服务于 PyTorch 深度学习框架的&a…

springboot+maven大学校友活动风采展示管理信息系统

大学校友管理信息系统当然也不能排除在外&#xff0c;从校友活动、校友风采的统计和分析&#xff0c;在过程中会产生大量的、各种各样的数据。本文以大学校友管理信息系统为目标&#xff0c;采用B/S模式&#xff0c;以SSM为开发框架&#xff0c;Jsp为开发技术、Eclipse/idea为开…

计算机毕业设计之java+ssm爱家房屋租赁信息管理系统

项目介绍 本爱家房屋租赁信息管理系统是针对目前房屋租赁信息管理的实际需求&#xff0c;从实际工作出发&#xff0c;对过去的房屋租赁信息管理系统存在的问题进行分析&#xff0c;结合计算机系统的结构、概念、模型、原理、方法&#xff0c;在计算机各种优势的情况下&#xf…

Linux进阶-编辑器以及Shell编程

常用两个编辑器 gedit编辑器&#xff1a;依赖图形界面。 vi/vim编辑器&#xff1a;sudo apt install vim&#xff08;安装vim编辑器&#xff09; vim与vi的区别&#xff1a; vim是vi的升级版本&#xff0c;兼容vi&#xff1b; vi按u只能撤销上次命令&#xff0c;而在vim里…

java计算机毕业设计基于安卓Android的校园单车租赁App

项目介绍 校园单车租赁APP管理是校园单车租赁管理中对用户必不可少的一个部分。在人们校园单车租赁管理的整个过程中,校园单车租赁APP管理担负着最重要的角色。为满足如今日益复杂的管理需求,各类校园单车租赁APP管理程序也在不断改进。本课题所设计的校园单车租赁APP,使用ssm框…

Nginx源码分析--内存池

1.问题引入 使用C语言编程时&#xff0c;一般使用malloc和free进行动态内存申请和释放。如果一不小心忘记了调用free进行释放&#xff0c;很容易造成内存泄露。另一方面&#xff0c;频繁地进行malloc和free操作&#xff0c;很容易造成内存碎片。与此同时&#xff0c;因为mallo…

[Spring Cloud] nacos作为服务中心

✨✨个人主页:沫洺的主页 &#x1f4da;&#x1f4da;系列专栏: &#x1f4d6; JavaWeb专栏&#x1f4d6; JavaSE专栏 &#x1f4d6; Java基础专栏&#x1f4d6;vue3专栏 &#x1f4d6;MyBatis专栏&#x1f4d6;Spring专栏&#x1f4d6;SpringMVC专栏&#x1f4d6;SpringBoot专…