<!-- quartz定时任务 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
单机版本:
SpringBoot集成Quartz动态定时任务_jobgroupname_小百菜的博客-CSDN博客
集群遇到的问题:
需要注意:集群模式下,最优方案是调度器独立为一个项目,然后再去调度执行器(集群),成熟的解决方案比如xxl-job。
1、同一个任务在不同节点同时执行。
2、前端请求,会随机选择一个后台节点,不可控。
解决思路:
1、启动任务和关闭任务时,将任务ID插入一个记录表,并返回版本号(自增主键)。
2、修改任务时,需要先将任务关闭,并插入到记录表。
由于启动任务和关闭任务,是从同一个表的自增主键拿的版本号,所以一定就有先后顺序,可以根据这个先后顺序,判断是否执行任务。
当执行任务时,当前任务版本为最新版本,才可以继续往下执行,否则关闭当前节点的任务。
关键代码:
1、任务调度器
package com.study.job;
import com.study.bean.Task;
import com.study.dao.DemoDao;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 定时任务
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
@Component
public class CronScheduleJob {
private Logger logger = LoggerFactory.getLogger(CronScheduleJob.class);
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private DemoDao demoDao;
@PostConstruct // 构造函数之后执行
public void init() {
//Spring容器加载之后,启动以前的定时任务
logger.info("启动以前的定时任务");
List<Task> list = demoDao.getAllTask();
for (Task task : list) {
String cron = task.getCron();
if (cron != null) {
int id = task.getTaskId();
//插入任务记录,返回一个版本号
int version = demoDao.addTaskRecord(id);
//启动任务
startTask(id, version, cron);
}
}
logger.info("定时任务启动完成!");
}
/**
* 添加一个定时任务
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param cron 时间设置,参考quartz说明文档
* @param params 任务参数
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, String cron, Map<String, Object> params) throws Exception {
// 任务名,任务组,任务执行类
JobDetail job = JobBuilder.newJob(ScheduledJob.class).withIdentity(jobName, jobGroupName).build();
// 触发器,触发器名,触发器组
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroupName);
// 触发器时间设定
CronTrigger trigger = triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron)).build();
//还可以指定开始执行时间,还可以指定间隔时间,比如cron表达式不能写出每隔50秒执行一次,可以用这种方式。
//Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-05-06 17:10:00");
//triggerBuilder.startAt(date);
// 任务参数
job.getJobDataMap().putAll(params);
Scheduler scheduler = schedulerFactoryBean.getScheduler();
// 调度容器设置Job和Trigger
scheduler.scheduleJob(job, trigger);
}
/**
* 执行一次
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param params 任务参数
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void onceJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Map<String, Object> params) throws Exception {
// 任务名,任务组,任务执行类
JobDetail job = JobBuilder.newJob(ScheduledJob.class).withIdentity(jobName, jobGroupName).build();
// 触发器,触发器名,触发器组
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroupName);
// 触发器间隔时间和重复时间设定,10这个参数代表指定一个以秒为单位的重复间隔,0这个参数代表指定触发器将重复的次数,总执行次数=重复次数+1
SimpleTrigger trigger = triggerBuilder.startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).withRepeatCount(0)).build();
//还可以指定开始执行时间,还可以指定间隔时间,比如cron表达式不能写出每隔50秒执行一次,可以用这种方式。
//Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-05-06 17:10:00");
//SimpleTrigger trigger = triggerBuilder.startAt(date).withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).withRepeatCount(2)).build();
// 任务参数
job.getJobDataMap().putAll(params);
Scheduler scheduler = schedulerFactoryBean.getScheduler();
// 调度容器设置Job和Trigger
scheduler.scheduleJob(job, trigger);
}
/**
* 重复执行
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param count 重复次数
* @param params 任务参数
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void repeatJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Integer count, Map<String, Object> params) throws Exception {
// 任务名,任务组,任务执行类
JobDetail job = JobBuilder.newJob(ScheduledJob.class).withIdentity(jobName, jobGroupName).build();
// 触发器,触发器名,触发器组
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroupName);
// 触发器间隔时间和重复时间设定,10这个参数代表指定一个以秒为单位的重复间隔,0这个参数代表指定触发器将重复的次数,总执行次数=重复次数+1
// SimpleTrigger trigger = triggerBuilder.startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).withRepeatCount(0)).build();
// 间隔时间:withIntervalInSeconds,执行次数:withRepeatCount
SimpleTrigger trigger = triggerBuilder.startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).withRepeatCount(count - 1)).build();
//还可以指定开始执行时间,还可以指定间隔时间,比如cron表达式不能写出每隔50秒执行一次,可以用这种方式。
//Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-05-06 17:10:00");
//SimpleTrigger trigger = triggerBuilder.startAt(date).withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).withRepeatCount(2)).build();
// 任务参数
job.getJobDataMap().putAll(params);
Scheduler scheduler = schedulerFactoryBean.getScheduler();
// 调度容器设置Job和Trigger
scheduler.scheduleJob(job, trigger);
}
/**
* 修改一个任务的触发时间
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param cron 时间设置,参考quartz说明文档
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void updateJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, String cron, Map<String, Object> params) throws Exception {
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
Scheduler scheduler = schedulerFactoryBean.getScheduler();
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (trigger == null) {
addJob(jobName, jobGroupName, triggerName, triggerGroupName, cron, params);
return;
}
String oldCron = trigger.getCronExpression();
if (!oldCron.equalsIgnoreCase(cron)) {
// 触发器,触发器名,触发器组
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroupName);
// 触发器时间设定
trigger = triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron)).build();
// 修改一个任务的触发时间
scheduler.rescheduleJob(triggerKey, trigger);
}
}
/**
* 移除一个任务
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void deleteJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName) throws Exception {
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
Scheduler scheduler = schedulerFactoryBean.getScheduler();
// 停止触发器
scheduler.pauseTrigger(triggerKey);
// 移除触发器
scheduler.unscheduleJob(triggerKey);
// 删除任务
scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 暂停job
* @param jobName 任务名称
* @param jobGroupName 任务所在组名称
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void pauseJob(String jobName, String jobGroupName) throws Exception {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 恢复job
* @param jobName 任务名称
* @param jobGroupName 任务所在组名称
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void resumeJob(String jobName, String jobGroupName) throws Exception {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 启动所有定时任务
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void startAllJobs() throws Exception {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
scheduler.start();
}
/**
* 关闭所有定时任务
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public void shutdownAllJobs() throws Exception {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
if (!scheduler.isShutdown()) {
scheduler.shutdown();
}
}
/**
* 获取任务是否存在
*
* <pre>
* STATE_BLOCKED 4 阻塞
* STATE_COMPLETE 2 完成
* STATE_ERROR 3 错误
* STATE_NONE -1 不存在
* STATE_NORMAL 0 正常
* STATE_PAUSED 1 暂停
* </pre>
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public Boolean isExists(String triggerName, String triggerGroupName) throws Exception {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
return scheduler.getTriggerState(TriggerKey.triggerKey(triggerName, triggerGroupName)) == Trigger.TriggerState.NONE;
}
/**
* 启动任务
* @param id 任务ID
* @param version 任务版本号
* @param cron CRON表达式
*/
public boolean startTask(int id, int version, String cron) {
try {
//新增定时任务
String jobName = "job" + id;
String jobGroupName = "group";
String triggerName = "trigger" + id;
String triggerGroupName = "group";
Map<String, Object> params = new HashMap<>();
params.put("id", id);
params.put("version", version);
addJob(jobName, jobGroupName, triggerName, triggerGroupName, cron, params);
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return false;
}
/**
* 停止任务
* @param id 任务ID
*/
public boolean stopTask(int id) {
try {
//删除定时任务
String jobName = "job" + id;
String jobGroupName = "group";
String triggerName = "trigger" + id;
String triggerGroupName = "group";
deleteJob(jobName, jobGroupName, triggerName, triggerGroupName);
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return false;
}
}
2、任务执行器
package com.study.job;
import com.study.service.TaskService;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 定时任务实现类
* 注意:该类是非单例的,每次被调用都会生成一个实例来执行。
* @author lhs
* @date 2021年4月30日 下午4:58:24
*/
public class ScheduledJob implements Job {
private static Logger logger = LoggerFactory.getLogger(ScheduledJob.class);
private static DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
//由于该类实例不是单例,要全局共享使用一个只能使用static来修饰
public static ExecutorService threadPool = Executors.newFixedThreadPool(16);//线程池,最大线程数为16
@Autowired
private TaskService taskService;
@Override
public void execute(JobExecutionContext jobExecutionContext) {
// 传入的参数
JobDataMap mergedJobDataMap = jobExecutionContext.getMergedJobDataMap();
int taskId = mergedJobDataMap.getInt("id");
int version = mergedJobDataMap.getInt("version");
long time = jobExecutionContext.getScheduledFireTime().getTime();
logger.info("执行任务:{},版本号:{},任务计划时间:{}", taskId, version, Instant.ofEpochMilli(time).atZone(ZoneOffset.ofHours(8)).format(fmt));
threadPool.execute(new Runnable() {
@Override
public void run() {
taskService.execTask(taskId, version, time);
}
});
}
}
3、dao层
package com.study.dao;
import com.study.bean.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.stereotype.Repository;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
/**
* 示例
* @author lhs
* @date 2023/7/3 11:08
*/
@Repository
public class DemoDao {
private Logger logger = LoggerFactory.getLogger(DemoDao.class);
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 查询任务
*/
public List<Task> getTaskList() {
String sql = "select task_id,task_name,cron,status,content from task order by task_id desc";
return this.jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(Task.class));
}
/**
* 查询任务
*/
public Task getTaskById(int id) {
String sql = "select task_id,task_name,cron,status,content from task where task_id=? ";
return this.jdbcTemplate.queryForObject(sql, new Integer[]{id}, new BeanPropertyRowMapper<>(Task.class));
}
/**
* 查询任务
* 状态:0未启动,1运行中。
*/
public List<Task> getAllTask() {
String sql = "select task_id,task_name,cron,status,content from task where status=1 ";
return this.jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(Task.class));
}
/**
* 新增任务
*/
public int addTask(Task Task) {
String sql = "insert into task(task_name,cron,status,content) values (?,?,0,?)";
return this.jdbcTemplate.update(sql, new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps) throws SQLException {
ps.setString(1, Task.getTaskName());
ps.setString(2, Task.getCron());
ps.setString(3, Task.getContent());
}
});
}
/**
* 修改任务
*/
public int updateTask(Task Task) {
String sql = "update task set task_name=?,cron=?,content=?,status=0 where task_id=? ";
return this.jdbcTemplate.update(sql, new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps) throws SQLException {
ps.setString(1, Task.getTaskName());
ps.setString(2, Task.getCron());
ps.setString(3, Task.getContent());
ps.setInt(4, Task.getTaskId());
}
});
}
/**
* 修改任务状态
*/
public int updateTaskStatus(int status, int id) {
String sql = "update task set status=? where task_id=? ";
return this.jdbcTemplate.update(sql, new Integer[]{status, id});
}
/**
* 删除任务
*/
public int deleteTask(int id) {
String sql = "delete from task where task_id=? ";
return this.jdbcTemplate.update(sql, new Integer[]{id});
}
/**
* 插入任务记录
*/
public int addTaskRecord(int taskId) {
String sql = "insert into task_record(task_id) values (?)";
KeyHolder keyHolder = new GeneratedKeyHolder();
this.jdbcTemplate.update(new PreparedStatementCreator() {
@Override
public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
PreparedStatement ps = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
ps.setInt(1, taskId);
return ps;
}
}, keyHolder);
return keyHolder.getKey().intValue();
}
/**
* 查询任务最新版本号
*/
public int getMaxVersion(int taskId) {
String sql = "select ifnull(max(id),0) from task_record where task_id=? ";
return jdbcTemplate.queryForObject(sql, new Integer[]{taskId}, Integer.class);
}
}
4、业务代码
package com.study.service;
import com.study.bean.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 定时任务业务处理
* @Author: lhs
* @Date: 2022/6/25 18:18
*/
@Service
public class TaskService {
private final Logger logger = LoggerFactory.getLogger(TaskService.class);
@Autowired
private DemoService demoService;
/**
* 执行任务
* @param taskId 任务ID
* @param version 任务版本号
* @param time 计划执行时间
* @author lhs
* @date 2022/6/25 21:17
*/
public void execTask(int taskId, int version, long time) {
// String lock = taskId + "_" + time;
// 防止同一个任务在多个节同时执行,还可以将lock插入一个唯一索引字段,插入失败表示这个任务已经在其他节点执行。
//当前任务版本为最新版本,才可以继续往下执行
int maxVersion = demoService.getMaxVersion(taskId);
if (version < maxVersion) {
//非最新版本,当前版本任务已停止,仅停止当前节点的任务,不能插入任务记录
demoService.onlyStopTask(taskId);
logger.warn("当前节点任务非最新版本{},停止任务:{},版本号:{}", maxVersion, taskId, version);
return;
}
logger.info("开始执行业务,当前任务:{},版本号:{}", taskId, version);
Task task = demoService.getTaskById(taskId);
// 取到业务数据
String content = task.getContent();
// 开始处理业务
// ....
}
}
4、涉及表结构
CREATE TABLE `task` (
`task_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '任务ID',
`task_name` varchar(200) NOT NULL COMMENT '任务名',
`cron` varchar(200) NOT NULL COMMENT 'CRON表达式',
`status` int(1) NOT NULL DEFAULT '0' COMMENT '状态:0未启动,1运行中。',
`content` varchar(200) NOT NULL COMMENT '业务内容',
PRIMARY KEY (`task_id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8 COMMENT='任务表'
CREATE TABLE `task_record` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '版本号',
`task_id` int(11) NOT NULL COMMENT '任务ID',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=23 DEFAULT CHARSET=utf8 COMMENT='任务记录表'
示例:
源码:https://gitee.com/gloweds/quartz