Java | 一分钟掌握定时任务 | 6 - Quartz定时任务

news2024/11/18 18:32:58

作者:Mars酱

声明:本文章由Mars酱原创,部分内容来源于网络,如有疑问请联系本人。

转载:欢迎转载,转载前先请联系我!

前言

前几篇介绍了单体架构的定时任务解决方式,但是现代软件架构由于业务复杂度高,业务的耦合性太强,已经由单体架构拆分成了分布式架构。因此,定时任务的架构也随之修改。而Quartz是分布式定时任务解决方案中使用简单,结构清晰,且不依赖第三方分布式调度中间件的。上车,mars酱带你车里细说~

角色介绍

Quartz入门使用的角色不多,三个角色足够,分别是:

Scheduler:调度器。用来负责任务的调度;

Job:任务。这是一个接口,业务代码继承Job接口并实现它的execute方法,是业务执行的主体部分;

Trigger: 触发器。也是个接口,有两个触发器比较关键,一个是SimpleTrigger,另一个是CronTrigger。前者支持简单的定时,比如:按时、按秒等;后者直接支持cron表达式。下面我们从官方的源代码入手,看看Quartz如何做到分布式的。

官方例子

官方源代码down下来之后,有个examples文件夹:

example1是入门级中最简单的。就两个java文件,一个HelloJob:

package org.quartz.examples.example1;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * <p>
 * This is just a simple job that says "Hello" to the world.
 * </p>
 * 
 * @author Bill Kratzer
 */
public class HelloJob implements Job {

    private static Logger _log = LoggerFactory.getLogger(HelloJob.class);

    /**
     * <p>
     * Empty constructor for job initilization
     * </p>
     * <p>
     * Quartz requires a public empty constructor so that the
     * scheduler can instantiate the class whenever it needs.
     * </p>
     */
    public HelloJob() {
    }

    /**
     * <p>
     * Called by the <code>{@link org.quartz.Scheduler}</code> when a
     * <code>{@link org.quartz.Trigger}</code> fires that is associated with
     * the <code>Job</code>.
     * </p>
     * 
     * @throws JobExecutionException
     *             if there is an exception while executing the job.
     */
    public void execute(JobExecutionContext context)
        throws JobExecutionException {

        // Say Hello to the World and display the date/time
        _log.info("Hello World! - " + new Date());
    }

}

另一个SimpleExample:

 package org.quartz.examples.example1;

 import org.quartz.JobDetail;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerFactory;
 import org.quartz.Trigger;
 import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 
 import static org.quartz.DateBuilder.evenMinuteDate;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.TriggerBuilder.newTrigger;
 
 /**
  * This Example will demonstrate how to start and shutdown the Quartz scheduler and how to schedule a job to run in
  * Quartz.
  *
  * @author Bill Kratzer
  */
 public class SimpleExample {
 
     public void run() throws Exception {
         Logger log = LoggerFactory.getLogger(SimpleExample.class);
 
         log.info("------- Initializing ----------------------");
 
         // 1. 创建一个scheduler
         SchedulerFactory sf = new StdSchedulerFactory();
         Scheduler sched = sf.getScheduler();
 
         log.info("------- Initialization Complete -----------");
 
         // computer a time that is on the next round minute
         Date runTime = evenMinuteDate(new Date());
 
         log.info("------- Scheduling Job  -------------------");
 
         // 2. 指定一个job
         JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();
 
         // 3. 指定一个trigger
         Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();
 
         // 4. 绑定job和trigger
         sched.scheduleJob(job, trigger);
         log.info(job.getKey() + " will run at: " + runTime);
 
         // 5. 执行
         sched.start();
 
         log.info("------- Started Scheduler -----------------");
 
         // wait long enough so that the scheduler as an opportunity to
         // run the job!
         log.info("------- Waiting 65 seconds... -------------");
         try {
             // wait 65 seconds to show job
             Thread.sleep(65L * 1000L);
             // executing...
         } catch (Exception e) {
             //
         }
 
         // shut down the scheduler
         log.info("------- Shutting Down ---------------------");
         sched.shutdown(true);
         log.info("------- Shutdown Complete -----------------");
     }
 
     public static void main(String[] args) throws Exception {
 
         SimpleExample example = new SimpleExample();
         example.run();
 
     }
 
 }

整个SimpleExample只有五个步骤:

  1. 创建Scheduler,这是一个调度器,例子中使用调度器工厂来创建一个调度器;
  2. 创建一个Job。实际上Job就是那个HelloJob,但是这里把HelloJob丢给了JobDetail对象,Job接口本身只有一个execute函数,没有其他的属性了,如果需要附加其他属性,JobDetail就支持,比如我们需要往Job中传递参数,JobDetail中提供了一个JobDataMap。当Job在运行的时候,execute函数里面的就能获取到JobDetail对象,并将设置的数据传递给Job接口的实现;
  3. 创建一个Trigger。Trigger对象主责是任务的执行时间,比如官方例子中的startAt函数,就指定了具体的运行时间,还有startNow(立即执行);
  4. 用scheduler绑定Job和Trigger;
  5. 执行scheduler。

Quartz的使用是不是简单又清晰?Job是任务,单一职责,不做任何其他事情。Trigger负责执行的频率等等属性。Scheduler负责按照Trigger的规则去执行Job的内容。各自部分的功能符合单一原则。

但是,到这里都不是分布式的方式,依然是单体架构的。那么,Quartz如何做到分布式的呢?

Quartz如何分布式?

Quartz的分布式实现方式并不依赖其他分布式协调管理中间件完成,而是使用数据锁来实现。使用数据做协调管理中间件的唯一的前提是:需要把集群的每台机器时间校对一致。

Quartz数据库核心表如下:

表名功能描述
QRTZ_CALENDARS存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS存储程序的悲观锁的信息
QRTZ_JOB_DETAILS存储每一个已配置的Job的详细信息
QRTZ_JOB_LISTENERS存储有关已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERSTrigger作为Blob类型存储
QRTZ_TRIGGER_LISTENERS存储已配置的TriggerListener的信息
QRTZ_TRIGGERS存储已配置的Trigger的信息

字体加粗的QRTZ_LOCKS表是一个悲观锁的存储实现,Quartz认为每条记录都可能会产生并发冲突。以上表的SQL可以在quartz目录中找到:

找到自己喜欢的数据库品牌,并创建好表即可。

跟着官方例子看源码

我们从Hello的execute方法开始,反着找,继续看看分布式的方式如何实现。为什么反着找呢?因为这里是我们业务实现的主体内容,Quartz框架最终必须要调用到这个execute的具体实现的。我们找到调用execute的地方有很多处:

从类名来分析,DirectoryScanJob看着是目录扫描任务,FileScanJob直译是文件扫描任务,SendMailJob是发送邮件任务,最后只剩那个JobRunShell,毕竟翻译过来叫“任务运行の核心”啊。进入JobRunShell,找到调用execute函数的部分,execute函数被包裹在一个一百三十多行长又长的run函数中:

public void run() {
    qs.addInternalSchedulerListener(this);

    try {
        // ...省略很多源代码
        do {
            // ...省略很多源代码
            try {
                begin();
            } catch (SchedulerException se) {
                // ... 省略源代码
            }

            // ... 省略源代码

            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                // 这里负责执行job的execute函数
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                // ... 省略源代码
            } catch (Throwable e) {
                // ... 省略源代码
            }

            // ...省略很多源代码

            try {
                complete(true);
            } catch (SchedulerException se) {
                // ... 省略源代码
            }

            // ...省略很多源代码
        } while (true);

    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}

可以看到run中间的execute被夹在一个begin函数和comlete函数中,而begin和complete的实现是一个基于JTA事务的JTAJobRunShell的实现来完成的。JobRunShell是一个Runnable接口的实现,那么刚刚的run方法,必定在某处启用了线程(池)的start方法。

mars酱继续跟踪查找源代码,在QuartzSchedulerThread中的run函数中,找到JobRunShell的调用部分:

@Override
public void run() {
    int acquiresFailed = 0;
    while (!halted.get()) {
        // ...省略很多源代码

        // 源代码279行
        int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
        // ...省略很多源代码
        if(availThreadCount > 0) { 
            // ...省略很多源代码
            // 取下一个trigger,周期是30秒取一次
            triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

            // ...省略很多源代码

            // 触发trigger
            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

            // ...省略很多源代码

            // 释放trigger,当bndle的结果是null就释放trigger
            if (bndle == null) {
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }

            // ...省略很多源代码
            JobRunShell shell = null;
            try {
                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                shell.initialize(qs);
            } catch (SchedulerException se) {
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                continue;
            }

            // 这里调用JobRunShell
            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                // ...省略很多源代码
            }
        }  
    }
}

QuartzSchedulerThread的run函数就是核心处理流程了,qsRsrcs.getThreadPool().runInThread(shell)内部就根据具体的SimpleThreadPool或者ZeroSizeThreadPool来执行run函数,while循环基本就是不停的在轮询不断的去拿trigger,然后判断trigger的时间是不是到了,再按照时间trigger的时间规则执行Job,最后再标记为完成、释放trigger。

Trigger的处理

上面的逻辑都是通过qsRsrcs.getJobStore()得到的对象去处理Trigger的,返回对象是JobStore。任意查看qsRsrcs.getJobStore()调用的函数,比如:releaseAcquiredTriggerJobStore,它的实现有两个是比较重要的:一个是RAMJobStore,一个是JobStoreSupport。前者是RAM作为存储介质,作者还特意写上了这样一段注释:

This class implements a JobStore that utilizes RAM as its storage device.

As you should know, the ramification of this is that access is extrememly fast, but the data is completely volatile - therefore this JobStore should not be used if true persistence between program shutdowns is required.

这段英文的央视翻译:

这个类实现了一个使用RAM作为存储设备的JobStore。

您应该知道,这样做的后果是访问速度非常快,但是数据是完全不稳定的——因此,如果需要在程序关闭之间实现真正的持久性,则不应该使用这个JobStore。

而且内存存储也无法分布式处理吧?所以,mars酱选择了观看JobStoreSupport:

从import可以知道,这个玩意是连接了数据库的,所以呢,acquireNextTriggers、triggersFired、releaseAcquiredTrigger这些方法负责具体trigger的相关操作,都最终会执行到JobStoreSupport的第3844行的executeInNonManagedTXLock函数:

    /**
     * Execute the given callback having optionally acquired the given lock.
     * This uses the non-managed transaction connection.
     * 
     * @param lockName The name of the lock to acquire, for example
     * "TRIGGER_ACCESS".  If null, then no lock is acquired, but the
     * lockCallback is still executed in a non-managed transaction. 
     */
    protected <T> T executeInNonManagedTXLock(
            String lockName, 
            TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = null;
        try {
            if (lockName != null) {
                // If we aren't using db locks, then delay getting DB connection 
                // until after acquiring the lock since it isn't needed.
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }
                
                transOwner = getLockHandler().obtainLock(conn, lockName);
            }
            
            if (conn == null) {
                conn = getNonManagedTXConnection();
            }
            
            final T result = txCallback.execute(conn);
            try {
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                    @Override
                    public Boolean execute(Connection conn) throws JobPersistenceException {
                        return txValidator.validate(conn, result);
                    }
                })) {
                    throw e;
                }
            }

            Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
            if(sigTime != null && sigTime >= 0) {
                signalSchedulingChangeImmediately(sigTime);
            }
            
            return result;
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (RuntimeException e) {
            rollbackConnection(conn);
            throw new JobPersistenceException("Unexpected runtime exception: "
                    + e.getMessage(), e);
        } finally {
            try {
                releaseLock(lockName, transOwner);
            } finally {
                cleanupConnection(conn);
            }
        }
    }

整体的过程简要说明就是:获取数据库连接,给需要执行的trigger加锁,处理完之后再释放锁。

结合起来

结合前面的流程来看,一个调度器在执行前如果涉及到分布式的情况,流程如下:

  1. 首先要获取QUARTZ_LOCKS表中对应的锁(在executeInNonManagedTXLock函数的getLockHandler().obtainLock(conn, lockName)中);
  2. 获取锁后执行QuartzSchedulerThread中的JobRunShell,完成任务的执行;
  3. 最后QuartzSchedulerThread中调用triggeredJobComplete函数,锁被释放,在executeInNonManagedTXLock函数的releaseLock(lockName, transOwner)中执行;

集群中的每一个调度器实例都遵循这样的操作流程。

总结

Quartz 是一款用于分布式系统的高性能调度框架,它采用了数据库作为分布式锁机制来保证同一时刻只有一个 Scheduler 实例访问数据库中的 Trigger。

在 Quartz 中,调度器线程会争抢访问数据库中的 Trigger,以确保在同一时刻只有一个调度器线程执行某个 Trigger 的操作。如果有多个调度器线程同时尝试访问同一个 Trigger,它们会相互等待对方释放锁。当一个调度器线程获得了锁,它就可以访问数据库并执行相应的操作。

另外,Quartz 还采用了悲观锁的策略来避免死锁的发生。当一个调度器线程尝试获取锁时,如果锁已经被其他线程占用,那么这个线程会等待,直到有线程释放了锁。如果在等待过程中没有其他线程释放锁,那么这个线程就会一直等待下去,直到调度器重新分配了锁。

总之,Quartz 的分布式调度原理是通过数据库锁和悲观锁来实现的,以保证同一时刻只有一个调度器线程访问数据库中的 Trigger,从而提高系统的性能和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/541775.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql【基础篇】—— mysql安装和环境配置

Mysql【基础篇】—— mysql安装和环境配置&#x1f60e; Mysql 的概述Mysql下载安装和环境配置下载流程&#xff1a;Mysql启动&#xff1a;客户端连接方式一&#xff1a;使用MySQL提供的客户端命令行工具方式二&#xff1a;使用系统自带的命令行工具执行指令 总结撒花&#x1f…

tb-gateway配置OPC UA

1、安装模拟软件KEPServerEX 6 省略 2、配置OPC UA 安装好KEPServerEX 6之后,默认再电脑的最小化窗口会显示一个图标 右键点击图标,会显示一个OPC UA配置,然后点击配置,进入下面页面 点击添加按钮,弹出下面的弹窗 然后进行选择和配置,见下图,然后保存即可。 3、启动K…

【Linux】Linux编译器--vim的使用

&#x1f601;作者&#xff1a;日出等日落 &#x1f514;专栏&#xff1a;Linux 当你还不能对自己说今天学到了什么东西时&#xff0c;你就不要去睡觉。 ——利希顿堡 目录 vim是什么 vim安装 vim的基本概念 vim的基本操作 vim正常模式命令集 vim末行模…

R.I.P

0x01 这几天&#xff0c;陈皓老师&#xff08;网名&#xff1a;左耳朵耗子&#xff09;因心梗离世的消息相信大家也都看到了。 于我而言&#xff0c;震惊、难过之余&#xff0c;心里也是阵阵惋惜。 相信不少同学了解陈皓老师都是从他的个人博客酷壳CoolShell开始的。 同样&…

JavaWeb13-JavaScript 开发利器之 jQuery-02

1. jQuery 的 DOM 操作 1.1 查找节点, 修改属性 查找属性节点: 查找到所需要的元素之后, 可以调用 jQuery 对象的 attr() 方法来获取它的各种属性值 查找节点-应用实例 element-attribute.html <!DOCTYPE html> <html lang"en"> <head><met…

全网详细Django框架快速体验

一、安装Django (1)安装django命令 pip install django 二、命令行创建项目 执行命令创建项目 django-admin startproject 项目名称 如&#xff1a; django-admin startproject mysite 三、项目目录结构 mysite|----manage.py # 项目的管…

Nat. Mach. Intell 2023 | RT:首个统一分子性质预测(回归) 与条件生成的模型

原文标题&#xff1a;Regression Transformer enables concurrent sequence regression and generation for molecular language modelling 论文地址&#xff1a;Regression Transformer enables concurrent sequence regression and generation for molecular language model…

Servlet编程---Day 07

目录 一、过滤器概述 二、过滤器使用 &#xff08;一&#xff09;开发第一个过滤器 &#xff08;二&#xff09;过滤器的生命周期 &#xff08;三&#xff09;FilterChain(过滤器链) 1.过滤器链认识 2.过滤器链代码实现 3.过滤器链顺序 &#xff08;四&#xff09;请求…

【C++进阶】多态详解(上)

文章目录 一、多态的概念二、多态的定义及实现1.多态的构成条件2.虚函数3.虚函数的重写(1)虚函数重写概念(2)虚函数重写的两个例外&#xff1a;(3)析构函数是否要定义为虚函数(4)C11 override 和 final 三、抽象类1.概念2.接口继承和实现继承 四、多态的原理1.虚函数表2.多态的…

各种常见的word格式符号(回车字符、软回车、分页符等)

一、如何显示编辑符号 1、打开WORD-选项-显示&#xff0c;勾选格式标记 2、如下图所示。在【开始】一【段落】选项卡中&#xff0c;它可以帮助我们识别编辑过程中的格式符号。 二、各种常见的word格式符号 第一种 描述&#xff1a;向下的箭头 样式&#xff1a;↓ 名字&#xff…

Esbuild基本使用与插件开发

作为Vite的双引擎之一&#xff0c;Esbuild在很多关键的构建阶段(如依赖预编译、TS语法转译、代码压缩)让Vite获得了相当优异的性能&#xff0c;是Vite高性能的得力助手。无论是在Vite的配置项还是源码实现&#xff0c;都包含了不少Esbuild的基本概念和高阶用法。因此&#xff0…

idea线上debug

idea线上debug 1. 为什么需要线上debug2. 基本原理3.远程调试配置3.1 1. 融合云增加JVM参数3.2 idea配置 4. 注意附录 1. 为什么需要线上debug 在微服务开发中&#xff0c;开发的服务可能会依赖数据库、消息队列等资源&#xff0c;也有可能依赖其他的服务&#xff0c;这些服务…

PCB 布线技术~PCB 基础

PCB量测的单位 • PCB设计起源于美国&#xff0c;所以其常用单位是英制&#xff0c; 而非公制 – 版子的大小通常使用英尺 – 介质厚度&导体的长宽通常使用英尺及英寸 • 1 mil 0.001 inches • 1 mil .0254 mm – 导体的厚度常使用盎司(oz) • 一平方英尺金属的重量 •…

redis学习(十八) 部署redis哨兵模式

文章目录 前言一、搭建主从数据库二、搭建哨兵三、验证哨兵 前言 哨兵模式核心还是主从复制&#xff0c;只不过在相对于主从模式在主节点宕机导致不可写的情况下&#xff0c;多了一个竞选机制&#xff1a;在所有的从节点竞选出新的主节点。每一个哨兵都是一个独立的sentinel进…

PCB 布线技术~PCB结构:Traces,电源平面

PCB导体:Traces • 铜是PCB中最常用的导体 – 走线或连接器一般通过镀金来提供一个抗腐蚀的电传导特性 – 走线的宽度和长度-由PCB布线工程师控制 • 在通常的制造工艺下&#xff0c;走线的宽度和之间的间距一般要≥5 mil – 走线厚度-制造工艺的变量 • 典型值 0.5oz – 3oz •…

Linux---目录结构、绝对路径与相对路径、命令基础格式、ls命令

1. Linux的目录结构 Linux的目录结构是一个树型结构。 Windows 系统可以拥有多个盘符, 如 C盘、D盘、E盘。 Linux没有盘符这个概念, 只有一个根目录 /, 所有文件都在它下面。 在Linux系统中&#xff0c;路径之间的层级关系&#xff0c;使用&#xff1a;/ 来表示。 Linux只…

Inodb引擎 内存+磁盘+MVCC(多版本并发控制)

目录 逻辑存储结构 Innodb引擎内存结构介绍 Innodb引擎磁盘结构介绍 内存和磁盘交互 MVCC(多版本并发控制)原理 预备知识 mvcc基本概念 mvcc的具体实现 总的来说mvcc原理&#xff1a; 逻辑存储结构 Innodb引擎内存结构介绍 Buffer Pool(缓冲池) 缓冲池是内存的一个区域&am…

001 hive简介

一. hive概述 1. hive的产生背景 mapreduce程序大部分解决的问题是结构化数据&#xff0c;而解决结构化数据最佳方案是一条sql语句 hive出现的主要原因是解决mapreduce开发成本高的问题。但hive不能完全替代mr&#xff0c;只能处理mr中的结构化数据。 2. hive是什么 hive提…

【数据结构】常见数据结构汇总

文章目录 前言一、数组二、链表三、栈四、队列五、哈希表--散列表六、堆七、树八、图参考与感谢 前言 数据结构是计算机存储、组织数据的方式。一种好的数据结构可以带来更高的运行或者存储效率。数据在内存中是呈线性排列的&#xff0c;但是我们可以使用指针等道具&#xff0…

hive学习入门

第四章 HQL基础语法 Hive中的语句叫做HQL语句,是一种类似SQL的语句,基本上和SQL相同但是某些地方也是有很大的区别. 4.1 数据库操作 创建数据库 1.创建一个数据库,数据库在HDFS上的默认存储路径是/hive/warehouse/*.db。 create database hive01; 避免要创建的数据库已经存…