延迟队列实现方案总结

news2025/1/2 4:06:09

日常开发中,可能会遇到一些延迟处理的消息任务,例如以下场景

①订单支付超时未支付
②考试时间结束试卷自动提交
③身份证或其他验证信息超时未提交等场景。
④用户申请退款,一天内没有响应默认自动退款等等。

如何处理这类任务,最简单的方法就是将消息插入到数据库,然后使用定时任务扫描数据库。但是如果如果大量用户请求需要处理,就需要线程频繁的连接数据库,这样可能会对其他数据库请求造成影响,这样情况下我们可以使用延迟队列方式解决此类问题。

1.DelayQueue实现方案

首先使用java自带的DelayQueue完成此方案。
DelayQueue内部使用优先级队列PriorityQueue完成任务存储,而 PriorityQueue 采用二叉堆的思想确保在数据插入到队列中时最小值的排在堆顶,每次从拿数据只要从堆顶取即可。
同时 DelayQueue 还是用了可重入锁 ReentrantLock来确保线程并发安全。
DelayQueue的源码解析可以查看DelayQueue源码解析
使用DelayQueue完成延迟需要定义Delayed 实现类来充当任务元素,具体使用方法:

//DelayQueue的元素必须是Delayed的实现类
class DelayTask implements Delayed {
   private long time;
   private Consumer consumer;

   public DelayTask(long time, Consumer consumer) {
       this.time = time;
       this.consumer = consumer;
   }

   @Override
   public long getDelay(TimeUnit unit) {
       return time - System.currentTimeMillis();
   }

   @Override
   public int compareTo(Delayed o) {
       DelayTask delayTask = (DelayTask) o;
       return (int) (time - delayTask.getTime());
   }

   public long getTime() {
       return time;
   }

   public void call() {
       this.consumer.accept(this);
   }
}

public class QueueTest {
   public static void main(String[] args) throws InterruptedException {
       long startTime = System.currentTimeMillis();
       DelayTask d1 = new DelayTask(10000 + startTime, (o) -> System.out.println("3333"));
       DelayTask d2 = new DelayTask(1000 + startTime, (o) -> System.out.println("1111"));
       DelayTask d3 = new DelayTask(2000 + startTime, (o) -> System.out.println("2222"));

       DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
       delayQueue.add(d1);
       delayQueue.add(d2);
       delayQueue.add(d3);
       while (!delayQueue.isEmpty()) {
           //阻塞等待,如果有任务到期就取出,如果没有任务到期就等待
           DelayTask delayTask = delayQueue.take();
           delayTask.call();
       }
   }
}

getDelay()方法用于从队列中取任务时查看是否到期,如果小于等于0则表示可以取出,如果大于,当前线程需要根据是否是leader判断等待时间。
compareTo()方法用于任务入队时,判断该任务元素在堆位置时比较的逻辑。
Consumer consumer;存储实际执行的任务,也可以使用Runnable,Callable以及其他自定义类。

note:该方法支持动态添加和删除任务,而且线程安全,但是只适用于单机环境,而且需要自己定义查询逻辑,实现稍微复杂。

2.定时任务实现方案

通过线程池对象ScheduledExecutorService也可以实现延迟处理任务的功能,而且操作更简单。ScheduledExecutorService是jdk提供的类来完成指定时间或定期执行某些任务。代码如下:

class Task implements Callable {
   private int idx;

   public Task(Integer idx) {
       this.idx = idx;
   }

   @Override
   public Object call() throws Exception {
       System.out.println("---" + this.idx);
       return null;
   }
}

public class DelayedTest {
   public static void main(String[] args) {
       ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(10);
       scheduledExecutorService.schedule(new Task(3), 1, TimeUnit.SECONDS);
       scheduledExecutorService.schedule(new Task(2), 2, TimeUnit.SECONDS);
       scheduledExecutorService.schedule(new Task(1), 1, TimeUnit.SECONDS);
   }
}

ScheduledExecutorService继承了ExecutorService,与ExecutorService的逻辑大致相同
schedule()方法是ScheduledExecutorService特有的方法,这个方法会将我们定义的Task封装成ScheduledFutureTask
②然后生成Worker线程(内部存在一个Thread,是真正的执行类)
Worker类
③Worker线程在执行的时候会先判断当前firstTask(就是要执行的Runnable)属性是否为空,如果有就先执行firstTask,执行完成firstTask之后,然后再从workQueue中取任务,红字也是ExecutorService 的执行逻辑,
但是ScheduledExecutorService的 schedule()会先生成null 的firstTask,Worker会直接从workQueue中阻塞的获取任务
Worker类获取task并执行的逻辑
workQueue获取task的逻辑

④workQueue是在我们new对象的时候生成的DelayedWorkQueue,它的逻辑定义和DelayedQueue基本相同,下面是DelayedWorkQueuetake()方法的代码逻辑

public ScheduledThreadPoolExecutor(int corePoolSize) {
       super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
             new DelayedWorkQueue());
}

DelayedWorkQueue take()方法代码

scheduleAtFixedRate()是在Worker执行完task后,计算任务下次的执行时间,并重新将任务放入workQueue中来实现循环执行。下面的代码就是ScheduledFutureTask再执行过程中判断逻辑,如果periodic是true则执行run方法。如果period是false则执行计算下次执行时间和重新放入任务的逻辑。
定时器ScheduledExecutorService原理分析
ScheduledFutureTask中run()方法逻辑

note:ScheduledExecutorService是jdk提供的非常方便的延迟消息处理类,支持多线程处理消息,前一个任务的阻塞并不会影响下一个任务的运行。内部使用的是类似DelayQueue的逻辑,而且不需要再实现轮询过程。但是它和DelayQueue一样,只能单机使用。

3.Redis实现方案

通过Redis的Zset结构也可以实现延迟队列的功能。通过将过期时间的时间戳作为score存入Zset中,然后调用zrangebyscore key 0 当前时间命令定时扫描Zset数据,如果返回的结果那一定是已经过期的数据,然后再执行删除命令删除指定的key-value。为了防止多线程执行过程中可能存在的问题,需要配置lua脚本使用。

//lua脚本:定义查询zset数据和删除数据的原子操作
//定义查询的最大值和最小值
local minscore = ARGV[1]
local maxscore = ARGV[2]
local key = KEYS[1]

local tables = redis.call("zrangebyscore", key, minscore, maxscore)

for i, value in ipairs(tables) do
   redis.call("zrem", key, value)
end

return tables
   //java代码:定义轮询线程
   @Test
   public void test() throws InterruptedException {
       String script = "lua脚本";
       String key = "test";

       long time = System.currentTimeMillis();
       redisTemplate.opsForZSet().add(key, "123", time + 1000 * 3);

       Thread scanExpireThread = new Thread(() -> {
           System.out.println("开始扫描过期数据...");
           while (true) {
               try {
                   long currentTime = System.currentTimeMillis();
                   long count = redisTemplate.opsForZSet().count(key, 0, currentTime);
                   if (count == 0) {
                       // 查询最小等待时间并睡眠,减少cpu空转
                       sleep(key);
                   }
                   System.out.println("获取数据...");

                   RedisScript redisScript = new DefaultRedisScript(script, List.class);
                   List<String> expireList = (List) redisTemplate.execute(redisScript, Arrays.asList(key), 0, >System.currentTimeMillis());
                   System.out.println(expireList);

                   if (!CollectionUtils.isEmpty(expireList)) {
                       String msg = RandomStringUtils.random(3, "1234567890");
                       Integer delayedTime = RandomUtils.nextInt(0, 10);
                       System.out.println("随机生成延迟信息:" + msg + ", 延迟时间:" + delayedTime);
                       redisTemplate.opsForZSet().add(key, msg, System.currentTimeMillis() + 1000 * >delayedTime);
                   } else {
                       // 查询最小等待时间并睡眠,减少cpu空转
                       sleep(key);
                   }

                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   e.printStackTrace();
              }
           }
       });
       scanExpireThread.start();

       TimeUnit.MINUTES.sleep(10);
   }

   public void sleep(String key) throws InterruptedException {
      Set<DefaultTypedTuple> objs = redisTemplate.opsForZSet().rangeWithScores(key, 0, 0);
      for (DefaultTypedTuple<String> typedTuple : objs) {
           Long minTime = typedTuple.getScore().longValue();

           long diffTime = minTime - System.currentTimeMillis();
           TimeUnit.MINUTES.sleep(diffTime / (1000 * 60));
       }
   }

以上的示例代码只是延迟队列的简单时间。并没有考虑任务失败重试的问题。而且上面的方案还可以优化,比如当获取的元素的集合是空的时候,可以使用LockSupport.park()阻塞线程。只有有延迟任务被推送到redis中时,才重新唤醒轮询线程,避免轮询线程空转。

note:redis实现版本中,并发性高。需要自己定义轮询线程。在消息量较少的时候,会浪费资源,在消息量非常多的时候,又会出现因为轮询间隔设置不合理导致延时时间不准确的问题。

######4.Rabbitmq/Rocketmq实现
很多MQ消息中间件自带延迟消息功能,如果系统本身RocketMQ组件,则可以使用MQ来完成。不仅使用方便,而且可能存在的诸多细节问题。
RocketMQ

RocketMQ本身支持延迟消息功能,但是RocketMQ4.x只支持固定级别的延迟消息,并没有自定义延迟时间的功能。如果想实现自定义延迟消息的功能,可以使用Rocket5.x或者RabbitMQ

实现原理:RocketMQ实现延迟消息的过程是先将消息写入到SCHEDULE_TOPIC_XXXX的topic中,然后根据 level 存入特定的queue,每个queue都有一个调度线程消费消息,如果发现消息到期,就会将消息投递到指定的topic中。
rocketmq延迟消息过程

以下是RocketMQ5.x文档中的示例程序:

//生产者 延时消息发送
MessageBuilder messageBuilder = new MessageBuilderImpl();;
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
   //设置消息索引键,可根据关键字精确查找某条消息。
   .setKeys("messageKey")
   //设置消息Tag,用于消费端根据指定Tag过滤消息。
   .setTag("messageTag")
   .setDeliveryTimestamp(deliverTimeStamp)
   //消息体
   .setBody("messageBody".getBytes())
   .build();
try {
   //发送消息,需要关注发送结果,并捕获失败等异常。
   SendReceipt sendReceipt = producer.send(message);
   System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
   e.printStackTrace();
}

//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
   @Override
   public ConsumeResult consume(MessageView messageView) {
       System.out.println(messageView.getDeliveryTimestamp());
       //根据消费结果返回状态。
       return ConsumeResult.SUCCESS;
   }
};

note:RocketMQ的实现版本性能较好,可靠性较高,但是不支持动态添加或删除队列。

RabbitMQ
RabbitMQ也可以根据自身的特性实现延迟消息的功能。比如利用RabbitMQ的TTL和DLX特性
TTL是指存活时间(可以作用在消息中,也可以作用在队列中)
DLX是指死信队列(是指消息被拒绝或者消息过期后存放的)

使用TTL与DLX存在的问题:
1)TTL作用在队列中,需要为每一个延迟时间定义一种队列,灵活性太差。
2)TTL作用在消息上,消息是在即将投递到消费者之前判定是否过期的,所以如果前一个消息阻塞了太长,将导致后面的消息不能即时的被执行。

而且使用上面的方式需要定义普通交换机和死信交换机,所以一般使用延迟消息插件 rabbitmq-delayed-message-exchange来完成。使用插件生成的消息不会立即进入对应队列,而是先将消息保存至 Mnesia (RabbitMQ中的一种数据存储形式) ,然后插件会尝试确认是否过期,再投递到对应绑定的队列之中
下载地址: rabbitmq-delayed-message-exchange
插件使用步骤:

  • 下载延迟插件,然后解压放置到 RabbitMQ 的插件目录。注意一定要解压并且把版本名字取消
    如果是使用docker安装的rabbitmq,可以使用docker cp rabbitmq_delayed_message_exchange containerId:/RabbitMQ_HOME/plugins/拷贝到RabbitMQ容器中。
  • 进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    可以使用rabbitmq-plugins list查看生效的插件,插件前面带上*号的才是真正生效的插件
    RabbitMQ启用插件
  • 最后重启 RabbitMQ,就可以在管理界面看到Type是x-delayed-messageexchange
    RabbitMQ管理界面
    更加详细的安装步骤可以查看链接 RabbitMQ 学习笔记 – 13 使用插件方式实现延迟队列

接下来就可以测试RabbitMQ的延迟消息功能了,以下是示例代码:
Rabbitmq配置

@Configuration
public class RabbitMqConfig {
   //定义队列交换机,队列,路由
   public static final String DELAYED_QUEUE = "delayed_queue";
   public static final String DELAYED_EXCHANGE = "delayed_exchange";
   public static final String DELAYED_ROUTINGKEY = "delayed_test";

   @Bean(DELAYED_EXCHANGE)
   public Exchange DELAYED_EXCHANGE() {
       HashMap<String, Object> map = new HashMap<>(1);
       map.put("x-delayed-type", "direct");
       return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, map);
   }

   @Bean(DELAYED_QUEUE)
   public Queue DELAYED_QUEUE() {
       return new Queue(DELAYED_QUEUE);
   }

   //队列绑定交换机,
   @Bean
   public Binding >BINDING_DELAYED_QUEUE(@Qualifier(DELAYED_QUEUE) Queue queue,
                                    @Qualifier(DELAYED_EXCHANGE) Exchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTINGKEY).noargs();
   }
}

延迟消息生产者

@RestController
public class TestController {
   @Autowired
   private RabbitTemplate rabbitTemplate;

   @RequestMapping("/test")
   public String test(@RequestParam(required = false) Integer delay) {
       rabbitTemplate.convertAndSend(RabbitMqConfig.DELAYED_EXCHANGE, RabbitMqConfig.DELAYED_ROUTINGKEY,
               "hello", msg -> {
                   //设置消息的延迟时间
                   msg.getMessageProperties().setDelay(delay * 1000);
                   //设置优先级
                   msg.getMessageProperties().setPriority(9);
                   //设置消息的持久化方式
                   >msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                   //设置唯一标识
                   msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());
                   return msg;
               });

       return "success";
   }
}

延迟消息消费者

@Component
public class RabbitmqHandler {
   //监听delayed_test队列
   @RabbitListener(queues = {RabbitMqConfig.DELAYED_QUEUE})
   public void receive_delayed_test(Message message, Channel channel) {
       System.out.println("----delayed_test----");
       System.out.println("properties:" + message.getMessageProperties().toString());

       System.out.println("body:" + new String(message.getBody()));
       System.out.println();
   }
}

note:RabbitMQ支持集群,分布式,高并发场景,性能较好,可靠性高,不需要自己处理轮询线程。

参照:
延迟队列解决方案
有赞延迟队列设计
盘点JAVA中延时任务的几种实现方式
RabbitMQ之延迟队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1152531.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

http1,https,http2,http3总结

1.HTTP 当我们浏览网页时&#xff0c;地址栏中使用最多的多是https://开头的url&#xff0c;它与我们所学的http协议有什么区别&#xff1f; http协议又叫超文本传输协议&#xff0c;它是应用层中使用最多的协议&#xff0c; http与我们常说的socket有什么区别吗&#xff1f; …

2000-2021年上市公司产融结合度量数据

2000-2021年上市公司产融结合度量数据 1、时间&#xff1a;2000-2021年 2、指标&#xff1a;股票代码、年份、是否持有银行股份、持有银行股份比例、是否持有其他金融机构股份、产融结合 3、来源&#xff1a;上市公司年报 4、范围&#xff1a;上市公司 5、样本量&#xff…

4种类型WMS的简要说明

仓库管理系统&#xff08;WMS&#xff09;主要有四种类型&#xff1a;独立仓库管理系统、供应链管理系统中的仓库管理模块、ERP 系统中的仓库管理模块和基于云的仓库管理系统。 独立仓库管理系统 独立仓库管理系统提供的功能可实现日常仓库运营。公司可以使用WMS系统来监管和…

【MATLAB源码-第62期】基于matlab的DCSK(差分混沌移位键控调制)系统误码率仿真。

MATLAB 2022a 1、算法描述 DCSK&#xff08;Differential Chaos Shift Keying&#xff09;是一种差分混沌移位键控调制方式&#xff0c;常用于无线通信系统。其调制和解调的基本流程如下&#xff1a; 1. DCSK调制 1.1 生成混沌序列 - 初始条件&#xff1a;选择一个混沌映射&a…

『K8S 入门』一:基础概念与初步搭建

『K8S 入门』一&#xff1a;基础概念与初步搭建 一、kubernetes 组件 官方示图 抽象示图 Master 控制面板 Api-Server&#xff1a;接口服务&#xff0c;基于 REST 风格开放 k8s 接口的服务ControllerManager cloud-controller-manager&#xff1a;云控制管理器。第三方平…

Android图片加载框架库源码解析 - Coil

文章目录 一、什么是Coil二、引入Coil1、ImageView加载图片1.1、普通加载1.2、crossfade(淡入淡出)加载1.3、crossfade的动画时间1.4、placeholder1.5、error1.6、高斯模糊1.7、灰度变换1.8、圆形1.9、圆角 2、Gif加载3、SVG加载(不存在)4、视频帧加载5、监听下载过程6、取消下…

想翻译pdf文档,试了几个工具对比:有阿里(完全免费,快,好用,质量高,不用注册登录)道最好(有限免费) 百度(有限免费)和谷歌完全免费(网不好)

文档翻释作为基础设施&#xff0c;工作必备。 阿里 &#xff08;完全免费&#xff0c;快&#xff0c;好用&#xff0c;质量高&#xff0c;不用注册登录&#xff0c;无广告&#xff09;我给满分 https://translate.alibaba.com/#core-translation 先选好语言。 Google(完全免…

PDManer生成Postgis对应Schema数据库设计文档

项目开发数据库选择postGis&#xff0c;由于需要编写数据库设计说明书&#xff0c;因此选择工具PDManer生成数据库设计文档&#xff0c;但是postGis一个数据库&#xff0c;可能对应多个Schema。如下图所示&#xff1a; 1.编写数据库设计文档时&#xff0c;仅需编写hly这个Sche…

美妆造型教培服务预约小程序的作用是什么

美业市场规模很高&#xff0c;细分类目更是比较广&#xff0c;而美妆造型就是其中的一类&#xff0c;从业者也比较多&#xff0c;除了学校科目外&#xff0c;美妆造型教培机构也有生意。 对机构来说主要目的是拓客引流-转化及赋能&#xff0c;而想要完善路径却是不太容易&…

Ubuntu16.04 python matplotlib 坐标轴标签出现中文乱码

问题&#xff1a;坐标轴打印中文时&#xff0c;显示会乱码 import matplotlib.pyplot as plt plt.ylabel(时间刻度)原因&#xff1a;matplotlib里面没有中文字体解决方法&#xff1a;下载SimHei字体&#xff0c;快捷方法是使用everything直接在windows搜索simhei.ttf&#xff…

计算机基础知识41

前端 # 前端是所有跟用户直接打交道 比如&#xff1a;PC页面、手机页面、汽车显示屏&#xff0c;肉眼可以看见的 # 后端&#xff1a;一堆代码&#xff0c;用户不能够直接看到&#xff0c;不直接与用户打交道 常见的后端&#xff1a;Python、Java、Go等 # 学了前端就可以做全栈…

0004net程序设计-抗疫物资

文章目录 **摘** **要**目 录系统设计开发环境 摘 要 近些年来&#xff0c;随着科技的飞速发展&#xff0c;互联网的普及逐渐延伸到各行各业中&#xff0c;给人们生活带来了十分的便利&#xff0c;抗疫物资管理系统利用计算机网络实现信息化管理&#xff0c;使整个抗疫物资管理…

Kafka - 3.x 分区分配策略及再平衡不完全指北

文章目录 生产经验——分区分配策略及再平衡生产者分区分配之Range及再平衡Range分区策略原理Range分区分配策略及再平衡案例 生产者分区分配之RoundRobin策略及再平衡RoundRobin分区策略原理RoundRobin分区分配策略及再平衡案例 生产者分区分配之Sticky及再平衡Sticky分区策略…

网络协议--TCP的超时与重传

21.1 引言 TCP提供可靠的运输层。它使用的方法之一就是确认从另一端收到的数据。但数据和确认都有可能会丢失。TCP通过在发送时设置一个定时器来解决这种问题。如果当定时器溢出时还没有收到确认&#xff0c;它就重传该数据。对任何实现而言&#xff0c;关键之处就在于超时和重…

Android NDK开发详解之Android.mk探秘

Android NDK开发详解之Android.mk探秘 概览基础知识变量和宏NDK 定义的 include 变量CLEAR_VARSBUILD_EXECUTABLEBUILD_SHARED_LIBRARYBUILD_STATIC_LIBRARYPREBUILT_SHARED_LIBRARYPREBUILT_STATIC_LIBRARY 目标信息变量TARGET_ARCHTARGET_PLATFORMTARGET_ABI 模块描述变量LOC…

ubuntu下英伟达显卡驱动及cuda安装

一、查看显卡需要安装的cuda版本及需要的驱动版本 进入官网查看&#xff1a; CUDA 12.3 Release Notes 比如需要装cuda12.2GA需要驱动版本至少为535.54.03 二、下载显卡驱动 2.1 进入官网下载界面&#xff1a; Official Drivers | NVIDIA&#xff0c;点击Beta and older dr…

系列三十三、代理(三)动态代理

一、概述 在实际开发过程中&#xff0c;往往我们自己不会去创建代理类&#xff0c;而是通过JDK提供的Proxy类在程序运行时&#xff0c;运用反射机制动态创建而成&#xff0c;这就是所谓的动态代理。 1.1、动态代理 vs 静态代理 静态代理需要程序员自己写代理类&#xff0c;动态…

软件测试面试百问:如何测试App性能?

APP性能测试几乎是客户端面试必问。 为什么要做App性能测试 如果APP总是出现卡顿或网络延迟的情况&#xff0c;降低了用户的好感&#xff0c;用户可能会抛弃该App&#xff0c;换同类型的其他应用。如果APP的性能较好&#xff0c;用户体验高&#xff0c;使用起来丝滑顺畅&…

干货:传统软文和新媒体软文的区别在哪儿

其实早在古代就有软文的影子&#xff0c;不管是“借问酒家何处有&#xff0c;牧童遥指杏花村”&#xff0c;还是“日啖荔枝三百颗,不辞长作岭南人。”都有软文的影子。今天媒介盒子就来和大家聊聊&#xff0c;传统软文和新媒体软文的区别在哪儿&#xff1f; 一、 渠道不同 在…

UWB智能制造

&#xff08;一&#xff09;无人值守的人、车、物出入监控 &#xff08;二&#xff09;人、车、物授权区域进出监控 &#xff08;三&#xff09;固定资产区域盘点及移动盘点 &#xff08;四&#xff09;人、车、物作业现场网格化管理 &#xff08;五&#xff09;作业现场人车、…