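Contents
- Preface
- Example
- The Misfire mechanism
- 1. Compensation at startup
- 2. Compensation by a periodic background task
- 3. Compensation via the time window for acquiring triggers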
Preface
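What does Misfire mean? A translation tool renders it as "catching fire", which sounds odd for a scheduling library. In Java code, however, "fire" is better read as "trigger"; the word is common in Netty, for example. Quartz uses it everywhere as well, such as the NEXT_FIRE_TIME (next fire time) and PREV_FIRE_TIME (previous fire time) columns of the QRTZ_TRIGGERS table. So Misfire here means a missed firing.
Because Quartz fires jobs according to time, it inevitably needs a compensation mechanism for triggers that missed their fire time. Many people find it hard to see how Quartz could miss a firing at all. Is it a bug? In which situations does it happen? First, if the application is shut down, the Quartz scheduler threads are gone, so no jobs run; when the application restarts, the firings that were due during the downtime may be treated as missed and need compensation. Second, even while the application runs normally, a limited number of worker threads combined with long-running jobs can cause missed firings: the actual work is done by worker threads, and if none of them is free, a trigger cannot fire even though its time has come. So none of this is really a Quartz bug. In general, the worker thread count org.quartz.threadPool.threadCount must be sized for the workload, and jobs triggered by Quartz should be short or have a bounded duration. A compensation mechanism is still indispensable, though, because nobody can guarantee that everything stays under control.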
Example
The code below demonstrates a scenario that leads to missed firings. It creates a job that takes 3 minutes, to simulate a long-running job:
package org.quartz.myexample;
import org.quartz.*;
import java.util.Date;
public class LongTimeJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
System.err.println("---" + context.getJobDetail().getKey() + " executing.[" + new Date() + "]");
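try {
// 3 minutes
Thread.sleep(60000 * 3);
} catch (InterruptedException e) {
// restore the interrupt flag instead of silently swallowing it
Thread.currentThread().interrupt();
}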
System.err.println(" -" + context.getJobDetail().getKey() + " complete.[" + new Date() + "]");
}
}
Next, restrict Quartz to a single worker thread:
// Limit Quartz to a single worker thread; system properties override values from the properties file
System.setProperty("org.quartz.threadPool.threadCount", "1");
The trigger's start time is set to the previous top of the hour, and it then fires every 10 seconds:
Date startTime = DateBuilder.evenHourDateBefore(new Date());
// Start at the previous top of the hour, then repeat every 10 seconds
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("mytrigger", "group1")
.startAt(startTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(10)
.repeatForever())
.build();
The configuration file src/main/resources/quartz.properties is shown below; it uses a MySQL database for the JobStore:
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
## Name of this scheduler
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
#org.quartz.scheduler.rmi.export: false
#org.quartz.scheduler.rmi.proxy: false
#org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
## 3 threads in the pool, so at most 3 jobs can run at the same time
org.quartz.threadPool.threadCount: 3
org.quartz.threadPool.threadPriority: 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#org.quartz.jobStore.misfireThreshold: 60000
# With RAMJobStore, all Quartz data, including job and trigger configuration, is kept in memory
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
org.quartz.scheduler.skipUpdateCheck=true
#org.terracotta.quartz.skipUpdateCheck=true
org.quartz.managementRESTService.enabled=false
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = myDS
org.quartz.dataSource.myDS.connectionProvider.class:com.alibaba.druid.support.quartz.DruidQuartzConnectionProvider
org.quartz.dataSource.myDS.driverClassName = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.myDS.url = jdbc:mysql://localhost:3306/quartz?characterEncoding=utf-8
org.quartz.dataSource.myDS.username = root
org.quartz.dataSource.myDS.password = root
org.quartz.dataSource.myDS.maxActive = 5
org.quartz.scheduler.instanceId=AUTO
The logback configuration file src/main/resources/logback.xml lowers the Quartz log level to TRACE:
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="30 seconds" debug="true">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n</pattern>
</encoder>
</appender>
<logger name="org.quartz" level="trace"/>
<logger name="org.quartz.impl.jdbcjobstore.SimpleSemaphore" level="warn"/>
<root level="debug">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
The complete test class is QuartzTest4Misfire. Launch it with the JVM argument -Dorg.quartz.properties=<absolute path of quartz.properties>.
package org.quartz.myexample;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.Date;
/**
* -Dorg.quartz.properties=D:\Tools\activtiDemo\src\main\resources\quartz.properties
*/
public class QuartzTest4Misfire {
public static void main(String[] args) {
try {
// Limit Quartz to a single worker thread
System.setProperty("org.quartz.threadPool.threadCount", "1");
// Grab the scheduler instance from the Factory
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.getContext().put("skey", "svalue");
Date startTime = DateBuilder.evenHourDateBefore(new Date());
System.err.println("Start time: " + startTime);
// Start at the previous top of the hour, then repeat every 10 seconds
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("mytrigger", "group1")
.startAt(startTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(10)
.repeatForever())
.build();
// define the job and tie it to our LongTimeJob class
JobDetail job = JobBuilder.newJob(LongTimeJob.class)
.withIdentity("myjob", "group1")
.build();
// remove the job (and its triggers) persisted by a previous run, so scheduleJob does not fail
scheduler.deleteJob(job.getKey());
// Tell quartz to schedule the job using our trigger
scheduler.scheduleJob(job, trigger);
// and start it
scheduler.start();
Thread.sleep(Integer.MAX_VALUE);
scheduler.shutdown();
} catch (SchedulerException | InterruptedException e) {
e.printStackTrace();
}
}
}
First, during Quartz startup the log contains the line Handling 1 trigger(s) that missed their scheduled fire-time. Clearly, one trigger that missed its fire time was handled here.
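Why is the trigger already misfired at startup? The arithmetic can be sketched in plain Java. This is a hypothetical helper, not Quartz code; it assumes that DateBuilder.evenHourDateBefore simply rounds down to the top of the hour and that the default misfireThreshold of one minute is in effect.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper (not part of Quartz): reproduces why the example trigger
// is treated as misfired when the scheduler starts.
class StartupMisfireCheck {

    // Default org.quartz.jobStore.misfireThreshold is one minute.
    static final Duration DEFAULT_THRESHOLD = Duration.ofSeconds(60);

    // Assumed equivalent of DateBuilder.evenHourDateBefore: round down to the hour.
    static LocalDateTime previousTopOfHour(LocalDateTime now) {
        return now.truncatedTo(ChronoUnit.HOURS);
    }

    // The first fire time equals the start time; it counts as misfired once it is
    // earlier than "now - threshold".
    static boolean isMisfiredAtStartup(LocalDateTime now, Duration threshold) {
        LocalDateTime firstFireTime = previousTopOfHour(now);
        return firstFireTime.isBefore(now.minus(threshold));
    }

    public static void main(String[] args) {
        LocalDateTime late = LocalDateTime.of(2024, 1, 1, 10, 25, 30);
        System.out.println(previousTopOfHour(late));                        // 2024-01-01T10:00
        System.out.println(isMisfiredAtStartup(late, DEFAULT_THRESHOLD));   // true

        // Started only 30 seconds after the hour: still inside the threshold.
        LocalDateTime early = LocalDateTime.of(2024, 1, 1, 10, 0, 30);
        System.out.println(isMisfiredAtStartup(early, DEFAULT_THRESHOLD));  // false
    }
}
```

By the same reasoning, if the test happens to be started within the first minute of an hour, the startup log line may not appear.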
After the application has been running for about two minutes, the following appears (log simplified):
MisfireHandler: scanning for misfires...
Handling 1 trigger(s) that missed their scheduled fire-time.
Here a thread named QuartzScheduler_DefaultQuartzScheduler-NON_CLUSTERED_MisfireHandler runs a scanning for misfires pass once every minute, and in this pass it handled one misfired trigger.
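The once-a-minute rhythm comes from the MisfireHandler thread pacing itself by misfireThreshold. The sketch below is a simplified model based on my reading of JobStoreSupport.MisfireHandler#run in Quartz 2.x, not the real class; the 50 ms minimum pause and leaving out the database-retry back-off are assumptions of this model.

```java
// Simplified, hypothetical model of how long the misfire-scanning thread
// sleeps between two scans (database-retry back-off left out).
class MisfireHandlerPacing {

    // Short pause assumed to be used when more misfired triggers are pending.
    static final long MIN_PAUSE_MS = 50L;

    static long sleepMillis(long misfireThresholdMs, long scanDurationMs, boolean hasMoreMisfired) {
        if (hasMoreMisfired) {
            // The last batch hit its size limit: scan again almost immediately.
            return MIN_PAUSE_MS;
        }
        // Otherwise wait out the rest of one threshold period, measured from the scan start.
        long remaining = misfireThresholdMs - scanDurationMs;
        return remaining > 0 ? remaining : MIN_PAUSE_MS;
    }

    public static void main(String[] args) {
        System.out.println(sleepMillis(60_000L, 15L, false)); // 59985, roughly one scan per minute
        System.out.println(sleepMillis(60_000L, 15L, true));  // 50
    }
}
```

Under this model, raising org.quartz.jobStore.misfireThreshold both widens the tolerance for late triggers and slows down the background scan.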
The Misfire mechanism
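The example shows that Misfire compensation happens during Quartz startup and periodically after startup. Besides these two places, compensation is also taken into account when triggers are acquired for scheduling. There are three compensation points in total.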
1. Compensation at startup
Source path: QuartzScheduler#start -> JobStoreSupport#schedulerStarted (non-clustered mode) -> JobStoreSupport#recoverJobs -> recoverMisfiredJobs (with recovering = true)
2. Compensation by a periodic background task
Source path: JobStoreSupport.MisfireHandler -> MisfireHandler#manage -> JobStoreSupport#doRecoverMisfires -> recoverMisfiredJobs (with recovering = false)
This background thread is also created during QuartzScheduler#start.
3. Compensation via the time window for acquiring triggers
Source path: QuartzSchedulerThread#run -> JobStoreSupport#acquireNextTrigger -> getDelegate().selectTriggerToAcquire
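The first two both end up calling org.quartz.impl.jdbcjobstore.JobStoreSupport#recoverMisfiredJobs; they differ only in the value of recovering, as described below. When querying the triggers to acquire, getMisfireTime() is passed as the noEarlierThan argument, so firings in the interval from a short time before now to a short time in the future are all considered valid; accepting the firings from the period just before now is itself a form of compensation. Moreover, for triggers whose policy is MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY the noEarlierThan bound does not apply: as long as nextFireTime does not exceed the noLaterThan upper bound, the trigger is considered ready to fire. For details, see the task scheduling section of the article on Quartz project setup and source analysis of job execution.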
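The acquisition window described above can be modeled as a simple predicate. This is a hypothetical sketch of the rule as this section describes it (noEarlierThan taken from getMisfireTime(), noLaterThan as the upper bound), not the actual SQL behind selectTriggerToAcquire; the timestamps are invented for illustration.

```java
// Hypothetical model of which WAITING triggers the scheduler thread may acquire.
class AcquireWindow {

    static final int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;

    static boolean isCandidate(long nextFireTime, int misfireInstruction,
                               long noEarlierThan, long noLaterThan) {
        if (nextFireTime > noLaterThan) {
            return false; // not due within the look-ahead window yet
        }
        // IGNORE_MISFIRE_POLICY triggers are acquired however late they are;
        // all others must not be older than getMisfireTime().
        return misfireInstruction == MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY
                || nextFireTime >= noEarlierThan;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        long noEarlierThan = now - 60_000L; // getMisfireTime() with the default threshold
        long noLaterThan = now + 30_000L;   // arbitrary look-ahead for this example

        System.out.println(isCandidate(now - 10_000L, 0, noEarlierThan, noLaterThan));   // true: late, but within the threshold
        System.out.println(isCandidate(now - 120_000L, 0, noEarlierThan, noLaterThan));  // false: left to misfire recovery
        System.out.println(isCandidate(now - 120_000L, -1, noEarlierThan, noLaterThan)); // true: IGNORE_MISFIRE_POLICY
        System.out.println(isCandidate(now + 60_000L, -1, noEarlierThan, noLaterThan));  // false: not due yet
    }
}
```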
Now let's look in detail at the compensation logic in recoverMisfiredJobs. Its source code is:
protected RecoverMisfiredJobsResult recoverMisfiredJobs(
Connection conn, boolean recovering)
throws JobPersistenceException, SQLException {
// If recovering, we want to handle all of the misfired
// triggers right away.
int maxMisfiresToHandleAtATime =
(recovering) ? -1 : getMaxMisfiresToHandleAtATime();
List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
long earliestNewTime = Long.MAX_VALUE;
// We must still look for the MISFIRED state in case triggers were left
// in this state when upgrading to this version that does not support it.
boolean hasMoreMisfiredTriggers =
getDelegate().hasMisfiredTriggersInState(
conn, STATE_WAITING, getMisfireTime(),
maxMisfiresToHandleAtATime, misfiredTriggers);
if (hasMoreMisfiredTriggers) {
getLog().info(
"Handling the first " + misfiredTriggers.size() +
" triggers that missed their scheduled fire-time. " +
"More misfired triggers remain to be processed.");
} else if (misfiredTriggers.size() > 0) {
getLog().info(
"Handling " + misfiredTriggers.size() +
" trigger(s) that missed their scheduled fire-time.");
} else {
getLog().debug(
"Found 0 triggers that missed their scheduled fire-time.");
return RecoverMisfiredJobsResult.NO_OP;
}
for (TriggerKey triggerKey: misfiredTriggers) {
OperableTrigger trig =
retrieveTrigger(conn, triggerKey);
if (trig == null) {
continue;
}
doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);
if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
earliestNewTime = trig.getNextFireTime().getTime();
}
return new RecoverMisfiredJobsResult(
hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
}
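The main logic uses DriverDelegate#hasMisfiredTriggersInState to query the triggers that meet the Misfire condition, stores them in the misfiredTriggers list, and then updates each of them via JobStoreSupport#doUpdateOfMisfiredTrigger. If recovering is true, the per-query limit is -1, meaning no limit; otherwise it is getMaxMisfiresToHandleAtATime(), which defaults to 20, mainly for performance reasons. The query conditions are shown below (in the COUNT form of the statement; the query that collects the trigger keys uses the same WHERE clause):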
SELECT COUNT(TRIGGER_NAME)
FROM QRTZ_TRIGGERS
WHERE SCHED_NAME = 'DefaultQuartzScheduler'
AND NOT (MISFIRE_INSTR = -1)
AND NEXT_FIRE_TIME < ?
AND TRIGGER_STATE = 'WAITING'
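The parameter compared with NEXT_FIRE_TIME depends on misfireThreshold: by default it is the current time minus one minute. It can be changed through org.quartz.jobStore.misfireThreshold, but as getMisfireTime() below shows, a threshold that is not greater than 0 is simply ignored and the current time is used.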
private long misfireThreshold = 60000L; // one minute
protected long getMisfireTime() {
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
return (misfireTime > 0) ? misfireTime : 0;
}
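So every QRTZ_TRIGGERS row whose state is WAITING, whose next fire time is more than one minute in the past (earlier than now minus misfireThreshold), and whose misfire instruction is not -1 (MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) is a trigger that needs compensation.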
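The selection rule above, together with the batch limit from recoverMisfiredJobs, can be mirrored in memory. The following is a hypothetical model, not Quartz code: TriggerRow stands in for a QRTZ_TRIGGERS row, misfireTime copies the arithmetic of getMisfireTime() shown above, and findMisfired imitates the query plus the limit (-1 meaning unlimited, as when recovering is true). Ordering by next fire time and then by descending priority is an assumption about the underlying SQL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of selecting misfired triggers (not Quartz code).
class MisfireScan {

    static final int IGNORE_MISFIRE_POLICY = -1;

    // Stand-in for one QRTZ_TRIGGERS row.
    static final class TriggerRow {
        final String name;
        final String state;
        final int misfireInstr;
        final long nextFireTime;
        final int priority;

        TriggerRow(String name, String state, int misfireInstr, long nextFireTime, int priority) {
            this.name = name;
            this.state = state;
            this.misfireInstr = misfireInstr;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
        }
    }

    // Same arithmetic as getMisfireTime(): subtract a positive threshold, never go below 0.
    static long misfireTime(long now, long threshold) {
        long t = now;
        if (threshold > 0) {
            t -= threshold;
        }
        return t > 0 ? t : 0;
    }

    // Adds misfired trigger names to result, oldest first; limit == -1 means no limit.
    // Returns true when more misfired triggers remain beyond the limit.
    static boolean findMisfired(List<TriggerRow> rows, long misfireTime, int limit, List<String> result) {
        List<TriggerRow> matches = new ArrayList<>();
        for (TriggerRow r : rows) {
            if ("WAITING".equals(r.state)
                    && r.misfireInstr != IGNORE_MISFIRE_POLICY
                    && r.nextFireTime < misfireTime) {
                matches.add(r);
            }
        }
        matches.sort(Comparator.comparingLong((TriggerRow r) -> r.nextFireTime)
                .thenComparing((a, b) -> Integer.compare(b.priority, a.priority)));
        for (TriggerRow r : matches) {
            if (result.size() == limit) {
                return true;
            }
            result.add(r.name);
        }
        return false;
    }

    public static void main(String[] args) {
        long now = 10_000_000L;
        long mt = misfireTime(now, 60_000L);
        List<TriggerRow> rows = Arrays.asList(
                new TriggerRow("a", "WAITING", 0, now - 300_000L, 5),
                new TriggerRow("b", "WAITING", 0, now - 30_000L, 5),   // late, but within the threshold
                new TriggerRow("c", "WAITING", -1, now - 300_000L, 5), // IGNORE_MISFIRE_POLICY: skipped
                new TriggerRow("d", "WAITING", 0, now - 120_000L, 5));

        List<String> batch = new ArrayList<>();
        boolean more = findMisfired(rows, mt, 1, batch);
        System.out.println(batch + " more=" + more); // [a] more=true

        List<String> all = new ArrayList<>();
        findMisfired(rows, mt, -1, all);
        System.out.println(all); // [a, d]
    }
}
```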
Once the triggers to compensate have been found, doUpdateOfMisfiredTrigger is called to compensate each of them.
private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
Calendar cal = null;
if (trig.getCalendarName() != null) {
cal = retrieveCalendar(conn, trig.getCalendarName());
}
schedSignaler.notifyTriggerListenersMisfired(trig);
trig.updateAfterMisfire(cal);
if (trig.getNextFireTime() == null) {
storeTrigger(conn, trig,
null, true, STATE_COMPLETE, forceState, recovering);
schedSignaler.notifySchedulerListenersFinalized(trig);
} else {
storeTrigger(conn, trig, null, true, newStateIfNotComplete,
forceState, false);
}
}
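After notifying the trigger listeners (notifyTriggerListenersMisfired), storeTrigger simply updates the trigger's state, next fire time and similar values in the database: if there is no next fire time, the state becomes COMPLETE; otherwise it becomes WAITING. The crucial part is computing the next fire time, which is done by trig.updateAfterMisfire(cal) according to the misfire policy (misfire instruction) configured by the user. What is a misfire policy? The MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY seen earlier is one of them.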
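A misfire policy can be specified when the trigger is created; if none is specified, it defaults to MISFIRE_INSTRUCTION_SMART_POLICY.
A trigger's policy can be read with org.quartz.Trigger#getMisfireInstruction; in the database it corresponds to the MISFIRE_INSTR column of the QRTZ_TRIGGERS table.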
/**
* Get the instruction the <code>Scheduler</code> should be given for
* handling misfire situations for this <code>Trigger</code>- the
* concrete <code>Trigger</code> type that you are using will have
* defined a set of additional <code>MISFIRE_INSTRUCTION_XXX</code>
* constants that may be set as this property's value.
*
* <p>
* If not explicitly set, the default value is <code>MISFIRE_INSTRUCTION_SMART_POLICY</code>.
* </p>
*
* @see #MISFIRE_INSTRUCTION_SMART_POLICY
* @see SimpleTrigger
* @see CronTrigger
*/
public int getMisfireInstruction();
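Looking at the source, the Trigger interface itself defines only two policies, MISFIRE_INSTRUCTION_SMART_POLICY (0) and MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY (-1). The former lets the concrete trigger type choose a behavior automatically; the latter skips misfire handling altogether (this is the -1 seen earlier). With the latter, no matter how far in the past the next fire time is, the trigger is picked up when the triggers to fire are queried, and then executed.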
CronTrigger then adds two more policies, MISFIRE_INSTRUCTION_FIRE_ONCE_NOW and MISFIRE_INSTRUCTION_DO_NOTHING. org.quartz.impl.triggers.CronTriggerImpl#updateAfterMisfire shows how these values affect the next fire time:
@Override
public void updateAfterMisfire(org.quartz.Calendar cal) {
int instr = getMisfireInstruction();
if(instr == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)
return;
if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {
instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;
}
if (instr == MISFIRE_INSTRUCTION_DO_NOTHING) {
Date newFireTime = getFireTimeAfter(new Date());
while (newFireTime != null && cal != null
&& !cal.isTimeIncluded(newFireTime.getTime())) {
newFireTime = getFireTimeAfter(newFireTime);
}
setNextFireTime(newFireTime);
} else if (instr == MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
setNextFireTime(new Date());
}
}
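As you can see, both SMART_POLICY and FIRE_ONCE_NOW compensate by firing immediately, whereas DO_NOTHING waits for the next scheduled fire time after now (in effect, no compensation at all). In SimpleTrigger, however, things get more complicated.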
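Before moving on to SimpleTrigger, the CronTrigger branches above can be modeled in a few lines. The sketch is hypothetical and makes two simplifying assumptions: the cron expression is replaced by a fixed period counted from time 0 (fireTimeAfter is a stand-in for getFireTimeAfter), and Calendar exclusions are ignored. The constant values are those of Trigger and CronTrigger.

```java
// Hypothetical model of CronTriggerImpl#updateAfterMisfire with a fixed-period schedule.
class CronMisfireModel {

    static final int IGNORE_MISFIRE_POLICY = -1; // Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY
    static final int SMART_POLICY = 0;           // Trigger.MISFIRE_INSTRUCTION_SMART_POLICY
    static final int FIRE_ONCE_NOW = 1;          // CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW
    static final int DO_NOTHING = 2;             // CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING

    // Stand-in for getFireTimeAfter: first multiple of periodMs strictly after 'after' (after >= 0).
    static long fireTimeAfter(long after, long periodMs) {
        return (after / periodMs + 1) * periodMs;
    }

    // Returns the next fire time after misfire handling.
    static long nextFireTimeAfterMisfire(int instr, long currentNextFireTime, long now, long periodMs) {
        if (instr == IGNORE_MISFIRE_POLICY) {
            return currentNextFireTime;          // left unchanged
        }
        if (instr == SMART_POLICY) {
            instr = FIRE_ONCE_NOW;               // SMART_POLICY means FIRE_ONCE_NOW for cron
        }
        if (instr == DO_NOTHING) {
            return fireTimeAfter(now, periodMs); // skip the missed firings
        }
        if (instr == FIRE_ONCE_NOW) {
            return now;                          // fire once immediately
        }
        return currentNextFireTime;              // unknown instruction: unchanged
    }

    public static void main(String[] args) {
        long period = 60_000L;   // "every minute"
        long missed = 120_000L;  // the fire time that was missed
        long now = 310_000L;
        System.out.println(nextFireTimeAfterMisfire(SMART_POLICY, missed, now, period));          // 310000
        System.out.println(nextFireTimeAfterMisfire(DO_NOTHING, missed, now, period));            // 360000
        System.out.println(nextFireTimeAfterMisfire(IGNORE_MISFIRE_POLICY, missed, now, period)); // 120000
    }
}
```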
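SimpleTrigger has five policies. Apart from the first one, which simply fires immediately, each of them involves a COUNT. The reason is that a SimpleTrigger has a repeatCount parameter that limits how many times it fires. For a CronTrigger this count is implicitly infinite, so deducting missed firings from it would make no difference. A SimpleTrigger is different: the compensation logic has to decide whether missed firings count against the repeat count. Under the default policy, the effective policy is also chosen automatically based on repeatCount: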
if (instr == Trigger.MISFIRE_INSTRUCTION_SMART_POLICY) {
if (getRepeatCount() == 0) {
instr = MISFIRE_INSTRUCTION_FIRE_NOW;
} else if (getRepeatCount() == REPEAT_INDEFINITELY) {
instr = MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT;
} else {
// if (getRepeatCount() > 0)
instr = MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT;
}
} else if (instr == MISFIRE_INSTRUCTION_FIRE_NOW && getRepeatCount() != 0) {
instr = MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT;
}
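With the default policy (SMART_POLICY), the effective policy depends on repeatCount:
repeatCount | Effective policy |
---|---|
0 | FIRE_NOW |
REPEAT_INDEFINITELY (-1) | RESCHEDULE_NEXT_WITH_REMAINING_COUNT |
Greater than 0 | RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT |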
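If repeatCount is not 0 but the policy is set to FIRE_NOW, it is likewise adjusted to RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT, because FIRE_NOW on its own merely sets the next fire time to now and does not account for the repeat count.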
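The default-policy resolution for SimpleTrigger (the code and table above) can be restated as a small pure function. This is a hypothetical restatement, not the Quartz class; the numeric values are those listed in the summary table at the end of this section, plus 0 for SMART_POLICY and -1 for REPEAT_INDEFINITELY.

```java
// Hypothetical restatement of how SimpleTrigger resolves SMART_POLICY and FIRE_NOW.
class SimpleSmartPolicy {

    static final int SMART_POLICY = 0;
    static final int FIRE_NOW = 1;
    static final int RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT = 2;
    static final int RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT = 3;
    static final int RESCHEDULE_NEXT_WITH_REMAINING_COUNT = 4;
    static final int RESCHEDULE_NEXT_WITH_EXISTING_COUNT = 5;
    static final int REPEAT_INDEFINITELY = -1;

    static int resolve(int instr, int repeatCount) {
        if (instr == SMART_POLICY) {
            if (repeatCount == 0) {
                return FIRE_NOW;                                  // one-shot trigger
            } else if (repeatCount == REPEAT_INDEFINITELY) {
                return RESCHEDULE_NEXT_WITH_REMAINING_COUNT;      // e.g. repeatForever()
            } else {
                return RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT; // finite repeats
            }
        }
        if (instr == FIRE_NOW && repeatCount != 0) {
            return RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT;    // FIRE_NOW alone ignores the count
        }
        return instr;                                             // explicit policy kept as-is
    }

    public static void main(String[] args) {
        // The trigger in the example uses repeatForever() and no explicit policy.
        System.out.println(resolve(SMART_POLICY, REPEAT_INDEFINITELY)); // 4
        System.out.println(resolve(FIRE_NOW, 3));                       // 3
    }
}
```

So the example trigger, built with repeatForever() and no explicit policy, behaves as RESCHEDULE_NEXT_WITH_REMAINING_COUNT. To pin a policy instead of relying on SMART_POLICY, SimpleScheduleBuilder provides methods such as withMisfireHandlingInstructionNextWithRemainingCount() and withMisfireHandlingInstructionFireNow().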
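MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT neither fires to make up for the missed time nor adjusts the fire count: it simply moves the trigger to the next scheduled fire time after now, skipping times excluded by the Calendar.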
else if (instr == MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT) {
Date newFireTime = getFireTimeAfter(new Date());
while (newFireTime != null && cal != null
&& !cal.isTimeIncluded(newFireTime.getTime())) {
newFireTime = getFireTimeAfter(newFireTime);
if(newFireTime == null)
break;
//avoid infinite loop
java.util.Calendar c = java.util.Calendar.getInstance();
c.setTime(newFireTime);
if (c.get(java.util.Calendar.YEAR) > YEAR_TO_GIVEUP_SCHEDULING_AT) {
newFireTime = null;
}
}
setNextFireTime(newFireTime);
}
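The next fire time that RESCHEDULE_NEXT_WITH_EXISTING_COUNT jumps to is interval arithmetic counted from the start time. Below is a simplified, hypothetical model of SimpleTrigger#getFireTimeAfter under stated assumptions: times are epoch milliseconds, there is no Calendar, Long.MAX_VALUE means no end time, and the timesTriggered/complete checks of the real method are left out.

```java
import java.util.OptionalLong;

// Simplified, hypothetical model of SimpleTrigger#getFireTimeAfter (no Calendar).
class SimpleNextFire {

    static final int REPEAT_INDEFINITELY = -1;

    // Next fire time strictly after 'after', or empty if the trigger has no more firings.
    static OptionalLong fireTimeAfter(long start, long interval, int repeatCount, long end, long after) {
        if (end <= after) {
            return OptionalLong.empty();         // already past the end time
        }
        if (after < start) {
            return OptionalLong.of(start);       // first firing has not happened yet
        }
        if (repeatCount == 0) {
            return OptionalLong.empty();         // the only firing was at start
        }
        long n = (after - start) / interval + 1; // index of the next slot after 'after'
        if (repeatCount != REPEAT_INDEFINITELY && n > repeatCount) {
            return OptionalLong.empty();         // all repeats used up
        }
        long t = start + n * interval;
        return t < end ? OptionalLong.of(t) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Start at 0, every 10 s, forever; misfire handled at 185 s.
        System.out.println(fireTimeAfter(0L, 10_000L, REPEAT_INDEFINITELY, Long.MAX_VALUE, 185_000L)); // OptionalLong[190000]
        // With only 5 repeats, slot 19 no longer exists.
        System.out.println(fireTimeAfter(0L, 10_000L, 5, Long.MAX_VALUE, 185_000L));                   // OptionalLong.empty
    }
}
```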
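MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT also moves the trigger to the next scheduled fire time after now, but it computes the number of missed firings and adds it to the timesTriggered field, leaving repeatCount unchanged, so the missed firings count as used.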
else if (instr == MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) {
Date newFireTime = getFireTimeAfter(new Date());
while (newFireTime != null && cal != null
&& !cal.isTimeIncluded(newFireTime.getTime())) {
newFireTime = getFireTimeAfter(newFireTime);
if(newFireTime == null)
break;
//avoid infinite loop
java.util.Calendar c = java.util.Calendar.getInstance();
c.setTime(newFireTime);
if (c.get(java.util.Calendar.YEAR) > YEAR_TO_GIVEUP_SCHEDULING_AT) {
newFireTime = null;
}
}
if (newFireTime != null) {
int timesMissed = computeNumTimesFiredBetween(nextFireTime,
newFireTime);
setTimesTriggered(getTimesTriggered() + timesMissed);
}
setNextFireTime(newFireTime);
}
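To make the timesTriggered adjustment concrete, here is a hypothetical model of the arithmetic, assuming computeNumTimesFiredBetween divides the elapsed time by the repeat interval (and returns 0 for a non-positive interval). The timestamps are invented and only loosely follow the 10-second trigger of the example.

```java
// Hypothetical model of the RESCHEDULE_NEXT_WITH_REMAINING_COUNT bookkeeping.
class RemainingCountModel {

    // Assumed behavior of computeNumTimesFiredBetween: whole intervals between the two times.
    static int timesFiredBetween(long from, long to, long interval) {
        if (interval < 1) {
            return 0;
        }
        return (int) ((to - from) / interval);
    }

    // timesTriggered after moving the trigger from oldNextFireTime to newNextFireTime.
    static int timesTriggeredAfterMisfire(int timesTriggered, long oldNextFireTime,
                                          long newNextFireTime, long interval) {
        return timesTriggered + timesFiredBetween(oldNextFireTime, newNextFireTime, interval);
    }

    public static void main(String[] args) {
        // Fired once at 0 s; the 10 s slot was missed; the next slot after "now" (185 s) is 190 s.
        // The skipped slots 10 s, 20 s, ..., 180 s are 18 firings.
        System.out.println(timesFiredBetween(10_000L, 190_000L, 10_000L));             // 18
        System.out.println(timesTriggeredAfterMisfire(1, 10_000L, 190_000L, 10_000L)); // 19
    }
}
```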
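MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT recomputes repeatCount by subtracting timesTriggered (for a finite, non-zero repeatCount) and resets timesTriggered to 0. It then uses the current time as the new start time and next fire time. It does respect the configured end time: if the end time has already passed, the trigger will not fire again.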
else if (instr == MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT) {
Date newFireTime = new Date();
if (repeatCount != 0 && repeatCount != REPEAT_INDEFINITELY) {
setRepeatCount(getRepeatCount() - getTimesTriggered());
setTimesTriggered(0);
}
if (getEndTime() != null && getEndTime().before(newFireTime)) {
setNextFireTime(null); // We are past the end time
} else {
setStartTime(newFireTime);
setNextFireTime(newFireTime);
}
}
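The bookkeeping of RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT can be sketched as a pure function. This is a hypothetical model, not Quartz code: times are epoch milliseconds, a null endTime means no end time, and a null next fire time in the result stands for the COMPLETE case.

```java
// Hypothetical model of RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT.
class NowWithExistingCount {

    static final int REPEAT_INDEFINITELY = -1;

    static final class Result {
        final int repeatCount;
        final int timesTriggered;
        final Long nextFireTime; // null: past the end time, so the trigger completes

        Result(int repeatCount, int timesTriggered, Long nextFireTime) {
            this.repeatCount = repeatCount;
            this.timesTriggered = timesTriggered;
            this.nextFireTime = nextFireTime;
        }
    }

    static Result apply(int repeatCount, int timesTriggered, Long endTime, long now) {
        if (repeatCount != 0 && repeatCount != REPEAT_INDEFINITELY) {
            // Only firings that actually happened are deducted; missed ones are not.
            repeatCount = repeatCount - timesTriggered;
            timesTriggered = 0;
        }
        // The trigger restarts "now" unless it is already past its end time.
        Long next = (endTime != null && endTime < now) ? null : Long.valueOf(now);
        return new Result(repeatCount, timesTriggered, next);
    }

    public static void main(String[] args) {
        Result r = apply(10, 3, null, 500_000L);
        System.out.println(r.repeatCount + " " + r.timesTriggered + " " + r.nextFireTime); // 7 0 500000
        System.out.println(apply(10, 3, 400_000L, 500_000L).nextFireTime);                // null
    }
}
```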
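MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT recomputes the remaining repeat count, this time also deducting the missed firings (with a floor of 0), stores it in repeatCount and resets timesTriggered to 0, then uses the current time as the next fire time. It also respects the end time: if the end time has already passed, the trigger will not fire again.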
else if (instr == MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT) {
Date newFireTime = new Date();
int timesMissed = computeNumTimesFiredBetween(nextFireTime,
newFireTime);
if (repeatCount != 0 && repeatCount != REPEAT_INDEFINITELY) {
int remainingCount = getRepeatCount()
- (getTimesTriggered() + timesMissed);
if (remainingCount <= 0) {
remainingCount = 0;
}
setRepeatCount(remainingCount);
setTimesTriggered(0);
}
if (getEndTime() != null && getEndTime().before(newFireTime)) {
setNextFireTime(null); // We are past the end time
} else {
setStartTime(newFireTime);
setNextFireTime(newFireTime);
}
}
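Compared with the previous policy, the only difference is that missed firings are deducted as well. Below is a hypothetical model of the remaining-count calculation, again assuming the missed count is the elapsed time divided by the repeat interval; the timestamps are invented. As in the previous policy, timesTriggered is then reset to 0 and the next fire time becomes now unless the end time has passed; that part is omitted here.

```java
// Hypothetical model of the repeatCount update in RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT.
class NowWithRemainingCount {

    static final int REPEAT_INDEFINITELY = -1;

    static int newRepeatCount(int repeatCount, int timesTriggered,
                              long oldNextFireTime, long now, long interval) {
        if (repeatCount == 0 || repeatCount == REPEAT_INDEFINITELY) {
            return repeatCount; // left unchanged
        }
        // Assumed equivalent of computeNumTimesFiredBetween(nextFireTime, now).
        int timesMissed = interval < 1 ? 0 : (int) ((now - oldNextFireTime) / interval);
        int remaining = repeatCount - (timesTriggered + timesMissed);
        return Math.max(remaining, 0); // never negative
    }

    public static void main(String[] args) {
        // Every 10 s, 30 repeats, fired once; the 10 s slot was missed and it is now 185 s.
        System.out.println(newRepeatCount(30, 1, 10_000L, 185_000L, 10_000L)); // 12
        System.out.println(newRepeatCount(10, 1, 10_000L, 185_000L, 10_000L)); // 0
    }
}
```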
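Misfire instruction | Value | Behavior |
---|---|---|
MISFIRE_INSTRUCTION_FIRE_NOW | 1 | Uses the current time as the next fire time |
MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT | 5 | Recomputes the next fire time (next scheduled time after now) |
MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT | 4 | Recomputes the next fire time and adds the missed firings to timesTriggered |
MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT | 2 | Recomputes repeatCount (original value - timesTriggered), resets timesTriggered to 0, uses the current time as the next fire time, and checks whether the end time has passed |
MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT | 3 | Recomputes repeatCount (original value - timesTriggered - missed firings, floored at 0), resets timesTriggered to 0, uses the current time as the next fire time, and checks whether the end time has passed |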
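Summary: to compensate for misfires, Quartz offers a range of selectable misfire policies and applies compensation at startup, when acquiring triggers to fire, and in a dedicated background thread that runs periodically. But whatever the mechanism, compensation is only compensation. Understanding how it works, sizing the worker thread pool sensibly, taking job duration into account when creating jobs, and choosing sensible fire times are what really solve the problem.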