任务执行策略
任务阻塞策略
整体架构部署
这里主要讲解下每个模块的作用
- 调度模块:负责管理调度信息,按照调度配置发出调度请求,自身不承担任何业务代码。调度系统于任务解耦,提高了系统可用性和稳定性,同时调度系统性能不受限于任务模块
- 执行器:主要负责任务的具体执行,以及分配线程执行任务。同时执行器需要注册到调度中心,以及执行的回调结果需要通知调度器
部署注意事项
- 调度中心和执行器都可以多节点部署,但是调度中心有一定注意事项
- 连接同一个数据库(避免账号、任务、执行器等信息不同步)
- 集群机器时钟保持一致(避免任务出现重复调度)
执行器和调度器底层原理
执行器的注册和发现方式
- 执行器启动的时候,主动注册到调度中心,并且定时发送心跳,保持续约。执行器正常关闭时候,也主动告知调度中心注销掉
- 如果执行器宕机或者其余问题,调度中心不知道执行器的情况,这时候就需要调度器不断对执行器进行探活,会有一个专门的后台线程,定时调用执行器接口,如果有异常就下线
执行器的启动
//XxlJobSpringExecutor-》XxlJobExecutor-》start()
public void start() throws Exception {
// 初始化日志路线
XxlJobFileAppender.initLogPath(logPath);
// 创建调度器客户端,主要是执行器用于连接调度器的
initAdminBizList(adminAddresses, accessToken);
// 初始化日志清理线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 初始化回调线程
TriggerCallbackThread.getInstance().start();
// 初始化执行器服务器
initEmbedServer(address, ip, port, appname, accessToken);
}
initEmbedServer(初始化执行器服务器)
首先获取执行器端口和ip,然后启动一个Springboot内置的embedServer
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
embedServer.start
//创建了一个业务线程池bizThreadPool
//然后创建了一个Netty通信bootstrap,用于绑定端口启动
//然后注册执行器startRegistry(appname, address);
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// start registry
startRegistry(appname, address);
// wait util stop
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
} finally {
// stop
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
注册执行器
注册执行器,主要是这一个方法,通过http接口,进行注册
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
调度器接受到请求
然后到调度器模块JobApiController-》"registry"分支-》调用mapper方法插入数据库
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh
freshGroupRegistryInfo(registryParam);
}
return ReturnT.SUCCESS;
}
调度器启动
public void init() throws Exception {
// init i18n
initI18n();
// 任务注册监听器,不停更新注册表,吧超时的执行器剔除,每30s一次
JobRegistryMonitorHelper.getInstance().start();
//任务调度失败监控器,失败重试、发送邮件等
JobFailMonitorHelper.getInstance().start();
// 任务结果丢失处理
JobLosedMonitorHelper.getInstance().start();
// 2个调度线程启动(一快一慢)
JobTriggerPoolHelper.toStart();
// 日志报告启动
JobLogReportHelper.getInstance().start();
// start-schedule
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
JobScheduleHelper.getInstance().start();
-
调度线程
- 首先随机睡眠4-5s,有可能是防止执行器集中启动导致资源竞争问题
- 然后计算预读取任务数,默认6000
- 然后就是调度器重复执行的事情
- 首先获取数据库的排他锁,因为之前说过调度器集群部署需要连接同一个数据库,这里也就是说明后面的操作是互斥的,并且多个调度器服务在同一时间只能有一个调度器获取任务信息
- .获取锁成功以后,就执行查询任务逻辑(查询状态是1,并且5s之内需要触发的任务)
- 然后这里将任务分为三种情况(没有达到触发时间,需要触发5s以内,已经过期5s以上的)
- 过期5s以上的:这里就不做任何处理,只是更新下次调度时间
- 正常执行的:正常执行任务、然后更新下一次触发时间、然后进行一次预读(提升性能,主要是判断触发时间是否满足,如果满足就丢入时间轮等待,然后更新下次触发时间)
- 还没到时间的:丢入时间轮、刷新触发时间(实际上没用,因为时间还没到)
-
时间轮线程
- 初始化的时候先对齐秒数:主要是为了下一个整秒运行,因为时间轮的分割都是1s为维度
- 然后进入循环,开始执行
- 避免处理耗时太长,跨过刻度,向前校验一个刻度(也就是当前秒和前一秒的任务都取出来)
- 然后就是触发任务了
-
任务调度的流程
- 还记得之前说的线程池有两个,一个快线程池、一个慢线程池吧,就是在这里使用,让任务执行时间比较久的进入慢线程池,避免阻塞后续任务,这里主要就是一个线程隔离(判断标准为:job-timeout 10 times in 1 min)
- 然后就开始了任务执行的逻辑,首先拿到任务信息,如果参数的failRetryCount>0就用参数值,否则Job定义的
- 然后拿到失败重试次数和组别
- 如果是分片广播的话,会拿到对应的分片参数,然后吧这个参数分给对应的节点,然后根据实现的逻辑去执行对应的任务
- 其余逻辑按照之前的路由规则拿到不同的执行器,然后通过http请求触发远程执行
-
时间轮的作用
- 其实时间轮的作用很简单,就是任务太多了,然后可以有一个规划来让任务进行时间分组,方便与顺序执行
- 其实时间轮的作用很简单,就是任务太多了,然后可以有一个规划来让任务进行时间分组,方便与顺序执行
-
执行器执行任务
- 其实主要就是在之前创建的业务线程里面执行的
- 先从容器中拿到任务线程(每个任务都有一个单独线程执行)
- 然后再拿到对应的jobHandler,然后通过阻塞策略判断,如何执行这个任务
- 然后通过任务线程吧这个Trigger加入队列,顺序执行,然后看是否有超时的配置,因为超时也就是通过Featuer的超时限制的
- .然后在finally里面会进行任务的回调,然后也有对应的后台线程执行
- 包括任务重试,也是有对应的后台线程依次执行
public void start(){
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// 时间轮线程
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
while (!ringThreadToStop) {
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// next second, align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}