任务调度
一、什么时候需要任务调度?
- 基于时间的任务
- 批量数据的处理
- 异步解耦(比如先做任务A,再做任务B)
二、任务调度的基本需求有哪些?
- 可以定义触发的规则,比如基于时刻、时间间隔、表达式。
- 可以定义需要执行的任务。比如执行一个脚本或者一段代码。任务与规则是分开的。
- 集中管理配置,持久化配置。不需要把规则写在代码里面,可以看到所有的任务配置,方便维护。重启后,任务可以再次调度。
- 支持任务之间的串行执行,比如先执行任务A,再执行任务B。
- 支持多个任务并发执行,互不干扰。
- 有自己的调度器,可以启动、中断、停止任务。
- 容易集成到Spring。
Quartz的不足
调用API的方式操作业务,不人性化。
需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
调度和任务耦合。调度逻辑和QuartzJobBean耦合在同一个项目中,导致调度系统的性能受限于业务。
随机负载。底层使用抢占式获取DB锁,并由抢占成功的节点负责运行任务,会导致节点负载悬殊非常大。
以上四点是XXL-Job官方文档提供的。
缺少UI界面,也就无法通过UI界面动态调整调度策略。
架构图
运行模式
1、BEAN模式(类形式)
即使是无框架项目,如 main 方法直接启动的项目也提供支持。但是不支持自动扫描任务并注入到执行器容器中,需要手动注入。
继承 IJobHandler 抽象类,重写 execute 方法,如下:
public class SampleFramelessJob extends IJobHandler {
@Override
public void execute() throws Exception {
XxlJobHelper.log("Xxl frameless job starting !!!");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
}
}
然后手动通过 XxlJobExecutor 将 Job 类注入到执行器容器中,如下:
XxlJobExecutor.registJobHandler("sampleFramelessJob", new SampleFramelessJob());
2、BEAN模式(方法形式)
在 Spring 环境中,对某个类标注 @Component 注解,同时对该类下的某个方法标注 @XxlJob 注解。
@Component
public class SampleJob {
@XxlJob("sampleBeanJob")
public void sampleJobHandler() throws Exception {
XxlJobHelper.log("Xxl sample job starting !!!");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
}
}
3、GLUE模式(Java)
在 Web IDE 界面,输入如下代码:
package com.xxl.job.service.handler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;
import java.util.concurrent.TimeUnit;
public class DemoGlueJobHandler extends IJobHandler {
@Override
public void execute() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
}
}
4、GLUE模式(Shell)
5、GLUE模式(Python)
6、GLUE模式(NodeJS)
7、GLUE模式(PHP)
8、GLUE模式(PowerShell)
路由策略
执行器集群部署时的路由策略。如果有多个执行器,选择哪个执行器执行任务。支持如下策略:
- FIRST(第一个):固定选择第一个机器
- LAST(最后一个):固定选择最后一个机器
- ROUND(轮询)
- RANDOM(随机):随机选择在线的机器
- CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,并且所有任务均匀散列在不同机器上
- LEAST_FREQUENTLY_USED(最不经常使用):优先选择使用频率最低的机器
- LEAST_RECENTLY_USED(最近最久未使用):优先选择使用最久未使用的机器
- FAILOVER(故障转移):按照顺序依次进行心跳检测,优先选择第一个心跳检测成功的机器
- BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,优先选择第一个空闲检测成功的机器
- SHARDING_BROADCAST(分片广播):对集群中所有机器都执行一次任务,同时系统自动传递分片参数;可以根据分片参数开发分片任务
子任务
每个任务都有一个唯一的任务id(任务id可以从任务列表中获取)。添加任务时,可以设置一个子任务id。这样可以实现当前调度任务执行成功后,然后触发子任务的执行。
调度过期策略
任务调度过期后的处理策略。某一时刻调度任务本该执行的却没有执行的处理策略。支持如下策略:
- 忽略(默认):忽略过期的任务,从当前时间开始重新计算下次触发时间
- 立即执行一次:立即执行一次任务,从当前时间开始重新计算下次触发时间
阻塞处理策略
调度过于密集,执行器来不及处理时的处理策略。当前调度任务执行时,发现上一次的调度任务还未完成时的处理策略。支持如下策略:
- 单机串行(默认):当前调度请求进入FIFO队列并以串行的方式运行。
- 丢弃后续调度:如果上一次的调度任务未完成,则丢弃当前调度请求并标记为失败。
- 覆盖之前调度:如果上一次的调度任务未完成,则终止运行上一次的调度任务并清空队列,然后运行当前调度任务。
任务超时控制
支持自定义任务的超时时间,如果任务执行超时将会主动中断任务。默认0。
失败重试次数
任务执行失败时重试的次数,默认0。
分片广播任务
执行器集群部署时,并且任务的路由策略选择"分片广播"的情况下,一次任务调度会广播触发集群中的所有执行器执行一次任务,此外可以根据分片参数开发分片任务。
@XxlJob("shardingJobHandler")
public void shardingJobHandler() {
String jobParam = XxlJobHelper.getJobParam();
if (jobParam == null || jobParam.length() == 0) {
XxlJobHelper.handleFail("分片参数为空,分片任务处理失败");
return;
}
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
for (int i = 0; i < shardTotal; i++) {
if (i == shardIndex) {
XxlJobHelper.log("第 {} 片,命中分片开始处理", i);
} else {
XxlJobHelper.log("第 {} 片,忽略", i);
}
}
}
命令行任务
原生提供通用命令行任务Handler,业务方只需要提供命令行即可,比如在任务参数中指定 “pwd” 命令。
@XxlJob("commandLineJobHandler")
public void commandLineJobHandler() {
String jobParam = XxlJobHelper.getJobParam();
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(jobParam);
// 将错误信息合并到标准输出中,因此可以使用Process#getInputStream()方法读取到错误信息
processBuilder.redirectErrorStream(true);
BufferedInputStream bufferedInputStream = null;
try {
Process process = processBuilder.start();
bufferedInputStream = new BufferedInputStream(process.getInputStream());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobHelper.log(line);
}
process.waitFor();
int exitValue = process.exitValue();
if (exitValue == 0) {
XxlJobHelper.handleSuccess("command exit value(" + exitValue + ") is successful");
} else {
XxlJobHelper.handleFail("command exit value(" + exitValue + ") is failed");
}
} catch (Exception e) {
XxlJobHelper.log(e);
XxlJobHelper.handleFail();
} finally {
if (bufferedInputStream != null) {
try {
bufferedInputStream.close();
} catch (IOException e) {
XxlJobHelper.log(e);
}
}
}
}
HTTP任务
通用HTTP任务的Handler,业务方只需要提供HTTP链接等信息即可,不限制语言、平台。
例如,在任务参数中填写如下:
url: http://www.baidu.com/s?wd=xxljob
method: get
data: content
对应的HTTP任务如下:
@XxlJob("httpJobHandler")
public void httpJobHandler() {
String jobParam = XxlJobHelper.getJobParam();
if (jobParam == null || jobParam.length() == 0) {
XxlJobHelper.log("param[" + jobParam + "] invalid");
XxlJobHelper.handleFail();
return;
}
String[] httpParams = jobParam.split("\n");
String url = null;
String method = null;
String data = null;
for (String httpParam : httpParams) {
if (httpParam.startsWith("url:")) {
url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
}
if (httpParam.startsWith("method:")) {
method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
}
if (httpParam.startsWith("data:")) {
data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
}
}
if (url == null || url.length() == 0) {
XxlJobHelper.log("url["+ url +"] invalid");
XxlJobHelper.handleFail();
return;
}
if (method == null || !Arrays.asList("GET", "POST").contains(method)) {
XxlJobHelper.log("method[" + method + "] invalid");
XxlJobHelper.handleFail();
return;
}
boolean isPostMethod = "POST".equals(method);
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
connection.setRequestMethod(method);
connection.setDoOutput(isPostMethod);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
// connection:Keep-Alive 表示在一次http请求中,服务器进行响应后,不再直接断开TCP连接,而是将TCP连接维持一段时间。
// 在这段时间内,如果同一客户端再次向服务端发起http请求,便可以复用此TCP连接,向服务端发起请求。
connection.setRequestProperty("connection", "Keep-Alive");
// Content-Type 表示客户端向服务端发送的数据的媒体类型(MIME类型)
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
// Accept-Charset 表示客户端希望服务端返回的数据的媒体类型(MIME类型)
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
connection.connect();
if (isPostMethod && data != null && data.length() > 0) {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(data.getBytes(Charset.defaultCharset()));
dataOutputStream.flush();
dataOutputStream.close();
}
int responseCode = connection.getResponseCode();
if (responseCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + responseCode + ") Invalid");
}
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), Charset.defaultCharset()));
StringBuilder stringBuilder = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
stringBuilder.append(line);
}
String responseMsg = stringBuilder.toString();
XxlJobHelper.log(responseMsg);
} catch (Exception e) {
XxlJobHelper.log(e);
XxlJobHelper.handleFail();
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e) {
XxlJobHelper.log(e);
}
}
}