手撸XXL-JOB(二)——定时任务管理

news2024/12/26 22:13:32

在上一节中,我们介绍了SpringBoot中关于定时任务的执行方式,以及ScheduledExecutorService接口提供的定时任务执行方法。假设我们现在要写类似XXL-JOB这样的任务调度平台,那么,对于任务的管理,是尤为重要的。接下来我们将一步一步,实现一个任务调度管理类。

YangJobManager类基础实现

假设我们现在的任务管理类,名为YangJobManager类。对于定时任务的执行,我们最终会调用到ScheduledExecutorService的相关方法,因此,我们的YangJobManager类,需要有ScheduledExecutorService属性,其次,我们希望能对要执行的定时线程任务,其命名进行修改,因此,我们需要有一个线程工厂的属性。基于上述两点,我们对YangJobManager类进行实现:

package com.yang.job;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class YangJobManager {
    private ScheduledExecutorService scheduledExecutorService;

    private ThreadFactory threadFactory;

    public YangJobManager(ScheduledExecutorService scheduledExecutorService, ThreadFactory threadFactory) {
        this.scheduledExecutorService = scheduledExecutorService;
        this.threadFactory = threadFactory;
    }

    public void schedule(Runnable runnable, Long delay) {
        Thread thread = threadFactory.newThread(runnable);
        scheduledExecutorService.schedule(thread, delay, TimeUnit.SECONDS);
    }

    public void scheduleWithFixedDelay(Runnable runnable, Long delay, Long period) {
        Thread thread = threadFactory.newThread(runnable);
        scheduledExecutorService.scheduleWithFixedDelay(thread, delay, period, TimeUnit.SECONDS);
    }

    public void scheduleWithFixedRate(Runnable runnable, Long delay, Long period) {
        Thread thread = threadFactory.newThread(runnable);
        scheduledExecutorService.scheduleAtFixedRate(thread, delay, period, TimeUnit.SECONDS);
    }

    public void shutdown() {
        if (this.scheduledExecutorService == null) {
            return;
        }
        if (this.scheduledExecutorService.isShutdown()) {
            return;
        }
        scheduledExecutorService.shutdown();
        try {
            if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

然后,我们实现YangJobThreadFactory,完成对线程的命名

public class YangJobThreadFactory implements ThreadFactory {
    private String poolName;

    private String threadPrefixName;

    private static AtomicInteger poolNumber = new AtomicInteger(1);

    private AtomicInteger threadNumber = new AtomicInteger(1);

    public YangJobThreadFactory(String poolName) {
        this.poolName = poolName;
        this.threadPrefixName = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    public String getPoolName() {
        return this.poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName(this.threadPrefixName + threadNumber.getAndIncrement());
        return thread;
    }

}

然后我们添加测试方法:

 public static void main(String[] args) {
        ThreadFactory threadFactory = new YangJobThreadFactory("yang");
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);
        YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService, threadFactory);

        yangJobManager.schedule(() -> {
            System.out.println(Thread.currentThread().getName() + "schedule定时任务开始执行:" + new Date());
        }, 1L);

        yangJobManager.scheduleWithFixedDelay(() -> {
            System.out.println(Thread.currentThread().getName() + "withFixedDelay定时任务开始执行:" + new Date());
        }, 0L, 1L);

        yangJobManager.scheduleWithFixedRate(() -> {
            System.out.println(Thread.currentThread().getName() + "withFixedRate定时任务开始执行:" + new Date());
        }, 0L, 1L);

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        yangJobManager.shutdown();
    }

执行结果如下:
image.png

提供统一的schedule方法

虽然我们能顺利将任务提交给YangJobManager执行,当感觉还不够收敛,因为我们创建了三个方法:schedule,scheduleWithFixedDelay, shceduleWithFixedRate,每个方法执行逻辑都差不多,最后都是调用scheduledExecutorService的相关方法,我们可以将这些方法都收敛到一个入口——schedule,然后在入参中添加一个参数,表示要执行的策略,根据入参的参数,选择对应的方法执行。
首先,我们添加一个执行策略枚举:

package com.yang.job.enums;

public enum JobExecuteStrategyEnum {
    IMMEDIATE_EXECUTE("immediate", "立即执行"),
    ONCE("once", "执行一次"),
    WITH_FIXED_DELAY("withFixedDelay", "任务执行完毕后间隔执行"),
    WITH_FIXED_RATE("withFixedRate", "任务执行开始后间隔执行");

    private String name;

    private String description;

    JobExecuteStrategyEnum(String name, String description) {
        this.name = name;
        this.description = description;
    }

    public String getName() {
        return this.name;
    }

    public static JobExecuteStrategyEnum getJobExecuteStrategyByName(String name) {
        if (name == null) {
            return null;
        }
        for (JobExecuteStrategyEnum value : values()) {
            if (name.equals(value.getName())) {
                return value;
            }
        }
        return null;
    }

    public static boolean isLegal(String name) {
        JobExecuteStrategyEnum jobExecuteStrategyByName = getJobExecuteStrategyByName(name);
        return jobExecuteStrategyByName != null;
    }

    public String getDescription() {
        return description;
    }
}

然后添加YangJobManager的schedule方法的入参类:

package com.yang.job.request;

import com.yang.job.enums.JobExecuteStrategyEnum;
import lombok.Data;

import java.io.Serializable;

@Data
public class YangJobSubmitParam implements Serializable {
    private Runnable runnable;
    
    private Integer initialDelay;
    
    private Integer period;
    
    private JobExecuteStrategyEnum jobExecuteStrategy;
}

最后,修改YangJobManager类,将执行定时任务收敛到schedule方法,进入该方法,首先根据入参判断执行策略,如果是immediate,那么直接对入参的runnable调用run方法执行接口,其他的策略则分别对应scheduledExecutorService的schedule、scheduledWithFixedDelay、scheduledWithFixedRate方法,此外,这里对属性也进行修改,去除ThreadFactory属性。

package com.yang.job;

import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.request.YangJobSubmitParam;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class YangJobManager {
    private ScheduledExecutorService scheduledExecutorService;
    

    public YangJobManager(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    public void schedule(YangJobSubmitParam yangJobSubmitParam) {
        JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();
        if (jobExecuteStrategy == null) {
            throw new RuntimeException("缺少执行策略=========");
        }
        Runnable runnable = yangJobSubmitParam.getRunnable();
        Integer initialDelay = yangJobSubmitParam.getInitialDelay();
        Integer period = yangJobSubmitParam.getPeriod();
        switch (jobExecuteStrategy) {
            case IMMEDIATE_EXECUTE:
                runnable.run();
                break;
            case ONCE:
                scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);
                break;
            case WITH_FIXED_DELAY:
                scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);
                break;
            case WITH_FIXED_RATE:
                scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);
                break;
        }
    }
    
    public void shutdown() {
        if (this.scheduledExecutorService == null) {
            return;
        }
        if (this.scheduledExecutorService.isShutdown()) {
            return;
        }
        scheduledExecutorService.shutdown();
        try {
            if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

最后,我们添加测试方法:

public static void main(String[] args) {
        ThreadFactory threadFactory = new YangJobThreadFactory("yang");
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);
        YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService);

        YangJobSubmitParam yangJobSubmitParam1 = new YangJobSubmitParam();
        yangJobSubmitParam1.setRunnable(() -> System.out.println("立即执行======" + new Date()));
        yangJobSubmitParam1.setJobExecuteStrategy(JobExecuteStrategyEnum.IMMEDIATE_EXECUTE);

        YangJobSubmitParam yangJobSubmitParam2 = new YangJobSubmitParam();
        yangJobSubmitParam2.setRunnable(() -> System.out.println("执行一次======" + new Date()));
        yangJobSubmitParam2.setInitialDelay(1);
        yangJobSubmitParam2.setJobExecuteStrategy(JobExecuteStrategyEnum.ONCE);

        YangJobSubmitParam yangJobSubmitParam3 = new YangJobSubmitParam();
        yangJobSubmitParam3.setRunnable(() -> System.out.println("withFixedDelay=====" + new Date()));
        yangJobSubmitParam3.setInitialDelay(1);
        yangJobSubmitParam3.setPeriod(2);
        yangJobSubmitParam3.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_DELAY);

        YangJobSubmitParam yangJobSubmitParam4 = new YangJobSubmitParam();
        yangJobSubmitParam4.setRunnable(() -> System.out.println("withFixedRate=====" + new Date()));
        yangJobSubmitParam4.setInitialDelay(1);
        yangJobSubmitParam4.setPeriod(2);
        yangJobSubmitParam4.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_RATE);

        yangJobManager.schedule(yangJobSubmitParam1);
        yangJobManager.schedule(yangJobSubmitParam2);
        yangJobManager.schedule(yangJobSubmitParam3);
        yangJobManager.schedule(yangJobSubmitParam4);

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        yangJobManager.shutdown();
    }

执行结果如下:
image.png

提交任务和取消任务

任务的提交对应的是schedule方法,但我们的YangJobManager类缺少了关于任务的取消逻辑。在ScheduledExecutorService的各个定时执行方法中,其返回值是一个ScheduleFuture类,我们可以通过该类的cancel方法,来将对应的线程任务进行取消。此外,对于每一个任务,我们需要有一个任务标识,所以,我们先修改YangJobSubmitParam类:

package com.yang.job.request;

import com.yang.job.enums.JobExecuteStrategyEnum;
import lombok.Data;

import java.io.Serializable;

@Data
public class YangJobSubmitParam implements Serializable {
    private Integer jobId;
    
    private Runnable runnable;

    private Integer initialDelay;

    private Integer period;

    private JobExecuteStrategyEnum jobExecuteStrategy;
}

然后,我们修改YangJobManager类,首先将schedule方法改为submit方法,这样更见名知义,在submit方法中,除了理解执行策略外,其他策略都会获取返回的ScheduleFuture,然后存入对应的map,在取消的时候,我们根据jobId从map中找到对应的ScheduleFuture,并执行cancel方法,以此来取消任务。

package com.yang.job;

import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.request.YangJobSubmitParam;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class YangJobManager {
    private ScheduledExecutorService scheduledExecutorService;

    private Map<String, ScheduledFuture> jobId2ScheduleFutureMap = new ConcurrentHashMap<>();

    public YangJobManager(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    public void submitJob(YangJobSubmitParam yangJobSubmitParam) {
        Integer jobId = yangJobSubmitParam.getJobId();
        if (jobId == null) {
            throw new RuntimeException("缺少任务标识=========");
        }
        ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());
        if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
            // jobId存在对应的任务
            return;
        }
        
        JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();
        if (jobExecuteStrategy == null) {
            throw new RuntimeException("缺少执行策略=========");
        }
        
        if (jobExecuteStrategy == JobExecuteStrategyEnum.IMMEDIATE_EXECUTE) {
            yangJobSubmitParam.getRunnable().run();
            return;
        }
        scheduledFuture = scheduleJob(yangJobSubmitParam);
        jobId2ScheduleFutureMap.put(jobId.toString(), scheduledFuture);
    }
    
    public void cancelJob(Integer jobId) {
        if (jobId == null) {
            return;
        }
        ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());
        if (scheduledFuture == null) {
            return;
        }
        if (!scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(true);
        }
        jobId2ScheduleFutureMap.remove(jobId.toString());
    }

    private ScheduledFuture scheduleJob(YangJobSubmitParam yangJobSubmitParam) {
        Runnable runnable = yangJobSubmitParam.getRunnable();
        Integer initialDelay = yangJobSubmitParam.getInitialDelay();
        Integer period = yangJobSubmitParam.getPeriod();
        JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();
        switch (jobExecuteStrategy) {
            case ONCE:
                return scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);
            case WITH_FIXED_DELAY:
                return scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);
            case WITH_FIXED_RATE:
                return scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);
        }
        throw new RuntimeException("执行策略有误===========");
    }

    public void shutdown() {
        if (this.scheduledExecutorService == null) {
            return;
        }
        if (this.scheduledExecutorService.isShutdown()) {
            return;
        }
        scheduledExecutorService.shutdown();
        try {
            if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

最后,我们添加对应的测试方法:

 public static void main(String[] args) {
        ThreadFactory threadFactory = new YangJobThreadFactory("yang");
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);

        YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService);
        YangJobSubmitParam yangJobSubmitParam = new YangJobSubmitParam();
        yangJobSubmitParam.setJobId(1);
        yangJobSubmitParam.setRunnable(() -> System.out.println("执行任务=====" + new Date()));
        yangJobSubmitParam.setInitialDelay(0);
        yangJobSubmitParam.setPeriod(2);
        yangJobSubmitParam.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_RATE);
        yangJobManager.submitJob(yangJobSubmitParam);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("取消任务==========");
        yangJobManager.cancelJob(1);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        yangJobManager.shutdown();

    }

在该方法中,我们提交任务,该任务间隔时间为2秒,10秒过后,取消任务,取消任务过后,再睡眠10秒,在后面10秒钟,不会执行任务(或执行一次,因为在cancel之前刚好有任务没执行完),执行结果如下:
image.png

YangJobManager建造者

对于YangJobManager,目前我们所拥有的属性、方法都比较简单,但是如果后续这个类进一步扩展,构造该类可能会变得很麻烦,因此,我们添加一个YangJobBuilder建造者类,用于构造YangJobManager,此外,我们将YangJobManager的构造方法设置为private,从而将构造YangJobManager的职责,彻底收敛到YangJobManagerBuilder类中,我们修改YangJobManager类如下:

package com.yang.job;

import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.factory.YangJobThreadFactory;
import com.yang.job.request.YangJobSubmitParam;

import java.util.Map;
import java.util.concurrent.*;

public class YangJobManager {
    private ScheduledExecutorService scheduledExecutorService;

    private Map<String, ScheduledFuture> jobId2ScheduleFutureMap = new ConcurrentHashMap<>();

    private YangJobManager(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    public void submitJob(YangJobSubmitParam yangJobSubmitParam) {
        Integer jobId = yangJobSubmitParam.getJobId();
        if (jobId == null) {
            throw new RuntimeException("缺少任务标识=========");
        }
        ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());
        if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
            // jobId存在对应的任务
            return;
        }

        JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();
        if (jobExecuteStrategy == null) {
            throw new RuntimeException("缺少执行策略=========");
        }

        if (jobExecuteStrategy == JobExecuteStrategyEnum.IMMEDIATE_EXECUTE) {
            yangJobSubmitParam.getRunnable().run();
            return;
        }
        scheduledFuture = scheduleJob(yangJobSubmitParam);
        jobId2ScheduleFutureMap.put(jobId.toString(), scheduledFuture);
    }

    public void cancelJob(Integer jobId) {
        if (jobId == null) {
            return;
        }
        ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());
        if (scheduledFuture == null) {
            return;
        }
        if (!scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(true);
        }
        jobId2ScheduleFutureMap.remove(jobId.toString());
    }

    private ScheduledFuture scheduleJob(YangJobSubmitParam yangJobSubmitParam) {
        Runnable runnable = yangJobSubmitParam.getRunnable();
        Integer initialDelay = yangJobSubmitParam.getInitialDelay();
        Integer period = yangJobSubmitParam.getPeriod();
        JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();
        switch (jobExecuteStrategy) {
            case ONCE:
                return scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);
            case WITH_FIXED_DELAY:
                return scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);
            case WITH_FIXED_RATE:
                return scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);
        }
        throw new RuntimeException("执行策略有误===========");
    }

    public void shutdown() {
        if (this.scheduledExecutorService == null) {
            return;
        }
        if (this.scheduledExecutorService.isShutdown()) {
            return;
        }
        scheduledExecutorService.shutdown();
        try {
            if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static class YangJobManagerBuilder {
        private ThreadFactory threadFactory;

        private ScheduledExecutorService scheduledExecutorService;

        public YangJobManagerBuilder() {
        }

        public YangJobManagerBuilder setThreadFactory(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            return this;
        }
        
        public YangJobManagerBuilder setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
            this.scheduledExecutorService = scheduledExecutorService;
            return this;
        }
        
        public YangJobManager build() {
            if (this.threadFactory == null) {
                this.threadFactory = new YangJobThreadFactory("yang");
            }
            if (this.scheduledExecutorService == null) {
                this.scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
                        this.threadFactory);
            } else {
                if (this.scheduledExecutorService instanceof ScheduledThreadPoolExecutor) {
                    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) this.scheduledExecutorService;
                    scheduledThreadPoolExecutor.setThreadFactory(this.threadFactory);
                }
            }
            return new YangJobManager(this.scheduledExecutorService);
        }
    }
}

任务执行类

在之前的代码中,我们的Runnable都是匿名函数类,但是在我们的定时任务调度平台中,一般情况下,这个任务是会持久化到数据库中的,我们一般不会说把这个Runnable的代码也存到数据库吧,一般存储的,应该就是某个任务执行类的类路径,和方法名,以及入参,然后在启动项目时,从数据库中加载这些数据,并通过反射或代理等方式,来构造这个Runnable。
首先,我们定义一个任务执行类,来规范任务的执行方法和入参格式:

// 任务执行类
package com.yang.job.execute;

public interface IYangJobExecutor {
    void execute(YangJobExecuteRequest yangJobExecuteRequest);
}

// 任务执行方法入参
package com.yang.job.execute;

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public class YangJobExecuteRequest implements Serializable {
    private String jobId;

    private Map<String, String> params = new HashMap<>();

    public void addParam(String key, String value) {
        params.put(key, value);
    }

    public String getParam(String key) {
        return params.get(key);
    }
}

接着,我们创建这个YangJobExecutor的实现类,用于测试,在该类中,执行任务的方法很简单,打印当前类的名字以及入参。

package com.yang.task;

import com.yang.job.execute.IYangJobExecutor;
import com.yang.job.execute.YangJobExecuteRequest;

import java.util.Date;

public class TestJobExecutor implements IYangJobExecutor {
    @Override
    public void execute(YangJobExecuteRequest yangJobExecuteRequest) {
        System.out.println(String.format("%s 任务执行类执行了,入参为:%s, 当前时间:%s",
                this.getClass().getName(), yangJobExecuteRequest.toString(),
                new Date().toString()));
    }
}

然后我们创建一个YangJobData,假设我们从数据库中获取的数据格式如下:

package com.yang.job.data;

import lombok.Data;

import java.io.Serializable;

@Data
public class YangJobData implements Serializable {
    private Integer jobId;
    
    private String cron;
    
    private String executeStrategy;
    
    private String executeClassPath;
    
    private String executeParams;
}

executeStrategy表示任务的执行策略,executeClassPath表示要执行的任务类的路径,executeParams表示执行任务方法的入参。
在XXL-JOB中,我们可以使用cron来设置定时任务的执行时间,因此我们这里,也使用cron作为定时任务的执行时间设置,为了解析cron表达式,我们添加下列依赖:

  <dependency>
            <groupId>com.cronutils</groupId>
            <artifactId>cron-utils</artifactId>
            <version>9.2.0</version>
        </dependency>

然后创建一个CronUtils工具类,用于解析cron表达式。

package com.yang.demo.infra.utils;

import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinition;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;

import java.time.ZonedDateTime;
import java.util.Optional;

public class CronUtils {
    private static final CronDefinition CRON_DEFINITION = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ);
    private static final CronParser CRON_PARSER = new CronParser(CRON_DEFINITION);

    public static ZonedDateTime nextExecutionTime(String cron, ZonedDateTime startTime) {
        ExecutionTime executionTime = ExecutionTime.forCron(CRON_PARSER.parse(cron));
        Optional<ZonedDateTime> zonedDateTime = executionTime.nextExecution(startTime);
        return zonedDateTime.get();
    }
}

对于执行方法的入参,一般情况下,就是任务的id,以及一些扩展信息,这些扩展信息一般以键值对的形式存储,即"key:value;key:value;"这些形式,所以这里添加一个FeaturesUtils类,用于解析这些键值对信息:

package com.yang.job.utils;


import java.util.HashMap;
import java.util.Map;

public class FeaturesUtils {
    private final static String KEY_KEY_SEPARATOR = ";";
    private final static String KEY_VALUE_SEPARATOR = ":";

    public static Map<String, String> convert2FeatureMap(String features) {
        Map<String, String> featureMap = new HashMap<>();
        if (features == null || features.isEmpty()) {
            return featureMap;
        }
        String[] keyValues = features.split(KEY_KEY_SEPARATOR);
        for (String keyValue : keyValues) {
            String[] split = keyValue.split(KEY_VALUE_SEPARATOR);
            String key = split[0];
            String value = split[1];
            featureMap.put(key, value);
        }
        return featureMap;
    }

    public static String convert2Features(Map<String, String> featureMap) {
        if (featureMap == null || featureMap.isEmpty()) {
            return "";
        }
        StringBuilder stringBuilder = new StringBuilder();
        featureMap.forEach((key, value) -> {
            stringBuilder.append(key)
                    .append(KEY_VALUE_SEPARATOR)
                    .append(value)
                    .append(KEY_KEY_SEPARATOR);
        });
        return stringBuilder.toString();
    }
}

然后我们添加测试方法,模拟从数据库中获取数据,并根据任务类路径,获取对应的runnable并提交到YangJobManager中。

  public static void main(String[] args) {
        YangJobData yangJobData = mockYangJobData();
        YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam(yangJobData);

        YangJobManager yangJobManager = new YangJobManager.YangJobManagerBuilder()
                .setThreadFactory(new YangJobThreadFactory("yang"))
                .build();
        yangJobManager.submitJob(yangJobSubmitParam);

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        yangJobManager.shutdown();
    }

    private static YangJobSubmitParam convert2YangJobSubmitParam(YangJobData yangJobData) {
        YangJobSubmitParam yangJobSubmitParam = new YangJobSubmitParam();
        yangJobSubmitParam.setJobId(yangJobData.getJobId());
        yangJobSubmitParam.setJobExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(yangJobData.getExecuteStrategy()));
        ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(yangJobData.getCron(), ZonedDateTime.now());
        ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(yangJobData.getCron(), nextExecutionTime);
        long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli();
        long executeEochMill = nextExecutionTime.toInstant().toEpochMilli();
        long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli();
        yangJobSubmitParam.setInitialDelay((int)(executeEochMill - nowEochMill) / 1000);
        yangJobSubmitParam.setPeriod((int)(secondExecuteEochMill - executeEochMill) / 1000);

        try {
            Class<?> aClass = Class.forName(yangJobData.getExecuteClassPath());
            if (!IYangJobExecutor.class.isAssignableFrom(aClass)) {
                throw new RuntimeException("任务类必须实现IYangJobExecutor接口");
            }
            IYangJobExecutor executor = (IYangJobExecutor) aClass.newInstance();
            YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobData);
            Runnable runnable = () -> executor.execute(yangJobExecuteRequest);
            yangJobSubmitParam.setRunnable(runnable);
        } catch (InstantiationException | IllegalAccessException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return yangJobSubmitParam;
    }

    private static YangJobExecuteRequest convert2YangJobExecuteRequest(YangJobData yangJobData) {
        YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();
        yangJobExecuteRequest.setJobId(yangJobData.getJobId().toString());
        yangJobExecuteRequest.setParams(FeaturesUtils.convert2FeatureMap(yangJobData.getExecuteParams()));
        return yangJobExecuteRequest;
    }

    private static YangJobData mockYangJobData() {
        YangJobData yangJobData = new YangJobData();
        yangJobData.setJobId(1);
        yangJobData.setCron("0/5 * * * * ?");
        yangJobData.setExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_DELAY.getName());
        yangJobData.setExecuteClassPath("com.yang.task.TestJobExecutor");
        yangJobData.setExecuteParams("jobId:1;startIndex:1;endIndex:10;");
        return yangJobData;
    }

这里对于cron的解析,其实不是特别好,这里的思路是,获取下一次执行的时间,和下下一次执行的时间,然后以此来计算initialDelay和period,但是如果这个cron表示的是某几天、某几个小时,比如说星期一、星期二、星期三执行,那么我们那种解析方式是有误的,这个可以后续再好好斟酌一下,目前先这样解析。
执行结果如下:
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1676551.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

讲解SSM的xml文件

概述&#xff1a;这些配置文件很烦&#xff0c;建议直接复制粘贴 springMVC.xml文件 <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans"xmlns:xsi"http://www.w3.org/2001/XM…

k8s endpoint

Endpoint Service 并不是和 pod 直接相连的&#xff0c;Endpoint 介于两者之间。Endpoint 资源就是暴露一个服务的 IP 地址和端口的列表。 虽然在 spec 服务中定义了 pod 选择器&#xff0c;但在重定向传入连接时不会直接使用它。选择器用于构建 IP 和端口列表&#xff0c;然…

SQOOP详细讲解

SQOOP安装及使用 SQOOP安装及使用SQOOP安装1、上传并解压2、修改文件夹名字3、修改配置文件4、修改环境变量5、添加MySQL连接驱动6、测试准备MySQL数据登录MySQL数据库创建student数据库切换数据库并导入数据另外一种导入数据的方式使用Navicat运行SQL文件导出MySQL数据库impo…

企业必看:镭速教你如何测试内网文件传输效率和稳定问题

在现代商业运作中&#xff0c;企业内部文件传输的效率和稳定性对于数据管理和业务流程极为重要。无论是远程工作还是团队协作&#xff0c;高效的文件传输都能显著提升工作效率。今天镭速小编就教你如何测试内网文件传输效率和稳定问题。 1、磁盘性能&#xff0c;即硬盘的读取和…

JVM从1%到99%【精选】-运行时数据区

目录 1.总览运行时数据区 2.内存溢出 3. 程序计数器 4.虚拟机栈 5.本地方法栈 6.堆 7.方法区 8.直接内存 1.总览运行时数据区 Java虚拟机在运行Java程序过程中管理的内存区域,称之为运行时数据区。运行时数据区可以分为方法区、堆、虚拟机栈、本地方法栈、程序计数器…

分享一些关于软件测试的面试问题

由于最近在面试软件测试工程师的岗位&#xff0c;遇到了一些面试的问题以及我的答案&#xff0c;希望对正在找工作的同行有些帮助&#xff0c;仅仅作为软件测试行业&#xff0c;求职者看问题就行&#xff0c;大佬可以探讨一下答案。 1.提交了bug&#xff0c;开发不认为是bug怎么…

JavaScript的综合案例

案例要求&#xff1a; 实现一个表单验证 1.当输入框失去焦点时&#xff0c;验证输入的内容是否符合要求 2.当点击注册按钮时&#xff0c;判断所有输入框的内容是否都符合要求&#xff0c;如果不符合要求阻止表单提交 简单的页面实现 <!DOCTYPE html> <html lang&…

凸优化理论学习三|凸优化问题(一)

系列文章目录 凸优化理论学习一|最优化及凸集的基本概念 凸优化理论学习二|凸函数及其相关概念 文章目录 系列文章目录一、优化问题&#xff08;一&#xff09;标准形式的优化问题&#xff08;二&#xff09;可行点和最优点&#xff08;三&#xff09;局部最优点&#xff08;四…

【万字面试题】Redis

文章目录 常见面试题布隆过滤器原理和数据结构&#xff1a;特点和应用场景&#xff1a;缺点和注意事项&#xff1a;在python中使用布隆过滤器 三种数据删除策略LRU (Least Recently Used)工作原理&#xff1a;应用场景&#xff1a; LFU (Least Frequently Used)工作原理&#x…

振弦式应变计的灵敏系数k范围探讨

振弦式应变计是一种广泛应用于工程结构健康监测的重要设备&#xff0c;其灵敏系数k是衡量其性能的关键指标。本文将探讨振弦式应变计的灵敏系数k的一般范围&#xff0c;并分析影响灵敏系数的因素。 一、振弦式应变计的工作原理 振弦式应变计通过测量振弦在受力作用下的振动频率…

屡被约谈的货拉拉三闯IPO,CEO周胜馥IPO前套现11亿

近日&#xff0c;货运巨头货拉拉第三次闯关IPO。虽然其实现首次年度盈利&#xff0c;但光鲜数据背后的盈利模式却频遭诟病。 货拉拉的创始人周胜馥从美国高校毕业后&#xff0c;曾供职于贝恩咨询公司&#xff0c;并在期间对创业产生了浓厚兴趣。抛开履历中的高学历好工作的部分…

Amesim基础篇-表格类型设置与读取

前言 在Amesim仿真中,不可避免需要应用到表格。如新能源动力电池中内阻、充电倍率的调取,压缩机的机械效率、容积效率等,水泵的效率,管路的压降等等。本文将介绍如何对表格类型的选择与参数输入。 1 进入表格设置界面 如下图所示,在Amesim界面的右上角Table Editor进入…

EEL中 python端的函数名是如何传递给js端的

python端的函数名是如何传递给js端的 核心步骤&#xff1a;将函数名列表注入到动态生成的 eel.js 中&#xff0c;这样前端一开始引用的eel.js本身已经包含有py_function的函数名列表了。你打开开发者工具看看浏览器中的 eel.js文件源代码就知道了。 具体实现&#xff1a; # 读…

使用 Flask Blueprint 实现模块化 Web 应用

文章目录 1. 什么是 Flask Blueprint&#xff1f;2. 为什么要使用 Flask Blueprint&#xff1f;3. 如何使用 Flask Blueprint&#xff1f;4. 在 Blueprint 之间进行通信5. 结合 Flask 插件系统进行功能拓展结语 当构建大型 Flask Web 应用时&#xff0c;保持代码的组织结构清晰…

etcd单机部署和集群部署

1、etcd单实例部署 对于平常的学习&#xff0c;其实搭建一个单机节点是够了的。接下来就讲讲怎么搭建单机节点。 本次部署是在 centos7 系统&#xff0c;cpu 为amd64 上面进行的。 部署是直接使用官方编译好的二进制文件&#xff0c;大家也可以直接看 ectd-releases 界面选择…

linux 环境下 分布式文件搭建fastDFS

1.软件信息 地址&#xff1a;happyfish100 (YuQing) GitHub 1.fastdfs-master.zip 2.fastdfs-nginx-module-master.zip 3.libfastcommon-master.zip 4.libserverframe-master.zip yum install make cmake gcc gcc-c perl 2.安装libfastcommon unzip libfastcommon-mast…

CPU利用率使用教程

本文主要参考&#xff1a; 一文让你学到 nmon最详尽的用法 Linux性能监控命令_nmon 安装与使用 如果你是在Ubuntu上安装nmon&#xff0c;使用&#xff1a; apt install nmon安装好后&#xff0c;直接运行 $:nmon #运行如果是后台抓数据&#xff1a; -f 参数: 生成文件,文件…

北京玻色量子携手赛氪网举办长三角高校数学建模竞赛巡回讲座

2024年5月13日下午&#xff0c;一场聚焦数学建模与量子计算前沿的讲座在中国计量大学隆重举行。此次讲座作为第四届长三角高校数学建模竞赛的巡回宣讲活动之一&#xff0c;由北京玻色量子科技有限公司与竞赛组委会成员赛氪网共同举办&#xff0c;旨在向广大师生介绍量子计算的应…

企业如何利用美国多IP服务器来提升网站的安全性?

企业如何利用美国多IP服务器来提升网站的安全性? 在当前网络环境下&#xff0c;网站安全性日益成为企业面临的重要挑战。为了有效应对各种潜在威胁&#xff0c;越来越多的企业选择利用美国多IP服务器来提升其网站的安全性。这种服务器配置能够通过一系列策略来增加网站的安全…

滚珠螺杆在精密机械设备中如何维持精度要求?

滚珠螺杆在精密设备领域中的运用非常之广泛&#xff0c;具有精度高、效率高的特点。为了确保滚珠螺杆在生产设备中能够发挥最佳性能&#xff0c;我们必须从多个维度进行深入考量&#xff0c;并采取针对性的措施&#xff0c;以确保其稳定、精准地服务于现代化生产的每一个环节。…