基于SpringBoot实现轻量级的动态定时任务调度

news2024/9/25 11:12:08

在使用SpringBoot框架进行开发时,一般都是通过@Scheduled注解进行定时任务的开发:


@Component
public class TestTask
{
    @Scheduled(cron="0/5 * *  * * ? ")   //每5秒执行一次
    public void execute(){
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
        log.info("任务执行" + df.format(new Date()));
    }
}

但是这种方式存在一个问题,那就是任务的周期控制是死的,必须编写在代码中,如果遇到需要在系统运行过程中想中止、立即执行、修改执行周期等动态操作的需求时,使用注解的方式便不能满足了,当然为了满足此种需求可以额外再引入其他任务调度插件(例如XXL-Job等),但是引入其他组件是需要衡量成本的,额外的依赖成本、组件的维护成本、开发的复杂度等等,所以如果系统体量不是那么大,完全没必要通过增加组件来完成,可以基于SpringBoot框架实现一套内置轻量级的任务调度。

设计思路

整体设计

这里我们把定时任务以类作为基础单位,即一个类为一个任务,然后通过配置数据的方式,进行任务的读取,通过反射生成任务对象,使用SpringBoot本身的线程池任务调度,完成动态的定时任务驱动,同时通过接口支撑实现相应的REST API对外暴露接口

任务模型

首先基于模板模式,设计基础的任务执行流程抽象类,定义出一个定时任务需要执行的内容和步骤和一些通用的方法函数,后续具体的定时任务直接继承该父类,实现该父类的before、start、after三个抽象函数即可,所有公共操作均在抽象父类完成

特殊说明:

    基于此方法创建的类是不归Spring的容器管理的,所以自定义的任务子类中是无法使用SpringBoot中的任何注解,尤其在自定义任务类中如果需要依赖其他Bean时,需要借助抽象父类AbstractBaseCronTask中已经实现的<T> T getServer(Class<T> className)来完成,getServer的实现如下:

public <T> T getServer(Class<T> className){
       return applicationContext.getBean(className);
    }

是通过SpringBoot中的ApplicationContext接口来获取Spring的上下文,以此来满足可以获取Spring中其他Bean的诉求。

例如,有个定时任务TaskOne类,它需要使用UserService类中的 caculateMoney()的方法,势必这个定时任务需要依赖UserService类,而TaskOne并非是Spring创建的对象,而是我们人为干预生成的对象,所以它是不在Spring的Bean管理范围的,自然也就无法使用@Autowird等方式注入UserService类,此时就需要使用getServer方法来获取UserService对象

//自定义定时任务类
public class TaskOne extends AbstractBaseCronTask {
    
    private UserService userService;

  
    public TestTask(TaskEntity taskEntity) {
        super(taskEntity);
    }

    @Override
    public void beforeJob() {
        //任务运行第一步,先将userService进行变量注入
        userService = getServer(UserService.class);
        ……
    }

    @Override
    public void startJob() {
       if(XXXX){
          //直接调用getServer获取需要的bean
          User user = getServer(UserMapper.class).findUser("111223")
          userService.caluateMoney(user);
          //……其他代码
       }
    }

    @Override
    public void afterJob() {

    }
}

任务对象加载过程

 核心逻辑在于利用反射,在SpringBoot启动后动态创建相应的定时任务类,并将其放置到SpringBoot的定时线程池中进行维护,同时将该对象同步存放至内存中一份,便于可以实时调用,当进行修改任务相关配置时,需要重新加载一次内容。

public class TaskScheduleServerImpl implements TaskScheduleServer {

    //正在运行的任务
    private static ConcurrentHashMap<String, ScheduledFuture> runningTasks = new ConcurrentHashMap<>();

    //线程池任务调度
    private ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();

    public boolean addTaskToScheduling(TaskEntity task) {
        if(!runningTasks.containsKey(task.getTaskId())){
            try{
                Class<?> clazz = Class.forName(task.getTaskClass());
                Constructor c = clazz.getConstructor(TaskEntity.class);
                AbstractBaseCronTask runnable = (AbstractBaseCronTask) c.newInstance(task);
                //反射方式生成对象不属于Spring容器管控,对于Spring的bean使用需要手动注入
                runnable.setApplicationContext(context);
                CronTrigger cron = new CronTrigger(task.getTaskCron());
                //put到runTasks
                runningTasks.put(task.getTaskId(), Objects.requireNonNull(this.threadPoolTaskScheduler.schedule(runnable, cron)));
                //存入内存中,便于外部调用
                ramTasks.put(task.getTaskId(),runnable);
                task.setTaskRamStatus(1);
                taskInfoOpMapper.updateTaskInfo(task);
                return true;
            }catch (Exception e){
                log.error("定时任务加载失败..."+e);
            }
        }
        return false;
    }
}

部分源码

这里将配置内容放入数据库中,直接以数据库中的表作为任务配置的基础

/**
* 任务对象
**/
@Data
public class TaskEntity implements Serializable {

    //任务唯一ID
    private String taskId;
    
    //任务名称
    private String taskName;
    
    //任务描述
    private String taskDesc;

    //执行周期配置
    private String taskCron;

    //任务类的全路径
    private String taskClass;
    
    //任务的额外配置
    private String taskOutConfig;
    
    //任务创建时间
    private String taskCreateTime;

    //任务是否启动,1启用,0不启用
    private Integer taskIsUse;
    
    //是否随系统启动立即执行
    private Integer taskBootUp;
    
    //任务上次执行状态
    private Integer taskLastRun;
    
    //任务是否加载至内存中 
    private Integer taskRamStatus;
}

核心逻辑,加载定时任务接口及其实现类



public interface TaskScheduleServer {



    ConcurrentHashMap<String, AbstractBaseCronTask> getTaskSchedulingRam();


    /**
     * 初始化任务调度
     */
    void initScheduling();

    /**
     * 添加任务至内存及容器
     * @param taskEntity 任务实体
     * @return boolean
     */
    boolean addTaskToScheduling(TaskEntity taskEntity);

    /**
     * 从任务调度器中移除任务
     * @param id 任务id
     * @return Boolean
     */
    boolean removeTaskFromScheduling(String id);


    /**
     * 执行指定任务
     * @param id 任务id
     * @return double 耗时
     */
    double runTaskById(String id);


    /**
     * 清空任务
     */
    void claearAllTask();



    /**
     * 加载所有任务
     */
    void loadAllTask();

    /**
     * 运行开机自启任务
     */
    void runBootUpTask();

}





@Slf4j
@Component
public class TaskScheduleServerImpl implements TaskScheduleServer {

    
    …………

    @Override
    public double runTaskById(String id) {
        TaskEntity task = taskInfoOpMapper.queryTaskInfoById(id);
        if(null!=task) {
            if (runningTasks.containsKey(task.getTaskId())){
                ramTasks.get(task.getTaskId()).run();
                return ramTasks.get(task.getTaskId()).getRunTime();
            }
        }
        return 0d;
    }

    @Override
    public void claearAllTask() {
        ramTasks.clear();
        log.info("【定时任务控制器】清除内存任务 完成");
        runningTasks.clear();
        log.info("【定时任务控制器】清除线程任务 完成");
        threadPoolTaskScheduler.shutdown();
    }

    @Override
    public void loadAllTask() {
        List<TaskEntity> allTask = taskInfoOpMapper.queryTaskInfo(null);
        for (TaskEntity task : allTask) {
            if(addTaskToScheduling(task)){
                log.info("【定时任务初始化】装填任务:{} [ 任务执行周期:{} ] [ bootup:{}]",task.getTaskName(),task.getTaskCron(),task.getTaskBootUp());
            }
        }
    }


    @Override
    public void runBootUpTask() {
        TaskEntity entity = new TaskEntity().taskBootUp(1);
        List<TaskEntity> list = taskInfoOpMapper.queryTaskInfo(entity);
        for(TaskEntity task:list){
            runTaskById(task.getTaskId());
        }
    }
}

在SpringBoot中的加载类

@Order(3)
@Component
@Slf4j
public class AfterAppStarted implements ApplicationRunner {


    TaskScheduleServer taskScheduleServer;

    @Autowired
    public void setTaskScheduleServer(TaskScheduleServer taskScheduleServer) {
        this.taskScheduleServer = taskScheduleServer;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        //运行随系统启动的定时任务
        taskScheduleServer.runBootUpTask();
    }

}

对外暴露控制接口及其Service

@RestController
@RequestMapping("/taskScheduling/manage")
@Api(tags = "数据源管理服务")
public class TaskSchedulingController {


    TaskScheduleManagerService taskScheduleManagerService;

    @Autowired
    public void setTaskScheduleManagerService(TaskScheduleManagerService taskScheduleManagerService) {
        this.taskScheduleManagerService = taskScheduleManagerService;
    }

    @PostMapping("/search")
    @Operation(summary = "分页查询任务")
    public Response searchData(@RequestBody SearchTaskDto param){
        return Response.success(taskScheduleManagerService.searchTaskForPage(param));
    }


    @GetMapping("/detail")
    @Operation(summary = "具体任务对象")
    public Response searchDetail(String taskId){
        return Response.success(taskScheduleManagerService.searchTaskDetail(taskId));
    }

    @GetMapping("/shutdown")
    @Operation(summary = "关闭指定任务")
    public Response shutdownTask(String taskId){
        return Response.success(taskScheduleManagerService.shutdownTask(taskId));
    }


    @GetMapping("/open")
    @Operation(summary = "开启指定任务")
    public Response openTask(String taskId){
        return Response.success(taskScheduleManagerService.openTask(taskId));
    }


    @GetMapping("/run")
    @Operation(summary = "运行指定任务")
    public  Response runTask(String taskId){
        return Response.success(taskScheduleManagerService.runTask(taskId));
    }


    @PostMapping("/update")
    @Operation(summary = "更新指定任务")
    public Response updateTask(@RequestBody TaskEntity taskEntity){
        return Response.success(taskScheduleManagerService.updateTaskBusinessInfo(taskEntity));
    }


}

相关接口实现类

@Service
public class TaskScheduleManagerServiceImpl implements TaskScheduleManagerService {


    private TaskInfoOpMapper taskInfoOpMapper;


    private TaskScheduleServer taskScheduleServer;

    @Autowired
    public void setTaskInfoOpMapper(TaskInfoOpMapper taskInfoOpMapper) {
        this.taskInfoOpMapper = taskInfoOpMapper;
    }

    @Autowired
    public void setTaskScheduleServer(TaskScheduleServer taskScheduleServer) {
        this.taskScheduleServer = taskScheduleServer;
    }

    @Override
    public IPage<TaskEntity> searchTaskForPage(SearchTaskDto dto) {
        Page<TaskEntity> pageParam = new Page<>(1,10);
        pageParam.setAsc("task_id");
        return taskInfoOpMapper.queryTaskInfoPage(pageParam,dto.getFilterKey(),dto.getBootUp(),dto.getLastRunStatus());
    }


    @Override
    public TaskEntity searchTaskDetail(String taskId) {
        if(!StringUtils.isEmpty(taskId)){
            return taskInfoOpMapper.queryTaskInfoById(taskId);
        }
        return null;
    }


    @Override
    public TaskRunRetDto runTask(String taskId) {
        AbstractBaseCronTask task = taskScheduleServer.getTaskSchedulingRam().get(taskId);
        TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.run, 0);
        if(null != task) {
                double time = taskScheduleServer.runTaskById(taskId);
                result.setResult(1);
                return result.extend(time).taskInfo(task.getThisTaskInfo());
        } else {
            return result.extend("任务未启用");
        }
    }

    @Override
    public TaskRunRetDto shutdownTask(String taskId) {
        AbstractBaseCronTask task = taskScheduleServer.getTaskSchedulingRam().get(taskId);
        TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.shutdown, 0);
        if(null != task) {
            boolean flag = taskScheduleServer.removeTaskFromScheduling(taskId);
            if(flag) {
                result.setResult(1);
            }
            return result.extend("任务成功关闭").taskInfo(task.getThisTaskInfo());
        } else {
            return result.extend("任务未启用");
        }
    }

    @Override
    public TaskRunRetDto openTask(String taskId) {
        TaskEntity task = taskInfoOpMapper.queryTaskInfoById(taskId);
        TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.open, 0);
        if(null != task) {
            if (!taskScheduleServer.getTaskSchedulingRam().containsKey(taskId)) {
              boolean flag = taskScheduleServer.addTaskToScheduling(task);
                if(flag) {
                    result.setResult(1);
                }
                return result.extend("任务开启成功").taskInfo(task);
            } else {
                return result.extend("任务处于启动状态").taskInfo(task);
            }
        }else {
            return result.extend("任务不存在!");
        }
    }

    @Override
    public TaskRunRetDto updateTaskBusinessInfo(TaskEntity entity) {
        TaskEntity task = searchTaskDetail(entity.getTaskId());
        TaskRunRetDto result = new TaskRunRetDto(TaskRunRetDto.TaskOperation.update, 0).taskInfo(entity);
        String config = entity.getTaskOutConfig();
        if(null != config && !JSONUtil.isJson(config) && !JSONUtil.isJsonArray(config)){
            result.setResult(0);
            result.extend("更新任务失败,任务配置必须为JSON或空");
            result.taskInfo(entity);
            return result;
        }
        task.setTaskCron(entity.getTaskCron());
        task.setTaskOutConfig(entity.getTaskOutConfig());
        task.setTaskName(entity.getTaskName());
        task.setTaskDesc(entity.getTaskDesc());
        int num = taskInfoOpMapper.updateTaskInfo(task);
        if (num == 1) {
            result.setResult(1);
            result.extend("成功更新任务");
            result.taskInfo(entity);
            //重新刷新任务
            taskScheduleServer.removeTaskFromScheduling(entity.getTaskId());
            taskScheduleServer.addTaskToScheduling(task);
        }

        return result;
    }

效果

数据库中配置任务

任务代码

public class TestTask extends AbstractBaseCronTask {

    public TestTask(TaskEntity taskEntity) {
        super(taskEntity);
    }

    @Override
    public void beforeJob() {
        log.info("测试任务开始");
    }

    @Override
    public void startJob() {
    }

    @Override
    public void afterJob() {
    }
}

任务查看

执行效果

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1914864.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自定义枚举对象序列化规则: 在Json中以枚举的code值表示枚举;枚举序列化时,新增枚举描述字段;String到IEnum的转换

文章目录 引言I 案例分析1.1 接口签名计算1.2 请求对象1.3 枚举对象序列化1.4 创建JavaTimeModule以支持Java 8的时间日期类型序列化和反序列化1.5 请求对象默认值处理II 在JSON中以枚举的code值来表示枚举的实现方式2.1 自定义toString方法返回code2.2 使用@JsonValue注解,只…

buuctf面具下的flag

细节: 这道题可能因为是vmdk的原因 导致在window上 7z无法得到全部的信息 所以最后解压要在linux系统上 解密网站 Brainfuck/Ook! Obfuscation/Encoding [splitbrain.org] 这道题010打开,可以发现里面隐藏了很多 binwalk解压 两个文件 vmdk可以直接 用7z解压 7z x flag.…

1. InternLM - 入门岛

第1关 Linux 基础知识 1. 完成SSH连接与端口映射并运行hello_world.py SSH连接配置 # wsl2中生成密钥对&#xff08;~/.ssh/id_rsa, ~/.ssh/id_rsa.pub&#xff09; ssh-keygen -t rsa# 将id_rsa.pub在internStudio作为公钥导入SSH登录 $ ssh -p 38871 rootssh.intern-ai.o…

如何监控 PostgreSQL 中表空间的使用情况并进行合理的管理?

文章目录 如何监控 PostgreSQL 中表空间的使用情况并进行合理的管理 一、引言 在 PostgreSQL 数据库中&#xff0c;表空间&#xff08;Tablespace&#xff09;是用于管理数据库对象存储位置的逻辑存储区域。有效地监控和管理表空间的使用情况对于确保数据库的性能、优化存储资…

Flutter 开启混淆打包apk,并反编译apk确认源码是否被混淆

第一步&#xff1a;开启混淆并打包apk flutter build apk --obfuscate --split-debug-info./out/android/app.android-arm64.symbols 第二步&#xff1a;从dex2jar download | SourceForge.net 官网下载dex2jar 下载完终端进入该文件夹&#xff0c;然后运行以下命令就会在该…

【多GPU训练方法】

一、数据并行 这是最常用的方法。整个模型复制到每个GPU上。训练数据被均匀分割&#xff0c;每个GPU处理一部分数据。所有GPU上的梯度被收集并求平均。通常使用NCCL&#xff08;NVIDIA Collective Communications Library&#xff09;等通信库实现。参数更新 使用同步后的梯度…

愚人杯的RE题

easy_pyc pyc反编译成py文件 # uncompyle6 version 3.9.1 # Python bytecode version base 2.7 (62211) # Decompiled from: Python 3.11.8 (tags/v3.11.8:db85d51, Feb 6 2024, 22:03:32) [MSC v.1937 64 bit (AMD64)] # Embedded file name: enpyc.py # Compiled at: 2023…

批量下载手机中APP程序中文件

需求 利用 adb pull 下载手机中app的某目录 adb pull 命令本身不支持直接下载整个目录&#xff08;文件夹&#xff09;及其所有子目录和文件作为一个单一的操作。但是&#xff0c;可以通过一些方法来间接实现这一目的。 方法 1. 首先将要下载的目录进行 tar 打包 # 在 And…

初阶C++(三)

初阶C(三&#xff09; 指针和引⽤的关系inline介绍对inline的运用宏函数与inline关系nullptr NULL在C中有歧义nullptr引用 指针和引⽤的关系 C中指针和引⽤就像两个性格迥异的亲兄弟&#xff0c;指针是哥哥&#xff0c;引⽤是弟弟&#xff0c;在实践中他们相辅相成&#xff0c;…

单向链表队列

实现单向链表队列的&#xff0c;创建&#xff0c;入队&#xff0c;出队&#xff0c;遍历&#xff0c;长度&#xff0c;销毁。 queue.h #ifndef __QUEUE_H__ #define __QUEUE_H__#include <stdio.h> #include <stdlib.h> #include <string.h> #define max 30…

Docker 使用基础(2)—镜像

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;秒針を噛む—ずっと真夜中でいいのに。 0:34━━━━━━️&#x1f49f;──────── 4:20 &#x1f504; ◀️ ⏸ …

【机器学习】(基础篇三) —— 线性回归

线性回归 本文介绍最经典的监督学习问题——线性回归&#xff0c;包括单变量线性回归和多变量线性回归 线性回归是回归任务&#xff0c;输入是带有标签的数据&#xff0c;根据数据关系&#xff0c;拟合出一个线性函数&#xff0c;并利用该函数进行预测等操作。线性回归分为单…

AirPods Pro新功能前瞻:iOS 18的五大创新亮点

随着科技的不断进步&#xff0c;苹果公司一直在探索如何通过创新提升用户体验。iOS 18的推出&#xff0c;不仅仅是iPhone的一次系统更新&#xff0c;更是苹果生态链中重要一环——AirPods Pro的一次重大升级。 据悉&#xff0c;iOS 18将为AirPods Pro带来五项新功能&#xff0…

LLM 研究方向(一): LLM Prompts--p-tuning、LoRA

目录 1. prompt-tuning background 2. Prompt Tuning 模型介绍 2.1 2021 prefix-tuning 2.2 2021 P-tuning v1 2.3 2021 Parameter-efficient prompt tuning (PET) 2.4 2022 P-tuning v2 2.5 2019 Adapter ​2.6 2021 LoRA (Low-Rank Adaptation) 2.7 2024 DoRA (…

剖析自闭症孩子玩手的独特之处

自闭症孩子玩手的行为常常具有一些较为独特的特点。 重复性是一个显著的特征。他们可能会以一种几乎相同的方式、节奏和频率反复地摆弄自己的手&#xff0c;例如不停地握拳、张开&#xff0c;或者持续地旋转手腕。 动作的单调性也是常见的。玩手的方式可能较为单一&#xff0c;…

python怎么求因数

要想做到python语言求因数方法&#xff0c;首先要明白其中的原理&#xff1a; 1、对由123456789这九个数字组成的9位数进行分解质因数。 2、1234576982x3x3x7x13x23x29x113&#xff0c;所以他的值因数是113。 3、总共有362880种可能&#xff0c;从中找出值因数中最小的数字和…

如何将heic格式转换jpg?四种将heic转换成jpg的方法!

如何将heic格式转换jpg&#xff1f;在现今的数字图像处理领域&#xff0c;Heic格式作为一种被吹捧的创新型图像格式&#xff0c;以其先进的压缩技术&#xff0c;迅速减小了图片文件的大小&#xff0c;然而&#xff0c;尽管其有许多优点&#xff0c;实际使用中Heic格式却带来了一…

视频解码故障案例两则

案例1 绿边 故障分析&#xff1a; 这个能明显看到视频上方出现绿色半透明边带。这说明Y数据正常。UV数据不正常。 它显然与视频帧的垂直分辨率设置有关。 UV数据和Y数据是连续放置的&#xff0c;如果上方出现彩色数据失调&#xff0c;说明这部分数据实际仍然是Y数据。也就是…

用微信服务号支付门诊缴费

时间上午10刚过&#xff0c;医院里计价收费处排起了长龙&#xff0c;放眼望去&#xff0c;左边的窗口六条队。右边在原来发药的位置也开辟了收费窗口&#xff0c;数了一下有四条队。一共十条排队付费的长龙&#xff0c;每一条队伍的人数不下20人&#xff0c;也即超过200人在排队…

Jenkins 构建 Web 项目:构建服务器和部署服务器分离, 并且前后端在一起的项目

构建命令 #!/bin/bash cd ruoyi-ui node -v pnpm -v pnpm install pnpm build:prod # 将dist打包成dist.zip zip -r dist.zip dist cp dist.zip ../dist.zip