ElasticJob-Lite架构篇 - 认知分布式任务调度ElasticJob-Lite

news2024/12/26 13:52:31

前言

本文基于 ElasticJob-Lite 3.x 版本展开分析。

如果 Quartz 集群中有多个服务端节点,任务决定在哪个服务端节点上执行的呢?

Quartz 采用随机负载,通过 DB 抢占下一个即将触发的 Trigger 绑定的任务的执行权限。

在 Quartz 的基础上,需要一个新的分布式任务调度框架。它可以帮助我们做到如下:

  • 通过选举一个 Leader 节点来协调多个服务端节点,可以指定任务在哪个服务端节点上执行。
  • 如果任务数量过多,需要分片,多个节点执行分片任务。
  • 支持动态调度以及拥有可视化界面。

介绍

ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。它的各个产品使用统一的作业 API,开发者仅需一次开发,即可随意部署。

ElasticJob-Lite 定位为轻量级无中心化解决方案,使用 jar 的形式提供分布式任务的协调服务。

在这里插入图片描述

核心概念与功能

调度模型

ElasticJob 的调度模型分为支持线程级别调度的进程内调度 ElasticJob-Lite 和进程级别调度的 ElasticJob-Cloud。

进程内调度

ElasticJob-Lite 是面向进程内的线程级别调度框架。通过它,作业能够透明化的与业务应用系统相结合。它能够方便的与 Spring、Dubbo 等 Java 框架配合使用,在作业中可以自由使用 Spring 注入的 Bean,如数据库连接池、Dubbo 远程服务等,更加方便的贴合业务开发。

进程调度

ElasticJob-Cloud 拥有进程内调度和进程级别调度两种方式。 由于 ElasticJob-Cloud 能够对作业服务器的资源进行控制,因此其作业类型可划分为常驻任务和瞬时任务。 常驻任务类似于 ElasticJob-Lite,是进程内调度;瞬时任务则完全不同,它充分的利用了资源分配的削峰填谷能力,是进程级的调度,每次任务会启动全新的进程处理。

弹性调度

弹性调度是 ElasticJob 最重要的功能,也是这款产品名称的由来。它是一款能够让任务通过分片进行水平拓展的任务处理系统。

分片

ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。 随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

分片项

分片项为数字,始于 0 而终于分片总数减 1。

分片参数

个性化参数可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

例如:按照地区水平拆分数据库,数据库 A 是北京的数据;数据库 B 是上海的数据;数据库 C 是广州的数据。 如果仅按照分片项配置,开发者需要了解 0 表示北京;1 表示上海;2 表示广州。 合理使用个性化参数可以让代码更可读,如果配置为 0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,分别负责作业的 50% 的负载,如下图所示。

在这里插入图片描述

资源最大限度利用

当新增加作业服务器时,ElasticJob 会通过注册中心的临时节点的变化感知到新服务器的存在,并在下次任务调度的时候重新分片,新的服务器会承载一部分作业分片。

将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。

例如:3 台服务器,分成 10 片,则分片项分配结果为服务器 A = 0,1,2,9;服务器 B = 3,4,5;服务器 C = 6,7,8。 如果服务器 C 崩溃,则分片项分配结果为服务器 A = 0,1,2,3,4; 服务器 B = 5,6,7,8,9。 在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

高可用

当作业服务器在运行中宕机时,注册中心同样会通过临时节点感知,并将在下次运行时将分片转移至仍存活的服务器,以达到作业高可用的效果。 本次由于服务器宕机而未执行完的作业,则可以通过失效转移的方式继续执行

作业高可用

实现原理

ElasticJob-Lite 无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。 注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

弹性分布式实现

  • 第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
  • 某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
  • 主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
  • 定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
  • 通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
  • 每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。
  • 实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。

注册中心数据结构

注册中心在定义的命名空间下,创建作业名称节点,用于区分不同作业,所以作业一旦创建则不能修改作业名称,如果修改名称将视为新的作业。 作业名称节点下又包含5个数据子节点,分别是 config, instances, sharding, servers 和 leader。

config 节点

作业配置信息,以 YAML 格式存储。

instances 节点

作业运行实例主键由作业运行服务器的 IP 地址和 PID 构成。 作业运行实例主键均为临时节点,当作业实例上线时注册,下线时自动清理。注册中心监控这些节点的变化来协调分布式作业的分片以及高可用。 可在作业运行实例节点写入 TRIGGER 表示该实例立即执行一次。

sharding 节点

作业分片信息,子节点是分片项序号,从零开始,至分片总数减一。 分片项序号的子节点存储详细信息。每个分片项下的子节点用于控制和记录分片运行状态。 节点详细信息说明:

子节点名临时节点描述
instance执行该分片项的作业运行实例主键
running分片项正在运行的状态 仅配置 monitorExecution 时有效
failover如果该分片项被失效转移分配给其他作业服务器,则此节点值记录执行此分片的作业服务器 IP
misfire是否开启错过任务重新执行
disabled是否禁用此分片项

servers 节点

作业服务器信息,子节点是作业服务器的 IP 地址。 可在 IP 地址节点写入 DISABLED 表示该服务器禁用。 在新的云原生架构下,servers 节点大幅弱化,仅包含控制服务器是否可以禁用这一功能。 为了更加纯粹的实现作业核心,servers 功能未来可能删除,控制服务器是否禁用的能力应该下放至自动化部署系统。

leader 节点

作业服务器主节点信息,分为 election,sharding 和 failover 三个子节点。 分别用于主节点选举,分片和失效转移处理。

leader节点是内部使用的节点。

子节点名临时节点描述
election\instance主节点服务器IP地址 一旦该节点被删除将会触发重新选举 重新选举的过程中一切主节点相关的操作都将阻塞
election\latch主节点选举的分布式锁 为 curator 的分布式锁使用
sharding\necessary是否需要重新分片的标记 如果分片总数变化,或作业服务器节点上下线或启用/禁用,以及主节点选举,会触发设置重分片标记 作业在下次执行时使用主节点重新分片,且中间不会被打断 作业执行时不会触发分片
sharding\processing主节点在分片时持有的节点 如果有此节点,所有的作业执行都将阻塞,直至分片结束 主节点分片结束或主节点崩溃会删除此临时节点
failover\items\分片项一旦有作业崩溃,则会向此节点记录 当有空闲作业服务器时,会从此节点抓取需失效转移的作业项
failover\items\latch分配失效转移分片项时占用的分布式锁 为 curator 的分布式锁使用

流程图

作业启动

作业启动

作业执行

作业执行

失效转移

概念

失效转移是当前执行作业的临时补偿执行机制,在下次作业运行时,会通过重分片对当前作业分配进行调整。

定时作业

举例说明,若作业以每小时为间隔执行,每次执行耗时 30 分钟。图中表示作业分别于 12:00,13:00 和 14:00 执行。图中显示的当前时间点为 13:00 的作业执行中。

如果作业的其中一个分片服务器在 13:10 的时候宕机,那么剩余的 20 分钟应该处理的业务未得到执行,并且需要在 14:00 时才能再次开始执行下一次作业。 也就是说,在不开启失效转移的情况下,位于该分片的作业有 50 分钟空档期。在开启失效转移功能之后,ElasticJob 的其他服务器能够在感知到宕机的作业服务器之后,补偿执行该分片作业。在资源充足的情况下,作业仍然能够在 13:30 完成执行。

执行机制

当作业执行的节点宕机时,会触发失效转移流程。ElasticJob 根据触发时的分布式作业执行的不同情况来决定失效转移的执行时机

1、通知执行。当其它服务器感知到有失效转移的作业需要处理时,且该作业服务器已经完成了本地作业,则会实时的拉取待失效转移的分片项,并开始补偿执行。也称为实时执行。

2、问询执行。作业服务器在本地任务执行结束后,会向注册中心询问待执行的失效转移分片项,如果有,则开始补偿执行。也称为异步执行。

适用场景

开启失效转移功能,ElasticJob 会监控作业每一个分片的执行状态,并将其写入到注册中心,供其它节点感知。

在一次运行耗时较长且间隔较长的作业场景,失效转移是提升作业运行实时性的有效手段;对于间隔较短的作业,会产生大量与注册中心的网络通信,对集群的性能产生影响。而且间隔较短的作业并未见得关注单次作业的实时性,可以通过下次作业执行的重分片使所有的分片正确执行,因此不建议短间隔作业开启失效转移

另外需要注意的是,作业本身的幂等性,是保证失效转移正确性的前提。

错过任务重执行

错误任务重执行功能可以使逾期未执行的作业在上次作业执行完成之后立即执行。ElasticJob 不允许作业在同一时间内叠加执行。

定时作业

举例说明,若作业以每小时为间隔执行,每次执行耗时 30 分钟。如果 12:00 开始执行的作业在 13:10 才执行完毕,那么本该由 13:00 触发的作业则错过了触发时间,需要等待至 14:00 的下次作业触发。在开启错过任务重执行功能之后,ElasticJob 将会在上次作业执行完毕后,立刻触发执行错过的作业。如下图所示。

错过作业重执行

适用场景

在一次运行耗时较长且间隔较长的作业场景,错过任务重执行是提升作业运行实时性的有效手段; 对于未见得关注单次作业的实时性的短间隔的作业来说,开启错过任务重执行并无必要。

作业 API

作业

ElasticJob 的作业分为:基于 class 和基于 type 两种类型。

基于 class 的作业需要开发者自行通过实现接口的方式织入业务逻辑。此外方法参数 shardingContext 包含作业配置、分片和运行时信息。可以通过 getShardingTotalCount()、getShardingItem() 等方法分别获取分片总数、运行在本作业服务器的分片序列号等。 ElasticJob 目前提供了 Simple、Dataflow 两种基于 class 的作业类型。

基于 type 的作业无需编码,只需要提供相应配置即可。ElasticJob 目前提供了 Script、HTTP 两种基于 type 的作业类型。

此外,用于可以通过实现 SPI 接口自行拓展作业类型。

简单作业

意为简单实现,未经任何封装的类型。需要实现 SimpleJob 接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与 Quartz 原生接口相似,但是提供了弹性扩缩容和分片等功能。

public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()) {
            case 0: {
                // process task
                break;
            }
            case 1: {
                // process task
                break;
            }
            case 2: {
                // process task
                break;
            }
            default: {
                break;
            }
        }
    }
}

数据流作业

用于处理数据流,需要实现 DataflowJob 接口。该接口提供了两个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

public class MyDataflowJob implements DataflowJob<String> {

    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        List<String> result = new ArrayList<>();
        switch (shardingContext.getShardingItem()) {
            case 0: {
                // get data from database by sharding item 0
                break;
            }
            case 1: {
                // get data from database by sharding item 1
                break;
            }
            case 2: {
                // get data from database by sharding item 2
                break;
            }
            default: {
                break;
            }
        }
        return result;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> list) {
        // process data
    }
}

可以通过属性配置 streaming.process 开启或者关闭流式处理。如果开启流式处理,则作业只有在 fetchData 方法的返回值为 null 或者集合容量为空时,才停止抓取,否则作业将一直运行下去;如果关闭流式处理,则每次作业执行过程中仅执行一次 fetchData 和 processData 方法,随即完成本次作业。

如果采用流式作业处理方式,建议 processData 在处理数据后更新其状态,避免 fetchData 再次抓取到,从而使得作业永不停止。

脚本作业

支持 shell、python、perl 等所有类型的脚本。可以通过配置属性 script.command.line 配置待执行脚本,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架自动追加最后一个参数为作业运行时信息。

例如如下脚本:

#!/bin/bash
echo sharding execution context is $*

作业运行时将输出:

sharding execution context is {"jobName":"scriptElasticDemoJob","shardingTotalCount":10,"jobParameter":"","shardingItem":0,"shardingParameter":"A"}

HTTP作业

可以通过配置属性 http.urlhttp.methodhttp.data 等配置待请求的 http 信息。分片信息以 Header 形式传递,key 为 shardingContext,值为 json 格式。

public class HttpJobMain {
  public static void main(String[] args) {
    new ScheduleJobBootstrap(regCenter, "HTTP", JobConfiguration.newBuilder("javaHttpJob", 1)).setProperty(HttpJobProperties.URI_KEY, "http://xxx.com/execute")
      .setProperty(HttpJobProperties.METHOD_KEY, "POST")
      .setProperty(HttpJobProperties.DATA_KEY, "source=ejob")
      .cron("0/5 * * * * ?").shardingItemParameters("0=Beijing").build()).schedule();
  }
}
@Controller
@Slf4j
public class HttpJobController {
  
  @PostMapping("execute")
  public void execute(String source, @RequestHeader String shardingContext) {
    log.info("execute from source : {}, shardingContext : {}", source, shardingContext);
  }
}

作业运行时将输出:

execute from source : ejob, shardingContext : {"jobName":"scriptElasticDemoJob","shardingTotalCount":3,"jobParameter":"","shardingItem":0,"shardingParameter":"Beijing"}

Spring Boot 环境中,需要将对应的作业注册为 Spring 容器中的 bean。比如标注 @Component 注解。

Bean 默认是单例的,如果该作业实现会在同一个进程内被创建出多个 JobBootstrap 的实例, 可以考虑设置 Scope 为 prototype

作业配置

JobConfiguration.newBuilder("mySimpleJob", 3)
        .cron("0/5 * * * * ?")
  			.shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();

注意:一次性调度的作业配置不需要加 cron 表达式。

Spring Boot 环境中,在 application.yml 配置文件中进行配置。

elasticjob:
  reg-center:
    server-lists: 10.211.55.6:2181
    namespace: elasticjob-lite-springboot
  jobs:
    mySimpleJob:
      elasticJobClass: com.mzs.elasticjob.demo.job.MySimpleJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou

作业启动

ElasticJob-Lite 调度器分为定时调度和一次性调度两种类型。每种调度器启动时都需要注册中心配置、作业对象(作业类型)以及作业配置三个参数。

定时调度

public class JobDemo {

    public static void main(String[] args) {
        jobDemo1();
    }

    /**
     * 定时调度
     */
    private static void jobDemo1() {
        // 调度基于 class 类型的作业
        new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration()).schedule();
        // 调度基于 type 类型的作业
        new ScheduleJobBootstrap(createRegistryCenter(), "MY_TYPE", createJobConfiguration()).schedule();
    }

    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("mySimpleJob", 3)
                .cron("0/5 * * * * ?").shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("10.211.55.6:2181,10.211.55.7:2181,10.211.55.8:2181", "elasticjob-demo"));
        registryCenter.init();
        return registryCenter;
    }

}

定时调度作业在 Spring Boot 应用程序启动完成后自动启动,无需其它额外操作。

一次性调度

public class JobDemo {

    public static void main(String[] args) {
        jobDemo2();
    }

  	/**
     * 一次性调度
     */
    private static void jobDemo2() {
        // 调度基于 class 类型的作业
        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfigurationForOneOffJob());
        oneOffJobBootstrap.execute();
    }

    private static JobConfiguration createJobConfigurationForOneOffJob() {
        return JobConfiguration.newBuilder("mySimpleJob", 3)
                .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("10.211.55.6:2181,10.211.55.7:2181,10.211.55.8:2181", "elasticjob-demo"));
        registryCenter.init();
        return registryCenter;
    }

}

一次性调度在 Spring Boot 的 application.yml 文件中配置 jobBootstrapBeanName 属性。然后注入 OneOffJobBootstrap,在合适的地方调用它的 execute 方法。

@Resource(name = "someJobBootstrapBeanName")
private OneOffJobBootstrap oneOffJobBootstrap;

// @Autowired
// @Qualifier(name = "someJobBootstrapBeanName")
// private OneOffJobBootstrap oneOffJobBootstrap;

oneOffJobBootstrap.execute();

配置作业导出端口

使用 ElasticJob-Lite 过程中可能会遇到一些分布式问题,导致作业运行不稳定。

由于无法在生产环境调试,通过 dump 命令可以把作业内部相关信息导出,方便开发者调试分析;导出命令的使用请参见 运维指南。

以下示例用于展示如何通过 SnapshotService 开启用于导出命令的监听端口:

new SnapshotService(createRegistryCenter(), 9888).listen();

在 Spring Boot 环境中,配置如下:

elasticjob:
  reg-center:
    server-lists: 10.211.55.6:2181
    namespace: elasticjob-lite-springboot
  dump:
    enabled: true
    port: 9888

配置错误处理策略

使用 ElasticJob-Lite 过程中当作业发生异常后,可采用以下错误处理策略。

错误处理策略名称说明是否内置是否默认是否需要额外配置
记录日志策略记录作业异常日志,但不中断作业执行
抛出异常策略抛出系统异常并中断作业执行
忽略异常策略忽略系统异常且不中断作业执行
邮件通知策略发送邮件消息通知,但不中断作业执行
企业微信通知策略发送企业微信消息通知,但不中断作业执行
钉钉通知策略发送钉钉消息通知,但不中断作业执行

在实际开发中,配置错误处理策略需要在作业配置的 jobErrorHandlerType 方法指定参数,以上六种错误处理策略分别对应:LOG、THROW、IGNORE、EMAIL、WECHAT、DINGTALK。

JobConfiguration.newBuilder("mySimpleJob", 3)
  .cron("0/5 * * * * ?")
  .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
  .jobErrorHandlerType("LOG")
  .build();

对于邮件通知策略、企业微信通知策略、钉钉通知策略额外需要引入相关依赖以及设置相关配置,详见:错误处理策略。

在 Spring Boot 环境中,需要在 application.yml 配置文件中指定 jobErrorHandlerType 属性,支持 LOG、THROW、IGNORE、EMAIL、WECHAT、DINGTALK。

elasticjob:
  reg-center:
    server-lists: 10.211.55.6:2181
    namespace: elasticjob-lite-springboot
  jobs:
    mySimpleJob:
      elasticJobClass: com.mzs.elasticjob.demo.job.MySimpleJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      jobErrorHandlerType: LOG

作业监听器

ElasticJob-Lite 提供了作业监听器,用于在任务执行前和任务执行后的执行监听的方法。

常规监听器

监听每个节点的任务执行。适用于任务实现简单,并且不需要考虑全局分布式任务是否完成的场景。比如:作业处理作业服务器的文件,处理完成后删除文件,每个节点都参与执行任务。

public class MyElasticJobListener implements ElasticJobListener {

	/**
     * 在任务开始执行时调用该方法
     */
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {

    }

	/**
     * 在任务开始执行时调用该方法
     */
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {

    }

    @Override
    public String getType() {
        return "simpleJobListener";
    }
}
JobConfiguration.newBuilder("mySimpleJob", 3)
  .cron("0/5 * * * * ?")
  .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
  .jobListenerType("simpleJobListener")
  .build();

在作业配置中通过 jobListenerType 方法添加对应的作业监听器的类型。

分布式监听器

监听集群中最后一个开始/结束执行任务的节点的任务执行。需要同步分布式环境下的作业的状态同步,提供了超时设置来避免作业状态不同步导致死锁。比如:作业处理数据库数据,处理完成后只需要一个节点完成数据清理任务即可。

public class MyDistributedOnceJobListener extends AbstractDistributeOnceElasticJobListener {

    private static final long START_TIMEOUT_MILLS = 3000;
    private static final long COMPLETE_TIMEOUT_MILLS = 3000;

    public MyDistributedOnceJobListener() {
        super(START_TIMEOUT_MILLS, COMPLETE_TIMEOUT_MILLS);
    }

    /**
     * 在任务开始执行时调用该方法
     */
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {

    }

    /**
     * 在任务结束执行时调用该方法
     */
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {

    }

    @Override
    public String getType() {
        return "distributeOnceJobListener";
    }
}
JobConfiguration.newBuilder("mySimpleJob", 3)
  .cron("0/5 * * * * ?")
  .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
  .jobListenerType("distributeOnceJobListener")
  .build();

在作业配置中通过 jobListenerType 方法添加对应的作业监听器的类型。

Spring Boot 环境中, 将 JobListener 实现添加至 resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener。

com.mzs.elasticjob.demo.listener.MyElasticJobListener

在配置文件中指定如下:

elasticjob:
  reg-center:
    server-lists: 10.211.55.6:2181
    namespace: elasticjob-lite-springboot
  jobs:
    mySimpleJob:
      elasticJobClass: com.mzs.elasticjob.demo.job.MySimpleJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      jobListenerTypes: simpleJobListener

事件追踪

ElasticJob 提供了事件追踪的功能,可以通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。

目前提供了基于关系型数据库的事件订阅方式记录事件,开发者也可以通过 SPI 自行拓展。

// 初始化数据源
DataSource dataSource = ...;
// 定义日志数据库事件溯源配置
TracingConfiguration tracingConfig = new TracingConfiguration("RDB", dataSource);
JobConfiguration jobConfig = JobConfiguration.newBuilder("mySimpleJob", 3)
  .cron("0/5 * * * * ?")
  .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
  .build();
jobConfig.getExtraConfigurations().add(tracingConfig);

需要定义 TracingConfiguration,然后将其添加到作业配置中的 extraConfigurations 集合中。

在 Spring Boot 环境中,需要导入如下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

然后配置如下:

spring:
  datasource:
    url: jdbc:mysql://10.211.55.6:3306/elastic_job_log
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password:	root

elasticjob:
  tracing:
    type: RDB

最后Elastic Job 会自动创建 JOB_EXECUTION_LOG 和 JOB_STATUS_TRACE_LOG 两张表以及若干索引。

JOB_EXECUTION_LOG 字段含义

字段名称字段类型是否必填描述
idVARCHAR(40)主键
job_nameVARCHAR(100)作业名称
task_idVARCHAR(1000)任务名称,每次作业运行生成新任务
hostnameVARCHAR(255)主机名称
ipVARCHAR(50)主机IP
sharding_itemINT分片项
execution_sourceVARCHAR(20)作业执行来源。可选值为NORMAL_TRIGGER, MISFIRE, FAILOVER
failure_causeVARCHAR(2000)执行失败原因
is_successBIT是否执行成功
start_timeTIMESTAMP作业开始执行时间
complete_timeTIMESTAMP作业结束执行时间

JOB_EXECUTION_LOG 记录每次作业的执行历史。 分为两个步骤:

  1. 作业开始执行时向数据库插入数据,除 failure_cause 和 complete_time 外的其他字段均不为空。
  2. 作业完成执行时向数据库更新数据,更新 is_success, complete_time 和 failure_cause(如果作业执行失败)。

JOB_STATUS_TRACE_LOG 字段含义

字段名称字段类型是否必填描述
idVARCHAR(40)主键
job_nameVARCHAR(100)作业名称
original_task_idVARCHAR(1000)原任务名称
task_idVARCHAR(1000)任务名称
slave_idVARCHAR(1000)执行作业服务器的名称,Lite版本为服务器的IP地址,Cloud版本为Mesos执行机主键
sourceVARCHAR(50)任务执行源,可选值为CLOUD_SCHEDULER, CLOUD_EXECUTOR, LITE_EXECUTOR
execution_typeVARCHAR(20)任务执行类型,可选值为NORMAL_TRIGGER, MISFIRE, FAILOVER
sharding_itemVARCHAR(255)分片项集合,多个分片项以逗号分隔
stateVARCHAR(20)任务执行状态,可选值为TASK_STAGING, TASK_RUNNING, TASK_FINISHED, TASK_KILLED, TASK_LOST, TASK_FAILED, TASK_ERROR
messageVARCHAR(2000)相关信息
creation_timeTIMESTAMP记录创建时间

JOB_STATUS_TRACE_LOG 记录作业状态变更痕迹表。 可通过每次作业运行的 task_id 查询作业状态变化的生命周期和运行轨迹。

分片策略

平均分片策略

类型:AVG_ALLOCATION

根据分片项平均分片。

如果作业服务器数量与分片总数无法整除,多余的分片将会顺序的分配至每一个作业服务器。

举例说明:

  1. 如果 3 台作业服务器且分片总数为9,则分片结果为:1=[0,1,2], 2=[3,4,5], 3=[6,7,8];
  2. 如果 3 台作业服务器且分片总数为8,则分片结果为:1=[0,1,6], 2=[2,3,7], 3=[4,5];
  3. 如果 3 台作业服务器且分片总数为10,则分片结果为:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]。

奇偶分片策略

类型:ODEVITY

根据作业名称哈希值的奇偶数决定按照作业服务器 IP 升序或是降序的方式分片。

如果作业名称哈希值是偶数,则按照 IP 地址进行升序分片; 如果作业名称哈希值是奇数,则按照 IP 地址进行降序分片。 可用于让服务器负载在多个作业共同运行时分配的更加均匀。

举例说明:

  1. 如果 3 台作业服务器,分片总数为2且作业名称的哈希值为偶数,则分片结果为:1 = [0], 2 = [1], 3 = [];
  2. 如果 3 台作业服务器,分片总数为2且作业名称的哈希值为奇数,则分片结果为:3 = [0], 2 = [1], 1 = []。

轮询分片策略

类型:ROUND_ROBIN

根据作业名称轮询分片。

线程池策略

CPU 资源策略

类型:CPU

根据 CPU 核数 * 2 创建作业处理线程池。

单线程策略

类型:SINGLE_THREAD

使用单线程处理作业。

UI工具

使用 ElasticJob UI,界面如下:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/338114.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从0到1一步一步玩转openEuler--10 openEuler基础配置-设置kdump

10 openEuler基础配置-设置kdump 文章目录10 openEuler基础配置-设置kdump10.1 设置kdump10.1.1 设置kdump预留内存10.1.1.1 预留内存参数格式10.1.2 预留内存推荐值10.1.3 禁用网络相关驱动10.1 设置kdump 本节介绍如何设置kdump预留内存及修改kdump配置文件参数。 10.1.1 设…

写python爬虫,你永远绕不过去代理问题

如果你想要从事 Python 爬虫相关岗位&#xff0c;那你一定会接触到代理问题&#xff0c;随之而来的就是下面 5 大代理知识点。 什么是代理&#xff1a;代理是网络中间人&#xff08;中间商赚插件&#xff09;&#xff0c;它代表用户发送网络请求&#xff0c;隐藏用户的真实身份…

JDY-31蓝牙模块使用指南

前言 本来是想买个hc-05&#xff0c;这种非常常用的模块&#xff0c;但是在优信电子买的时候&#xff0c;说有个可以替代的&#xff0c;没注意看&#xff0c;买回来折腾半天。 这个模块是从机模块&#xff0c;蓝牙模块分为主机从机和主从一体的&#xff0c;主机与从机的区别就…

【安全】nginx反向代理+负载均衡上传webshel

Nginx负载均衡下上传webshell 什么是反向代理&#xff1f; 正向代理就是代替客户端进行各种服务的访问以及获取&#xff1b;那么反向代理自然就是代替服务器进行事务处理&#xff0c;就是此时的代理服务器负责将用户的各项请求做一个汇总、分类&#xff0c;将其分发到不同的服务…

网络抓包方式复现Tomcat- AJP协议文件读取/命令执行漏洞(CVE-2020-1938 / CNVD-2020-10487)

目录 测试是否安装成功​编辑 基础简介 Tomcat Connector(连接器) ​编辑Servlet(服务程序) Tomcat内部处理请求流程 Tomcat加载和处理jsp的流程图 抓包复现 需要将下图中抓取到的数据包修改一下 替换成二进制数据的形式&#xff1a; python版替换代码&#xff1a; 运…

WordPress网站日主题Ri主题RiProV2主题开启了验证码登录但是验证码配置不对结果退出登录后进不去管理端了

背景 WordPress网站日主题Ri主题RiProV2主题开启了验证码登录但是验证码配置不对结果退出登录后进不去管理端了;开启了腾讯云验证码防火墙但APPID,APPSecret没配置,结果在退出登录后,由于验证码验证失败管理端进不去了 提示如下:

自定义软件帮助文档(qt assistant实现)

网上搜了一下&#xff0c;软件的帮助文档&#xff0c;三个都可以&#xff1a;https://github.com/zealdocs/zeal&#xff0c;https://zealdocs.org/&#xff0c;看看这个博客说的 https://blog.csdn.net/libaineu2004/article/details/125028913&#xff0c;这个也是开源的&…

神经网络实战--使用迁移学习完成猫狗分类

前言&#xff1a; Hello大家好&#xff0c;我是Dream。 今天来学习一下如何使用基于tensorflow和keras的迁移学习完成猫狗分类&#xff0c;欢迎大家一起前来探讨学习~ 本文目录&#xff1a;一、加载数据集1.调用库函数2.加载数据集3.数据集管理二、猫狗数据集介绍1.猫狗数据集介…

【Spring(十一)】万字带你深入学习面向切面编程AOP

文章目录前言AOP简介AOP入门案例AOP工作流程AOP切入点表达式AOP通知类型AOP通知获取数据总结前言 今天我们来学习AOP,在最初我们学习Spring时说过Spring的两大特征&#xff0c;一个是IOC,一个是AOP,我们现在要学习的就是这个AOP。 AOP简介 AOP:面向切面编程,一种编程范式&#…

计算机网络自顶向下 -- 流水线,滑动窗口协议

流水线协议 Rdt3.0在停等操作的过程中浪费了大量的时间&#xff1a; 从而在Rdt 3.0上引入了流水线机制&#xff1a;为了提高资源利用率 流水线协议&#xff1a; 允许发送方在收到ACK之前连续发送多个分组&#xff0c;更大的序列号范围&#xff0c;同时发送方和/或接收方需要更…

关于自动驾驶高精定位的几大问题

交流群 | 进“传感器群/滑板底盘群”请加微信号&#xff1a;xsh041388交流群 | 进“汽车基础软件群”请加微信号&#xff1a;Faye_chloe备注信息&#xff1a;群名称 真实姓名、公司、岗位作者 | 许良定位是高等级自动驾驶的基础&#xff0c;但在高速NOA和城区NOA等场景中&…

Linux账号与用户组

目录 用户标识符&#xff1a;UID与GID 用户账号 /etc/passwd文件结构 1、账号名称 2、密码 3、UID 4、GID 5、用户信息说明栏 6、家目录 7、shell /etc/shadow文件结构 1、账号名称 2、密码 3、最近修改密码的日期 4、密码不可被修改的天数&#xff08;与第三字…

Git | 在IDEA中使用Git

目录 一、在IDEA中配置Git 1.1 配置Git 1.2 获取Git仓库 1.3 将本地项目推送到远程仓库 1.4 .gitignore文件的作用 二、本地仓库操作 2.1 将文件加入暂存区 2.2 将暂存区的文件提交到版本库 2.3 查看日志 三、远程仓库操作 3.1 查看和添加远程仓库 3.2 推送至远程仓…

fastcgi未授权访问漏洞(php-fpm fast-cgi未授权访问漏洞)

本文参考《Fastcgi协议分析 && PHP-FPM未授权访问漏洞 && Exp编写》进行该漏洞的复现以及分析。 1.前置基础 1.1 nginx中的fastcgi 先来看先前用过的一张图&#xff0c;其是nginx解析用户请求的过程。 图中的几个定义&#xff1a; CGI&#xff1a;CGI是一种…

1628_MIT 6.828 xv6_chapter0操作系统接口

全部学习汇总&#xff1a; GreyZhang/g_unix: some basic learning about unix operating system. (github.com) 这本书最初看名字以为是对早期unix的一个解读&#xff0c;但是看了开篇发现 不完全是&#xff0c;只是针对JOS教学OS系统来做的一些讲解。 Xv6是对UNIX v6的重新实…

【Java 面试合集】Java中修饰符有哪些,有什么应用场景

Java中修饰符有哪些&#xff0c;有什么应用场景 1. 概述 首先我们要知道Java的三大特性&#xff1a;封装&#xff0c;继承&#xff0c;多态。 而我们今天要分析的修饰符就跟封装有着密切的联系。因为权限修饰符可以控制变量以及方法的作用范围。 废话不多说&#xff0c;上图…

Python推导式

列表&#xff08;list&#xff09;推导式 [remove for source in xx_list]或者[remove for source in xx_list if condition] 实例&#xff1a; names[Bob,Mark,Mausk,Johndan,Wendy] new_names[name.upper() for name in names if len(name)<5] print(new_names)即迭代列…

PC端开发GUI

PC端开发GUI PC端环境搭建1、Python2、PycharmPC端环境搭建 1、Python 注意Python版本不能超过3.9,因为pyqt-tools只维护到python对应的该版本 1.1、查找是否安装python:win+R,输入cmd回车,输入python或python -V或python --version 1.2、若1.1没有,则下载安装下载链接…

天津菲图尼克科技携洁净及无菌防护服解决方案与您相约2023生物发酵展

BIO CHINA 生物发酵产业一年一度行业盛会&#xff0c;由中国生物发酵产业协会主办&#xff0c;上海信世展览服务有限公司承办&#xff0c;2023第10届国际生物发酵产品与技术装备展览会&#xff08;济南&#xff09;于2023年3月30-4月1日在山东国际会展中心&#xff08;济南市槐…

亿级高并发电商项目-- 实战篇 --万达商城项目 二(Zookeeper、Docker、Dubbo-Admin等搭建工作

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是小童&#xff0c;Java开发工程师&#xff0c;CSDN博客博主&#xff0c;Java领域新星创作者 &#x1f4d5;系列专栏&#xff1a;前端、Java、Java中间件大全、微信小程序、微信支付、若依框架、Spring全家桶 &#x1f4…