第十六章 RabbitMQ延迟消息之延迟插件优化

news2025/1/23 3:21:52

目录

一、引言

二、优化方案 

三、核心代码实现

3.1. 生产者代码

3.2. 消息处理器 

3.3. 自定义多延迟消息封装类 

3.4. 订单实体类 

3.5. 消费者代码 

四、运行效果


一、引言

上一章节我们提到,直接使用延迟插件,创建一个延迟指定时间的消息(如10分钟),并不是最好的解决方案,因为假如我们的订单是在5分钟支付的,那么剩余的5分钟时间,RabbitMQ中延迟消息时钟还是一直占用着资源。如果有大量的延迟消息,那么对于服务来说压力是很大的,同时会耗费庞大昂贵的资源。因此,本章节我们就来近一步对延迟插件的消息进行优化。

我们通过下面的流程图来做近一步分析:

1. 用户下单完成后,发送15分钟延迟消息,在15分钟后接收消息,检查支付状态:

2. 已支付:更新订单状态为已支付

3. 未支付:更新订单状态为关闭订单,恢复商品库存

常规延迟插件消息使用的弊端总结:

1. 设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

2. 如果并发较高,30分钟可能堆积消息过多,对MQ压力很大

3. 大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源

二、优化方案 

如下图所示,我们可以将10分钟甚至30分钟拆分成多份零散的较短的时间。

消息初次发送的延迟时间设定为10s,10s过后如果订单还是未支付状态,我们判断延迟时间数组里还有没有剩余延迟时间,如果有则继续发送延迟消息,时间设定为数组中的第二个时间10s,直到订单支付成功终止循环,或是最后一份时间消耗完依然未支付,我们取消订单。

三、核心代码实现

3.1. 生产者代码

package com.example.publisher;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * 生产者
 */
@Slf4j
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void test() {
        Order order = Order.builder().orderId(1L).content("生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!").build();
        MultiDelayMessage<Order> msg = MultiDelayMessage.of(order, 1000L, 5000L, 2000L, 10000L);

        rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelayLong(msg.removeNextDelay());
                return message;
            }
        });
    }
}

3.2. 消息处理器 

package com.example.publisher;

import lombok.AllArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

/**
 * 消息请求处理器
 */
@AllArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {

    private final Long delay;

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelayLong(delay);
        return message;
    }
}

3.3. 自定义多延迟消息封装类 

package com.example.publisher;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.util.CollectionUtils;

import java.io.Serializable;
import java.util.List;

/**
 * 自定义的多延时消息封装类
 * @param <T>
 */
@Data
@NoArgsConstructor
public class MultiDelayMessage<T> implements Serializable {
    /**
     * 消息体
     */
    private T data;

    /**
     * 记录延迟时间的集合
     */
    private List<Long> delayMillis;

    public MultiDelayMessage(T data, List<Long> delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }

    public static <T> MultiDelayMessage<T> of(T data, Long...delayMillis) {
        return new MultiDelayMessage<>(data, (List<Long>) CollectionUtils.arrayToList(delayMillis));
    }

    /**
     * 获取并移除下一个延迟时间
     * @return 队列中的第一个延迟时间
     */
    public Long removeNextDelay() {
        return delayMillis.remove(0);
    }

    /**
     * 是否还有下一个延迟时间
     * @return
     */
    public boolean hasNextDelay() {
        return !delayMillis.isEmpty();
    }
}

3.4. 订单实体类 

package com.example.publisher;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 订单类 
 * 此处为了演示,将真实业务中的订单类做了简化
 * 只包含一个订单ID和自定义消息内容
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order {

    private Long orderId;

    private String content;
}

3.5. 消费者代码 

package com.example.consumer;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * 消费者
 * 因为作为演示,所以商城支付、订单、及扣减库存的业务代码已注释
 * 注释中保留了整个商城下单支付扣减库存的流程步骤
 */
@Slf4j
@Component
public class SimpleListener {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listener(MultiDelayMessage<Order> msg) throws Exception {
        System.out.println(((Order)msg.getData()).getContent());
        // 1. 查询订单状态
        // Order order = orderService.getById(msg.getData())
        // 2. 判断是否已支付
//        if (Order == null || order.status == 2) {
//            订单不存在或者已处理则直接返回
//            return;
//        }
        // 主动去支付服务查询真正的支付状态
//        PayOrder payOrder = payService.getById(order.getId());
        // 2.1. 已支付,则标记订单为已支付
//        if (payOrder.isPay()) {
//            orderService.markOrderPaySuccess(order.getId());
//            return;
//        }
        // 2.2. 未支付,获取下次订单延迟时间
        // 3. 判断是否存在延迟时间
        if (msg.hasNextDelay()) {
            // 3.1 存在,重发延迟消息
            Long nextDelay = msg.removeNextDelay();

            rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setDelayLong(nextDelay);
                    return message;
                }
            });
            return;
        }
        // 3.2 不存在,取消订单
//        orderService.lambdaUpdate()
//                .set(Order::getStatus, 5);
//                .set(Order::getCloseTime, LocalDateTime.now());
//                .eq(Order::getId, order.getId())
//                .update();
        // 4. 恢复库存
    }
}

四、运行效果

最终我们会看到每间隔一段时间消费者就会消费一条消息,这个间隔时间就是我们设定的分段时间数组,这么做就能极大地减少资源消耗和服务的压力:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2210290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一款零依赖、跨平台的流媒体协议处理工具,支持 RTSP、WebRTC、RTMP 等视频流协议的处理

大家好&#xff0c;今天给大家分享一款功能强大的流媒体协议处理工具go2rtc&#xff0c;支持多种协议和操作系统&#xff0c;具有零依赖、零配置、低延迟等特点。 项目介绍 go2rtc可以从各种来源获取流&#xff0c;包括 RTSP、WebRTC、HomeKit、FFmpeg、RTMP 等&#xff0c;并…

第一个servlet程序

文章目录 在原有工程上建立模块前端配置前后端映射关系添加外部依赖库后端代码启动配置 在原有工程上建立模块 添加web框架 前端 应用结构 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>第一…

【云原生技术】Docker容器进阶知识

文章目录 namespace概述一、namespace的基本概念二、namespace的主要作用三、namespace的类型四、namespace的操作五、namespace在容器技术中的应用 cgroup一、cgroup的基本概念二、cgroup的主要功能三、cgroup的子系统介绍四、cgroup的应用场景五、cgroup的使用与管理 cgroup和…

uniapp-小程序开发0-1笔记大全

uniapp官网&#xff1a; https://uniapp.dcloud.net.cn/tutorial/syntax-js.html uniapp插件市场&#xff1a; https://ext.dcloud.net.cn/ uviewui类库&#xff1a; https://www.uviewui.com/ 柱状、扇形、仪表盘库&#xff1a; https://www.ucharts.cn/v2/#/ CSS样式&…

余 弦 曲 线

1&#xff0e;问题描述 在屏幕上画出余弦函数cos&#xff08;x&#xff09;曲线&#xff0c;如图1.6所示。 图1.6 余弦函数cos&#xff08;x&#xff09;曲线 2&#xff0e;问题分析 连续的曲线是由点组成的,点与点之间距离比较近&#xff0c;看上去就是曲线了&#xff0c;…

uni-app 如何全局设置,获取app.vue里面的值

在globalData里设置一个值 通过下面方法修改 this.$options.globalData.$versonStatus status 在页面中通过getApp()获取 getApp().globalData.$versonStatus

<<迷雾>> 第11章 全自动加法计算机(3)--存储器示例 示例电路

info::操作说明 将一组比特单元扩展为多组的结果, 操作原理类似 注: 一次只能操作一组, 此处需进一步引入地址译码器才能具体进行操作, 见后面的例子, 此处仅展示结构原理 primary::在线交互操作链接 https://cc.xiaogd.net/?startCircuitLinkhttps://book.xiaogd.net/cyjsjdm…

C++模板初阶速成

温馨提示&#xff1a;本篇文章依旧是c速成系列的文章&#xff0c;但和以往有所不同的是&#xff0c;本篇文章带大家简单了解并能够学会使用模板 泛型编程 概念&#xff1a;编写与类型无关的通用代码&#xff0c;是代码复用的一种手段。 什么是复用&#xff1f; 简单来说就是…

分组相关 -- EoO原理

EoO (Ethernet over OTN) 是将以太网信号处理后&#xff0c;经过封装、映射后上OTN系统&#xff0c;通过WDM通道来传送的技术。传统的OTN承载ETH专线&#xff0c;不支持限速&汇聚等场景。引入EoO技术后&#xff0c;支持LAG、QoS、OAM等L2层的功能特性&#xff0c;实现端口限…

RBTree(红黑树)的介绍和实现

欢迎来到杀马特的主页&#xff1a;羑悻的小杀马特.-CSDN博客 目录 ​编辑 一红黑树介绍&#xff1a; 1.1红黑树概念&#xff1a; 1.2红黑树遵循的原则&#xff1a; 1.3红黑树效率分析&#xff1a; 二.红黑树的实现&#xff1a; 2.1红黑树结构&#xff1a; 2.2红黑树节点…

TMGM:下周全球市场分析

本周&#xff0c;美国股市表现强劲&#xff0c;标普500指数和道琼斯工业平均指数均刷新了历史最高记录。这一涨势得到了美联储公开市场委员会&#xff08;FOMC&#xff09;会议纪要的支撑&#xff0c;纪要显示大部分官员都支持9月份实施重大的50个基点降息。 在澳大利亚&#…

宝安网站建设中的响应式设计

宝安网站建设中的响应式设计 在数字化时代&#xff0c;网站已成为企业展示形象、吸引客户的重要平台。而在宝安地区&#xff0c;随着互联网的发展&#xff0c;越来越多的企业意识到响应式设计的重要性。响应式设计不仅能够提升用户体验&#xff0c;还能为企业带来更大的市场竞争…

.NET 一款通过DCOM实现系统提权的工具

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…

2024年恩施职称评前公示

此次公示共有422人&#xff0c;初级职称、中级职称、馆员、畜牧师、助理馆员、三级演员、农艺师等均在一起进行评审前的公示。 根据恩施州职称改革工作领导小组办公室《关于报送2024年度恩施州中初级专业技术职务评审材料的通知》&#xff08;恩施州职改办〔2024〕14号&#xf…

04 什么是线性表

什么是线性表 一、为什么需要线性表 例如&#xff1a; ​ 在程序中保存指定班级的所有的学生信息&#xff08;暂时只需要处理姓名、年龄&#xff09;&#xff0c;该班级最多可容纳30人&#xff0c;且可进行数量上的增减。 业务功能&#xff1a; ​ 1&#xff09;这个项目中…

【分布式架构】分布式锁Redission

一、流程图 二、lua脚本实现原理 由setnxexpire实现 setnx&#xff1a;判断key是否存在&#xff0c;如果不存在&#xff0c;则获取锁 setnx的数据结构&#xff1a;<key,<key1,value>> key&#xff1a;业务声明的key key1&#xff1a;线程id value&#xff1a;次数…

css 翻页效果

有一个项目&#xff0c;页面切换的时候要翻页效果。 所以有一个简单的demo&#xff0c;提供给大家学习 <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewport" content"widthdev…

Golang | Leetcode Golang题解之第476题数字的补数

题目&#xff1a; 题解&#xff1a; func findComplement(num int) int {highBit : 0for i : 1; i < 30; i {if num < 1<<i {break}highBit i}mask : 1<<(highBit1) - 1return num ^ mask }

空间计算:现实与数字世界的无缝融合

随着增强现实&#xff08;AR&#xff09;、虚拟现实&#xff08;VR&#xff09;、物联网&#xff08;IoT&#xff09;和人工智能&#xff08;AI&#xff09;的不断进步&#xff0c;空间计算&#xff08;Spatial Computing&#xff09;作为一种新兴技术&#xff0c;正在成为人们…

IDEA Sping Boot 多配置文件application Maven动态切换

新建application-dev.yml与application-prod.yml pom.xml文件下添加profiles等 让idea识别出配置文件 <profiles><profile><id>dev</id><properties><!-- 环境标识&#xff0c;需要与配置文件的名称相对应 --><profiles.active>dev&…