延时任务的四种实现方式

news2025/1/14 1:15:51

什么是延迟任务?

顾明思议,我们把需要延迟执行的任务叫做延迟任务

延迟任务的使用场景有以下这些:

  1. 红包 24 小时未被查收,需要延迟执退还业务;

  1. 每个月账单日,需要给用户发送当月的对账单;

  1. 订单下单之后 30 分钟后,用户如果没有付钱,系统需要自动取消订单。

等事件都需要使用延迟任务。

延迟任务实现思路分析

延迟任务实现的关键是在某个时间节点执行某个任务。基于这个信息我们可以想到实现延迟任务的手段有以下两个:

  1. 自己手写一个“死循环”一直判断当前时间节点有没有要执行的任务;

  1. 借助 JDK 或者第三方提供的工具类来实现延迟任务。

而通过 JDK 实现延迟任务我们能想到的关键词是:DelayQueue、ScheduledExecutorService,而第三方提供的延迟任务执行方法就有很多了,例如:Redis、Netty、MQ 等手段。

延迟任务实现

  1. 限循环实现延迟任务

此方式我们需要开启一个无限循环一直扫描任务,然后使用一个 Map 集合用来存储任务和延迟执行的时间,实现代码如下:

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * 延迟任务执行方法汇总
 */
public class DelayTaskExample {
    // 存放定时任务
    private static Map<String, Long> _TaskMap = new HashMap<>();

    public static void main(String[] args) {
        System.out.println("程序启动时间:" + LocalDateTime.now());
        // 添加定时任务
        _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // 延迟 3s

        // 调用无限循环实现延迟任务
        loopTask();
    }

    /**
     * 无限循环实现延迟任务
     */
    public static void loopTask() {
        Long itemLong = 0L;
        while (true) {
            Iterator it = _TaskMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                itemLong = (Long) entry.getValue();
                // 有任务需要执行
                if (Instant.now().toEpochMilli() >= itemLong) {
                    // 延迟任务,业务逻辑执行
                    System.out.println("执行任务:" + entry.getKey() +
                            " ,执行时间:" + LocalDateTime.now());
                    // 删除任务
                    _TaskMap.remove(entry.getKey());
                }
            }
        }
    }
}

以上程序执行的结果为:

程序启动时间:2020-04-12T18:51:28.188
执行任务:task-1 ,执行时间:2020-04-12T18:51:31.189

可以看出任务延迟了 3s 钟执行了,符合我们的预期。

2.Java API 实现延迟任务

Java API 提供了两种实现延迟任务的方法:DelayQueue 和 ScheduledExecutorService。

① ScheduledExecutorService 实现延迟任务

我们可以使用 ScheduledExecutorService 来以固定的频率一直执行任务,实现代码如下:

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("程序启动时间:" + LocalDateTime.now());
        scheduledExecutorServiceTask();
    }

    /**
     * ScheduledExecutorService 实现固定频率一直循环执行任务
     */
    public static void scheduledExecutorServiceTask() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        // 执行任务的业务代码
                        System.out.println("执行任务" +
                                " ,执行时间:" + LocalDateTime.now());
                    }
                },
                2, // 初次执行间隔
                2, // 2s 执行一次
                TimeUnit.SECONDS);
    }
}

以上程序执行的结果为:

程序启动时间:2020-04-12T21:28:10.416
执行任务 ,执行时间:2020-04-12T21:28:12.421
执行任务 ,执行时间:2020-04-12T21:28:14.422
......

可以看出使用 ScheduledExecutorService#scheduleWithFixedDelay(...) 方法之后,会以某个频率一直循环执行延迟任务。

② DelayQueue 实现延迟任务

DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列中的元素必须实现 Delayed 接口,并重写 getDelay(TimeUnit) 和 compareTo(Delayed) 方法,DelayQueue 实现延迟队列的完整代码如下:

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue delayQueue = new DelayQueue();
        // 添加延迟任务
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("开始时间:" +  DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()){
            // 执行延迟任务
            System.out.println(delayQueue.take());
        }
        System.out.println("结束时间:" +  DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        // 延迟截止时间(单面:毫秒)
        long delayTime = System.currentTimeMillis();
        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }
        @Override
        // 获取剩余时间
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        // 队列里元素的排序依据
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }
        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}

以上程序执行的结果为:

开始时间:2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
结束时间:2020-4-12 20:40:43

3. 监听Redis过期key

在Redis中,有个发布订阅的机制

生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。图中channel理解成MQ中的topic。

并且在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。这里面就有这么一个channel叫做__keyevent@<db>__:expired,db是指Redis数据库的序号。

当某个Redis的key过期之后,Redis内部会发布一个事件到__keyevent@<db>__:expired这个channel上,只要监听这个事件,那么就可以获取到过期的key。

所以基于监听Redis过期key实现延迟任务的原理如下:

  • 将延迟任务作为key,过期时间设置为延迟时间

  • 监听__keyevent@<db>__:expired这个channel,那么一旦延迟任务到了过期时间(延迟时间),那么就可以获取到这个任务

实现代码如下:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;

public class TaskExample {
    public static final String _TOPIC = "__keyevent@0__:expired"; // 订阅频道名称
    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedis();
        // 执行定时任务
        doTask(jedis);
    }

    /**
     * 订阅过期消息,执行定时任务
     * @param jedis Redis 客户端
     */
    public static void doTask(Jedis jedis) {
        // 订阅过期消息
        jedis.psubscribe(new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                // 接收到消息,执行定时任务
                System.out.println("收到消息:" + message);
            }
        }, _TOPIC);
    }
}

4. MQ 实现延迟任务

如果专门开启一个 MQ 中间件来执行延迟任务,就有点杀鸡用宰牛刀般的奢侈了,不过已经有了 MQ 环境的话,用它来实现延迟任务的话,还是可取的。

几乎所有的 MQ 中间件都可以实现延迟任务,在这里更准确的叫法应该叫延队列。本文就使用 RabbitMQ 为例,来看它是如何实现延迟任务的。

RabbitMQ 实现延迟队列的方式有两种:

  • 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;

  • 使用 rabbitmq-delayed-message-exchange 插件实现延迟功能。

由于使用死信交换器比较麻烦,所以推荐使用第二种实现方式 rabbitmq-delayed-message-exchange 插件的方式实现延迟队列的功能。

首先,我们需要下载并安装 rabbitmq-delayed-message-exchange 插件,下载地址:http://www.rabbitmq.com/community-plugins.html

选择相应的对应的版本进行下载,然后拷贝到 RabbitMQ 服务器目录,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件,在使用命令 rabbitmq-plugins list 查询安装的所有插件,安装成功如下图所示:

最后重启 RabbitMQ 服务,使插件生效。

首先,我们先要配置消息队列,实现代码如下:

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";
    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // 配置默认的交换机
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //参数二为类型:必须是x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }
    // 绑定队列到交换器
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}

然后添加增加消息的代码,具体实现如下:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("发送时间:" + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}

再添加消费消息的代码:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("接收时间:" + sdf.format(new Date()));
        System.out.println("消息内容:" + msg);
    }
}

最后,我们使用代码测试一下:

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    @Autowired
    private DelayedSender sender;

    @Test
    public void Test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
    }
}

以上程序的执行结果如下:

发送时间:2020-04-13 20:47:51
接收时间:2020-04-13 20:47:54
消息内容:Hi Admin.

从结果可以看出,以上程序执行符合延迟任务的实现预期。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/390612.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络之MAC和IP地址

MAC地址 在局域网中&#xff0c;硬件地址&#xff0c;又称为物理地址或MAC地址。 目前现在的局域网中实际上使用的都是6字节的MAC地址&#xff0c;所以每一个以太网设备都具有唯一的MAC地址。 MAC地址的格式 假设传输使用的是IP数据&#xff0c;V2的MAC帧较为简单&#xff…

Sonar:VSCode配置SonarLint/SonarLint连接SonarQube

需求描述 公司为项目代码配置了Sonar检测&#xff0c;希望在VSCode中开发项目时能够同步检测结果。 注意事项 SonarQube版本必须为7.9&#xff0c;否则SonarLint无法连接&#xff08;GitHub-SonarLint-Wiki第一行就有说明&#xff09;&#xff01;&#xff01;&#xff01;S…

【前端】一个更底层库-React基础知识点第2篇

目录属性状态PROPSPROP VALIDATIONSTATEFORMCONTROLLED COMPONENTSMIXINCOMPONENT APICOMPONENT LIFECYCLETOP API上一篇文章也是React基础知识点&#xff0c;了解到了React是什么&#xff1f;为什么要使用React&#xff1f;还知道了JSX概述&#xff0c;JSX嵌入变量&#xff0c…

python Django中的cookies和session会话保持技术

cookies和session都是为了保持会话状态而诞生的两个存储技术会话定义&#xff1a; 从打开浏览器访问一个网站&#xff0c;到关闭浏览器结束此次访问&#xff0c;称之为一次会话HTTP协议是无状态的&#xff0c;导致会话状态难以保持Cookies-定义 cookies是保存在客户端浏览器上的…

超简单的卷积和加法融合

神经网络的优化除了之前提到的一些硬件优化手段(AI硬件加速拾遗)之外&#xff0c;还有很多图层方面的优化手段。大家好啊&#xff0c;我是董董灿。 而且图层方面的优化&#xff0c;有时效果更佳。往往一个有效的优化&#xff0c;甚至可以“消除”掉一个算子的存在。 这里的“…

Vue组件-$refs、$nextTick和name属性的使用

Vue组件-$refs和$nextTick使用一、获取DOM二、$refs获取组件对象三、$nextTick异步更新DOM四、组件name属性的使用一、获取DOM 通过id或ref属性获取原生DOM 在mounted生命周期 – 2种方式获取原生DOM标签 目标标签 – 添加id / ref恰当时机, 通过id / 通过ref属性 获取目标标签…

Ubuntu系统新硬盘挂载

Ubuntu系统新硬盘挂载 服务器通常会面临存储不足的问题&#xff0c;大部分服务器都是ubuntu系统&#xff0c;该篇博客浅浅记载一下在ubuntu系统上挂载新硬盘的步骤。本篇博文仅仅记载简单挂载一块新的硬盘&#xff0c;而没有对硬盘进行分区啥的。如果需要更加完善的教程&#…

【C++】平衡二叉搜索(AVL)树的模拟实现

一、 AVL树的概念 map、multimap、set、multiset 在其文档介绍中可以发现&#xff0c;这几个容器有个共同点是&#xff1a;其底层都是按照二叉搜索树来实现的&#xff0c;但是二叉搜索树有其自身的缺陷&#xff0c;假如往树中插入的元素有序或者接近有序&#xff0c;二叉搜索树…

Java文档搜索引擎总结

Java文档搜索引擎总结项目介绍项目使用的技术栈前端页面展示后端逻辑部分索引部分搜索模块部分Web模块部分项目介绍 Java文档搜索引擎项目是一个SSM项目&#xff0c;该项目的前端界面部分是由搜索页面和展示页面组成&#xff0c;后端部分索引模块&#xff08;ScanAnalysis、in…

UNET 对 CARVANA 数据集的分割

目录 1. 介绍 2. UNET 网络 3. dataset 数据加载 4. utils 工具模块 4.1 get_loaders 函数 4.2 check_accuracy 函数 4.3 save_predictions_as_imgs 函数 4.4 完整代码 5. train 函数 5.1 关于导入的库文件 5.2 设置超参数 5.3 train_fn 训练一个epoch函数 5.4 m…

Redis是单线程还是多线程?Redis的10种数据类型,有哪些应用场景?

目录专栏导读一、同样是缓存&#xff0c;用map不行吗&#xff1f;二、Redis为什么是单线程的&#xff1f;三、Redis真的是单线程的吗&#xff1f;四、Redis优缺点1、优点2、缺点五、Redis常见业务场景六、Redis常见数据类型1、String2、List3、Hash4、Set5、Zset6、BitMap7、Bi…

【Java基础】30分钟Git 从入门到精通

一、 版本控制工具1、什么是版本控制系统&#xff1f;版本控制系统&#xff08;Version Control System&#xff09;:是一种记录一个或若干文件内容变化&#xff0c;以便将来查阅特定版本修订情况的系统。版本控制系统不仅可以应用于软件源代码的文本文件&#xff0c;而且可以对…

主成分分析(PCA)方法 和协方差 相关系数

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言1.主成分分析&#xff08;PCA&#xff09;方法2.算法步骤前言 主成分分析&#xff08;Principal Components Analysis&#xff0c;PCA&#xff09;是一种数据降…

Android工厂模式

工厂模式分为三种 :简单工厂模式 、工厂方法模式 、抽象工厂模式 。 目录 简单工厂模式 UML图 实现 使用场景&#xff1a; 优点 &#xff1a; 缺点&#xff1a; 工厂方法模式 UML图 实现 使用场景&#xff1a; 优点&#xff1a; 缺点&#xff1a; 抽象工厂模式 UM…

SpringCloud-负载均衡-Ribbon

文章目录1. 作用&#xff1a;负载均衡2. 应用实战2.1 provider-a代码2.2 provider-b代码2.3 consumer代码2.4 api工具向consumer发送请求查看对provider的调用情况3. ribbon总结3.1 Ribbon 源码核心3.2 如何实现负载均衡的呢&#xff1f;1. 作用&#xff1a;负载均衡 2. 应用实…

演唱会总是抢不到票?教你用Python制作一个自动抢票脚本

人生苦短 我用python 这个大家应该都知道吧&#xff1f; 是中国综合类现场娱乐票务营销平台&#xff0c; 业务覆盖演唱会、 话剧、音乐剧、体育赛事等领域。 如何快速抢票&#xff1f; 那么&#xff0c; 今天带大家用Python来制作一个自动抢票的脚本小程序 本文源码python安…

使用Java对稀疏数组的压缩与还原

稀疏矩阵的压缩与还原 稀疏数组中元素个数很少或者有大量的重复值&#xff0c;如果直接保存保存&#xff0c;会浪费很多空间&#xff0c;这时&#xff0c;就可以考虑对数组进行压缩存储。 先定义一个稀疏数组 //创建一个二维数组 11 * 11 int[][] array1 new int[11][11]; /…

Window 编辑、删除、新增右键菜单

关于 Window 右键菜单 右键菜单可以在注册表编辑器中新增和修改 建议先下载 registry-finder&#xff0c;查找速度更快&#xff01; 使用管理员模式打开 registry-finder 后&#xff0c;点击 HKEY_CLASSES_ROOT &#xff0c;修改注册表右键菜单的子路径如下表所示 类型路径…

49.在ROS中实现local planner(2)- 实现Purepersuit(纯跟踪)算法

48.在ROS中实现local planner&#xff08;1&#xff09;- 实现一个可以用的模板实现了一个模板&#xff0c;接下来我们将实现一个简单的纯跟踪控制&#xff0c;也就是沿着固定的路径运动&#xff0c;全局规划已经规划出路径点&#xff0c;基于该路径输出相应的控制速度 1. Pur…

Linux系列学习(三) - 进程和库文件

目录 引言&#xff1a; 学习&#xff1a; 基本命令补充&#xff1a; wc命令&#xff1a; more命令&#xff1a; less命令&#xff1a; cat ps命令&#xff1a; kill命令&#xff1a; bg命令&#xff1a; fg命令&#xff1a; 查看系统运行级别&#xff1a; 库文件&a…