背景介绍
首先,我们来思考一些几个业务场景:
- XX 信用卡中心,每月 28 日凌晨 1:00 到 3:00 需要完成全网用户当月的费用清单的生成
- XX 电商平台,需要每天上午 9:00 开始向会员推送送优惠券使用提醒
- XX 公司,需要定时执行 Python 脚本,清理掉某文件服务系统中无效的 tmp 文件
最开始,在单台服务器使用Linux Cron 就能满足定时任务需求,但是随着任务数量的不断增长,单机模式会对机器负载产生巨大的压力,无法保证正常地触发运行任务。由此,就诞生了各种个样的分布式定时任务调度平台,比如 Quartz、XXL-Job、ElasticJob,PowerJob。但是,大部分公司可能都会选择自研
- 自研更容易适配自有基础框架和技术工具
- 自研系统的架构可灵活调整,并适配业务
- 对开源项目做二次开发或者封装第三方 SDK 的开发和维护成本也不低
分布式任务调度系统设计
- 采用分布式架构,解决单体架构遇到的性能瓶颈问题
- 主要由调度器,执行器和Web控制台,API服务四个模块构成
- 根据配置的路由策略进行调度计算、执行和停止具体任务、界面化管理任务和集群资源
整个系统的核心在于调度器,调度器会实现负责管理任务的生命周期,维护任务的依赖关系(DAG 编排),支持定时任务触发,监控任务状态,管理任务的生命周期,维护任务状态机。 - 分配任务到指定执行器。根据任务的类型、等待时间、优先级等信息,按照多种调度算法,对任务进行调度并将任务分发给合理的 执行器 来执行任务
- 根据配置的路由策略进行调度计算、执行和停止具体任务、界面化管理任务和集群资源
开源系统 XXX-Job
一个分布式任务调度系统,基本会实现几个任务注册,任务调度,任务执行几个核心点
- 任务注册:业务方注册任务到XXX-Job Admin
- 任务触发:XXX-Job Admin 根据配置触发任务调度
- 任务调度:任务触发之后,根据调度算法,找到执行器
- 任务执行 :执行器执行任务,返回结果
XXX-Job
调度中心
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover,支持创建执行器等功能。
执行器
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;接收“调度中心”的执行请求、终止请求和日志请求等。
项目构成
- admin 是xxljob的控制台,可以配置执行器,定时任务,dashboard查看等功能
- core 是业务方也要引入的jar包,内置通过netty于admin进行通信
- samples 项目是测试项目,包含传统spring项目如何引入xxljob和springboot如何引入
核心类调用关系
类名 | 作用 | 备注 |
---|---|---|
XxlJobAdminConfig | 负责创建XxlJobScheduler实例 | |
XxlJobScheduler | 负责创建各种线程,包括任务注册主线程,调度容器的主线程,以及调度参数的配置线程池 JobTriggerPoolHelper | |
JobScheduleHelper | 调度容器,创建一个守护线程查询所有下次执行时间在当前时间5秒内的定时任务,并按条件执行 | |
JobTriggerPoolHelper | 创建操作XxlJobTrigger的线程池,并添加trigger | |
XxlJobTrigger | 表示一个调度参数的配置,会查询具体的定时任务信息XxlJobInfo | |
XxlJob | 定义执行器的注解 | |
JobThread | 调用IJobHandler的executer执行任务,并回调调度中心 | |
IJobHandler | 抽象的执行器接口,定义了要执行的具体内容,同样的也是一个execute方法 | |
EmbedServer | 内嵌的Server,默认端口是9999 | |
ExecutorBiz | 其中的run方法用于调用执行器,有两个是实现类ExecutorBizImpl以及ExecutorBizClient 。 |
任务注册
xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()))
; 注册自己的任务
public void initXxlJobExecutor() {
// load executor prop
Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
// init executor
xxlJobExecutor = new XxlJobSimpleExecutor();
xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));
// registry job bean
xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));
// start executor
try {
xxlJobExecutor.start();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
xxlJobExecutor 会执行 initJobHandlerMethodRepository :将任务处理handler记录在本地
super.start() 调用 XxlJobExecutor ,
@Override
public void start() {
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(xxlJobBeanList);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- TriggerCallbackThread: 执行handler 之后上班结果的线程
- initEmbedServer: 初始化服务,这里面也会把自己注册到 admin 里面去,让自己成为一个 executor
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}
任务调度
admin的核心功能
public void init() throws Exception {
//初始化fastTriggerPool和slowTriggerPool线程池对象
JobTriggerPoolHelper.toStart();
/**
* 开启线程,每90s查询执行器的数据,如果执行器上次更新时间超过90s未更新,就移除这个执行器,并把存活的执行器更新
*/
JobRegistryHelper.getInstance().start();
/**
* 启动线程,查找任务执行失败的任务,
* 1.设置了重试次数,就再次触发任务
* 2.判断是否需邮件预警
*/
JobFailMonitorHelper.getInstance().start();
/**
* 启动线程,处理任务结果丢失的数据
* 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
*/
JobCompleteHelper.getInstance().start();
JobLogReportHelper.getInstance().start();
/**
* 启动线程,执行任务
*/
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
任务查询
-
获取数据库的 lock:
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
-
找到待调度的任务
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
-
下发任务:
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、 查询从当前时间+5秒内要执行的任务
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump todo 如果任务超时5秒
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger todo 如果任务配置的"调度过期策略"是"立即执行一次",那么就触发一次任务
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、fresh next todo 从当前时间开始,计算任务的下一次执行时间
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time todo 任务执行时间在当前时间的5s内
// 1、trigger 触发任务
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next todo 从当前时间开始,计算任务的下一次执行时间
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again todo 如果下一次的执行时间在未来5s内
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 还没有到达任务执行的时间
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1 计算剩余的秒数字
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、把剩余秒数--任务id存入map中 ;==>下面的 ringThread 线程,会每一秒执行一次,查到对应的数据后,触发任务
pushTimeRing(ringSecond, jobInfo.getId());
// 3、重新计算下一次调度时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info TODO 修改jonInfo的内容
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
任务下发
-
初始化下发参数,
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
-
找到需要下发的地址:
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
-
路由算法
-
调用执行器接口:·XxlJobRemotingUtil.postBody(addressUrl + “run”, accessToken, timeout, triggerParam, String.class);·
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 初始化下发的参数
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
/ 代码省略
// 找到需要下发的地址
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
// 广播地址处理
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
// 根据路由找到地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
// 调用执行器
//
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
/ 代码省略
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
调用执行器接口
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
任务执行
admin 会调用执行器的 addressUrl + “run”,这个接口主要用来触发任务执行的
// services mapping
try {
switch (uri) {
case "/beat":
// 类似心跳接口
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
- 这里面主要做下面几件事情
- 根据任务类型,创建不同的 jobHandler
- 创建一个线程执行 jobHandler,监听参数队列
- 将任务参数给到 参数队列,jobHandler线程 获取参数
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread TODO 查询这个任务的线程,第一次执行任务,是没有这个任务的
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// 根据任务类型,创建不同的 jobHandler
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// 创建 jobhandler,备注,执行器在启动的时候,会扫描@xxlJob注解修饰的方法,注册到map中,这里直接取出来,IJobHandler就是对jdk的method对象的封装
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
}
// 判断堵塞策略
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
// DISCARD_LATER,不能直接执行的话,就直接退出
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
//替换一个新的 jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// jobThread 为null的话,就创建一个,并执行线程,是一个死循环,从 triggerQueue中读取参数,并执行
if (jobThread == null) {
// 这里会stop 掉之前旧的存在的 jobThread
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// 把参数放在任务队列线程中的 triggerQueue 中
// pushTriggerQueue 也会判断是否是重复执行的 job
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
结果返回
- 将结果给:
TriggerCallbackThread.pushCallBack
TriggerCallbackThread
消费返回的结果admin
接受返回结果,写入到DB
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
XxlJobContext.getXxlJobContext().getHandleMsg() )
拿到任务的执行结果
@Override
public void run() {
// normal callback
while(!toStop){
try {
// 拿到任务的执行结果
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// callback, will retry if error
if (callbackParamList!=null && callbackParamList.size()>0) {
// 调用 admin 接口:api/callback 给 admin
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// 代码省略
}
});
总结
一个任务的生命周期
不足之处
- 待调度的任务存储在一张表中,如果待调度任务过多的时候,可能会造成任务调度延迟
- 只能选择一个调度策略
- 执行器服务是优雅关闭,会自动调用 /reigstRomeve方法告诉admin,自己移除,admin会通过定时任务扫描,每90秒扫描一次执行器上次注册到时间,如果超过90s,就主动移除这个执行器异常退出,某些策略会导致任务在90s内一直失败(比如第一个)
- 分片策略,这个策略是所有的机器都执行相同的参数,由执行器自己区分
一个任务调度系统核心点
- 数据量大时保证调度时间:分库分表
- 任务丢失处理:调度器定时扫描执行时间过长的任务
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
- 任务幂等性:不能重复执行,在执行器记录现在有哪一些任务在执行, 但是,执行器在执行业务代码时也建议最好幂等
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// triggerLogIdSet 记录了当前执行器正在执行的任务,发现重复时不会写入到 triggerQueue 中
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
- 调度策略:解决如何分配到哪一个执行器问题,实现负载均衡,分片处理,高容错,节点故障转移作业
- 实现负载均衡的一个需要执行器上报当前的状态:当前的任务执行数量,待调度的任务数量,读取服务器系统负载水平,根据这些,选择一个综合数值最小的执行器出来
- 分片处理:分片处理通常需要调度器支持
- java 实现分片
private static String calculatedExecutorParamValue(XxlJobInfo jobInfo,Integer index,Integer adminTotal) {
String executorParam = jobInfo.getExecutorParam();
if(StringUtils.isEmpty(executorParam)){
return null;
}
try {
List<String> paramList = JSONObject.parseArray(executorParam, String.class);
if(CollectionUtils.isEmpty(paramList)){
return null;
}
List<List<String>> paramAverageList = averageAssign(paramList, adminTotal);
List<String> result = paramAverageList.get(index);
return JSON.toJSONString(result);
}catch (Exception e){
logger.error("分片并行计算,解析参数错误,参数:{},错误原因:{}",executorParam,e.getMessage());
}
return null;
}
public static <T> List <List<T>> averageAssign(List<T>source,int n){
List <List<T>> result=new ArrayList<List<T>>();
int remainder=source.size()%n; //先计算出余数
int number=source.size()/n; //然后是商
int offset=0;//偏移量(用以标识加的余数)
for(int i=0;i<n;i++){
List<T>value;
if(remainder>0){
value=source.subList(i*number+offset, (i+1)*number+offset+1);
remainder--;
offset++;
}else{
value=source.subList(i*number+offset, (i+1)*number+offset);
}
result.add(value);
}
return result;
}
- go 实现分片
package main
import (
"fmt"
)
func chunk(slice []int, size int) [][]int {
var chunks [][]int
chunkSize := (len(slice) + size - 1) / size
for i := 0; i < len(slice); i += chunkSize {
end := i + chunkSize
if end > len(slice) {
end = len(slice)
}
chunks = append(chunks, slice[i:end])
}
return chunks
}
func main() {
slice := []int{1, 2, 3, 4, 5}
chunks := chunk(slice, 2)
fmt.Println(chunks)
}
任务调度依赖:工作流模式调度依赖,一个有向无环图:DAG(directed acyclic graph)
package main
import (
"fmt"
"sync"
"time"
)
//图结构
type DAG struct {
Vertexs []*Vertex
}
//顶点
type Vertex struct {
Key string
Value interface{}
Parents []*Vertex
Children []*Vertex
}
//添加顶点
func (dag *DAG) AddVertex(v *Vertex) {
dag.Vertexs = append(dag.Vertexs, v)
}
//添加边
func (dag *DAG) AddEdge(from, to *Vertex) {
from.Children = append(from.Children, to)
to.Parents = append(to.Parents, from)
}
func main() {
var dag = &DAG{}
//添加顶点
va := &Vertex{Key: "a", Value: "1"}
vb := &Vertex{Key: "b", Value: "2"}
vc := &Vertex{Key: "c", Value: "3"}
vd := &Vertex{Key: "d", Value: "4"}
ve := &Vertex{Key: "e", Value: "5"}
vf := &Vertex{Key: "f", Value: "6"}
vg := &Vertex{Key: "g", Value: "7"}
vh := &Vertex{Key: "h", Value: "8"}
vi := &Vertex{Key: "i", Value: "9"}
//添加边
dag.AddEdge(va, vb)
dag.AddEdge(va, vc)
dag.AddEdge(va, vd)
dag.AddEdge(vb, ve)
dag.AddEdge(vb, vh)
dag.AddEdge(vb, vf)
dag.AddEdge(vc, vf)
dag.AddEdge(vc, vg)
dag.AddEdge(vd, vg)
dag.AddEdge(vh, vi)
dag.AddEdge(ve, vi)
dag.AddEdge(vf, vi)
dag.AddEdge(vg, vi)
//[1] [] { a }
//[2] [] { b, c, d }
//[3] [] { h, e, f, g }
//[4] [] { i }
all := LayerBFS(va)
startTime := time.Now()
for _, layer := range all {
fmt.Println("------------------")
doTasks(layer)
}
fmt.Printf("cost:%f\n", time.Since(startTime).Seconds())
}
type Queue []interface{}
func (q *Queue) Push(x interface{}) {
*q = append(*q, x)
}
func (q *Queue) Pop() interface{} {
h := *q
var el interface{}
l := len(h)
el, *q = h[0], h[1:l]
return el
}
func (q *Queue) Len() int {
return len(*q)
}
func NewQueue() *Queue {
return &Queue{}
}
func LayerBFS(root *Vertex) [][]*Vertex {
queue := NewQueue()
queue.Push(root)
visited := make(map[string]*Vertex)
all := make([][]*Vertex, 0)
for queue.Len() > 0 {
qSize := queue.Len()
tmp := make([]*Vertex, 0)
for i := 0; i < qSize; i++ {
//pop vertex
e := queue.Pop()
currVert := e.(*Vertex)
if _, ok := visited[currVert.Key]; ok {
continue
}
visited[currVert.Key] = currVert
tmp = append(tmp, currVert)
for _, val := range currVert.Children {
if _, ok := visited[val.Key]; !ok {
queue.Push(val) //add child
}
}
}
all = append(all, [][]*Vertex{tmp}...)
}
return all
}
//并发执行
func doTasks(vertexs []*Vertex) {
var wg sync.WaitGroup
startTime := time.Now()
for _, v := range vertexs {
wg.Add(1)
go func(v *Vertex) {
defer wg.Done()
time.Sleep(2 * time.Second)
fmt.Printf("do %v, result is %v \n", v.Key, v.Value)
}(v) //notice
}
wg.Wait()
fmt.Printf("cost:%0.0f\n", time.Since(startTime).Seconds())
}
文档参考
- 实现一个任务调度系统,看这篇就够了
- 伴鱼分布式调度系统 Jarvis 的设计与实现
- 如何设计一个分布式任务调度系统