Java | 一分钟掌握定时任务 | 8 - XXL-Job分布式定时任务

news2025/2/26 22:10:46

作者:Mars酱

声明:本文章由Mars酱编写,部分内容来源于网络,如有疑问请联系本人。

转载:欢迎转载,转载前先请联系我!

前言

java定时任务的框架可真是多啊,XXL-JOB也是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展,还是开源的,Mars酱只好下载下来看看了。

架构图

从架构图上可以看出,xxl-job并没依赖第三方的调度服务,而是自研的。那么我们看怎么使用,再研究它内部的原理吧。

任务的分类

启动xxl-job-admin,进入管理控制台,新建一个任务的时候可以看到任务的运行模式有几种,除了第一种BEAN模式,其他的都是GLUE开头的模式

GLUE类型的都能在xxl-job中通过提供的在线编辑器直接编写源码,编写完成之后的任务信息会保存在xxl-job的xxl_job_info表的glue_source字段中,能够在线编译任务,然后直接调试,确实方便了不少。

跟踪GLUE模式的实现逻辑

新建好一个java的GLUE运行模式后,在页面上选择执行,会发送一个http请求给管理控制台

跟踪/trigger这个请求地址,会到JobTriggerPoolHelper中的addTrigger函数:

    /**
     * add trigger
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {

                long start = System.currentTimeMillis();

                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {

                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }

                }

            }
        });
    }

这段代码大致流程就是:

  1. 选择线程池,分快池和慢池,通过计数器的次数判断应该选择哪个池;
  2. 在线程池中调用XxlJobTrigger的trigger方法;
  3. 执行完毕之后判断耗时时间,并在计数器中记录次数,方便下次判断使用快池还是慢池;

跟踪XxlJobTrigger的trigger方法,会到runExecutor方法:

    /**
     * run executor
     * @param triggerParam
     * @param address
     * @return
     */
    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        return runResult;
    }

runExecutor的方法调用了ExecutorBiz的接口run函数,并返回结果给runExecutor,而ExecutorBiz有两个实现,一个是ExecutorBizImpl,另一个是ExecutorBizClient。后者简单,发送一个post请求,其目标就是下发任务;前者复杂,目的是执行任务,代码Mars酱要贴出来:

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

是不是很长很复杂,我看了下,这段大概流程就是这样:

  1. 判断glue的模式;
  2. 读取模式下的source字段的内容;
  3. 创建对应glue模式下的jobHandler对象。jobHandler模式的对象有以下三种:

  1. 判断执行策略。执行策略分三种:SERIAL_EXECUTIONDISCARD_LATERCOVER_EARLY
  2. 注册任务线程,并调用线程的star方法;
  3. 任务线程放入trigger队列;

以上就是跟踪GLUE运行模式得到的流程,枯燥无味,是不是?

跟踪BEAN模式的实现逻辑

在跑起来admin服务之后,自带了一个例子,那就是一个BEAN模式的实现。跟踪这个例子得到的流程和上面的流程一样,最终都会进入到ExecutorBizImpl的run方法,在第一步判断glue模式的时候,直接会调用jobHandler的实现类MethodJobHandler去执行了。只是在直接执行之前已经把对象注册到了资源库中,注册的代码在XxlJobExecutor的registJobHandler中:

    protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
        if (xxlJob == null) {
            return;
        }

        String name = xxlJob.value();
        //make and simplify the variables since they'll be called several times later
        Class<?> clazz = bean.getClass();
        String methodName = executeMethod.getName();
        if (name.trim().length() == 0) {
            throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
        }
        if (loadJobHandler(name) != null) {
            throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
        }

        // execute method
        /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
            throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                    "The correct method format like " public ReturnT<String> execute(String param) " .");
        }
        if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
            throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                    "The correct method format like " public ReturnT<String> execute(String param) " .");
        }*/

        executeMethod.setAccessible(true);

        // init and destroy
        Method initMethod = null;
        Method destroyMethod = null;

        if (xxlJob.init().trim().length() > 0) {
            try {
                initMethod = clazz.getDeclaredMethod(xxlJob.init());
                initMethod.setAccessible(true);
            } catch (NoSuchMethodException e) {
                throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
            }
        }
        if (xxlJob.destroy().trim().length() > 0) {
            try {
                destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
                destroyMethod.setAccessible(true);
            } catch (NoSuchMethodException e) {
                throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
            }
        }

        // registry jobhandler
        registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

    }

流程这种就是通过反射去得到指定的对象和方法,最后的registJob就是往ConcurrentHashMap中put反射之后得到的对象。

大致的流程都是和GLUE运行模式重叠的,只是注册的方式有小小的差别:一个是根据指定的值去反射得到需要执行的方法,另一个是数据库存储源码,通过GroovyClassLoader反射之后得到对象并执行方法。底层虽然都是反射,但是方式就这么一点点区别。

如何路由?

在addTrigger中有一段XxlJobTrigger.trigger,这里会在调用runExecutor之前调用processTrigger方法去实现具体的路由策略:

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
	// ... 省略其他代码
    // 路由策略
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    
	// ... 省略其他代码
}

路由策略由ExecutorRoute接口定义,有不同的实现方式:

这些路由策略,大多根据名字就能知道,如果想知道具体的实现逻辑,进入指定的路由实现类查看就行了。创建好的路由策略会在processTrigger方法体中调用route方法,然后会被执行。

总结

xxl-job我省略了很多的其他的细节,比如分片流程。总的来说,xxl-job分两个部分,一个是调度中心,一个是执行器,调度中心就是admin服务,执行器需要自己写,官方提供了两个例子,一个基于springboot的,一个无框架的,任务整体的流转流程我还是补个网图:

步骤说明:

  1. 执行器往调度中心注册,并持久化;
  2. 执行的时候下发到指定的执行器;
  3. 执行器完成之后把结果丢给调度中心存储执行结果,并记录好执行的日志。

xxl-job大抵就这样了,有问题请告诉我,Mars酱会修正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/557889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

开源进展|WeCross v1.3.0发布,支持适配FISCO BCOS v3.0

WeCross是微众银行自主研发并完全开源的区块链跨链协作平台&#xff0c;致力于促进跨行业、机构和地域的跨区块链信任传递和商业合作&#xff0c;有助于实现异构区块链系统之间安全可信的互操作。 WeCross v1.2.0自发布以来&#xff0c;得到了众多社区伙伴的支持和反馈。目前&…

内网渗透(八十)之搭建额外域控

搭建额外域控 我们在之前搭建完成Windows Server 2012 R2 域控的基础上搭建一个额外的域控。多个域控的好处在于,当其中有域控出现了故障,仍然能够由其他域控来提供服务。选择一台Windows Server 2012 R2 服务器作为额外域控,主机名为DC2. 首先在DC2上配置IP地址为192.168…

如何有效控制城镇供水管网漏损

漏损问题是影响城镇供水管网稳定与可靠运行的重要问题。其中&#xff0c;城镇供水管网运行中&#xff0c;一旦存在漏损情况不仅会对供水管网的供水水质产生影响&#xff0c;导致其水质降低&#xff0c;而且会出现供水压力与供水量减少等变化&#xff0c;对供水企业的供水服务质…

硬核数据处理笔记本推荐(2023版)

2023年硬核数据处理笔记本推荐它来了&#xff01;&#xff01;&#xff01;在大家的呼声中它来了&#xff01;&#xff01;&#xff01; 去年的推荐收货不少好评&#xff0c;今年继续为大家分享选购攻略&#xff01; 选购背景&#xff1a; 1.今年英特尔处理器挤牙膏、出套娃…

【开源】diy一个wifi遥控小飞机

完成效果&#xff1a; 童年的纸飞机 资料中包含了PCB和参考的小飞机模型&#xff0c;我当时是用某宝上几块钱的手抛小飞机改装的&#xff0c;需要一定的动手能力。 硬件 材料 720空心杯电机 * 2107正反桨一对&#xff0c;搭配电机3.7V 300mAh锂离子电池 * 1控制板 * 148cm手…

【C++】异常+智能指针+特殊类和类型转换

上天可能觉得我太孤独&#xff0c;派你来和我一起对抗虚无。 文章目录 一、异常1.传统处理错误的方式vs异常2.异常的使用规则2.1 异常的抛出和捕获原则2.2 在函数调用链中异常栈展开匹配原则 3.异常安全和异常规范4.自定义异常体系5.标准库的异常体系和异常的优缺点 二、智能指…

Rocketmq简单使用

1.引入依赖 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId>&l…

22.vue插槽

目录 1 基本使用 2 name属性与v-slot 3 插槽放默认内容(后备内容) 4 插槽的自定义属性(作用域插槽) 4.1 简单使用 4.2 传data 4.3 支持解构 插槽操作就是写在组件中间的东西&#xff0c;其目的是增加组件在UI结构上的复用性&#xff0c;就像下面这样 直接写是渲染…

chatgpt赋能Python-python_ipynb

Python Ipython Notebook: 大数据时代的完美解决方案 在大数据时代&#xff0c;数据处理和分析是许多组织必须面对的挑战。Python Ipython Notebook (IPYNB) 可以提高数据探索性分析的效率&#xff0c;并能够使您更好地理解和评估数据。本文将介绍Python IPYNB是什么、以及为什…

C++设计手段的智慧:从基础到前沿

C设计手段的智慧&#xff1a;从基础到前沿 一、C基础设计手段&#xff08;Basic Design Techniques in C&#xff09;1.1 C 类和对象设计1.1.1 类的定义1.1.2 对象的创建和使用1.1.3 类的封装1.1.4 类的继承1.1.5 类的多态 1.2 RAII of C design tools (resource acquisition i…

ROS学习笔记(九):MoveIt!与机械臂控制

ROS学习笔记&#xff08;九&#xff09;&#xff1a;MoveIt&#xff01;与机械臂控制 MoveIt&#xff01;简介MoveIt&#xff01;系统架构MoveIt&#xff01;编程与机械臂控制关节空间规划工作空间规划笛卡尔运动规划避障规划 Pick and Place示例 MoveIt&#xff01;简介 Move…

一图看懂!RK3568与RK3399怎么选?

▎简介 RK3568和RK3399都是Rockchip公司的处理器&#xff0c;具有不同的特点和适用场景。以下是它们的主要区别和应用场景。 ▎RK3568 RK3568是新一代的高性能处理器&#xff0c;采用了22nm工艺&#xff0c;具有更高的性能和更低的功耗。它支持4K视频解码和编码&#xff0c;支持…

某程序员辞职后,接6份兼职,月入3w+

对于程序员来说&#xff0c;35岁真的是很关键。 如果成为架构师或者是成为管理方面的人才&#xff0c;还是不用担心失业。要是你30多岁还在一线写代码&#xff0c;那被裁的可能性很大。即使你现在没有失业&#xff0c;也说明你能力很一般。 最近在职场论坛上看到这样一个帖子…

互联网广告丨行业知识储备

文章状态&#xff1a;持续更新中 更新时间&#xff1a;2023.05.22 本文不同于专业咨询机构输出的专业行业调研报告&#xff0c;仅作为产品经理对互联网广告行业的一些基础知识储备。文章会以产品经理的角度&#xff0c;从行业概述、行业目标与愿景、行业生态、行业的发展、行业…

数仓中指标-标签,维度-度量,自然键-代理键等各名词深度解析

作为一个数据人&#xff0c;是不是经常被各种名词围绕&#xff0c;是不是对其中很多概念认知模糊。有些词虽然只有一字之差&#xff0c;但是它们意思完全不同&#xff0c;今天我们就来了解下数仓建设及数据分析时常见的一些概念含义及它们之间的关系。 本文首发于公众号【五分钟…

LiveNVR视频平台接收无人机等移动终端RTMP推流后转成GB28181协议输出级联到GB28181视频平台的操作说明...

1、需求介绍 目前很多移动终端设备(如无人机等)只支持RTMP推流输出&#xff0c;不支持GB28181协议。但是又有需要通过GB28181协议接入到视频平台的需求。比如有些大疆无人机产品不能直接注册国标平台&#xff0c;只能rtmp推流。那么&#xff0c;项目中如果将无人机的rtmp的推流…

Stablediffusion模型diffusesr格式和ckpt格式相互转换

参考资料&#xff1a; diffusers的源码 [github] 因为小博客可能看的人很少&#xff0c;所以我写的啰嗦一点&#xff0c;想直接看如何互相转换的朋友可以直接转到文末的代码段。 当你在学习Stablediffusion这个开源的t2i模型时&#xff0c;不可避免地会碰到两种模型权重的存储格…

在rk3568移植rtl8723du,配置成wifi ap模式

1、在路径添加rtl8723du模块代码 kernel/drivers/net/wireless/rockchip_wlan 添加rtl8723du 2、修改Makefile 修改对应的路径 修改交叉编译的工具的路径和内核路径 3、修改rockchip_wlan目录下的Makefile 添加这个 obj-$(CONFIG_RTL8723DU) rtl8723du/ 4、修改rockchip_w…

淘宝按关键字搜索淘宝商品 API 参数及返回值说明 翻页展示 含调用示例

淘宝关键字搜索接口&#xff0c;是复原我们在淘宝购物时&#xff0c;在搜索栏内输入关键字&#xff0c;即可获取到相关商品列表&#xff0c;商品信息齐全&#xff0c;支持翻页展示。同时&#xff0c;传入参数sort可按价格排序&#xff0c;也可筛选响应价格段的商品。商品信息是…

关于【Stable-Diffusion WEBUI】基础模型对应VAE的问题

文章目录 &#xff08;零&#xff09;前言&#xff08;一&#xff09;什么是VAE&#xff08;二&#xff09;模型嵌入VAE了么&#xff08;三&#xff09;我们能做什么&#xff08;3.1&#xff09;准备常见的VAE&#xff08;3.2&#xff09;下载模型对应的VAE&#xff08;3.3&…