作者:Mars酱
声明:本文章由Mars酱整理编写,部分内容来源于网络,如有疑问请联系本人。
转载:欢迎转载,转载前先请联系我!
前言
我们选择一套框架或者技术的时候,一定要知道它的特点和功能,不能为了(学习)技术而(选择)技术,那是对产品的不负责任。官方说有类似情况的,可以使用PowJob:
- 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表、未支付订单超时取消等。
- 有需要
全部机器一同执行
的业务场景:如使用广播执行模式清理集群日志。 - 有需要
分布式处理
的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce 处理器完成任务的分发,调动整个集群加速计算。 - 有需要
延迟执行
某些任务的业务场景:比如订单过期处理等。
架构图
这是官方提供的架构图:
从架构图颜色可以看出主体就分了两个大块和一小条:调度中心、执行器、Akka。
-
调度中心 powerjob-server
:PowerJob 的设计目标为企业级的分布式任务调度平台,即成为调度中间件,让任意业务线的应用仅需要依赖 powerjob-worker 即可获取任务调度与分布式计算的能力。因此,PowerJob 的理想部署模式为一个公司统一部署 powerjob-server 集群,各业务线应用直接接入使用。 -
执行器 powerjob-worker
:根据以前对定时任务的理解,用过Quartz的话,这里相当于Job这个接口;用过ElasticJob的话,最起码相当于Job接口中的一种,比如SimpleJob接口;用过xxl-job的话,这里也是同理,相当于使用了注解@XxlJob
的方法。因此,所有需要执行的任务,mars酱的理解都需要依赖powerjob-worker的。 -
Akka ActorSystem
:基于Actor模型设计的,专用于构建高度并发、分布式和弹性的工具包,号称单台机器上高达 200 亿条消息/秒。从架构图来看,PowerJob用来做数据交换传输,这么牛逼的中间协议处理者,看来PowerJob团队一定是想往大了搞的。
学习官方的例子
官方提供了一个示例,下载源代码之后,有个powerjob-worker-samples子工程,工程结构是这样:
工程依赖powerjob-worker-spring-boot-starter,工程文件夹中比较重点的就是processors文件夹了,给出了各种处理器实现的例子。处理器官方按照功能分了几种:
-
单机处理器 - BasicProcessor
:单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。 -
广播处理器 - BroadcastProcessor
:广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在BasicProcessor
的基础上额外增加了 preProcess
和 postProcess
方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。 -
并行处理器 - MapReduceProcessor
:MapReduce 是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分,将子任务派发到集群中其他Worker 执行,是执行大批量处理任务的首选。 -
Map处理器 - MapProcessor
:对应了Map任务,即某个任务在运行过程中,允许产生子任务并分发到其他机器进行运算。
process方法
BasicProcessor 的 process 方法基本上是所有任务需要实现的核心方法,表示需要执行任务的具体业务内容,其方法定义如下:
ProcessResult process(TaskContext context) throws Exception;
TaskContext入参
process参数TaskContext类似一个dto对象,里面定义了框架传递给具体业务内容所需的一些属性,如下:
属性名称 | 意义/用法 |
---|---|
jobId | 任务 ID,开发者一般无需关心此参数 |
instanceId | 任务实例 ID,全局唯一,开发者一般无需关心此参数 |
subInstanceId | 子任务实例 ID,秒级任务使用,开发者一般无需关心此参数 |
taskId | 采用链式命名法的 ID,在某个任务实例内唯一,开发者一般无需关心此参数 |
taskName | task 名称,Map/MapReduce 任务的子任务的值为开发者指定,否则为系统默认值,开发者一般无需关心此参数 |
jobParams | 任务参数对于非工作流中的任务其值等同于控制台录入的任务参数; 如果该任务为工作流中的任务且有配置节点参数信息,那么接收到的是节点配置的参数信息 |
instanceParams | 任务实例参数对于非工作流中的任务 其值 等同于 OpenAPI 传递的实例参数,非 OpenAPI 触发的任务则一定为空。 如果该任务为工作流中的任务那么这里实际接收到的是工作流上下文信息,建议使用 getWorkflowContext 方法获取上下文信息 |
maxRetryTimes | Task 的最大重试次数 |
currentRetryTimes | Task 的当前重试次数,和 maxRetryTimes 联合起来可以判断当前是否为该 Task 的最后一次运行机会 |
subTask | 子 Task,Map/MapReduce 处理器专属,开发者调用map方法时传递的子任务列表中的某一个 |
omsLogger | 在线日志,用法同 Slf4J,记录的日志可以直接通过控制台查看,非常便捷和强大!不过使用过程中需要注意频率,滥用在线日志会对 Server 造成巨大的压力 |
userContext | 用户在 PowerJobWorkerConfig 中设置的自定义上下文 |
workflowContext | 工作流WorkflowContext对象 |
这是它的源码:
@Getter
@Setter
@ToString
@Slf4j
public class TaskContext {
private Long jobId;
private Long instanceId;
private Long subInstanceId;
private String taskId;
private String taskName;
private String jobParams;
private String instanceParams;
private int maxRetryTimes;
private int currentRetryTimes;
private Object subTask;
@JsonIgnore
private OmsLogger omsLogger;
private Object userContext;
private WorkflowContext workflowContext;
}
返回值 ProcessResult
方法的返回值为 ProcessResult
,代表了本次任务执行的结果,包含 success
和 msg
两个属性,分别用于传递 Task 是否执行成功和 Task 需要返回的信息。
process方法被谁调用
mars酱选择官方例子中的SimpleProcessor任务,它实现了BasicProcessor
的process方法,跟踪它被调用的地方,找到LightTaskTracker, 它的构造函数把任务提交给线程池调用,这是LightTaskTracker的构造函数:
public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
super(req, workerRuntime);
try {
taskContext = constructTaskContext(req, workerRuntime);
// 1. 等待处理
status = TaskStatus.WORKER_RECEIVED;
// 2. 加载 Processor
processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(req.getProcessorType()).setProcessorInfo(req.getProcessorInfo()));
executeThread = new AtomicReference<>();
long delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "15")) * 1000L;
// 3. 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段
long initDelay = RandomUtils.nextInt(5000, 10000);
// 4. 上报任务状态
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
// 5. 超时控制
if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
} else {
// 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
}
} else {
timeoutCheckScheduledFuture = null;
}
// 6. 提交任务到线程池
processFuture = workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit(this::processTask);
} catch (Exception e) {
log.error("[TaskTracker-{}] fail to create TaskTracker for req:{} ", instanceId, req);
destroy();
throw e;
}
}
上面代码的第6步是通过PowerJob的ExecutorManager创建的一个线程池,并提交给线程池去执行,这是ExecutorManager的构造函数:
public ExecutorManager(PowerJobWorkerConfig workerConfig){
final int availableProcessors = Runtime.getRuntime().availableProcessors();
// 初始化定时线程池
ThreadFactory coreThreadFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-core-%d").build();
coreExecutor = new ScheduledThreadPoolExecutor(3, coreThreadFactory);
ThreadFactory lightTaskReportFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-light-task-status-check-%d").build();
lightweightTaskStatusCheckExecutor = new ScheduledThreadPoolExecutor(availableProcessors * 10, lightTaskReportFactory);
ThreadFactory lightTaskExecuteFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-light-task-execute-%d").build();
// 这里创建线程池,
lightweightTaskExecutorService = new ThreadPoolExecutor(availableProcessors * 10,availableProcessors * 10, 120L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>((workerConfig.getMaxLightweightTaskNum() * 2),true), lightTaskExecuteFactory, new ThreadPoolExecutor.AbortPolicy());
}
构造函数最后一行创建ThreadPoolExecutor,队列使用的ArrayBlockingQueue,失败策略使用的AbortPolicy策略,失败之后抛出异常。
其他任务调度框架
优秀的定时任务框架很多,单体架构的实现可选的不太多,一般也就spring task常用点,分布式架构的可选的很多,可以根据自己的需求选择不同的定时任务框架,以下还有几款名气也不小的:
-
big-whale
:美柚app开源的任务调度框架,提供Spark、Flink等批处理任务的DAG调度和流处理任务的运行管理和状态监控,并具有Yarn应用管理、重复应用检测、大内存应用检测等功能。 -
Schedulis
:微众银行基于 LinkedIn 的开源项目 Azkaban 开发的一款工作流任务调度系统,用于解决金融级场景下,大量批量作业任务的复杂依赖、灵活调度。 -
Oozie
:工作流调度系统,用于管理Apache Hadoop作业。它与Hadoop堆栈的其余部分集成,支持多种类型的Hadoop作业(如Java map-reduce,Streaming map-reduce,Pig,Hive,Sqoop和Distcp)以及系统特定的作业(如Java程序和shell脚本)。
其他优秀的开源任务调度框架,大家可以去github或者gitee上搜索并学习一下。一分钟掌握定时任务,完结。