二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。
上一章简单介绍了SpringBoot整合Quartz实现动态定时任务(三十四) ,如果没有看过,请观看上一章
通过 Quartz 实现了动态定时任务,还需要引入 Quartz 组件,
能不能不引入组件,我们自己创建一个简单的动态定时任务呢?
本章节参考了文章: SpringBoot动态定时任务(完整版)
在这个大佬的基础上,进行了扩充。
SpringBoot 自定义动态定时任务
使用到了 MybatisPlus 组件信息, 不使用 Quartz 组件。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--引入MySql的驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--引入springboot与mybatis-plus整合的依赖。 去掉mybatis的依赖-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.6</version>
</dependency>
<!-- 引入pagehelper分页插件 注意版本号要与 mybatis-plus 进行匹配到 -->
<!--分页-->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.12</version>
<exclusions>
<exclusion>
<artifactId>mybatis</artifactId>
<groupId>org.mybatis</groupId>
</exclusion>
<exclusion>
<artifactId>mybatis-spring</artifactId>
<groupId>org.mybatis</groupId>
</exclusion>
<exclusion>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- json 工具 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
定义实体,创建表 ScheduleSetting 用于维护动态任务信息
/**
* 任务表
*
* @author yuejianli
* @date 2023-01-05
*/
@Data
@TableName("schedule_setting")
public class ScheduleSetting {
/**
* 任务ID
*/
@TableId(value = "id",type= IdType.AUTO)
private Integer id;
/**
* bean名称
*/
@TableField("bean_name")
private String beanName;
/**
* 方法名称
*/
@TableField("method_name")
private String methodName;
/**
* 方法参数
*/
@TableField("method_params")
private String methodParams;
/**
* cron表达式
*/
@TableField("cron_expression")
private String cronExpression;
/**
* 状态(1正常 0暂停)
*/
@TableField("job_status")
private Integer jobStatus;
/**
* 备注
*/
@TableField("remark")
private String remark;
/**
* 创建时间
*/
@TableField("create_time")
private Date createTime;
/**
* 更新时间
*/
@TableField("update_time")
private Date updateTime;
}
对应的 SQL 语句是:
drop table if exists `schedule_setting`;
CREATE TABLE `schedule_setting` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '任务ID',
`bean_name` varchar(255) DEFAULT NULL COMMENT 'bean名称',
`method_name` varchar(255) DEFAULT NULL COMMENT '方法名称',
`method_params` varchar(255) DEFAULT NULL COMMENT '方法参数',
`cron_expression` varchar(255) DEFAULT NULL COMMENT 'cron表达式',
`remark` varchar(255) DEFAULT NULL COMMENT '备注',
`job_status` int DEFAULT NULL COMMENT '状态(1正常 0暂停)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
创建 ScheduleSetting 对应的 Mapper 和 Service
这个就不讲了哈, 详情可以看 老蝴蝶之前写的 MybatisPlus 章节: SpringBoot整合MyBatisPlus(十四)
ScheduleSettingMapper
@Mapper
public interface ScheduleSettingMapper extends BaseMapper<ScheduleSetting> {
}
ScheduleSettingService
public interface ScheduleSettingService extends IService<ScheduleSetting> {
/**
查询所有运行中的任务
*/
List<ScheduleSetting> findAllRunJob();
}
@Service
@Slf4j
public class ScheduleSettingServiceImpl extends ServiceImpl<ScheduleSettingMapper, ScheduleSetting>
implements ScheduleSettingService{
@Override
public List<ScheduleSetting> findAllRunJob() {
return this.lambdaQuery()
.eq(ScheduleSetting::getJobStatus,1)
.list();
}
}
TaskScheduler 线程池配置 SchedulingConfig
@Configuration
public class SchedulingConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 定时任务执行线程池核心线程数
taskScheduler.setPoolSize(6);
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
return taskScheduler;
}
}
线程任务处理 ScheduledTask
ScheduledFuture是ScheduledExecutorService定时任务线程池的执行结果。
/**
* 调度任务
*
* @author yuejianli
* @date 2023-01-05
*/
public final class ScheduledTask {
volatile ScheduledFuture<?> future;
/**
* 取消定时任务
*/
public void cancel() {
ScheduledFuture<?> future = this.future;
if (future != null) {
future.cancel(true);
}
}
}
线程池调用执行任务 SchedulingRunnable
被定时任务线程池调用,用来执行指定bean里面的方法。
@Slf4j
public class SchedulingRunnable implements Runnable {
private String beanName;
private String methodName;
private String params;
public SchedulingRunnable(String beanName, String methodName) {
this(beanName, methodName, null);
}
public SchedulingRunnable(String beanName, String methodName, String params) {
this.beanName = beanName;
this.methodName = methodName;
this.params = params;
}
@Override
public void run() {
log.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params);
long startTime = System.currentTimeMillis();
try {
Object target = SpringContextUtils.getBean(beanName);
Method method = null;
if (StringUtils.hasText(params)) {
method = target.getClass().getDeclaredMethod(methodName, String.class);
} else {
method = target.getClass().getDeclaredMethod(methodName);
}
ReflectionUtils.makeAccessible(method);
if (StringUtils.hasText(params)) {
method.invoke(target, params);
} else {
method.invoke(target);
}
} catch (Exception ex) {
log.error(String.format("定时任务执行异常 - bean:%s,方法:%s,参数:%s ", beanName, methodName, params), ex);
}
long times = System.currentTimeMillis() - startTime;
log.info("定时任务执行结束 - bean:{},方法:{},参数:{},耗时:{} 毫秒", beanName, methodName, params, times);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SchedulingRunnable that = (SchedulingRunnable) o;
if (params == null) {
return beanName.equals(that.beanName) &&
methodName.equals(that.methodName) &&
that.params == null;
}
return beanName.equals(that.beanName) &&
methodName.equals(that.methodName) &&
params.equals(that.params);
}
@Override
public int hashCode() {
if (params == null) {
return Objects.hash(beanName, methodName);
}
return Objects.hash(beanName, methodName, params);
}
}
注意,获取类时,使用的是
Object target = SpringContextUtils.getBean(beanName);
获取的是 Spring 组件名, 并不是全限定名称。
Cron定时任务注册类 CronTaskRegistrar
@Component
public class CronTaskRegistrar implements DisposableBean {
private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
@Resource
private TaskScheduler taskScheduler;
public void addCronTask(Runnable task, String cronExpression) {
addCronTask(new CronTask(task, cronExpression));
}
public void addCronTask(CronTask cronTask) {
if (cronTask != null) {
Runnable task = cronTask.getRunnable();
if (this.scheduledTasks.containsKey(task)) {
removeCronTask(task);
}
this.scheduledTasks.put(task, scheduleCronTask(cronTask));
}
}
public void removeCronTask(Runnable task) {
ScheduledTask scheduledTask = this.scheduledTasks.remove(task);
if (scheduledTask != null) {
scheduledTask.cancel();
}
}
public ScheduledTask scheduleCronTask(CronTask cronTask) {
ScheduledTask scheduledTask = new ScheduledTask();
scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
return scheduledTask;
}
@Override
public void destroy() {
for (ScheduledTask task : this.scheduledTasks.values()) {
task.cancel();
}
this.scheduledTasks.clear();
}
}
全局获取 Bean 的工具类 SpringContextUtils
@Component
public class SpringContextUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
SpringContextUtils.applicationContext = applicationContext;
}
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
public static <T> T getBean(Class<T> requiredType) {
return applicationContext.getBean(requiredType);
}
public static <T> T getBean(String name, Class<T> requiredType) {
return applicationContext.getBean(name, requiredType);
}
public static boolean containsBean(String name) {
return applicationContext.containsBean(name);
}
public static boolean isSingleton(String name) {
return applicationContext.isSingleton(name);
}
public static Class<? extends Object> getType(String name) {
return applicationContext.getType(name);
}
}
服务启动时,对任务处理 SysJobRunner
/**
* 服务启动时处理任务
*
* @author yuejianli
* @date 2023-01-05
*/
@Slf4j
@Component
public class SysJobRunner implements CommandLineRunner {
@Resource
private CronTaskRegistrar cronTaskRegistrar;
@Resource
private ScheduleSettingService scheduleSettingService;
@Override
public void run(String... args) {
// 初始加载数据库里状态为正常的定时任务
List<ScheduleSetting> jobList = scheduleSettingService.findAllRunJob();
if (CollectionUtils.isNotEmpty(jobList)) {
for (ScheduleSetting job : jobList) {
SchedulingRunnable task = new SchedulingRunnable(job.getBeanName(), job.getMethodName(), job.getMethodParams());
cronTaskRegistrar.addCronTask(task, job.getCronExpression());
}
log.info("定时任务已加载完毕...");
}
}
}
定时任务创建
创建两个定时任务, 一个无参数,一个有参数。
public interface BaseTask {
/**
* 执行任务
* @param param 参数
*/
void execute(String param);
}
@Slf4j
@Component("myTask1")
public class MyTask1 implements BaseTask{
@Resource
private UserService userService;
@Override
public void execute(String param) {
log.info(" 空参数任务 :" );
userService.addUser(null);
}
}
@Slf4j
@Component("myTask2")
public class MyTask2 implements BaseTask{
@Resource
private UserService userService;
@Override
public void execute(String param) {
log.info(" 有参数任务 :" + param);
if (ObjectUtils.isEmpty(param)) {
userService.addUser(null);
return ;
}
Map<String,String> jobDataMap = JSON.parseObject(param,Map.class);
User user = new User();
user.setName(jobDataMap.get("name"));
user.setAge(Integer.parseInt(jobDataMap.get("age")));
user.setSex(jobDataMap.get("sex"));
user.setDescription(jobDataMap.get("description"));
userService.addUser(user);
}
}
对定时任务和表进行业务整合
/**
* 任务封装处理
*
* @author Yue Jianli
* @date 2023-01-05
*/
public interface JobService {
/**
查询所有运行的任务
*/
List<ScheduleSetting> findAllRunJob();
/**
* 创建定时任务
* @param scheduleSetting 任务参数对象
*/
boolean createJob(ScheduleSetting scheduleSetting);
/**
* 更新定时任务
* @param scheduleSetting 任务参数对象
*/
boolean updateJob(ScheduleSetting scheduleSetting);
/**
* 删除任务
* @param id 任务id
*/
boolean deleteJob(Integer id);
/**
* 暂停任务
* @param id 任务id
*/
boolean pauseJob(Integer id);
/**
* 重新开始任务
* @param id 任务id
*/
boolean resumeJob(Integer id);
}
@Service
@Slf4j
public class JobServiceImpl implements JobService{
@Resource
private CronTaskRegistrar cronTaskRegistrar;
@Resource
private ScheduleSettingService scheduleSettingService;
@Override
public List<ScheduleSetting> findAllRunJob() {
return scheduleSettingService.findAllRunJob();
}
@Override
public boolean createJob(ScheduleSetting scheduleSetting) {
scheduleSetting.setCreateTime(new Date());
scheduleSetting.setUpdateTime(new Date());
boolean insert = scheduleSettingService.save(scheduleSetting);
if (!insert) {
return false;
}
// 添加成功,并且状态是1,直接放入任务器
if (scheduleSetting.getJobStatus().equals(1)) {
SchedulingRunnable task = new SchedulingRunnable(scheduleSetting.getBeanName(), scheduleSetting.getMethodName(), scheduleSetting.getMethodParams());
cronTaskRegistrar.addCronTask(task, scheduleSetting.getCronExpression());
}
return true;
}
@Override
public boolean updateJob(ScheduleSetting scheduleSetting) {
scheduleSetting.setCreateTime(new Date());
scheduleSetting.setUpdateTime(new Date());
// 查询修改前任务
ScheduleSetting oldJobInfo = scheduleSettingService.getById(scheduleSetting.getId());
if (null == oldJobInfo) {
return false;
}
// 修改任务
boolean update = scheduleSettingService.updateById(scheduleSetting);
if (!update) {
return false;
}
// 修改成功,则先删除任务器中的任务,并重新添加
SchedulingRunnable task1 = new SchedulingRunnable(oldJobInfo.getBeanName(), oldJobInfo.getMethodName(), oldJobInfo.getMethodParams());
cronTaskRegistrar.removeCronTask(task1);
// 如果修改后的任务状态是1就加入任务器
if (scheduleSetting.getJobStatus().equals(1)) {
SchedulingRunnable task = new SchedulingRunnable(scheduleSetting.getBeanName(), scheduleSetting.getMethodName(), scheduleSetting.getMethodParams());
cronTaskRegistrar.addCronTask(task, scheduleSetting.getCronExpression());
}
return true;
}
@Override
public boolean deleteJob(Integer id) {
ScheduleSetting oldJobInfo = scheduleSettingService.getById(id);
if (null == oldJobInfo) {
return false;
}
// 删除
boolean del = scheduleSettingService.removeById(id);
if (!del){
return false;
}
// 删除成功时要清除定时任务器中的对应任务
SchedulingRunnable task = new SchedulingRunnable(oldJobInfo.getBeanName(), oldJobInfo.getMethodName(), oldJobInfo.getMethodParams());
cronTaskRegistrar.removeCronTask(task);
return true;
}
@Override
public boolean pauseJob(Integer id) {
return changeJobStatus(id,0);
}
@Override
public boolean resumeJob(Integer id) {
return changeJobStatus(id,1);
}
private boolean changeJobStatus(Integer id,Integer jobStatus) {
// 修改任务状态
ScheduleSetting scheduleSetting = new ScheduleSetting();
scheduleSetting.setJobStatus(jobStatus);
scheduleSetting.setId(id);
boolean updateJobFlag = scheduleSettingService.updateById(scheduleSetting);
if (!updateJobFlag) {
return false;
}
// 查询修改后的任务信息
ScheduleSetting existedSysJob = scheduleSettingService.getById(id);
// 如果状态是1则添加任务
if (existedSysJob.getJobStatus().equals(1)) {
SchedulingRunnable task = new SchedulingRunnable(existedSysJob.getBeanName(), existedSysJob.getMethodName(), existedSysJob.getMethodParams());
cronTaskRegistrar.addCronTask(task, existedSysJob.getCronExpression());
} else {
// 否则清除任务
SchedulingRunnable task = new SchedulingRunnable(existedSysJob.getBeanName(), existedSysJob.getMethodName(), existedSysJob.getMethodParams());
cronTaskRegistrar.removeCronTask(task);
}
return true;
}
}
Controller 控制器配置
@RestController
@RequestMapping("/job")
public class JobController {
@Resource
private JobService jobService;
@RequestMapping("/findAllRunJob")
public List<ScheduleSetting> findAllRunJob() {
try {
return jobService.findAllRunJob();
} catch (Exception e) {
return null;
}
}
/**
* 创建定时任务
*
* @param scheduleSetting
* @return
*/
@PostMapping("create")
public String create(@RequestBody ScheduleSetting scheduleSetting) {
return jobService.createJob(scheduleSetting) ? "创建成功":"创建失败";
}
/**
* 修改定时任务
*
* @param scheduleSetting
* @return
*/
@PostMapping("update")
public String update(@RequestBody ScheduleSetting scheduleSetting) {
return jobService.updateJob(scheduleSetting) ? "更新成功":"更新失败";
}
/**
* 删除任务
*
* @param id 任务id
* @return
*/
@PostMapping("delete/{jobId}")
public String del(@PathVariable("jobId") Integer id) {
return jobService.deleteJob(id) ? "删除成功":"删除失败";
}
/**
* 暂停任务
*
* @param id 任务id
* @return
*/
@PostMapping("pause/{jobId}")
public String pause(@PathVariable("jobId") Integer id) {
return jobService.pauseJob(id) ? "暂停成功":"暂停失败";
}
/**
* 重新开始任务
*
* @param id 任务id
* @return
*/
@PostMapping("resume/{jobId}")
public String resume(@PathVariable("jobId") Integer id) {
return jobService.resumeJob(id) ? "重新开始任务成功":"重新开始任务失败";
}
}
执行验证前调整
SQL 手动插入一条任务,验证 服务启动时任务启动
-- 插入一条无参的任务
INSERT INTO `schedule_setting`(`id`, `bean_name`, `method_name`, `method_params`, `cron_expression`, `remark`,
`job_status`, `create_time`, `update_time`)
VALUES (1, 'myTask1', 'execute', NULL, '1/5 * * * * ?', '无参', 1,
'2023-01-05 17:12:35', '2023-01-05 17:12:38');
对任务调整
- 去掉日志打印
- 均执行有参的方法
执行验证
服务启动验证
服务启动时, 会从数据库中读取任务进行执行。
查询所有的任务 findAllRunJob
添加任务 create
任务创建成功,但是没有启动呢。 id的编号是2
启动任务 resume
查看运行中的任务列表,发现是有2条的
暂停任务 pause
更新任务 update
修改参数 和 cron 表达式
更新任务时, id 不会发生改变。
删除任务 delete
数据库里面也只有 myTask1 的任务
自定义动态任务是成功的。
本章节的代码放置在 github 上:
https://github.com/yuejianli/springboot/tree/develop/SpringBoot_SelfJob
谢谢您的观看,如果喜欢,请关注我,再次感谢 !!!