第十四章 RabbitMQ延迟消息之延迟队列

news2024/11/22 21:53:43

目录

一、引言

二、死信队列

三、核心代码实现

四、运行效果 

五、总结


一、引言

什么是延迟消息?

发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后收到消息。

什么是延迟任务?

设置在一定时间之后才执行的任务。

延迟消息使用场景

我们在实际项目中经常会有一些场景,需要延迟指定时间后发送消息,比如在电商或者外卖平台中订单10分钟后自动取消功能等。

对于上述延迟消息的场景,我们该怎么实现呢?

RabbitMQ 官方并没有直接内置延迟消息的功能,但是可以通过 TTL(Time-To-Live)和死信队列(Dead Letter Exchanges)的组合来实现延迟消息的效果,另外RabbitMQ 也可以通过安装延迟消息插件的方式来实现。

二、死信队列

电商购物中,针对用户下单扣减库存的服务逻辑,我们希望删除10分钟后状态为未支付的订单。在过去的项目中,我们可能第一时间会想到通过定时任务定期查询未支付的订单并做删除来实现:

定时任务会有两个问题:

1. 当针对订单量特别大的电商项目而言,定时任务间断性地查询整个订单数据会极大增加订单服务的压力。

2. 定时任务存在时间上的滞后性。

通过使用RabbitMQ延迟消息,我们可以在完成需求的同时,有效的避免上述问题。如下图所示,用户通过交易服务下单(状态为未支付),随后交易服务调用商品服务扣减库存。 用户在调用交易服务的同时发送一个延迟消息到RabbitMQ,10分钟后交易服务收到消息,此时如果订单还是未支付状态,则取消订单。

RabbitMQ中的死信队列,就是一种可以实现延迟消息的方式。当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

1. 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

2. 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费

3. 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

三、核心代码实现

package com.example.consumer;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 常规的RabbitMQ 交换机/队列绑定配置类
 */
@Configuration
public class RabbitMQConfig {

    @Bean
    Queue normalQueue() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("normal.queue")
                .deadLetterExchange("dead.direct")
                .deadLetterRoutingKey("dead")
                .build();
    }

    @Bean
    DirectExchange normalDirect() {
        return ExchangeBuilder.directExchange("normal.direct").build();
    }

    @Bean
    Binding bindingNormal(Queue normalQueue, DirectExchange normalDirect) {
        return BindingBuilder.bind(normalQueue).to(normalDirect).with("normal");
    }

    @Bean
    Queue deadQueue() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("dead.queue").build();
    }

    @Bean
    DirectExchange deadDirect() {
        return ExchangeBuilder.directExchange("dead.direct").build();
    }

    @Bean
    Binding bindingDead(Queue deadQueue, DirectExchange deadDirect) {
        return BindingBuilder.bind(deadQueue).to(deadDirect).with("dead");
    }
}
package com.example.publisher;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import java.nio.charset.StandardCharsets;

/**
 * 生产者
 */
@Slf4j
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void test() {
        String content = "生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!";
        Message message = MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8))
                .setExpiration("10000").build();
        rabbitTemplate.convertAndSend("normal.direct",
                "normal", message);
    }
}
package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * 消费者
 */
@Slf4j
@Component
public class SimpleListener {

    @RabbitListener(queues = "dead.queue")
    public void listener1(Message message) throws Exception {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8); ;
        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");
    }
}

四、运行效果 

五、总结

虽然我们通过RabbitMQ的死信队列能够实现延迟消息的功能,但是通过代码我们可以看到,这种实现方式相对来说比较繁琐。而且关键是RabbitMQ提供死信队列的初衷并不是让我们用来发送延迟消息的,而是为了作为兜底方案,来接收没有消费的死信的,便于定位问题。因此,后续章节会给大家讲解更优的解决方案,即延迟插件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2209690.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

InfluxDB持久层封装

InfluxDB持久层封装 了解如何使用spring-boot来操作InfluxDB数据库,首先我们来看下整个的系统结构图例: 对比下mybatis中的执行流程: 1_自动装配 首先,我们来看下第一步自动装配:依赖spring-boot自动装配出InfluxDB对…

第十五届蓝桥杯C/C++学B组(解)

1.握手问题 解题思路一 数学方法 50个人互相握手 (491)*49/2 ,减去7个人没有互相握手(61)*6/2 答案:1024 解题思路二 package 十五届;public class Min {public static void main(String[] args) {i…

实时从TDengine数据库采集数据到Kafka Topic

实时从TDengine数据库采集数据到Kafka Topic 一、认识TDengine二、TDengine Kafka Connector三、什么是 Kafka Connect?四、前置条件五、安装 TDengine Connector 插件六、启动 Kafka七、验证 kafka Connect 是否启动成功八、TDengine Source Connector 的使用九、添…

【更新】A股上市公司企业网络安全治理数据集(2007-2023年)

一、测算方式:参考C刊《金融评论》王辉(2024)老师的做法,安全治理种子词的选取主要依托于《中华人民共和国网络安全法》、《中华人民共和国数据安全法》、《关键信息基础设施安全保护条例》等法律法规文件与《网络安全审查办法》、…

蓝桥杯刷题--幸运数字

幸运数字 题目: 解析: 我们由题目可以知道,某个进制的哈沙德数就是该数和各个位的和取整为0.然后一个幸运数字就是满足所有进制的哈沙德数之和.然后具体就是分为以下几个步骤 1. 我们先写一个方法,里面主要是用来判断,这个数在该进制下是否是哈沙德数 2. 我们在main方法里面调用…

量化之一:均值回归策略

文章目录 均值回归策略理论基础数学公式 关键指标简单移动平均线(SMA)标准差Z-Score 交易信号实际应用优缺点分析优点缺点 结论 实践backtrader参数:正常情况:异常情况: 均值回归策略 均值回归(Mean Rever…

华为公有云实战

1.申请一台ECS云主机,并且可以提供web服务 1.1访问云主机-华为特有技术novnc,KVM中提到vnc技术,novnc是不用安装vnc客户端用浏览器html语言实现。 1.2cloudshell 1.3小工具 ssh 弹性ip 1.4.安装httpd服务 建立索引文件 浏览器上输入弹性ip可…

网络资源模板--Android Studio 实现简易记事本App

目录 一、项目演示 二、项目测试环境 三、项目详情 四、完整的项目源码 一、项目演示 网络资源模板--基于Android studio 实现的简易记事本App 二、项目测试环境 三、项目详情 首页 创建一个空的笔记本列表 mNotebookList。使用该列表和指定的布局资源 item_notebook 创建…

前端开发笔记--html 黑马程序员1

文章目录 前端开发工具--VsCode前端开发基础语法VsCode优秀插件Chinese --中文插件Auto Rename Tag --自动重命名插件open in browserOpen in Default BrowserOpen in Other Browser Live Server -- 实时预览 前端开发工具–VsCode 轻量级与快速启动 快速加载:VSCo…

WordPress添加meta标签做seo优化

一、使用function.php文件添加钩子函数添加 方法1、使用is_page()判断不同页面的page_id进行辨别添加不同页面keyword和description (1)通过页面前台源码查看对应页面的id (2)或者通过wordpress后台,点击页面列表&…

云计算ftp 服务器实验

创建VLAN 10 划分端口 创建VLAN 10 的地址 10.1.1.1 服务器的地址是 10.1.1.2 这是服务上的配置 服务器上选择ftp 启动 ,文件目录选择一下 在 交换机上 ftp 10.1.1.2 服务器的地址 把刚才创建的shenyq txt 文件下载下到本地交换机 我们能看到交换…

有关安科瑞Acrel-1000DP分布式光伏监控系统在某公司分布式光伏发电项目中的应用探讨-安科瑞 蒋静

摘要:分布式光伏作为可再生能源的一种重要形式,能够根据不同场地的实际情况进行定制,尽可能地利用可用空间,减少对传统化石燃料的依赖,也能降低温室气体排放、改善环境质量。在政策支持和市场需求的双重推动下&#xf…

电脑查不到IP地址是什么原因?怎么解决

在日常使用电脑的过程中,有时会遇到无法查询到电脑IP地址的情况,这可能会影响到网络的正常使用。本文将探讨电脑查不到IP地址的可能原因,并提供相应的解决方案。 一、原因分析 ‌网络连接问题‌:首先,网络连接不稳定或…

MySQL(B站CodeWithMosh)——2024.10.11(14)

ZZZZZZ目的ZZZZZZ代码ZZZZZZ重点ZZZZZZ操作(非代码,需要自己手动) 8- CASE运算符The CASE Operator_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1UE41147KC?p62&vd_sourceeaeec77dfceb13d96cce76cc299fdd08 在sql_store中&am…

智能网联汽车安全隐患,如何化解?

0. 智能网联汽车安全问题如何才能解决?1. TARA 威胁分析与风险评估平台2. CSTP 智能网联汽车网络安全测试平台3. 智能网联汽车安全解决方案4. 车联网测试认证与培训解决方案5. 车联网网络安全实验室建设方案 0. 智能网联汽车安全问题如何才能解决? 智能…

FFmpeg的简单使用【Windows】--- 简单的视频混合拼接

实现功能 点击【选择文件】按钮在弹出的对话框中选择多个视频,这些视频就是一会将要混剪的视频素材,点击【开始处理】按钮之后就会开始对视频进行处理,处理完毕之后会将处理后的文件路径返回,并在页面展示处理后的视频。 视频所…

【数据结构】排序算法系列——桶排序(附源码+图解)

桶排序 算法思想 桶排序(BucketSort),也被叫做箱排序,它将整个数据组分为n个相同大小的子区间,这类子区间或称为桶。输入数据是均匀、独立分布的,所以一般不会出现一个桶中装有过多数据的情况。作为一种排序算法&…

160页PPT | 埃森哲-制造业变革转型八大领域:痛点剖析与改进策略

PT下载链接见文末~ 引言:制造业数字化转型规划 制造业正处于数字化转型的关键时期,旨在通过技术革新和流程优化,灵活应对市场波动,强化竞争优势,并紧跟技术进步的步伐。此规划围绕三大核心要素展开: 1、…

Pytest中fixture的scope详解

pytest作为Python技术栈下最主流的测试框架,功能极为强大和灵活。其中Fixture夹具是它的核心。而且pytest中对Fixture的作用范围也做了不同区分,能为我们利用fixture带来很好地灵活性。 下面我们就来了解下这里不同scope的作用 fixture的scope定义 首…

8.优化存储过程的性能(8/10)

优化存储过程的性能 1.引言 存储过程是数据库系统中预先编写好的SQL语句集合,它们被保存在数据库服务器上,可以在需要时被调用执行。存储过程的使用可以提高数据库操作的效率,减少网络通信,并且可以封装复杂的逻辑,使…