1. 📂 技术方案
方案介绍
ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。
1、elastic-job-lite:apache维护版本3.0.1
2、zookeeper:【zookeeper:3.6.3-openjdk】
3、elasticjob-lite-ui:【apache/shardingsphere-elasticjob-lite-ui:3.0.2】
核心内容
1、elastic-job-sdk:进一步封装elastic-job-lite。能够用少量的注解和配置,来轻松使用定时任务。
2、elastic-job-ui:运维管理,界面操作。
2. 🔱 技术架构
架构图
3. 💠 核心流程
一、elastic-job-sdk
1、引入SDK
<dependency>
<groupId>com.wotu.sdk</groupId>
<artifactId>elastic-job</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
2、apollo配置
#elasticjob.dump.enabled = true
elasticjob.reg-center.server-lists = 47.98.193.2:32181
elasticjob.reg-center.namespace = elastic-job-test
elasticjob.reg-center.base-sleep-time-milliseconds = 10000
elasticjob.reg-center.max-sleep-time-milliseconds = 30000
elasticjob.reg-center.session-timeout-milliseconds = 600000
elasticjob.reg-center.connection-timeout-milliseconds = 600000
elasticjob.tracing.type = RDB
spring.datasource.druid.job.type = com.alibaba.druid.pool.DruidDataSource
spring.datasource.druid.job.username = platform1
spring.datasource.druid.job.password = Ht60794066
spring.datasource.druid.job.driver-class-name = com.mysql.cj.jdbc.Driver
spring.datasource.druid.job.url = jdbc:mysql://wtcareertestnet.mysql.rds.aliyuncs.com:3306/middleware_elastic_job?serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
3、定时任务
@Slf4j
@ElasticJobScheduler(name = "applyTestJob1", cron = "0/10 * * * * ? *")
public class ApplyJobTest implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("ApplyJobTest start, time:{}", System.nanoTime());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ApplyJobTest end, time:{}", System.nanoTime());
}
}
4、其他配置
#config strategy, more strategy refer to official document
#cofig error handler type. LOG,THROW,IGNORE,EMAIL,WECHAT,DINGTALK
elasticjob.job-error-handler-type=LOG
#config sharding strategy type. AVG_ALLOCATION,ODEVITY,ROUND_ROBIN
elasticjob.job-sharding-strategy-type=AVG_ALLOCATION
elasticjob.job-executor-service-handler-type=CPU
#tracing
#elasticjob.tracing.type=RDB
#multiple datasource
#elasticjob.tracing.data-source-bean-name=db1MasterSlaveRoutingDatasource
#config email notify
elasticjob.props.email.host=host
elasticjob.props.email.port=465
elasticjob.props.email.username=username
elasticjob.props.email.password=password
elasticjob.props.email.useSsl=true
elasticjob.props.email.subject=ElasticJob error message
elasticjob.props.email.from=from@xxx.xx
elasticjob.props.email.to=to1@xxx.xx,to2@xxx.xx
elasticjob.props.email.cc=cc@xxx.xx
elasticjob.props.email.bcc=bcc@xxx.xx
elasticjob.props.email.debug=false
#config wechat notify
elasticjob.props.wechat.webhook=you_webhook
elasticjob.props.wechat.connectTimeout=3000
elasticjob.props.wechat.readTimeout=5000
#config dingtalk notify
elasticjob.props.dingtalk.webhook=you_webhook
elasticjob.props.dingtalk.keyword=you_keyword
elasticjob.props.dingtalk.secret=you_secret
elasticjob.props.dingtalk.connectTimeout=3000
elasticjob.props.dingtalk.readTimeout=5000
二、elastic-job-ui
测试环境地址:http://47.98.193.2:8088/#/registry-center
root
root
界面配置
zookeeper连接地址:47.98.193.2:32181
4. ⚛️ 详细设计
一、弹性调度
ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。 随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,分别负责作业的 50% 的负载,如下图所示。
作用:根据分片值,对不同的数据进行处理
注意:分片了,就一定要对业务代码进行改造。比如说你分3片,原来的业务代码根本没变,没有用到分片项和分配参数,就会导致3台服务器执行了一样的数据,数据量翻三倍,业务也错乱
@Slf4j
@ElasticJobScheduler(name = "applyTestJob1", cron = "0/10 * * * * ? *",
shardingTotalCount = 4, shardingItemParameters = "0=0,1=0,2=1,3=1")
public class ApplyJobTest implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
switch (shardingItem) {
case 0 : {
handle(shardingItem);
break;
}
case 1: {
handle(shardingItem);
break;
}
default: {
return;
}
}
}
private void handle(int shardingItem) {
log.info("ApplyJobTest start, time:{}", System.nanoTime());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("shardingItem:{}", shardingItem);
log.info("ApplyJobTest end, time:{}", System.nanoTime());
}
}
二、失效转移
在开启失效转移功能之后,ElasticJob 的其他服务器能够在感知到宕机的作业服务器之后,补偿执行该分片作业。
注意:
1、做好代码幂等性。
2、简单任务不需要开启失效转移,否则会影响性能。
3、需要先开启monitorExecution = true(Job幂等性)
@Slf4j
@ElasticJobScheduler(name = "applyTestJobOnce", cron = "0/10 * * * * ? *",
monitorExecution = true, failover = true)
public class ApplyJobTest implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("ApplyJobTest start, time:{}", System.nanoTime());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("shardingItem:{}", shardingContext.getShardingItem());
log.info("ApplyJobTest end, time:{}", System.nanoTime());
}
}
三、任务错过执行
注意:长执行任务再使用
@Slf4j
@ElasticJobScheduler(name = "applyTestJobOnce", cron = "0/10 * * * * ? *", misfire = true)
public class ApplyJobTest implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("ApplyJobTest start, time:{}", System.nanoTime());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ApplyJobTest mid, time:{}", System.nanoTime());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("shardingItem:{}", shardingContext.getShardingItem());
log.info("ApplyJobTest end, time:{}", System.nanoTime());
}
}
四、事件追踪
配置elastic-job和elastic-job-ui的事件追踪
1、elastic-job-lite 配置事件追踪
elastic-job会自动注入beanName为dataSource的数据源为事件追踪数据源。
apache版本的elastic-job将事件追踪放在额外配置中,需要自行放入configuration里
2、elastic-job-ui配置数据源
第一步:elastic-job-ui的docker容器,在/opt/elastic-job-ui/lib 下添加mysql的连接jar包
第二步:修改/config下application.properties,添加驱动
五、自定义WotuSimpleJob
作用:能够在定时任务执行时,拿到任务执行的开始时间(用作长任务之后,补偿任务拿任务的开始时间)
@Slf4j
@ElasticJobScheduler(name = "applyTestJobOnce", cron = "0/10 * * * * ? *",
monitorExecution = true, failover = true, misfire = true)
public class ApplyJobTest implements WotuSimpleJob {
@Override
public void execute(ShardingContext shardingContext, LocalDateTime localDateTime) {
log.info("ApplyJobTest execute, time:{}, id:{}", localDateTime);
log.info("ApplyJobTest start, time:{}, id:{}", System.nanoTime(), shardingContext.getTaskId());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ApplyJobTest mid, time:{}", System.nanoTime());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("shardingItem:{}", shardingContext.getShardingItem());
log.info("ApplyJobTest end, time:{}", System.nanoTime());
}
}
六、多数据源
配置了多数据源,需要注意原本的sql,是否走的是原数据源。
其他:
1、错误处理策略:记录日志策略、抛出/忽略异常策略、邮件/企业微信/钉钉通知策略
2、线程池策略:CPU 资源策略、单线程策略
3、作业分片策略:平均分片策略、奇偶分片策略、轮询分片策略
4、作业类型:SIMPLE、DATAFLOW、SCRIPT、HTTP
部分问题:
https://www.cnblogs.com/hzzjj/p/15715896.html
任务调度之Elastic-Job - 灰信网(软件开发博客聚合)
默认定时任务名称 + 服务名
最后一次任务执行时间