TaskScheduler
概述
TaskScheduler是spring 3.0版本后,自带了一个定时任务工具,不用配置文件,可以动态改变执行状态。也可以使用cron表达式设置定时任务。
被执行的类要实现Runnable接口
TaskScheduler是一个接口,它定义了6个方法
接口的6种方法
public interface TaskScheduler {
/**
* 提交任务调度请求
* @param task 待执行任务
* @param trigger 使用Trigger指定任务调度规则
* @return
*/
ScheduledFuture schedule(Runnable task, Trigger trigger);
/**
* 提交任务调度请求
* 注意任务只执行一次,使用startTime指定其启动时间
* @param task 待执行任务
* @param startTime 任务启动时间
* @return
*/
ScheduledFuture schedule(Runnable task, Date startTime);
/**
* 使用fixedRate的方式提交任务调度请求
* 任务首次启动时间由传入参数指定
* @param task 待执行的任务
* @param startTime 任务启动时间
* @param period 两次任务启动时间之间的间隔时间,默认单位是毫秒
* @return
*/
ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
/**
* 使用fixedRate的方式提交任务调度请求
* 任务首次启动时间未设置,任务池将会尽可能早的启动任务
* @param task 待执行任务
* @param period 两次任务启动时间之间的间隔时间,默认单位是毫秒
* @return
*/
ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
/**
* 使用fixedDelay的方式提交任务调度请求
* 任务首次启动时间由传入参数指定
* @param task 待执行任务
* @param startTime 任务启动时间
* @param delay 上一次任务结束时间与下一次任务开始时间的间隔时间,单位默认是毫秒
* @return
*/
ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
/**
* 使用fixedDelay的方式提交任务调度请求
* 任务首次启动时间未设置,任务池将会尽可能早的启动任务
* @param task 待执行任务
* @param delay 上一次任务结束时间与下一次任务开始时间的间隔时间,单位默认是毫秒
* @return
*/
ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}
0、ThreadPoolTaskScheduler
在 ThreadPoolTaskSchedulerConfig 中定义 ThreadPoolTaskScheduler bean
@Configuration
public class ThreadPoolTaskSchedulerConfig {
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler(){
ThreadPoolTaskScheduler threadPoolTaskScheduler
= new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(5);
threadPoolTaskScheduler.setThreadNamePrefix(
"ThreadPoolTaskScheduler");
return threadPoolTaskScheduler;
}
}
配置的 bean threadPoolTaskScheduler 可以根据配置的池大小 5 异步执行任务。
请注意,所有与 ThreadPoolTaskScheduler 相关的线程名称都将以ThreadPoolTaskScheduler 为前缀。
让我们实现一个简单的任务,然后我们可以安排:
class RunnableTask implements Runnable{
private String message;
public RunnableTask(String message){
this.message = message;
}
@Override
public void run() {
System.out.println(new Date()+" Runnable Task with "+message
+" on thread "+Thread.currentThread().getName());
}
}
1、schedule(Runnable task, Trigger trigger)
指定一个触发器执行定时任务。可以使用CronTrigger来指定Cron表达式,执行定时任务
如下:使用CronTrigger 来根据 cron 表达式调度任务,可以使用提供的触发器按照某个指定的节奏或时间表运行任务,在这种情况下,RunnableTask 将在每分钟的第 10 秒执行。
taskScheduler.schedule(new RunnableTask("Cron Trigger"), cronTrigger);
2、schedule(Runnable task, Date startTime);
指定一个具体时间点执行定时任务,可以动态的指定时间,开启任务,只执行一次
如下:配置一个任务在 1000 毫秒的固定延迟后运行,RunnableTask 将始终在一次执行完成和下一次执行开始之间运行 1000 毫秒。
taskScheduler.schedule(
new Runnabletask("Specific time, 3 Seconds from now"),
new Date(System.currentTimeMillis + 3000)
);
3、scheduleAtFixedRate(Runnable task, long period);
立即执行,循环任务,指定一个执行周期(毫秒计时)
PS:不管上一个周期是否执行完,到时间下个周期就开始执行
如下:安排一个任务以固定的毫秒速率运行,下一个 RunnableTask 将始终在 2000 毫秒后运行,而不管上次执行的状态如何,它可能仍在运行。
taskScheduler.scheduleAtFixedRate(
new RunnableTask("Fixed Rate of 2 seconds") ,
2000);
4、scheduleAtFixedRate(Runnable task, Date startTime, long period);
指定时间开始执行,循环任务,指定一个间隔周期(毫秒计时)
PS:不管上一个周期是否执行完,到时间下个周期就开始执行
如下:使用CronTrigger 来根据 cron 表达式调度任务,可以使用提供的触发器按照某个指定的节奏或时间表运行任务,在这种情况下,RunnableTask 将在每分钟的第 10 秒执行。
taskScheduler.scheduleAtFixedRate(new RunnableTask(
"Fixed Rate of 2 seconds"), new Date(), 3000);
5、scheduleWithFixedDelay(Runnable task, long delay);
立即执行,循环任务,指定一个间隔周期(毫秒计时)
PS:上一个周期执行完,等待delay时间,下个周期开始执行
如下:配置一个任务在 1000 毫秒的固定延迟后运行,RunnableTask 将始终在一次执行完成和下一次执行开始之间运行 1000 毫秒。
taskScheduler.scheduleWithFixedDelay(
new RunnableTask("Fixed 1 second Delay"),
1000);
6、scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
指定时间开始执行,循环任务,指定一个间隔周期(毫秒计时)
PS:上一个周期执行完,等待delay时间,下个周期开始执行
如下:将任务配置为在给定开始时间的固定延迟后运行,RunnableTask 将在指定的执行时间被调用,其中包括 @PostConstruct 方法开始的时间,随后延迟 1000 毫秒。
taskScheduler.scheduleWithFixedDelay(
new RunnableTask("Current Date Fixed 1 second Delay"),
new Date(),
1000);
接口5个实现类
1、ConcurrentTaskScheduler
以当前线程执行任务,如果任务简单,可以直接使用这个类来执行,快捷方便
单线程运行
public class LocTest implements Runnable {
private ConcurrentTaskScheduler concurrentTaskScheduler = new ConcurrentTaskScheduler();
private void start() {
concurrentTaskScheduler.schedule(this, new Date());
}
public void run() {
Thread thread = Thread.currentThread();
System.out.println("current id:" + thread.getId());
System.out.println("current name:" + thread.getName());
}
public static void main(String[] args) {
new LocTest().start();
}
}
2、DefaultManagedTaskScheduler
以当前线程执行任务,是ConcurrentTaskScheduler的子类,添加了JNDI的支持。
和ConcurrentTaskScheduler一样的用法,需要使用JNDI可以单独设置
3、ThreadPoolTaskScheduler
TaskScheduler接口的默认实现类,多线程定时任务执行。可以设置执行线程池数(默认一个线程)
使用前必须得先调用initialize()【初始化方法】
有shutDown()方法,执行完后可以关闭线程
除实现了TaskScheduler接口中的方法外,它还包含了一些对ScheduledThreadPoolExecutor进行操作的接口,其常用方法如下:
- setPoolSize :
设置线程池大小,最小为1,默认情况下也为1
; - setErrorHandler :
设置异常处理器
。 - getScheduledThreadPoolExecutor :
获取ScheduledExecutor,默认ScheduledThreadPoolExecutor类型
。 - getActiveCount :
获取当前活动的线程数
- execute :
提交执行一次的任务
- submit\submitListenable :
提交执行一次的任务,并且返回一个Future对象供判断任务状态使用
public class LocTest implements Runnable {
private ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
private void start() {
taskScheduler.setPoolSize(10);
//必须得先初始化,才能使用
taskScheduler.initialize();
taskScheduler.schedule(this, new Date());
}
public void run() {
Thread ct = Thread.currentThread();
System.out.println("current id:"+ct.getId());
System.out.println("current name:"+ct.getName());
}
public static void main(String[] args) {
new LocTest().start();
}
}
4、TimerManagerTaskScheduler
用于包装CommonJ中的TimerManager接口。在使用CommonJ进行调度时使用
spring boot使用TaskScheduler实现动态增删启停定时任务
SchedulingConfig:添加执行定时任务的线程池配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class SchedulingConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 定时任务执行线程池核心线程数
taskScheduler.setPoolSize(4);
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
return taskScheduler;
}
}
ScheduledTask:添加ScheduledFuture的包装类
ScheduledFuture是ScheduledExecutorService定时任务线程池的执行结果。
import java.util.concurrent.ScheduledFuture;
public final class ScheduledTask {
volatile ScheduledFuture<?> future;
/**
* 取消定时任务
*/
public void cancel() {
ScheduledFuture<?> future = this.future;
if (future != null) {
future.cancel(true);
}
}
}
SchedulingRunnable:添加Runnable接口实现类
添加Runnable接口实现类,被定时任务线程池调用,用来执行指定bean里面的方法
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.Objects;
public class SchedulingRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(SchedulingRunnable.class);
private final String beanName;
private final String methodName;
private final String params;
public SchedulingRunnable(String beanName, String methodName) {
this(beanName, methodName, null);
}
public SchedulingRunnable(String beanName, String methodName, String params) {
this.beanName = beanName;
this.methodName = methodName;
this.params = params;
}
@Override
public void run() {
logger.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params);
long startTime = System.currentTimeMillis();
try {
Object target = SpringContextUtils.getBean(beanName);
Method method = null;
if (StringUtils.isNotEmpty(params)) {
method = target.getClass().getDeclaredMethod(methodName, String.class);
} else {
method = target.getClass().getDeclaredMethod(methodName);
}
ReflectionUtils.makeAccessible(method);
if (StringUtils.isNotEmpty(params)) {
method.invoke(target, params);
} else {
method.invoke(target);
}
} catch (Exception ex) {
logger.error(String.format("定时任务执行异常 - bean:%s,方法:%s,参数:%s ", beanName, methodName, params), ex);
}
long times = System.currentTimeMillis() - startTime;
logger.info("定时任务执行结束 - bean:{},方法:{},参数:{},耗时:{} 毫秒", beanName, methodName, params, times);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchedulingRunnable that = (SchedulingRunnable) o;
if (params == null) {
return beanName.equals(that.beanName) &&
methodName.equals(that.methodName) &&
that.params == null;
}
return beanName.equals(that.beanName) &&
methodName.equals(that.methodName) &&
params.equals(that.params);
}
@Override
public int hashCode() {
if (params == null) {
return Objects.hash(beanName, methodName);
}
return Objects.hash(beanName, methodName, params);
}
}
CronTaskRegistrar:添加定时任务注册类,用来增加、删除定时任务
import com.example.testspringboot.cron.ScheduleResult;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.config.CronTask;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 添加定时任务注册类,用来增加、删除定时任务。
*/
@Component
public class CronTaskRegistrar implements DisposableBean {
private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
private final Map<Integer, ScheduleResult> schedulerJob = new HashMap<>();
@Autowired
private TaskScheduler taskScheduler;
public TaskScheduler getScheduler() {
return this.taskScheduler;
}
public void addCronTask(ScheduleResult scheduleResult) {
SchedulingRunnable task = new SchedulingRunnable(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams());
String cronExpression = scheduleResult.getCronExpression();
CronTask cronTask = new CronTask(task, cronExpression);
// 如果当前包含这个任务,则移除
if (this.scheduledTasks.containsKey(task)) {
removeCronTask(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams());
}
schedulerJob.put(scheduleResult.getJobId(), scheduleResult);
this.scheduledTasks.put(task, scheduleCronTask(cronTask));
}
public void removeCronTask(String beanName, String methodName, String methodParams) {
SchedulingRunnable task = new SchedulingRunnable(beanName, methodName, methodParams);
ScheduledTask scheduledTask = this.scheduledTasks.remove(task);
if (scheduledTask != null) {
scheduledTask.cancel();
}
}
public void removeCronTask(ScheduleResult scheduleResult) {
schedulerJob.put(scheduleResult.getJobId(), scheduleResult);
removeCronTask(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams());
}
public ScheduledTask scheduleCronTask(CronTask cronTask) {
ScheduledTask scheduledTask = new ScheduledTask();
scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
return scheduledTask;
}
public Map<Runnable, ScheduledTask> getScheduledTasks() {
return scheduledTasks;
}
public Map<Integer, ScheduleResult> getSchedulerJob() {
return schedulerJob;
}
@Override
public void destroy() {
for (ScheduledTask task : this.scheduledTasks.values()) {
task.cancel();
}
this.scheduledTasks.clear();
}
public ScheduleResult getSchedulerByJobId(Integer jobId) {
for (ScheduleResult job : findAllTask()) {
if (jobId.equals(job.getJobId())) {
return job;
}
}
return null;
}
public List<ScheduleResult> findAllTask() {
List<ScheduleResult> ScheduleResults = new ArrayList<>();
Set<Map.Entry<Integer, ScheduleResult>> entries = schedulerJob.entrySet();
for (Map.Entry<Integer, ScheduleResult> en : entries) {
ScheduleResults.add(en.getValue());
}
return ScheduleResults;
}
}
CronUtils:校验Cron表达式的有效性
import org.springframework.scheduling.support.CronExpression;
public class CronUtils {
/**
* 返回一个布尔值代表一个给定的Cron表达式的有效性
*
* @param cronExpression Cron表达式
* @return boolean 表达式是否有效
*/
public static boolean isValid(String cronExpression) {
return CronExpression.isValidExpression(cronExpression);
}
}
ScheduleResult:添加定时任务实体类
import lombok.Data;
@Data
public class ScheduleResult {
/**
* 任务ID
*/
private Integer jobId;
/**
* bean名称
*/
private String beanName;
/**
* 方法名称
*/
private String methodName;
/**
* 方法参数: 执行service里面的哪一种方法
*/
private String methodParams;
/**
* cron表达式
*/
private String cronExpression;
/**
* 状态(1正常 0暂停)
*/
private Integer jobStatus;
/**
* 备注
*/
private String remark;
/**
* 创建时间
*/
private String createTime;
/**
* 更新时间
*/
private String updateTime;
}
ScheduleJobStatus:任务状态枚举类型
public enum ScheduleJobStatus {
/**
* 暂停
*/
PAUSE,
/**
* 正常
*/
NORMAL;
}
SpringContextUtils类:从spring容器里获取bean
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringContextUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringContextUtils.applicationContext == null) {
SpringContextUtils.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
// 通过name获取 Bean.
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
// 通过class获取Bean.
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
// 通过name,以及Clazz返回指定的Bean
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
public static boolean containsBean(String name) {
return getApplicationContext().containsBean(name);
}
public static boolean isSingleton(String name) {
return getApplicationContext().isSingleton(name);
}
public static Class<? extends Object> getType(String name) {
return getApplicationContext().getType(name);
}
}
ScheduleJobService:增删启停service方法
@Service
@Slf4j
public class ScheduleJobService {
@Autowired
private CronTaskRegistrar cronTaskRegistrar;
public void addScheduleJob(ScheduleResult scheduleResult) {
long currentTimeMillis = System.currentTimeMillis();
scheduleResult.setCreateTime(formatTimeYMD_HMS_SSS(currentTimeMillis));
scheduleResult.setUpdateTime(formatTimeYMD_HMS_SSS(currentTimeMillis));
scheduleResult.setJobId(findAllTask().size() + 1);
if (scheduleResult.getJobStatus().equals(ScheduleJobStatus.NORMAL.ordinal())) {
log.info("Stop or pause: is now on");
cronTaskRegistrar.addCronTask(scheduleResult);
return;
}
cronTaskRegistrar.getSchedulerJob().put(scheduleResult.getJobId(), scheduleResult);
}
public void editScheduleJob(ScheduleResult currentSchedule) {
//先移除
cronTaskRegistrar.removeCronTask(currentSchedule.getBeanName(), currentSchedule.getMethodName(), currentSchedule.getMethodParams());
ScheduleResult pastScheduleJob = cronTaskRegistrar.getSchedulerByJobId(currentSchedule.getJobId());
if (pastScheduleJob == null) {
System.out.println("没有这个任务");
return;
}
//然后判断是否开启, 如果开启的话,现在立即执行
startOrStopSchedulerJob(currentSchedule, true);
}
public void deleteScheduleJob(ScheduleResult scheduleResult) {
// 清除这个任务
cronTaskRegistrar.removeCronTask(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams());
// 清除这个任务的数据
cronTaskRegistrar.getSchedulerJob().remove(scheduleResult.getJobId());
}
public void startOrStopScheduler(ScheduleResult scheduleResult) {
cronTaskRegistrar.getSchedulerJob().get(scheduleResult.getJobId()).setJobStatus(scheduleResult.getJobStatus());
startOrStopSchedulerJob(scheduleResult, false);
}
private void startOrStopSchedulerJob(ScheduleResult scheduleResult, boolean update) {
// 更新时间
scheduleResult.setUpdateTime(formatTimeYMD_HMS_SSS(System.currentTimeMillis()));
if (scheduleResult.getJobStatus().equals(ScheduleJobStatus.NORMAL.ordinal())) {
System.out.println("停止或暂停:现在是开启");
cronTaskRegistrar.addCronTask(scheduleResult);
return;
}
System.out.println("停止或暂停:现在是暂停");
if (update){
cronTaskRegistrar.removeCronTask(scheduleResult);
return;
}
cronTaskRegistrar.removeCronTask(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams());
}
public List<ScheduleResult> findAllTask() {
return cronTaskRegistrar.findAllTask();
}
// 转换为年-月-日 时:分:秒
private String formatTimeYMD_HMS_SSS(long time) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(time);
}
}
cronController:访问接口
import com.example.testspringboot.cron.ScheduleResult;
import com.example.testspringboot.cron.ScheduleJobService;
import com.example.testspringboot.cron.utils.CronUtils;
import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
public class cronController {
@Autowired
private ScheduleJobService scheduleJobService;
/**
* 测试上传的用例文件, 获取详细执行结果
*/
@PostMapping("/add")
void executeTestOneFile(@RequestBody ScheduleResult scheduleResult) {
boolean valid = CronUtils.isValid(scheduleResult.getCronExpression());
if (valid){
System.out.println("校验成功, 添加任务");
scheduleResult.setMethodParams(scheduleResult.getBranch()+scheduleResult.getCaseDir());
scheduleJobService.addScheduleJob(scheduleResult);
}else {
System.out.println("校验失败");
}
}
@PostMapping("/stop")
void end(@RequestBody ScheduleResult scheduleResult) {
Gson gson = new Gson();
System.out.println("================");
System.out.println(scheduleResult);
System.out.println("=================");
scheduleResult.setJobStatus(0);
scheduleJobService.startOrStopScheduler(scheduleResult);
}
@PostMapping("/start")
void start(@RequestBody ScheduleResult scheduleResult) {
System.out.println("================");
System.out.println(scheduleResult);
System.out.println("=================");
scheduleResult.setJobStatus(1);
scheduleJobService.startOrStopScheduler(scheduleResult);
}
@PostMapping("/edit")
void edit(@RequestBody ScheduleResult scheduleResult) {
System.out.println("=======edit=========");
System.out.println(scheduleResult);
System.out.println("=================");
scheduleJobService.editScheduleJob(scheduleResult);
}
@PostMapping("/delete")
void delete(@RequestBody ScheduleResult scheduleResult) {
System.out.println("=======delete=========");
System.out.println(scheduleResult);
System.out.println("=================");
scheduleJobService.deleteScheduleJob(scheduleResult);
}
@GetMapping("/tasks")
List<ScheduleResult> get() throws Exception {
List<ScheduleResult> allTask = scheduleJobService.findAllTask();
System.out.println("现在的定时任务数量 = " + allTask.size());
System.out.println("现在的定时任务 = " + allTask);
return allTask;
}
}
c1:测试bean
import org.springframework.stereotype.Component;
@Component
public class c1 {
public void test1(String y){
System.out.println("这个是test1的bean : " + y);
}
public void test2(){
System.out.println("这个是test1的bean中test2方法");
}
}
init:项目启动后的定时任务
import com.example.testspringboot.cron.ScheduleJobService;
import com.example.testspringboot.cron.ScheduleResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class init implements CommandLineRunner {
@Autowired
private ScheduleJobService scheduleJobService;
@Override
public void run(String... args) throws Exception {
System.out.println("开始珍惜");
ScheduleResult scheduleResult = new ScheduleResult();
scheduleResult.setBeanName("c1");
scheduleResult.setMethodName("test1");
scheduleResult.setCronExpression("0/25 * * * * *");
scheduleResult.setJobStatus(1);
scheduleResult.setMethodParams("test1");
scheduleJobService.addScheduleJob(scheduleResult);
scheduleJobService.findAllTask();
}
}
后续的操作,基本上就是复制粘贴,运行