Quartz作为任务调度的组件,其中涉及到多种线程,主要分为主线程、调度线程和工作线程。
主线程:创建Quartz的调度工厂(SchedulerFactory)、调度器(Scheduler)、触发器(Trigger)、任务(Job)并启动调度器的线程。这里的主线程只是为了区分Quartz内部线程,与程序的主线程并不等价,可以是任意的其他非Quartz线程。
调度线程:根据触发条件获取需要执行的任务并分配给工作线程,只有一个
工作线程:真实执行Job的线程,一般有多个
工作线程的初始化
在org.quartz.impl.StdSchedulerFactory#instantiate()方法当中会执行工作线程池的初始化
rsrcs.setThreadPool(tp);
if(tp instanceof SimpleThreadPool) {
if(threadsInheritInitalizersClassLoader)
((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
}
tp.initialize();
tpInited = true;
默认的实现为org.quartz.simpl.SimpleThreadPool
,可以通过org.quartz.threadPool.class
指定其他的实现,但是必须实现org.quartz.spi.ThreadPool
接口。常见的一些配置参考如下
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 3
org.quartz.threadPool.threadPriority: 5
该接口中除了initialize初始化方法外,该接口中涉及到调度最重要的两个方法为runInThread和blockForAvailableThreads方法,前者用于添加要执行的任务,而后者用于阻塞等待直到有可用的线程。
/**
* <p>
* Must be called before the <code>ThreadPool</code> is
* used, in order to give the it a chance to initialize.
* </p>
*
* <p>Typically called by the <code>SchedulerFactory</code>.</p>
*/
void initialize() throws SchedulerConfigException;
/**
* <p>
* Execute the given <code>{@link java.lang.Runnable}</code> in the next
* available <code>Thread</code>.
* </p>
*
* <p>
* The implementation of this interface should not throw exceptions unless
* there is a serious problem (i.e. a serious misconfiguration). If there
* are no immediately available threads <code>false</code> should be returned.
* </p>
*
* @return true, if the runnable was assigned to run on a Thread.
*/
boolean runInThread(Runnable runnable);
/**
* <p>
* Determines the number of threads that are currently available in in
* the pool. Useful for determining the number of times
* <code>runInThread(Runnable)</code> can be called before returning
* false.
* </p>
*
* <p>The implementation of this method should block until there is at
* least one available thread.</p>
*
* @return the number of currently available threads
*/
int blockForAvailableThreads();
SimpleThreadPool初始化时会根据count创建指定数量的工作线程。这里使用了三个列表,workers列表存放全部的工作线程对象,availWorkers用于存放可用的工作线程对象,busyWorkers用于存放正在工作的线程对象。
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
public void initialize() throws SchedulerConfigException {
// ... 去掉一些参数检查逻辑
// create the worker threads and start them
Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = workerThreads.next();
wt.start();
availWorkers.add(wt);
}
}
protected List<WorkerThread> createWorkerThreads(int createCount) {
workers = new LinkedList<WorkerThread>();
for (int i = 1; i<= createCount; ++i) {
String threadPrefix = getThreadNamePrefix();
if (threadPrefix == null) {
threadPrefix = schedulerInstanceName + "_Worker";
}
WorkerThread wt = new WorkerThread(this, threadGroup,
threadPrefix + "-" + i,
getThreadPriority(),
isMakeThreadsDaemons());
if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
wt.setContextClassLoader(Thread.currentThread()
.getContextClassLoader());
}
workers.add(wt);
}
return workers;
}
工作线程WorkerThread的主要逻辑在run方法当中,主要是等待runnable属性有值,然后执行而已。
@Override
public void run() {
boolean ran = false;
while (run.get()) {
try {
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
runnable.run();
}
}
} catch (InterruptedException unblock) {
// ... 省略异常处理
} finally {
synchronized(lock) {
runnable = null;
}
// repair the thread in case the runnable mucked it up...
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
run.set(false);
clearFromBusyWorkersList(this);
} else if(ran) {
ran = false;
makeAvailable(this);
}
}
}
}
}
默认情况下工作线程的runnable没有值,工作线程处于等待状态。当调用org.quartz.simpl.SimpleThreadPool#runInThread方法时,会从availWorkers列表中选择第一个工作线程,然后赋值,并唤醒工作线程,执行任务。
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
调度线程的初始化
调度线程在org.quartz.core.QuartzScheduler的属性schedThread
private QuartzSchedulerThread schedThread;
在QuartzScheduler构造时进行的创建,然后通过调度线程池来启动(调度线程池可以通过参数org.quartz.threadExecutor.class
指定,默认值为org.quartz.impl.DefaultThreadExecutor
)。
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread); // 启动线程池
启动线程池之后,调度线程并不会真实进入任务调度,因为此时调度线程的属性paused
为true。这个值是在构造时指定的。
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
this.qs = qs;
this.qsRsrcs = qsRsrcs;
this.setDaemon(setDaemon);
if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
}
this.setPriority(threadPrio);
// start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = new AtomicBoolean(false);
}
此时paused的值为true,调度线程会一直在循环等待直到有线程调用org.quartz.core.QuartzSchedulerThread#togglePause
方法。
@Override
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
// 其他查询任务并分配给工作线程的过程 省略
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
以上参数的修改是在org.quartz.core.QuartzScheduler#start
方法中实现的。
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
这也是为啥在主线程中一定要调用org.quartz.core.QuartzScheduler#start
方法,任务才会真正执行的原因了。下图中有一个调度线程和三个工作线程。