redis和rabbitmq实现延时队列

news2024/11/28 10:40:39

redis和rabbitmq实现延时队列

  • 延迟队列使用场景
  • Redis中zset实现延时队列
  • Rabbitmq实现延迟队列

延迟队列使用场景

1. 订单超时处理
延迟队列可以用于处理订单超时问题。当用户下单后,将订单信息放入延迟队列,并设置一定的超时时间。如果在超时时间内用户未支付订单,消费者会从延迟队列中获取到该订单,并执行相应的处理操作,如取消订单、释放库存等。

2. 优惠券过期提醒
延迟队列可以用于优惠券的过期提醒功能。将即将过期的优惠券信息放入延迟队列,并设置合适的延迟时间。当延迟时间到达时,消费者将提醒用户优惠券即将过期,引导用户尽快使用。

3. 异步通知与提醒
延迟队列可以用于异步通知和提醒功能。例如,当用户完成某个操作后,系统可以将相关通知消息放入延迟队列,并设置一定的延迟时间,以便在合适的时机发送通知给用户。

Redis中zset实现延时队列

1. 创建延迟队列服务类

  • 创建一个延迟队列的服务类,例如DelayQueueService,用于操作Redis中的ZSet。这个服务类需要完成以下功能:
  • 将消息放入延迟队列:将消息作为元素添加到ZSet中,设置对应的延迟时间作为分数。轮询并处理已到期的消息:定时任务或者消息消费者轮询检查ZSet中的元素,获取到达指定时间的消息进行处理。删除已处理的消息:处理完消息后,从ZSet中将其删除。
@Service
public class DelayQueueService {
    private static final String DELAY_QUEUE_KEY = "delay_queue";


    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void addToDelayQueue(String message,long delayTime){
        redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY,message,System.currentTimeMillis()+delayTime);
    }

    public void processDelayedMessage(){
        //reverseRangeByScore 从高到低
        //rangeByScore 从低到高
        Set<String> messages = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_KEY, 0, System.currentTimeMillis());
        for(String message:messages){
            //处理消息
            System.out.println(message);
            redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY,message);
        }

    }
}

2. 配置定时任务或消息消费者
使用Spring Boot的定时任务或消息队列框架,定时调用延迟队列服务类的轮询方法或监听指定的消息队列,可以将轮训粒度放到1s一次。

@Component
public class DelayQueueSchedule {
    @Autowired
    private DelayQueueService delayQueueService;


    // 每隔一段时间进行轮询并处理延迟消息
    @Scheduled(fixedDelay = 1000)
    public void pollAndProcessDelayedMessages() {
        delayQueueService.pollAndProcessDelayedMessages();
    }
}

然后在启动类上通过@EnableScheduling注解开启任务调度能力。

缺点:
使用ZSET(有序集合,Sorted Set)来实现延迟任务调度(如订单超时取消)是一种有效的方法,但它也有一些缺点和限制:

  1. 内存消耗:ZSET 在Redis中是一个有序集合,它需要占用一定的内存来存储成员和分数。如果你需要存储大量的延迟任务,可能会导致内存消耗较大。这可能会对Redis服务器的性能和成本产生影响,特别是在大规模应用中。
  2. 不适用于大规模延迟任务:ZSET 可以处理相对较小数量的延迟任务,但当需要管理大规模延迟任务队列时,可能会导致性能下降。在这种情况下,需要考虑更高效的延迟队列解决方案,例如使用分布式消息队列。
  3. 无法动态修改延迟时间: 一旦将任务添加到ZSET中,你不能轻松地修改任务的延迟时间。如果需要在任务已经添加后更改延迟时间,可能需要复杂的操作。
  4. 没有重试机制:ZSET 只能用于一次性延迟任务,无法自动处理任务失败后的重试。如果任务在执行时失败,你需要自己实现重试逻辑。
  5. 没有持久化: Redis是内存数据库,如果Redis服务器重启或发生故障,已添加的延迟任务数据将丢失。虽然可以通过Redis持久化机制来部分解决这个问题,但仍然存在一定风险。
  6. 复杂性增加: 使用ZSET来管理延迟任务队列需要编写复杂的代码来处理任务的添加、检索和删除。这可能增加应用程序的复杂性。

Rabbitmq实现延迟队列

死信,顾名思义就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致queu 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

列出2种实现方式。
(1)使用Time To Live(TTL) + Dead Letter Exchanges(DLX)死信队列组合实现延迟队列的效果。
(2)使用RabbitMQ官方延迟插件rabbitmq_delayed_message_exchange,实现延时队列效果。

由于TTL(生存时间)过期导致的死信,就是我们实现延迟队列的的方式。
我们需要声明如下形式的交互机和队列,以及对应的routing key,并进行绑定:
请添加图片描述
上图绑定的代码如下所示

@Configuration
public class DeadQueueConfig {
    //普通交换机及队列
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信交换机及队列
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "QD";
    //通用队列
    public static final String QUEUE_C = "QC";


    // 声明 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }
    //声明队列A绑定X交换机  路由为XA
    @Bean
    public Binding queueABingX(@Qualifier("queueA") Queue queueA,
                               @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //声明队列 B 绑定 X 交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }

    //声明通用队列C 不设ttl,由消费者决定ttl
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }
    // 声明队列 C 绑定 X 交换机
    @Bean
    public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

    // 声明 死信队列交换机
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    //声明死信队列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE,true);
    }
    //声明死信队列 QD 绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

}

其中,QD为死信队列。当QA和QB队列中的消息,达到设定的TTL(10s和40s)后,将进入指定的死信队列QD。该方法缺点就是一个TTL对应一个队列

其中的QC作为通用的队列,即在消费者处指定消息对应的TTL,TTL过期后转入死信队列。使用该通用队列可以避免每增加一个新的时间需求,就要新增一个队列的问题。但该方法由于队列先进先出的性质,会导致一定的问题:

即先发出一个TTL为10s的消息a,进入队列;再马上发出一个TTL为2s的消息b,进入队列。由于队列的性质,会在消息a的TTL结束后,a进入死信队列后,b才会进入死信队列。而不是根据TTL的时间,b比a先进入死信队列。

声明交换机、队列,并绑定成功后,编写死信队列消费者代码;

@Component
@Slf4j
public class DeadQueueConsumer {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息:{}", new Date().toString(), msg);
    }
}

在controller中编写生产者代码,进行测试:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public String sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
        return "finish";
    }

结果如图:请添加图片描述
测试通用队列QC的效果:

@GetMapping("/send/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
        rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
            correlationData.getMessageProperties().setExpiration(ttlTime);
            return correlationData;
        });
        log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
    }

结果如下图
请添加图片描述

可以看到, 两条消息几乎同时到达死信队列,因为TTL为2s的消息由于被堵在TTL为10s的消息后导致。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1530223.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LabVIEW FPGA入门】使用FPGA实现串行同步接口(SSI)

SSI&#xff08;串行同步接口&#xff09;是连接绝对位置传感器和控制器的广泛应用的串行接口。SSI利用控制器发出一个时钟脉冲序列&#xff0c;初始化传感器的门限输出。 传感器不断更新位置数据&#xff0c;并传送到移位寄存器中。在每一个时钟脉冲序列之间&#xff…

了解常见字符函数

乐观学习&#xff0c;乐观生活&#xff0c;才能不断前进啊&#xff01;&#xff01;&#xff01; 我的主页&#xff1a;optimistic_chen 我的专栏&#xff1a;c语言 点击主页&#xff1a;optimistic_chen和专栏&#xff1a;c语言&#xff0c; 创作不易&#xff0c;大佬们点赞鼓…

.NET 异步编程(异步方法、异步委托、CancellationToken、WhenAll、yield)

文章目录 异步方法异步委托async方法缺点CancellationTokenWhenAllyield 异步方法 “异步方法”&#xff1a;用async关键字修饰的方法 异步方法的返回值一般是Task<T>&#xff0c;T是真正的返回值类型&#xff0c;Task<int>。惯例&#xff1a;异步方法名字以 Asy…

浅析ArcGis中的软件——ArcMap、ArcScene、 ArcGlobe、ArcCatalog

为什么要写这么一篇介绍ArcGis的文章呢&#xff1f;因为大部分人也包括ArcGisdada&#xff0c;在使用ArcMap应用程序创建工程时总以为我们就是使用了ArcGis这个软件的所有。其实不然&#xff0c;在后期的接触和使用中慢慢发现原来ArcMap只是ArcGis这个综合平台的一部分&#xf…

HarmonyOS NEXT应用开发之动态路由

介绍 本示例将介绍如何使用动态路由跳转到模块中的页面&#xff0c;以及如何使用动态import的方式加载模块 使用说明 通过动态import的方式&#xff0c;在需要进入页面时加载对应的模块。配置动态路由&#xff0c;通过WrapBuilder接口&#xff0c;动态创建页面并跳转。动态i…

2024.3.19

思维导图 模拟面试 1.友元的作用 答&#xff1a;通过关键字friend&#xff0c;可以让一些函数或者类&#xff0c;可以访问一个类中的私有数据成员。 2.匿名对象的作用 答&#xff1a;匿名对象就是没有名字的对象&#xff0c;是用来给有名对象进行初始化工作的。 3.常成员函…

【S5PV210】 | GPIO编程

【S5PV210】 | GPIO编程 时间:2024年3月17日22:02:32 目录 文章目录 【`S5PV210`】 | `GPIO`编程目录1.参考2.`DataSheet`2.1.概述2.1.1.特色2.1.2 输入/输出配置2.1.3 `S5PV210` 输入/输出类型2.1.4 IO驱动强度**2.1.4.1 类型A IO驱动强度****2.1.4.2 类型A IO驱动强度****2…

安泰电子:前置微小信号放大器是什么东西

前置微小信号放大器是一种用于放大微弱信号的设备&#xff0c;在电子和通信领域中有广泛的应用。它的主要功能是将输入的微小信号放大到足够的水平&#xff0c;以便后续电路能够准确地测量、处理和分析这些信号。本文将详细介绍前置微小信号放大器的原理、组成部分和应用领域。…

目标检测——PP-PicoDet算法解读

PP-YOLO系列&#xff0c;均是基于百度自研PaddlePaddle深度学习框架发布的算法&#xff0c;2020年基于YOLOv3改进发布PP-YOLO&#xff0c;2021年发布PP-YOLOv2和移动端检测算法PP-PicoDet&#xff0c;2022年发布PP-YOLOE和PP-YOLOE-R。由于均是一个系列&#xff0c;所以放一起解…

AutoSAR配置与实践(深入篇)10.3 CANTP 传输流程和通信示例

AutoSAR配置与实践(深入篇)10.3 CANTP 通信示例 CANTP 通信示例一、诊断传输流程1.1上位机请求流程1.2 ECU反馈流程二、CANTP 通信示例2.1 通信交互详解CANTP 通信示例 ->返回总目录<- 一、诊断传输流程 1.1上位机请求流程 Step 1. Tester(诊断上位机)通过物理总线…

线程,你真的懂了吗?

大家都知道的是线程其实分为的是内核级线程和用户级线程&#xff0c;这几天在看线程的时候&#xff0c;突然有一种感觉不太明白的地方&#xff0c;那就是linux中pthread.h这个库中的线程到底是用户级还是内核级&#xff0c;后来在网上也搜了很多的例子。我自我认为是看不懂的&a…

科技助力高质量发展:新质生产力的崛起与企业数字化转型

引言 随着科技的飞速发展&#xff0c;我们正逐渐步入数字化智能时代&#xff0c;这个时代不仅为企业带来了无限的机遇&#xff0c;也让其面对前所未有的挑战。在这个快速变革的时代&#xff0c;企业必须不断调整自己的经营策略&#xff0c;适应数字化转型的浪潮&#xff0c;以…

阿里云部署MySQL、Redis、RocketMQ、Nacos集群

文章目录 &#x1f50a;博主介绍&#x1f964;本文内容MySQL集群配置云服务器选购CPU选择内存选择云盘选择ESSD AutoPL云盘块存储性能&#xff08;ESSD&#xff09; 镜像选择带宽选择密码配置注意事项 搭建宝塔面板方便管理云服务器云服务器的安全组安装docker和docker-compose…

MyBatis记录

目录 什么是MyBatis MyBatis的优点和缺点 #{}和${}的区别 Mybatis是如何进行分页的&#xff0c;分页插件的原理 Mybatis是如何将sql执行结果封装为目标对象并返回的 MyBatis实现一对一有几种方式 Mybatis设计模式 什么是MyBatis &#xff08;1&#xff09;Mybatis是一个…

Unbuntu20.04 git push和pull相关问题

文章目录 Unbuntu20.04 git push和pull使用&#xff11;&#xff0e;下载[Git工具包](https://git-scm.com/downloads)&#xff12;&#xff0e;建立本地仓库&#xff13;&#xff0e;将本地仓库与github远程仓库关联&#xff14;&#xff0e;将本地仓库文件上传到github远程仓…

Tcl学习笔记(一)——环境搭建及基本语法

一、Tcl简介 TCL&#xff08;Tool Command Language&#xff0c;即工具命令语言&#xff09;是一种解释执行的脚本语言。所谓解释执行语言&#xff0c;是指其不需要通过编译和联结&#xff0c;而是直接对每条语句进行顺序解释、执行。 TCL包含语言和工具库&#xff0c;TCL语言主…

Modbus TCP转Profinet网关如何实现Modbus主站与多设备通讯

在工业控制领域中&#xff0c;Modbus TCP转Profinet网关&#xff08;XD-ETHPN20&#xff09;扮演着连接不同设备间通讯的重要角色。当将Modbus主站与十几台服务器进行通讯时&#xff0c;通过modbus tcp转profinet网关&#xff08;XD-ETHPN20&#xff09;设备将不同协议间的数据…

【YOLOv5改进系列(2)】高效涨点----Wise-IoU详细解读及使用Wise-IoU(WIOU)替换CIOU

WIOU损失函数替换 &#x1f680;&#x1f680;&#x1f680;前言一、1️⃣ Wise-IoU解读---基于动态非单调聚焦机制的边界框损失1.1 &#x1f393; 介绍1.2 ✨WIOU解决的问题1.3 ⭐️论文实验结果1.4 &#x1f3af;论文方法1.4.1☀️Wise-IoU v11.4.2☀️Wise-IoU v21.4.3☀️…

PySpark案例实战

一、前言介绍 二、基础准备 # 导包 from pyspark import SparkConf,SparkContext #创建SparkConf类对象 confSparkConf().setMaster("local[*]").setAppName("test_spark_app") #基于SparkXConf类对象创建SparkContext对象 scSparkContext(confconf) #打印…

在线播放视频网站源码系统 带完整的安装代码包以及搭建教程

在线播放视频网站源码系统的开发&#xff0c;源于对当前视频市场的深入洞察和用户需求的精准把握。随着视频内容的爆炸式增长&#xff0c;用户对视频播放的需求也日益多样化。他们希望能够随时随地观看自己感兴趣的视频内容&#xff0c;同时还希望能够在观看过程中享受到流畅、…