MQ专题:延迟消息的通用方案

news2024/9/20 16:46:10

一、主要内容

本文将实现一个MQ延迟消息的通用方案。

方案不依赖于MQ中间件,依靠MySQL和DelayQueue解决,不管大家用的是什么MQ,具体是RocketMQ、RabbitMQ还是kafka,本文这个方案你都可以拿去直接使用,可以轻松实现任意时间的延迟消息投递。

二、涉及技术点

  1. SpringBoot2.7
  2. MyBatisPlus
  3. MySQL
  4. 线程池
  5. java中的延迟队列:DelayQueue
  6. 分布式锁

三、延迟消息常见的使用场景

  1. 订单超时处理

比如下单后15分钟,未支付,则自动取消订单,回退库存。

可以采用延迟队列实现:下单的时候可以投递一条延迟15分钟的消息,15分钟后消息将被消费。

  1. 消息消费失败重试

比如MQ消息消费失败后,可以延迟一段时间再次消费。

可以采用延迟消息实现:消费失败,可以投递一条延迟消息,触发再次消费

  1. 其他任意需要延迟处理的业务

业务中需要延迟处理的场景,都可以使用延迟消息来搞定。

四、延迟消息常见的实现方案

方案1:MySQL + job定时轮询

由于延迟消息的时间不确定,若要达到实时性很高的效果,也就是说消息的延迟时间是不知道的,那就需要轮询每一秒才能确保消息在指定的延迟时间被处理,这就要求job需要每秒查询一次db中待投递的消息。

这种方案访问db的频率比较高,对数据库造成了一定的压力。

方案2:RabbitMQ 中的TTL+死信队列

rabbitmq中可以使用TTL消息 + 死信队列实现,也可以安装延时插件。

此方案对中间件有依赖,不同的MQ实现是不一样的,若换成其他的MQ,方案要重新实现

方案3:MySQL + job定时轮询 + DelayQueue

可以对方案1进行改进,引入java中的 DelayQueue。

job可以采用1分钟执行一次,每次拉取未来2分钟内需要投递的消息,将其丢到java自带的 DelayQueue 这个延迟队列工具类中去处理,这样便能做到实时性很高的投递效果,且对db的压力也降低了很多。

这种方案对db也没什么压力,实时性非常高,且对MQ没有依赖,这样不管切换什么MQ,这种方案都不需要改动。

本文将落地该方案。

需要一张本地消息表(t_msg)

这张表用来暂存事务消息和延迟消息

create table if not exists t_msg
(
    id               varchar(32) not null primary key comment '消息id',
    body_json        text        not null comment '消息体,json格式',
    status           smallint    not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',
    expect_send_time datetime    not null comment '消息期望投递时间,大于当前时间,则为延迟消息,否则会立即投递',
    actual_send_time datetime comment '消息实际投递时间',
    create_time      datetime comment '创建时间',
    fail_msg         text comment 'status=2 时,记录消息投递失败的原因',
    fail_count       int         not null default 0 comment '已投递失败次数',
    send_retry       smallint    not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',
    next_retry_time  datetime comment '投递失败后,下次重试时间',
    update_time      datetime comment '最近更新时间',
    key idx_status (status)
) comment '本地消息表';

五、代码落地

将延迟消息保存到t_msg

在这里插入图片描述

投递事务消息 or 2分钟内的延时消息

在这里插入图片描述

每分钟执行一次Job,查出2分钟内应该被投递的延迟消息 和 2分钟内应该重新投递的上次投递失败的事务消息,再放到延时队列里

    /**
     * 每分钟执行一次
     */
    @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.MINUTES)
    public void sendRetry() {
        /**
         * 查询出需要重试的消息(状态为0 and 期望投递时间 <= 当前时间 + 2分钟) || (投递失败的 and 需要重试 and 下次重试时间小于等于当前时间 + 2分钟)
         * select * from t_msg where ((status = 0 and expect_send_time<=当前时间+2分钟) or (status = 2 and send_retry = 1 and next_retry_time <= 当前时间 + 2分钟))
         */
        LocalDateTime time = LocalDateTime.now().plusMinutes(2);
        LambdaQueryWrapper<MsgPO> qw = Wrappers.lambdaQuery(MsgPO.class)
                .and(query -> query.and(lq ->
                                lq.eq(MsgPO::getStatus, MsgStatusEnum.INIT.getStatus())
                                        .le(MsgPO::getExpectSendTime, time))
                        .or(lq -> lq.eq(MsgPO::getStatus, MsgStatusEnum.FAIL.getStatus())
                                .eq(MsgPO::getSendRetry, 1)
                                .le(MsgPO::getNextRetryTime, time)));
        qw.orderByAsc(MsgPO::getId);

        //先获取最小的一条记录的id
        MsgPO minMsgPo = this.msgService.findOne(qw);
        if (minMsgPo == null) {
            return;
        }
        this.msgSender.sendRetry(minMsgPo);
        String minMsgId = minMsgPo.getId();

        //循环中继续向后找出id>minMsgId的所有记录,然后投递重试
        while (true) {
            //select * from t_msg where ((status = 0 and expect_send_time<=当前时间+2分钟) or (status = 2 and send_retry = 1 and next_retry_time <= 当前时间 + 2分钟)) and id>#{minMsgId}
            qw = Wrappers.lambdaQuery(MsgPO.class)
                    .and(query -> query.and(lq ->
                                    lq.eq(MsgPO::getStatus, MsgStatusEnum.INIT.getStatus()).le(MsgPO::getExpectSendTime, time))
                            .or(lq -> lq.eq(MsgPO::getStatus, MsgStatusEnum.FAIL.getStatus()).eq(MsgPO::getSendRetry, 1).le(MsgPO::getNextRetryTime, time)));
            qw.gt(MsgPO::getId, minMsgId);
            qw.orderByAsc(MsgPO::getId);
            Page<MsgPO> page = new Page<>();
            page.setCurrent(1);
            page.setSize(500);
            this.msgService.page(page, qw);

            //如果查询出来的为空 || 当前服务要停止了(stop=true),则退出循环
            if (CollUtils.isEmpty(page.getRecords()) || stop) {
                break;
            }
            //投递重试
            for (MsgPO msgPO : page.getRecords()) {
                this.msgSender.sendRetry(msgPO);
            }

            // minMsgId = 当前列表最后一条消息的id
            minMsgId = page.getRecords().get(page.getRecords().size() - 1).getId();
        }
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2087234.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【安规电容】

安规电容&#xff08;Safety Capacitors&#xff09;是一种专门设计用于电气设备中的电容器&#xff0c;主要用于确保电气安全&#xff0c;特别是用于交流电路中的滤波、降噪以及跨接隔离等功能。它们必须符合特定的安全标准&#xff0c;以确保电气设备在各种运行条件下都能保持…

MySQL:简述数据库的主从复制

MySQL主从复制是指数据可以从一个MySQL数据库主节点复制到一个或多个从节点。 MySQL默认采用异步复制方式&#xff0c;这样从节点不用一直访问主节点来更新自己的数据&#xff0c;数据的更新可以在远程连接上进行&#xff0c;从节点可以复制主节点中的所有数据库或者特定的数据…

FastJson序列化驼峰-下划线转换问题踩坑记录

背景 问题描述 在MySQL数据表中&#xff0c;存在一个JSON结构的扩展字段&#xff0c;通过updateById进行更新写入操作。更新写入的同一个字段名出现了混合使用了驼峰命名和下划线命名两种格式。 ps: FastJson版本是1.2.83 问题影响 数仓同学离线统计数据时发现字段名有两种…

【生命之树】

题目 思路 求联通区域中的最大和值 代码 #include <bits/stdc.h> using namespace std; const int N 1e510, M N << 1; const int null -0x3f3f3f3f; long long w[N]; int h[N], e[M], ne[M], idx; void add(int a, int b) // 添加一条边a->b {e[idx] b,…

虚幻5|(1)技能栏快捷格子的制作|(2)如何在游戏进行的时候显示鼠标,使用鼠标操作UI||(3)改进技能释放

一.创建技能栏格子UI 1.创建一个UI控件蓝图&#xff0c;命名为技能栏格子&#xff08;如何创建我就不多说了&#xff0c;学到这了基础知识应该有所掌握了&#xff09; 2.添加一个边界和垂直框 3.选中边界&#xff0c;右侧细节栏更改如下 4.再拖入一个文本块&#xff0c;做垂直…

java同步概念

同步&#xff08;Synchronization&#xff09;在Java多线程编程中是一个既重要又复杂的概念。它涉及到如何确保多个线程在访问共享资源时能够保持数据的一致性和完整性&#xff0c;避免出现竞态条件&#xff08;Race Condition&#xff09;等问题。 同步的基本概念 同步的主要目…

视频结构化从入门到精通——视频结构化主要技术介绍

视频结构化主要技术 1 视频接入 “视频接入”是视频结构化管道的起点&#xff08;SRC Point&#xff09;视频接入是视频结构化处理的第一步&#xff0c;它涉及将视频数据从各种采集源获取到系统中进行进一步处理。视频接入的质量和稳定性对后续的数据处理、分析和应用至关重要…

128 最长连续序列

解题思路&#xff1a; \qquad 由于题目要找的是最长连续序列&#xff0c;那么若已知序列起点&#xff0c;目标值递增向后遍历&#xff0c;借助哈希表检查目标值是否存在&#xff0c;可以在 O ( n ) O(n) O(n)时间内完成。 \qquad 但是若从数组每一个元素开始&#xff0c;遍历…

Datawhale X 李宏毅苹果书 AI夏令营_深度学习基础学习心得Task2.2

本次学习分类问题 1、分类与回归&#xff1a; 回归一般输出一个值y_hat 期望与y越接近越好。分类一般有几类就输出几个值&#xff0c;是一个one-hot的向量&#xff0c;在类别对应的位置值为1 本文介绍了一种重复输出数值后加权的方法&#xff0c;但是我做分类一般用全连接层…

【uniapp重大bug】uni-data-select的localdata改变,也会触发@change方法

bug描述 uni-data-select的下拉列表值localdata是动态获取的&#xff0c;且绑定了change方法&#xff0c;在页面加载后&#xff0c;请求localdata的列表数据&#xff0c;给localdata重新赋值&#xff0c;此时发现自动触发了change方法 当前uni版本&#xff1a;^2.0.2-30709202…

全场景——(七)libmodbus 使用

文章目录 一、libmodbus开发库1.1 功能概要1.2 源码获取1.3 libmodbus与应用程序的关系 二、libmodbus源代码解析2.1 核心函数2.2 框架分析与数据结构2.3 情景分析2.3.1 初始化2.3.2 主设备发送请求2.3.3 从设备接收请求2.3.4 从设备回应 2.4 常用接口函数2.4.1 各类辅助接口函…

2024版Assimp配置教程

最近想看看图形学&#xff0c;选择速通LearnOpenGL&#xff0c;不出意外最耗时间的依然是配置环境。按照教程上的把GLFW等等配置的没有问题&#xff0c;但是在Assimp这里卡住了。原因是教程上说的不详细&#xff0c;而网上查的又和现在的版本相去甚远&#xff0c;导致捣鼓了好一…

Linux基础1-基本指令6(grep,zip,tar,查看系统等命令)

一.本章重点 1.grep命令用于过滤文本信息,sort,uniq 2.zip/uzip命令用于压缩&#xff0c;解压文件 3.tar命令用于压缩&#xff0c;解压文件 二.grep grep命令 gerp(文件内容的行过滤工具)&#xff0c;默认会&#xff0c;会匹配文本中的关键字&#xff0c;匹配上的进行行显示 …

全民k歌怎么去水印保存?盘点分享3个无水印保存方法

在全民K歌的世界里&#xff0c;我们尽情展现音乐才华&#xff0c;但有时却会遇到一个棘手的问题&#xff1a;如何将歌曲视频无水印保存&#xff0c;以便自由分享到其他社交平台&#xff1f;为了解决这一难题&#xff0c;本文将为你盘点三种简单有效的无水印保存方法&#xff0c…

Python课堂笔记

1.大小写 大写&#xff1a;True、 None、 False 注意&#xff1a;大小写含义不相同 2.一行多个短句 短句&#xff1a;“ &#xff1b;” 长句&#xff1a;“ \” 3.变量 (1) int A[100] 整型 char B[100] 字符型 &#xff08;2&#xff09;type: 查看变量类型 补充&…

Language Models are Unsupervised Multitask Learners

摘要 自然语言处理任务&#xff0c;如问答、机器翻译、阅读理解和摘要&#xff0c;通常在任务特定的数据集上使用监督学习来处理。当在一个名为WebText的数百万网页的新数据集上训练时,我们证明了语言模型在没有任何明确监督的情况下开始学习这些任务。在不使用127,000多个训练…

【BPF之旅】认识eBPF

文章目录 一、eBPF基础认识1.1 eBPF历史演进1.2 eBPF特点和使用场景eBPF的特点&#xff08;优势&#xff09;eBPF的限制&#xff08;安全性的体现&#xff09;eBPF vs 内核模块应用场景 1.3 eBPF工作原理eBPF程序执行过程eBPF的开销 二、eBPF简单实践&#xff08;Hello World&a…

大数据技术

4v特点 volume&#xff08;体量大&#xff09; velocity&#xff08;处理速度快&#xff09; variety&#xff08;数据类型多&#xff09; value&#xff08;价值密度低&#xff09; 核心设计理念 并行化 规模经济 虚拟化 分布式系统满足需求 系统架构 大数据处理流程 结构化…

如何在QT6上配置文心一言的接口,从而生成一个自己的对话框

这里写自定义目录标题 前言&#xff1a;效果展示&#xff1a;环境配置&#xff1a;计划完善&#xff1a;核心代码&#xff1a; 前言&#xff1a; 网上有很多在前端调用文心一言接口的&#xff0c;想在QT上配置文心一言的接口&#xff0c;从而生成一个自己的对话框。 效果展示…

Sentinel-1 Level 1数据处理的详细算法定义(九)

《Sentinel-1 Level 1数据处理的详细算法定义》文档定义和描述了Sentinel-1实现的Level 1处理算法和方程&#xff0c;以便生成Level 1产品。这些算法适用于Sentinel-1的Stripmap、Interferometric Wide-swath (IW)、Extra-wide-swath (EW)和Wave模式。 今天介绍的内容如下&…