分布式任务调度系统分析

news2024/9/23 3:29:58

背景介绍

首先,我们来思考一些几个业务场景:

  • XX 信用卡中心,每月 28 日凌晨 1:00 到 3:00 需要完成全网用户当月的费用清单的生成
  • XX 电商平台,需要每天上午 9:00 开始向会员推送送优惠券使用提醒
  • XX 公司,需要定时执行 Python 脚本,清理掉某文件服务系统中无效的 tmp 文件

最开始,在单台服务器使用Linux Cron 就能满足定时任务需求,但是随着任务数量的不断增长,单机模式会对机器负载产生巨大的压力,无法保证正常地触发运行任务。由此,就诞生了各种个样的分布式定时任务调度平台,比如 Quartz、XXL-Job、ElasticJob,PowerJob。但是,大部分公司可能都会选择自研

  • 自研更容易适配自有基础框架和技术工具
  • 自研系统的架构可灵活调整,并适配业务
  • 对开源项目做二次开发或者封装第三方 SDK 的开发和维护成本也不低

分布式任务调度系统设计

在这里插入图片描述

  • 采用分布式架构,解决单体架构遇到的性能瓶颈问题
  • 主要由调度器,执行器和Web控制台,API服务四个模块构成
    • 根据配置的路由策略进行调度计算、执行和停止具体任务、界面化管理任务和集群资源
      整个系统的核心在于调度器,调度器会实现负责管理任务的生命周期,维护任务的依赖关系(DAG 编排),支持定时任务触发,监控任务状态,管理任务的生命周期,维护任务状态机。
    • 分配任务到指定执行器。根据任务的类型、等待时间、优先级等信息,按照多种调度算法,对任务进行调度并将任务分发给合理的 执行器 来执行任务

开源系统 XXX-Job

一个分布式任务调度系统,基本会实现几个任务注册,任务调度,任务执行几个核心点

  • 任务注册:业务方注册任务到XXX-Job Admin
  • 任务触发:XXX-Job Admin 根据配置触发任务调度
  • 任务调度:任务触发之后,根据调度算法,找到执行器
  • 任务执行 :执行器执行任务,返回结果

XXX-Job

在这里插入图片描述

调度中心

负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover,支持创建执行器等功能。

执行器

负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;接收“调度中心”的执行请求、终止请求和日志请求等。

项目构成

在这里插入图片描述

  • admin 是xxljob的控制台,可以配置执行器,定时任务,dashboard查看等功能
  • core 是业务方也要引入的jar包,内置通过netty于admin进行通信
  • samples 项目是测试项目,包含传统spring项目如何引入xxljob和springboot如何引入

核心类调用关系

在这里插入图片描述

类名作用备注
XxlJobAdminConfig负责创建XxlJobScheduler实例
XxlJobScheduler负责创建各种线程,包括任务注册主线程,调度容器的主线程,以及调度参数的配置线程池 JobTriggerPoolHelper
JobScheduleHelper调度容器,创建一个守护线程查询所有下次执行时间在当前时间5秒内的定时任务,并按条件执行
JobTriggerPoolHelper创建操作XxlJobTrigger的线程池,并添加trigger
XxlJobTrigger表示一个调度参数的配置,会查询具体的定时任务信息XxlJobInfo
XxlJob定义执行器的注解
JobThread调用IJobHandler的executer执行任务,并回调调度中心
IJobHandler抽象的执行器接口,定义了要执行的具体内容,同样的也是一个execute方法
EmbedServer内嵌的Server,默认端口是9999
ExecutorBiz其中的run方法用于调用执行器,有两个是实现类ExecutorBizImpl以及ExecutorBizClient 。

任务注册

xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob())); 注册自己的任务

public void initXxlJobExecutor() {
 
    // load executor prop
    Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
 
    // init executor
    xxlJobExecutor = new XxlJobSimpleExecutor();
    xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
    xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
    xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
    xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
    xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
    xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
    xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
    xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));
 
    // registry job bean
    xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));
 
    // start executor
    try {
        xxlJobExecutor.start();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
xxlJobExecutor 会执行 initJobHandlerMethodRepository :将任务处理handler记录在本地

 super.start() 调用 XxlJobExecutor@Override
public void start() {
 
    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(xxlJobBeanList);
 
    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
  • TriggerCallbackThread: 执行handler 之后上班结果的线程
  • initEmbedServer: 初始化服务,这里面也会把自己注册到 admin 里面去,让自己成为一个 executor
public void start() throws Exception {
 
    // init logpath
    XxlJobFileAppender.initLogPath(logPath);
 
    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);
 
 
    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
 
    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();
 
    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}

任务调度

admin的核心功能

public void init() throws Exception {
    //初始化fastTriggerPool和slowTriggerPool线程池对象
    JobTriggerPoolHelper.toStart();
 
    /**
     * 开启线程,每90s查询执行器的数据,如果执行器上次更新时间超过90s未更新,就移除这个执行器,并把存活的执行器更新
     */
    JobRegistryHelper.getInstance().start();
 
    /**
     * 启动线程,查找任务执行失败的任务,
     * 1.设置了重试次数,就再次触发任务
     * 2.判断是否需邮件预警
     */
    JobFailMonitorHelper.getInstance().start();
 
    /**
     * 启动线程,处理任务结果丢失的数据
     * 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
     */
    JobCompleteHelper.getInstance().start();
 
    JobLogReportHelper.getInstance().start();
 
    /**
 
     * 启动线程,执行任务
     */
    JobScheduleHelper.getInstance().start();
 
    logger.info(">>>>>>>>> init xxl-job admin success.");
}

任务查询

  • 获取数据库的 lock:

    • preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
  • 找到待调度的任务

    • List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
  • 下发任务:

    • JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
public void start(){
 
    // schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {
 
            try {
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
 
            // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
 
            while (!scheduleThreadToStop) {
 
                // Scan Job
                long start = System.currentTimeMillis();
 
                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;
 
                boolean preReadSuc = true;
                try {
 
                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);
 
                    preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                    preparedStatement.execute();
 
                    // tx start
 
                    // 1、 查询从当前时间+5秒内要执行的任务
                    long nowTime = System.currentTimeMillis();
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    if (scheduleList!=null && scheduleList.size()>0) {
                        // 2、push time-ring
                        for (XxlJobInfo jobInfo: scheduleList) {
 
                            // time-ring jump todo 如果任务超时5秒
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
 
                                // 1、misfire match
                                MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                    // FIRE_ONCE_NOW 》 trigger todo 如果任务配置的"调度过期策略"是"立即执行一次",那么就触发一次任务
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                }
 
                                // 2、fresh next todo 从当前时间开始,计算任务的下一次执行时间
                                refreshNextValidTime(jobInfo, new Date());
 
                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time todo 任务执行时间在当前时间的5s内
 
                                // 1、trigger  触发任务
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
 
                                // 2、fresh next todo 从当前时间开始,计算任务的下一次执行时间
                                refreshNextValidTime(jobInfo, new Date());
 
                                // next-trigger-time in 5s, pre-read again todo 如果下一次的执行时间在未来5s内
                                if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
 
                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
 
                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());
 
                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
 
                                }
 
                            } else {
                                // 还没有到达任务执行的时间
                                // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
 
                                // 1 计算剩余的秒数字
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
 
                                // 2、把剩余秒数--任务id存入map中 ;==>下面的 ringThread 线程,会每一秒执行一次,查到对应的数据后,触发任务
                                pushTimeRing(ringSecond, jobInfo.getId());
 
                                // 3、重新计算下一次调度时间
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
 
                            }
 
                        }
 
                        // 3、update trigger info TODO 修改jonInfo的内容
                        for (XxlJobInfo jobInfo: scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }
 
                    } else {
                        preReadSuc = false;
                    }

任务下发

  • 初始化下发参数, triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());

  • 找到需要下发的地址: routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

  • 路由算法
    在这里插入图片描述

  • 调用执行器接口:·XxlJobRemotingUtil.postBody(addressUrl + “run”, accessToken, timeout, triggerParam, String.class);·

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
 
        // param
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
 
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
 
        // 初始化下发的参数
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
 
        / 代码省略
        // 找到需要下发的地址
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
           // 广播地址处理
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
                // 根据路由找到地址
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }
 
        // 4、trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
            // 调用执行器
            // 
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }
 
        / 代码省略
 
        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());

调用执行器接口

public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

任务执行

admin 会调用执行器的 addressUrl + “run”,这个接口主要用来触发任务执行的

// services mapping
try {
    switch (uri) {
        case "/beat":
            // 类似心跳接口
            return executorBiz.beat();
        case "/idleBeat":
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        case "/run":
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        case "/kill":
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        case "/log":
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        default:
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
    }
} catch (Exception e) {
    logger.error(e.getMessage(), e);
    return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
  • 这里面主要做下面几件事情
    • 根据任务类型,创建不同的 jobHandler
    • 创建一个线程执行 jobHandler,监听参数队列
    • 将任务参数给到 参数队列,jobHandler线程 获取参数
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread TODO 查询这个任务的线程,第一次执行任务,是没有这个任务的
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;
 
     // 根据任务类型,创建不同的 jobHandler
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        //  创建 jobhandler,备注,执行器在启动的时候,会扫描@xxlJob注解修饰的方法,注册到map中,这里直接取出来,IJobHandler就是对jdk的method对象的封装
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
 
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
 
            jobThread = null;
            jobHandler = null;
        }
 
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
 
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
 
        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";
 
            jobThread = null;
            jobHandler = null;
        }
 
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    }  
 
    // 判断堵塞策略
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
         // DISCARD_LATER,不能直接执行的话,就直接退出
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            //替换一个新的 jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
 
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }
 
    //  jobThread 为null的话,就创建一个,并执行线程,是一个死循环,从 triggerQueue中读取参数,并执行
    if (jobThread == null) {
        // 这里会stop 掉之前旧的存在的 jobThread
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
 
    //  把参数放在任务队列线程中的 triggerQueue 中
    //  pushTriggerQueue 也会判断是否是重复执行的 job
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

结果返回

  • 将结果给:TriggerCallbackThread.pushCallBack
  • TriggerCallbackThread 消费返回的结果
  • admin 接受返回结果,写入到DB
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                              triggerParam.getLogId(),
                              triggerParam.getLogDateTime(),
                              XxlJobContext.getXxlJobContext().getHandleCode(),
                              XxlJobContext.getXxlJobContext().getHandleMsg() )

拿到任务的执行结果

@Override
    public void run() {
 
        // normal callback
        while(!toStop){
            try {
                // 拿到任务的执行结果
                HandleCallbackParam callback = getInstance().callBackQueue.take();
                if (callback != null) {
 
                    // callback list param
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    callbackParamList.add(callback);
 
                    // callback, will retry if error
                    if (callbackParamList!=null && callbackParamList.size()>0) {
                        // 调用 admin 接口:api/callback 给 admin 
                        doCallback(callbackParamList);
                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
 
       // 代码省略
 
    }
});

总结

一个任务的生命周期

在这里插入图片描述

不足之处

  • 待调度的任务存储在一张表中,如果待调度任务过多的时候,可能会造成任务调度延迟
  • 只能选择一个调度策略
    • 执行器服务是优雅关闭,会自动调用 /reigstRomeve方法告诉admin,自己移除,admin会通过定时任务扫描,每90秒扫描一次执行器上次注册到时间,如果超过90s,就主动移除这个执行器异常退出,某些策略会导致任务在90s内一直失败(比如第一个)
  • 分片策略,这个策略是所有的机器都执行相同的参数,由执行器自己区分

一个任务调度系统核心点

  • 数据量大时保证调度时间:分库分表
  • 任务丢失处理:调度器定时扫描执行时间过长的任务

// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
  • 任务幂等性:不能重复执行,在执行器记录现在有哪一些任务在执行, 但是,执行器在执行业务代码时也建议最好幂等
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
    //  triggerLogIdSet 记录了当前执行器正在执行的任务,发现重复时不会写入到 triggerQueue 中
    if (triggerLogIdSet.contains(triggerParam.getLogId())) {
        logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
        return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
    }
 
    triggerLogIdSet.add(triggerParam.getLogId());
    triggerQueue.add(triggerParam);
    return ReturnT.SUCCESS;
}
  • 调度策略:解决如何分配到哪一个执行器问题,实现负载均衡,分片处理,高容错,节点故障转移作业
    • 实现负载均衡的一个需要执行器上报当前的状态:当前的任务执行数量,待调度的任务数量,读取服务器系统负载水平,根据这些,选择一个综合数值最小的执行器出来
  • 分片处理:分片处理通常需要调度器支持
    • java 实现分片
private static String calculatedExecutorParamValue(XxlJobInfo jobInfo,Integer index,Integer adminTotal) {
    String executorParam = jobInfo.getExecutorParam();
    if(StringUtils.isEmpty(executorParam)){
        return null;
    }
    try {
        List<String> paramList = JSONObject.parseArray(executorParam, String.class);
        if(CollectionUtils.isEmpty(paramList)){
            return null;
        }
        List<List<String>> paramAverageList = averageAssign(paramList, adminTotal);
        List<String> result = paramAverageList.get(index);
        return JSON.toJSONString(result);
    }catch (Exception e){
        logger.error("分片并行计算,解析参数错误,参数:{},错误原因:{}",executorParam,e.getMessage());
    }
    return null;
}
 
 
 
public static <T> List <List<T>> averageAssign(List<T>source,int n){
    List <List<T>> result=new ArrayList<List<T>>();
    int remainder=source.size()%n;  //先计算出余数
    int number=source.size()/n;  //然后是商
    int offset=0;//偏移量(用以标识加的余数)
    for(int i=0;i<n;i++){
        List<T>value;
        if(remainder>0){
            value=source.subList(i*number+offset, (i+1)*number+offset+1);
            remainder--;
            offset++;
        }else{
            value=source.subList(i*number+offset, (i+1)*number+offset);
        }
        result.add(value);
    }
    return result;
}
  • go 实现分片
package main
 
import (
    "fmt"
)
 
func chunk(slice []int, size int) [][]int {
    var chunks [][]int
    chunkSize := (len(slice) + size - 1) / size
    for i := 0; i < len(slice); i += chunkSize {
        end := i + chunkSize
        if end > len(slice) {
            end = len(slice)
        }
        chunks = append(chunks, slice[i:end])
    }
    return chunks
}
 
func main() {
    slice := []int{1, 2, 3, 4, 5}
    chunks := chunk(slice, 2)
    fmt.Println(chunks)
}

任务调度依赖:工作流模式调度依赖,一个有向无环图:DAG(directed acyclic graph)

在这里插入图片描述

package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
//图结构
type DAG struct {
    Vertexs []*Vertex
}
 
//顶点
type Vertex struct {
    Key      string
    Value    interface{}
    Parents  []*Vertex
    Children []*Vertex
}
 
//添加顶点
func (dag *DAG) AddVertex(v *Vertex) {
    dag.Vertexs = append(dag.Vertexs, v)
}
 
//添加边
func (dag *DAG) AddEdge(from, to *Vertex) {
    from.Children = append(from.Children, to)
    to.Parents = append(to.Parents, from)
}
 
func main() {
 
    var dag = &DAG{}
 
    //添加顶点
    va := &Vertex{Key: "a", Value: "1"}
    vb := &Vertex{Key: "b", Value: "2"}
    vc := &Vertex{Key: "c", Value: "3"}
    vd := &Vertex{Key: "d", Value: "4"}
    ve := &Vertex{Key: "e", Value: "5"}
    vf := &Vertex{Key: "f", Value: "6"}
    vg := &Vertex{Key: "g", Value: "7"}
    vh := &Vertex{Key: "h", Value: "8"}
    vi := &Vertex{Key: "i", Value: "9"}
    //添加边
    dag.AddEdge(va, vb)
    dag.AddEdge(va, vc)
    dag.AddEdge(va, vd)
    dag.AddEdge(vb, ve)
    dag.AddEdge(vb, vh)
    dag.AddEdge(vb, vf)
    dag.AddEdge(vc, vf)
    dag.AddEdge(vc, vg)
    dag.AddEdge(vd, vg)
    dag.AddEdge(vh, vi)
    dag.AddEdge(ve, vi)
    dag.AddEdge(vf, vi)
    dag.AddEdge(vg, vi)
 
    //[1] [] { a }
    //[2] [] { b, c, d }
    //[3] [] { h, e, f, g }
    //[4] [] { i }
    all := LayerBFS(va)
    startTime := time.Now()
    for _, layer := range all {
        fmt.Println("------------------")
        doTasks(layer)
    }
    fmt.Printf("cost:%f\n", time.Since(startTime).Seconds())
}
 
type Queue []interface{}
 
func (q *Queue) Push(x interface{}) {
    *q = append(*q, x)
}
 
func (q *Queue) Pop() interface{} {
    h := *q
    var el interface{}
    l := len(h)
    el, *q = h[0], h[1:l]
    return el
}
 
func (q *Queue) Len() int {
    return len(*q)
}
 
func NewQueue() *Queue {
    return &Queue{}
}
 
func LayerBFS(root *Vertex) [][]*Vertex {
    queue := NewQueue()
    queue.Push(root)
    visited := make(map[string]*Vertex)
    all := make([][]*Vertex, 0)
    for queue.Len() > 0 {
        qSize := queue.Len()
        tmp := make([]*Vertex, 0)
        for i := 0; i < qSize; i++ {
            //pop vertex
            e := queue.Pop()
            currVert := e.(*Vertex)
            if _, ok := visited[currVert.Key]; ok {
                continue
            }
            visited[currVert.Key] = currVert
            tmp = append(tmp, currVert)
            for _, val := range currVert.Children {
                if _, ok := visited[val.Key]; !ok {
                    queue.Push(val) //add child
                }
            }
        }
        all = append(all, [][]*Vertex{tmp}...)
    }
    return all
}
 
//并发执行
func doTasks(vertexs []*Vertex) {
    var wg sync.WaitGroup
    startTime := time.Now()
    for _, v := range vertexs {
        wg.Add(1)
        go func(v *Vertex) {
            defer wg.Done()
            time.Sleep(2 * time.Second)
            fmt.Printf("do %v, result is %v \n", v.Key, v.Value)
        }(v) //notice
    }
    wg.Wait()
    fmt.Printf("cost:%0.0f\n", time.Since(startTime).Seconds())
}

文档参考

  • 实现一个任务调度系统,看这篇就够了
  • 伴鱼分布式调度系统 Jarvis 的设计与实现
  • 如何设计一个分布式任务调度系统

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/447323.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多线程并发编程学习笔记9(小滴课堂)------线程池及Executor框架

它只会使用10个线程。因为我们设置了它的容量。 我们现在把这个队列容量设置为20. 我们可以看到这里它使用了20个线程。但是出了异常&#xff0c;这个后面我们会学习。 我们现在使用一下我们的callable&#xff1a; 一般我们如果是想在线程执行完以后&#xff0c;获得一个返回…

019 - C++ 中的局部静态(local static)

在前几期里&#xff0c;我们了解了static关键字在特定上下文中的含义。 今天我们看一看另一个环境。我们可以在局部作用域中使用 static 来声明一个变量。 这种情况和我们之前看到的两种static有点不同。这次的局部静态 Local static 有更多的含义。 声明一个变量&#xff0…

个人知识库(持续更新中)

打造一个属于自己的知识库 为什么会有这个知识库会记录什么内容基础知识Java核心Java WebMySQL 中间件&工具项目代码资源仿牛客社区Web开发华夏ERP软件 视频资源代码之外持续更新中… 为什么会有这个知识库 作为羊哥的死忠粉&#xff0c;当他谈到个人知识库这个东西的时候…

RS-485 基础知识:何时需要端接,以及如何正确端接

RS-485 网络的许多信号完整性和通信问题都源于端接&#xff0c;这可能是因为缺少端接或端接不正确。在 RS-485 基础知识系列的这一部分&#xff0c;我将讨论何时不需要端接 RS-485 网络&#xff0c;以及在需要端接时如何使用标准&#xff08;并联&#xff09;端接和交流电 (AC)…

【JavaEE】常见的锁策略与CAS的ABA问题

文章目录 1 常见的锁策略1.1 乐观锁与悲观锁1.2 轻量级锁与重量级锁1.3 自旋锁与挂起等待锁1.4 互斥锁与读写锁1.5 可重入锁与不可重入锁1.6 公平锁与非公平锁 2 CAS 操作2.1 CAS 简介2.2 CAS 的应用2.2.1 实现原子类2.2.2 实现自旋锁 3 CAS 的 ABA 问题写在最后 1 常见的锁策略…

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七 历史篇章 &#x1f550;Nacos 客户端服务注册源码分析-篇一 &#x1f551;Nacos 客户端服务注册源码分析-篇二 &#x1f552;Nacos 客户端服务注册源码分析-篇三 &#x1f553;Nacos 服务端服务注册源码分析-篇四 &am…

最新入河排污口设置论证、水质影响预测与模拟、污水处理工艺分析及建设项目入河排污口方案报告书

随着水资源开发利用量不断增大&#xff0c;全国废污水排放量与日俱增&#xff0c;部分河段已远远超出水域纳污能力。近年来,部分沿岸入河排污口设置不合理&#xff0c;超标排污、未经同意私设排污口等问题逐步显现&#xff0c;已威胁到供水安全、水环境安全和水生态安全&#x…

ChatGPT探索系列之五:讨论人工智能伦理问题及ChatGPT的责任

文章目录 前言一、安全二、隐私和道德三、我们应该做什么总结 前言 ChatGPT发展到目前&#xff0c;其实网上已经有大量资料了&#xff0c;博主做个收口&#xff0c;会出一个ChatGPT探索系列的文章&#xff0c;帮助大家深入了解ChatGPT的。整个系列文章会按照一下目标来完成&am…

STM32(十六)正交编码器

一、简介 增量式编码器 增量式编码器也称为正交编码器&#xff0c;是通过两个信号线的脉冲输出来进行数据处理&#xff0c;一个输出脉冲信号就对应于一个增量位移&#xff0c;编码器每转动固定的位移&#xff0c;就会产生一个脉冲信号 通过读取单位时间脉冲信号的数量&#xff…

自动修改文章的软件-自动修改文案原创软件

有没有自动修改文章的软件 修改文章可能是很多人日常工作中必须完成的任务&#xff0c;但一般情况下&#xff0c;这需要人工完成。幸运的是&#xff0c;现在有很多文章修改软件可以帮助我们节省时间和精力。本文将向您介绍一款优秀的修改文章软件&#xff0c;名为147SEO&#…

SLAM论文速递【SLAM—— TwistSLAM:动态环境下的约束SLAM】—4.17(2)

论文信息 题目&#xff1a; Optimization RGB-D 3-D Reconstruction Algorithm Based on Dynamic SLAM 基于动态SLAM的RGB-D三维重建算法优化论文地址&#xff1a; https://ieeexplore.ieee.org/abstract/document/10050782发表期刊&#xff1a; IEEE Transactions on Instru…

Apache POI 实现用Java操作Excel完成读写操作

简介 Apache POI是一个用于操作Microsoft Office格式文件&#xff08;包括xls、docx、xlsx、pptx等&#xff09;的Java API库。POI全称为Poor Obfuscation Implementation&#xff0c;是Apache Software Foundation的一个开源项目。它提供了一组Java API&#xff0c;使得Java程…

LLM_StableDiffusion_studio发布

背景&#xff1a; 从chatgpt发布到现在已经超过半年时间&#xff0c;AGI的势头越来越猛。大家都在做各种的尝试和组合&#xff0c;把chatgpt通用的强大的知识表达和理解能力尝试应用在自己的业务场景。前期也是出现非常多的业务应用&#xff0c;但是主要还是围绕chatgpt本身已…

循环神经网络(RNN)简单介绍—包括TF和PyTorch源码,并给出详细注释

文章目录 循环神经网络&#xff08;RNN&#xff09;入门教程1. 循环神经网络的原理2. 循环神经网络的应用3. 使用keras框架实现循环神经网络3.1导入对应的库及加载数据集3.2.数据预处理3.3定义RNN模型3.4训练模型3.5测试模型 4.使用PyTorch框架实现上述功能—注释详细5.结论 循…

动静态库的制作和使用

动静态库 一&#xff0c;什么是库二&#xff0c;静态库的制作静态库原理 三&#xff0c;动态库的制作四&#xff0c;动态库的配置五&#xff0c;动态库的加载 一&#xff0c;什么是库 &#x1f680;库这个东西我们一直在使用&#xff0c;举个简单了例子&#xff0c;无论你是用…

netplan, NetworkManager, systemd-networkd简介

1、systemd-networkd简介 systemd-networkd是systemd 的一部分 &#xff0c;负责 systemd 生态中的网络配置部分(systemd-networkd.service, systemd-resolved.service)。使用 systemd-networkd&#xff0c;你可以为网络设备配置基础的 DHCP/静态IP网络等&#xff0c;还可以配…

U8W/U8W-Mini使用与常见问题解决

U8W/U8W-Mini使用与常见问题解决 U8WU8W/U8W-mini简介准备工作U8W/U8W-mini在线联机下载U8W/U8W-mini脱机下载第一步&#xff0c;把程序下载到U8W/U8W-mini烧录器中&#xff1a;第二步&#xff0c;用U8W/U8W-mini烧录器给目标单片机脱机下载 U8W/U8W-mini烧录器使用中常见的问题…

初识Linux运维

一.初识Linux 1.Linux系统内核 内核提供了Linux系统的主要功能&#xff0c;如硬件调度管理的能力。 Linux内核是免费开源的&#xff0c;任何人都可以查看内核的源代码&#xff0c;甚至是贡献源代码。 2.Linux系统发行版 内核无法被用户直接使用&#xff0c;需要配合应用程…

淘宝iOS拍立淘微距能力探索与实现

画面模糊问题的源头也是来自用户的微距体验不佳&#xff0c;我们对问题深入分析&#xff0c;适当拆解。通过 Apple Development Doc 的查阅及实践&#xff0c;一步步抽丝剥茧&#xff0c;最终完美解决用户的体验痛点&#xff0c;也为我们自身沉淀了展示微距的能力。 前言 在最近…

Unix和Linux

UNIX 诞生于 20 世纪 60 年代末 Windows 诞生于 20 世纪 80 年代中期 Linux 诞生于 20 世纪 90 年代初 1965 年&#xff0c;贝尔实验室、美国麻省理工学院和通用电气公司联合发起了Multics 工程计划&#xff0c;目标是开发一种交互式的、具有多道程序处理能力的分时操作系统&a…