基于quartz实现定时任务管理系统
背景
说起定时任务框架,首先想到的是Quartz。这是定时任务的老牌框架了,它的优缺点都很明显。借助PowerJob 的readme文档的内容简单带过一下这部分。
除了上面提到,还有elastic-job-lite
、quartzui
也是相当火热的,可以自行了解一下。
那么都这么多后起之秀,个个吊打quartz,为什么我还选择用quartz来做定时任务?技术是服务于业务的,肯定是有需求呗。公司有个统一管理平台,现在需要开发一个定时任务管理平台,可以动态去管理配置定时任务、查看运行日志。
为什么不考虑其它的框架?由于需求要定制化ui界面和定时任务执行结果接入统一的通知中心等等需求,上面大多数框架都是通过简单配置开箱即用,定制化需要对源码有一定熟悉程度,而quartz我在大学就用了很多次了,非常熟悉,改造相对容易。
需求分析
- 定时任务的增删改查
- 定时任务执行日志查看
详细设计
开发环境
jdk 1.8
spring boot 2.7.7
quartz 单机版
Mysql 8.0
1. 定时任务的实现
quartz的任务实际是通过内置Java类实现job
接口或者继承QuartzJobBean
实现executeInternal来实现定时任务的添加。所以我们在管理页面上所做的增删改查操作都不可能是真正意义上地去修改了任务类,而是修改了映射类。举个例子来说,比如Power Job的管理界面
我们修改的这些任务名称、定时信息的持久化操作都不会是操作了真正的任务类,而是修改了一个绑定这个任务类的映射类。如下图中的类就是任务类
而图片中的这些就是属性就是映射类的属性。
你可以先是觉得任务类和映射类之间是一对一的关系,映射类记录了定时任务的执行频率(cron)、名称、任务类完整类名等其它的信息,然后在执行过程中通过完整类名反射获得任务类,任务类再根据这些信息去执行。
但如果每个定时任务,我们都要去实现Job接口创建一个类,相同的那些代码比如获取trigger、scheduler等等,都要出现在每个类中,每次添加一个定时任务都要多一个专门的定时任务类。每次开发时都要关注业务和定时任务类之间的关系,多了之后是有点烦。
我推荐的做法是,创建一个具备http请求功能的任务类,将业务操作开发成一个接口,在配置映射类时,将http链接配置上去,这样一到时间 ,就会请求到对应的业务接口。这样使得后续的定时任务功能开发更专注于业务开发,方便使用。尤其是团队开发中,对一些不熟悉quartz的朋友格外友好。
编码实现
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.31</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.11</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
映射实体类
@TableName
@Data
public class ScheduleJob {
@TableId
private Long id;
@TableField
private String jobName;
@TableField
private String cronExpr; // cron表达式
@TableField
private String jobStatus; // 任务状态
@TableField
private String url; // 业务接口
}
必不可少的属性大概就是上面这几个,后续需要再添加
service层与mapper层我偷个懒,直接用mybatisplus的api
ScheduleJobService
@Service
public class ScheduleJobService extends ServiceImpl<ScheduleJobMapper, ScheduleJob> {
}
ScheduleJobMapper
@Mapper
public interface ScheduleJobMapper extends BaseMapper<ScheduleJob> {
}
有了这些就可以先做个添加定时任务的接口测试一下流程是否通畅
ScheduleJobController
@RestController
public class ScheduleJobController {
@Resource
private ScheduleJobService scheduleJobService;
@PostMapping("/scheduleJob")
public ResultBean createScheduleJob(@RequestBody ScheduleJob scheduleJob) {
scheduleJobService.save(scheduleJob);
return ResultBean.SUCCESS();
}
}
这个ResultBean是我封装的一个返回对象
@Data
@AllArgsConstructor
public class ResultBean {
private String code;
private String msg;
private Object data;
public static ResultBean SUCCESS(Object... data) {
String code = "200";
String msg = "操作成功!";
return new ResultBean(code, msg, data);
}
}
简单测试了一下,能写入库
目前为止的操作和定时任务还没有一分钱关系,接下来我们来接入定时任务。
创建任务类,这个任务类要支持发送http请求,所以取名为RestRequestJob,不过我们还不知道能不能真的按着指定的时间的执行,先不写太复杂。
@Slf4j
public class RestRequestJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
log.info("hello quartz");
}
}
映射实体类有了,任务类也有了,就差任务类怎么按照映射实体类的信息去执行定时任务了
@Service
public class ScheduleJobManageService {
@Autowired
private ScheduleJobService scheduleJobService;
@Autowired
private Scheduler scheduler;
public void createScheduleJob(ScheduleJob scheduleJob) {
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName() + scheduleJob.getId());
JobDetail jobDetail = JobBuilder.newJob(RestRequestJob.class).withIdentity(jobKey).build();
// 表达式调度构建器
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpr());
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(TriggerKey.triggerKey(scheduleJob.getId().toString()))
.withSchedule(cronScheduleBuilder).build();
jobDetail.getJobDataMap().put("TASK_PROPERTIES", scheduleJob);
try {
scheduler.scheduleJob(jobDetail, trigger);
if (scheduleJob.getJobStatus().equals("0")) {
scheduler.pauseJob(jobKey);
}
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
}
jobKey是定时任务的唯一标识
jobDetail是任务类
trigger是执行时间
jobDetail.getJobDataMap().put("TASK_PROPERTIES", scheduleJob);
是把目前的映射类信息保存到缓存中,可提供给其它地方用。
scheduler.scheduleJob
是开始定时任务的线程
scheduler.pauseJob
如果发现状态是暂停的话,则暂停定时任务
回到ScheduleJobController
类的createScheduleJob
方法,调用scheduleJobManageService类中的创建定时任务方法
@PostMapping("/scheduleJob")
public ResultBean createScheduleJob(@RequestBody ScheduleJob scheduleJob) {
scheduleJobService.save(scheduleJob);
scheduleJobManageService.createScheduleJob(scheduleJob);
return ResultBean.SUCCESS();
}
到此再来测试一下,添加定时任务时,能不能启动任务类
定时任务启动成功了,但是目前除了停止应用,我们完全没办法能让它停下来。得写个暂停方法
@RestController
public class ScheduleJobManagerController {
@Resource
private ScheduleJobManageService scheduleJobManageService;
@GetMapping("/scheduleJobManage/changeStatus/{id}")
public ResultBean changeStatus(@PathVariable("id") Long id) {
try {
scheduleJobManageService.changeStatus(id);
return ResultBean.SUCCESS();
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
}
在ScheduleJobManageService类中添加changeStatus方法
@Transactional
public void changeStatus(Long id) throws SchedulerException {
ScheduleJob scheduleJob = scheduleJobService.getById(id);
String status = scheduleJob.getJobStatus().equals("1") ? "0" : "1";
scheduleJob.setJobStatus(status);
scheduleJobService.saveOrUpdate(scheduleJob);
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName() + scheduleJob.getId());
switch (status) {
case "1":
log.info("已启动任务"+scheduleJob.getId());
scheduler.resumeJob(jobKey);
break;
case "0":
log.info("已暂停任务"+scheduleJob.getId());
scheduler.pauseJob(jobKey);
break;
}
}
测试一下,暂停启动都能成功
那我们这些定时任务遭遇了服务器重启之后都没法再启动了吗?那肯定不是,我们在服务启动过后再加载回来这些定时任务不就ok了嘛。在ScheduleJobManageService类中加多一个init方法
@PostConstruct
public void init() throws SchedulerException{
scheduler.clear();
List<ScheduleJob> jobs = scheduleJobService.list();
jobs.parallelStream().forEach(this::createScheduleJob);
}
一启动之后,那堆没暂停的定时任务一直在跑
现在该实现http请求了,回到RestRequestJob
类的executeInternal
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
// log.info("hello quartz");
ScheduleJob scheduleJob = new ScheduleJob();
BeanUtils.copyProperties(context.getMergedJobDataMap().get("TASK_PROPERTIES"), scheduleJob);
log.info("calling...{}",scheduleJob.getUrl());
String s = HttpUtil.get(scheduleJob.getUrl());
log.info(s);
}
context.getMergedJobDataMap().get("TASK_PROPERTIES")
这个就是刚才存进去的scheduleJob对象,这样就可以将url取出来,去访问了。
我做了一个很简单的接口用于测试是否url可以访问通的
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UEO95tVV-1678082698242)(image-20230303154257116.png)]
调用成功
目前还缺定时任务修改、移除,执行日志记录
定时任务修改
ScheduleJobController
@PutMapping("/scheduleJob")
public ResultBean updateScheduleJob(@RequestBody ScheduleJob scheduleJob) {
try {
scheduleJobManageService.updateScheduleJob(scheduleJob);
return ResultBean.SUCCESS();
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
ScheduleJobManageService
@Transactional
public void updateScheduleJob(ScheduleJob scheduleJob) throws SchedulerException {
ScheduleJob oldInfo = scheduleJobService.getById(scheduleJob.getId());
scheduleJobService.saveOrUpdate(scheduleJob);
if (!oldInfo.getCronExpr().equals(scheduleJob.getCronExpr()) || !oldInfo.getUrl().equals(scheduleJob.getUrl())) {
JobKey jobKey = JobKey.jobKey(scheduleJob.getId().toString());
if (scheduler.checkExists(jobKey))
{
// 防止创建时存在数据问题 先移除,然后在执行创建操作
scheduler.deleteJob(jobKey);
}
createScheduleJob(scheduleJob);
}
log.info("更新成功!");
}
删除定时任务
ScheduleJobController
@DeleteMapping("/scheduleJob/{id}")
public ResultBean deleteScheduleJob(@PathVariable Long id) {
try {
scheduleJobManageService.deleteScheduleJob(id);
return ResultBean.SUCCESS();
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
ScheduleJobManageService
@Transactional
public void deleteScheduleJob(Long id) throws SchedulerException {
boolean b = scheduleJobService.removeById(id);
if (b) {
scheduler.deleteJob(JobKey.jobKey(id.toString()));
}
}
2. 日志写入
实体类
@Data
@TableName
public class ScheduleJobLog {
@TableId(type = IdType.AUTO)
private Long id;
@TableField
private Long schdJobId;
@TableField
private Date startTime;
@TableField
private Date endTime;
@TableField
private Integer executeRes;
@TableField
private String errorInfo;
}
Mapper和Service层都是用了mybatisplus偷工减料,就不累赘了。
重点是,我们的日志怎么切入?切入在什么地方?quartz实际上当trigger时间到了的时候,他去执行Job的实现类。也就是说我们的日志应该切入在RestRequestJob
类中,那接下来改装一下executeInternal
方法
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
// log.info("hello quartz");
ScheduleJobLog scheduleJobLog = new ScheduleJobLog();
ScheduleJob scheduleJob = null;
try {
scheduleJobLog.setStartTime(new Date());
scheduleJob = new ScheduleJob();
BeanUtils.copyProperties(context.getMergedJobDataMap().get("TASK_PROPERTIES"), scheduleJob);
log.info("calling...{}",scheduleJob.getUrl());
// String s = HttpUtil.get(scheduleJob.getUrl());
HttpResponse httpRes = HttpRequest.get(scheduleJob.getUrl()).execute();
log.info(String.valueOf(httpRes.getStatus()));
if (httpRes.getStatus() == HttpStatus.HTTP_NOT_FOUND) {
throw new Exception(String.valueOf(HttpStatus.HTTP_NOT_FOUND));
}
scheduleJobLog.setExecuteRes(1); // 执行成功
} catch (Exception e) {
scheduleJobLog.setExecuteRes(-1); // 执行失败
scheduleJobLog.setErrorInfo(e.getMessage());
throw new RuntimeException(e);
} finally {
scheduleJobLog.setEndTime(new Date());
scheduleJobLog.setSchdJobId(scheduleJob.getId());
ScheduleJobLogService logService = SpringUtil.getBean(ScheduleJobLogService.class);
logService.save(scheduleJobLog);
}
}
SpringUtil.getBean
是使用了hutool的工具,目的是在不被spring管理的类中也能拿到Bean。
到这里看起来貌似没什么问题,能增删改定时任务了,日志也能插入了。但是经验告诉我,如果碰到那种定时任务需要同步几万数据入库的,http的长连接也扛不住,而且一直在等待也对性能造成极大的影响。
那么能不能说,发出去的请求,到对应业务接口自己执行得了,不管你的死活,你执行完之后告诉我一声执行结果得了。这就是异步思想。当http请求过来的时候,任务类中把执行的日志中的id也带过去,业务接口这边一旦收到,马上就返回结果,告诉任务类这边,收到执行指令,你安就得了。任务类把执行日志的状态设置为【执行中】。业务接口执行结束之后调用一下,定时任务日志的接口,把状态更新即可。
- 新增更新执行状态接口
@RestController
public class ScheduleJobLogController {
@Autowired
private ScheduleJobLogService logService;
@PutMapping("/scheduleJobLog")
public ResultBean updateStatus (@RequestBody ScheduleJobLog scheduleJobLog) {
logService.updateStatus(scheduleJobLog);
return ResultBean.SUCCESS();
}
}
2.实现更新状态方法updateStatus
@Service
public class ScheduleJobLogService extends ServiceImpl<ScheduleJobLogMapper, ScheduleJobLog> {
public void updateStatus(ScheduleJobLog scheduleJobLog) {
ScheduleJobLog jobLog = this.getById(scheduleJobLog.getId());
jobLog.setExecuteRes(scheduleJobLog.getExecuteRes());
jobLog.setErrorInfo(scheduleJobLog.getErrorInfo());
jobLog.setEndTime(new Date()); // 更新执行结束时间
this.saveOrUpdate(jobLog);
}
}
在业务执行接口上有比较大的变动
@RestController
public class ExecuteJobController {
@GetMapping("/scheduleJob/helloWorld")
public ResultBean sayHelloWorld(Long logId) {
ScheduleJobLog jobLog = new ScheduleJobLog();
jobLog.setId(logId);
new Thread(() -> {
try {
// 模拟超时长业务
// Thread.sleep(2000);
jobLog.setExecuteRes(1);
//throw new Exception("just test");
} catch (Exception e) {
jobLog.setExecuteRes(-1);
jobLog.setErrorInfo(e.getMessage());
throw new RuntimeException(e);
} finally {
HttpRequest.put("http://localhost:8080/scheduleJobLog").body(JSONUtil.toJsonStr(jobLog)).execute();
}
}).start();
return ResultBean.SUCCESS("hello world!");
}
}
在实际的测试过程中,RestRequestJob的实现方法中,由于入库操作在finally中,当业务接口比较简单时,会出现数据库的并发问题,日志记录未入库,导致scheduleJobLogService.getById()方法空指针,因此要先入库再发送http请求业务接口
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
ScheduleJobLog scheduleJobLog = new ScheduleJobLog();
ScheduleJob scheduleJob = new ScheduleJob();
long snowflakeNextId = IdUtil.getSnowflakeNextId();
ScheduleJobLogService logService = SpringUtil.getBean(ScheduleJobLogService.class);
try {
BeanUtils.copyProperties(context.getMergedJobDataMap().get("TASK_PROPERTIES"), scheduleJob);
scheduleJobLog.setExecuteRes(0); // 正在执行
scheduleJobLog.setStartTime(new Date());
scheduleJobLog.setId(snowflakeNextId);
scheduleJobLog.setSchdJobId(scheduleJob.getId());
logService.save(scheduleJobLog); // 避免出现并发问题,应该先入库
log.info("calling...{}", scheduleJob.getUrl()+"?logId="+snowflakeNextId);
String reqUrl = scheduleJob.getUrl()+"?logId="+snowflakeNextId;
HttpResponse httpRes = HttpRequest.get(reqUrl).timeout(10000).execute();
log.info("logId" + snowflakeNextId + "; url: " + reqUrl);
if (httpRes.getStatus() == HttpStatus.HTTP_NOT_FOUND) {
throw new Exception(String.valueOf(HttpStatus.HTTP_NOT_FOUND));
}
} catch (Exception e) {
scheduleJobLog.setExecuteRes(-1); // 执行失败
scheduleJobLog.setErrorInfo(e.getMessage());
logService.saveOrUpdate(scheduleJobLog);
throw new RuntimeException(e);
}
}
总结
在本文中重点是讲解实现思路,代码并不算怎么优雅,格式优化的地方还有很多,参数校验也不够严谨,可以根据自己的需求改造。
代码拉取
开源项目,最好就是各路大神能一起维护这个项目。
仓库地址:https://gitcode.net/interestANd/quartz-job