1、当前现状
- 所有的任务已经迁移到阿里云Schedulerx;
- 阿里云Schedulerx是按照调用次数收费,有些任务每秒调用一次,费用太高;
2、明确需求
- 需要把执行非常频繁的定时任务从阿里云迁移(阿里云收费根据调用次数,且可以针对单个任务设置是否启动定时调度);
- 服务是集群部署,存在广播模式和单机模式,大部分是单机模式;
- 有时任务失败,需要人工手动触发;
- 对新增的定时任务,不想在阿里云控制台过多配置,但又需要手动触发;
3、实现思路
保留原有阿里云定时任务,禁用执行非常频繁的手动任务,禁用后不会自动执行,但可以手动触发;
禁用的任务使用Spring的定时调度工具,(引入开源的调度工具比较麻烦);
需要考虑在集群环境下,广播和单机模式的实现,引入Redis锁机制;
可以定义一个任务,用来调起真正执行的认为,也使用手动触发模式;
4、代码实现
4.1、配置定时调度使用线程池
package com.suyun.vehicle.conf;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Description:
* <p>
*
* </p>
*
* @Author: leo.xiong
* @CreateDate: 2023/7/4 14:06
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@EnableScheduling
@org.springframework.context.annotation.Configuration("springScheduleConfig")
public class SpringScheduleConfig implements SchedulingConfigurer {
@Value("${scheduler.task.size:8}")
private int taskSize;
/**
* <p>注入线程池</p>
* <p>{@link ScheduledAnnotationBeanPostProcessor}实现Spring定时器</p>
* <p>ScheduledAnnotationBeanPostProcessor->MergedBeanDefinitionPostProcessor->BeanPostProcessor扫描注解@Schedule,生成任务</p>
* <p>ScheduledAnnotationBeanPostProcessor->SmartInitializingSingleton和ApplicationListener接口,Spring Bean注入,还未发出通知之前afterSingletonsInstantiated执行一次,Spring 通知执行一次</p>
* <p>1、默认使用一个线程执行,如果多个任务都是每秒执行一次获取有任务执行时间过长,会造成任务延迟,严重直接阻塞任务;</p>
* <p>2、重写configureTasks方法,注入线程池</p>
* <p>3、异步执行需要开启@EnableAsync异步注解,方法上添加@Async注解</p>
* <p>4、ScheduledThreadPoolExecutor.ScheduledFutureTask方法,首选判断任务是否执行,不管手动还是定时,如果是,跳过这次执行,2判断是否非周期调用,直接执行,如果周期调用,执行,如果call()抛出异常,不在重新设置下次任务执行时间,定时任务不在执行</p>
*
* @param taskRegistrar
*/
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskScheduler());
}
/**
* <p>{@link ScheduledThreadPoolExecutor}和{@link ThreadPoolTaskScheduler}只能使用一个,IOC容器互斥</p>
* <p>ScheduledThreadPoolExecutor只能使用设置固定核心线程长度的线程池,灵活性不足</p>
* <p>ThreadPoolTaskScheduler实例化Bean时,实现了InitializingBean接口,实现了异步调度接口</p>
* <p>最终执行任务的线程池都为{@link java.util.concurrent.ScheduledExecutorService}实现的{@link ScheduledThreadPoolExecutor}实例</p>
*
* @return
*/
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 配置线程池大小,根据任务数量定制
taskScheduler.setPoolSize(taskSize);
// 线程名称前缀
taskScheduler.setThreadNamePrefix("suyun-ThreadPoolTaskScheduler-thread-");
// 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
// 线程池关闭前最大等待时间,确保最后一定关闭,线程任务最大执行时间30分钟
taskScheduler.setAwaitTerminationSeconds(1800);
// 线程池关闭时等待所有任务完成
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
// 任务拒绝策略 任务->判断核心线程是否存在空闲(直接执行)->判断队列是否已满(已满拒绝,否则放入队列)->判断是否达到最大线程(没有,创建执行,否则返回,等待可用线程)
taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return taskScheduler;
}
}
4.2、定义调度实现接口
package com.suyun.vehicle.service;
import com.alibaba.fastjson.JSON;
import com.suyun.vehicle.dao.ManualTaskDto;
import com.suyun.vehicle.utils.NetworkUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.data.redis.core.ValueOperations;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Description:
* <p>
* 手动定时任务处理调用阿里云定时器 {@link com.suyun.vehicle.conf.SpringScheduleConfig}
* </p>
*
* @Author: leo.xiong
* @CreateDate: 2023/7/4 9:56
* @Email: leo.xiong@suyun360.com
* @Since:
*/
public interface ManualTaskService {
String JOB_NAME = "jobInstance";
String PARAM = "param";
/**
* 手动任务执行前做一定的校验->获取对应执行任务的对象,手动执行一次
*
* @param instanceParameters
* @return
* @throws Exception
*/
Object manualProcess(String instanceParameters) throws Exception;
/**
* 校验参数
*
* @param instanceParameters
* @return
*/
default Object validParam(String instanceParameters) {
if (StringUtils.isEmpty(instanceParameters)) {
return "执行任务参数不能为空";
}
if (!instanceParameters.contains("{") || !instanceParameters.contains("}")) {
return "执行任务参数不是JSON字符串";
}
Map<String, String> keyValue = null;
try {
keyValue = JSON.parseObject(instanceParameters, Map.class);
} catch (Exception e) {
return "执行任务参数不是JSON字符串 instanceParameters:" + instanceParameters;
}
if (!keyValue.containsKey(JOB_NAME)) {
return "任务实例不能为空 instanceParameters:" + instanceParameters;
}
return keyValue;
}
/**
* 子任务执行
*/
interface SubManualTaskService {
/**
* 定时执行 @Scheduled
*
* @return
*/
void process();
/**
* 执行任务
*
* @param context
* @param testEndpoint
* @return
*/
Object processing(Object context, String testEndpoint);
/**
* 开始执行定时任务
*
* @param manualTaskDto
* @param supplier
* @return 返回true,执行成功,false,执行失败
*/
default Object execute(ManualTaskDto manualTaskDto, Supplier<Object> supplier) {
Long beginTime = null;
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().info("开始任务 name:{}", this.getClass().getSimpleName());
if (manualTaskDto.getLogger().isDebugEnabled()) {
beginTime = System.currentTimeMillis();
}
}
if (before(manualTaskDto)) {
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().info("任务无需执行");
}
return Boolean.TRUE;
}
Object r = null;
try {
/**@Scheduled 任务必须抓取异常,否则阻塞任务队列,任务不在执行{@link com.suyun.vehicle.conf.SpringScheduleConfig}**/
r = supplier.get();
if (r == null) {
return Boolean.TRUE;
}
} catch (Throwable e) {
//Throwable异常大于Exception,是Exception的父级,包括了Error等,如调用飞书接口,发送通知失败,会抛出Error,使用Exception就会抓取失败,防止有些异常抓取失败,导致线程阻塞
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().warn("任务执行失败:manualTaskDto: {}", manualTaskDto, e);
}
return Boolean.FALSE;
}
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().info("结束任务 name:{}", this.getClass().getSimpleName());
if (manualTaskDto.getLogger().isDebugEnabled()) {
manualTaskDto.getLogger().debug("任务执行时间:{} ms", System.currentTimeMillis() - beginTime);
}
}
return r;
}
/**
* 在执行任务之前,需要先判断是否已经执行,子任务 @Scheduled需要先判断是否别的服务器已经执行了
*
* @param manualTaskDto
* @return true 已执行,false未执行
*/
default boolean before(ManualTaskDto manualTaskDto) {
if (StringUtils.isNotEmpty(manualTaskDto.getTestEndpoint())) {
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().debug("测试环境任务无需执行");
}
//如果测试环境无需执行
return true;
}
if (manualTaskDto.getBroadcast() || manualTaskDto.getValueOperations() == null) {
//如果redis对象为空,此处只能使用广播模式
return true;
}
//只能单机执行
String key = manualTaskDto.getGroupId() + ":" + this.getClass().getName();
if (manualTaskDto.getValueOperations().setIfAbsent(key, NetworkUtil.IP + ":" + System.currentTimeMillis())) {
/**
* 900毫秒失效,实际失效时间取决于Redis失效机制
*/
manualTaskDto.getValueOperations().getOperations().expire(key, 900, TimeUnit.MILLISECONDS);
return false;
}
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().info("任务已有服务器执行,无需重复执行");
}
return true;
}
}
}
4.3、手动调用其他调度任务
package com.suyun.modules.vehicle.timetask;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.suyun.vehicle.service.ManualTaskService;
import com.thinkgem.jeesite.common.utils.SpringContextHolder;
import com.thinkgem.jeesite.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Description:
* <p>
* 手动执行任务
* </p>
*
* @Author: leo.xiong
* @CreateDate: 2023/7/4 9:41
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@Component
public class ManualTask extends JavaProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(ManualTask.class);
@Override
public ProcessResult process(JobContext context) throws Exception {
String instanceParameters = context.getInstanceParameters();
ManualTaskService manualTaskService = new ManualTaskService() {
@Override
public Object manualProcess(String instanceParameters) throws Exception {
Object param = validParam(instanceParameters);
if (param == null) {
return new ProcessResult(true);
}
if (param instanceof String) {
LOGGER.warn((String) param);
return new ProcessResult(true);
}
Map<String, String> keyValueMap = (Map<String, String>) param;
JavaProcessor javaProcessor = SpringContextHolder.getBean(keyValueMap.get(ManualTaskService.JOB_NAME));
if (javaProcessor == null) {
LOGGER.warn("任务实例不为JavaProcessor对象 instanceParameters:{}", instanceParameters);
return new ProcessResult(true);
}
String paramValue = keyValueMap.get(ManualTaskService.PARAM);
JobContext jobContext = null;
if (StringUtils.isEmpty(paramValue)) {
jobContext = JobContext.newBuilder().setGroupId(context.getGroupId()).build();
} else {
jobContext = JobContext.newBuilder().setGroupId(context.getGroupId()).setJobParameters(paramValue).setInstanceParameters(paramValue).build();
}
return javaProcessor.process(jobContext);
}
};
ProcessResult processResult = null;
try {
processResult = (ProcessResult) manualTaskService.manualProcess(instanceParameters);
return processResult;
} catch (Exception e) {
LOGGER.warn("手动执行任务失败");
return new ProcessResult(false);
}
}
}
4.4、使用Spring定时器任务实现
package com.suyun.modules.vehicle.timetask;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.suyun.modules.vehicle.timetask.service.CmdDownStatusTaskService;
import com.suyun.vehicle.dao.ManualTaskDto;
import com.suyun.vehicle.service.ManualTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* Description:队列状态,任务状态,进度刷新
* 包括UDS和终端任务队列
*
* @Author: leo.xiong
* @CreateDate: 2020/12/26 17:39
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@Component
public class CmdDownStatusTask extends JavaProcessor implements ManualTaskService.SubManualTaskService {
private static final Logger LOGGER = LoggerFactory.getLogger(CmdDownStatusTask.class);
@Autowired
private CmdDownStatusTaskService cmdDownStatusTaskService;
@Resource(name = "redisTemplate")
private ValueOperations<String, String> valueOperations;
@Value("${schedulerx.test.endpoint:}")
private String testEndpoint;
@Value("${schedulerx.group.id}")
private String groupId;
/**
* 阿里云调度器
*
* @param context
* @return
*/
@Override
public ProcessResult process(JobContext context) {
return processing(context, null);
}
/**
* Spring注解调度器
*
* @return
*/
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void process() {
processing(JobContext.newBuilder().setGroupId(groupId).build(), testEndpoint);
}
/**
* 自动下发状态刷新(每一分钟执行一次)
*/
@Override
public ProcessResult processing(Object context, String testEndpoint) {
ManualTaskDto manualTaskDto = ManualTaskDto.builder().logger(LOGGER).valueOperations(valueOperations).groupId(groupId).testEndpoint(testEndpoint);
Object obj = execute(manualTaskDto, () -> {
return cmdDownStatusTaskService.doCmdDownStatusTask();
});
return obj instanceof Boolean ? new ProcessResult((Boolean) obj) : (obj instanceof ProcessResult ? (ProcessResult) obj : new ProcessResult(true));
}
}
5、Spring定时调度器生效原理
5.1、初始化定时器配置
Spring定义的钩子ScheduledAnnotationBeanPostProcessor
实现了BeanPostProcessor,SmartInitializingSingleton,ApplicationListener接口;
BeanPostProcessor:会对所有Bean,一般是用@Component注解的Bean对象扫描
@Override
public Object postProcessAfterInitialization(final Object bean, String beanName) {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
//第一次扫描后,为空的类下次不会再扫描
if (!this.nonAnnotatedClasses.contains(targetClass)) {
//找到当前类添加了Scheduled注解的所有方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
//
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled ->
//校验一些方法配置是否正确,如,定时任务只能是无参的方法等
processScheduled(scheduled, method, bean)));
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
//方法无参
Assert.isTrue(method.getParameterCount() == 0,
"Only no-arg methods may be annotated with @Scheduled");
//获取注解所在的bean和方法
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
//定义一个Runnable对象,传入对象和方法,这里没有传入参数,这也是注解不能有入参的原因,后面会用反射的invoke执行定时方法
Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
boolean processedSchedule = false;
//
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
//initialDelay和initialDelayString不能同时出现
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
//initialDelayString不是有效的Long或者Duration.parse
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
// cron不支持initialDelay
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
//加入队列,把Runable和定时器规则包装成CronTask对象
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
// Check fixed delay
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
//fixedDelayString不是有效的Long或者Duration.parse
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// Check fixed rate
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
//fixedRateString不是有效的Long或者Duration.parse
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
//bean任务放入scheduledTasks任务队列中,每个bean维护一个队列,一个类可以有多个@Scheduled方法
Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
if (registeredTasks == null) {
registeredTasks = new LinkedHashSet<>(4);
this.scheduledTasks.put(bean, registeredTasks);
}
//注册所有的任务
registeredTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
//方法错误
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
SmartInitializingSingleton:会调用afterSingletonsInstantiated方法,在
ApplicationContext 还未初始化完成,就先注入定时任务,主要是为了尽可能早的注入任务信息
ApplicationListener:监听程序启动成功之后,调用onApplicationEvent方法,执行顺序在SmartInitializingSingleton后面
5.2、执行定时任务
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
//默认单线程线程池,重写可以使用多线程线程池
configurer.configureTasks(this.registrar);
}
}
//存在任务,scheduler线程池为空
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
//根据类型TaskScheduler,从容器里面获取bean,为空抛出异常,之后抓取异常
this.registrar.setTaskScheduler(resolveSchedulerBean(beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
//如果根据类型存在多个bean,在根据类型名称找一次,注入名称为taskScheduler,类型为TaskScheduler的bean
logger.debug("Could not find unique TaskScheduler bean", ex);
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
//如果找不到类型TaskScheduler的bean,在根据类型ScheduledExecutorService在容器中查找,否则抛出异常
logger.debug("Could not find default TaskScheduler bean", ex);
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
//如果存在多个ScheduledExecutorService类型的bean,在根据ScheduledExecutorService和名称taskScheduler找一下
logger.debug("Could not find unique ScheduledExecutorService bean", ex2);
try {
this.registrar.setScheduler(resolveSchedulerBean(beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
// 打印异常
logger.debug("Could not find default ScheduledExecutorService bean", ex2);
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
this.registrar.afterPropertiesSet();
}
4种任务,这里找下Cron任务
protected void scheduleTasks() {
//没有定义线程池
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
//初始化一个单线程定时调度线程池,包装成ConcurrentTaskScheduler,自定义线程池,也会包装成ConcurrentTaskScheduler对象
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
//根据任务对象,移除任务,并返回前面的任务,如果没有就新建一个任务
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
//存在线程池,用线程池定时执行,即使没有taskScheduler,也会初始化一个单线程池,所以taskScheduler 不为空
if (this.taskScheduler != null) {
//把结果绑定对象
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
执行ConcurrentTaskScheduler的schedule方法
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
if (this.enterpriseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
//ReschedulingRunnable对象,实现了Runnable和Future接口表明用线程执行,会有线程返回值
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}
@Nullable
public ScheduledFuture<?> schedule() {
//同一个ReschedulingRunnable执行,线程安全
synchronized (this.triggerContextMonitor) {
//根据最后的时间,获取下次执行时间
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
//下次执行时间-当前系统时间等于需要延迟的时间,如果下次执行时间比当前时间小,可能使用的是单线程,或者,线程任务执行时间过长,结果可能为负数
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
//执行ScheduledThreadPoolExecutor的schedule方法
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
//包装一个ScheduledFutureTask对象,实现了Runnable和Future,command也实现了Runnable和Future,被包装为Callable,后面会调用
new ScheduledFutureTask<Void>(command, null,
//触发时间,如果小于0偏移时间就为0,之后加上偏移时间
triggerTime(delay, unit)));
//延迟执行任务
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果线程池状态为shutdown,则拒绝新增任务
if (isShutdown())
reject(task);
else {
//任务加入队列
super.getQueue().add(task);
//如果线程池状态shutdown,并且运行的线程不是循环执行(定时执行),并且可以移除队列任务,那么移除之后终止任务,否则开始执行
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//执行任务
ensurePrestart();
}
}
public boolean remove(Runnable task) {
//移除队列,并返回是否移除成功
boolean removed = workQueue.remove(task);
//尝试终止线程,并回收线程资源
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
public boolean remove(Object x) {
//线程队列加锁,需要删除,重新排序堆
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
//如果元素不存在,移除失败
if (i < 0)
return false;
//移除元素
setIndex(queue[i], -1);
//对大小减少1
int s = --size;
//获取堆尾的最后一个任务
RunnableScheduledFuture<?> replacement = queue[s];
//最后的索引为空,减少了一个任务,索引从0开始
queue[s] = null;
//如果移除的任务不是最后一个任务
if (s != i) {
//将添加在顶部的元素向下筛选到其堆顺序位置
siftDown(i, replacement);
if (queue[i] == replacement)
//从底部向上筛选添加到其堆有序位置的元素
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
线程池状态
RUNNING:运行状态
SHUTDOWN:线程池关闭,不在接收新任务,但线程池队列中的任务还是会执行完成
STOP:线程池停止,不接受新任务,且尝试终止线程池队列汇总的任务 TIDYING:线程池队列中所有的任务已经完成
TERMINATED:线程池状态为终止
final void tryTerminate() {
//没有退出条件的循环
for (;;) {
//ctl存储的是线程池状态和工作线程个数
int c = ctl.get();
//线程状态 RUNNING||(TIDYING || TERMINATED)||(SHUTDOWN && 线程队列不为空),直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//线程执行任务不为0,但线程池状态只能为STOP
if (workerCountOf(c) != 0) { // Eligible to terminate
//终端线程,回收资源,回收一个之后,下次执行到这里,状态还是STOP,worker还是不为0 ,又终止一个,直到都终止完成,之后对不是终止状态的线程池终止
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果线程状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//终止线程,空方法,子类可以实现从TIDYING状态变为TERMINATED状态之前做一定逻辑判断
terminated();
} finally {
//设置线程池状态为终止
ctl.set(ctlOf(TERMINATED, 0));
//唤醒所有的终止任务,根据链表,通过unpark一个一个暂停线程。
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
//获取所有活动的线程对象
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
//中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//终端一个后跳出循环
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
//终止成功,移除任务
remove(this);
return cancelled;
}
public boolean cancel(boolean mayInterruptIfRunning) {
//如果状态不是新 || 线程状态不能设置为CANCELLED,返回false,不能中断
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
//中断线程
t.interrupt();
} finally { // final state
//设置线程状态为中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//回收线程资源
finishCompletion();
}
return true;
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//循环队列节点
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//当前节点循环
Thread t = q.thread;
if (t != null) {
q.thread = null;
//终止线程
LockSupport.unpark(t);
}
//获取当前节点的下游节点,如果为空,表示已当前节点作为初始节点的下游节点都已经被终止了
WaitNode next = q.next;
if (next == null)
break;
//下一个worker节点为空,解绑worker和线程池关系,GC能正常回收
q.next = null; // unlink to help gc
//把它的下游节点设置为头部节点,循环上终止,移动节点任务
q = next;
}
break;
}
}
//空方法,子类可以实现,比如判断线程是否都已回收,worker已经未0等等
done();
callable = null; // to reduce footprint
}
真正开始执行任务
void ensurePrestart() {
//获取线程池工作线程个数
int wc = workerCountOf(ctl.get());
//如果工作线程小于核心线程数量
if (wc < corePoolSize)
//直接添加任务,后面的一个参数表示是否核心线程
addWorker(null, true);
else if (wc == 0)
//如果工作线程0,直接添加任务
addWorker(null, false);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//线程池状态是(!RUNNING) && (SHUTDOWN && firstTask为空&&队列不为空)直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//线程池中的工作线程大于等于最大值 || 大于核心线程,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加一个worker,增加成功,退出循环跳到retry:,往下执行
if (compareAndIncrementWorkerCount(c))
break retry;
//如果没有设置成功
c = ctl.get(); // Re-read ctl
//线程状态以改变,继续循环,重新执行for循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//获取堆的第一个任务,时间最前面,并根据线程工厂创建线程,Worker重写了toString方法
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁,这个线程不能中断
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//线程池状态RUNNING 或者 SHUTDOWN && 堆首任务不存在
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果线程已经执行,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//把新的任务加入到workers里面,workers就是个Set,如果toString方法相同,覆盖
workers.add(w);
//获取当前工作组个数,如果大于最大值,更新最大值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//新增工作成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//新增成功,开始执行任务ScheduledThreadPoolExecutor.ScheduledFutureTask
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker的构造方法方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
//堆首任务最先执行,ScheduledThreadPoolExecutor.ScheduledFutureTask类型
this.firstTask = firstTask;
//获取到Bean工厂就是ThreadPoolTaskScheduler对象,CustomizableThreadFactory->CustomizableThreadCreator创建线程
this.thread = getThreadFactory().newThread(this);
}
Worker的toString方法
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
//线程池状态,工作线程个数,未激活的线程个数,队列长度,未完成的任务数量
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
ScheduledThreadPoolExecutor.ScheduledFutureTask开始启动
- 实现了Comparable 接口,比较的对象为Delayed
- ScheduledThreadPoolExecutor线程池维护的是一个DelayedWorkQueue优先级自增长队列,最大容量为Integer.MAX_VALUE,是一个堆,堆首放的就是时间最早执行的任务,根据Delayed比较时间,每次新增,删除,执行后都会更新下一次任务执行时间,更新堆信息;
public void run() {
//判断是否是周期任务
boolean periodic = isPeriodic();
//线程池非运行状态(RUNNING || (SHUTDOWN && 可以终止))
if (!canRunInCurrentRunState(periodic))
//取消任务,回收资源
cancel(false);
else if (!periodic)
//非周期性任务,直接执行run
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//周期性任务执行成功,设置下一次任务时间
setNextRunTime();
//重新入队,调用执行
reExecutePeriodic(outerTask);
}
}
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
//callable就是前面定义的ReschedulingRunnable包装的Callable对象
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
//如果执行失败,跳入catch,异常被抓取,不会抛出异常
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
//抓取异常之后,返回false,不会设置下次任务时间,所以在代码层面需要手动抓取异常
return ran && s == NEW;
}
ReschedulingRunnable执行,调用的是
@Override
public void run() {
Date actualExecutionTime = new Date();
//调用的是父级的run方法,实际调用的是this.delegate.run();,delegate就是CornTask获取的Runable,在@Scheuler注解的时候生成的ScheduledMethodRunnable对象
super.run();
Date completionTime = new Date();
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!obtainCurrentFuture().isCancelled()) {
schedule();
}
}
}
ScheduledMethodRunnable的run方法
@Override
public void run() {
try {
//设置定时任务方法为可执行
ReflectionUtils.makeAccessible(this.method);
//反射执行定义的定时任务方法
this.method.invoke(this.target);
}
catch (InvocationTargetException ex) {
ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
}
catch (IllegalAccessException ex) {
throw new UndeclaredThrowableException(ex);
}
}
定时任务重新调用
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
//线程池运行状态,任务重新入队
super.getQueue().add(task);
//线程池不是运行状态,移除任务,结束任务,回收资源
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
//再次调用前置执行方法,递归了
ensurePrestart();
}
}