xxl-job 分布式任务调度 基本使用

news2025/1/10 16:25:42

xxl-job

是一个分布式任务调度平台,使用非常方便。
官网:https://gitee.com/xuxueli0323/xxl-job
工作原理类似于nacos 执行器注册到调度中心 调度中心分配任务 执行器执行任务

docker-compose 配置

version: '3'
services:
  xxl-job:
    image: xuxueli/xxl-job-admin:2.3.1
    container_name: xxl-job
    environment:
      PARAMS: '--server.port=8800
      --server.servlet.context-path=/xxl-job-admin
      --spring.datasource.url=jdbc:mysql://192.168.56.200:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
      --spring.datasource.username=root
      --spring.datasource.password=123456
      --xxl.job.accessToken=jhjxxljob'
    volumes:
      - ./data/logs:/home/xxl-job/applogs
    ports:
      - 49170:8800
    restart: always

springboot 集成

pom

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
</dependency>

config bean

package com.jhj.media.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


}

bootstrap.yml

xxl:
  job:
    admin:
      addresses: http://192.168.56.200:49170/xxl-job-admin
    executor:
      appname: testHandler
      address:
      ip:
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
    accessToken: jhjxxljob

使用

执行器
在这里插入图片描述

任务
在这里插入图片描述
任务中的高级配置 原理和线程池类似,路由策略和nginx,springcloud gateway等类似,不过多了一个分片广播,意思就是将所有的任务都广播到每一个执行器,需要自己在执行器手动进行判断哪些分片该哪些执行器执行。

package com.jhj.media.service.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob开发示例(Bean模式)
 *
 * 开发步骤:
 *      1、任务开发:在Spring Bean实例中,开发Job方法;
 *      2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
 *      3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
 *      4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
 *
 * @author xuxueli 2019-12-11 21:52:51
 */
@Component
public class SampleXxlJob {
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);


    /**
     * 1、简单任务示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
        XxlJobHelper.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) {
            XxlJobHelper.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        // default success
    }


    /**
     * 2、分片广播任务
     */
    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() throws Exception {

        // 分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();

        logger.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

        // 业务逻辑
        for (int i = 0; i < shardTotal; i++) {
            if (i == shardIndex) {
                logger.info("第 {} 片, 命中分片开始处理", i);
            } else {
                logger.info("第 {} 片, 忽略", i);
            }
        }

    }


    /**
     * 3、命令行任务
     */
    @XxlJob("commandJobHandler")
    public void commandJobHandler() throws Exception {
        String command = XxlJobHelper.getJobParam();
        int exitValue = -1;

        BufferedReader bufferedReader = null;
        try {
            // command process
            ProcessBuilder processBuilder = new ProcessBuilder();
            processBuilder.command(command);
            processBuilder.redirectErrorStream(true);

            Process process = processBuilder.start();
            //Process process = Runtime.getRuntime().exec(command);

            BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

            // command log
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                XxlJobHelper.log(line);
            }

            // command exit
            process.waitFor();
            exitValue = process.exitValue();
        } catch (Exception e) {
            XxlJobHelper.log(e);
        } finally {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        }

        if (exitValue == 0) {
            // default success
        } else {
            XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");
        }

    }


    /**
     * 4、跨平台Http任务
     *  参数示例:
     *      "url: http://www.baidu.com\n" +
     *      "method: get\n" +
     *      "data: content\n";
     */
    @XxlJob("httpJobHandler")
    public void httpJobHandler() throws Exception {

        // param parse
        String param = XxlJobHelper.getJobParam();
        if (param==null || param.trim().length()==0) {
            XxlJobHelper.log("param["+ param +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        }

        String[] httpParams = param.split("\n");
        String url = null;
        String method = null;
        String data = null;
        for (String httpParam: httpParams) {
            if (httpParam.startsWith("url:")) {
                url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
            }
            if (httpParam.startsWith("method:")) {
                method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
            }
            if (httpParam.startsWith("data:")) {
                data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
            }
        }

        // param valid
        if (url==null || url.trim().length()==0) {
            XxlJobHelper.log("url["+ url +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        }
        if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
            XxlJobHelper.log("method["+ method +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        }
        boolean isPostMethod = method.equals("POST");

        // request
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try {
            // connection
            URL realUrl = new URL(url);
            connection = (HttpURLConnection) realUrl.openConnection();

            // connection setting
            connection.setRequestMethod(method);
            connection.setDoOutput(isPostMethod);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(5 * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            // do connection
            connection.connect();

            // data
            if (isPostMethod && data!=null && data.trim().length()>0) {
                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                dataOutputStream.write(data.getBytes("UTF-8"));
                dataOutputStream.flush();
                dataOutputStream.close();
            }

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) {
                throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
            }

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                result.append(line);
            }
            String responseMsg = result.toString();

            XxlJobHelper.log(responseMsg);

            return;
        } catch (Exception e) {
            XxlJobHelper.log(e);

            XxlJobHelper.handleFail();
            return;
        } finally {
            try {
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (connection != null) {
                    connection.disconnect();
                }
            } catch (Exception e2) {
                XxlJobHelper.log(e2);
            }
        }

    }

    /**
     * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
     */
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public void demoJobHandler2() throws Exception {
        XxlJobHelper.log("XXL-JOB, Hello World.");
    }
    public void init(){
        logger.info("init");
    }
    public void destroy(){
        logger.info("destroy");
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1863249.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【神经网络】深入理解多层神经网络(深度神经网络

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步&#xff01; 深入理解多层神经网络&#x…

【C++题解】1712. 输出满足条件的整数2

问题&#xff1a;1712. 输出满足条件的整数2 类型&#xff1a;简单循环 题目描述&#xff1a; 有这样的三位数&#xff0c;其百位、十位、个位的数字之和为偶数&#xff0c;且百位大于十位&#xff0c;十位大于个位&#xff0c;请输出满所有满足条件的整数。 输入&#xff1…

博客solo!bolo-solo让博客创作更自由。

bolo-solo&#xff1a;独行侠的数字笔录&#xff0c; 你的博客新伙伴- 精选真开源&#xff0c;释放新价值。 概览 bolo-solo是GitHub 上一个开源的个人博客系统&#xff1a;Bolo Solo&#xff0c;简单易部署&#xff0c;自带精致主题、数据统计表、邮件提醒、自定义图床、功能…

利用Linked SQL Server提权

点击星标&#xff0c;即时接收最新推文 本文选自《内网安全攻防&#xff1a;红队之路》 扫描二维码五折购书 利用Linked SQL Server提权 Linked SQL server是一个SQL Server数据库中的对象&#xff0c;它可以连接到另一个SQL Server或非SQL Server数据源&#xff08;如Oracle&a…

规则引擎-Aviator 表达式校验是否成立

目录 介绍特性使用更多文献支持 介绍 Aviator是一个轻量级、高性能的Java表达式执行引擎&#xff0c;它动态地将表达式编译成字节码并运行。 特性 支持绝大多数运算操作符&#xff0c;包括算术操作符、关系运算符、逻辑操作符、位运算符、正则匹配操作符(~)、三元表达式(?:…

Java学习十一—Java8特性之Stream流

一、Java8新特性简介 2014年3月18日&#xff0c;JDK8发布&#xff0c;提供了Lambda表达式支持、内置Nashorn JavaScript引擎支持、新的时间日期API、彻底移除HotSpot永久代。 ​ Java 8引入了许多令人兴奋的新特性&#xff0c;其中最引人注目的是Lambda表达式和Stream API。以…

【redis】redis概述

1、定义 Redis&#xff08;Remote Dictionary Server&#xff09;&#xff0c;即远程字典服务&#xff0c;是一个开源的、内存中的数据结构存储系统。redis是一个key-value存储系统。和Memcached类似&#xff0c;它支持存储的value类型相对更多&#xff0c;包括string(字符串)…

电脑开机启动项在哪里设置?3个方法教你轻松找到!

“有朋友知道电脑开机启动项在哪里设置吗&#xff1f;我想在里面结束一些程序&#xff0c;但是不知道怎么找到这个功能&#xff0c;请大家帮帮我&#xff01;” 电脑开机启动项的设置对于优化系统启动速度、管理后台运行程序具有重要意义。通过合理配置启动项&#xff0c;我们可…

《Windows API每日一练》6.2 客户区鼠标消息

第五章已经讲到&#xff0c;Windows只会把键盘消息发送到当前具有输入焦点的窗口。鼠标消息则不同&#xff1a;当鼠标经过窗口或在窗口内被单击&#xff0c;则即使该窗口是非活动窗口或不带输入焦点&#xff0c; 窗口过程还是会收到鼠标消息。Windows定义了 21种鼠标消息。不过…

github无法访问,下载慢的解决方法

GitHub是一个存储分享无数的开源项目和代码的宝库网站。然而&#xff0c;由于一些原因&#xff0c;国内用户在访问GitHub时常常遭遇无法访问或下载速度缓慢的问题。这不仅影响了开发者的工作效率&#xff0c;也使一些想要访问下载github文件的普通用户遇到困难。下面小编就来和…

什么是CMSIS || 标准库与HAL库

一&#xff0c;ARM&#xff08;Cortex Microcontroller Software Interface Standard&#xff09; ARM Cortex™ 微控制器软件接口标准&#xff08;Cortex Microcontroller Software Interface Standard&#xff09;是 CortexM 处理器系列的与供应商无关的硬件抽象层。…

Access Levels in Swift

Access Levels (访问级别) Swift provides six different access levels for entities(实体) within your code. These access levels are relative to the source file in which an entity is defined, the module(模块) that source file belongs to, and the package that …

hive架构详解:HQL案例解析(第15天)

系列文章目录 一、Hive基础架构&#xff08;重点&#xff09; 二、Hive数据库,表操作&#xff08;重点&#xff09; 三、Hadoop架构详解(hdfs)&#xff08;补充&#xff09; 四、Hive环境准备&#xff08;操作&#xff09;(补充) 文章目录 系列文章目录前言一、Hive基础架构1、…

【Excel】单元格如何设置可选项、固定表头

设置可选项 固定表头&#xff1a;视图---冻结窗口

Python | Leetcode Python题解之第198题打家劫舍

题目&#xff1a; 题解&#xff1a; class Solution:def rob(self, nums: List[int]) -> int:if not nums:return 0size len(nums)if size 1:return nums[0]first, second nums[0], max(nums[0], nums[1])for i in range(2, size):first, second second, max(first nu…

wps的domain转为shp矢量

wps的namelist制作、python出图和转矢量 简介 wps&#xff08;WRF Preprocessing System&#xff09;是中尺度数值天气预报系统WRF(Weather Research and Forecasting)的预处理系统。 wps的安装地址在GitHub上&#xff1a;https://github.com/wrf-model/WPS 下载完成后&…

循环神经网络——RNN

循环神经网络 在之前NLP基础章节-语言模型中我们介绍了 n n n 元语法&#xff0c;其中单词 x t x_t xt​ 在时间步 t t t 的条件概率仅取决于前面 n n n 个单词&#xff0c;若是想要将之前单词的影响也加入那么模型参数数量会指数级增长。但是可能之前的单词存在重要的信息…

Linux-笔记 高级I/O操作

前言 I/O&#xff08;Input/Output&#xff0c;输入/输出&#xff09;是计算机系统中的一个重要组成部分&#xff0c;它是指计算机与 外部世界之间的信息交流过程。I/O 操作是计算机系统中的一种基本操作&#xff0c;用于向外部设备&#xff08;如 硬盘、键盘、鼠标、网络等&am…

服务器数据恢复—异常断电导致RAID6阵列中磁盘出现坏扇区的数据恢复案例

服务器存储数据恢复环境&#xff1a; 一台存储中有一组由12块SAS硬盘组建的RAID6磁盘阵列&#xff0c;划分为一个卷&#xff0c;分配给几台Vmware ESXI主机做共享存储。该卷中存放了大量Windows虚拟机&#xff0c;这些虚拟机系统盘是统一大小&#xff0c;数据盘大小不确定&…

服务器硬件及RAID配置

目录 一、RAID磁盘阵列 1.概念 2.RAID 0 3.RAID 1 4.RAID 5 5.RAID 6 6.RAID 10 二、阵列卡 1.简介 2.缓存 三、创建 1.创建RAID 0 2.创建RAID 1 3.创建RAID 5 4.创建RAID 10 四、模拟故障 一、RAID磁盘阵列 1.概念 &#xff08;1&#xff09;是Redundant Array …