elastic-job 搭建——应用于企业级项目

news2025/1/12 19:02:33

1. 📂 技术方案

方案介绍

ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。

1、elastic-job-lite:apache维护版本3.0.1

2、zookeeper:【zookeeper:3.6.3-openjdk】

3、elasticjob-lite-ui:【apache/shardingsphere-elasticjob-lite-ui:3.0.2】

核心内容

1、elastic-job-sdk:进一步封装elastic-job-lite。能够用少量的注解和配置,来轻松使用定时任务。

2、elastic-job-ui:运维管理,界面操作。

2. 🔱 技术架构

架构图

3. 💠 核心流程

一、elastic-job-sdk

1、引入SDK

<dependency>
    <groupId>com.wotu.sdk</groupId>
    <artifactId>elastic-job</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

2、apollo配置

#elasticjob.dump.enabled = true

elasticjob.reg-center.server-lists = 47.98.193.2:32181
elasticjob.reg-center.namespace = elastic-job-test

elasticjob.reg-center.base-sleep-time-milliseconds = 10000
elasticjob.reg-center.max-sleep-time-milliseconds = 30000
elasticjob.reg-center.session-timeout-milliseconds = 600000
elasticjob.reg-center.connection-timeout-milliseconds = 600000

elasticjob.tracing.type = RDB

spring.datasource.druid.job.type = com.alibaba.druid.pool.DruidDataSource
spring.datasource.druid.job.username = platform1
spring.datasource.druid.job.password = Ht60794066
spring.datasource.druid.job.driver-class-name = com.mysql.cj.jdbc.Driver
spring.datasource.druid.job.url = jdbc:mysql://wtcareertestnet.mysql.rds.aliyuncs.com:3306/middleware_elastic_job?serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull

3、定时任务

@Slf4j
@ElasticJobScheduler(name = "applyTestJob1", cron = "0/10 * * * * ? *")
public class ApplyJobTest implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("ApplyJobTest start, time:{}", System.nanoTime());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("ApplyJobTest end, time:{}", System.nanoTime());
    }
}

4、其他配置

#config strategy, more strategy refer to official document
#cofig error handler type.  LOG,THROW,IGNORE,EMAIL,WECHAT,DINGTALK
elasticjob.job-error-handler-type=LOG
#config sharding strategy type. AVG_ALLOCATION,ODEVITY,ROUND_ROBIN
elasticjob.job-sharding-strategy-type=AVG_ALLOCATION
elasticjob.job-executor-service-handler-type=CPU

#tracing
#elasticjob.tracing.type=RDB
#multiple datasource
#elasticjob.tracing.data-source-bean-name=db1MasterSlaveRoutingDatasource

#config email notify
elasticjob.props.email.host=host
elasticjob.props.email.port=465
elasticjob.props.email.username=username
elasticjob.props.email.password=password
elasticjob.props.email.useSsl=true
elasticjob.props.email.subject=ElasticJob error message
elasticjob.props.email.from=from@xxx.xx
elasticjob.props.email.to=to1@xxx.xx,to2@xxx.xx
elasticjob.props.email.cc=cc@xxx.xx
elasticjob.props.email.bcc=bcc@xxx.xx
elasticjob.props.email.debug=false

#config wechat notify
elasticjob.props.wechat.webhook=you_webhook
elasticjob.props.wechat.connectTimeout=3000
elasticjob.props.wechat.readTimeout=5000

#config dingtalk notify
elasticjob.props.dingtalk.webhook=you_webhook
elasticjob.props.dingtalk.keyword=you_keyword
elasticjob.props.dingtalk.secret=you_secret
elasticjob.props.dingtalk.connectTimeout=3000
elasticjob.props.dingtalk.readTimeout=5000

二、elastic-job-ui

测试环境地址:http://47.98.193.2:8088/#/registry-center

root

root

界面配置

zookeeper连接地址:47.98.193.2:32181

4. ⚛️ 详细设计

一、弹性调度

ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。 随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,分别负责作业的 50% 的负载,如下图所示。

作用:根据分片值,对不同的数据进行处理

注意:分片了,就一定要对业务代码进行改造。比如说你分3片,原来的业务代码根本没变,没有用到分片项和分配参数,就会导致3台服务器执行了一样的数据,数据量翻三倍,业务也错乱

@Slf4j
@ElasticJobScheduler(name = "applyTestJob1", cron = "0/10 * * * * ? *",
        shardingTotalCount = 4, shardingItemParameters = "0=0,1=0,2=1,3=1")
public class ApplyJobTest implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        int shardingItem = shardingContext.getShardingItem();

        switch (shardingItem) {
            case 0 : {
                handle(shardingItem);
                break;
            }
            case 1: {
                handle(shardingItem);
                break;
            }
            default: {
                return;
            }
        }
    }

    private void handle(int shardingItem) {
        log.info("ApplyJobTest start, time:{}", System.nanoTime());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("shardingItem:{}", shardingItem);

        log.info("ApplyJobTest end, time:{}", System.nanoTime());
    }
}

二、失效转移

在开启失效转移功能之后,ElasticJob 的其他服务器能够在感知到宕机的作业服务器之后,补偿执行该分片作业。

注意:

1、做好代码幂等性。

2、简单任务不需要开启失效转移,否则会影响性能。

3、需要先开启monitorExecution = true(Job幂等性)

@Slf4j
@ElasticJobScheduler(name = "applyTestJobOnce", cron = "0/10 * * * * ? *",
        monitorExecution = true, failover = true)
public class ApplyJobTest implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("ApplyJobTest start, time:{}", System.nanoTime());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("shardingItem:{}", shardingContext.getShardingItem());

        log.info("ApplyJobTest end, time:{}", System.nanoTime());
    }
}

三、任务错过执行

注意:长执行任务再使用

@Slf4j
@ElasticJobScheduler(name = "applyTestJobOnce", cron = "0/10 * * * * ? *", misfire = true)
public class ApplyJobTest implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("ApplyJobTest start, time:{}", System.nanoTime());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("ApplyJobTest mid, time:{}", System.nanoTime());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("shardingItem:{}", shardingContext.getShardingItem());

        log.info("ApplyJobTest end, time:{}", System.nanoTime());
    }
}

四、事件追踪

配置elastic-job和elastic-job-ui的事件追踪

1、elastic-job-lite 配置事件追踪

elastic-job会自动注入beanName为dataSource的数据源为事件追踪数据源。

apache版本的elastic-job将事件追踪放在额外配置中,需要自行放入configuration里

2、elastic-job-ui配置数据源

第一步:elastic-job-ui的docker容器,在/opt/elastic-job-ui/lib 下添加mysql的连接jar包

第二步:修改/config下application.properties,添加驱动

五、自定义WotuSimpleJob

作用:能够在定时任务执行时,拿到任务执行的开始时间(用作长任务之后,补偿任务拿任务的开始时间)

@Slf4j
@ElasticJobScheduler(name = "applyTestJobOnce", cron = "0/10 * * * * ? *",
        monitorExecution = true, failover = true, misfire = true)
public class ApplyJobTest implements WotuSimpleJob {

    @Override
    public void execute(ShardingContext shardingContext, LocalDateTime localDateTime) {
        log.info("ApplyJobTest execute, time:{}, id:{}", localDateTime);
        log.info("ApplyJobTest start, time:{}, id:{}", System.nanoTime(), shardingContext.getTaskId());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("ApplyJobTest mid, time:{}", System.nanoTime());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("shardingItem:{}", shardingContext.getShardingItem());

        log.info("ApplyJobTest end, time:{}", System.nanoTime());
    }
}

六、多数据源

配置了多数据源,需要注意原本的sql,是否走的是原数据源。

其他:

1、错误处理策略:记录日志策略、抛出/忽略异常策略、邮件/企业微信/钉钉通知策略

2、线程池策略:CPU 资源策略、单线程策略

3、作业分片策略:平均分片策略、奇偶分片策略、轮询分片策略

4、作业类型:SIMPLE、DATAFLOW、SCRIPT、HTTP

部分问题:

https://www.cnblogs.com/hzzjj/p/15715896.html

任务调度之Elastic-Job - 灰信网(软件开发博客聚合)

默认定时任务名称 + 服务名

最后一次任务执行时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/460523.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

你知道渲染农场是什么原理吗?它是如何工作的?

我们知道&#xff0c;仅靠一台计算机几乎是不能达到专业渲染集群的处理能力的。所以现在&#xff0c; 允许将很多台计算机或是处理器进行连接&#xff0c;再将连接后的机器作为一个总平台来处理不同的渲染需求&#xff0c;这样的设置&#xff0c;就被称之为渲染农场。 渲染农…

年轻人“赶烤”淄博,文旅业如何借势?

​&#xff08;图片来源于网络&#xff0c;侵删&#xff09; 文 | 螳螂观察 作者 | 易不二 从“更适合中国宝宝体质的TACO”在社交媒体爆火&#xff0c;到全国人民为之“赶烤”&#xff0c;淄博凭借独树一帜的烧烤文化&#xff0c;已经站上了文旅业回暖的潮头。 今年五一假期…

FPGA目前就业形势咋样?来听听业内工程师的看法

看到网上有一个问题很火&#xff1a;2023了&#xff0c;FPGA目前就业形势咋样?很多同学也对这个方向比较感兴趣&#xff0c;下面就来一起了解一下吧。 FPGA岗位有哪些&#xff1f; 从芯片设计流程来看&#xff0c;FPGA岗位可以分四类 产品开发期&#xff1a;FPGA系统架构师 …

基于FFmpeg倒放功能的实现-----命令行和API调用实现方法

来源:微信公众号「编程学习基地」 文章目录 FFmpeg API调用reverse滤镜实现视频倒放ffmpeg命令行实现方法FFmpeg 过滤器 调用API实现方法完整代码贴上运行FFmpeg API调用reverse滤镜实现视频倒放 ffmpeg命令行实现方法 ffmpeg -i bigbuckbunny_480x272.h265 -filter_comple…

vue页面内嵌iframe使用postMessage进行数据交互(postMessage跨域通信)

什么是postMessage postMessage是html5引入的API,它允许来自不同源的脚本采用异步方式进行有效的通信,可以实现跨文本文档,多窗口,跨域消息传递.多用于窗口间数据通信,这也使它成为跨域通信的一种有效的解决方案. vue父页面&#xff08;嵌入iframe的页面&#xff09; 在vue中…

【文心一言】文心一言最近这么火,它到底是什么

前言 文心一言&#xff08;英文名&#xff1a;ERNIE Bot&#xff09;是百度全新一代知识增强大语言模型&#xff0c;文心大模型家族的新成员&#xff0c;能够与人对话互动&#xff0c;回答问题&#xff0c;协助创作&#xff0c;高效便捷地帮助人们获取信息、知识和灵感。文心一…

网工容易混淆的三种网线类型:直连线、交叉线和反转线

网线是计算机网络中最常见的传输介质之一&#xff0c;它能够将数据快速而可靠地传输到各个网络设备中。在实际的网络应用中&#xff0c;我们常常需要使用到不同类型的网线&#xff0c;包括直连线、交叉线和反转线。本文将介绍这三种网线的定义、应用场景和注意事项。 直连线 …

Tailwind CSS入门(二)——基本介绍和特性

上一篇文章简要的介绍了原子类CSS&#xff0c;以及个人对语义化、原子化的一些经验和理解。从这篇文章开始&#xff0c;正式开始分享Tailwind CSS的特性、使用和技巧。 Tailwind CSS是一个为快速开发而精心设计的原子类CSS框架&#xff0c;在此我们将搭建一个Vite项目来配合讲…

力扣---LeetCode21. 合并两个有序链表(链表经典题)

文章目录 前言21. 合并两个有序链表链接&#xff1a;方法一&#xff1a;取小尾插1.1代码&#xff1a;1.2 流程图&#xff1a;1.3 注意&#xff1a; 方法二&#xff1a;带哨兵位2.1代码&#xff1a;2.2流程图&#xff1a; 总结 前言 焦虑不会消除明天的悲伤 只会让你今天的力量…

springboot概述

脚手架: 因为创建的为web项目&#xff0c;有这两个文件夹 在idea中也可以使用脚手架 会直接或间接包含依赖 启动类 单一模块: 启动类要放在根包下边&#xff0c;其他的业务放在根包或者根包的子包 多个模块: restController包含controller且每个方法都包含responseBody注解&…

这可能是你看过最详细的Java集合篇【二】—— LinkedList

文章目录 LinkedList继承关系数据结构变量构造方法添加元素相关方法查找元素相关方法删除元素相关方法清空方法遍历方法其它方法常见面试题 LinkedList LinkedList底层数据结构是双向链表。链表数据结构的特点是每个元素分配的空间不必连续、插入和删除元素时速度非常快、但访…

10年+工作经验总结:测试工程师职业成长路线图

一、功能测试工程师必备技能 1. 功能测试理论 主要包括&#xff1a; 软件测试流程 需求理解 测试用例设计&#xff08;编写测试用例的策略&#xff09; 执行测试用例 提交bug(bug是由什么组成&#xff0c;bug处理流程&#xff0c;bug优先级&#xff0c;bug的定位等) 回归…

基于Vue的web设计打印方案

企业信息化例如ERP,OA等等都会存在纸质单据打印的情况&#xff0c;需要在企业内部流转&#xff0c;打印设计也有很多方案&#xff0c;例如fastReport,bartender等等&#xff0c;今天要说的是 vue-plugin-hiprint&#xff0c;开源的web打印插件&#xff0c;基于此插件可以集成模…

SpringBoot 中如何正确的实现模块日志入库?

目录 1.简述2.踩坑记录3.LoginController4.LoginService5.LoginLogService5.1 Async实现异步5.2 自定义线程池实现异步1&#xff09;自定义线程池2&#xff09;复制上下文请求3&#xff09;自定义线程池实现异步 6.补充&#xff1a;LoginService 手动提交事务 背景&#xff1a;…

并发编程之可重入锁ReentrantLock

文章目录 前言ReentrantLock原理ReentrantLock VS Synchronized源码解析ReentrantLock同步机制ReentrantLock可重入机制ReentrantLock可中断机制ReentrantLock超时机制条件变量Condition 写在最后 前言 大家都知道在并发编程中一般会用到多线程技术&#xff0c;多线程技术可以…

2023年主流的选择仍是Feign, http客户端Feign还能再战

&#x1f473;我亲爱的各位大佬们好&#x1f618;&#x1f618;&#x1f618; ♨️本篇文章记录的为 微服务组件之http客户端Feign 相关内容&#xff0c;适合在学Java的小白,帮助新手快速上手,也适合复习中&#xff0c;面试中的大佬&#x1f649;&#x1f649;&#x1f649;。 …

UE4 架构初识(三)

UE4仿真引擎学习 一、架构基础 1. PlayerController PlayerController&#xff08;玩家控制器&#xff09; 是Pawn和控制它的人类玩家间的接口。PlayerController本质上代表了人类玩家的意愿。当设置PlayerController时&#xff0c;您需要考虑的一个事情就是您想在PlayerCont…

太阳辐射预报模式WRF-SOLAR在农业生态领域中的实践技术应用

太阳能是一种清洁能源&#xff0c;合理有效开发太阳能资源对减少污染、保护环境以及应对气候变化和能源安全具有非常重要的实际意义&#xff0c;为了实现能源和环境的可持续发展&#xff0c;近年来世界各国都高度重视太阳能资源的开发利用&#xff1b;另外太阳辐射的光谱成分、…

Navicat15数据库导表及乱码问题解决

本地环境 Win10 PHPstudy_Pro(小皮) PHP5.6 MySQL5.7 连接MySQL数据库 1.启动Navicat15 2.点击连接按钮,并选择MySQL子项 3.连接对话框 连接名:自己分的清的名字即可 主机:数据的地址 若连接非本地mysql只需将主机localhost换成需要连接数据的ip地址即可&#xff0c;输入数…

Linux下一切皆文件与指令的本质(可执行程序),which指令等

Linux下一切皆文件 在Linux下的话&#xff0c;一切皆文件。主要是看待诸如软硬件设备与磁盘文件的看法&#xff1a;一切皆文件&#xff0c;比如说显示器它也是文件&#xff0c;键盘也是文件&#xff0c;普通文件肯定是文件。首先就是显示器这个东西&#xff0c;它其实就是可以打…