elastic-job源码- job自动装配

news2024/11/23 14:50:35

版本:3.1.0-SNAPSHOT

git地址:GitHub - apache/shardingsphere-elasticjob: Distributed scheduled job framework

Maven 坐标

1

2

3

4

5

<dependency>

    <groupId>org.apache.shardingsphere.elasticjob</groupId>

    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>

    <version>${latest.version}</version>

</dependency>

Spring.factories配置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

在添加elasticjob-lite-spring-boot-starter启动类的时候,会自动加载ElasticJobLiteAutoConfiguration,接下来看下ElasticJobLiteAutoConfiguration中所做的处理。

ElasticJobLiteAutoConfiguration.java

复制代码

/**
 * ElasticJob-Lite auto configuration.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)


/**
 * elastic job 开关
 * elasticjob.enabled.ture默认为true
 */
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)


/**
 * 导入
 * ElasticJobRegistryCenterConfiguration.class 注册中心配置
 * ElasticJobTracingConfiguration.class job事件追踪配置
 * ElasticJobSnapshotServiceConfiguration.class 快照配置
 */
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})


/**
 * job相关配置信息
 */
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {
    
    @Configuration(proxyBeanMethods = false)
    /**
     * ElasticJobBootstrapConfiguration.class  创建job beans 注入spring容器
     * ScheduleJobBootstrapStartupRunner.class  执行类型为ScheduleJobBootstrap.class 的job开始运行
     */
    @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
    protected static class ElasticJobConfiguration {
    }
}

复制代码

Elastic-job 是利用zookeeper 实现分布式job的功能,所以在自动装配的时候,需要有zookeeper注册中心的配置。

自动装配主要做了4件事事

1.配置zookeeper 客户端信息,启动连接zookeeper.

2.配置事件追踪数据库,用于保存job运行记录

3.解析所有job配置文件,将所有job的bean放置在spring 单例bean中

4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.

第一件事:配置zookeeper 客户端信息,启动连接zookeeper.

ZookeeperRegistryCenter.class

复制代码

public void init() {
    log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            //设置zookeeper 服务器地址
            .connectString(zkConfig.getServerLists())
            //设置重试机制
            .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
            //设置命名空间,zookeeper节点名称
            .namespace(zkConfig.getNamespace());
    //设置session超时时间
    if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
        builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
    }
    //设置连接超时时间
    if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
        builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
    }
    if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
        builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                .aclProvider(new ACLProvider() {
                
                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                
                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }
    client = builder.build();
    //zookeeper 客户端开始启动
    client.start();
    try {
        //zookeeper 客户端一直连接
        if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        RegExceptionHandler.handleException(ex);
    }
}

复制代码

第二件事: 配置事件追踪数据库,用于保存job运行记录

ElasticJobTracingConfiguration.java

复制代码

/**
 * Create a bean of tracing DataSource.
 *
 * @param tracingProperties tracing Properties
 * @return tracing DataSource
 */
@Bean("tracingDataSource")
//spring中注入bean name 为tracingDataSource的job数据库连接信息
public DataSource tracingDataSource(final TracingProperties tracingProperties) {
    //获取elastic-job 数据库配置
    DataSourceProperties dataSource = tracingProperties.getDataSource();
    if (dataSource == null) {
        return null;
    }
    HikariDataSource tracingDataSource = new HikariDataSource();
    tracingDataSource.setJdbcUrl(dataSource.getUrl());
    BeanUtils.copyProperties(dataSource, tracingDataSource);
    return tracingDataSource;
}


/**
 * Create a bean of tracing configuration.
 *
 * @param dataSource required by constructor
 * @param tracingDataSource tracing ataSource
 * @return a bean of tracing configuration
 */
@Bean
@ConditionalOnBean(DataSource.class)
@ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
    /**
     * dataSource 是业务数据库
     * tracingDataSource 是job数据库
     * 当配置elasticjob.tracing.type = RDB时,如果单独配置job数据库是,默认使用job数据库作为job运行轨迹的记录
     * 但这边同时业务数据库和job追踪数据库同时注入是,mybatis-plus 结合@Table 使用的时候,很有可能找不到正确对应的数据源
     */
    DataSource ds = tracingDataSource;
    if (ds == null) {
        ds = dataSource;
    }
    return new TracingConfiguration<>("RDB", ds);
}

复制代码

通过elasticjob.tracing.type=RDB的配置开启事件追踪功能,这边job的事件追踪数据源可以和业务数据源配置不一样。

第三件事:解析所有job配置文件

ElasticJobBootstrapConfiguration.class

复制代码

public void createJobBootstrapBeans() {
    //获取job配置
    ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
    //获取单利注册对象
    SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
    //获取注入zookeeper 客户端
    CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
    //获取job事件追踪
    TracingConfiguration<?> tracingConfig = getTracingConfiguration();
    //构造JobBootstraps
    constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
}

复制代码

重要的是constructJobBootstraps 这个方法,来看下

复制代码

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                    final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
    //遍历配置的每一个job
    for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
        ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
        //校验 job class 和 type 都为空抛异常
        Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                        || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        //校验 job class 和 type 都有 报相互排斥
        Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                        || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "[elasticJobClass] and [elasticJobType] are mutually exclusive.");


        if (null != jobConfigurationProperties.getElasticJobClass()) {
            //通过class 注入job
            registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
            //通过type 注入job
            registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        }
    }
}

复制代码

Job 有两种类型的注入,第一种是是class,配置成job的全路径信息注入

再来看看registerClassedJob 方法里的内容

复制代码

private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
    //获取job配置
    JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
    //配置job事件追踪
    jobExtraConfigurations(jobConfig, tracingConfig);
    //获取job类型
    ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
    //没有配置cron表达式 就初始化为OneOffJobBootstrap对象,一次性任务
    if (Strings.isNullOrEmpty(jobConfig.getCron())) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
        singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
    } else {
        //有配置cron表达式 就初始化为ScheduleJobBootstrap对象,定时任务
        //设置bean name
        String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
        //注入ScheduleJobBootstrap对象为单利对象
        singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
    }
}

复制代码

Class 类型注入的job有两种类型

1.ScheduleJobBootstrap:定时任务类型的job。

2.OneOffJobBootstrap:一定次job类型。

看下定义的new ScheduleJobBootstrap 方法

复制代码

public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    //获取job监听器
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
    // 集成所有操作zookeeper 节点的services,job 监听器
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
    //获取当前job名称
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    //zookeeper节点 {namespace}/{jobclassname}/config 放置job配置信息
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    // 集成所有操作zookeeper 节点的services
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    //检验job配置
    validateJobProperties();
    //定义job执行器
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    //监听器里注入GuaranteeService
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    //创建定时任务,开始执行
    jobScheduleController = createJobScheduleController();
}

复制代码

看下createJobScheduleController

复制代码

private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    //注册job
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    //注册器开始运行
    registerStartUpInfo();
    return result;
}

复制代码

看下registerStartUpInfo方法

复制代码

public void registerStartUpInfo(final boolean enabled) {
    //开始所有的监听器
    listenerManager.startAllListeners();
    //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器
    leaderService.electLeader();
    //{namespace}/{ipservers} 设置enable处理
    serverService.persistOnline(enabled);
    //临时节点   /{namespave}/instances 放置运行服务实例信息
    instanceService.persistOnline();
    //开启一个异步服务
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

复制代码

这里实行的操作:

1.开启所有监听器处理

2.leader选举

3.持久化节点数据

4.开启异步服务

第四步:4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.

@Override
public void run(final String... args) {
    log.info("Starting ElasticJob Bootstrap.");
    applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
    log.info("ElasticJob Bootstrap started.");
}

获取到所有的定时任务job(ScheduleJobBootstrap类型),执行schedule方法,底层实际使用quartz框架运行定时任务。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/912123.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vite+vue3使用@路径,报错处理

报错原因&#xff1a;未配置 符号为指定路径别名&#xff0c;直接使用导致 处理方法&#xff1a; 安装path模块&#xff1a; npm install --save-dev types/node修改vite.config.ts import { defineConfig } from vite import vue from vitejs/plugin-vue import path from…

Hope.money:新兴DeFi项目如何重新定义稳定币生态的未来?

联储加息导致金融市场紧缩&#xff0c;Terra、3AC、FTX等知名中心化机构未能妥善应对而暴雷&#xff0c;并重创了整个加密货币市场&#xff0c;导致参与者损失惨重。这些事件揭示了中心化机构的局限&#xff0c;投资者对其资产掌控权的担忧愈发强烈。 自2018年首个DeFi协议Com…

Java算法:对角线遍历

Java算法&#xff1a;对角线遍历 学习目标&#xff1a;对角线遍历算法算法要求算法思路算法实现 学习目标&#xff1a;对角线遍历算法 每日初级算法&#xff1a;对角线遍历 算法要求 给你一个大小为 m x n 的矩阵 mat &#xff0c;请以对角线遍历的顺序&#xff0c;用一个数组…

使用 Feature Flags 与可观测工具实现数据库灰度迁移

场景描述 很多企业会遇到数据库升级、或数据库迁移的情况&#xff0c;尤其是在自建数据库服务向云数据库服务、自建机房向云机房、旧数据库向新数据库迁移等场景。 然而&#xff0c;我们需要在整个移植过程中保证其稳定性、避免数据遗失、服务宕机等情况&#xff0c;最常见的移…

PHP求职招聘系统Dreamweaver开发mysql数据库web结构php编程计算机网页

一、源码特点 PHP 求职招聘系统是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 源码 https://download.csdn.net/download/qq_41221322/88240283 论文 https://down…

Python土力学与基础工程计算.PDF-隧道涌水量

Python 求解代码如下&#xff1a; 1. # 定义参数 2. A 2000 # 地表面积&#xff0c;单位&#xff1a;平方米 3. S 10 # 截面积&#xff0c;单位&#xff1a;平方米 4. h 500 # 年地下径流深度&#xff0c;单位&#xff1a;毫米 5. 6. # 转换单位 7. h h / 1000 # 单…

Linux 中查看文件第n行内容的命令(实战案例)

Linux 中查看文件第n行内容的命令实战案例 方法1&#xff1a; head -m filename | tail -1 //查看filename文件的第m行&#xff08;tail -1 是数字1&#xff09; head -n &#xff08;数字&#xff09;&#xff08;文件名&#xff09;&#xff1a;数字为正数 显示前多少行的文…

React学习记录

一、简介 1、React是什么&#xff1f; 一个将数据渲染为HTML视图的开源JavaScript库&#xff0c;操作DOM呈现页面。 2、特点 采用组件化模式&#xff0c;声明式编码&#xff0c;提高开发效率及组件复用率在react native中可使用react语法进行移动端开发使用虚拟DOM优秀的di…

数据结构之——(手撕)顺序表

本章会介绍的知识点如下图&#xff1a; 1&#xff1a; 顺序表的概念&#xff1a;顺序表是用一段物理地址连续的存储单元依次存储数据的线性结构&#xff0c;通常我们使用数组来表示&#xff0c;对数组进行增删查改。 顺序表的结构&#xff1a;逻辑结构与物理结构都是内存中一块…

无涯教程-PHP - preg_match()函数

preg_match() - 语法 int preg_match (string pattern, string string [, array pattern_array], [, int $flags [, int $offset]]]); preg_match()函数在字符串中搜索pattern&#xff0c;如果存在pattern&#xff0c;则返回true&#xff0c;否则返回false。 如果提供了可选…

回归预测 | MATLAB实现GA-RF遗传算法优化随机森林算法多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现GA-RF遗传算法优化随机森林算法多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现GA-RF遗传算法优化随机森林算法多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本介绍程…

今天七夕,群友让我帮忙给他分配一个对象,于是我。。。

今天七夕&#xff0c;群友让我帮忙给他分配一个对象&#xff0c;于是我只好尝试给他分配对象了&#xff1a; CGirlFrined *pGF new CGirlFrined("大屌萌妹");int nRet (群友).SetGirlFriend(pGF);if (nRet ! 0) {alert("分配失败&#xff01;"); }后来觉…

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

GitRedisNginx合集

目录 文件传下载 Git常用命令 Git工作区中文件的状态 远程仓库操作 分支操作 标签操作 idea中使用git 设置git.exe路径 操作步骤 linux配置jdk 安装tomcat 查看是否启动成功 查看tomcat进程 防火墙操作 开放指定端口并立即生效 安装mysql 修改mysql密码 安装lrzsz软…

Linux 线程同步——条件变量

一、条件变量的概念 如果说互斥锁是用于同步线程对共享数据的访问的话&#xff0c;那么条件变量则是用于在线程之间同步共享数据的值。条件变量提供了一种线程间的通知机制&#xff1a;当某个共享数据达到某个值的时候&#xff0c;唤醒等待这个共享数据的线程。如下图所示&…

Java【手撕双指针】LeetCode 202. “快乐数“, 图文详解思路分析 + 代码

文章目录 前言一、快乐数1, 题目2, 思路分析3, 代码展示 前言 各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: &#x1f4d5; JavaSE基础: 基础语法, 类和对象, 封装继承多态, 接口, 综合小练习图书管理系统等 &#x1f4d7; Java数据结构: 顺序表, 链表,…

Ubuntu系列弹性云服务器如何安装图形化界面?

​ 参考链接&#xff1a;Ubuntu系列弹性云服务器如何安装图形化界面&#xff1f; 操作场景 为了提供纯净的弹性云服务器系统给客户&#xff0c;Ubuntu系列弹性云服务器默认未安装图形化界面&#xff0c;如果您需要使用图形化界面&#xff0c;请参见本节内容进行安装。 对于GPU加…

Python支持下最新Noah-MP陆面模式站点与区域数据的处理、单站和区域的模拟、模拟结果的输出和后续分析及可视化

目录 Noah-MP 5.0模型介绍&模型所需环境的搭建 Python教程 Noah-MP 5.0模型站点模拟 Noah-MP 5.0模型区域模拟 更多应用 熟悉陆表过程的主要研究内容以及陆面模型在生态水文研究中的地位和作用&#xff1b;深入理解Noah-MP 5.0模型的原理&#xff0c;掌握Noah-MP模型&…

Video-LLaMA

文章目录 链接主要贡献模型一些思考啥是Q-former为什么position embedding可以辅助学到temporal的信息 代码视觉处理 一些惊喜 链接 https://github.com/DAMO-NLP-SG/Video-LLaMA 主要贡献 能够捕捉到一小小短时间&#xff08;temporal&#xff09;里视觉的变化 - 使用了Q-f…

“卷王”新茶饮们:从风口,卷到了十字路口

【潮汐商业评论/原创】 “楼下奈雪开业了&#xff0c;扫码9块9就可以喝到杨枝甘露&#xff0c;这也太便宜了&#xff0c;”Chloe向同事说道。 “上次跟Fendi联名的喜茶&#xff0c;两杯也只要38元。” “喜茶奈雪开始向二线价格靠拢&#xff1f;怎么都那么便宜了&#xff1f…