XXL-JOB架构篇 - 初识分布式任务调度XXL-JOB

news2025/3/7 8:02:57

任务调度

一、什么时候需要任务调度?

  • 基于时间的任务
  • 批量数据的处理
  • 异步解耦(比如先做任务A,再做任务B)

二、任务调度的基本需求有哪些?

  • 可以定义触发的规则,比如基于时刻、时间间隔、表达式。
  • 可以定义需要执行的任务。比如执行一个脚本或者一段代码。任务与规则是分开的。
  • 集中管理配置,持久化配置。不需要把规则写在代码里面,可以看到所有的任务配置,方便维护。重启后,任务可以再次调度。
  • 支持任务之间的串行执行,比如先执行任务A,再执行任务B。
  • 支持多个任务并发执行,互不干扰。
  • 有自己的调度器,可以启动、中断、停止任务。
  • 容易集成到Spring。

Quartz的不足

调用API的方式操作业务,不人性化。

需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。

调度和任务耦合。调度逻辑和QuartzJobBean耦合在同一个项目中,导致调度系统的性能受限于业务。

随机负载。底层使用抢占式获取DB锁,并由抢占成功的节点负责运行任务,会导致节点负载悬殊非常大。

以上四点是XXL-Job官方文档提供的。

缺少UI界面,也就无法通过UI界面动态调整调度策略。

架构图

在这里插入图片描述

在这里插入图片描述

运行模式

1、BEAN模式(类形式)

即使是无框架项目,如 main 方法直接启动的项目也提供支持。但是不支持自动扫描任务并注入到执行器容器中,需要手动注入。

继承 IJobHandler 抽象类,重写 execute 方法,如下:

public class SampleFramelessJob extends IJobHandler {

    @Override
    public void execute() throws Exception {
        XxlJobHelper.log("Xxl frameless job starting !!!");
        for (int i = 0; i < 5; i++) {
            XxlJobHelper.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
    }
    
}

然后手动通过 XxlJobExecutor 将 Job 类注入到执行器容器中,如下:

XxlJobExecutor.registJobHandler("sampleFramelessJob", new SampleFramelessJob());

2、BEAN模式(方法形式)

在 Spring 环境中,对某个类标注 @Component 注解,同时对该类下的某个方法标注 @XxlJob 注解。

@Component
public class SampleJob {

    @XxlJob("sampleBeanJob")
    public void sampleJobHandler() throws Exception {
        XxlJobHelper.log("Xxl sample job starting !!!");
        for (int i = 0; i < 5; i++) {
            XxlJobHelper.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
    }
}

3、GLUE模式(Java)

在 Web IDE 界面,输入如下代码:

package com.xxl.job.service.handler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;
import java.util.concurrent.TimeUnit;

public class DemoGlueJobHandler extends IJobHandler {

	@Override
	public void execute() throws Exception {
		XxlJobHelper.log("XXL-JOB, Hello World.");
      	for (int i = 0; i < 5; i++) {
       		XxlJobHelper.log("beat at:" + i);
          	TimeUnit.SECONDS.sleep(2);
        }
	}

}

4、GLUE模式(Shell)

5、GLUE模式(Python)

6、GLUE模式(NodeJS)

7、GLUE模式(PHP)

8、GLUE模式(PowerShell)

路由策略

执行器集群部署时的路由策略。如果有多个执行器,选择哪个执行器执行任务。支持如下策略:

  • FIRST(第一个):固定选择第一个机器
  • LAST(最后一个):固定选择最后一个机器
  • ROUND(轮询)
  • RANDOM(随机):随机选择在线的机器
  • CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,并且所有任务均匀散列在不同机器上
  • LEAST_FREQUENTLY_USED(最不经常使用):优先选择使用频率最低的机器
  • LEAST_RECENTLY_USED(最近最久未使用):优先选择使用最久未使用的机器
  • FAILOVER(故障转移):按照顺序依次进行心跳检测,优先选择第一个心跳检测成功的机器
  • BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,优先选择第一个空闲检测成功的机器
  • SHARDING_BROADCAST(分片广播):对集群中所有机器都执行一次任务,同时系统自动传递分片参数;可以根据分片参数开发分片任务

子任务

每个任务都有一个唯一的任务id(任务id可以从任务列表中获取)。添加任务时,可以设置一个子任务id。这样可以实现当前调度任务执行成功后,然后触发子任务的执行。

调度过期策略

任务调度过期后的处理策略。某一时刻调度任务本该执行的却没有执行的处理策略。支持如下策略:

  • 忽略(默认):忽略过期的任务,从当前时间开始重新计算下次触发时间
  • 立即执行一次:立即执行一次任务,从当前时间开始重新计算下次触发时间

阻塞处理策略

调度过于密集,执行器来不及处理时的处理策略。当前调度任务执行时,发现上一次的调度任务还未完成时的处理策略。支持如下策略:

  • 单机串行(默认):当前调度请求进入FIFO队列并以串行的方式运行。
  • 丢弃后续调度:如果上一次的调度任务未完成,则丢弃当前调度请求并标记为失败。
  • 覆盖之前调度:如果上一次的调度任务未完成,则终止运行上一次的调度任务并清空队列,然后运行当前调度任务。

任务超时控制

支持自定义任务的超时时间,如果任务执行超时将会主动中断任务。默认0。

失败重试次数

任务执行失败时重试的次数,默认0。

分片广播任务

执行器集群部署时,并且任务的路由策略选择"分片广播"的情况下,一次任务调度会广播触发集群中的所有执行器执行一次任务,此外可以根据分片参数开发分片任务。

@XxlJob("shardingJobHandler")
public void shardingJobHandler() {
  	String jobParam = XxlJobHelper.getJobParam();
    if (jobParam == null || jobParam.length() ==  0) {
      	XxlJobHelper.handleFail("分片参数为空,分片任务处理失败");
      	return;
    }
  	int shardIndex = XxlJobHelper.getShardIndex();
  	int shardTotal = XxlJobHelper.getShardTotal();
 	XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
  	for (int i = 0; i < shardTotal; i++) {
    	if (i == shardIndex) {
      		XxlJobHelper.log("第 {} 片,命中分片开始处理", i);
    	} else {
      		XxlJobHelper.log("第 {} 片,忽略", i);
    	}
  	}
}

命令行任务

原生提供通用命令行任务Handler,业务方只需要提供命令行即可,比如在任务参数中指定 “pwd” 命令。

@XxlJob("commandLineJobHandler")
public void commandLineJobHandler() {
    String jobParam = XxlJobHelper.getJobParam();
    ProcessBuilder processBuilder = new ProcessBuilder();
    processBuilder.command(jobParam);
    // 将错误信息合并到标准输出中,因此可以使用Process#getInputStream()方法读取到错误信息
    processBuilder.redirectErrorStream(true);
    BufferedInputStream bufferedInputStream = null;
    try {
        Process process = processBuilder.start();
        bufferedInputStream = new BufferedInputStream(process.getInputStream());
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            XxlJobHelper.log(line);
        }
        process.waitFor();
        int exitValue = process.exitValue();
        if (exitValue == 0) {
            XxlJobHelper.handleSuccess("command exit value(" + exitValue + ") is successful");
        } else {
            XxlJobHelper.handleFail("command exit value(" + exitValue + ") is failed");
        }
    } catch (Exception e) {
        XxlJobHelper.log(e);
        XxlJobHelper.handleFail();
    } finally {
        if (bufferedInputStream != null) {
            try {
                bufferedInputStream.close();
            } catch (IOException e) {
                XxlJobHelper.log(e);
            }
        }
    }
}

HTTP任务

通用HTTP任务的Handler,业务方只需要提供HTTP链接等信息即可,不限制语言、平台。

例如,在任务参数中填写如下:

url: http://www.baidu.com/s?wd=xxljob
method: get
data: content

对应的HTTP任务如下:

@XxlJob("httpJobHandler")
public void httpJobHandler() {
    String jobParam = XxlJobHelper.getJobParam();
    if (jobParam == null || jobParam.length() == 0) {
        XxlJobHelper.log("param[" + jobParam + "] invalid");
        XxlJobHelper.handleFail();
        return;
    }
    String[] httpParams = jobParam.split("\n");
    String url = null;
    String method = null;
    String data = null;
    for (String httpParam : httpParams) {
        if (httpParam.startsWith("url:")) {
            url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
        }
        if (httpParam.startsWith("method:")) {
            method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
        }
        if (httpParam.startsWith("data:")) {
            data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
        }
    }
    if (url == null || url.length() == 0) {
        XxlJobHelper.log("url["+ url +"] invalid");
        XxlJobHelper.handleFail();
        return;
    }
    if (method == null || !Arrays.asList("GET", "POST").contains(method)) {
        XxlJobHelper.log("method[" + method + "] invalid");
        XxlJobHelper.handleFail();
        return;
    }
    boolean isPostMethod = "POST".equals(method);
    HttpURLConnection connection = null;
    BufferedReader bufferedReader = null;
    try {
        URL realUrl = new URL(url);
        connection = (HttpURLConnection) realUrl.openConnection();
        connection.setRequestMethod(method);
        connection.setDoOutput(isPostMethod);
        connection.setDoInput(true);
        connection.setUseCaches(false);
        connection.setReadTimeout(5 * 1000);
        connection.setConnectTimeout(3 * 1000);
        // connection:Keep-Alive 表示在一次http请求中,服务器进行响应后,不再直接断开TCP连接,而是将TCP连接维持一段时间。
        // 在这段时间内,如果同一客户端再次向服务端发起http请求,便可以复用此TCP连接,向服务端发起请求。
        connection.setRequestProperty("connection", "Keep-Alive");
        // Content-Type 表示客户端向服务端发送的数据的媒体类型(MIME类型)
        connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
        // Accept-Charset 表示客户端希望服务端返回的数据的媒体类型(MIME类型)
        connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
        connection.connect();
        if (isPostMethod && data != null && data.length() > 0) {
            DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
            dataOutputStream.write(data.getBytes(Charset.defaultCharset()));
            dataOutputStream.flush();
            dataOutputStream.close();
        }
        int responseCode = connection.getResponseCode();
        if (responseCode != 200) {
            throw new RuntimeException("Http Request StatusCode(" + responseCode + ") Invalid");
        }
        bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), Charset.defaultCharset()));
        StringBuilder stringBuilder = new StringBuilder();
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            stringBuilder.append(line);
        }
        String responseMsg = stringBuilder.toString();
        XxlJobHelper.log(responseMsg);
    } catch (Exception e) {
        XxlJobHelper.log(e);
        XxlJobHelper.handleFail();
    } finally {
        try {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
            if (connection != null) {
                connection.disconnect();
            }
        } catch (Exception e) {
            XxlJobHelper.log(e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/103852.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5G无线技术基础自学系列 | RF优化概述

素材来源&#xff1a;《5G无线网络规划与优化》 一边学习一边整理内容&#xff0c;并与大家分享&#xff0c;侵权即删&#xff0c;谢谢支持&#xff01; 附上汇总贴&#xff1a;5G无线技术基础自学系列 | 汇总_COCOgsta的博客-CSDN博客 随着5G商用网络的陆续建设&#xff0c;…

疫情抑制珠宝消费增长,珠宝主要市场需求萎缩

一、我国珠宝行业市场规模持续增长 根据观研报告网发布的《2022年中国珠宝行业分析报告-行业现状与发展趋势分析》显示&#xff0c;2021年上半年&#xff0c;中国金饰需求增长强劲&#xff0c;中国金饰消费量在2021年第二季度趋于稳定&#xff0c;使上半年金饰总需求达到338吨…

chatGPT接口,不需要科学上网就可以调用的OPENAI接口

最近很多国产版本的chatGPT出现了 查找了一圈发现调用的openai 的一个接口 测试了一下效果还算可以吧 视频教程 链接: 视频教程 用接口自己写了一个测试的网页 日常使用应该是不成问题 接口整理好了在这&#xff1a; 请求接口&#xff1a; URL:-POST https://api.openai.…

m基于RBF神经网络和BP神经网络的信道估计误码率matlab仿真

目录 1.算法描述 2.仿真效果预览 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 在无线通信系统中&#xff0c;从发射端发射的信号&#xff0c;经过直射、反射、散射等路径到达接收端。在ofdm系统中&#xff0c;为了获取更好的性能&#xff0c;需要进行信道估计获取信道的状态…

Diffusion Model合集 part1

扩散模型原理介绍1一&#xff0c;条件概率公式与高斯分布的KL散度重参数技巧二&#xff0c;VAE和多层VAE回顾单层VAE的原理公式与置信下界多层VAE的原理公式与置信下界三&#xff0c;Diffusion Model 图示四&#xff0c;扩散过程(Diffusion Process)与VAE的区别&#xff1a;如何…

腾讯云weda低代码让别人能看到发布的应用

我们先登录低代码的平台 腾讯云-控制台 (tencent.com) 我们从创建应用开始讲起&#xff0c;点击新建门户应用&#xff1a; 点击确定 我们先把这个应用发布。 确认发布。 显示出&#xff0c;下面这个已经发布成功后&#xff0c;我们就返回到主页面。 返回主界面&#xff0c;点…

《Docker系列》Docker安装MySQL 5.7

Docker安装MySQL 5.7 一、docker拉取MySQL 5.7镜像 docker pull mysql 拉取最新MySQL docker pull mysql:5.7 拉取指定版本MySQL 1 拉取mysql 5.7镜像 [rootzxy_master ~]# docker pull mysql:5.7 5.7: Pulling from library/mysql d26998a7c52d: Pull complete 4a9d8a356…

JavaWeb之Servelt学习

1.Servlet 概念&#xff1a;运行在服务端的小程序 Servlet就是一个接口&#xff0c;定义了java类被浏览器访问到&#xff08;tomcat识别&#xff09;的规则 将来我们自定义一个类&#xff0c;实现Servlet接口&#xff0c;复写方法 1.1快速入门 1.创建javaEE项目 2.定义类实…

DFS——连通性和搜索顺序(回溯)

文章目录概述连通性问题模板思考迷宫红与黑搜索顺序(回溯)模板思考马走日单词接龙分成互质组总结概述 定义 在深度优先搜索中&#xff0c;对于最新发现的顶点&#xff0c;如果它还有以此为顶点而未探测到的边&#xff0c;就沿此边继续探测下去&#xff0c;当顶点v的所有边都已…

JavaScript刷LeetCode拿offer-滑动窗口

一、前言 《JavaScript刷LeetCode拿offer-双指针技巧》中&#xff0c;简单地介绍了双指针技巧相比较单指针的优点&#xff0c;以及结合 Easy 难度的题目带大家进一步了解双指针的应用。 进入 Medium 难度之后&#xff0c;解题的关键在于如何构造双指针以及确定指针移动的规则…

从 0 开始学 Python 自动化测试开发(二):环境搭建

本文是「从 0 开始学 Python 自动化测试开发」专题系列文章第二篇 —— 环境搭建篇&#xff0c;适合零基础入门的同学。没有阅读过上一篇的同学&#xff0c;请戳主页看上一篇噢。作者方程老师&#xff0c;是前某跨国通信公司高级测试经理&#xff0c;目前为某互联网名企资深测试…

常见管理网络的net命令

目录1 简介2 常用命令2.1 net view2.2 net user2.3 net use2.4 net start2.5 net stop2.6 net share1 简介 net 命令是一种基于网络的命令&#xff0c;该命令包含了管理网络环境、服务、用户、登录等大部分重要的管理功能。 2 常用命令 2.1 net view 作用&#xff1a;显示域…

Spring 之 @Component 和 @Configuration 两者区别以及源码分析

之前一直搞不清 Component 和 Configuration 这两个注解到底有啥区别&#xff0c;一直认为被这两修饰的类可以被 Spring 实例化嘛&#xff0c;不&#xff0c;还是见识太短&#xff0c;直到今天才发现这两玩意有这么大区别。很幸运能够及时发现&#xff0c;后面可以少走点坑&…

操作系统知识点

操作系统的目标&#xff1a; 方便&#xff1a;使计算机系统易用 有效&#xff1a;以更有效的方式使用计算机系统资源 扩展&#xff1a;方便用户有效开发、测试和引进新功能 操作系统的 作用&#xff1a; 1. 有效的管理资源 2.通过命令接口、编程接口等为用户提供各种…

【自然语言处理】【ChatGPT系列】Chain of Thought:从大模型中引导出推理能力

Chain-of-Thought Prompting&#xff1a;从大模型中引导出推理能力《Chain-of-Thought Prompting Elicits Reasoning in Large Language Models》论文地址&#xff1a;https://arxiv.org/pdf/2201.11903.pdf 相关博客 【自然语言处理】【ChatGPT系列】Chain of Thought&#xf…

什么是加权轮询?云解析DNS是否支持加权轮询?-中科三方

什么是加权轮询&#xff1f; 所谓的加权轮询算法&#xff0c;其实就是Weighted Round Robin&#xff0c;简称wrr。在我们配置Nginx的upstream的时候&#xff0c;带权重的轮询&#xff0c;其实就是wrr。 upstream backend { ip_hash; server 192.168.1.232 weight4; server 19…

无疫苗未吃药,48小时内阳康全纪实个案

为了不误导不同体质的人&#xff0c;特意强调这是个案&#xff0c;阳康方案仅供参考。小编体质偏寒&#xff0c;长期熬夜&#xff0c;黑白颠倒&#xff0c;有习惯性头痛症。经验总结&#xff1a;1.受到风寒是病发的直接导火索&#xff0c;就算携带病毒&#xff0c;本体没有受到…

Android实现红绿灯检测(含Android源码 可实时运行)

Android实现红绿灯检测(含Android源码 可实时运行) 目录 Android实现红绿灯检测(含Android源码 可实时运行) 1. 前言 2. 红绿灯检测数据集说明 3. 基于YOLOv5的红绿灯检测模型训练 4.红绿灯检测模型Android部署 &#xff08;1&#xff09; 将Pytorch模型转换ONNX模型 &…

吉林优美姿:抖音怎么增加销量?

为了更好的在做抖音好物联盟&#xff0c;那么一些抖音达人也会想方设法的去报名申请&#xff0c;那么大家是否真的清楚这个报名要怎么做呢&#xff1f;具体有什么要求呢&#xff1f;跟着吉林优美姿小编来一起看看吧&#xff01; 注册要求&#xff1a;凡注册抖音 APP并开通产品分…

大数据培训Impala之存储和压缩

注&#xff1a;impala不支持ORC格式 1.创建parquet格式的表并插入数据进行查询 [hadoop104:21000] > create table student2(id int, name string) > row format delimited > fields terminated by ‘\t’ > stored as PARQUET; [hadoop104:21000] > insert …