xxl-job源码学习笔记

news2024/11/25 2:37:03

文章目录

  • 一、简介
  • 二、下载源码
  • 三、模块介绍
  • 四、源码解析
    • 4.1、调度中心启动流程(xxl-job-admin)
      • 4.1.1、JobTriggerPoolHelper(触发任务执行的核心组件)
      • 4.1.2、JobRegistryHelper(维护和更新调度中心与执行器之间的注册信息)
      • 4.1.3、JobFailMonitorHelper(监控任务执行的失败情况)
      • 4.1.4、JobCompleteHelper(处理任务完成后的相关操作)
      • 4.1.5、JobLogReportHelper(收集和整理任务执行的日志信息)
      • 4.1.6、JobScheduleHelper(任务调度)(重点!!!)
    • 4.2、执行器启动流程(xxl-job-core)
      • 4.2.1、initJobHandlerMethodRepository(初始化任务处理器(Job Handler)的映射关系)
      • 4.2.2、GlueFactory.refreshInstance(1)(加载Glue代码工厂实例,用于做运行中动态代码加载)
      • 4.2.3、super.start()(核心启动方法)
        • 4.2.3.1、XxlJobFileAppender.initLogPath(logPath)(初始化日志目录)
        • 4.2.3.2、initAdminBizList(adminAddresses, accessToken)(初始化调度中心的地址)
        • 4.2.3.3、JobLogFileCleanThread.getInstance().start(logRetentionDays)(启动日志文件清理线程)
        • 4.2.3.4、TriggerCallbackThread.getInstance().start()(回调调度中心任务的执行结果)
        • 4.2.3.5、initEmbedServer(address, ip, port, appname, accessToken)(执行内嵌服务)
    • 4.3、任务调用全流程
      • 4.3.1、xxl-job-admin调度中心部分
      • 4.3.2、xxl-job-core执行器部分
  • 五、总结

一、简介

本文用于学习xxl-job源码,版本是当前最新版本2.4.1

二、下载源码

我这里是在码云上下载的,地址是:https://gitee.com/xuxueli0323/xxl-job

三、模块介绍

四、源码解析

具体怎么使用我这边就不多赘述了,官方文档写的非常详细,本文还是着重讲解源码

4.1、调度中心启动流程(xxl-job-admin)

首先找到配置类 XxlJobAdminConfig

image-20240726162457798

可以看到实现了 InitializingBean 接口,这里不展开,只需要知道,其实现方法 afterPropertiesSet 会在启动时执行即可

@Override
public void afterPropertiesSet() throws Exception {
    adminConfig = this;

    xxlJobScheduler = new XxlJobScheduler();
    xxlJobScheduler.init();
}

这里创建了 XxlJobScheduler(xxl-job调度器),继续看它怎么初始化的

public void init() throws Exception {
    // init i18n
    initI18n();

    // admin trigger pool start
    JobTriggerPoolHelper.toStart();

    // admin registry monitor run
    JobRegistryHelper.getInstance().start();

    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run ( depend on JobTriggerPoolHelper )
    JobCompleteHelper.getInstance().start();

    // admin log report start
    JobLogReportHelper.getInstance().start();

    // start-schedule  ( depend on JobTriggerPoolHelper )
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}
  • JobTriggerPoolHelper:用于触发任务执行的核心组件
  • JobRegistryHelper:负责维护和更新调度中心与执行器之间的注册信息
  • JobFailMonitorHelper:负责监控任务执行的失败情况
  • JobCompleteHelper:用于处理任务完成后的相关操作
  • JobLogReportHelper:负责收集和整理任务执行的日志信息
  • JobScheduleHelper:负责任务调度

我们逐个看下

4.1.1、JobTriggerPoolHelper(触发任务执行的核心组件)

JobTriggerPoolHelper.toStart(); 方法

public static void toStart() {
    helper.start();
}

public void start(){
    fastTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(1000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
            }
        });

    slowTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
            }
        });
}

这里创建了两个线程池,一个叫 fastTriggerPool,一个叫 slowTriggerPool,看名字就知道,一个快一个慢,继续看,这里还提供了一个 trigger 方法供其他地方调用,我们看下

/**
 * @param jobId
 * @param triggerType
 * @param failRetryCount
 *        >=0: use this param
 *        <0: use param from job info config
 * @param executorShardingParam
 * @param executorParam
 *          null: use job param
 *          not null: cover job param
 */
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}

public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

            long start = System.currentTimeMillis();

            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // ob-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        }
    });
}

这段代码还是比较好理解的,用 jobTimeoutCountMap 来记录每个job执行时间超过500ms的次数,超过10次,就从快的线程池挪到慢的线程池执行,每60s清空一次 jobTimeoutCountMap,防止在慢的线程池中执行的job永无翻身之日,真正执行的是 XxlJobTrigger.trigger 方法,这个留到下面讲调用job的时候再说

4.1.2、JobRegistryHelper(维护和更新调度中心与执行器之间的注册信息)

JobRegistryHelper.getInstance().start(); 方法

public void start(){

    // for registry or remove
    registryOrRemoveThreadPool = new ThreadPoolExecutor(
          2,
          10,
          30L,
          TimeUnit.SECONDS,
          new LinkedBlockingQueue<Runnable>(2000),
          new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
             }
          },
          new RejectedExecutionHandler() {
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                r.run();
                logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
             }
          });

    // for monitor
    registryMonitorThread = new Thread(new Runnable() {
       @Override
       public void run() {
          while (!toStop) {
             try {
                // auto registry group
                List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                if (groupList!=null && !groupList.isEmpty()) {

                   // remove dead address (admin/executor)
                   List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                   if (ids!=null && ids.size()>0) {
                      XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                   }

                   // fresh online address (admin/executor)
                   HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                   List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                   if (list != null) {
                      for (XxlJobRegistry item: list) {
                         if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                            String appname = item.getRegistryKey();
                            List<String> registryList = appAddressMap.get(appname);
                            if (registryList == null) {
                               registryList = new ArrayList<String>();
                            }

                            if (!registryList.contains(item.getRegistryValue())) {
                               registryList.add(item.getRegistryValue());
                            }
                            appAddressMap.put(appname, registryList);
                         }
                      }
                   }

                   // fresh group address
                   for (XxlJobGroup group: groupList) {
                      List<String> registryList = appAddressMap.get(group.getAppname());
                      String addressListStr = null;
                      if (registryList!=null && !registryList.isEmpty()) {
                         Collections.sort(registryList);
                         StringBuilder addressListSB = new StringBuilder();
                         for (String item:registryList) {
                            addressListSB.append(item).append(",");
                         }
                         addressListStr = addressListSB.toString();
                         addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                      }
                      group.setAddressList(addressListStr);
                      group.setUpdateTime(new Date());

                      XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                   }
                }
             } catch (Exception e) {
                if (!toStop) {
                   logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                }
             }
             try {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
             } catch (InterruptedException e) {
                if (!toStop) {
                   logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                }
             }
          }
          logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
       }
    });
    registryMonitorThread.setDaemon(true);
    registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
    registryMonitorThread.start();
}
  • 先是初始化了注册或删除线程池(registryOrRemoveThreadPool),用来执行注册或者删除任务的
  • 然后创建了一个注册监控线程(registryMonitorThread),并启动

先看 registryOrRemoveThreadPoolJobRegistryHelper 暴露了两个接口给外部调用,registryregistryRemove,一个用于注册,一个用于删除,都是讲任务丢到 registryOrRemoveThreadPool 线程池里执行

public ReturnT<String> registry(RegistryParam registryParam) {

    // valid
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
          || !StringUtils.hasText(registryParam.getRegistryKey())
          || !StringUtils.hasText(registryParam.getRegistryValue())) {
       return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
       @Override
       public void run() {
          int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
          if (ret < 1) {
             XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

             // fresh
             freshGroupRegistryInfo(registryParam);
          }
       }
    });

    return ReturnT.SUCCESS;
}

public ReturnT<String> registryRemove(RegistryParam registryParam) {

    // valid
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
          || !StringUtils.hasText(registryParam.getRegistryKey())
          || !StringUtils.hasText(registryParam.getRegistryValue())) {
       return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
       @Override
       public void run() {
          int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
          if (ret > 0) {
             // fresh
             freshGroupRegistryInfo(registryParam);
          }
       }
    });

    return ReturnT.SUCCESS;
}

很简单,就是操作 xxl_job_registry 表的增删改

再来看 registryMonitorThread

while (!toStop) {
    try {
        // auto registry group
        // 找到自动注册的信息
        List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
        if (groupList!=null && !groupList.isEmpty()) {

            // remove dead address (admin/executor)
            // 找到xxl_job_registry里超时的address,并删除
            List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
            if (ids!=null && ids.size()>0) {
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
            }

            // fresh online address (admin/executor)
            // 刷新在线的address
            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
            List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
            if (list != null) {
                // 把同一个appname的address归类
                for (XxlJobRegistry item: list) {
                    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                        String appname = item.getRegistryKey();
                        List<String> registryList = appAddressMap.get(appname);
                        if (registryList == null) {
                            registryList = new ArrayList<String>();
                        }

                        if (!registryList.contains(item.getRegistryValue())) {
                            registryList.add(item.getRegistryValue());
                        }
                        appAddressMap.put(appname, registryList);
                    }
                }
            }

            // fresh group address
            for (XxlJobGroup group: groupList) {
                // 把address用逗号拼接起来
                List<String> registryList = appAddressMap.get(group.getAppname());
                String addressListStr = null;
                if (registryList!=null && !registryList.isEmpty()) {
                    Collections.sort(registryList);
                    StringBuilder addressListSB = new StringBuilder();
                    for (String item:registryList) {
                        addressListSB.append(item).append(",");
                    }
                    addressListStr = addressListSB.toString();
                    addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                }
                group.setAddressList(addressListStr);
                group.setUpdateTime(new Date());
				
                //更新addressList
                XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
            }
        }
    } catch (Exception e) {
        if (!toStop) {
            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
        }
    }
    try {
        // 睡30秒
        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
    } catch (InterruptedException e) {
        if (!toStop) {
            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
        }
    }
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");

就是每30秒执行一次,看 xxl_job_registry 表里的注册信息,刷新 xxl_job_group 表里的 address_list

4.1.3、JobFailMonitorHelper(监控任务执行的失败情况)

JobFailMonitorHelper.getInstance().start(); 方法

public void start(){
    monitorThread = new Thread(new Runnable() {

       @Override
       public void run() {

          // monitor
          while (!toStop) {
             try {
                 
				//查询失败job的id
                List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                if (failLogIds!=null && !failLogIds.isEmpty()) {
                   for (long failLogId: failLogIds) {

                      // lock log
                      // 更改预警状态为锁定
                      int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                      if (lockRet < 1) {
                         continue;
                      }
                      XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                      XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

                      // 1、fail retry monitor
                      // 重新触发任务
                      if (log.getExecutorFailRetryCount() > 0) {
                         JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                         String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                         log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                      }

                      // 2、fail alarm monitor
                      // 执行告警
                      int newAlarmStatus = 0;       // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
                      if (info != null) {
                         boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                         newAlarmStatus = alarmResult?2:3;
                      } else {
                         newAlarmStatus = 1;
                      }

                      XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                   }
                }

             } catch (Exception e) {
                if (!toStop) {
                   logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                }
             }

                   try {
                       TimeUnit.SECONDS.sleep(10);
                   } catch (Exception e) {
                       if (!toStop) {
                           logger.error(e.getMessage(), e);
                       }
                   }

               }

          logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

       }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
    monitorThread.start();
}

创建一个监控线程(monitorThread)并启动,我们看看 run 方法

先是从 xxl_job_log 表找到失败log的id,每次最多1000条,然后循环,更改预警状态为锁定,查询log的信息,查询job的信息,看如果失败重试次数大于0,就调用 JobTriggerPoolHelper.trigger 重新执行job,并更新log信息,然后开始告警,这个告警 xxl-job 有个默认的邮件告警,EmailJobAlarm,如果我们想要自己写告警,可以写一个类,实现 JobAlarm 接口,实现 doAlarm 方法(具体可以参照它的这个 EmailJobAlarm),最后根据预警的结果,更新 xxl_job_logalarm_status 状态,每10秒执行一次

4.1.4、JobCompleteHelper(处理任务完成后的相关操作)

JobCompleteHelper.getInstance().start(); 方法

public void start(){

    // for callback
    callbackThreadPool = new ThreadPoolExecutor(
          2,
          20,
          30L,
          TimeUnit.SECONDS,
          new LinkedBlockingQueue<Runnable>(3000),
          new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
             }
          },
          new RejectedExecutionHandler() {
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                r.run();
                logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
             }
          });


    // for monitor
    monitorThread = new Thread(new Runnable() {

       @Override
       public void run() {

          // wait for JobTriggerPoolHelper-init
          try {
             TimeUnit.MILLISECONDS.sleep(50);
          } catch (InterruptedException e) {
             if (!toStop) {
                logger.error(e.getMessage(), e);
             }
          }

          // monitor
          while (!toStop) {
             try {
                // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
                Date losedTime = DateUtil.addMinutes(new Date(), -10);
                List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

                if (losedJobIds!=null && losedJobIds.size()>0) {
                   for (Long logId: losedJobIds) {

                      XxlJobLog jobLog = new XxlJobLog();
                      jobLog.setId(logId);

                      jobLog.setHandleTime(new Date());
                      jobLog.setHandleCode(ReturnT.FAIL_CODE);
                      jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

                      XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                   }

                }
             } catch (Exception e) {
                if (!toStop) {
                   logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                }
             }

                   try {
                       TimeUnit.SECONDS.sleep(60);
                   } catch (Exception e) {
                       if (!toStop) {
                           logger.error(e.getMessage(), e);
                       }
                   }

               }

          logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

       }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
    monitorThread.start();
}
  • 先是初始化了回调线程池(callbackThreadPool),是用来执行job回调的
  • 然后创建了一个监控线程(monitorThread),用来处理任务结果丢失的

先看 callbackThreadPoolJobCompleteHelper 暴露了一个接口给外部调用,callback 方法,最终就是丢到回调线程池(callbackThreadPool)中执行,这个我们后面调用的地方再讲

重点看下监控线程(monitorThread),其实就是调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线的,需要把状态标记成失败

4.1.5、JobLogReportHelper(收集和整理任务执行的日志信息)

这个没什么好讲的,就是将日志整理成报表(用于 xxl-job-admin 首页展示),还有就是将一些过期的日志删除

4.1.6、JobScheduleHelper(任务调度)(重点!!!)

JobScheduleHelper.getInstance().start(); 方法

public void start(){

    // schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {

            try {
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

            // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

            while (!scheduleThreadToStop) {

                // Scan Job
                long start = System.currentTimeMillis();

                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;

                boolean preReadSuc = true;
                try {

                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);

                    preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                    preparedStatement.execute();

                    // tx start

                    // 1、pre read
                    long nowTime = System.currentTimeMillis();
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    if (scheduleList!=null && scheduleList.size()>0) {
                        // 2、push time-ring
                        for (XxlJobInfo jobInfo: scheduleList) {

                            // time-ring jump
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                // 1、misfire match
                                MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                    // FIRE_ONCE_NOW 》 trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                }

                                // 2、fresh next
                                refreshNextValidTime(jobInfo, new Date());

                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                // 1、trigger
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                // 2、fresh next
                                refreshNextValidTime(jobInfo, new Date());

                                // next-trigger-time in 5s, pre-read again
                                if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            } else {
                                // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                // 1、make ring second
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                // 2、push time ring
                                pushTimeRing(ringSecond, jobInfo.getId());

                                // 3、fresh next
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                            }

                        }

                        // 3、update trigger info
                        for (XxlJobInfo jobInfo: scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }

                    } else {
                        preReadSuc = false;
                    }

                    // tx stop


                } catch (Exception e) {
                    if (!scheduleThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                    }
                } finally {

                    // commit
                    if (conn != null) {
                        try {
                            conn.commit();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.setAutoCommit(connAutoCommit);
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                    // close PreparedStatement
                    if (null != preparedStatement) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
                long cost = System.currentTimeMillis()-start;


                // Wait seconds, align second
                if (cost < 1000) {  // scan-overtime, not wait
                    try {
                        // pre-read period: success > scan each second; fail > skip this period;
                        TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
        }
    });
    scheduleThread.setDaemon(true);
    scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
    scheduleThread.start();


    // ring thread
    ringThread = new Thread(new Runnable() {
        @Override
        public void run() {

            while (!ringThreadToStop) {

                // align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                try {
                    // second data
                    List<Integer> ringItemData = new ArrayList<>();
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                    for (int i = 0; i < 2; i++) {
                        List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                        if (tmpData != null) {
                            ringItemData.addAll(tmpData);
                        }
                    }

                    // ring trigger
                    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                    if (ringItemData.size() > 0) {
                        // do trigger
                        for (int jobId: ringItemData) {
                            // do trigger
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                        }
                        // clear
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
        }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
}
  • 先是创建了一个调度线程(scheduleThread),并启动
  • 然后创建了一个ring线程(ringThread),并启动

先来看调度线程(scheduleThread),首先会利用mysql的for update加锁,然后查询出下次执行时间在未来5秒以内的所有任务,最多查6000条(快线程池最大线程数+慢线程池最大线程数,之后在乘以20,为什么是20,因为50ms处理一个,1秒处理20个),然后就对查出来的任务进行分类处理,分为一下几类:

  • if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) (过期超过5秒的)
  • else if (nowTime > jobInfo.getTriggerNextTime())(过期5秒内的)
  • else(未来5秒内的)

针对不同的情况,有不同的处理方式,这里直接画个图,清楚一些

xxl-job执行图-new

总结一下就是,过期超过5s的按照job配置的过期策略执行,过期5s内的立即执行,并且看下次执行时间是不是在5s内,在的话,放到ringData中等待ring线程处理,如果是在未来5s内的这种正常情况,就直接放到ringData中等待ring线程处理

那么ring线程(ringThread)是怎么个处理方式,我们继续看看,首先ringData,其实就是个 ConcurrentHashMap,根据秒数刻度分类存储,然后再通过ringThread处理,处理的时候,为了避免处理时间长,往前多取了一个刻度

image-20240729173122242

另外注意 JobScheduleHelper 停的时候,需要等 ringThread 处理完

4.2、执行器启动流程(xxl-job-core)

从xxl-job执行器模块(xxl-job-executor-sample-springboot)的样例中,我们知道需要注册一个bean

image-20240730144428406

我们看看这个 XxlJobSpringExecutor,这个类实现了 SmartInitializingSingleton 接口,他的 afterSingletonsInstantiated 方法是在Spring容器中所有的bean加载完成后才回执行(为了保证 JobHandler 已经加载到Spring容器中)

@Override
public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

主要做了一下三件事:

  1. **initJobHandlerMethodRepository(applicationContext):**初始化任务处理器(Job Handler)的映射关系
  2. **GlueFactory.refreshInstance(1):**加载Glue代码工厂实例,用于做运行中动态代码加载
  3. **super.start():**核心启动方法

4.2.1、initJobHandlerMethodRepository(初始化任务处理器(Job Handler)的映射关系)

先看下 initJobHandlerMethodRepository 方法

//XxlJobSpringExecutor.java
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // init job handler from method
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {

        // get bean
        Object bean = null;
        Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
        if (onBean!=null){
            logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
            continue;
        }else {
            bean = applicationContext.getBean(beanDefinitionName);
        }

        // filter method
        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }

        // generate and regist method job handler
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // regist
            registJobHandler(xxlJob, bean, executeMethod);
        }

    }
}

//XxlJobExecutor.java
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
    if (xxlJob == null) {
        return;
    }

    String name = xxlJob.value();
    //make and simplify the variables since they'll be called several times later
    Class<?> clazz = bean.getClass();
    String methodName = executeMethod.getName();
    if (name.trim().length() == 0) {
        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
    }
    if (loadJobHandler(name) != null) {
        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
    }

    // execute method
    /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
            throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                    "The correct method format like \" public ReturnT<String> execute(String param) \" .");
        }
        if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
            throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                    "The correct method format like \" public ReturnT<String> execute(String param) \" .");
        }*/

    executeMethod.setAccessible(true);

    // init and destroy
    Method initMethod = null;
    Method destroyMethod = null;

    if (xxlJob.init().trim().length() > 0) {
        try {
            initMethod = clazz.getDeclaredMethod(xxlJob.init());
            initMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
        }
    }
    if (xxlJob.destroy().trim().length() > 0) {
        try {
            destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
            destroyMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
        }
    }

    // registry jobhandler
    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

}

//XxlJobExecutor.java
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);
}

//XxlJobExecutor.java
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

这个方法先扫描整个Spring容器,找到所有带@XxlJob注解的方法,放到 jobHandlerRepository 这个 ConcurrentHashMap 里缓存,把 jobHandler 的名字(@XxlJob注解括号内的名字)做为key,把bean对象、执行方法、init方法、destroy方法封装成 MethodJobHandler 对象,作为value

4.2.2、GlueFactory.refreshInstance(1)(加载Glue代码工厂实例,用于做运行中动态代码加载)

public static void refreshInstance(int type){
    if (type == 0) {
       glueFactory = new GlueFactory();
    } else if (type == 1) {
       glueFactory = new SpringGlueFactory();
    }
}

type是1,所以初始化的是 SpringGlueFactory,这个是用于动态代码加载的,本文不做展开了,有兴趣的可以查阅其他资料

4.2.3、super.start()(核心启动方法)

核心启动方法,在父类 XxlJobExecutor

//XxlJobExecutor.java
public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}
  1. **XxlJobFileAppender.initLogPath(logPath):**初始化日志目录
  2. **initAdminBizList(adminAddresses, accessToken):**初始化调度中心的地址,用于后续执行器去访问
  3. **JobLogFileCleanThread.getInstance().start(logRetentionDays):**启动日志文件清理线程,清理过期日志文件,传入的参数单位是天
  4. **TriggerCallbackThread.getInstance().start():**回调调度中心任务的执行结果
  5. **initEmbedServer(address, ip, port, appname, accessToken):**执行内嵌服务
4.2.3.1、XxlJobFileAppender.initLogPath(logPath)(初始化日志目录)

image-20240802095836482

就是初始化日志的目录,没什么好讲的

4.2.3.2、initAdminBizList(adminAddresses, accessToken)(初始化调度中心的地址)

image-20240802100020062

因为 jobHandler 注册和任务的回调都需要调用 xxl-job-admin(调度中心),这里初始化 xxl-job-admin(调度中心) 的地址,封装成 AdminBizClient 类,后面会用到

4.2.3.3、JobLogFileCleanThread.getInstance().start(logRetentionDays)(启动日志文件清理线程)
public void start(final long logRetentionDays){

    // limit min value
    if (logRetentionDays < 3 ) {
        return;
    }

    localThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                try {
                    // clean log dir, over logRetentionDays
                    File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                    if (childDirs!=null && childDirs.length>0) {

                        // today
                        Calendar todayCal = Calendar.getInstance();
                        todayCal.set(Calendar.HOUR_OF_DAY,0);
                        todayCal.set(Calendar.MINUTE,0);
                        todayCal.set(Calendar.SECOND,0);
                        todayCal.set(Calendar.MILLISECOND,0);

                        Date todayDate = todayCal.getTime();

                        for (File childFile: childDirs) {

                            // valid
                            if (!childFile.isDirectory()) {
                                continue;
                            }
                            if (childFile.getName().indexOf("-") == -1) {
                                continue;
                            }

                            // file create date
                            Date logFileCreateDate = null;
                            try {
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                                logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                            } catch (ParseException e) {
                                logger.error(e.getMessage(), e);
                            }
                            if (logFileCreateDate == null) {
                                continue;
                            }

                            if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                                FileUtil.deleteRecursively(childFile);
                            }

                        }
                    }

                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    TimeUnit.DAYS.sleep(1);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");

        }
    });
    localThread.setDaemon(true);
    localThread.setName("xxl-job, executor JobLogFileCleanThread");
    localThread.start();
}

就是启动了一个一天一跑的线程,去清理过期的日志文件

4.2.3.4、TriggerCallbackThread.getInstance().start()(回调调度中心任务的执行结果)
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();

public static void pushCallBack(HandleCallbackParam callback){
    getInstance().callBackQueue.add(callback);
    logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}

// callback
triggerCallbackThread = new Thread(new Runnable() {

    @Override
    public void run() {

        // normal callback
        while(!toStop){
            try {
                HandleCallbackParam callback = getInstance().callBackQueue.take();
                if (callback != null) {

                    // callback list param
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    callbackParamList.add(callback);

                    // callback, will retry if error
                    if (callbackParamList!=null && callbackParamList.size()>0) {
                        doCallback(callbackParamList);
                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        // last callback
        try {
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
            if (callbackParamList!=null && callbackParamList.size()>0) {
                doCallback(callbackParamList);
            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");

    }
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
triggerCallbackThread.start();

因为执行器执行完任务之后,会将执行结果存放在一个队列里(callBackQueue,这里提供了一个 pushCallBack 方法,用于往队列里添加),所以这里启动了一个线程,从队列(callBackQueue)里获取结果,调用调度中心将结果返回

如果调用调度中心时遇到错误,会记录错误文件

image-20240802102113531

然后还会启动一个线程来处理这些错误文件,重新调用调度器

// retry
triggerRetryCallbackThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while(!toStop){
            try {
                retryFailCallbackFile();
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }

            }
            try {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
    }
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.start();
4.2.3.5、initEmbedServer(address, ip, port, appname, accessToken)(执行内嵌服务)
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

主要干了两件事:

1、确定端口号,拼接http的请求地址

2、启动 netty 服务器

先确定端口号,如果配置的端口号小于等于0,就会开始搜索可用的端口号

public static int findAvailablePort(int defaultPort) {
    int portTmp = defaultPort;
    while (portTmp < 65535) {
        if (!isPortUsed(portTmp)) {
            return portTmp;
        } else {
            portTmp++;
        }
    }
    portTmp = defaultPort--;
    while (portTmp > 0) {
        if (!isPortUsed(portTmp)) {
            return portTmp;
        } else {
            portTmp--;
        }
    }
    throw new RuntimeException("no available port.");
}

先从9999往上找,找不到,再从9999往下找,然后把 ip 和端口拼装成 address

重点看 embedServer.start(address, port, appname, accessToken); 启动 netty 服务器

public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

这里创建了一个线程并启动,重点看线程里干了啥

image-20240806100343885

先创建了一个线程池(bizThreadPool),后面用于异步处理调度中心发过来的任务执行信息

image-20240806100518718

然后启动一个 netty 服务器,重点是添加一个自定义的处理器 EmbedHttpServerHandlerEmbedHttpServerHandlerchannelRead0 方法会处理具体的业务,这个我们后面调用的地方再讲

然后下面就是调用 startRegistry(appname, address); 方法来注册执行器到调度中心,往下走,会来到 ExecutorRegistryThread 类的 start 方法

public void start(final String appname, final String address){

    // valid
    if (appname==null || appname.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // registry
            while (!toStop) {
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    if (!toStop) {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    }
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }
            }

            // registry remove
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }

                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");

        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}

这里会启动一个线程,当非停止状态时,每30秒执行一次向所有的调度中心(xxl-job-admin)注册执行器(adminBiz.registry()),如果是停止状态,就向所有调度中心(xxl-job-admin)删除执行器(adminBiz.registryRemove()),具体往下走会到 AdminBizClient 类的 registryregistryRemove 方法

image-20240806103524956

其实就是向调度中心(xxl-job-admin)发送请求,请求会走到 xxl-job-adminJobApiController 类的 api 方法,最终会走到 4.1.2 里讲的 registryregistryRemove 方法

4.3、任务调用全流程

4.3.1、xxl-job-admin调度中心部分

从手动触发入手,手动触发是在 xxl-job-admin 页面点击任务操作执行一次触发

image-20240806140103945

请求来到 xxl-job-admin 服务 的 com.xxl.job.admin.controller.JobInfoController#triggerJob 方法

image-20240806140325329

一直往下走,会来到 com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger 方法

image-20240806140528200

这个方法正是我们4.1.1里提到的 JobTriggerPoolHelper 提供的 trigger 方法,我们看这个方法的引用

image-20240806141013256

继续往下,最后会来到 com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger 方法

public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}

前面是对参数的一些处理,最后会调用 com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger 方法

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1、save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2、init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3、init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4、trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5、collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

这个方法有两处比较重要的逻辑,分别是第3步(init address)和第4步(trigger remote executor)

第3步(init address)结合了枚举和策略模式,巧妙的处理了不同的路由策略

image-20240806164946517

设计了一个 ExecutorRouter 抽象父类,所有的路由策略都继承这个抽象类,并且封装到枚举 ExecutorRouteStrategyEnum 中,这样直接通过配置中的 executorRouteStrategy 参数匹配对应的 ExecutorRouteStrategyEnum,拿到其具体的处理类,调用其 route 方法获取远程路由地址(分片广播特例)

image-20240806165315964

第4步(trigger remote executor)为核心方法,调用远程执行器

image-20240806141420942

继续跟进看下 com.xxl.job.admin.core.trigger.XxlJobTrigger#runExecutor 方法

image-20240806141524489

继续往下,来到 com.xxl.job.core.biz.client.ExecutorBizClient#run 方法

image-20240806141649821

看下请求地址

image-20240806141803886

正是我们执行器的地址

4.3.2、xxl-job-core执行器部分

下面看执行器的请求入口,在4.2.3.5中我们讲到了,入口是 com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0

image-20240806142342942

继续跟进 com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#process 方法

image-20240806142433831

因为 uri 是run,所以继续跟进 com.xxl.job.core.biz.impl.ExecutorBizImpl#run 方法

image-20240806142638020

这里根据运行模式,我们会进入 GlueTypeEnum.BEAN 这个条件,从4.2.1里讲到的 jobHandlerRepository 缓存里拿到对应的 jobHandler

image-20240806143126248

继续往下

image-20240806143230666

如果 jobThread 为空,会先注册一个(其实就是创建一个线程启动,然后缓存在 jobThreadRepository 里),然后调用 com.xxl.job.core.thread.JobThread#pushTriggerQueue 方法,放到 triggerQueue 队列里,如果放入队列成功,直接返回 SUCCESS 给调度中心,最终会记录在 joblogtriggerCode 中(jobLog.setTriggerCode(triggerResult.getCode());

image-20240806143432839

那么 triggerQueue 队列在什么地方被取出呢,很显然,在 jobThread 线程的处理逻辑中,来看 com.xxl.job.core.thread.JobThread#run 方法

@Override
public void run() {

    // init
    try {
       // 执行初始化方法,就是用户自己配置的方法,如xxl-job的例子 @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
       handler.init();
    } catch (Throwable e) {
       logger.error(e.getMessage(), e);
    }

    // execute
    while(!toStop){
       running = false;
       // 统计空执行的次数
       idleTimes++;

           TriggerParam triggerParam = null;
           try {
          // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
          // 从队列中取数据
          triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
          if (triggerParam!=null) {
             running = true;
             // 重置空执行的次数
             idleTimes = 0;
             // 用来防止重复执行的
             triggerLogIdSet.remove(triggerParam.getLogId());

             // log filename, like "logPath/yyyy-MM-dd/9999.log"
             // 记录log文件
             String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
             XxlJobContext xxlJobContext = new XxlJobContext(
                   triggerParam.getJobId(),
                   triggerParam.getExecutorParams(),
                   logFileName,
                   triggerParam.getBroadcastIndex(),
                   triggerParam.getBroadcastTotal());

             // init job context
             XxlJobContext.setXxlJobContext(xxlJobContext);

             // execute
             XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

             //如果设置了超时时间,就用异步调用
             if (triggerParam.getExecutorTimeout() > 0) {
                // limit timeout
                Thread futureThread = null;
                try {
                   FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                      @Override
                      public Boolean call() throws Exception {

                         // init job context
                         XxlJobContext.setXxlJobContext(xxlJobContext);

                         handler.execute();
                         return true;
                      }
                   });
                   futureThread = new Thread(futureTask);
                   futureThread.start();
				   
                   //获取异步处理的返回值
                   Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                } catch (TimeoutException e) {

                   XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                   XxlJobHelper.log(e);

                   // handle result
                   XxlJobHelper.handleTimeout("job execute timeout ");
                } finally {
                   futureThread.interrupt();
                }
             } else {
                // just execute
                // 如果未设置超时时间,立即执行
                handler.execute();
             }

             // valid execute handle data
             if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                XxlJobHelper.handleFail("job handle result lost.");
             } else {
                String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                      ?tempHandleMsg.substring(0, 50000).concat("...")
                      :tempHandleMsg;
                XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
             }
             XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                   + XxlJobContext.getXxlJobContext().getHandleCode()
                   + ", handleMsg = "
                   + XxlJobContext.getXxlJobContext().getHandleMsg()
             );

          } else {
             //如果空执行的次数超过30次,并且队列里没有数据了,就删除线程
             if (idleTimes > 30) {
                if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
                   XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                }
             }
          }
       } catch (Throwable e) {
          if (toStop) {
             XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
          }

          // handle result
          StringWriter stringWriter = new StringWriter();
          e.printStackTrace(new PrintWriter(stringWriter));
          String errorMsg = stringWriter.toString();

          XxlJobHelper.handleFail(errorMsg);

          XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
       } finally {
               if(triggerParam != null) {
                   // callback handler info
                   if (!toStop) {
                       // commonm
                       //执行完之后将结果放入回调队列中(callBackQueue)
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                           triggerParam.getLogId(),
                      triggerParam.getLogDateTime(),
                      XxlJobContext.getXxlJobContext().getHandleCode(),
                      XxlJobContext.getXxlJobContext().getHandleMsg() )
                );
                   } else {
                       // is killed
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                           triggerParam.getLogId(),
                      triggerParam.getLogDateTime(),
                      XxlJobContext.HANDLE_CODE_FAIL,
                      stopReason + " [job running, killed]" )
                );
                   }
               }
           }
       }

    // callback trigger request in queue
    while(triggerQueue !=null && triggerQueue.size()>0){
       TriggerParam triggerParam = triggerQueue.poll();
       if (triggerParam!=null) {
          // is killed
          TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                triggerParam.getLogId(),
                triggerParam.getLogDateTime(),
                XxlJobContext.HANDLE_CODE_FAIL,
                stopReason + " [job not executed, in the job queue, killed.]")
          );
       }
    }

    // destroy
    try {
       // 执行销毁方法,就是用户自己配置的方法,如xxl-job的例子 @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
       handler.destroy();
    } catch (Throwable e) {
       logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

该线程,从 triggerQueue 队列里获取数据,如果有,就触发任务执行,通过是否配置了过期时间来决定是不是异步执行,如果没有获取到数据超过30次,且队里里没有数据了,就销毁线程,具体调用任务代码是 handler.execute();,最终会走到 com.xxl.job.core.handler.impl.MethodJobHandler#execute

image-20240806150524925

该方法通过反射来调用标记了 @XxlJob 注解的对应方法

image-20240806150545679

最后执行完任务,把结果放到回调队列(callBackQueue)中,交由处理回调的线程处理

image-20240806150738145

再来看回调线程的执行逻辑,com.xxl.job.core.thread.TriggerCallbackThread#start

public void start() {

    // valid
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }

    // callback
    triggerCallbackThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // normal callback
            while(!toStop){
                try {
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {

                        // callback list param
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);

                        // callback, will retry if error
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

            // last callback
            try {
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                if (callbackParamList!=null && callbackParamList.size()>0) {
                    doCallback(callbackParamList);
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");

        }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();


    // retry
    triggerRetryCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while(!toStop){
                try {
                    retryFailCallbackFile();
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
        }
    });
    triggerRetryCallbackThread.setDaemon(true);
    triggerRetryCallbackThread.start();

}

这个在4.2.3.4已经讲过,不多说了,我们跟进核心方法 com.xxl.job.core.thread.TriggerCallbackThread#doCallback 方法

image-20240806151257293

继续跟进 com.xxl.job.core.biz.client.AdminBizClient#callback 方法

image-20240806151343819

可以看到调用了 xxl-job-admin 的回调接口,又来到了 xxl-job-admincom.xxl.job.admin.controller.JobApiController#api 方法

image-20240806151638142

这次走的是 callback 分支了,最终会走到 com.xxl.job.admin.core.thread.JobCompleteHelper#callback(java.util.List<com.xxl.job.core.biz.model.HandleCallbackParam>) 方法,交由 callbackThreadPool 线程池处理(4.1.4中有提到)

五、总结

本文主要是简单的过一遍 xxl-job 启动以及调用的过程源码,大家可以下载 xxl-job 的源码,debug跟下代码,更便于理解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1986249.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

常见中间件漏洞(四、Apache合集)

目录 四、Apache 4.1 CVE-2021-41773 漏洞简介 影响版本 环境搭建 漏洞复现 四、Apache 4.1 CVE-2021-41773 Apache HTTP Server 路径穿越漏洞 漏洞简介 该漏洞是由于Apache HTTP Server 2.4.49版本存在目录穿越漏洞,在路径穿越目录<Directory/>Require all gra…

Spring Boot 依赖之 lombok的@Data注解

Spring Boot 依赖之 lombok的Data注解 编译之后的代码 Java源代码 引入lombok 一、创建 Spring Boot 项目 启动 IntelliJ IDEA&#xff0c;点击 File -> New -> Project...在新项目对话框中&#xff0c;选择 Spring Initializr&#xff0c;点击 Next配置 Spring Ini…

RF放大器(传输线+终止传输线+奥品电路中常见问题+调谐放大器)

2024-8-6&#xff0c;星期二&#xff0c;22:57&#xff0c;天气&#xff1a;晴&#xff0c;心情&#xff1a;晴。下班抽出点时间看看书&#xff0c;话不多说&#xff0c;学习开始啦。 今日继续学习模电自选教材的第六章&#xff0c;多级放大器、RF放大器和功率放大器。主要学习…

机械学习—零基础学习日志(高数23——无穷小运算)

零基础为了学人工智能&#xff0c;真的开始复习高数 这段时间&#xff0c;把张宇老师讲解考研的第一部分基本全部学习完毕了。 这里把第一部分的内容最后汇总一下。 无穷小运算——吸收律 这里展示一些无穷小的具体计算思路 无穷小运算——计算方法 泰勒展开的原则 夹逼准则…

C语言程序设计-[6] if语句分支结构

if语句分支结构有三种形式&#xff0c;分别按照语句形式、流程图表示、示例的步骤进行讲解。 1、if语句的一般形式 ——简单if语句 语句形式&#xff1a;if(表达式) 语句1&#xff1b; 执行过程&#xff1a; 如果表达式值为“真”,执行该语句1&#xff0c;然后执行if的下一…

【生成式人工智能-四-chatgpt的训练过程-pretrain预训练自督导式学习督导式学习】

大模型是怎么被训练出来的具有人类智慧的 阶段一训练-自我学习-具备知识训练资料self-supervised learning&#xff08;自督导式学习&#xff09; 阶段二-怎么让模型具备人的智慧supervised learning 督导式学习预训练pretrain为什么要用预训练的模型&#xff1f;Adapter逆向工…

【大专生学前端】:挑战与机遇并存——你怎么看?

在当今互联网飞速发展的时代&#xff0c;前端开发作为连接用户与产品的重要桥梁&#xff0c;一直备受关注。然而&#xff0c;近期社会上流传着一种说法&#xff1a;“大专生学前端&#xff0c;没有前途了&#xff0c;毕业即失业。”这一观点不仅引发了广泛的讨论&#xff0c;也…

【爬虫实战】利用代理爬取电商数据

文章目录 前言工具介绍实战获取网站数据编写代码数据展示 推荐总结 前言 当今电商平台正经历着快速的转型与升级。随着技术的进步和用户需求的多样化&#xff0c;电商不仅从简单的在线购物演变为综合性的购物生态系统&#xff0c;还融合了人工智能、大数据和云计算等先进技术。…

freeRTOS入门学习-基于STM32F103C8T6最小系统板-使用cubeMX创建一个新的工程

一、准备工作 首先打开我们的cubeMX软件&#xff0c;搜索我们对应的MCU&#xff0c;我这里使用的是c8t6最小系统板 接下来双击我们需要的芯片型号 二、System Core配置 进入界面后我们先进行时钟的配置 将HSE配置为晶振 然后我们将SYS配置成为serial wire&#xff0c; …

【单片机毕业设计选题24102】-基于STM32和阿里云的禽舍环境监测控制系统

系统功能: 系统分为主机端和从机端&#xff0c;主机端主动向从机端发送信息和命令&#xff0c;从机端 收到主机端的信息后回复温湿度氨气浓度和光照强度等信息。 从机端操作&#xff1a; 从机端上电后显示“欢迎使用环境监测系统请稍后”两秒后显示第一页面。 从机端口上电…

每日学习笔记:C++ STL算法之数值算法

目录 算法头文件 运算后产生结果 对容器所有元素(数列)进行某种运算&#xff1a; accumulate(beg, end, initValue) accumulate(beg, end, initValue, op) 计算两数列的内积&#xff1a; inner_product(beg1, end1, end2, initValue) inner_product(beg1, end1, end2…

红酒与奶酪:欧洲风情的整合

在欧洲的浪漫风情中&#xff0c;红酒与奶酪总是携手相伴&#xff0c;它们各自不同的魅力交织在一起&#xff0c;仿佛一首悠扬的交响曲&#xff0c;在味蕾上奏响。当洒派红酒&#xff08;Bold & Generous&#xff09;与精选奶酪相遇&#xff0c;一场欧洲风情的整合即将上演。…

亚马逊自发货erp,自动虚拟发货自动生成订单

亚马逊自发货 ERP 自动化虚拟发货&#xff0c;贴牌定制独立部署。 大家好&#xff0c;今天来讲一下 ERP 的虚拟自动发货&#xff01; 1. 以为发货都是人工手动去发货的&#xff0c;其实不然&#xff0c;很多产品一爆就是几十、一百单&#xff0c;不可能一个一个点的去发货&am…

vue3 ResizeObserver如何监听一个容器盒子div宽度高度发生改变

ResizeObserver定义 ResizeObserver&#xff1a;是一个 JavaScript API&#xff0c;用于监视元素的大小变化。它可以观察一个或多个 DOM 元素&#xff0c;以便在元素的大小或形状发生变化时触发回调函数。 ResizeObserver &#xff1a;为了更有效地处理元素尺寸变化而引入的&am…

Linux驱动开发—设备树传递给内核,匹配驱动过程分析

文章目录 总体流程图传递DTB过程编译设备树源文件将 .dtb 文件与内核或引导加载程序集成 内核初始化阶段解析DTB内核启动阶段解析 DTB注册设备树节点驱动程序绑定 内核解析设备树二进制文件&#xff08;DTB&#xff09;的过程主要分为几个步骤&#xff0c;从设备树的传递到最终…

Mybatis学习-day19

Mybatis学习-day19 1. resultMap resultMap 是 MyBatis 中最复杂的元素&#xff0c;主要用于解决实体类属性名与数据库表中字段名不一致的情况&#xff0c;可以将查询结果映射成实体对象。 <resultMap id"staffAndDep" type"com.easy.bean.Staff">…

apache 漏洞

影响版本 Apache HTTP Server 2.4.49 某些Apache HTTPd 2.4.50也存在此漏洞 环境搭建 docker pull blueteamsteve/cve-2021-41773:no-cgid 漏洞复现 http://1.15.136.212:8080 1.使⽤poc curl http://1.15.136.212:8080/cgi-bin/.%2e/.%2e/.%2e/.%2e/etc/passwd

在没有硬盘的情况下进行电脑数据迁移

电脑数据迁移方式 在更换电脑的时候需要进行文件的传输&#xff0c;但是没有硬盘可以选择使用网线直连或者无线文件共享。通用配置 1.将旧电脑的文件夹或者磁盘设置文件共享 找到指定的文件夹右键属》属性&#xff0c;点击共享》点击高级共享 选择共享文件夹以及修改共享用户…

缓冲区和文件IO--linux系统调用

缓冲区&#xff1a; 缓冲区是一块内存区域&#xff0c;用于存储数据&#xff0c;直到数据被真正写入到文件或设备中&#xff0c;或从文件或设备中读取。这种机制使得程序可以一次处理较大的数据块&#xff0c;而不是频繁地进行较小的I/O操作。 缓冲模式&#xff1a; 全缓冲&…

学习LLM大模型,不容错过的《大语言模型:基础与前沿》(附PDF下载)

前言 就目前来看&#xff0c;大量工作正逐渐被大型语言模型&#xff08;LLM&#xff09;所替代&#xff0c;就比如文本自动生成、智能客服、数据分析和预测等多个领域。这暗示着LLM正逐步成为支撑社会运作的关键基础设施。未来&#xff0c;比Devin更为智能的LLM将会问世。我们…