【手把手】分布式定时任务调度解析之Elastic-Job

news2025/1/18 20:55:35

1、这货怎么没怎么听过

经常使用Quartz或者Spring Task的小伙伴们,或多或少都会遇到几个痛点,比如:
1、不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误;
2、Quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展;

在当当的ddframe框架中,需要一个任务调度系统。实现的话有两种思路,一个是修改开源产品,一种是基于开源产品搭建,也就是封装。当当选择了后者,最开始这个调度系统叫做dd-job。它是一个无中心化的分布式调度框架。因为数据库缺少分布式协调功能(比如选主),替换为Zookeeper后,增加了弹性扩容和数据分片的功能。Elastic-Job是ddframe中的dd-job作业模块分离出来的作业框架,基于Quartz和Curator开发,在2015年开源。

Elastic-Job是当当网架构师张亮、曹昊和江树建基于Zookepper和Quartz开发并开源的一个Java分布式定时任务(跟大名鼎鼎的ElasticSearch没有半毛钱关系),解决了Quartz不支持分布式的弊端。Elastic-Job主要的功能有支持弹性扩容,通过Zookepper集中管理和监控job,支持失效转移等。最开始只有一个elastic-job-core的项目,在2.X版本以后主要分为Elastic-Job-Lite和Elastic-Job-Cloud两个子项目。其中,Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。而Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务(跟Lite的区别只是部署方式不同,使用相同的API,只要开发一次)。

之所以感觉这货没什么人用,是因为这个项目在2020年捐给了Apache之后,将原生的编程方式基本删光了,现在在网上很难再找到原生的编程方式。更离谱的是,官网给出的Demo还漏了很多,也就是说你跟着官网一步一步写下来,结果还构建不起来.....

那为啥我还要写这么一篇文章出来,按道理说一个连官网都放弃的东西,费这些精力干啥。因为Elastic-Job中有一个很关键的概念:分片,也就是任务分片策略,这也正是它和Quartz之间最大的区别所在。而现在的Elastic-Job已经成为了一个二级子项目了,它的注册中心依赖于Zookeeper,所以如果要使用Elastic-Job的话,前提得先安装Zookeeper服务。

2、Quartz遗留的问题

① 假设在一个Quartz集群中有多个正在运行的节点,要如何决定哪些任务在哪些节点上运行呢?Quartz的处理方法非常简单粗暴,就是随机的。通过数据库中存储的信息,去抢占下一个即将触发的触发器所绑定的任务权限,并不支持对任务的执行节点进行协调;

② 当处理一个非常复杂的任务的时候,某一个节点的性能始终是有限的。如果可以将一个复杂的任务拆分成多个子任务,分别交由不同的节点协同处理,效率上必定事半功倍;

③ Quartz本身并不支持图形化管理页面,对于任务的管理非常的不方便;

3、初试Elastic-Job

以上的这些问题,Elastic-Job统统都可以解决,在弥补Quartz不足的这方面,Elastic-Job是认真的,但必须承认的是,它真的也没有特别好用。很遗憾,现在去Elastic-Job的官网已经访问不了了,这个官网早些年前被买给了快橙(一个VPN工具),现在它的官网已经沦落为shardingsphere下的一个二级网站:https://shardingsphere.apache.org/elasticjob/index_zh.html

启动Zookeeper服务

因为Elastic-Job是依赖于Zookeeper,所以先确保相关的Zookeeper服务启动成功。这里方便演示,我就不再专门弄个虚拟机搭一个Zookeeper服务,直接在本地启动Zookeeper服务。有几个点需要注意一下:

1、Zookeeper的压缩包解压缩之后,进入目录总,新建data目录存放数据,新建logs目录存放日志;

2、 进入conf目录中,复制zoo_sample.cfg一份,并从命名为zoo.cfg作为Zookeeper的配置文件,需要进行一些基础的修改;

3、进入Zookeeper中的bin目录下,执行:zkServer.cmd

看到日志输出 ZooKeeper audit is enabled.  则说明Zookeeper服务启动成功。

引入Elastic-Job相关依赖

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-core</artifactId>
    <version>3.0.1</version>
</dependency>

目前Maven中央仓库最新的版本是3.0.2,更新于22年10月23日,而3.0.1是目前使用最多的版本,更新于21年10月11日。好家伙,一年一更,看这个维护的频率就知道这玩意儿没什么人用。

SimpleJob

自定义需执行的任务,实现SimpleJob接口

构建注册中心, 配置任务作业

启动Elastic-Job服务

可以看到,指定的7个分片进行任务的执行。这里只是简单的打印一句话,在实际的业务中,可以通过代码判断,控制不同的分片执行不同的任务,就可以进行任务的拆分执行。

官网的教程甚至都没有给出如何构建作业配置.....

DataFlowJob

上面演示的是最简单的任务模式,Elastic-Job还提供了另外一种数据流模式:DataFlowJob,用于处理数据流。必须实现fetchData()和processData()的方法,一个用来获取数据,一个用来处理获取到的数据,其它跟SimpleJob没有区别。

实现DataFlowJob接口

构建注册中心, 配置任务作业

启动Elastic-Job服务

可以看到,定义的3个分片进行任务执行

ScriptJob

Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。说白了,这种类型的Job就是定时去执行脚本文件。把需要执行的脚本文件写好,告诉它在哪里,什么名字,到点了它自动去给你跑了就算完事儿。

使用ScriptJob这种类型的时候,有几个点需要注意一下:
1、在构建调度器Scheduler时,参数elasticJobType固定写死就是"SCRIPT";
2、在构建作业配置时,.setProperty方法的第一个参数也是固定写死"script.command.line",第二个参数是脚本文件的绝对路径;

脚本内容就是很简单的一句话:@echo ------【脚本任务】Sharding Context: %*

4、通过SPI的方式实现自定义作业类型

自定义任务接口,继承ElasticJob接口

实现自定义的任务接口

自定义任务执行器接口,继承ClassedJobItemExecutor接口

实现自定义的任务执行器接口

创建配置文件

在 resources.META-INF.services 目录下,以任务执行器接口的全路径名称创建一个配置文件,内容就是自定义的任务执行器实现类的全路径名

执行自定义的作业

package com.feenix.elasticjob.service;

import com.feenix.elasticjob.service.impl.FeenixJobImpl;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

public class Application {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(),
                                 new FeenixJobImpl(),
                                 createJobConfiguration())
                                 .schedule();
    }

    // 构建Zookeeper注册中心
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zkConfiguration = new ZookeeperConfiguration("192.168.0.31:2181",
                                                                           "feenix-job");
        zkConfiguration.setConnectionTimeoutMilliseconds(100000);
        zkConfiguration.setMaxRetries(10);

        ZookeeperRegistryCenter zkRegistryCenter = new ZookeeperRegistryCenter(zkConfiguration);
        zkRegistryCenter.init();
        return zkRegistryCenter;
    }

    // 作业配置
    private static JobConfiguration createJobConfiguration() {
        String jobs = "0=宋远桥,1=俞莲舟,2=俞岱岩,3=张松溪,4=张翠山,5=殷梨亭,6=莫声谷";
        return JobConfiguration.newBuilder("FeenixJobImpl", 7)
                               .cron("0/3 * * * * ?")
                               .shardingItemParameters(jobs)
                               // 使用自定义的作业分片策略
                               /*.jobSharding   StrategyType("Shuffle")*/
                               // 允许客户端配置覆盖注册中心
                               .overwrite(true)
                               // 故障转移
                               .failover(true)
                               .build();
    }

}

5、分片

Elastic-Job中任务分片的概念,使得任务可以在分布式环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,Elastic-Job会近乎实时的感知服务器的数量变更,从而重新为分布式服务器分配更合理的任务分片项,是的任务可以随着资源的增加而提升效率。

不过在上文中有提到过,Elastic-Job并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。分片项为数字,从0开始。例如:按照地区水平拆分数据库,数据库 A 是北京的数据;数据库 B 是上海的数据;数据库 C 是广州的数据。 如果仅按照分片项配置,开发者需要了解 0 表示北京;1 表示上海;2 表示广州。 合理使用个性化参数可以让代码更可读,如果配置为 0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

AverageAllocationJobShardingStrategy

基于平均分配算法的分片策略,也是默认的分片策略。如果分片不能整除,则余下的分片依次追加到顺序序号的服务器:
1、3台服务器,分成9片。第1台为[0,1,2]、第2台为[3,4,5]、第3台为[6,7,8];
2、3台服务器,分成8片。第1台为[0,1,6]、第2台为[2,3,7]、第3台为[4,5];
3、3台服务器,分成10片。第1台为[0,1,2,9]、第2台为[3,4,5]、第3台为[6,7,8];

OdevitySortByNameJobShardingStrategy

根据作业名哈希值的奇偶数决定IP升降序算法的分片策略。奇数则IP升序,偶数则IP降序,用于不同的作业平均分配负载至不同的服务器。

自定义分片策略

实现JobShardingStrategy接口

package com.feenix.elasticjob.strategy;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.*;

public class CustomJobShardingStrategy implements JobShardingStrategy {
    @Override
    public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
        // 作业分片加入容器
        ArrayList<Integer> customShardingList = new ArrayList<>();
        for (int i = 0; i < shardingTotalCount; i++) {
            customShardingList.add(i);
        }

        // 将容器中的作业分片项顺序打乱
        Collections.shuffle(customShardingList);

        // 模拟AverageAllocationJobShardingStrategy算法
        Map<JobInstance, List<Integer>> result = shardingCustom(jobInstances, shardingTotalCount, customShardingList);
        addCustom(jobInstances, shardingTotalCount, result, customShardingList);
        return result;
    }

    private Map<JobInstance, List<Integer>> shardingCustom(final List<JobInstance> shardingUnits,
                                                           final int shardingTotalCount,
                                                           final ArrayList<Integer> customShardingList) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            // 每个作业服务器申请的作业分片项列表,容量是itemCountPersharding+1,为每个作业最大的分片项
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(customShardingList.get(i));
            }

            result.put(each, shardingItems);
            count++;
        }

        return result;
    }

    private void addCustom(final List<JobInstance> shardingUnits,
                           final int shardingTotalCount,
                           final Map<JobInstance, List<Integer>> shardingResults,
                           final ArrayList<Integer> customShardingList) {
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(customShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count));
            }
            count++;
        }
    }

    @Override
    public String getType() {
        return "CUSTOM";
    }
}

创建配置文件

在 resources.META-INF.services 目录下,以自定义策略实现类的接口的全路径名称创建一个文件,内容就是自定义策略实现类的全路径

使用IDEA开启两个不同的服务实例

package com.feenix.elasticjob.service;

import com.feenix.elasticjob.service.impl.FeenixJobImpl;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

public class Application {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new FeenixJobImpl(), createJobConfiguration()).schedule();
    }

    // 构建Zookeeper注册中心
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zkConfiguration = new ZookeeperConfiguration("192.168.0.31:2181",
                                                                           "feenix-job");
        zkConfiguration.setConnectionTimeoutMilliseconds(100000);
        zkConfiguration.setMaxRetries(10);

        ZookeeperRegistryCenter zkRegistryCenter = new ZookeeperRegistryCenter(zkConfiguration);
        zkRegistryCenter.init();
        return zkRegistryCenter;
    }

    // 作业配置
    private static JobConfiguration createJobConfiguration() {
        String jobs = "0=宋远桥,1=俞莲舟,2=俞岱岩,3=张松溪,4=张翠山,5=殷梨亭,6=莫声谷";
        return JobConfiguration.newBuilder("FeenixJobImpl", 7)
                               .cron("0/3 * * * * ?")
                               .shardingItemParameters(jobs)
                               // 使用自定义的作业分片策略
                               .jobShardingStrategyType("CUSTOM")
                               // 允许客户端配置覆盖注册中心
                               .overwrite(true)
                               // 故障转移
                               .failover(true)
                               .build();
    }

}

jobShardingStrategyType的值要设为"CUSTOM",因为在自定义分片策略的时候,已经将自定义的策略类型写死为"CUSTOM"。

可以看到,在同时开启两个实例的情况下,根据自定义的策略作业被分配到两个不同的实例上执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/93329.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

业主应该重视装修中的“道”而不是“术”!极家精工装修好不好!

业主应该重视装修中的“道”而不是“术”&#xff01;极家精工装修好不好&#xff01;看了很多业主问了很多关于装修中很琐碎的事儿&#xff0c;比如“装修流程”、“装修应该注意什么”、“装修哪些必须要重视”、“某某材料和某某材料相比哪个好”、“家里装了什么是你最不后…

Lua中的基本数据类型

Lua中的数据类型一、Lua基本数据类型1.1、nil1.2、boolean1.3、number1.4、string1.5、function1.6、table二、Lua 通用数据结构的实现总结后言Lua是一门动态类型的脚本语言&#xff0c;这意味着同一个变量可以在不同时刻指向不同类型的数据。Lua代码中 一般采用一下两种做法相…

Dubbo-admin+Zookeeper 的环境搭建实操与 Could-not-extract-archive 报错踩坑

$ brew install zookeeper > Downloading https://homebrew.bintray.com/bottles/zookeeper-3.4.13.mojave.bottle.tar.gz ...先来看dubbo-admin的安装&#xff1b;我们先找到它在apache下的官方GitHub&#xff0c;官方也有相关介绍&#xff0c;中英文版都有(毕竟原本是中国…

[附源码]Node.js计算机毕业设计高校学科竞赛管理系统Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

Kibana使用

简介 Kibana是通向 Elastic 产品集的窗口。 它可以在 Elasticsearch 中对数据进行视觉探索和实时分析。 Kibana通常用于项目log日志收集分析、数据可视化分析等。 一、【Discover】搜索查询 Discover模块用于全文搜索文档(doucument),支持索引筛选、时间筛选、字段筛选、支持…

linux下syslog使用说明

syslog 系统日志应用 1) 概述 syslog是Linux默认的日志守护进程。默认的syslog配置文件是/etc/syslog.conf文件。程序&#xff0c;守护进程和内核提供了访问系统的日志信息。因此&#xff0c;任何希望生成日志信息的程序都可以向 syslog 接口呼叫生成该信息。 几乎所有的…

读《深入浅出MySQL数据库开发、优化与管理维护(第2版)》笔记1

上面3图是书中MySQL帮助的使用小节; 实测: 我用DATE_FORMAT(date,format)函数的时候经常会记不清格式化的字符是啥,这个时候我会去求助度娘,然后从零散的帖子里找一个合适的,测试一下可用,就拿来用了,但没法马上找到一个比较完整系统一点的帖子,从看此书本章节,可知使用MySQL的…

acm是什么?你准备好去打了吗?

1.引言2.acm究竟是什么&#xff1f;3.acm的时间安排重点网络赛的作用1.名额分配2.校内选拔icpc省赛省赛选拔赛(校内)4.acm该如何准备1.前期的算法积累1.Acwing 平台算法基础课 -y总业界良心。算法提高课 基本囊括了蓝桥杯的知识范畴算法进阶课&#xff08;选&#xff09; 算法中…

MYSQL 8.0 -- 事务中删除不存在的记录导致死锁

最近开发的某个功能中&#xff0c;线上偶尔会爆出死锁异常。再大佬同事的帮助下&#xff0c;最终排查出了原因&#xff0c;在此记录一下。 文章目录业务描述事务中删除行时锁的表现场景重现问题处理业务描述 在业绩信息维护中&#xff0c;可以维护相关人员列表&#xff0c;相关…

谁再问我 Kafka,我把这 43 张图甩给他

从Kafka诞生的早期&#xff0c;我就对Kafka投入了很多的关注&#xff0c;虽然不敢说精通Kafka, 但也算是非常熟悉了。 平时在工作之中&#xff0c;几乎天天都在跟这玩意儿打交道&#xff0c;在面试的时候&#xff0c;也会经常聊一些Kafka相关的内容。 Kafka 是一个优秀的分布…

二苯并环辛炔-二硫键-马来酰亚胺,DBCO-SS-Maleimide,DBCO-SS-Mal

基础产品数据&#xff08;Basic Product Data&#xff09;&#xff1a; CAS号&#xff1a;N/A 中文名&#xff1a;二苯并环辛炔-二硫键-马来酰亚胺 英文名&#xff1a;DBCO-SS-Maleimide&#xff0c;DBCO-SS-Mal 详细产品数据&#xff08;Detailed Product Data&#xff09;&am…

C++--数据结构--并查集--高阶0711

1. 并查集 在一些应用问题中&#xff0c;需要将n个不同的元素划分成一些不相交的集合。开始时&#xff0c;每个元素自成一个 单元素集合&#xff0c;然后按一定的规律将归于同一组元素的集合合并。在此过程中要反复用到查询某一 个元素归属于那个集合的运算。适合于描述这类问…

如何使用Python批量化处理Excel——零基础入门指南

本教程旨在帮助零编程基础&#xff0c;但是又有“批量化处理Excel表”这种需求的大家。 在进入教程时&#xff0c;请确保你具有以下资质&#xff1a; 1、 并非工作压到头上了&#xff0c;急需解决一批表所以想过来速成&#xff0c;而是愿意耐心花上几个小时学习来获得一项长久…

Redis之相关拓展(事务、监控、Jedis)

Redis之相关拓展一、事务1、介绍2、流程3、shell命令3.1 开启事务3.2 放弃事务3.3 编译型异常&#xff08;代码有问题&#xff0c;命令有错&#xff09;3.4 运行时异常二、监控&#xff08;watch&#xff09;1、锁1.1 悲观锁1.2 乐观锁2、注意2.1 原理2.2 流程三、Jedis1、简介…

学习Opencv不得不掌握的操作

OpenCV基本操作 1 图像的IO操作 这里我们会给大家介绍如何读取图像&#xff0c;如何显示图像和如何保存图像。 1.1 读取图像 API cv.imread() 参数&#xff1a; 要读取的图像读取方式的标志cv.IMREAD*COLOR&#xff1a;以彩色模式加载图像&#xff0c;任何图像的透明度都将…

C++ Primer笔记——explicit、string流、vector比较、emplace

目录 一.P265 抑制构造函数定义的隐式转换 二.P287 string流 三.P304 vector的比较 四.P307 在容器中特定位置添加元素 一.P265 抑制构造函数定义的隐式转换 举个例子&#xff0c;如果构造函数参数是string类型&#xff0c;那么当使用赋值符号进行初始化操作时&#xff0c…

leetcode -- ⽤最少数量的箭引爆⽓球(452)

有一些球形气球贴在一堵用 XY 平面表示的墙面上。墙面上的气球记录在整数数组 points &#xff0c;其中points[i] [xstart, xend] 表示水平直径在 xstart 和 xend之间的气球。你不知道气球的确切 y 坐标。 一支弓箭可以沿着 x 轴从不同点 完全垂直 地射出。在坐标 x 处射出一…

数据模型篇之阿里巴巴数据整合及管理体系

第9章 阿里巴巴整合及管理体系 OneData的设计是为了建设统一的、规范化的数据接人层&#xff08; ODS &#xff09;和数据中间层&#xff08; DWD和DWS &#xff09;&#xff0c;通过数据服务和数据产品&#xff0c;完成服务于阿里巴巴的大数据系统建设 &#xff0c;即数据公共…

2022 年 11 月区块链操作系统的开发回顾

查看 Cartesi Machine、Cartesi Rollups 和 Noether 的更新正在寻找区块链操作系统组件的最新进展&#xff1f;你找对地方了&#xff01;正如在我们的路线图文章中所描述的那样&#xff0c;我们一直在朝着定期且频繁的更新方向发展着&#xff0c;以便让我们的社区能够及时的了解…

微服务框架 SpringCloud微服务架构 多级缓存 46 JVM 进程缓存 46.1 导入商品案例【MySQL环境准备】

微服务框架 【SpringCloudRabbitMQDockerRedis搜索分布式&#xff0c;系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 多级缓存 文章目录微服务框架多级缓存46 JVM 进程缓存46.1 导入商品案例【MySQL环境准备】46.1.1 导入商品管理案例46 JVM 进程缓存 用于在T…