文章目录
- 前言
- ElasticJob-Lite 3.x 集成springBoot 实战 (一次性作业、定时作业)
- 1. ElasticJob简介
- 2. ElasticJob-Lite 是什么
- 3. 功能列表
- 4. 所需依赖包
- 5. 定时作业配置
- 5.1. 作业:
- 5.2. yml配置:
- 5.3. 测试
- 6. 一次性任务配置、并手动触发
- 6.1. 作业:
- 6.2. yml配置:
- 6.3. 测试
- 7. 其他
- 7.1. Zookeeper命令查看elasticjob注册的信息
- 7.2. 如果注册发生未知错误,如何清理elasticjob在zookeeper上注册的命令空间
前言
如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
ElasticJob-Lite 3.x 集成springBoot 实战 (一次性作业、定时作业)
1. ElasticJob简介
使用 ElasticJob 能够让开发工程师不再担心任务的线性吞吐量提升等非功能需求,使他们能够更加专注于面向业务编码设计; 同时,它也能够解放运维工程师,使他们不必再担心任务的可用性和相关管理需求,只通过轻松的增加服务节点即可达到自动化运维的目的。
2. ElasticJob-Lite 是什么
定位为轻量级无中心化解决方案,使用 jar 的形式提供分布式任务的协调服务。
3. 功能列表
弹性调度:
- 支持任务在分布式场景下的分片和高可用
- 能够水平扩展任务的吞吐量和执行效率
- 任务处理能力随资源配备弹性伸缩
资源分配:
- 在适合的时间将适合的资源分配给任务并使其生效
- 相同任务聚合至相同的执行器统一处理
- 动态调配追加资源至新分配的任务
作业治理:
- 失效转移
- 错过作业重新执行
- 自诊断修复
作业依赖(TODO):
- 基于有向无环图(DAG)的作业间依赖
- 基于有向无环图(DAG)的作业分片间依赖
作业开放生态:
- 可扩展的作业类型统一接口
- 丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等
- 易于对接业务作业,能够与 Spring 依赖注入无缝整合
可视化管控端:
- 作业管控端
- 作业执行历史数据追踪
- 注册中心管理
4. 所需依赖包
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-spring-boot-starter</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-wechat</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>2.4.15</version>
其中:
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-wechat</artifactId>
<version>3.0.1</version>
</dependency>
这个是配置企业微信通知策略 ,即当作业抛异常时,发送企业微信消息通知,但不中断作业执行。
5. 定时作业配置
5.1. 作业:
package org.example.jobs;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.quartz.DisallowConcurrentExecution;
import org.springframework.stereotype.Component;
/**
* @author yangzhenyu
* @version 1.0
* @description:
* @date 2023/4/26 14:33
*/
@Slf4j
@Component
@DisallowConcurrentExecution
public class TestJobMany implements SimpleJob {
@Override
public void execute(ShardingContext context) {
//作业名称
String jobName = context.getJobName();
//分片序列号
int shardingItem = context.getShardingItem();
//分片序列号对应的value值
String shardingParameter = context.getShardingParameter();
//作业分片总数
int jobNameshardingTotalCount = context.getShardingTotalCount();
log.info("{} begin, 作业分片总数: {}, 分片序列号: {},分片序列号对应的value值:{}", jobName,jobNameshardingTotalCount, shardingItem,shardingParameter);
}
}
5.2. yml配置:
elasticjob:
regCenter:
# 连接Zookeeper服务器的列表,包括IP地址和端口号,多个地址用逗号分隔
serverLists: 127.0.0.1:2181
# Zookeeper的命名空间
namespace: yzy
# 等待重试的间隔时间的初始值 单位:毫秒
base-sleep-time-milliseconds: 10000
# 等待重试的间隔时间的最大值
max-sleep-time-milliseconds: 30000
# 最大重试次数
max-retries: 3
# 会话超时时间 单位: 毫秒
session-timeout-milliseconds: 600000
# 连接超时时间 单位: 毫秒
connection-timeout-milliseconds: 600000
# elasticjob.jobs 是一个 Map,
# key 为作业名称,value 为作业类型与配置。
# Starter 会根据该配置自动创建 OneOffJobBootstrap 或 ScheduleJobBootstrap 的实例
# 并注册到 Spring 容器中。
jobs:
TestJobMany:
# elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
elasticJobClass: org.example.jobs.TestJobMany
# 作业分片总数
shardingTotalCount: 20
# 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
# LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
# THROW : 抛出异常策略 抛出系统异常并中断作业执行
# IGNORE : 忽略异常策略 忽略系统异常且不中断作业执行
# WECHAT : 企业微信通知策略 发送企业微信消息通知,但不中断作业执行
jobErrorHandlerType: LOG
cron: 0/5 * * * * ?
# 作业线程池处理策略
jobExecutorServiceHandlerType: "SINGLE_THREAD"
# 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
overwrite: true
# 是否开启错过任务重新执行
misfire: true
#是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
failover: true
如果:jobErrorHandlerType选择WECHAT
那么配置如下:
# Job 配置
elasticjob:
regCenter:
# 连接Zookeeper服务器的列表,包括IP地址和端口号,多个地址用逗号分隔
serverLists: 127.0.0.1:2181
# Zookeeper的命名空间
namespace: yzy
# 等待重试的间隔时间的初始值 单位:毫秒
base-sleep-time-milliseconds: 10000
# 等待重试的间隔时间的最大值
max-sleep-time-milliseconds: 30000
# 最大重试次数
max-retries: 3
# 会话超时时间 单位: 毫秒
session-timeout-milliseconds: 600000
# 连接超时时间 单位: 毫秒
connection-timeout-milliseconds: 600000
props:
wechat:
# 企业微信机器人的 webhook 地址
webhook: xxxxxx
# 与企业微信服务器建立连接的超时时间
connectTimeout: 3000
# 从企业微信服务器读取到可用资源的超时时间
readTimeout: 5000
# elasticjob.jobs 是一个 Map,
# key 为作业名称,value 为作业类型与配置。
# Starter 会根据该配置自动创建 OneOffJobBootstrap 或 ScheduleJobBootstrap 的实例
# 并注册到 Spring 容器中。
jobs:
TestJobMany:
# elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
elasticJobClass: org.example.jobs.TestJobMany
# 作业分片总数
shardingTotalCount: 20
# 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
# LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
# THROW : 抛出异常策略 抛出系统异常并中断作业执行
# IGNORE : 忽略异常策略 忽略系统异常且不中断作业执行
# WECHAT : 企业微信通知策略 发送企业微信消息通知,但不中断作业执行
jobErrorHandlerType: WECHAT
cron: 0/5 * * * * ?
# 作业线程池处理策略
jobExecutorServiceHandlerType: "SINGLE_THREAD"
# 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
overwrite: true
# 是否开启错过任务重新执行
misfire: true
#是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
failover: true
5.3. 测试
6. 一次性任务配置、并手动触发
6.1. 作业:
package org.example.jobs;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.example.entity.TestData;
import org.quartz.DisallowConcurrentExecution;
import org.springframework.stereotype.Component;
/**
* @author yangzhenyu
* @version 1.0
* @description:
* @date 2023/4/26 14:33
*/
@Slf4j
@Component
@DisallowConcurrentExecution
public class TestJobOne implements SimpleJob {
@Override
public void execute(ShardingContext context) {
//作业名称
String jobName = context.getJobName();
//分片序列号
int shardingItem = context.getShardingItem();
//分片序列号对应的value值
String shardingParameter = context.getShardingParameter();
//作业分片总数
int jobNameshardingTotalCount = context.getShardingTotalCount();
log.info("{} begin, 作业分片总数: {}, 分片序列号: {},分片序列号对应的value值:{}", jobName,jobNameshardingTotalCount, shardingItem,shardingParameter);
}
}
6.2. yml配置:
# Job 配置
elasticjob:
regCenter:
# 连接Zookeeper服务器的列表,包括IP地址和端口号,多个地址用逗号分隔
serverLists: 127.0.0.1:2181
# Zookeeper的命名空间
namespace: yzy
# 等待重试的间隔时间的初始值 单位:毫秒
base-sleep-time-milliseconds: 10000
# 等待重试的间隔时间的最大值
max-sleep-time-milliseconds: 30000
# 最大重试次数
max-retries: 3
# 会话超时时间 单位: 毫秒
session-timeout-milliseconds: 600000
# 连接超时时间 单位: 毫秒
connection-timeout-milliseconds: 600000
props:
wechat:
# 企业微信机器人的 webhook 地址
webhook: xxxxxx
# 与企业微信服务器建立连接的超时时间
connectTimeout: 3000
# 从企业微信服务器读取到可用资源的超时时间
readTimeout: 5000
# elasticjob.jobs 是一个 Map,
# key 为作业名称,value 为作业类型与配置。
# Starter 会根据该配置自动创建 OneOffJobBootstrap 或 ScheduleJobBootstrap 的实例
# 并注册到 Spring 容器中。
jobs:
# 如果配置了 cron 属性则为定时调度作业,Starter 会在应用启动时自动启动; 否则为一次性调度作业,
# 需要通过 jobBootstrapBeanName 指定 OneOffJobBootstrap Bean 的名称,
# 在触发点注入 OneOffJobBootstrap 的实例并手动调用 execute() 方法。
myOneOffJob:
jobBootstrapBeanName: myOneOffJob
# elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
elasticJobClass: org.example.jobs.TestJobOne
# 作业分片总数
shardingTotalCount: 20
# 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
# LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
# THROW : 抛出异常策略 抛出系统异常并中断作业执行
# IGNORE : 忽略异常策略 忽略系统异常且不中断作业执行
# WECHAT : 企业微信通知策略 发送企业微信消息通知,但不中断作业执行
jobErrorHandlerType: LOG
# 作业线程池处理策略
jobExecutorServiceHandlerType: "SINGLE_THREAD"
# 分片序列号和参数用等号分隔,多个键值对用逗号分隔
# 分片序列号从0开始,不可大于或等于作业分片总数
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
# 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
overwrite: true
# 是否开启错过任务重新执行
misfire: true
#是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
failover: true
6.3. 测试
controller调用代码:
@GetMapping("/execute")
public String executeOneOffJob() {
OneOffJobBootstrap myOneOffJob = SpringUtil.getBean("myOneOffJob");
myOneOffJob.execute();
return "success";
}
执行:
7. 其他
7.1. Zookeeper命令查看elasticjob注册的信息
查看作业空间:
yml全配置:
# Job 配置
elasticjob:
regCenter:
# 连接Zookeeper服务器的列表,包括IP地址和端口号,多个地址用逗号分隔
serverLists: 127.0.0.1:2181
# Zookeeper的命名空间
namespace: yzy
# 等待重试的间隔时间的初始值 单位:毫秒
base-sleep-time-milliseconds: 10000
# 等待重试的间隔时间的最大值
max-sleep-time-milliseconds: 30000
# 最大重试次数
max-retries: 3
# 会话超时时间 单位: 毫秒
session-timeout-milliseconds: 600000
# 连接超时时间 单位: 毫秒
connection-timeout-milliseconds: 600000
props:
wechat:
# 企业微信机器人的 webhook 地址
webhook: xxxxxx
# 与企业微信服务器建立连接的超时时间
connectTimeout: 3000
# 从企业微信服务器读取到可用资源的超时时间
readTimeout: 5000
# elasticjob.jobs 是一个 Map,
# key 为作业名称,value 为作业类型与配置。
# Starter 会根据该配置自动创建 OneOffJobBootstrap 或 ScheduleJobBootstrap 的实例
# 并注册到 Spring 容器中。
jobs:
TestJobMany:
# elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
elasticJobClass: org.example.jobs.TestJobMany
# 作业分片总数
shardingTotalCount: 20
# 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
# LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
# THROW : 抛出异常策略 抛出系统异常并中断作业执行
# IGNORE : 忽略异常策略 忽略系统异常且不中断作业执行
# WECHAT : 企业微信通知策略 发送企业微信消息通知,但不中断作业执行
jobErrorHandlerType: LOG
cron: 0/5 * * * * ?
# 作业线程池处理策略
jobExecutorServiceHandlerType: "SINGLE_THREAD"
# 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
overwrite: true
# 是否开启错过任务重新执行
misfire: true
#是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
failover: true
# 如果配置了 cron 属性则为定时调度作业,Starter 会在应用启动时自动启动; 否则为一次性调度作业,
# 需要通过 jobBootstrapBeanName 指定 OneOffJobBootstrap Bean 的名称,
# 在触发点注入 OneOffJobBootstrap 的实例并手动调用 execute() 方法。
myOneOffJob:
jobBootstrapBeanName: myOneOffJob
# elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
elasticJobClass: org.example.jobs.TestJobOne
# 作业分片总数
shardingTotalCount: 20
# 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
# LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
# THROW : 抛出异常策略 抛出系统异常并中断作业执行
# IGNORE : 忽略异常策略 忽略系统异常且不中断作业执行
# WECHAT : 企业微信通知策略 发送企业微信消息通知,但不中断作业执行
jobErrorHandlerType: LOG
# 作业线程池处理策略
jobExecutorServiceHandlerType: "SINGLE_THREAD"
# 分片序列号和参数用等号分隔,多个键值对用逗号分隔
# 分片序列号从0开始,不可大于或等于作业分片总数
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
# 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
overwrite: true
# 是否开启错过任务重新执行
misfire: true
#是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
failover: true
Zookeeper 查询:
ls /
查看注册作业:
其中 TestJobMany 是我们注册的定时任务、myOneOffJob 是我们注册的一次性任务
查看分片数:
ls /yzy/TestJobMany/sharding
ls /yzy/myOneOffJob/sharding
7.2. 如果注册发生未知错误,如何清理elasticjob在zookeeper上注册的命令空间
清除我们elasticjob在zookeeper上注册的命令空间
deleteall /yzy
其中yzy是你注册的命名空间
查看:
清除成功,然后重新启动服务,重新注册即可。