redis — 基于Spring Boot实现redis延迟队列

news2025/1/14 18:28:52

1. 业务场景

延时队列场景在我们日常业务开发中经常遇到,它是一种特殊类型的消息队列,它允许把消息发送到队列中,但不立即投递给消费者,而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种:

  • 在各种购物平台上下单,订单超过30分钟未支付,自动关闭。
  • 订单完成后, 如果用户一直未评价, 5天后自动好评。
  • 会员到期前15天, 到期前3天分别发送短信提醒。
  • 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  • 如何定期检查处于退款状态的订单是否已经退款成功?

2. Redis延迟队列实现原理

目前延迟队列的类型主要实现有:

  • 基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,或者定义时间轮,新消息落在指定位置;
  • 基于队列的延迟: 设置不同延迟级别的队列,比如5s、1min、30mins、1h等,每个队列中消息的延迟时间都是相同的。

基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rabbitMQ可安装插件实现。第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,然假如我们对消息的实时性以及可靠性要求非常高,可能就需要使用MQ或kafka来实现了。

消息延迟流程图如下:
在这里插入图片描述
Redis延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来,整体思路为:

  1. 消息体设置有效期,设置好score,然后放入zset中
  2. 通过排名拉取消息
  3. 有效期到了,就把当前消息从zset中移除

zadd命令

使用方式:ZADD key score member [[score member][score member] …]
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作。如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。score 值可以是整数值或双精度浮点数。

ZRANGEBYSCORE命令

使用方式:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

  1. 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
  2. 具有相同 score 值的成员按字典序来排列
  3. 可选的 LIMIT 参数指定返回结果的数量及区间(就像SQL中的 SELECT LIMIT offset, count ),注意当 offset 很大时,定位 offset 的操作可能需要遍历整个有序集,此过程最坏复杂度为 O(N) 时间。
  4. 可选的 WITHSCORES 参数决定结果集是单单返回有序集的成员,还是将有序集成员及其 score 值一起返回。

ZREM命令

使用方式:ZREM key member [member …]
移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
当 key 存在但不是有序集类型时,返回一个错误。

3. 基于springboot实现redis延迟队列

3.1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>${version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>${version}</version>
</dependency>

3.2 redis基础方法

定义RedisService基础服务方法,本次案例只涉及到以下三个基础方法:

    /**
     * 添加 ZSet 元素
     *
     * @param key
     * @param value
     * @param score
     */
    @Override
    public boolean add(String key, Object value, double score) {
        return redisTemplate.opsForZSet().add(key, value, score);
    }
    
    /**
     * 返回 分数范围内 指定 count 数量的元素集合, 并且从 offset 下标开始(从小到大,不带分数的集合)
     *
     * @param key
     * @param min
     * @param max
     * @param offset 从指定下标开始
     * @param count  输出指定元素数量
     * @return
     */
    @Override
    public Set<Object> rangeByScore(String key, double min, double max, long offset, long count) {
        return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);
    }

    /**
     * Zset 删除一个或多个元素
     *
     * @param key
     * @param values
     * @return
     */
    @Override
    public Long removeZset(String key, Object... values) {
        return redisTemplate.opsForZSet().remove(key, values);
    }

3.3 定义Spring消息事件推送

@Getter
@ToString
public class DelayMsg extends ApplicationEvent {
    private String msg;
    private String topic;

    public DelayMsg(Object source, String msg, String topic) {
        super(source);
        this.msg = msg;
        this.topic = topic;
    }
}

3.4 消息获取

定义redis获取延迟队列消息方法:

/**
 * 从zset中取出score小于当前时间戳的数据
 *
 * @param key
 * @return
 */
public String getDelayOne(String key) {
    //先查后删,一次拿3个做备选,这样抢占到的概率就会高一些
    Set<Object> sets = redisService.rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);
    if (CollectionUtils.isEmpty(sets)) {
        return null;
    }

    for (Object val : sets) {
        if (1L.equals(redisService.removeZset(key, val))) {
            // 删除成功,表示抢占到
            return val.toString();
        }
    }
    return null;
}

这里每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息

  1. 为什么这样设计? 这里有两个点,先解释第一个,为啥先查后删

如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有一个实例抢占到呢?

借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;

因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登。

  1. 接下来再看第二个,为啥一次拿三个

从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了。

上面定义了如何获取延迟队列中已到期的消息,接下来需要定时轮训获取消息:

/**
 * 每5s定时轮训消息
 */
@Scheduled(fixedRate = 5000)
public void schedule() {
   for (String specialTopic : topic) {
       String msg = redisDelayQueue.getDelayOne(specialTopic);
       logger.info("开始轮训获取消息 {}", msg);
       if (StringUtil.isNotEmpty(msg)) {
       	   //使用Spring推送事件处理
           applicationContext.publishEvent(new DelayMsg(this, msg, specialTopic));
       }
   }
}

上面的定时任务,直接借助Spring的@Schedule来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener事件机制来实现消息处理的解耦

3.5 定义消费者注解和切面处理

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {
    String topic();
}

注意这个注解上面还有 @EventListener,表明它可以监听的spring的事件

3.6 定义延时业务的切面处理

@Aspect
@Component
public class ConsumerAspect {

    @Around("@annotation(consumer)")
    public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {
        Object[] args = joinPoint.getArgs();
        boolean check = false;
        for (Object obj : args) {
            if (obj instanceof DelayMsg) {
                check = consumer.topic().equals(((DelayMsg) obj).getTopic());
            }
        }
        if (!check) {
            // 不满足条件,直接忽略
            return null;
        }
        // topic匹配成功,执行
        return joinPoint.proceed();
    }
}

3.7 消息监听

	//使用自定义的consumer注解监听topic延迟队列
    @Consumer(topic = RedisKeyConstant.DELAY_QUEUE)
    public void consumer(DelayMsg delayMsg) {
        logger.info("预约单延时确认: " + delayMsg.getMsg() + " at:" + System.currentTimeMillis());
        //延迟业务具体实现
        //...
        //...
    }

3.8 业务facade层调用延迟处理

经过以上的延迟队列封装处理,在facade层,也就是我们的业务中就可以直接调用:

@Autowired
private DelayListWrapper delayListWrapper;
...
delayListWrapper.publish(RedisKeyConstant.DELAY_QUEUE, xxxId, xxx);

4 总结

本文以redis的zset来实现延时队列,并基于SpringBoot实现了延迟队列的推送和消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/875443.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux目录结构(快速了解)

linux目录结构 核心 1.Linux一切皆文件 2.只有一个顶级目录&#xff0c;而windows分C盘、D盘等 目录结构 目录含义&#xff08;与windows进行比对&#xff09; Linux含义windows/bin所有用户可用的基本命令存放的位置windows无固定的命令存放目录/bootlinux系统启动的时候需要…

JaCoCo助您毁灭线上僵尸代码 | 京东物流技术团队

一. 现状问题 随着需求不断迭代&#xff0c;业务系统的业务代码突飞猛进&#xff0c;在你自豪于自己的代码量产出很高时&#xff0c;有没有回头看看线上真正的客户使用量又有多少呢&#xff1f; 费事费力耗费大量人力&#xff0c;成本上线的功能&#xff0c;可能一年没人使用…

一文讲透!请收下这份“完美”地图贴图的制作攻略

3D渲染中&#xff0c;我们称传入材质的纹理为贴图。贴图一词强调其用途&#xff0c;当某个纹理用于在材质中实现法线效果时&#xff0c;我们称之为法线贴图。而在EasyV中&#xff0c;我们地图组件填充样式中的自定义上传的图片称之为「地图贴图」&#xff0c;主要用于地图表面/…

多线程进阶

多线程进阶 本章博客主要是围绕一些多线程相关的面试题&#xff0c;讨论的内容都是往年同学遇到的原题&#xff0c;以后面试也大概率会遇到的&#xff01;&#xff01;&#xff01; 常见的锁策略 锁策略指的不是某个具体的锁&#xff0c;是一个抽象的概念&#xff0c;描述的…

VS2019 设置注释和取消注释选选定内容

Microsoft Visual Studio Professional 2019 由于老是忘记&#xff0c;换了电脑就不行了&#xff0c;原来默认的太繁琐。 每次都去设置选定内容&#xff0c;老是不行。 应该设置在切换块注释 上面这样就可以按一个组合键实现注释和不注释了。

考研408 | 【计算机网络】 传输层

导图 传输层的功能 传输层的两个协议 传输层的寻址与端口 UDP协议 UDP的主要特点 UDP首部格式&#xff1a; UDP校验&#xff1a; TCP协议 TCP协议的特点 TCP报文段首部格式 TCP连接管理 TCP的连接建立 SYN洪泛攻击 TCP的连接释放 TCP可靠传输 序号&#xff1a; 确认&#xff1…

【Rust】Rust学习 第十二章一个 I/O 项目:构建一个命令行程序

本章既是一个目前所学的很多技能的概括&#xff0c;也是一个更多标准库功能的探索。我们将构建一个与文件和命令行输入/输出交互的命令行工具来练习现在一些你已经掌握的 Rust 技能。 Rust 的运行速度、安全性、单二进制文件输出和跨平台支持使其成为创建命令行程序的绝佳选择…

回归预测 | MATLAB实现基于LSSVM-Adaboost最小二乘支持向量机结合AdaBoost多输入单输出回归预测

回归预测 | MATLAB实现基于LSSVM-Adaboost最小二乘支持向量机结合AdaBoost多输入单输出回归预测 目录 回归预测 | MATLAB实现基于LSSVM-Adaboost最小二乘支持向量机结合AdaBoost多输入单输出回归预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.Matlab实现L…

C++ 动态规划经典案例解析之最长公共子序列(LCS)_窥探递归和动态规划的一致性

1. 前言 动态规划处理字符相关案例中&#xff0c;求最长公共子序列以及求最短编辑距离&#xff0c;算是经典中的经典案例。 讲解此类问题的算法在网上一抓应用一大把&#xff0c;即便如此&#xff0c;还是忍不住有写此文的想法。毕竟理解、看懂都不算是真正掌握&#xff0c;唯…

侯捷 C++ part2 兼谈对象模型笔记——6 多态 虚机制

6 多态 虚机制 6.1 虚机制 当类中有虚函数时&#xff08;无论多少个&#xff09;&#xff0c;其就会多一个指针—— vptr 虚指针&#xff0c;其会指向一个 vtbl 虚函数表&#xff0c;而 vtbl 中有指针一一对应指向所有的虚函数 有三个类依次继承&#xff0c;其中A有两个虚函…

​LeetCode解法汇总617. 合并二叉树

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 描述&#xff1a; 给你两棵二…

智能制造感知产品在工业4.0中的应用

在工业4.0时代&#xff0c;智能制造已经成为制造行业的重要发展方向。智能制造感知产品作为智能制造的核心组成部分&#xff0c;对于提高制造效率、降低成本、提升产品质量等方面具有重要的作用。本文将详细介绍智能制造感知产品在工业4.0中的应用。 智能制造感知产品在工业4.…

磁场是灵魂散发出来的力量

为什么有些人的思维很敏捷&#xff0c;但是&#xff0c;到了另外一个人面前&#xff0c;他的思维会突然错乱或停顿了呢&#xff1f; 每一个人的磁场都有一种释放和吸收的功能&#xff0c;如果经常和磁场比较污浊的人接触&#xff0c;他也会把我们的磁场给染污了。如果跟一个磁场…

IDEA部署配置Maven项目教程,IDEA配置Tomcat(2019.3.3)(2023.1.3)

我们往往会用到多版本的IDEA进行一个Maven项目配置部署&#xff0c;还有tomcat的配置&#xff0c;这里就有你需要的&#xff0c;有低版本的&#xff0c;也有高版本的&#xff0c;根据自己的情况来进行一个操作 一、前言 当涉及到软件开发和项目管理时&#xff0c;使用一个可靠的…

iPhone恢复备忘录的4种方法!超好用!

iPhone备忘录能够帮助我们记录一些重要的事务或者个人事项&#xff0c;帮助减少遗漏和失误。小编也常常使用iPhone备忘录来记录事情&#xff0c;避免自己忘记。但有时候可能会因为自己的操作失误&#xff0c;导致备忘录误删除或丢失&#xff0c;那么这时候该怎么办呢&#xff1…

电子行业精密空调监控,这个方法非常全面!

在电子行业&#xff0c;精密空调监控扮演着至关重要的角色。电子设备的制造、储存和运行过程中&#xff0c;恒定的环境条件如温度、湿度和空气质量对于确保设备的高效运行和稳定性至关重要。 由于许多电子元件对环境变化极为敏感&#xff0c;因此精密空调监控成为了维护产品质量…

使用UDP协议实现—翻译服务器

目录 前言 1.设计思路&#xff1a; 2.词库设计 3.设计客户端 4.设计服务端 5.编译客户端和服务端 6.测试结果 7.总结 前言 上一篇文章中&#xff0c;我们使用UDP协议编码完成了一个简单的服务器&#xff0c;实现数据通信&#xff0c;服务器设计出来后目的不仅仅只是实现…

C#软件外包开发框架

C# 是一种由微软开发的多范式编程语言&#xff0c;常用于开发各种类型的应用程序&#xff0c;从桌面应用程序到移动应用程序和Web应用程序。在 C# 开发中&#xff0c;有许多框架和库可供使用&#xff0c;用于简化开发过程、提高效率并实现特定的功能。下面和大家分享一些常见的…

VeraCard已经上线了 — 快来领取你的吧!

VeraCard现在已经可以订购&#xff0c;并为英国居民积极发货&#xff01;VeraCard是一张Visa借记卡&#xff0c;是我们与DAMEX合作推出的。Verasity是在DAMEX上列出的第一个替代币&#xff0c;并且是第一个拥有自己品牌卡片的代币。有了VeraCard&#xff0c;Verasity社区成员现…

OpenCV图像处理——形态学操作

目录 连通性形态学操作腐蚀和膨胀开闭运算礼帽和黑帽 连通性 形态学操作 形态学转换是基于图像形状的一些简单操作。它通常在二进制图像上执行。腐蚀和膨胀时两个基本的形态学运算符。然后它的变体形式如开运算&#xff0c;闭运算&#xff0c;礼帽黑帽等 腐蚀和膨胀 cv.erode…