Quartz线程调度源码分析

news2024/12/24 13:02:59

Quartz作为任务调度的组件,其中涉及到多种线程,主要分为主线程、调度线程和工作线程。
在这里插入图片描述
主线程:创建Quartz的调度工厂(SchedulerFactory)、调度器(Scheduler)、触发器(Trigger)、任务(Job)并启动调度器的线程。这里的主线程只是为了区分Quartz内部线程,与程序的主线程并不等价,可以是任意的其他非Quartz线程。
调度线程:根据触发条件获取需要执行的任务并分配给工作线程,只有一个
工作线程:真实执行Job的线程,一般有多个

工作线程的初始化

在org.quartz.impl.StdSchedulerFactory#instantiate()方法当中会执行工作线程池的初始化

rsrcs.setThreadPool(tp);
if(tp instanceof SimpleThreadPool) {
    if(threadsInheritInitalizersClassLoader)
        ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
}
tp.initialize();
tpInited = true;

默认的实现为org.quartz.simpl.SimpleThreadPool,可以通过org.quartz.threadPool.class指定其他的实现,但是必须实现org.quartz.spi.ThreadPool接口。常见的一些配置参考如下

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 3
org.quartz.threadPool.threadPriority: 5

该接口中除了initialize初始化方法外,该接口中涉及到调度最重要的两个方法为runInThread和blockForAvailableThreads方法,前者用于添加要执行的任务,而后者用于阻塞等待直到有可用的线程。

/**
 * <p>
 * Must be called before the <code>ThreadPool</code> is
 * used, in order to give the it a chance to initialize.
 * </p>
 * 
 * <p>Typically called by the <code>SchedulerFactory</code>.</p>
 */
void initialize() throws SchedulerConfigException;
/**
  * <p>
  * Execute the given <code>{@link java.lang.Runnable}</code> in the next
  * available <code>Thread</code>.
  * </p>
  *
  * <p>
  * The implementation of this interface should not throw exceptions unless
  * there is a serious problem (i.e. a serious misconfiguration). If there
  * are no immediately available threads <code>false</code> should be returned.
  * </p>
  *
  * @return true, if the runnable was assigned to run on a Thread.
  */
 boolean runInThread(Runnable runnable);

 /**
  * <p>
  * Determines the number of threads that are currently available in in
  * the pool.  Useful for determining the number of times
  * <code>runInThread(Runnable)</code> can be called before returning
  * false.
  * </p>
  *
  * <p>The implementation of this method should block until there is at
  * least one available thread.</p>
  *
  * @return the number of currently available threads
  */
 int blockForAvailableThreads();

SimpleThreadPool初始化时会根据count创建指定数量的工作线程。这里使用了三个列表,workers列表存放全部的工作线程对象,availWorkers用于存放可用的工作线程对象,busyWorkers用于存放正在工作的线程对象。

private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

public void initialize() throws SchedulerConfigException {
    // ... 去掉一些参数检查逻辑
    // create the worker threads and start them
    Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
    while(workerThreads.hasNext()) {
        WorkerThread wt = workerThreads.next();
        wt.start();
        availWorkers.add(wt);
    }
}

protected List<WorkerThread> createWorkerThreads(int createCount) {
    workers = new LinkedList<WorkerThread>();
    for (int i = 1; i<= createCount; ++i) {
        String threadPrefix = getThreadNamePrefix();
        if (threadPrefix == null) {
            threadPrefix = schedulerInstanceName + "_Worker";
        }
        WorkerThread wt = new WorkerThread(this, threadGroup,
            threadPrefix + "-" + i,
            getThreadPriority(),
            isMakeThreadsDaemons());
        if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
            wt.setContextClassLoader(Thread.currentThread()
                    .getContextClassLoader());
        }
        workers.add(wt);
    }

    return workers;
}

工作线程WorkerThread的主要逻辑在run方法当中,主要是等待runnable属性有值,然后执行而已。

 @Override
 public void run() {
     boolean ran = false;
     
     while (run.get()) {
         try {
             synchronized(lock) {
                 while (runnable == null && run.get()) {
                     lock.wait(500);
                 }

                 if (runnable != null) {
                     ran = true;
                     runnable.run();
                 }
             }
         } catch (InterruptedException unblock) {
         	// ... 省略异常处理
         } finally {
             synchronized(lock) {
                 runnable = null;
             }
             // repair the thread in case the runnable mucked it up...
             if(getPriority() != tp.getThreadPriority()) {
                 setPriority(tp.getThreadPriority());
             }

             if (runOnce) {
                    run.set(false);
                 clearFromBusyWorkersList(this);
             } else if(ran) {
                 ran = false;
                 makeAvailable(this);
             }
         }
     }
 }
}

默认情况下工作线程的runnable没有值,工作线程处于等待状态。当调用org.quartz.simpl.SimpleThreadPool#runInThread方法时,会从availWorkers列表中选择第一个工作线程,然后赋值,并唤醒工作线程,执行任务。

public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    }

    synchronized (nextRunnableLock) {

        handoffPending = true;

        // Wait until a worker thread is available
        while ((availWorkers.size() < 1) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }

        if (!isShutdown) {
            WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
            busyWorkers.add(wt);
            wt.run(runnable);
        } else {
            // If the thread pool is going down, execute the Runnable
            // within a new additional worker thread (no thread from the pool).
            WorkerThread wt = new WorkerThread(this, threadGroup,
                    "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
            busyWorkers.add(wt);
            workers.add(wt);
            wt.start();
        }
        nextRunnableLock.notifyAll();
        handoffPending = false;
    }

    return true;
}

调度线程的初始化

调度线程在org.quartz.core.QuartzScheduler的属性schedThread

private QuartzSchedulerThread schedThread;

在QuartzScheduler构造时进行的创建,然后通过调度线程池来启动(调度线程池可以通过参数org.quartz.threadExecutor.class指定,默认值为org.quartz.impl.DefaultThreadExecutor)。

this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);  // 启动线程池

启动线程池之后,调度线程并不会真实进入任务调度,因为此时调度线程的属性paused为true。这个值是在构造时指定的。

QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
    super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
    this.qs = qs;
    this.qsRsrcs = qsRsrcs;
    this.setDaemon(setDaemon);
    if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
        log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
        this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
    }

    this.setPriority(threadPrio);

    // start the underlying thread, but put this object into the 'paused'
    // state
    // so processing doesn't start yet...
    paused = true;
    halted = new AtomicBoolean(false);
}

此时paused的值为true,调度线程会一直在循环等待直到有线程调用org.quartz.core.QuartzSchedulerThread#togglePause方法。

   @Override
    public void run() {
        boolean lastAcquireFailed = false;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
				
				// 其他查询任务并分配给工作线程的过程 省略
				
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

以上参数的修改是在org.quartz.core.QuartzScheduler#start方法中实现的。

public void start() throws SchedulerException {

    if (shuttingDown|| closed) {
        throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
    }

    // QTZ-212 : calling new schedulerStarting() method on the listeners
    // right after entering start()
    notifySchedulerListenersStarting();

    if (initialStart == null) {
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();            
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }

    schedThread.togglePause(false);

    getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");
    
    notifySchedulerListenersStarted();
}

这也是为啥在主线程中一定要调用org.quartz.core.QuartzScheduler#start方法,任务才会真正执行的原因了。下图中有一个调度线程和三个工作线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/788103.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pytest结合数据驱动-yaml

Pytest 结合数据驱动 YAML 数据驱动 什么是数据驱动&#xff1f; 数据驱动就是数据的改变从而驱动自动化测试的执行&#xff0c;最终引起测试结果的改变。简单来说&#xff0c;就是参数化的应用。数据量小的测试用例可以使用代码的参数化来实现数据驱动&#xff0c;数据量大的…

Ceph社区上游正式合入openEuler原生支持,并通过CI持续验证

作为覆盖全场景应用、支持多样性算力的面向数字基础设施的开源操作系统&#xff0c;openEuler始终遵循“上游优先”的策略&#xff0c;帮助上游开源软件原生支持openEuler&#xff0c;让openEuler系操作系统的用户可以在开发、集成、使用这些开源软件或基于这些开源软件的产品和…

比较版本号(力扣)思维 JAVA

给你两个版本号 version1 和 version2 &#xff0c;请你比较它们。 版本号由一个或多个修订号组成&#xff0c;各修订号由一个 ‘.’ 连接。每个修订号由 多位数字 组成&#xff0c;可能包含 前导零。每个版本号至少包含一个字符。修订号从左到右编号&#xff0c;下标从 0 开始…

vue3+ts+elementui-plus二次封装树形表格

复制粘贴即可&#xff1a; 一、定义table组件 <template><div classmain><div><el-table ref"multipleTableRef" :height"height" :default-expand-all"isExpend" :data"treeTableData"style"width: 100%…

IL汇编字符串连接

在此实现了一个基本的IL汇编程序&#xff1b; 了解MSIL汇编和IL汇编评估堆栈_bcbobo21cn的博客-CSDN博客 它用了下面两句来在屏幕输出字符串&#xff0c; ldstr "I am from the IL Assembly Language..." call void [mscorlib]System.Console::WriteLine (string) …

Ros终端出现找不到bash: /home/***/devel/setup.bash: 没有那个文件或目录

现象&#xff1a;Ros终端出现找不到bash: /home/***/devel/setup.bash: 没有那个文件或目录 问题&#xff1a;配置时路径写错 解决方法&#xff1a;改正路径 1.打开文件 gedit ~/.bashrc2.修改正确路径

Day 42算法记录| 动态规划 08

这里写目录标题 139. 单词拆分多重背包问题背包问题总结 139. 单词拆分 单词就是物品&#xff0c;字符串s就是背包 1.dp[0]背包啥也不要用装&#xff0c;true。 2. for循环&#xff0c;顺序很重要&#xff0c;所以先背包再物品 如果求组合数就是外层for循环遍历物品&#xff0…

UG\NX二次开发 遍历部件中所有表达式

文章作者:里海 来源网站:https://blog.csdn.net/WangPaiFeiXingYuan 简介: 遍历部件中所有表达式 效果: 代码: #include "me.hpp" #include <uf_defs.h> #include <NXOpen/NXException.hxx> #include <NXOpen/Session.hxx> #include <NXO…

java根据模板导出word

java根据模板导出word 日常开发中&#xff0c;常常会遇到各种各样的表格进行导出&#xff0c;比较好的办法就是提前弄好word模版&#xff0c;再通过遍历的方式进行导出文档 1、制作word模版 模版编写 内容替换 目标下面模版进行多页展示 将word转换成xml 将xml格式化 再将x…

FTP文件传输工具:简单、高效、实用的数据传输方式

在当今的信息化社会&#xff0c;数据已经成为各行各业的重要资产&#xff0c;而数据的传输和交换则是数据价值的体现。在很多场景中&#xff0c;企业需要传输或接收大文件&#xff0c;比如设计图纸、视频素材、软件开发包、数据库备份等。这些文件的大小通常在几百兆字节到几十…

版本控制泄露代码

在文章的开头我们首先得了解什么是版本控制 在实际的开发过程中&#xff0c;我们会用到两个工具来进行版本控制和代码托管&#xff0c;它们分别是git和svn。 而在CTF中我们主要面对的就是git或svn的版本控制泄露。 &#xff08;1&#xff09;在使用 SVN 管理本地代码过程中&am…

在拦截器中使用redis报错空指针

问题 当在拦截器中使用 redis 时&#xff0c;获取不到 RedisTemplate 对象 原因 拦截器在SpringContext初始化之前就执行了&#xff0c;即Bean初始化之前它就执行了&#xff0c;所以肯定是无法获取SpringIOC容器中的内容的 解决 提前实例化拦截器 在配置类里面先实例化拦截…

vue2项目迁移到vue3中的改动——vuex部分——基础积累2

pinia中文文档里面有关于vuex的部分&#xff1a; Pinia 最初是在 2019 年 11 月左右重新设计使用 Composition API 。从那时起&#xff0c;最初的原则仍然相同&#xff0c;但 Pinia 对 Vue 2 和 Vue 3 都有效&#xff0c;并且不需要您使用组合 API。 除了安装和 SSR 之外&…

【使用时空RBF-NN进行非线性系统识别】实现了 RBF、分数 RBF 和时空 RBF 神经网络,用于非线性系统识别研究(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 2.1 算例1 2.2 算例2 &#x1f389;3 参考文献 &#x1f308;4 Matlab代码实现 &#x1f4a5;1 概述 本文用于非线性系统识别任务的径向基函数神经网络&#xff08;RBF-NN&#xff09;的三种变体。特别是&#xff0c;我实现…

Cesium态势标绘专题-正多边形(标绘+编辑)

标绘专题介绍:态势标绘专题介绍_总要学点什么的博客-CSDN博客 入口文件:Cesium态势标绘专题-入口_总要学点什么的博客-CSDN博客 辅助文件:Cesium态势标绘专题-辅助文件_总要学点什么的博客-CSDN博客 本专题没有废话,只有代码,代码中涉及到的引入文件方法,从上面三个链…

Android OpenGL 教程——窗口初始化绘制矩形

上节介绍了 Native 工程初始化&#xff0c;但是我们的窗口什么都没有&#xff0c;这节我们将 NativeActivity 创建的 window 绑定到 OpenGL 创建的 display 的 surface 上&#xff0c;并且绘制背景颜色和三角形&#xff0c;通过三角形绘制出一个矩形 显示系统创建 void Rende…

app专项性能测试测试指标和测试方法

工作中没有做过app这块的性能测试&#xff0c;但是你有面试问到过&#xff0c;现在做个总结如下 1、首次加载&#xff08;冷启动&#xff09; A.使用logcat监控activitymanager B.adb shell am start -w C.人工秒表计数 2、非第一启动&#xff08;热启动&#xff09; 应…

【开发心得】百度终于向百度发起挑战

没错&#xff0c;题目的确读起来别扭&#xff0c;但事实如此。 第一个百度&#xff0c;说的是百度的“文心千帆”。&#xff08;以下简称“千帆”&#xff09; 第二个百度&#xff0c;说的是百度的“搜索引擎”。&#xff08;以下简称“搜索”&#xff09; 这么讲就容易理解…

深入了解数据库的索引分类以及回表查询原理

索引的分类 在InnoDB存储引擎中的又可以分为以下两种 聚集索引的选取规则 如果有主键&#xff0c;主键索引就是聚集索引。 如果不存在主键&#xff0c;将会使用第一个唯一&#xff08;UNIQUE&#xff09;索引作为聚集索引 如果表没有主键&#xff0c;或者没有合适的唯一索引…

机场室内定位导航系统解决方案:提升旅客体验和机场运营效率

旅游业已开始回暖&#xff0c;作为旅客流动的核心节点&#xff0c;正逢暑假亲子出行&#xff0c;日益增长的客流量会对机场的运营和管理带来相当的冲击和挑战。 为提升旅客的出行体验和机场的优化运营管理效率&#xff0c;华安联大推荐采用机场室内定位导航系统解决方案。 基于…