定时任务框架xxl-job及quartz

news2024/12/23 4:26:40

本文主要介绍分布式定时任务框架xxl-job,本文首先会对xxl-job做一个基本的介绍,接着将xxl-job与quartz做一个比较,最后就是介绍xxl-job调度的详细过程。
xxl-job官方文档

xxl-job的介绍
xxl-job是一个开源的分布式定时任务框架,其调度中心和执行器是相互分离,分开部署的,两者通过HTTP协议进行通信。其架构如下图所示:

调度中心:
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover,支持创建执行器等功能。
执行模块(执行器):
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;接收“调度中心”的执行请求、终止请求和日志请求等。

特性
xxl-job的特性有很多,官网上有详细的介绍,这里我会介绍几个重要的特性:

简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;
动态:支持动态修改任务状态、启动/停止任务,以及终止运行中的任务,都是即时生效的。
调度中心HA(中心式):调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA;
执行器HA(分布式):任务分布式执行,任务”执行器”支持集群部署,可保证任务执行HA;
调度过期策略:调度中心错过调度时间的补偿处理策略:包括:忽略,立即补偿触发一次等;
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前的调用。
任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;
xxl-job相关的数据表
xxl-job将任务信息以及日志信息持久化到数据表中,这个就保证了可以动态的添加删除任务。

xxl_job_lock:任务调度锁表,在线程查询任务信息时会调用上锁。
xxl_job_group:执行器信息表,维护任务执行器信息;
xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
xxl_job_user:系统用户表;
xxl-job与quartz的异同
这一部分主要是将quartz和xxl-job做一个比较,quartz是一款开源的使用非常广泛的定时任务框架。其可以说是定时任务的鼻祖,很多理念都与xxl-job类似。

综合比较

整体来说,xxl-job就是quartz的一个增强版,其弥补了quartz不支持并行调度,不支持失败处理策略和动态分片的策略等诸多不足,同时其有管理界面,上手比较容易,支持分布式,适用于分布式场景下的使用。两者相同的是都是通过数据库锁来控制任务不能重复执行。

核心类比较
quartz的核心类如下图所示:

类名    作用
QuartzSchedulerThread    负责执行向QuartzScheduler注册的触发Trigger的工作的线程
ThreadPool    Scheduler使用一个线程池作为任务运行的基础设施,任务通过共享线程池中的线程提供运行效率
QuartzSchedulerResources    包含创建QuartzScheduler实例所需的所有资源(JobStore,ThreadPool等)
SchedulerFactory    生成Scheduler实例
JobStore    通过类实现的接口,这些类要为org.quartz.core.QuartzScheduler的使用提供一个org.quartz.Job和org.quartz.Trigger存储机制。作业和触发器的存储应该以其名称和组的组合为唯一性。
QuartzScheduler    这是Quartz的核心,它是org.quartz.Scheduler接口的间接实现,包含调度org.quartz.Jobs,注册org.quartz.JobListener实例等的方法。
Scheduler    代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger。当Trigger与JobDetail组合,就可以被Scheduler容器调度了。
Trigger    具有所有触发器通用属性的基本接口,描述了job执行的时间出发规则,使用TriggerBuilder实例化实际触发器,即表示什么时候去调用任务
JobDetail    表示一个具体的可执行的调度程序,Job是这个可执行的调度程序所要执行的内容,另外JobDetail还包含了这个任务调度的方案和策略
Job    表示一个工作,即要执行的具体内容
quartz中的类有很多,我们关注并掌握好Schedule(调度容器),Trigger(触发器),JobDetail&Job(定义具体的执行任务)这几个类就掌握了quartz的核心了。因为其余的类都是围绕这几个类转的,下图展示了各个核心类的调用关系:


quartz的调用示例:
public class RAMQuartz {
    public static void main(String[] args) throws SchedulerException {
   //1.创建Scheduler的工厂
        SchedulerFactory sf = new StdSchedulerFactory();
        //2.从工厂中获取调度器实例
        Scheduler scheduler = sf.getScheduler();
        //3.创建JobDetail
        JobDetail jobDetail = JobBuilder.newJob(RAMJob.class).withDescription("this is a ram job")
                .withIdentity("ramJob", "ramGroup").build();   //job的name和group
        // 4.任务运行的时间,SimpleScheduler类型触发器有效,3秒后启动
        long time = System.currentTimeMillis() + 3 * 1000L;
        Date startTime = new Date(time);
        // 5.创建Trigger
        CronTrigger cronTrigger = TriggerBuilder.newTrigger().withDescription("")
                .withIdentity("ramTrigger", "ramTriggerGroup")
                .startAt(startTime).withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?")) //每10秒跑一次
                .build();
        // 6.注册任务和定时器
        scheduler.scheduleJob(jobDetail, cronTrigger);
        // 7.启动调度器
        scheduler.start();
        System.out.println("启动时间: " + new Date());
 }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
其中RAMJob实现了Job接口,并重写了execute方法。

public class RAMJob implements Job {
    public void execute(JobExecutionContext context) throws JobExecutionException {
        System.out.println("Say hello to Quartz " + System.currentTimeMillis());
    }
}
1
2
3
4
5
xxl-job的核心类如下图所示:

类名    作用
XxlJobAdminConfig    调度中心的总配置类,负责创建XxlJobScheduler实例
XxlJobScheduler    负责创建各种线程,包括任务注册主线程,调度容器的主线程,以及调度参数的配置线程池JobTriggerPoolHelper
JobScheduleHelper    调度容器,创建一个守护线程查询所有下次执行时间在当前时间5秒内的定时任务,并按条件执行
JobTriggerPoolHelper    创建操作XxlJobTrigger的线程池,并添加trigger
XxlJobTrigger    表示一个调度参数的配置,会查询具体的定时任务信息XxlJobInfo
XxlJob    定义执行器的注解
JobThread    调用IJobHandler的executer执行任务,并回调调度中心
IJobHandler    抽象的执行器接口,定义了要执行的具体内容,同样的也是一个execute方法
EmbedServer    内嵌的Server,默认端口是9999
ExecutorBiz    其中的run方法用于调用执行器,有两个是实现类ExecutorBizImpl以及ExecutorBizClient 。
核心类的调用关系如下图所示:

从核心类我们可以看出xxl-job和quartz还是有很多相同点的,都有Scheduler,Trigger以及Job等几个核心的组件。不同之处是xxl-job把任务信息直接存储在了数据表中,而quartz是可以不存的。而且xxl-job调度和执行是分开的,而quartz调度和执行是在一块的。

xxl-job的调度过程
下图展示了调度中心调度执行器执行任务的时序图:


在XxlJobAdminConfig类的afterPropertiesSet方法中创建XxlJobScheduler实例
   @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }
1
2
3
4
5
6
在XxlJobScheduler类的init方法中初始化registry,schedule的主线程,并创建JobTriggerPool的线程池。
 public void init() throws Exception {
        //省略部分代码
       // admin registry monitor run
        JobRegistryMonitorHelper.getInstance().start();
        // admin trigger pool start
        JobTriggerPoolHelper.toStart();
        // start-schedule
        JobScheduleHelper.getInstance().start();

    }

1
2
3
4
5
6
7
8
9
10
11
JobScheduleHelper的start方法会创建一个新的线程,在该线程内会首先查询xxl_job_lock获取数据库锁,然后查询5秒内待执行的任务。当前时间大于任务下一次执行的时间,则会调用JobTriggerPoolHelper.trigger进行任务的执行。
下面代码展示了数据库锁的使用。
//上锁
try{
  conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        //取消事务自动提交
                        conn.setAutoCommit(false);
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();
    
     finally {
                // commit
                if (conn != null) {
                        //提交事务,释放锁
                        conn.commit();        
                        conn.setAutoCommit(connAutoCommit);
                    }    
                }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
下面代码展示了定时任务的调用:

    public static final long PRE_READ_MS = 5000;    // pre read
  long nowTime = System.currentTimeMillis();
                          //查询任务下一次执行时间<当前时间+5秒的任务
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                         for (XxlJobInfo jobInfo: scheduleList) {
                              if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time,任务过期超过5秒,不在执行该任务,重新设置下一次执行时间
                                    // fresh next
                                    refreshNextValidTime(jobInfo, new Date());
                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time,任务过期<5秒,立即执行任务
                                    // 1、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());
                                    //省略部分代码
                       }


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
JobTriggerPoolHelper.trigger这个方法是通过第二步创建的线程池处理,将任务转给XxlJobTrigger.trigger方法。
 ThreadPoolExecutor triggerPool_ = fastTriggerPool;
  triggerPool_.execute(new Runnable() {
            // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
  }
1
2
3
4
5
XxlJobTrigger.trigger这个方法首先根据jobId查询任务信息,接着根据jobGroup查询执行器信息,接着就是组装trigger-param,初始化address信息,最后就是调用ExecutorBizClient的run方法
  // load data,加载任务信息
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
  if (executorParam != null) {
            jobInfo.setExecutorParam(executorParam);
        }
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
1
2
3
4
5
6
核心逻辑在processTrigger中。

//初始化trigger-param
  TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
//初始化地址
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
//执行任务
 triggerResult = runExecutor(triggerParam, address);
 //日志处理,代码省略
1
2
3
4
5
6
7
8
ExecutorBizClient的run方法通过Netty Http调用EmbedServer的process方法
EmbedServer类的内部类EmbedHttpServerHandler的process方法会调用ExecutorBizImpl类的run方法。
 private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq){
     else if ("/run".equals(uri)) {
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                }
 }
1
2
3
4
5
6
ExecutorBizImpl类的run方法首先会创建JobThread,然后将任务放入triggerQueue(LinkedBlockingQueue)队列中,最后启动JobThread
public ReturnT<String> run(TriggerParam triggerParam) {
     JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
            // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
}
1
2
3
4
5
6
7
JobThread线程首先从triggerQueue中poll中任务,然后通过反射的话获取IJobHandler,调用其execute方法执行具体的任务。
    public void run() {
            //通过反射的方式获取执行器的方法
            handler.init();
            //从队列中取出任务
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                                @Override
                                public ReturnT<String> call() throws Exception {
                                    return 
                                    //执行任务 
                                    handler.execute(triggerParamTmp.getExecutorParams());
                                }
                            });
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
任务执行完成之后,将执行结果放入回调的队列callBackQueue中。
总结
本文首先对xxl-job做一个基本的介绍,接着将xxl-job与quartz做一个比较,最后就是介绍xxl-job调度过程做了一个详细介绍,xxl-job是一个上手很容易,适用于分布式场景下使用,调度中心和执行器分开部署,减少了系统的耦合以及调度中心的调度效率。最重要的是xxl-job对任务的过期处理以及阻塞处理策略设计的比较好。
 

文章已收录Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary

一、概述

在平时的业务场景中,经常有一些场景需要使用定时任务,比如:

时间驱动的场景:某个时间点发送优惠券,发送短信等等。批量处理数据:批量统计上个月的账单,统计上个月销售数据等等。固定频率的场景:每隔5分钟需要执行一次。所以定时任务在平时开发中并不少见,而且对于现在快速消费的时代,每天都需要发送各种推送,消息都需要依赖定时任务去完成,应用非常广泛。

二、为什么需要任务调度平台

在Java中,传统的定时任务实现方案,比如Timer,Quartz等都或多或少存在一些问题:

不支持集群、不支持统计、没有管理平台、没有失败报警、没有监控等等而且在现在分布式的架构中,有一些场景需要分布式任务调度:

同一个服务多个实例的任务存在互斥时,需要统一的调度。任务调度需要支持高可用、监控、故障告警。需要统一管理和追踪各个服务节点任务调度的结果,需要记录保存任务属性信息等。显然传统的定时任务已经不满足现在的分布式架构,所以需要一个分布式任务调度平台,目前比较主流的是elasticjob和xxl-job。

elasticjob由当当网开源,目前github有6.5k的Star,使用的公司在官网登记有76家。

跟xxl-job不同的是,

elasticjob是采用zookeeper实现分布式协调

,实现任务高可用以及分片。

三、为什么选择XXL-JOB

实际上更多公司选择xxl-job,目前xxl-job的github上有15.7k个star,登记公司有348个。毫无疑问elasticjob和xxl-job都是非常优秀的技术框架,接下来我们进一步对比讨论,探索一下为什么更多公司会选择xxl-job。

首先先介绍一下xxl-job,这是出自大众点评许雪里(xxl就是作者名字的拼音首字母)的开源项目,官网上介绍这是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。跟elasticjob不同,xxl-job环境依赖于mysql,不用ZooKeeper,这也是最大的不同。

elasticjob的初衷是为了面对高并发复杂的业务,即使是在业务量大,服务器多的时候也能做好任务调度,尽可能的利用服务器的资源。使用ZooKeeper使其具有高可用、一致性的,而且还具有良好的扩展性。官网上写

elasticjob是无中心化的,通过ZooKeeper的选举机制选举出主服务器,如果主服务器挂了,会重新选举新的主服务器。因此elasticjob具有良好的扩展性和可用性,但是使用和运维有一定的复杂

xxl-job则相反,是通过一个中心式的调度平台,调度多个执行器执行任务,调度中心通过DB锁保证集群分布式调度的一致性,这样扩展执行器会增大DB的压力,但是如果实际上这里数据库只是负责任务的调度执行。但是如果没有大量的执行器的话和任务的情况,是不会造成数据库压力的。实际上大部分公司任务数,执行器并不多(虽然面试经常会问一些高并发的问题)。

相对来说,xxl-job中心式的调度平台轻量级,开箱即用,操作简易,上手快,与SpringBoot有非常好的集成,而且监控界面就集成在调度中心,界面又简洁,对于企业维护起来成本不高,还有失败的邮件告警等等。这就使很多企业选择xxl-job做调度平台。

四、安装

4.1 拉取源码

搭建xxl-job很简单,有docker拉取镜像部署和源码编译两种方式,docker部署的方式比较简单,我就讲源码编译的方式。首先到github拉取xxl-job源码到本地。

4.2 导入IDEA

拉取源码下来后,可以看到项目结构,如下:

导入到IDEA,配置一下Maven,下载相关的jar包,稍等一下后,就可以看到这样的项目:

4.3 初始化数据库

前面讲过xxl-job需要依赖mysql,所以需要初始化数据库,在xxl-jobdocdb路径下找到tables_xxl_job.sql文件。在mysql上运行sql文件。

4.4 配置文件

接着就改一下配置文件,在admin项目下找到application.properties文件。

### 调度中心JDBC链接spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghaispring.datasource.username=rootspring.datasource.password=spring.datasource.driver-class-name=com.mysql.jdbc.Driver### 报警邮箱spring.mail.host=smtp.qq.comspring.mail.port=25spring.mail.username=xxx@qq.comspring.mail.password=xxxspring.mail.properties.mail.smtp.auth=truespring.mail.properties.mail.smtp.starttls.enable=truespring.mail.properties.mail.smtp.starttls.required=truespring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory### 调度中心通讯TOKEN [选填]:非空时启用;xxl.job.accessToken=### 调度中心国际化配置 [必填]: 默认为 "zh_CN"/中文简体, 可选范围为 "zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文;xxl.job.i18n=zh_CN## 调度线程池最大线程配置【必填】xxl.job.triggerpool.fast.max=200xxl.job.triggerpool.slow.max=100### 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能;xxl.job.logretentiondays=10

4.5 编译运行

简单一点直接跑admin项目的main方法启动也行。

如果部署在服务器呢,那我们需要打包成jar包,在IDEA利用Maven插件打包。

然后在xxl-jobxxl-job-admintarget路径下,找到jar包。

然后就得到jar包了,使用java-jar命令就可以启动了。

到这里就已经完成了!打开浏览器,输入

http://localhost:8080/xxl-job-admin

进入管理页面。默认账号/密码:admin/123456。

五、永远的HelloWord

部署了调度中心之后,需要往调度中心注册执行器,添加调度任务。接下来就参考xxl-job写一个简单的例子。

首先创建一个SpringBoot项目,名字叫"xxljob-demo",添加依赖。

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><!-- 官网的demo是2.2.1,中央maven仓库还没有,所以就用2.2.0 --><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.2.0</version></dependency></dependencies>

接着修改application.properties。

# web portserver.port=8081# log configlogging.config=classpath:logback.xmlspring.application.name=xxljob-demo### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin### 执行器通讯TOKEN [选填]:非空时启用;xxl.job.accessToken=### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册xxl.job.executor.appname=xxl-job-demo### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。xxl.job.executor.address=### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";xxl.job.executor.ip=### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;xxl.job.executor.port=9999### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;xxl.job.executor.logretentiondays=10

接着写一个配置类XxlJobConfig。

@ConfigurationpublicclassXxlJobConfig{ private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") privateint port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") privateint logRetentionDays; @Beanpublic XxlJobSpringExecutor xxlJobExecutor(){ logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; }}

接着编写一个任务类XxlJobDemoHandler,使用Bean模式。

@ComponentpublicclassXxlJobDemoHandler{ /** * Bean模式,一个方法为一个任务 * 1、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param)" * 2、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 * 3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志; */@XxlJob("demoJobHandler") public ReturnT<String> demoJobHandler(String param)throws Exception { XxlJobLogger.log("java, Hello World~~~"); XxlJobLogger.log("param:" + param); return ReturnT.SUCCESS; }}

在resources目录下,添加logback.xml文件。

<?xml version="1.0" encoding="UTF-8"?><configurationdebug="false"scan="true"scanPeriod="1 seconds"><contextName>logback</contextName><propertyname="log.path"value="/data/applogs/xxl-job/xxl-job-executor-sample-springboot.log"/><appendername="console"class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern></encoder></appender><appendername="file"class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${log.path}</file><rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern></rollingPolicy><encoder><pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n </pattern></encoder></appender><rootlevel="info"><appender-refref="console"/><appender-refref="file"/></root></configuration>

写完之后启动服务,然后可以打开管理界面,找到执行器管理,添加执行器。

接着到任务管理,添加任务。


 

最后我们可以到任务管理去测试一下,运行demoJobHandler。


 

点击保存后,会立即执行。点击查看日志,可以看到任务执行的历史日志记录。

打开刚刚执行的执行日志,我们可以看到,运行成功。

这就是简单的Demo演示,非常简单,上手也快。

六、谈谈架构设计

下面简单地说一下xxl-job的架构,我们先看官网提供的一张架构图来分析。

从架构图可以看出,分别有调度中心和执行器两大组成部分

调度中心。负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。支持可视化界面,可以在调度中心对任务进行新增,更新,删除,会实时生效。支持监控调度结果,查看执行日志,查看调度任务统计报表,任务失败告警等等。执行器。负责接收调度请求,执行调度任务的业务逻辑。执行器启动后需要注册到调度中心。接收调度中心的发出的执行请求,终止请求,日志请求等等。接下来我们看一下xxl-job的工作原理。

任务执行器根据配置的调度中心的地址,自动注册到调度中心。达到任务触发条件,调度中心下发任务。执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中。执行器的回调线程消费内存队列中的执行结果,主动上报给调度中心。当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情。絮叨

看完以上的内容,基本算入门了。实际上,xxl-job还有很多功能,要深入学习,还需要到官网去研究探索。最好就是自己在本地搭建一个xxl-job来玩玩,动手实践是学得最快的学习方式。


XXL-Job 适配 Postgre 数据库的完整流程

背景

开源任务调度框架 XXL-Job 默认是基于 MySQL 数据库开发的,好在它是使用了 MyBatis ,要支持其他数据库,需要改 Mapping 映射文件,建表语句换成其他数据库的。

主要差异表现在四个方面:

  1. 建表语句中的主键自增
  2. Mapping 中的转义符号
  3. 分页查询语句
  4. 时间函数

本文介绍 XXL-Job 连接 Postgre 数据库的改造过程。

数据发送

第一步,将官方提供的 MySQL 建表语句通过 Navicat 导入 MySQL ,这样可以保证对应版本的建表语句的一致性。

第二步,选中 MySQL 连接下的 xxl-job 数据库,右键“数据传输” ,使用 Navicat 的数据传输功能,将 xxl-job 数据库的表结构传输给对应的 Postgre 连接:
在这里插入图片描述
等待完成,这个数据传输会完成创建 SEQUENCE 和表以及基础数据插入的工作,但是不会为自增主键设置默认值

第三步,在 Postgre 连接下对应的数据库中,创建查询语句,修改所有用到自增主键的表,设置 default 值为自增。

ALTER TABLE "public"."xxl_job_user" alter column ID set default nextval('xxl_job_user_id_seq'::regclass);
ALTER TABLE "public"."xxl_job_info" alter column ID set default nextval('xxl_job_info_id_seq'::regclass);
ALTER TABLE "public"."xxl_job_log" alter column ID set default nextval('xxl_job_log_id_seq'::regclass);
ALTER TABLE "public"."xxl_job_log_report" alter column ID set default nextval('xxl_job_log_report_id_seq'::regclass);
ALTER TABLE "public"."xxl_job_logglue" alter column ID set default nextval('xxl_job_logglue_id_seq'::regclass);
ALTER TABLE "public"."xxl_job_registry" alter column ID set default nextval('xxl_job_registry_id_seq'::regclass);
ALTER TABLE "public"."xxl_job_group" alter column ID set default nextval('xxl_job_group_id_seq'::regclass);

第四步,进入 Postgre 数据库中,右键设计每个表,对 int 类型且 Not Null 的字段设置默认值为 0 。这一步非常关键,因为日志表有几个字段入库时未传值,轮询后更新的,没有默认值,插入 SQL 无法通过 Not Null 的检查而异常。

MyBatisMapping 调整

到 mybatis-mapper 文件夹下,修改各个 Mapper 文件,使其适配 Postgre 语法,主要有三个地方:

  1. 去掉转义符 ` ,直接用空格替换
  2. 修改 LIMIT #{offset}, #{pagesize} 为 LIMIT #{pagesize} OFFSET #{offset} ,涉及到查询的地方都需要调整
  3. 时间函数,如:XxlJobRegistryMapper.xml 文件中 findAll 和 findDead 查询语句,DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND) 修改为 ((select NOW())-INTERVAL '${timeout} S') 。

数据库连接配置

第一步,pom.xml 中添加 Postgre 驱动,maven 官网找最一个较高的版本。

<dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.19</version></dependency>

第二步,修改 admin 模块下的 application.properties 文件,数据库连接配置信息改为 Postgre 。

### xxl-job, datasource
spring.datasource.url=jdbc:postgresql://IP:5432/xxl-job
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driver-class-name=org.postgresql.Driver

检查一遍所有的配置,然后在本机直接启动测试。

启示录

本来以为这个开源框架应该是支持所有类型数据库的呢,验证其他数据库的适配情况时,才发现官网 doc 目录下数据库只有一个 MySQL ,看 issues 说适配需要自己改 mapper 映射文件。

好在它不像 Azkaban 那样把 SQL 集成在代码中,所以就自己改造一下,还是比较顺利的,汇总成此文,希望给需要改造的同行一点启发!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/344399.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

若依前后端分离版集成nacos

根据公司要求&#xff0c;需要将项目集成到nacos中&#xff0c;当前项目是基于若依前后端分离版开发的&#xff0c;若依的版本为3.8.3&#xff0c;若依框架中整合的springBoot版本为2.5.14。Nacos核心提供两个功能&#xff1a;服务注册与发现&#xff0c;动态配置管理。 一、服…

【django项目开发】用户登录后缓存权限到redis中(十)

这里写目录标题一、权限的数据的特点二、首先settings.py文件中配置redis连接redis数据库一、权限的数据的特点 需要去数据库中频繁的读和写&#xff0c;为了项目提高运行效率&#xff0c;可以把用户的权限在每次登录的时候都缓存到redis中。这样的话&#xff0c;权限判断的中…

基于投票策略的室内家具检测:VoteNet、BRNet 最全总结

文章目录一、基本概述二、VoteNet三、BRNet四、最新研究成果一、基本概述 最近几年&#xff0c;基于点云的3D目标检测是自动驾驶场景研究的热点。但是&#xff0c;自动驾驶主要聚焦于室外场景。本文&#xff0c;我们主要介绍两篇文章&#xff08;VoteNet&#xff0c;BRNet&…

HTML第一章总结

<h1~h6>标题标签 <br />换行标签&#xff08;单&#xff09; <p>分段标签 <strong>加粗标签 <em>倾斜标签 <del>删除线标签 <ins>下划线标签 <div>独占一行的布局标签 <span>进行分割的布局标签 <img>图片标签&a…

Go-micro[windows]安装以及踩坑

一.首先安装protochttps://github.com/protocolbuffers/protobuf/releases进入网址&#xff0c;点击tag&#xff0c;然后选择v3版本进入之后找到随后下载安装然后将protoc解压缩到任意目录&#xff08;自己选&#xff09;再将protoc/bin的路径放置环境变量中二.获取protoc-gen-…

RK3568 UBOOT的问题解决案例

一、UBOOT下的波特率 原始的波特率为1500000,串口工具虽然可以设置任意波特率&#xff0c;但工作时不正常。 输入不了。本文描述如何修改成115200。 二、确认UBOOT的配置 ./build.sh uboot processing option: uboot Start building uboot TARGET_UBOOT_CONFIGrk3568## make …

【C++】二叉树之力扣经典题目1——详解二叉树的递归遍历,二叉树的层次遍历

如有错误&#xff0c;欢迎指正。 如有不理解的地方&#xff0c;可以私信问我。 文章目录题目1&#xff1a;根据二叉树创建字符串题目实例思路与解析代码实现题目2&#xff1a;二叉树的层序遍历题目思路与解析代码实现题目1&#xff1a;根据二叉树创建字符串 点击进入题目链接—…

C++——多态|虚函数|重写|虚表

文章目录1. 多态的概念1.1 概念2. 多态的定义及实现2.1多态的构成条件2.2 虚函数2.3虚函数的重写虚函数重写的三个例外&#xff1a;2.4 普通调用和多态调用&#xff1a;2.5 C11 override 和 final2.6 重载、虚函数的覆盖(重写)、隐藏(重定义)的对比3. 抽象类(有关纯虚函数)3.1 …

互联网新时代要到来了(一)什么是Web3.0?

什么是Web3.0? tips&#xff1a;内容来自百度百科、知乎、搜狐新闻、李留白公众号、CSDN「Meta.Qing」博客等网页 什么是Web3.0?1.什么是Web3.0&#xff08;概念介绍&#xff09;&#xff1f;2.Web3.0简单理解3.Web3.0的技术特点4.Web3.0项目1.什么是Web3.0&#xff08;概念…

greenDao的使用文档

介绍&#xff1a;greenDAO 是一款轻量级的 Android ORM 框架&#xff0c;将 Java 对象映射到 SQLite 数据库中&#xff0c;我们操作数据库的时候&#xff0c;不在需要编写复杂的 SQL语句&#xff0c; 在性能方面&#xff0c;greenDAO 针对 Android 进行了高度优化&#xff0c; …

Ubuntu 20中安装snaphu

Ubuntu 20中安装snaphu0 前言1 snaphu安装步骤1.1 在控制台用命令行安装1.2 在官网下载安装包0 前言 snaphu是一个解缠软件。基于欧空局的SNAP snaphu的官网&#xff1a;https://web.stanford.edu/group/radar/softwareandlinks/sw/snaphu/ 1 snaphu安装步骤 大致有两种 在…

微软支持的ChatGPT激增,但不要低估苹果和谷歌

微软和 OpenAI 可能在 AI 聊天机器人爆炸式增长的市场中具有先发优势&#xff0c;但不要排除其他一些可以访问大量 AI 训练数据的科技巨头&#xff0c;例如 Apple 和 Google。 通过其对 ChatGPT 开发商 OpenAI 的早期和持续支持&#xff0c;微软在AI 军备竞赛中目前处于领先地…

鲸探玩家狂收往期数藏,2023年数藏二级市场的紧箍咒可能松动了?

图片来源&#xff1a;由无界AI绘画工具生成2月初&#xff0c;数藏发行平台鲸探更新了用户服务协议&#xff0c;更新最受关注的点在于&#xff1a;首次转赠期限从180天调整为90天。此外&#xff0c;有媒体披露&#xff0c;鲸探客服回答用户提问称&#xff0c;非首次转赠也从720天…

ITSS认证分为几个级别,哪个级别最高

​一、什么是ITSS ITSS( 信息技术服务标准&#xff0c;简称ITSS)是国内第一套成体系和综合配套的信息技术服务标准库&#xff0c;全面规范了IT服务产品及其组成要素&#xff0c;用于指导实施标准化和可信赖的IT服务。 ITSS是在工业和信息化部、国家标准化管理委员会的联合指导下…

Python 之 NumPy 统计函数、数据类型和文件操作

文章目录一、统计函数1. 求平均值 mean()2. 中位数 np.median3. 标准差 ndarray.std4. 方差 ndarray.var()5. 最大值 ndarray.max()6. 最小值 ndarray.min()7. 求和 ndarray.sum()8. 加权平均值 numpy.average()二、数据类型1. 数据存储2. 定义结构化数据3. 结构化数据操作三、…

儿童及婴幼儿产品出口美国CPC和欧洲CE认证测试标准总结

消费者越来越关注他们选购的产品安全性和质量&#xff0c;尤其是儿童和婴幼儿产品。若产品不符合安全标准和法规要求&#xff0c;可能对婴儿、幼儿和儿童造成威胁。儿童和婴幼儿产品的制造商及零售商必须严格遵守当地市场法规&#xff0c;证明其产品的安全性和质量可以满足消费…

为什么重写equals还要重写hashcode方法

目录equals方法hashCode方法为什么要一起重写&#xff1f;总结面试如何回答重写 equals 时为什么一定要重写 hashCode&#xff1f;要想了解这个问题的根本原因&#xff0c;我们还得先从这两个方法开始说起。 以下是关于hashcode的一些规定&#xff1a; 两个对象相等&#xff0…

文心ERNIE 3.0 Tiny新升级!端侧压缩部署“小” “快” “灵”!

大家好&#xff0c;今天带来的是有关文心ERNIE 3.0 Tiny新升级内容的文章。 近年来&#xff0c;随着深度学习技术的迅速发展&#xff0c;大规模预训练范式通过一次又一次刷新各种评测基线证明了其卓越的学习与迁移能力。在这个过程中&#xff0c;研究者们发现通过不断地扩大模型…

第七章:Linux最小化搭建环境解说2

配置IP地址&#xff1a;我们先要到网卡配置文件夹里&#xff0c;路径是/etc/sysconfig/network-scripts/&#xff0c;有点长&#xff0c;不过没事&#xff0c;我们要学会习惯&#xff0c;这还是经常用的。然后就是用ls命令查看下面有什么&#xff0c;只有一个文件ifcfg-ens160&…

【位运算问题】Leetcode 136、137、260问题详解及代码实现

Halo&#xff0c;这里是Ppeua。平时主要更新C语言&#xff0c;C&#xff0c;数据结构算法......感兴趣就关注我吧&#xff01;你定不会失望。 &#x1f308;个人主页&#xff1a;主页链接 &#x1f308;算法专栏&#xff1a;专栏链接 我会一直往里填充内容哒&#xff01; &…