参考网址
目标:定时任务持久化到数据库,动态调整数据库里保存的cron表达式使定时任务可以跟随变化。
从SYS_QUARTZ_JOB表(通过反射创建任务)和SYS_QUARTZ_LOG表(主要就是记录日志)构建两个对应的实体类:QuartzJob和QuartzLog
1.看表结构
-- Create table
create table SYS_QUARTZ_JOB
(
job_id NUMBER(20) not null,
bean_name NVARCHAR2(255),
cron_expression NVARCHAR2(255),
is_pause CHAR(1),
job_name NVARCHAR2(255),
method_name NVARCHAR2(255),
params NVARCHAR2(255),
description NVARCHAR2(255),
person_in_charge NVARCHAR2(100),
email NVARCHAR2(100),
sub_task NVARCHAR2(100),
pause_after_failure CHAR(1),
create_by NVARCHAR2(255),
update_by NVARCHAR2(255),
create_time DATE,
update_time DATE
)
tablespace USERS
pctfree 10
initrans 1
maxtrans 255
storage
(
initial 64K
next 1M
minextents 1
maxextents unlimited
);
-- Add comments to the table
comment on table SYS_QUARTZ_JOB
is '定时任务';
-- Add comments to the columns
comment on column SYS_QUARTZ_JOB.job_id
is 'ID';
comment on column SYS_QUARTZ_JOB.bean_name
is 'Spring Bean名称';
comment on column SYS_QUARTZ_JOB.cron_expression
is 'cron 表达式';
comment on column SYS_QUARTZ_JOB.is_pause
is '状态:1暂停、0启用';
comment on column SYS_QUARTZ_JOB.job_name
is '任务名称';
comment on column SYS_QUARTZ_JOB.method_name
is '方法名称';
comment on column SYS_QUARTZ_JOB.params
is '参数';
comment on column SYS_QUARTZ_JOB.description
is '备注';
comment on column SYS_QUARTZ_JOB.person_in_charge
is '负责人';
comment on column SYS_QUARTZ_JOB.email
is '报警邮箱';
comment on column SYS_QUARTZ_JOB.sub_task
is '子任务ID';
comment on column SYS_QUARTZ_JOB.pause_after_failure
is '任务失败后是否暂停';
comment on column SYS_QUARTZ_JOB.create_by
is '创建者';
comment on column SYS_QUARTZ_JOB.update_by
is '更新者';
comment on column SYS_QUARTZ_JOB.create_time
is '创建日期';
comment on column SYS_QUARTZ_JOB.update_time
is '更新时间';
-- Create/Recreate indexes
create index INX_IS_PAUSE on SYS_QUARTZ_JOB (IS_PAUSE)
tablespace USERS
pctfree 10
initrans 2
maxtrans 255
storage
(
initial 64K
next 1M
minextents 1
maxextents unlimited
);
-- Create/Recreate primary, unique and foreign key constraints
alter table SYS_QUARTZ_JOB
add primary key (JOB_ID)
using index
tablespace USERS
pctfree 10
initrans 2
maxtrans 255
storage
(
initial 64K
next 1M
minextents 1
maxextents unlimited
);
图形化理解字段:
-- Create table
create table SYS_QUARTZ_LOG
(
log_id NUMBER(20) not null,
bean_name NVARCHAR2(255),
create_time DATE,
cron_expression NVARCHAR2(255),
exception_detail NCLOB,
is_success VARCHAR2(1),
job_name NVARCHAR2(255),
method_name NVARCHAR2(255),
params NVARCHAR2(255),
time NUMBER(20)
)
tablespace USERS
pctfree 10
initrans 1
maxtrans 255
storage
(
initial 64K
next 1M
minextents 1
maxextents unlimited
);
-- Add comments to the table
comment on table SYS_QUARTZ_LOG
is '定时任务日志';
-- Add comments to the columns
comment on column SYS_QUARTZ_LOG.log_id
is 'ID';
-- Create/Recreate primary, unique and foreign key constraints
alter table SYS_QUARTZ_LOG
add primary key (LOG_ID)
using index
tablespace USERS
pctfree 10
initrans 2
maxtrans 255
storage
(
initial 64K
next 1M
minextents 1
maxextents unlimited
);
图形化理解字段:
2.项目各个文件位置及代码
config目录下(全局配置)
/*
*
*/
package com.njry.modules.quartz.config;
import com.njry.modules.quartz.domain.QuartzJob;
import lombok.RequiredArgsConstructor;
import com.njry.modules.quartz.mapper.QuartzJobMapper;
import com.njry.modules.quartz.utils.QuartzManage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author
* @date 2024-05-11
*/
@Component
@RequiredArgsConstructor
public class JobRunner implements ApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(JobRunner.class);
private final QuartzJobMapper quartzJobMapper;
private final QuartzManage quartzManage;
/**
* 项目启动时重新激活启用的定时任务
*
* @param applicationArguments /
*/
@Override
public void run(ApplicationArguments applicationArguments) {
List<QuartzJob> quartzJobs = quartzJobMapper.findByIsPauseIsFalse();
quartzJobs.forEach(quartzManage::addJob);
log.info("Timing task injection complete");
}
}
/*
*
*/
package com.njry.modules.quartz.config;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
/**
* 定时任务配置
* @author
* @date
*/
@Configuration
public class QuartzConfig {
/**
* 解决Job中注入Spring Bean为null的问题
*/
@Component("quartzJobFactory")
public static class QuartzJobFactory extends AdaptableJobFactory {
private final AutowireCapableBeanFactory capableBeanFactory;
public QuartzJobFactory(AutowireCapableBeanFactory capableBeanFactory) {
this.capableBeanFactory = capableBeanFactory;
}
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的方法,把Job注入到spring中
Object jobInstance = super.createJobInstance(bundle);
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
}
domain目录下(实体类和请求体类)
vo目录下
/*
*
*/
package com.njry.modules.quartz.domain.vo;
import lombok.Data;
import java.sql.Timestamp;
import java.util.List;
/**
* @author
* @date
*/
@Data
public class QuartzJobQueryCriteria {
private String jobName;
private Boolean isSuccess;
private List<Timestamp> createTime;
}
/*
*
*/
package com.njry.modules.quartz.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njry.base.BaseEntity;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
/**
* @author
* @date
*/
@Getter
@Setter
@TableName("sys_quartz_job")
public class QuartzJob extends BaseEntity implements Serializable {
public static final String JOB_KEY = "JOB_KEY";
@TableId(value = "job_id", type = IdType.INPUT)
@NotNull(groups = {Update.class})
private Long id;
@TableField(exist = false)
@ApiModelProperty(value = "用于子任务唯一标识", hidden = true)
private String uuid;
@ApiModelProperty(value = "定时器名称")
private String jobName;
@NotBlank
@ApiModelProperty(value = "Bean名称")
private String beanName;
@NotBlank
@ApiModelProperty(value = "方法名称")
private String methodName;
@ApiModelProperty(value = "参数")
private String params;
@NotBlank
@ApiModelProperty(value = "cron表达式")
private String cronExpression;
@ApiModelProperty(value = "状态,暂时或启动")
private Boolean isPause = false;
@ApiModelProperty(value = "负责人")
private String personInCharge;
@ApiModelProperty(value = "报警邮箱")
private String email;
@ApiModelProperty(value = "子任务")
private String subTask;
@ApiModelProperty(value = "失败后暂停")
private Boolean pauseAfterFailure;
@NotBlank
@ApiModelProperty(value = "备注")
private String description;
}
/*
*
*/
package com.njry.modules.quartz.domain;
import com.baomidou.mybatisplus.annotation.*;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.sql.Timestamp;
/**
* @author
* @date
*/
@Data
@TableName("sys_quartz_log")
public class QuartzLog implements Serializable {
@TableId(value = "log_id", type = IdType.INPUT)
@ApiModelProperty(value = "ID", hidden = true)
private Long id;
@ApiModelProperty(value = "任务名称", hidden = true)
private String jobName;
@ApiModelProperty(value = "bean名称", hidden = true)
private String beanName;
@ApiModelProperty(value = "方法名称", hidden = true)
private String methodName;
@ApiModelProperty(value = "参数", hidden = true)
private String params;
@ApiModelProperty(value = "cron表达式", hidden = true)
private String cronExpression;
@ApiModelProperty(value = "状态", hidden = true)
private Boolean isSuccess;
@ApiModelProperty(value = "异常详情", hidden = true)
private String exceptionDetail;
@ApiModelProperty(value = "执行耗时", hidden = true)
private Long time;
@TableField(fill = FieldFill.INSERT)
@ApiModelProperty(value = "创建时间", hidden = true)
private Timestamp createTime;
}
mapper目录下
/*
*
*/
package com.njry.modules.quartz.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njry.modules.quartz.domain.QuartzJob;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author
* @date
**/
@Mapper
public interface QuartzJobMapper extends BaseMapper<QuartzJob> {
IPage<QuartzJob> findAll(@Param("criteria") QuartzJobQueryCriteria criteria, Page<Object> page);
List<QuartzJob> findAll(@Param("criteria") QuartzJobQueryCriteria criteria);
List<QuartzJob> findByIsPauseIsFalse();
Long getSeq();
}
/*
*
*/
package com.njry.modules.quartz.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author
* @date
**/
@Mapper
public interface QuartzLogMapper extends BaseMapper<QuartzLog> {
IPage<QuartzLog> findAll(@Param("criteria") QuartzJobQueryCriteria criteria, Page<Object> page);
List<QuartzLog> findAll(@Param("criteria") QuartzJobQueryCriteria criteria);
Long getSeq();
}
rest目录下
/*
*
*/
package com.njry.modules.quartz.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author
* @date
**/
@Mapper
public interface QuartzLogMapper extends BaseMapper<QuartzLog> {
IPage<QuartzLog> findAll(@Param("criteria") QuartzJobQueryCriteria criteria, Page<Object> page);
List<QuartzLog> findAll(@Param("criteria") QuartzJobQueryCriteria criteria);
Long getSeq();
}
service目录下
impl目录下
/*
*
*/
package com.njry.modules.quartz.service.impl;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njry.exception.BadRequestException;
import com.njry.modules.quartz.domain.QuartzJob;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.modules.quartz.utils.QuartzManage;
import com.njry.utils.*;
import lombok.RequiredArgsConstructor;
import com.njry.modules.quartz.mapper.QuartzJobMapper;
import com.njry.modules.quartz.mapper.QuartzLogMapper;
import com.njry.modules.quartz.service.QuartzJobService;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import com.njry.utils.*;
import org.quartz.CronExpression;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
/**
* @author
* @date
*/
@RequiredArgsConstructor
@Service(value = "quartzJobService")
public class QuartzJobServiceImpl extends ServiceImpl<QuartzJobMapper, QuartzJob> implements QuartzJobService {
private final QuartzJobMapper quartzJobMapper;
private final QuartzLogMapper quartzLogMapper;
private final QuartzManage quartzManage;
private final RedisUtils redisUtils;
@Override
public PageResult<QuartzJob> queryAll(QuartzJobQueryCriteria criteria, Page<Object> page){
return PageUtil.toPage(quartzJobMapper.findAll(criteria, page));
}
@Override
public PageResult<QuartzLog> queryAllLog(QuartzJobQueryCriteria criteria, Page<Object> page){
return PageUtil.toPage(quartzLogMapper.findAll(criteria, page));
}
@Override
public List<QuartzJob> queryAll(QuartzJobQueryCriteria criteria) {
return quartzJobMapper.findAll(criteria);
}
@Override
public List<QuartzLog> queryAllLog(QuartzJobQueryCriteria criteria) {
return quartzLogMapper.findAll(criteria);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void create(QuartzJob resources) {
if (!CronExpression.isValidExpression(resources.getCronExpression())){
throw new BadRequestException("cron表达式格式错误");
}
resources.setId(quartzJobMapper.getSeq());
save(resources);
quartzManage.addJob(resources);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void update(QuartzJob resources) {
if (!CronExpression.isValidExpression(resources.getCronExpression())){
throw new BadRequestException("cron表达式格式错误");
}
if(StringUtils.isNotBlank(resources.getSubTask())){
List<String> tasks = Arrays.asList(resources.getSubTask().split("[,,]"));
if (tasks.contains(resources.getId().toString())) {
throw new BadRequestException("子任务中不能添加当前任务ID");
}
}
saveOrUpdate(resources);
quartzManage.updateJobCron(resources);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void updateIsPause(QuartzJob quartzJob) {
if (quartzJob.getIsPause()) {
quartzManage.resumeJob(quartzJob);
quartzJob.setIsPause(false);
} else {
quartzManage.pauseJob(quartzJob);
quartzJob.setIsPause(true);
}
saveOrUpdate(quartzJob);
}
@Override
public void execution(QuartzJob quartzJob) {
quartzManage.runJobNow(quartzJob);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void delete(Set<Long> ids) {
for (Long id : ids) {
QuartzJob quartzJob = getById(id);
quartzManage.deleteJob(quartzJob);
removeById(quartzJob);
}
}
@Async
@Override
@Transactional(rollbackFor = Exception.class)
public void executionSubJob(String[] tasks) throws InterruptedException {
for (String id : tasks) {
if (StrUtil.isBlank(id)) {
// 如果是手动清除子任务id,会出现id为空字符串的问题
continue;
}
QuartzJob quartzJob = getById(Long.parseLong(id));
// 执行任务
String uuid = IdUtil.simpleUUID();
quartzJob.setUuid(uuid);
// 执行任务
execution(quartzJob);
// 获取执行状态,如果执行失败则停止后面的子任务执行
Boolean result = (Boolean) redisUtils.get(uuid);
while (result == null) {
// 休眠5秒,再次获取子任务执行情况
Thread.sleep(5000);
result = (Boolean) redisUtils.get(uuid);
}
if(!result){
redisUtils.del(uuid);
break;
}
}
}
@Override
public void download(List<QuartzJob> quartzJobs, HttpServletResponse response) throws IOException {
List<Map<String, Object>> list = new ArrayList<>();
for (QuartzJob quartzJob : quartzJobs) {
Map<String,Object> map = new LinkedHashMap<>();
map.put("任务名称", quartzJob.getJobName());
map.put("Bean名称", quartzJob.getBeanName());
map.put("执行方法", quartzJob.getMethodName());
map.put("参数", quartzJob.getParams());
map.put("表达式", quartzJob.getCronExpression());
map.put("状态", quartzJob.getIsPause() ? "暂停中" : "运行中");
map.put("描述", quartzJob.getDescription());
map.put("创建日期", quartzJob.getCreateTime());
list.add(map);
}
FileUtil.downloadExcel(list, response);
}
@Override
public void downloadLog(List<QuartzLog> queryAllLog, HttpServletResponse response) throws IOException {
List<Map<String, Object>> list = new ArrayList<>();
for (QuartzLog quartzLog : queryAllLog) {
Map<String,Object> map = new LinkedHashMap<>();
map.put("任务名称", quartzLog.getJobName());
map.put("Bean名称", quartzLog.getBeanName());
map.put("执行方法", quartzLog.getMethodName());
map.put("参数", quartzLog.getParams());
map.put("表达式", quartzLog.getCronExpression());
map.put("异常详情", quartzLog.getExceptionDetail());
map.put("耗时/毫秒", quartzLog.getTime());
map.put("状态", quartzLog.getIsSuccess() ? "成功" : "失败");
map.put("创建日期", quartzLog.getCreateTime());
list.add(map);
}
FileUtil.downloadExcel(list, response);
}
}
/*
*
*/
package com.njry.modules.quartz.service;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njry.modules.quartz.domain.QuartzJob;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.utils.PageResult;
import com.njry.modules.quartz.domain.vo.QuartzJobQueryCriteria;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
import java.util.Set;
/**
* @author
* @date
*/
public interface QuartzJobService extends IService<QuartzJob> {
/**
* 分页查询
*
* @param criteria 条件
* @param page 分页参数
* @return /
*/
PageResult<QuartzJob> queryAll(QuartzJobQueryCriteria criteria, Page<Object> page);
/**
* 查询全部
* @param criteria 条件
* @return /
*/
List<QuartzJob> queryAll(QuartzJobQueryCriteria criteria);
/**
* 分页查询日志
*
* @param criteria 条件
* @param page 分页参数
* @return /
*/
PageResult<QuartzLog> queryAllLog(QuartzJobQueryCriteria criteria, Page<Object> page);
/**
* 查询全部
* @param criteria 条件
* @return /
*/
List<QuartzLog> queryAllLog(QuartzJobQueryCriteria criteria);
/**
* 创建
* @param resources /
*/
void create(QuartzJob resources);
/**
* 编辑
* @param resources /
*/
void update(QuartzJob resources);
/**
* 删除任务
* @param ids /
*/
void delete(Set<Long> ids);
/**
* 更改定时任务状态
* @param quartzJob /
*/
void updateIsPause(QuartzJob quartzJob);
/**
* 立即执行定时任务
* @param quartzJob /
*/
void execution(QuartzJob quartzJob);
/**
* 导出定时任务
* @param queryAll 待导出的数据
* @param response /
* @throws IOException /
*/
void download(List<QuartzJob> queryAll, HttpServletResponse response) throws IOException;
/**
* 导出定时任务日志
* @param queryAllLog 待导出的数据
* @param response /
* @throws IOException /
*/
void downloadLog(List<QuartzLog> queryAllLog, HttpServletResponse response) throws IOException;
/**
* 执行子任务
* @param tasks /
* @throws InterruptedException /
*/
void executionSubJob(String[] tasks) throws InterruptedException;
}
task目录下
这类目的:
就是定义的测试任务
/*
*
*/
package com.njry.modules.quartz.task;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 测试用
* @author
* @date
*/
@Slf4j
@Service
public class TestTask {
public void run(){
log.info("run 执行成功");
}
public void run1(String str){
log.info("run1 执行成功,参数为: {}", str);
}
public void run2(){
log.info("run2 执行成功");
}
}
utils目录下
这个类目的:
通过bean容器获取QuartzLogMapper实现日志记录,
通过bean容器获取QuartzJobService判断子任务,或者任务是否存在一类的
自定义线程池,异步执行反射获取的任务----会遇到子线程获取不到主线程的SecurityContext问题
(SecurityContextHolder.getContext()获取不到Security上下文数据?)
/*
*
*/
package com.njry.modules.quartz.utils;
import cn.hutool.extra.template.Template;
import cn.hutool.extra.template.TemplateConfig;
import cn.hutool.extra.template.TemplateEngine;
import cn.hutool.extra.template.TemplateUtil;
import com.njry.modules.tools.domain.vo.EmailVo;
import com.njry.modules.quartz.domain.QuartzJob;
import com.njry.modules.quartz.domain.QuartzLog;
import com.njry.modules.tools.service.EmailService;
import com.njry.utils.RedisUtils;
import com.njry.utils.SpringContextHolder;
import com.njry.utils.StringUtils;
import com.njry.utils.ThrowableUtil;
import com.njry.modules.quartz.mapper.QuartzLogMapper;
import com.njry.modules.quartz.service.QuartzJobService;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.security.task.DelegatingSecurityContextAsyncTaskExecutor;
import java.util.*;
import java.util.concurrent.*;
/**
* 参考人人开源,<a href="https://gitee.com/renrenio/renren-security">...</a>
* @author
* @date
*/
@Async
public class ExecutionJob extends QuartzJobBean {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
// 此处仅供参考,可根据任务执行情况自定义线程池参数
private final ThreadPoolTaskExecutor executor = SpringContextHolder.getBean("elAsync");
// private final DelegatingSecurityContextAsyncTaskExecutor delegateExecutor= SpringContextHolder.getBean("elAsyncAgain");
@Override
public void executeInternal(JobExecutionContext context) {
// 获取任务
//1. 通过context.getJobDetail().getJobDataMap()方式分别获得Job、Trigger参数
//2. 通过context.getMergedJobDataMap()合并方式获得参数
//Ps:若同名则优先获取Trigger类型的数据,屏蔽了JobDetail类型数据。
QuartzJob quartzJob = (QuartzJob) context.getMergedJobDataMap().get(QuartzJob.JOB_KEY);
// 获取spring bean
QuartzLogMapper quartzLogMapper = SpringContextHolder.getBean(QuartzLogMapper.class);
QuartzJobService quartzJobService = SpringContextHolder.getBean(QuartzJobService.class);
RedisUtils redisUtils = SpringContextHolder.getBean(RedisUtils.class);
String uuid = quartzJob.getUuid();
QuartzLog log = new QuartzLog();
log.setJobName(quartzJob.getJobName());
log.setBeanName(quartzJob.getBeanName());
log.setMethodName(quartzJob.getMethodName());
log.setParams(quartzJob.getParams());
long startTime = System.currentTimeMillis();
log.setCronExpression(quartzJob.getCronExpression());
try {
// 执行任务
QuartzRunnable task = new QuartzRunnable(quartzJob.getBeanName(), quartzJob.getMethodName(), quartzJob.getParams());
Future<?> future = executor.submit(task);
// Future<?> future = delegateExecutor.submit(task);
future.get();
long times = System.currentTimeMillis() - startTime;
log.setTime(times);
if(StringUtils.isNotBlank(uuid)) {
redisUtils.set(uuid, true);
}
// 任务状态
log.setIsSuccess(true);
logger.info("任务执行成功,任务名称:" + quartzJob.getJobName() + ", 执行时间:" + times + "毫秒");
// 判断是否存在子任务
if(StringUtils.isNotBlank(quartzJob.getSubTask())){
String[] tasks = quartzJob.getSubTask().split("[,,]");
// 执行子任务
quartzJobService.executionSubJob(tasks);
}
} catch (Exception e) {
if(StringUtils.isNotBlank(uuid)) {
redisUtils.set(uuid, false);
}
logger.error("任务执行失败,任务名称:" + quartzJob.getJobName());
long times = System.currentTimeMillis() - startTime;
log.setTime(times);
// 任务状态 0:成功 1:失败
log.setIsSuccess(false);
log.setExceptionDetail(ThrowableUtil.getStackTrace(e));
// 任务如果失败了则暂停
if(quartzJob.getPauseAfterFailure() != null && quartzJob.getPauseAfterFailure()){
quartzJob.setIsPause(false);
//更新状态
quartzJobService.updateIsPause(quartzJob);
}
if(quartzJob.getEmail() != null){
EmailService emailService = SpringContextHolder.getBean(EmailService.class);
// 邮箱报警
if(StringUtils.isNoneBlank(quartzJob.getEmail())){
EmailVo emailVo = taskAlarm(quartzJob, ThrowableUtil.getStackTrace(e));
emailService.send(emailVo, emailService.find());
}
}
} finally {
log.setId(quartzLogMapper.getSeq());
quartzLogMapper.insert(log);
}
}
private EmailVo taskAlarm(QuartzJob quartzJob, String msg) {
EmailVo emailVo = new EmailVo();
emailVo.setSubject("定时任务【"+ quartzJob.getJobName() +"】执行失败,请尽快处理!");
Map<String, Object> data = new HashMap<>(16);
data.put("task", quartzJob);
data.put("msg", msg);
TemplateEngine engine = TemplateUtil.createEngine(new TemplateConfig("template", TemplateConfig.ResourceMode.CLASSPATH));
Template template = engine.getTemplate("taskAlarm.ftl");
emailVo.setContent(template.render(data));
List<String> emails = Arrays.asList(quartzJob.getEmail().split("[,,]"));
emailVo.setTos(emails);
return emailVo;
}
}
这个类目的:
实现定时任务增删的基本操作
/*
*
*/
package com.njry.modules.quartz.utils;
import com.njry.exception.BadRequestException;
import com.njry.modules.quartz.domain.QuartzJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import static org.quartz.TriggerBuilder.newTrigger;
/**
* @author
* @date
*/
@Slf4j
@Component
public class QuartzManage {
private static final String JOB_NAME = "TASK_";
@Resource
private Scheduler scheduler;
public void addJob(QuartzJob quartzJob){
try {
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(ExecutionJob.class).
withIdentity(JOB_NAME + quartzJob.getId()).build();
//通过触发器名和cron 表达式创建 Trigger
Trigger cronTrigger = newTrigger()
.withIdentity(JOB_NAME + quartzJob.getId())
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule(quartzJob.getCronExpression()))
.build();
// 这里保存调度任务信息,在ExecutionJob取出来,用QuartzRunnable反射找到对应执行job,但是交给子线程异步执行
cronTrigger.getJobDataMap().put(QuartzJob.JOB_KEY, quartzJob);
//重置启动时间
((CronTriggerImpl)cronTrigger).setStartTime(new Date());
//执行定时任务
scheduler.scheduleJob(jobDetail,cronTrigger);
// 暂停任务
if (quartzJob.getIsPause()) {
pauseJob(quartzJob);
}
} catch (Exception e){
log.error("创建定时任务失败", e);
throw new BadRequestException("创建定时任务失败");
}
}
/**
* 更新job cron表达式
* @param quartzJob /
*/
public void updateJobCron(QuartzJob quartzJob){
try {
TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + quartzJob.getId());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 如果不存在则创建一个定时任务
if(trigger == null){
addJob(quartzJob);
trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
}
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzJob.getCronExpression());
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
//重置启动时间
((CronTriggerImpl)trigger).setStartTime(new Date());
trigger.getJobDataMap().put(QuartzJob.JOB_KEY,quartzJob);
scheduler.rescheduleJob(triggerKey, trigger);
// 暂停任务
if (quartzJob.getIsPause()) {
pauseJob(quartzJob);
}
} catch (Exception e){
log.error("更新定时任务失败", e);
throw new BadRequestException("更新定时任务失败");
}
}
/**
* 删除一个job
* @param quartzJob /
*/
public void deleteJob(QuartzJob quartzJob){
try {
JobKey jobKey = JobKey.jobKey(JOB_NAME + quartzJob.getId());
scheduler.pauseJob(jobKey);
scheduler.deleteJob(jobKey);
} catch (Exception e){
log.error("删除定时任务失败", e);
throw new BadRequestException("删除定时任务失败");
}
}
/**
* 恢复一个job
* @param quartzJob /
*/
public void resumeJob(QuartzJob quartzJob){
try {
TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + quartzJob.getId());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 如果不存在则创建一个定时任务
if(trigger == null) {
addJob(quartzJob);
}
JobKey jobKey = JobKey.jobKey(JOB_NAME + quartzJob.getId());
scheduler.resumeJob(jobKey);
} catch (Exception e){
log.error("恢复定时任务失败", e);
throw new BadRequestException("恢复定时任务失败");
}
}
/**
* 立即执行job
* @param quartzJob /
*/
public void runJobNow(QuartzJob quartzJob){
try {
TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + quartzJob.getId());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 如果不存在则创建一个定时任务
if(trigger == null) {
addJob(quartzJob);
}
JobDataMap dataMap = new JobDataMap();
dataMap.put(QuartzJob.JOB_KEY, quartzJob);
JobKey jobKey = JobKey.jobKey(JOB_NAME + quartzJob.getId());
scheduler.triggerJob(jobKey,dataMap);
} catch (Exception e){
log.error("定时任务执行失败", e);
throw new BadRequestException("定时任务执行失败");
}
}
/**
* 暂停一个job
* @param quartzJob /
*/
public void pauseJob(QuartzJob quartzJob){
try {
JobKey jobKey = JobKey.jobKey(JOB_NAME + quartzJob.getId());
scheduler.pauseJob(jobKey);
} catch (Exception e){
log.error("定时任务暂停失败", e);
throw new BadRequestException("定时任务暂停失败");
}
}
}
这个类目的:
通过反射的方式调用定时任务,这样就不用手动生成每个Quartz的Job
/*
*
*/
package com.njry.modules.quartz.utils;
import com.njry.utils.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
/**
* 执行定时任务
* @author
*/
@Slf4j
public class QuartzRunnable implements Callable<Object> {
private final Object target;
private final Method method;
private final String params;
QuartzRunnable(String beanName, String methodName, String params)
throws NoSuchMethodException, SecurityException {
this.target = SpringContextHolder.getBean(beanName);
this.params = params;
if (StringUtils.isNotBlank(params)) {
this.method = target.getClass().getDeclaredMethod(methodName, String.class);
} else {
this.method = target.getClass().getDeclaredMethod(methodName);
}
}
@Override
@SuppressWarnings("all")
public Object call() throws Exception {
ReflectionUtils.makeAccessible(method);
if (StringUtils.isNotBlank(params)) {
method.invoke(target, params);
} else {
method.invoke(target);
}
return null;
}
}
mapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.njry.modules.quartz.mapper.QuartzJobMapper">
<resultMap id="BaseResultMap" type="com.njry.modules.quartz.domain.QuartzJob">
<id column="job_id" property="id" jdbcType="BIGINT"/>
<result column="job_name" property="jobName" jdbcType="VARCHAR"/>
<result column="bean_name" property="beanName" jdbcType="VARCHAR"/>
<result column="method_name" property="methodName" jdbcType="VARCHAR"/>
<result column="params" property="params" jdbcType="VARCHAR"/>
<result column="cron_expression" property="cronExpression" jdbcType="VARCHAR"/>
<result column="is_pause" property="isPause" jdbcType="TINYINT"/>
<result column="person_in_charge" property="personInCharge" jdbcType="VARCHAR"/>
<result column="email" property="email" jdbcType="VARCHAR"/>
<result column="sub_task" property="subTask" jdbcType="VARCHAR"/>
<result column="pause_after_failure" property="pauseAfterFailure" jdbcType="TINYINT"/>
<result column="description" property="description" jdbcType="VARCHAR"/>
<result column="create_time" property="createTime" jdbcType="TIMESTAMP"/>
<result column="update_time" property="updateTime" jdbcType="TIMESTAMP"/>
<result column="create_by" property="createBy" jdbcType="VARCHAR"/>
<result column="update_by" property="updateBy" jdbcType="VARCHAR"/>
</resultMap>
<sql id="Base_Column_List">
job_id, job_name, bean_name, method_name, params, cron_expression, is_pause, person_in_charge, email, sub_task, pause_after_failure, description, create_time, update_time, create_by, update_by
</sql>
<select id="findAll" resultMap="BaseResultMap">
SELECT
<include refid="Base_Column_List"/>
FROM sys_quartz_job
<where>
<if test="criteria.jobName != null and criteria.jobName != ''">
AND job_name LIKE CONCAT('%'||#{criteria.jobName},'%')
</if>
<if test="criteria.createTime != null and criteria.createTime.size() > 0">
AND update_time BETWEEN #{criteria.createTime[0]} AND #{criteria.createTime[1]}
</if>
</where>
ORDER BY job_id DESC
</select>
<select id="findByIsPauseIsFalse" resultMap="BaseResultMap">
SELECT
<include refid="Base_Column_List"/>
FROM sys_quartz_job
WHERE is_pause = '0'
</select>
<select id="getSeq" resultType="Long">
select seq_sys_quartz_job.nextval user_user_id from dual
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.njry.modules.quartz.mapper.QuartzLogMapper">
<resultMap id="BaseResultMap" type="com.njry.modules.quartz.domain.QuartzLog">
<id column="log_id" property="id" jdbcType="BIGINT"/>
<result column="job_name" property="jobName" jdbcType="VARCHAR"/>
<result column="bean_name" property="beanName" jdbcType="VARCHAR"/>
<result column="method_name" property="methodName" jdbcType="VARCHAR"/>
<result column="params" property="params" jdbcType="VARCHAR"/>
<result column="cron_expression" property="cronExpression" jdbcType="VARCHAR"/>
<result column="is_success" property="isSuccess" jdbcType="VARCHAR"/>
<result column="exception_detail" property="exceptionDetail" jdbcType="BIGINT"/>
<result column="time" property="time" jdbcType="BIGINT"/>
<result column="create_time" property="createTime" jdbcType="TIMESTAMP"/>
</resultMap>
<sql id="Base_Column_List">
log_id, job_name, bean_name, method_name, params, cron_expression, is_success, exception_detail, time, create_time
</sql>
<select id="findAll" resultMap="BaseResultMap">
SELECT
<include refid="Base_Column_List"/>
FROM sys_quartz_log
<where>
<if test="criteria.jobName != null and criteria.jobName != ''">
AND job_name LIKE CONCAT('%'||#{criteria.jobName},'%')
</if>
<if test="criteria.isSuccess != null">
AND is_success = #{criteria.isSuccess}
</if>
<if test="criteria.createTime != null and criteria.createTime.size() > 0">
AND create_time BETWEEN #{criteria.createTime[0]} AND #{criteria.createTime[1]}
</if>
</where>
ORDER BY log_id DESC
</select>
<select id="getSeq" resultType="Long">
select seq_sys_quartz_log.nextval user_user_id from dual
</select>
</mapper>
看几个测试任务存入表里
3.项目代码用到的全局几个工具类
这个类目的:
获取全局上下文
/*
*
*/
package com.njry.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author
* @date
*/
@Slf4j
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
private static ApplicationContext applicationContext = null;
private static final List<CallBack> CALL_BACKS = new ArrayList<>();
private static boolean addCallback = true;
/**
* 针对 某些初始化方法,在SpringContextHolder 未初始化时 提交回调方法。
* 在SpringContextHolder 初始化后,进行回调使用
*
* @param callBack 回调函数
*/
public synchronized static void addCallBacks(CallBack callBack) {
if (addCallback) {
SpringContextHolder.CALL_BACKS.add(callBack);
} else {
log.warn("CallBack:{} 已无法添加!立即执行", callBack.getCallBackName());
callBack.executor();
}
}
/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) {
assertContextInjected();
return (T) applicationContext.getBean(name);
}
/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
public static <T> T getBean(Class<T> requiredType) {
assertContextInjected();
return applicationContext.getBean(requiredType);
}
/**
* 获取SpringBoot 配置信息
*
* @param property 属性key
* @param defaultValue 默认值
* @param requiredType 返回类型
* @return /
*/
public static <T> T getProperties(String property, T defaultValue, Class<T> requiredType) {
T result = defaultValue;
try {
result = getBean(Environment.class).getProperty(property, requiredType);
} catch (Exception ignored) {}
return result;
}
/**
* 获取SpringBoot 配置信息
*
* @param property 属性key
* @return /
*/
public static String getProperties(String property) {
return getProperties(property, null, String.class);
}
/**
* 获取SpringBoot 配置信息
*
* @param property 属性key
* @param requiredType 返回类型
* @return /
*/
public static <T> T getProperties(String property, Class<T> requiredType) {
return getProperties(property, null, requiredType);
}
/**
* 检查ApplicationContext不为空.
*/
private static void assertContextInjected() {
if (applicationContext == null) {
throw new IllegalStateException("applicaitonContext属性未注入, 请在applicationContext" +
".xml中定义SpringContextHolder或在SpringBoot启动类中注册SpringContextHolder.");
}
}
/**
* 清除SpringContextHolder中的ApplicationContext为Null.
*/
private static void clearHolder() {
log.debug("清除SpringContextHolder中的ApplicationContext:"
+ applicationContext);
applicationContext = null;
}
@Override
public void destroy() {
SpringContextHolder.clearHolder();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringContextHolder.applicationContext != null) {
log.warn("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + SpringContextHolder.applicationContext);
}
SpringContextHolder.applicationContext = applicationContext;
if (addCallback) {
for (CallBack callBack : SpringContextHolder.CALL_BACKS) {
callBack.executor();
}
CALL_BACKS.clear();
}
SpringContextHolder.addCallback = false;
}
/**
* 获取 @Service 的所有 bean 名称
* @return /
*/
public static List<String> getAllServiceBeanName() {
return new ArrayList<>(Arrays.asList(applicationContext
.getBeanNamesForAnnotation(Service.class)));
}
}
这个类目的:
自定义线程池
package com.njry.config.thread;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.task.DelegatingSecurityContextAsyncTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 创建自定义的线程池
* @author njry
* @date 2024-05-11
**/
@Configuration
public class CustomExecutorConfig {
/**
* 自定义线程池,用法 @Async
* @return Executor
*/
@Bean
@Primary
public Executor elAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(AsyncTaskProperties.corePoolSize);
executor.setMaxPoolSize(AsyncTaskProperties.maxPoolSize);
executor.setQueueCapacity(AsyncTaskProperties.queueCapacity);
executor.setThreadNamePrefix("el-async-");
executor.setKeepAliveSeconds(AsyncTaskProperties.keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 自定义线程池,用法 @Async
* @return Executor
*/
@Bean
@Primary
public Executor elAsyncAgain() {
SecurityContext context = SecurityContextHolder.getContext();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(AsyncTaskProperties.corePoolSize);
executor.setMaxPoolSize(AsyncTaskProperties.maxPoolSize);
executor.setQueueCapacity(AsyncTaskProperties.queueCapacity);
executor.setThreadNamePrefix("el-async-");
executor.setKeepAliveSeconds(AsyncTaskProperties.keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
// Constructor threw exception; nested exception is java.lang.ClassCastException:
// org.springframework.security.task.DelegatingSecurityContextAsyncTaskExecutor cannot be cast to
// org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
// 这里不能类型转换
DelegatingSecurityContextAsyncTaskExecutor delegatingSecurityContextAsyncTaskExecutor = new DelegatingSecurityContextAsyncTaskExecutor(executor,context);
return delegatingSecurityContextAsyncTaskExecutor;
}
/**
* 自定义线程池,用法 @Async("otherAsync")
* @return Executor
*/
@Bean
public Executor otherAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(AsyncTaskProperties.keepAliveSeconds);
executor.setThreadNamePrefix("el-task-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}