Quartz 调度原理与源码分析

news2024/11/18 1:37:54

文章目录

  • 一、Quartz基础
    • 1、入门案例
  • 二、获取调度器实例源码分析
    • 1、读取配置文件:initialize()
    • 2、初始化工作:instantiate()
      • (1)创建线程池(包工头)SimpleThreadPool
      • (2)WorkerThread(工人)
      • (3)创建调度线程QuartzScheduler(项目经理)
      • (4)拓展:JobRunShell 的作用
    • 3、线程模型总结
  • 三、绑定 JobDetail 和 Trigger
  • 四、启动调度器
    • 1、源码总结
  • 五、集群原理
    • 1、基于数据库解决资源竞争问题

一、Quartz基础

Quartz使用文档,使用Quartz实现动态任务,Spring集成Quartz,Quartz集群部署,Quartz源码分析

1、入门案例

Quartz重要步骤主要有三步:1、从SchedulerFactory获取Scheduler;2、Scheduler绑定JobDetail和Trigger;3、Scheduler开始执行任务。

我们从这三步,逐一分析Quartz是如何进行任务调度的。

// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class)
	.withIdentity("job1", "group1") // 任务名 + 任务组 同一个组中包含许多任务
	.usingJobData("cxf","加油") // 添加额外自定义参数
	.usingJobData("moon",5.21F)
	.build();

// Trigger
Trigger trigger = TriggerBuilder.newTrigger()
	.withIdentity("trigger1", "group1") // 定义trigger名 + 组名
	.startNow()
	.withSchedule(SimpleScheduleBuilder.simpleSchedule() // 简单触发器
			.withIntervalInSeconds(2) // 2秒一次
			.repeatForever()) // 持续不断执行
	.build();

// SchedulerFactory
SchedulerFactory  factory = new StdSchedulerFactory();

// Scheduler 
Scheduler scheduler = factory.getScheduler();

// 绑定关系是1:N
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();

二、获取调度器实例源码分析

// org.quartz.impl.StdSchedulerFactory#getScheduler()
public Scheduler getScheduler() throws SchedulerException {
    if (cfg == null) {
    	// 读取 quartz.properties 配置文件 (见 1、)
        initialize();
    }

	// 这个类是一个 HashMap,用来基于调度器的名称保证调度器的唯一性
    SchedulerRepository schedRep = SchedulerRepository.getInstance();

    Scheduler sched = schedRep.lookup(getSchedulerName());
	// 如果调度器已经存在了
    if (sched != null) {
    	// 调度器关闭了,移除
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
        	// 返回调度器
            return sched;
        }
    }
	// 调度器不存在,初始化 (见 2、)
    sched = instantiate();

    return sched;
}

1、读取配置文件:initialize()

// org.quartz.impl.StdSchedulerFactory#initialize()
public void initialize() throws SchedulerException {
    // short-circuit if already initialized
    if (cfg != null) {
        return;
    }
    if (initException != null) {
        throw initException;
    }
	// 加载配置文件,默认配置<自定义配置
    String requestedFile = System.getProperty(PROPERTIES_FILE);
    String propFileName = requestedFile != null ? requestedFile
            : "quartz.properties";
    File propFile = new File(propFileName);

    Properties props = new Properties();

    InputStream in = null;

    try {
        if (propFile.exists()) {
            try {
                if (requestedFile != null) {
                    propSrc = "specified file: '" + requestedFile + "'";
                } else {
                    propSrc = "default file in current working dir: 'quartz.properties'";
                }

                in = new BufferedInputStream(new FileInputStream(propFileName));
                props.load(in);

            } catch (IOException ioe) {
                initException = new SchedulerException("Properties file: '"
                        + propFileName + "' could not be read.", ioe);
                throw initException;
            }
        } else if (requestedFile != null) {
            in =
                Thread.currentThread().getContextClassLoader().getResourceAsStream(requestedFile);

            if(in == null) {
                initException = new SchedulerException("Properties file: '"
                    + requestedFile + "' could not be found.");
                throw initException;
            }

            propSrc = "specified file: '" + requestedFile + "' in the class resource path.";

            in = new BufferedInputStream(in);
            try {
                props.load(in);
            } catch (IOException ioe) {
                initException = new SchedulerException("Properties file: '"
                        + requestedFile + "' could not be read.", ioe);
                throw initException;
            }

        } else {
            propSrc = "default resource file in Quartz package: 'quartz.properties'";

            ClassLoader cl = getClass().getClassLoader();
            if(cl == null)
                cl = findClassloader();
            if(cl == null)
                throw new SchedulerConfigException("Unable to find a class loader on the current thread or class.");

            in = cl.getResourceAsStream(
                    "quartz.properties");

            if (in == null) {
                in = cl.getResourceAsStream(
                        "/quartz.properties");
            }
            if (in == null) {
                in = cl.getResourceAsStream(
                        "org/quartz/quartz.properties");
            }
            if (in == null) {
                initException = new SchedulerException(
                        "Default quartz.properties not found in class path");
                throw initException;
            }
            try {
                props.load(in);
            } catch (IOException ioe) {
                initException = new SchedulerException(
                        "Resource properties file: 'org/quartz/quartz.properties' "
                                + "could not be read from the classpath.", ioe);
                throw initException;
            }
        }
    } finally {
        if(in != null) {
            try { in.close(); } catch(IOException ignore) { /* ignore */ }
        }
    }

    initialize(overrideWithSysProps(props));
}

2、初始化工作:instantiate()

instantiate()方法中做了初始化的所有工作:
instantiate()方法非常的长,初始化所有的关键组件。

// org.quartz.impl.StdSchedulerFactory#instantiate()
private Scheduler instantiate() throws SchedulerException {
	// 存储任务信息的 JobStore
	JobStore js = null;
	// 创建线程池,默认是 SimpleThreadPool
	ThreadPool tp = null;
	// 创建调度器
	QuartzScheduler qs = null;
	// 连接数据库的连接管理器
	DBConnectionManager dbMgr = null;
	// 自动生成 ID
	String instanceIdGeneratorClass = null;
	Properties tProps = null;
	String userTXLocation = null;
	boolean wrapJobInTx = false;
	boolean autoId = false;
	long idleWaitTime = -1;
	long dbFailureRetry = 15000L; // 15 secs
	String classLoadHelperClass;
	// jobFactory
	String jobFactoryClass;
	// 创建线程执行器,默认为 DefaultThreadExecutor
	ThreadExecutor threadExecutor;
	// ...

(1)创建线程池(包工头)SimpleThreadPool

instantiate() 创建了ThreadPool tp:

// 默认是配置文件中指定的 SimpleThreadPool。
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

if (tpClass == null) {
    initException = new SchedulerException(
            "ThreadPool class not specified. ");
    throw initException;
}

try {
    tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
    initException = new SchedulerException("ThreadPool class '"
            + tpClass + "' could not be instantiated.", e);
    throw initException;
}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {
    setBeanProps(tp, tProps);
} catch (Exception e) {
    initException = new SchedulerException("ThreadPool class '"
            + tpClass + "' props could not be configured.", e);
    throw initException;
}

SimpleThreadPool 里面维护了三个 list,分别存放所有的工作线程、空闲的工作线程和忙碌的工作线程。我们可以把 SimpleThreadPool 理解为包工头。

private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

SimpleThreadPool的runInThread()方法是线程池运行线程的接口方法。参数 Runnable 是执行的任务内容。取出 WorkerThread 去执行参数里面的 runnable(JobRunShell)。

WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);

(2)WorkerThread(工人)

SimpleThreadPool中的List,存储的是WorkerThread,WorkerThread 是 SimpleThreadPool 的 内 部 类 , 用 来 执 行 任 务 。

我们把WorkerThread理解为工人。在WorkerThread的 run 方法中,执行传入的参数runnable任务:

// org.quartz.simpl.SimpleThreadPool.WorkerThread#run()
@Override
public void run() {
    boolean ran = false;
    
    while (run.get()) {
        try {
            synchronized(lock) {
                while (runnable == null && run.get()) {
                    lock.wait(500);
                }

                if (runnable != null) {
                    ran = true;
                    runnable.run();
                }
            }
        } catch (InterruptedException unblock) {
            // do nothing (loop will terminate if shutdown() was called
            try {
                getLog().error("Worker thread was interrupt()'ed.", unblock);
            } catch(Exception e) {
                // ignore to help with a tomcat glitch
            }
        } catch (Throwable exceptionInRunnable) {
            try {
                getLog().error("Error while executing the Runnable: ",
                    exceptionInRunnable);
            } catch(Exception e) {
                // ignore to help with a tomcat glitch
            }
        } finally {
            synchronized(lock) {
                runnable = null;
            }
            // repair the thread in case the runnable mucked it up...
            if(getPriority() != tp.getThreadPriority()) {
                setPriority(tp.getThreadPriority());
            }

            if (runOnce) {
                   run.set(false);
                clearFromBusyWorkersList(this);
            } else if(ran) {
                ran = false;
                makeAvailable(this);
            }

        }
    }

    //if (log.isDebugEnabled())
    try {
        getLog().debug("WorkerThread is shut down.");
    } catch(Exception e) {
        // ignore to help with a tomcat glitch
    }
}

(3)创建调度线程QuartzScheduler(项目经理)

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

在 QuartzScheduler 的构造函数中,创建了 QuartzSchedulerThread,我们把它理解为项目经理,它会调用包工头的工人资源,给他们安排任务。
并 且 创 建 了 线 程 执 行 器 schedThreadExecutor , 执 行 了 这 个QuartzSchedulerThread,也就是调用了它的 run 方法。

// org.quartz.core.QuartzScheduler#QuartzScheduler
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
    throws SchedulerException {
    this.resources = resources;
    if (resources.getJobStore() instanceof JobListener) {
        addInternalJobListener((JobListener)resources.getJobStore());
    }
	// 创建一个线程,resouces 里面有线程名称
    this.schedThread = new QuartzSchedulerThread(this, resources);
    // 线程执行器
    ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
    //执行这个线程,也就是调用了线程的 run 方法
    schedThreadExecutor.execute(this.schedThread);
    if (idleWaitTime > 0) {
        this.schedThread.setIdleWaitTime(idleWaitTime);
    }

    jobMgr = new ExecutingJobsManager();
    addInternalJobListener(jobMgr);
    errLogger = new ErrorLogger();
    addInternalSchedulerListener(errLogger);

    signaler = new SchedulerSignalerImpl(this, this.schedThread);
    
    getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}

点开 QuartzSchedulerThread 类,找到 run 方法,这个是 Quartz 任务调度的核心方法

// org.quartz.core.QuartzSchedulerThread#run
@Override
public void run() {
    int acquiresFailed = 0;
	// 检查 scheuler 是否为停止状态
    while (!halted.get()) {
        try {
            // check if we're supposed to pause...
            synchronized (sigLock) {
            	// 检查是否为暂停状态,初始是暂停状态,启用调度器时,会开始往下执行
                while (paused && !halted.get()) {
                    try {
                        // wait until togglePause(false) is called...
                        // 暂停的话会尝试去获得信号锁,并 wait 一会
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }

                    // reset failure counter when paused, so that we don't
                    // wait again after unpausing
                    acquiresFailed = 0;
                }

                if (halted.get()) {
                    break;
                }
            }

            // wait a bit, if reading from job store is consistently
            // failing (e.g. DB is down or restarting)..
            // 从 JobStore 获取 Job 持续失败,sleep 一下
            if (acquiresFailed > 1) {
                try {
                    long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                    Thread.sleep(delay);
                } catch (Exception ignore) {
                }
            }
			// 从线程池获取可用的线程
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                List<OperableTrigger> triggers;

                long now = System.currentTimeMillis();

                clearSignaledSchedulingChange();
                try {
                	// 获取需要下次执行的 triggers
					// idleWaitTime: 默认 30s
					// availThreadCount:获取可用(空闲)的工作线程数量,总会大于 1,因为该方法会一直阻塞,直到有工作线程空闲下来。
					// maxBatchSize:一次拉取 trigger 的最大数量,默认是 1
					// batchTimeWindow:时间窗口调节参数,默认是 0
					// misfireThreshold: 超过这个时间还未触发的 trigger,被认为发生了 misfire,默认 60s
					// 调度线程一次会拉取 NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)个 triggers,默认情况下,会拉取未来 30s、过去 60s 之间还未 fire 的 1 个 trigger
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                    acquiresFailed = 0;
                    if (log.isDebugEnabled())
                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                } catch (JobPersistenceException jpe) {
                    if (acquiresFailed == 0) {
                        qs.notifySchedulerListenersError(
                            "An error occurred while scanning for the next triggers to fire.",
                            jpe);
                    }
                    if (acquiresFailed < Integer.MAX_VALUE)
                        acquiresFailed++;
                    continue;
                } catch (RuntimeException e) {
                    if (acquiresFailed == 0) {
                        getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                +e.getMessage(), e);
                    }
                    if (acquiresFailed < Integer.MAX_VALUE)
                        acquiresFailed++;
                    continue;
                }

                if (triggers != null && !triggers.isEmpty()) {

                    now = System.currentTimeMillis();
                    long triggerTime = triggers.get(0).getNextFireTime().getTime();
                    long timeUntilTrigger = triggerTime - now;
                    while(timeUntilTrigger > 2) {
                        synchronized (sigLock) {
                            if (halted.get()) {
                                break;
                            }
                            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                try {
                                    // we could have blocked a long while
                                    // on 'synchronize', so we must recompute
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                    if(timeUntilTrigger >= 1)
                                        sigLock.wait(timeUntilTrigger);
                                } catch (InterruptedException ignore) {
                                }
                            }
                        }
                        if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                            break;
                        }
                        now = System.currentTimeMillis();
                        timeUntilTrigger = triggerTime - now;
                    }

                    // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                    if(triggers.isEmpty())
                        continue;

                    // set triggers to 'executing'
                    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                    boolean goAhead = true;
                    synchronized(sigLock) {
                        goAhead = !halted.get();
                    }
                    if(goAhead) {
                        try {
                        	// 触发 Trigger,把 ACQUIRED 状态改成 EXECUTING
							// 如果这个 trigger 的 NEXTFIRETIME 为空,也就是未来不再触发,就将其状态改为COMPLETE
							// 如果 trigger 不允许并发执行(即 Job 的实现类标注了@DisallowConcurrentExecution),则将状态变为 BLOCKED,否则就将状态改为 WAITING
                            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                            if(res != null)
                                bndles = res;
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError(
                                    "An error occurred while firing triggers '"
                                            + triggers + "'", se);
                            //QTZ-179 : a problem occurred interacting with the triggers from the db
                            //we release them and loop again
                            for (int i = 0; i < triggers.size(); i++) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            }
                            continue;
                        }

                    }
					// 循环处理 Trigger
                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result =  bndles.get(i);
                        TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                        Exception exception = result.getException();

                        if (exception instanceof RuntimeException) {
                            getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }

                        // it's possible to get 'null' if the triggers was paused,
                        // blocked, or other similar occurrences that prevent it being
                        // fired at this time...  or if the scheduler was shutdown (halted)
                        if (bndle == null) {
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }

                        JobRunShell shell = null;
                        try {
                        	// 根据 trigger 信息实例化 JobRunShell(implements Runnable),同时依据JOB_CLASS_NAME 实例化 Job,随后我们将 JobRunShell 实例丢入工作线。
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }
						// 执行 JobRunShell 的 run 方法
                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            // this case should never happen, as it is indicative of the
                            // scheduler being shutdown or a bug in the thread pool or
                            // a thread pool being used concurrently - which the docs
                            // say not to do...
                            getLog().error("ThreadPool.runInThread() return false!");
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                        }

                    }

                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows contract
                continue; // while (!halted)
            }

            long now = System.currentTimeMillis();
            long waitTime = now + getRandomizedIdleWaitTime();
            long timeUntilContinue = waitTime - now;
            synchronized(sigLock) {
                try {
                  if(!halted.get()) {
                    // QTZ-336 A job might have been completed in the mean time and we might have
                    // missed the scheduled changed signal by not waiting for the notify() yet
                    // Check that before waiting for too long in case this very job needs to be
                    // scheduled very soon
                    if (!isScheduleChanged()) {
                      sigLock.wait(timeUntilContinue);
                    }
                  }
                } catch (InterruptedException ignore) {
                }
            }

        } catch(RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    } // while (!halted)

    // drop references to scheduler stuff to aid garbage collection...
    qs = null;
    qsRsrcs = null;
}

(4)拓展:JobRunShell 的作用

JobRunShell instances are responsible for providing the ‘safe’ environment for Job s to run in, and for performing all of the work of executing the Job, catching ANY thrown exceptions, updating the Trigger with the Job’s completion code, etc.

A JobRunShell instance is created by a JobRunShellFactory on behalf of the QuartzSchedulerThread which then runs the shell in a thread from the configured ThreadPool when the scheduler determines that a Job has been triggered.

JobRunShell 用来为 Job 提供安全的运行环境的,执行 Job 中所有的作业,捕获运行中的异常,在任务执行完毕的时候更新 Trigger 状态,等等。

JobRunShell 实例是用 JobRunShellFactory 为 QuartzSchedulerThread 创建的,在调度器决定一个 Job 被触发的时候,它从线程池中取出一个线程来执行任务。

3、线程模型总结

SimpleThreadPool:包工头,管理所有 WorkerThread
WorkerThread:工人,把 Job 包装成 JobRunShell,执行
QuartSchedulerThread:项目经理,获取即将触发的 Trigger,从包工头出拿到worker,执行 Trigger 绑定的任务

三、绑定 JobDetail 和 Trigger

// org.quartz.impl.StdScheduler#scheduleJob(org.quartz.JobDetail, org.quartz.Trigger)
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
    throws SchedulerException {
    // 调用QuartzScheduler的scheduleJob方法
    return sched.scheduleJob(jobDetail, trigger);
}

// org.quartz.core.QuartzScheduler#scheduleJob(org.quartz.JobDetail, org.quartz.Trigger)
public Date scheduleJob(JobDetail jobDetail,
        Trigger trigger) throws SchedulerException {
    validateState();

    if (jobDetail == null) {
        throw new SchedulerException("JobDetail cannot be null");
    }
    
    if (trigger == null) {
        throw new SchedulerException("Trigger cannot be null");
    }
    
    if (jobDetail.getKey() == null) {
        throw new SchedulerException("Job's key cannot be null");
    }

    if (jobDetail.getJobClass() == null) {
        throw new SchedulerException("Job's class cannot be null");
    }
    
    OperableTrigger trig = (OperableTrigger)trigger;

    if (trigger.getJobKey() == null) {
        trig.setJobKey(jobDetail.getKey());
    } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
        throw new SchedulerException(
            "Trigger does not reference given job!");
    }

    trig.validate();

    Calendar cal = null;
    if (trigger.getCalendarName() != null) {
        cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
    }
    Date ft = trig.computeFirstFireTime(cal);

    if (ft == null) {
        throw new SchedulerException(
                "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
    }
	// 存储 JobDetail 和 Trigger
    resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
    // 通知相关的 Listener
    notifySchedulerListenersJobAdded(jobDetail);
    notifySchedulerThread(trigger.getNextFireTime().getTime());
    notifySchedulerListenersSchduled(trigger);

    return ft;
}

四、启动调度器

// org.quartz.impl.StdScheduler#start
public void start() throws SchedulerException {
	// QuartzScheduler的start方法
    sched.start();
}
// org.quartz.core.QuartzScheduler#start
public void start() throws SchedulerException {

    if (shuttingDown|| closed) {
        throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
    }

    // QTZ-212 : calling new schedulerStarting() method on the listeners
    // right after entering start()
    // 通知监听器
    notifySchedulerListenersStarting();

    if (initialStart == null) {
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();            
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }
	// 通知 QuartzSchedulerThread 不再等待,开始干活
    schedThread.togglePause(false);

    getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");
    // 通知监听器
    notifySchedulerListenersStarted();
}

1、源码总结

getScheduler 方法创建线程池 ThreadPool,创建调度器 QuartzScheduler,创建调度线程 QuartzSchedulerThread,调度线程初始处于暂停状态。

scheduleJob 将任务添加到 JobStore 中。

scheduler.start()方法激活调度器,QuartzSchedulerThread 从 timeTrriger 取出待触 发 的 任 务 , 并 包 装 成 TriggerFiredBundle , 然 后 由 JobRunShellFactory 创 建TriggerFiredBundle 的 执 行 线 程 JobRunShell , 调 度 执 行 通 过 线 程 池SimpleThreadPool 去执行 JobRunShell,而 JobRunShell 执行的就是任务类的 execute方法:job.execute(JobExecutionContext context)。

五、集群原理

基于数据库,如何实现任务的不重跑不漏跑?
问题 1:如果任务执行中的资源是“下一个即将触发的任务”,怎么基于数据库实现这个资源的竞争?
问题 2:怎么对数据的行加锁?

1、基于数据库解决资源竞争问题

上面我们分析到,QuartzSchedulerThread 的run方法是Quartz的核心,我们继续进行深入探索:

// 287行 获取下一个即将触发的 Trigger
// 调用 JobStoreSupport 的 acquireNextTriggers()方法
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
// org.quartz.impl.jdbcjobstore.JobStoreSupport#acquireNextTriggers
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
    throws JobPersistenceException {
    
    String lockName;
    if(isAcquireTriggersWithinLock() || maxCount > 1) { 
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    // 调用 JobStoreSupport.executeInNonManagedTXLock()方法
    return executeInNonManagedTXLock(lockName, 
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    try {
                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        Set<String> fireInstanceIds = new HashSet<String>();
                        for (FiredTriggerRecord ft : acquired) {
                            fireInstanceIds.add(ft.getFireInstanceId());
                        }
                        for (OperableTrigger tr : result) {
                            if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                return true;
                            }
                        }
                        return false;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            });
}
// org.quartz.impl.jdbcjobstore.JobStoreSupport#executeInNonManagedTXLock
protected <T> T executeInNonManagedTXLock(
        String lockName, 
        TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        if (lockName != null) {
            // If we aren't using db locks, then delay getting DB connection 
            // until after acquiring the lock since it isn't needed.
            if (getLockHandler().requiresConnection()) {
                conn = getNonManagedTXConnection();
            }
            // 尝试获取锁 调用 DBSemaphore 的 obtainLock()方法
            transOwner = getLockHandler().obtainLock(conn, lockName);
        }
        
        if (conn == null) {
            conn = getNonManagedTXConnection();
        }
        
        final T result = txCallback.execute(conn);
        try {
            commitConnection(conn);
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                @Override
                public Boolean execute(Connection conn) throws JobPersistenceException {
                    return txValidator.validate(conn, result);
                }
            })) {
                throw e;
            }
        }

        Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
        if(sigTime != null && sigTime >= 0) {
            signalSchedulingChangeImmediately(sigTime);
        }
        
        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } catch (RuntimeException e) {
        rollbackConnection(conn);
        throw new JobPersistenceException("Unexpected runtime exception: "
                + e.getMessage(), e);
    } finally {
        try {
            releaseLock(lockName, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}
// org.quartz.impl.jdbcjobstore.DBSemaphore#obtainLock
public boolean obtainLock(Connection conn, String lockName)
    throws LockException {

    if(log.isDebugEnabled()) {
        log.debug(
            "Lock '" + lockName + "' is desired by: "
                    + Thread.currentThread().getName());
    }
    if (!isLockOwner(lockName)) {
		// 调用 StdRowLockSemaphore 的 executeSQL()方法
		// 最终用 JDBC 执行 SQL,语句内容是 expandedSQL 和 expandedInsertSQL。
        executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
        
        if(log.isDebugEnabled()) {
            log.debug(
                "Lock '" + lockName + "' given to: "
                        + Thread.currentThread().getName());
        }
        getThreadLocks().add(lockName);
        //getThreadLocksObtainer().put(lockName, new
        // Exception("Obtainer..."));
    } else if(log.isDebugEnabled()) {
        log.debug(
            "Lock '" + lockName + "' Is already owned by: "
                    + Thread.currentThread().getName());
    }

    return true;
}

在 StdRowLockSemaphore 的构造函数中,把定义的两条 SQL 传进去:

public class StdRowLockSemaphore extends DBSemaphore {

    /*
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     * 
     * Constants.
     * 
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     */

    public static final String SELECT_FOR_LOCK = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
            + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

    public static final String INSERT_LOCK = "INSERT INTO "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" 
        + SCHED_NAME_SUBST + ", ?)"; 

最终执行的sql:

select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update

在我们执行官方的建表脚本的时候,QRTZ_LOCKS 表,它会为每个调度器创建两行数据,获取 Trigger 和触发 Trigger 是两把锁:
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/803237.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

十五章:使用类别峰值响应的弱监督实例分割

0.摘要 目前&#xff0c;使用图像级别标签而不是昂贵的像素级掩码进行弱监督实例分割的研究还未得到充分探索。本文通过利用类别峰值响应来实现一个分类网络&#xff0c;用于提取实例掩码&#xff0c;来解决这个具有挑战性的问题。只通过图像标签的监督下&#xff0c;完全卷积的…

winform 将resources资源文件反编译为resx文件

resources资源文件反编译为resx文件 【前景提要】 在日常工作中我们会遇到需要将一个编译后的winform程序反编译出其对应的源码&#xff0c;然而在常用的反编译工具中对于项目中使用的资源文件是编译为resources文件的&#xff0c;这个资源文件在反编译后的源码中是无法直接使用…

MES管理系统中设备管理功能的原理是什么

制造执行系统MES是一种应用于制造工厂的实际操作系统&#xff0c;它通过实时监控和控制生产流程&#xff0c;为生产过程提供全面的管理和优化。在MES管理系统解决方案中&#xff0c;设备管理功能是非常重要的一部分&#xff0c;它可以实现设备实时监控、故障预警、维护保养等功…

MyBatisPlus从入门到精通-2

接着上一讲的Mp的分页功能 下面我们讲解条件查询功能和其他功能 解决一下日志输出和banner问题 每次卞就会输出这些日志 很不美观&#xff0c;现在我们关闭一下 这样建个xml&#xff0c;文件名为logback.xml 文件内容改成这样 配置了logback但是里面什么都没写就不会说有日…

视频监控汇聚平台EasyCVR向上级联时,上级一直回复401是什么原因?

视频监控管理EasyCVR视频融合平台基于云边端一体化架构&#xff0c;可支持多协议、多类型设备接入&#xff0c;具体包括&#xff1a;NVR、IPC、视频编码器、无人机、车载设备、智能手持终端、移动执法仪等。平台具有强大的数据接入、处理及分发能力&#xff0c;可在复杂的网络环…

下载文件出错:org.apache.catalina.connector.ClientAbortException

解决方案 复现步骤&#xff1a; 浏览器调整下载速度后&#xff0c;超过1分钟的下载会自动断开&#xff0c;调整connectionTimeout后&#xff0c;问题解决。

前端Vue入门-day04-用vue实现组件通信

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 组件的三大组成部分 注意点说明 组件的样式冲突 scoped data 是一个函数 组件通信 什么是组件通信 不…

图注意力网络论文详解和PyTorch实现

图神经网络(gnn)是一类功能强大的神经网络&#xff0c;它对图结构数据进行操作。它们通过从节点的局部邻域聚合信息来学习节点表示(嵌入)。这个概念在图表示学习文献中被称为“消息传递”。 消息(嵌入)通过多个GNN层在图中的节点之间传递。每个节点聚合来自其邻居的消息以更新其…

特殊矩阵的压缩存储

1 数组的存储结构 1.1 一维数组 各数组元素大小相同&#xff0c;且物理上连续存放。第i个元素的地址位置是&#xff1a;a[i] LOC i*sizeof(ElemType) (LOC为起始地址) 1.2 二维数组 对于多维数组有行优先、列优先的存储方法 行优先&#xff1a;先行后列&#xff0c;先存储…

C# 汇总区间

228 汇总区间 给定一个 无重复元素 的 有序 整数数组 nums 。 返回 恰好覆盖数组中所有数字 的 最小有序 区间范围列表 。也就是说&#xff0c;nums 的每个元素都恰好被某个区间范围所覆盖&#xff0c;并且不存在属于某个范围但不属于 nums 的数字 x 。 列表中的每个区间范围…

Mybatis中where 1=1 浅析

在一些集成mybatis的工程中经常看到where11 的代码&#xff0c;也有同事问我&#xff0c;这样写有什么用&#xff0c;下面对其进行简单的分析记录一下。 1、场景 看下面这样一段xml中的代码 <select id"queryBook" parameterType"com.platform.entity.Book…

【JavaEE】博客系统前后端交互

目录 一、准备工作 二、数据库的表设计 三、封装JDBC数据库操作 1、创建数据表对应的实体类 2、封装增删改查操作 四、前后端交互逻辑的实现 1、博客列表页 1.1、展示博客列表 1.2、博客详情页 1.3、登录页面 1.4、强制要求用户登录&#xff0c;检查用户的登录状态 …

Jenkins构建完成后发送消息至钉钉

钉钉群的最终效果&#xff1a; 1、jenkins安装DingTalk插件&#xff0c;安装完成后重启 2、配置钉钉插件 参考官网文档&#xff1a;快速开始 | 钉钉机器人插件 系统管理 拉到最下面&#xff0c;可以看到钉钉配置 按照如下配置钉钉机器人 配置完成可以点击测试按钮&#xff0…

css 书写规范!其他人总结!

CSS书写顺序 1.位置属性(position, top, right, z-index, display, float等) 2.大小(width, height, padding, margin) 3.文字系列(font, line-height, letter-spacing, color- text-align等) 4.背景(background, border等) 5.其他(animation, transition等) CSS书写规范 使用…

利用官网文档快速上手 Android 开发

官网学习链接&#xff1a;官网链接 官网教程

Vue结合echarts实现水滴图

效果展示 核心代码 <template><div id"cpu" style"width: 270px;height: 200px;"></div> </template><script>import * as echarts from echarts;export default {name: "show",methods:{aucDrawLine() {// 基于…

【Unity细节】关于拉进镜头场景后场景资源消失的问题的解决

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 秩沅 原创 收录于专栏&#xff1a;unity细节和bug ⭐关于拉进镜头场景资源消失的问题的解决⭐ 文章目录 ⭐关于拉进镜头场景资源消失…

什么是云原生和 CNCF?

一、CNCF简介 CNCF&#xff1a;全称Cloud Native Computing Foundation&#xff08;云原生计算基金会&#xff09;&#xff0c;成立于 2015 年 12 月 11 日&#xff0c;是一个开源软件基金会&#xff0c;它致力于云原生&#xff08;Cloud Native&#xff09;技术的普及和可持续…

MATLAB与ROS联合仿真——绘图类功能模块介绍

1、Drawing Trajectories &#xff08;1&#xff09;输入参数&#xff1a; xr和yr为小车在世界坐标系下当前的位置坐标&#xff0c;X和Y世界坐标系下当前目标点的位置坐标&#xff0c;Xall与Yall为完整的需要跟踪的轨迹&#xff0c;仅用来画图作为参考&#xff0c;与小车的控制…

【JavaWeb】渲染技术Jsp

&#x1f384;欢迎来到边境矢梦的csdn博文&#xff0c;本文主要讲解Java web中渲染技术 Jsp 的相关知识&#x1f384; &#x1f308;我是边境矢梦&#xff0c;一个正在为秋招和算法竞赛做准备的学生&#x1f308; &#x1f386;喜欢的朋友可以关注一下&#x1faf0;&#x1faf…