xxl-job 源码分析
简介:阅读优秀的开源项目源码总是一件让人激动的事情,分布式调度平台xxl-job我们在生产环境也是有了很多的实践应用,一款产品使用久了对其实现原理多少有些了解了,今天也是抽出整块的时间来认真分析一下xxl-job的设计原理,吸收其中源码的精华。
一、源码目录
首先我们到git上面把源码down到本地,导入idea中,如下所示:
根据官网介绍各目录结构内容如下:
- /doc :文档资料
- /db :“调度数据库”建表脚本
- /xxl-job-admin :调度中心,项目源码
- /xxl-job-core :公共Jar依赖
- /xxl-job-executor-samples :执行器,Sample示例项目(大家可以在该项目上进行开发,也可以将现有项目改造生成执行器项目)
二、架构设计
1、设计思想
将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;
2、系统组成
- 调度模块(调度中心):
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。 - 执行模块(执行器):
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
接收“调度中心”的执行请求、终止请求和日志请求等。
3、架构图
三、设计原理
下图是xxl-job的执行流程原理图:
- 任务执行器根据配置的调度中心的地址,自动注册到调度中心。
- 达到任务触发条件,调度中心下发任务。
- 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中。
- 执行器的回调线程消费内存队列中的执行结果,主动上报给调度中心。
- 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情。
上面这段描述的是xxl-job整体的执行流程,接下来我将基于源码分析的方式来讲解一下调度中心和执行器的设计原理
1、调度中心
1.1、任务的执行或触发
任务执行方式
- 根据配置的cron表达式周期性执行任务
- 主动触发一次任务
1.2、主动触发一次任务
在xxl-job控制台界面中,点击【立即执行】,会触发一次任务,后台会调用JobTriggerPoolHelper.trigger() 任务。该方法是将任务提交给一个线程池,在线程池中调用XxlJobTrigger.trigger方法。
阅读源码可以得出如下执行流程:
- admin 通过触发执行一次,会调用JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam);
- 调用helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam); 将这个任务加入到jobTriggerPool池里面,提交到线程池里执行
-
向线程池提交任务里调用了 XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam)方法;
-
之后调用了processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
-
然后通过创建的路由策略,选择同名执行器的某个执行器执行
-
然后调用 triggerResult = runExecutor(triggerParam, address);
-
runResult = executorBiz.run(triggerParam);
-
获取jobThread和JobHandler处理器,然后将任务键入的jobThread的队列中
以上是主动触发一次任务的整体处理流程了。
2、执行器
执行器相当于一个应用服务,需要注册到调度中心统一管理。执行器主要功能是把自己注册到调度中心然后保存在数据库(xxl_job_registry表),并定时发送心跳,保持续约。执行器正常关闭,也主动告知调度中心注销,这种是主动注册。
如果执行器网络故障,调度中心就不知道执行器的情况,如果把任务路由给一个不可用的执行器,就会导致任务失败。调度中心会启动一个后台线程定时调用执行器接口,如果发现异常就下线。
这里我们首先关注一下XxlJobConfig核心配置类,在这里我们这里将 com.xxl.job.core.executor.impl.XxlJobSpringExecutor 交由容器托管了。
XxlJobSpringExecutor实现了ApplicationContextAware, SmartInitializingSingleton, DisposableBean接口。SmartInitializingSingleton 接口的 afterSingletonsInstantiated()方法类似bean实例化执行后的回调函数。afterSingletonsInstantiated 会在spring 容器基本启动完成后执行。此时所有的单列bean都已初始化完成。实现了SmartInitializingSingleton 接口的类
可以在afterSingletonsInstantiated 中做一些回调操作。
在afterSingletonsInstantiated()方法中调用了父类的start()方法。然后再start()方法中我们看到了如下启动方法:
在XxlJobExecutor的initEmbedServer 方法中创建了内置容器,EmbedServer的start方法又会启动容器,并且会开始注册执行器。
EmbedServer 中和注册相关的部分代码:
ExecutorRegistryThread 是一个执行器注册线程类。到这里我们对执行器的整体注册过程进行了一次梳理,细节部分大家可以自行去深入研究学习。
总结
本文主要是梳理了一下自己理解xxl-job的思路,中间一段内容摘抄自官网(本部分都是通用的没必要自己去重复),文章整体并不是为了详细讲解清楚xxl-job开源项目,而是提取了核心关注点来进行的介绍,目的是让大家能够对xxl-job主流程和设计思路有个清晰的认识。