【源码分析】XXL-JOB的执行器的注册流程

news2025/1/11 17:56:29

目的:分析xxl-job执行器的注册过程

流程:

  1. 获取执行器中所有被注解(@xxlJjob)修饰的handler
  2. 执行器注册过程
  3. 执行器中任务执行过程

版本:xxl-job 2.3.1

建议:下载xxl-job源码,按流程图debug调试,看堆栈信息并按文章内容理解执行流程

完整流程图:
在这里插入图片描述

查找Handler任务

部分流程图:

首先启动管理台界面(服务XxlJobAdminApplication),然后启动项目中给的执行器实例(SpringBoot);

img

这个方法是扫描项目中使用@xxlJob注解的所有handler方法。接着往下走

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    //获取该项目中所有的bean,然后遍历
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        //注意点★
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        //没有跳过本次循环继续
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }
    	//获取了当前执行器中所有@xxl-job的方法,获取方法以及对应的初始化和销毁方法
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // regist
            registJobHandler(xxlJob, bean, executeMethod);
        }
    }
}

Spirng案例执行器中有5个handler:

img

XxlJobExecutor.registJobHandler()中部分源码

String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}

然后进行遍历注册;开始进行名字判断:

  1. 判断bean名字是否为空
  2. 判断bean是否被注册了(存在了)

loadJobHandler校验方式会去该方法中查找:当bean注册完成后时存放到jobHandlerRepository一个map类型中;

private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}

executeMethod.setAccessible(true);它实现了修改对象访问权限的功能,参数为true,则表示允许调用方在使用反射时忽略Java语言的访问控制检查.

往后走会判断该注解的生命周期方法(init和destroy)

  1. 未设置生命周期,则直接开始注册
//注意MethodJobHandler,后面会用到
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
//添加执行器名字及对应的hob方法信息(当前类、方法、init和destroy属性)
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);
}
  1. 有生命周期,设置init和destroy方法权限
if (xxlJob.init().trim().length() > 0) {
    try {
        initMethod = clazz.getDeclaredMethod(xxlJob.init());
        initMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
    }
}
if (xxlJob.destroy().trim().length() > 0) {
    try {
        destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
        destroyMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
    }
}

首先检查@XxlJob注解中的init属性是否存在且不为空。如果存在,则尝试获取该类中名为init的方法,并将其设置为可访问状态,以便后续调用。

同理,代码接下来也检查了@XxlJob注解中的destroy属性是否存在且不为空,如果是,则获取该类中名为destroy的方法,并设置其为可访问状态

在这个过程中,如果某个方法不存在或者无法被访问,则会抛出NoSuchMethodException异常,并且使用throw new RuntimeException将其包装并抛出一个运行时异常。这样做的目的是为了提醒开发人员在任务处理器类中正确地设置init和destroy属性,并确保方法名称与属性值一致。

执行器的注册过程

部分流程图:
在这里插入图片描述

public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

在扫描完执行器中所有的任务后,开始进行执行器注册XxlJobSpringExecutor中的super.start() 方法。

在初始化执行服务器启动之前,进行了四种操作,初始化日志、初始化adminBizList地址(可视化管理台地址)、初始化日志清楚、初始化回调线程等。

这里需要注意的是第二步初始化地址,在初始化服务器启动的时候需要用到。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

继续到initEmbedServer,开始初始化ip地址和端口等,需要明白的是,这一步的参数获取方式其实是第一步读取**XxlJobConfig****获得的;**进行ip的校验和拼接等操作,开始进行真正的注册。

创建一个**嵌入式的HTTP服务器,**将当前执行器信息(包含应用名称和IP地址端口等)注册到注册中心,注册方式的实现在ExecutorRegistryThread中实现。

校验名字和注册中心,如果注册中心不可用,则等待一段时间后重新尝试连接。

// registry
while (!toStop) {
    try {
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    registryResult = ReturnT.SUCCESS;
                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    break;
                } else {
                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                }
            } catch (Exception e) {
                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
            }

        }
    } catch (Exception e) {
        if (!toStop) {
            logger.error(e.getMessage(), e);
        }

    }

    try {
        //心跳检测,默认30s
        if (!toStop) {
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        }
    } catch (InterruptedException e) {
        if (!toStop) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
        }
    }
}

开启一个新线程,首先构建注册参数(包含执行器分组、执行器名字、执行器本地地址及端口号),遍历注册中心地址,开始进行执行器注册,注册方式通过发送http的post请求。

@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}

debug的过程中,执行到int statusCode = connection.getResponseCode();才会跳转到JobApiController.api中的注册地址.

// services mapping
if ("callback".equals(uri)) {
    List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
    return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registryRemove(registryParam);
} else {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}

最后进入到JobRegistryHelper.registry()方法中完成数据库的入库和更新操作。

通过更新语句判断该执行器是否注册,结果小于1,那么保存注册器信息,并向注册中心发送一个请求,更新当前执行器所属的应用名称、执行器名称和 IP 地址等信息,否则跳过。

public ReturnT<String> registry(RegistryParam registryParam) {
	//.......
    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            //更新注册表信息
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                //保存执行器注册信息
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh 刷新执行器状态
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}

至此执行器的注册流程分析完成。

执行器中的任务执行过程

img

部分流程图:
在这里插入图片描述

执行器中的任务流程比较简单,如果执行器启动的话,那么每次执行任务是通过JobThread通过Cron 表达式进行操作的。

通过handler.execute()进行执行,是在框架内部通过反射机制调用作业处理器对象 handler 中的 execute() 方法实现的。在这个过程中,handler 对象表示被加载的作业处理器,并且已经调用了init()方法进行初始化。

method.invoke() 方法使用反射机制调用指定对象 target 中的方法 method。在这个方法中,target 表示作业处理器对象,method 表示作业处理器中的 execute() 方法。

通过上述方法,获取到SampleXxlJob.demoJobHandler的任务,然后开始进行任务逻辑操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/448753.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ONE·C++ || stack queue (一)】

总言 主要介绍栈和队列的基本函数使用&#xff1a;栈和队列、优先级队列、适配器、反向迭代器。 文章目录 总言1、栈和队列接口基本介绍1.1、基本介绍1.2、相关例题1.2.1、最小栈1.2.2、栈的压入、弹出序列1.2.3、逆波兰表达式求值 2、适配器介绍2.1、引入&#xff1a;如何实现…

儿童用灯哪个品牌好?分享五款儿童护眼台灯品牌

家中有小朋友上了幼儿园就已经戴上了眼镜&#xff0c;太让人心疼了 近视已经成为世界难题&#xff0c;而我国儿童近视形式尤为严峻 据官方数据显示&#xff0c;我国儿童青少年总体近视率竟高达52.7% 如何保护孩子眼睛&#xff0c;儿童用灯哪个品牌好&#xff1f; 那今天&am…

Open vSwitch 入门实践(8) VXLAN实验

目录 什么是VXLAN&#xff1f; VXLAN解决了什么问题&#xff1f; VXLAN网络如何工作&#xff1f; 简单VXLAN实验 主机A 主机B 测试 什么是VXLAN&#xff1f; VXLAN&#xff08;Virtual eXtensible Local Area Network&#xff0c;虚拟扩展局域网&#xff09;&#xff0…

Spring依赖注入 - Resource注解详解及与Autowired注解区别

上篇博客我们讲了Spring中的自动注入(byName,byType)和Autowired注解的工作原理以及源码分析&#xff0c;那么这次&#xff0c;我们来分析还没讲完的&#xff0c;剩下的核心的方法&#xff1a; Nullable Object resolveDependency(DependencyDescriptor descriptor, Nullable …

0.96寸OLED液晶显示器

在日常的小项目制作中我们经常会接触到OLED液晶显示器&#xff0c;本文介绍0.96寸液晶显示器的基本原理&#xff0c;辅助我们后续的小项目开发 OLED被称为有机激光二极管&#xff0c;也被称为有机激光显示&#xff0c;OLED采用有机材料涂层和玻璃基板&#xff0c;当有电流通过…

#Chrome扩展程序开发教程--02:Hello Extensions

#Chrome扩展程序开发教程--02&#xff1a;Hello Extensions 引言1、Hello Extensions2、固定扩展程序3、重新加载扩展程序4、查看扩展程序的输出 引言 本系列博客旨在带来最新的Chrome扩展程序开发入门教程。 1、Hello Extensions 本节博客中&#xff0c;笔者将带领读者创建一个…

C++附加篇: 空间适配器

"我有时难过&#xff0c;却还有些抚慰和感动。" 一、我们来谈谈空间适配器 (1) 什么是空间配置器? STL的六大组件&#xff0c;容器、算法、迭代器、适配器、仿函数&#xff0c;最后一个也就是"空间适配器"。 所谓"空间适配器"&#x…

轻松掌握K8S使用kubectl操作配置文件挂载ConfigMap和密钥Secret知识点05

1、挂载应用配置文件配置集ConfigMap 当有许多应用如redis、mysql&#xff0c;希望将它的配置文件挂载出去&#xff0c;以便随时修改&#xff0c;可以用ConfigMap配置集 具体用法查看使用命令行操作里的 3、ConfigMap配置集实战 2、挂载应用配置文件的敏感信息Secret Secre…

JAVA开发运维(云基础设备监控)

在大型的商用系统中&#xff0c;经常需要监控云设备的健康状态&#xff0c;性能情况&#xff0c;流量数据等。及时发现系统问题&#xff0c;及时修复&#xff0c;以确保系统的高可用。检查云资源的工作内容主要包括基础监控、主动拨测、用户体验、APM监控、指标体系、业务分析、…

Java运行时内存管理

一、前言 希望能在我们平时开发写代码的时候&#xff0c;能够知道当前写的这段代码&#xff0c;内存方面是如何分配的。 我们深知&#xff0c;一个Java程序员在很多时候根本不用操心内存的释放&#xff0c;而是依靠JVM去管理&#xff0c;以前写C代码的时候&#xff0c;却要时刻…

SAP 自定义生产订单状态

1、生产订单通常系统有一整套订单状态&#xff0c;做PP的各位同学都应该知道。 CRTD状态 REL已下达 CNF已报工 DLV已入库 TECO技术性完成 等等状态这里就不在罗列了&#xff0c;可以自行在生产订单中看到 2、这篇文章主要是在生产订单系统外&#xff0c;在自定义一套状态。这个…

Spring更简单的读取和存储Bean(基于注解)

目录 ①从Maven中央仓库获取spring-context和spring-beans的依赖&#xff0c;将依赖引入到pom.xml中 ②配置扫描路径 ③添加注解存储Bean对象&#xff08;可以使用5大类注解和方法注解&#xff09; 类注解&#xff08;写在类上&#xff0c;作用于类上&#xff09; Contro…

【致敬未来的攻城狮计划】— 连续打卡第十一天:FSP固件库开发点亮第一个灯。

系列文章目录 1.连续打卡第一天&#xff1a;提前对CPK_RA2E1是瑞萨RA系列开发板的初体验&#xff0c;了解一下 2.开发环境的选择和调试&#xff08;从零开始&#xff0c;加油&#xff09; 3.欲速则不达&#xff0c;今天是对RA2E1 基础知识的补充学习。 4.e2 studio 使用教程 5.…

leetcode刷题--辅助工具

idea插件 插件商店搜索leetcode&#xff0c;可以让你利用idea调试leetcode的题目 插件首先需要填写用户名密码登录&#xff0c;登录上就可以在idea搜题、做题、提交等 注意&#xff1a; 一些版本登录可能登录失败&#xff0c;解决方法是换leetcode地址为leetcode.cn。 有些可…

通过用户名密码认证保障 MQTT 接入安全

认证是一种安全措施&#xff0c;用于识别用户并验证他们是否有权访问系统或服务器。它能够保护系统免受未经授权的访问&#xff0c;确保只有经过验证的用户才能使用系统。 物联网连接万物&#xff0c;对试图访问基础设施的用户进行认证至关重要。未经授权的访问存在重大的安全…

数据保管库的数据质量错误

数据保管库的数据质量错误 在过去的几年里&#xff0c;数据仓库发生了巨大的变化&#xff0c;但这并不意味着支撑健全数据架构的基本原理需要被抛在窗外。事实上&#xff0c;随着GDPR等数据法规的日益严格以及对优化技术成本的重新重视&#xff0c;我们现在看到了“Data Vault…

设计模式之备忘录模式(C++)

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 一、备忘录模式是什么&#xff1f; 备忘录模式是一种行为型的软件设计模式&#xff0c;在不破坏封装的前提下&#xff0c;获取一个…

OpenCV实例(八)行人跟踪

OpenCV实例&#xff08;八&#xff09;行人跟踪 1.目标跟踪概述2.基于背景差分检测运动物体2.1 实现基本背景差分器2.2 使用MOG背景差分器2.3 使用卡尔曼滤波器寻找运动趋势 3.跟踪行人 作者&#xff1a;Xiou 1.目标跟踪概述 目标跟踪是对摄像头视频中的移动目标进行定位的过…

数据结构与算法基础-学习-20-查找之散列表(HASH TABLE)

目录 目录 一、基本思想 二、术语 1、散列方法 2、散列函数 3、散列表 4、冲突 5、同义词 三、如何减少哈希冲突 四、构造散列函数需考虑的情况 五、散列函数的构造方法 1、直接定址法 2、除留余数法 六、如何处理哈希冲突 1、开地址法 2、拉链法 七、散列表查…

【微服务笔记16】微服务组件之Gateway服务网关基础环境搭建、高可用网关环境搭建

这篇文章&#xff0c;主要介绍微服务组件之Gateway服务网关基础环境搭建、高可用网关环境搭建。 目录 一、Gateway服务网关 1.1、什么是Gateway 1.2、Gateway基础环境搭建 &#xff08;1&#xff09;基础环境介绍 &#xff08;2&#xff09;引入依赖 &#xff08;3&#…