学会RabbitMQ的延迟队列,提高消息处理效率

news2024/11/23 20:40:06

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?
真的好用吗?鲜有人提的 RabbitMQ-RPC模式



在这里插入图片描述
前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是延迟队列?

延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。

二、延迟队列的实现

延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。

在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。

1. x-delayed-message插件

x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件

首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下
在这里插入图片描述
然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可
然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");

不难发现,此时其实是交换机在做延迟,
在这里插入图片描述

当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.headers(new HashMap<String, Object>(){{put("x-delay", 5000);}});
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());

2. TTL + 死信队列

此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:
在这里插入图片描述

首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
arguments.put("x-message-ttl", 5000);

channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");

在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:

channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

三、手写延时队列

当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列

1. 时间轮概念

在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中
在这里插入图片描述
时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务
在这里插入图片描述

2. JAVA演示

我们先使用JUC相关内容实现一个时间轮

import java.util.*;
import java.util.concurrent.*;

class TimeWheel {
    private int size;
    private int currentIndex;
    private List<BlockingQueue<Task>> slots;
    private Executor executor;

    public TimeWheel(int size, Executor executor) {
        this.size = size;
        this.slots = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            slots.add(new LinkedBlockingQueue<>());
        }
        this.executor = executor;
    }

    public void addTask(Task task) {
        int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;
        slots.get(expireIndex).add(task);
    }

    public void start() {
        new Thread(() -> {
            while (true) {
                currentIndex = (currentIndex + 1) % size;
                BlockingQueue<Task> currentSlot = slots.get(currentIndex);
                List<Task> tasks = new ArrayList<>();
                currentSlot.drainTo(tasks);
                for (Task task : tasks) {
                    executor.execute(task);
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

class Task implements Runnable {
    private long delay; // 延迟时间,单位毫秒
    private Runnable task; // 任务

    public Task(long delay, Runnable task) {
        this.delay = delay;
        this.task = task;
    }

    public long getDelay() {
        return delay;
    }

    @Override
    public void run() {
        task.run();
    }
}


我们可以使用main方法来尝试验证这个时间轮效果:

    public static void main(String[] args) {
        TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));
        // 添加任务,延迟5秒执行
        timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));
        // 添加任务,延迟10秒执行
        timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));
        // 启动时间轮
        timeWheel.start();
    }

在这里插入图片描述

当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。

四、应用场景与注意事项

1. 应用场景

  1. 红包预告
    在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包
    在这里插入图片描述

  2. 订单系统
    在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。

  3. 优惠券系统
    在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。

2. 注意事项

  1. 延迟队列不要使用太多
    使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。

  2. 延迟队列可能会导致消息丢失
    在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。

  3. 设置合适的延迟时间
    在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。

总结

RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/850674.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[CVPR-23-Highlight] Magic3D: High-Resolution Text-to-3D Content Creation

目录 Abstract Background: DreamFusion High-Resolution 3D Generation Coarse-to-fine Diffusion Priors Scene Models Coarse-to-fine Optimization NeRF optimization Mesh optimization Experiments Controllable 3D Generation Personalized text-to-3D Prom…

自动化实践-全量Json对比在技改需求提效实践

1 背景 随着自动化测试左移实践深入&#xff0c;越来越多不同类型的需求开始用自动化测试左移来实践&#xff0c;在实践的过程中也有了新的提效诉求&#xff0c;比如技改类的服务拆分项目或者BC流量拆分的项目&#xff0c;在实践过程中&#xff0c;这类需求会期望不同染色环境…

当前服务器版本不支持该功能,请联系经销商升级服务器 - - 达梦数据库报错

当前服务器版本不支持该功能&#xff0c;请联系经销商升级服务器 - - 达梦数据库报错 环境介绍1 搭建测试环境2 报错内容3 标准版介绍 环境介绍 某项目使用标准版数据库中&#xff0c;使用insert into 正常操作表&#xff0c;插入数据时报错&#xff0c;表为普通表。 1 搭建测…

【云原生•监控】基于Prometheus实现自定义指标弹性伸缩(HPA)

【云原生•监控】基于Prometheus实现自定义指标弹性伸缩(HPA) 什么是弹性伸缩 「Autoscaling即弹性伸缩&#xff0c;是Kubernetes中的一种非常核心的功能&#xff0c;它可以根据给定的指标&#xff08;例如 CPU 或内存&#xff09;自动缩放Pod副本&#xff0c;从而可以更好地管…

new function是什么?(小写function)

参考链接&#xff1a;https://juejin.cn/post/7006232342398238733

【SpringBoot笔记】定时任务(cron)

定时任务就是在固定的时间执行某个程序&#xff0c;闹钟的作用。 1.在启动类上添加注解 EnableScheduling 2.创建定时任务类 在这个类里面使用表达式设置什么时候执行 cron 表达式&#xff08;也叫七子表达式&#xff09;&#xff0c;设置执行规则 package com.Lijibai.s…

地震预警系统全平台开通攻略

大家好&#xff0c;我是熊哥。 最近地震频发&#xff0c;做为一个有社为责任感的人&#xff0c;我认为我有必要为大家总结这一份安全手册&#xff1b;做为一个小V有必要让更多的人看见&#xff0c;积德攒人品。希望大家平安健康。 效果 在地震来临前&#xff0c;手机、电视会…

每天一道leetcode:剑指 Offer 59 - II. 队列的最大值(中等)

今日份题目&#xff1a; 请定义一个队列并实现函数 max_value 得到队列里的最大值&#xff0c;要求函数max_value、push_back 和 pop_front 的均摊时间复杂度都是O(1)。 若队列为空&#xff0c;pop_front 和 max_value 需要返回 -1 示例1 输入: ["MaxQueue",&qu…

centos7实现负载均衡

目录 一、基于 CentOS 7 构建 LVS-DR 集群。 1.1 配置lvs负载均衡服务 1.1.1 下载ipvsadm 1.1.2 增加vip 1.1.3 配置ipvsadm 1.2 配置rs1 1.2.1 编写测试页面 1.2.2 手工在RS端绑定VIP、添加路由 1.2.3 抑制arp响应 1.3 配置rs2 1.4 测试 二、配置nginx负载…

Springboot后端通过路径映射获取本机图片资源

项目场景&#xff1a; 项目中对图片的处理与查看是必不可少的&#xff0c;本文将讲解如何通过项目路径来获取到本机电脑的图片资源 如图所示&#xff0c;在我的本机D盘的图片测试文件夹(文件夹名字不要有中文)下有一些图片&#xff0c; 我们要在浏览器上访问到这些图片&#…

Flutter系列文章-实战项目

在本篇文章中&#xff0c;我们将通过一个实际的 Flutter 应用来综合运用最近学到的知识&#xff0c;包括保存到数据库、进行 HTTP 请求等。我们将开发一个简单的天气应用&#xff0c;可以根据用户输入的城市名获取该城市的天气信息&#xff0c;并将用户查询的城市列表保存到本地…

灵活利用ChatAI,减轻工作任务—语言/翻译篇

前言 ChatAI在语言和翻译方面具有重要作用。它能够帮助用户进行多语言交流、纠正错误、学习新语言、了解不同文化背景&#xff0c;并提供文本翻译与校对等功能。通过与ChatAI互动&#xff0c;我们能够更好地利用技术来拓展自己在语言领域的能力和知识&#xff0c;实现更加无障…

P11-Transformer学习1.1-《Attention Is All You Need》

Transformer目录:《Transformer Paper》1.0 CV Transformer必读论文5篇_汉卿HanQ的博客-CSDN博客 前文参考:Transformer1.0-预热_汉卿HanQ的博客-CSDN博客 全文1w3字左右&#xff0c;按照论文翻译个人理解精读&#xff0c;如果对你有所帮助&#xff0c;欢迎点个赞哦&#xff…

【C语言】初识C语言+进阶篇导读

✨个人主页&#xff1a; Anmia.&#x1f389;所属专栏&#xff1a; C Language &#x1f383;操作环境&#xff1a; Visual Studio 2019 版本 本篇目的是面向编程新手&#xff0c;没接触过编程的人。以及C进阶的导读。 内容是C语言重要知识点的简单解释&#xff0c;不做详解。给…

征稿 | 第三届粤港澳大湾区人工智能与大数据论坛(AIBDF 2023)

第三届粤港澳大湾区人工智能与大数据论坛&#xff08;AIBDF 2023&#xff09; 2023 3rd Guangdong-Hong Kong-Macao Greater Bay Area Artificial Intelligence And Big Data Forum 本次高端论坛围绕建设国家数字经济创新发展试验区进行选题。全面贯彻落实党的二十大精神&…

001-Spring boot 启动内置Web容器分析

目录 代码入口上下文容器 加载web容器 代码入口 上下文容器 SpringApplication.run(App.class); //追踪下去发现 context createApplicationContext();createApplicationContext()&#xff1a; return this.applicationContextFactory.create(this.webApplicationType);这里…

linux手动安装 golangci-lint-1.53.3-linux-386.rpm

首先还是 去下载对应的 rpm 包 https://github.com/golangci/golangci-lint/releases 然后上传到 服务器/usr/local 目录下 执行安装命令 sudo rpm -ivh golangci-lint-1.53.3-linux-386.rpm 查看版本 golangci-lint --version

14.3.5 LVM 的 LV 磁盘快照

什么是 LV 磁盘快照&#xff1f;快照就是将当时的系统信息记录下来&#xff0c;就好像照相记录一般&#xff01; 未来若有任何数据更动了&#xff0c;则原始数据会被搬移到快照区&#xff0c;没有被更动的区域则由快照区与文件系统共享。 图14.3.3、LVM快照区域的备份示意图 左…

微信小程序的项目解构

视频链接 黑马程序员前端微信小程序开发教程&#xff0c;微信小程序从基础到发布全流程_企业级商城实战(含uni-app项目多端部署)_哔哩哔哩_bilibili 接口文档 https://www.escook.cn/docs-uni-shop/mds/1.start.html 1&#xff1a;微信小程序宿主环境 1&#xff1a;常见的宿…

Openlayers实战:判断共享单车是否在电子围栏内

共享单车方便了我们的日常生活,解决了后一公里的行程问题。为了解决共享单车乱放的问题,运营部门规划出一些围栏,配合到电子地图上即为电子围栏,只有放在围栏内才能停车结算,在我们的Openlayers实战示例中,即模拟这一场景。 效果图 源代码 /* * @Author: 大剑师兰特(x…