初识轻量级分布式任务调度平台 xxl-job

news2024/11/17 5:22:42

文章目录

    • 前言
    • xxl-job的目录结构
    • 项目依赖 (父 pom.xml)
    • xxl-job-admin 启动
    • xxl-job-executor-sample (项目使用示例)
      • xxl-job-executor-sample-frameless : 不使用框架的接入方式案例
      • xxl-job-executor-sample-springboot : springboot接入方案案例
    • xxl-job执行器器启动流程分析
    • 调度中心启动流程分析
      • 创建调度器以及对调度器进行初始化
      • 调度器初始化
      • 国际化初始化
      • 触发器线程池创建
      • 注册监控器启动
      • 失败监控器启动
      • 日志任务启动
      • 调度启动
      • 总结
    • 定时任务执行流程分析-客户端触发
      • trigger方法准备触发任务
      • processTrigger触发任务
      • 总结
    • 定时任务执行流程分析-服务端执行
      • 执行阻塞策略
      • 注册任务
      • 保存触发参数到缓存
    • xxl-job 执行器路由选择
    • xxl-job定时任务执行流程分析-任务执行
      • 处理器的初始化
      • 任务的执行
      • 销毁清理工作

前言

大家好,这里是 Rocky 编程日记 ,喜欢后端架构及中间件源码,目前正在阅读 xxl-job 源码。同时也把自己学习该 xxl-job笔记,代码分享出来,供大家学习交流,如若笔记中有不对的地方,那一定是当时我的理解还不够,希望你能及时提出。

如果对于该笔记存在很多疑惑,欢迎和我交流讨论
最后也感谢您的阅读,点赞,关注,收藏~

前人述备矣,我只是知识的搬运工

xxl-job 源码均在个人的开源项目中, 源代码仓库地址: https://gitee.com/Rocky-BCRJ/xxl-job.git

在这里插入图片描述

官方文档: https://www.xuxueli.com/xxl-job/

xxl-job的目录结构

  • xxl-job-admin : 是后台管理页面
  • xxl-job-core : 项目的核心包
  • xxl-job-executor-sample (项目使用示例)
    • xxl-job-executor-sample-frameless : 不使用框架的接入方式案例
    • xxl-job-executor-sample-springboot : springboot接入方案案例
  • doc : 项目文档和sql

项目依赖 (父 pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.xuxueli</groupId>
	<artifactId>xxl-job</artifactId>
	<version>2.4.0-SNAPSHOT</version>
	<packaging>pom</packaging>

	<name>${project.artifactId}</name>
	<description>A distributed task scheduling framework.</description>
	<url>https://www.xuxueli.com/</url>

	<modules>
		<module>xxl-job-core</module>
		<module>xxl-job-admin</module>
		<module>xxl-job-executor-samples</module>
    </modules>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<maven.test.skip>true</maven.test.skip>

		<netty-all.version>4.1.63.Final</netty-all.version>
		<gson.version>2.9.0</gson.version>

		<spring.version>5.3.20</spring.version>
		<spring-boot.version>2.6.7</spring-boot.version>

		<mybatis-spring-boot-starter.version>2.2.2</mybatis-spring-boot-starter.version>
		<mysql-connector-java.version>8.0.29</mysql-connector-java.version>

		<slf4j-api.version>1.7.36</slf4j-api.version>
		<junit-jupiter.version>5.8.2</junit-jupiter.version>
		<javax.annotation-api.version>1.3.2</javax.annotation-api.version>

		<groovy.version>3.0.10</groovy.version>

		<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
		<maven-javadoc-plugin.version>3.4.0</maven-javadoc-plugin.version>
		<maven-gpg-plugin.version>3.0.1</maven-gpg-plugin.version>
	</properties>
	
</project>	

xxl-job-admin 启动

  • 从 xxl-job 文件 doc 目录下 执行项目中的 SQL, 生成库表操作
  • 更改该模块下数据库链接,修改日志文件路径,打包,启动项目
  • 浏览器输入 http://localhost:8080/xxl-job-admin/
  • 访问之后登录,账号 : admin 密码: 123456

xxl-job-executor-sample (项目使用示例)

xxl-job-executor-sample-frameless : 不使用框架的接入方式案例

  • 项目只依赖了 xxl-job-core

        <dependencies>
    
            <!-- slf4j -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j-api.version}</version>
            </dependency>
            <!-- junit -->
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-engine</artifactId>
                <version>${junit-jupiter.version}</version>
                <scope>test</scope>
            </dependency>
    
            <!-- xxl-job-core -->
            <dependency>
                <groupId>com.xuxueli</groupId>
                <artifactId>xxl-job-core</artifactId>
                <version>${project.parent.version}</version>
            </dependency>
    
        </dependencies>
    
  • 关于 xxl-job 的核心配置文件

    ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    
    ### xxl-job, access token
    xxl.job.accessToken=default_token
    
    ### xxl-job executor appname
    xxl.job.executor.appname=xxl-job-executor-sample
    ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
    xxl.job.executor.address=
    ### xxl-job executor server-info
    xxl.job.executor.ip=
    xxl.job.executor.port=9998
    ### xxl-job executor log-path
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### xxl-job executor log-retention-days
    xxl.job.executor.logretentiondays=30
    
    

xxl-job-executor-sample-springboot : springboot接入方案案例

xxl-job执行器器启动流程分析

在项目代码 FramelessApplication 类的 main 中

FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();

上述代码即为启动任务执行器的代码。进入到 FrameLessXxlJobConfig#initXxlJobExecutor()方法中

    /**
     * init
     * 初始化 XxlJobSimpleExecutor 执行器
     */
    public void initXxlJobExecutor() {

        // load executor prop 
        // 从配置文件(xxl-job-executor.properties)中加载配置 放到 Properties
        Properties xxlJobProp = loadProperties("xxl-job-executor.properties");

        // init executor 
        // 创建普通的任务执行器
        xxlJobExecutor = new XxlJobSimpleExecutor();
        xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
        xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
        xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
        xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
        xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
        xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
        xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
        xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

        // registry job bean
        // 注册定时任务的bean, 将 SampleXxlJob 加入到定时任务里去
        xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));

        // start executor
        try {
            // 启动执行器
            xxlJobExecutor.start();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

SampleXxlJob 类中有一系列由 @XxlJob注解修饰的方法。这些 @XxlJob注解修饰的方法就是定时任务。我们再来看一下执行器的启动start方法:

    @Override
    public void start() {

        // init JobHandler Repository (for method)
        // 初始化任务处理器
        initJobHandlerMethodRepository(xxlJobBeanList);

        // super start
        try {
            // 调用父类的 start
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

调用父类的 start方法启动执行器,父类的start方法如下:

 /**
     * 开始
     * <p>
     *     1.初始化日志路径
     *     2.初始化admin的客户端
     *     3.初始化日志清理线程
     *     4.初始化回调线程池
     *     5.初始化执行器服务
     * </p>
     * @throws Exception 异常
     */
    public void start() throws Exception {

        // init logpath
        // 初始化日志路径
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        // 初始化admin的客户端
        initAdminBizList(adminAddresses, accessToken);

        // init JobLogFileCleanThread
        // 初始化日志清理线程
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        // 初始化回调线程池
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        // 初始化执行器服务
        initEmbedServer(address, ip, port, appname, accessToken);
    }

初始化日志路径

/**
	 * 初始化日志路径
	 * <p>
	 *     首先创建了保存日志的文件目录
	 *     然后在创建保存脚本的文件目录
	 * </p>
	 * @param logPath 日志路径
	 */
	public static void initLogPath(String logPath){
		// init
		if (logPath!=null && logPath.trim().length()>0) {
			logBasePath = logPath;
		}
		// mk base dir
		// 创建父类目录
		File logPathDir = new File(logBasePath);
		if (!logPathDir.exists()) {
			logPathDir.mkdirs();
		}
		logBasePath = logPathDir.getPath();

		// mk glue dir
		// 创建脚本代码目录
		File glueBaseDir = new File(logPathDir, "gluesource");
		if (!glueBaseDir.exists()) {
			glueBaseDir.mkdirs();
		}
		glueSrcPath = glueBaseDir.getPath();
	}

初始化admin的客户端

/**
     * 初始化admin的客户端
     *
     * @param adminAddresses 管理地址
     * @param accessToken    访问令牌
     * @throws Exception 异常
     */
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        // 遍历 调度器 的地址
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            // 以逗号分隔
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {
                    // 初始化admin客户端
                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    // 保存到list中
                    adminBizList.add(adminBiz);
                }
            }
        }
    }

初始化日志清理线程

启动一个线程localThread,用来清理过期的日志文件。localThread的run方法一直执行,首先获取所有的日志文件目录,日志文件形式如logPath/yyyy-MM-dd/9999.log,获取logPath/yyyy-MM-dd/目录下的所有日志文件,然后判断日志文件是否已经过期,过期时间是配置的,如果当前时间减去日志文件创建时间(yyyy-MM-dd)大于配置的日志清理天数,说明日志文件已经过期,一般配置只保存30天的日志,30天以前的日志都删除掉。

初始化执行器服务

启动了一个netty服务器,用于执行器接收admin的http请求。主要接收admin发送的空闲检测请求、运行定时任务的请求、停止运行定时任务的请求、获取日志的请求。最后a还向dmin注册了执行器,注册执行器是调用AdminBizClient的registry方法注册的,AdminBizClient的registry方法通过http将注册请求转发给admin服务的AdminBizImpl类的registry方法,AdminBizImpl类的registry方法将注册请求保存在数据库中。

执行器服务接收admin服务的请求, 交给ExecutorBiz接口处理,ExecutorBiz接口有五个方法,分别是beat(心跳检测)、idleBeat(空闲检测)、run(运行定时任务)、kill(停止运行任务)、log(获取日志)。ExecutorBiz接口有两个实现:ExecutorBizClient和ExecutorBizImpl,ExecutorBizClient是执行器客户端,ExecutorBizImpl执行器服务端。admin服务通过ExecutorBizClient类的方法通过http将请求转发给执行器服务的ExecutorBizImpl对应的方法。

调度中心启动流程分析

创建调度器以及对调度器进行初始化

    /**
     * 创建调度器以及对调度器进行初始化
     * @throws Exception 异常
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;
        // 新建调度器
        xxlJobScheduler = new XxlJobScheduler();
        // 调度器初始化
        xxlJobScheduler.init();
    }

调度器初始化

 /**
     * 调度器初始化
     * <p>
     *     1.国际化初始化
     *     2.触发器线程池创建
     *     3.注册监控器启动
     *     4.失败监控器启动
     *     5.丢失监控器启动
     *     6.日志任务启动
     *     7.调度启动
     * </p>
     *
     * @throws Exception 异常
     */
    public void init() throws Exception {
        // init i18n 初始化国际化
        initI18n();

        // admin trigger pool start 触发器线程池创建
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run 注册监控器启动
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run 失败监控器启动
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        // 丢失监控器启动
        JobCompleteHelper.getInstance().start();

        // admin log report start
        // 日志报告启动
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        // 调度启动
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

国际化初始化

    /**
     * init i18n
     * 国际化初始化
     */
    private void initI18n(){
        for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
            item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
        }
    }

ExecutorBlockStrategyEnum是执行阻塞策略枚举,主要有单机串行、丢弃后续调度、覆盖之前调度三种策略,initI18n方法就是设置执行策略的title值。

I18nUtil.getString方法就是根据配置读取resources/il8n/目录下的其中一个文件,该目录下有message_en.properties、message_zh_CN.properties、message_zh_TC.properties三个文件,分别为英语、中文简体、中文繁体是属性文件。

I18nUtil.getString方法获取到执行阻塞策略的值赋值给title.

触发器线程池创建

public static void toStart() {
        helper.start();
    }

    public void start(){
        // 快速触发线程
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });
        // 慢速触发线程池
        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }

触发器线程池创建调用了JobTriggerPoolHelper类的start方法,start方法创建了两个线程池、fastTriggerPool为快速线程池、slowTriggerPool为慢速线程池,都是采用阻塞队列LinkedBlockingQueue,快速线程池的阻塞队列大小为1000,慢速线程池的阻塞队列大小为2000。

快速线程池、慢速线程池在什么时候被用来调度任务呢?
默认是用快速调度器调度任务的,当缓存中等待被调度的同一个任务的数量大于10的时候,就用慢速调度器调度任务。

注册监控器启动

public void start(){

		// for registry or remove
		// 调度任务注册线程池
		registryOrRemoveThreadPool = new ThreadPoolExecutor(
				2,
				10,
				30L,
				TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(2000),
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
					}
				},
				new RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						r.run();
						logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
					}
				});

		// for monitor
		// 注册监控器线程
		registryMonitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// auto registry group
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// remove dead address (admin/executor)
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// fresh online address (admin/executor)
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										String appname = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appname);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										appAddressMap.put(appname, registryList);
									}
								}
							}

							// fresh group address
							for (XxlJobGroup group: groupList) {
								List<String> registryList = appAddressMap.get(group.getAppname());
								String addressListStr = null;
								if (registryList!=null && !registryList.isEmpty()) {
									Collections.sort(registryList);
									StringBuilder addressListSB = new StringBuilder();
									for (String item:registryList) {
										addressListSB.append(item).append(",");
									}
									addressListStr = addressListSB.toString();
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								group.setAddressList(addressListStr);
								group.setUpdateTime(new Date());

								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});
		registryMonitorThread.setDaemon(true);
		registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
		registryMonitorThread.start();
	}

JobRegistryHelper的 start 创建了一个调度任务注册线程池

registryOrRemoveThreadPool以及注册监控器线程registryMonitorThread,调度任务注册线程池用来执行调度任务的注册,注册监控器线程用来监控执行器的机器是否下线。

然后将registryMonitorThread设置为守护线程,最后启动registryMonitorThread线程,开始监控执行器的机器。

registryMonitorThread线程的run方法的代码被省略,接下来分析下run方法的具体逻辑:

// 注册监控器线程
		registryMonitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// auto registry group
						// 自动注册的执行器列表
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// remove dead address (admin/executor)
							// 获取已经下线的机器地址记录
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								// 删除已经下线的注册
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// fresh online address (admin/executor) 刷新在线的机器
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
							// 查询存活的执行器注册机器
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									//如果是执行器,将同一个应用的调度任务放在list中
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										String appname = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appname);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										appAddressMap.put(appname, registryList);
									}
								}
							}

							// fresh group address
							// 遍历自动注册的执行器列表
							for (XxlJobGroup group: groupList) {
								List<String> registryList = appAddressMap.get(group.getAppname());
								String addressListStr = null;
								if (registryList!=null && !registryList.isEmpty()) {
									Collections.sort(registryList);
									StringBuilder addressListSB = new StringBuilder();
									// 执行器地址拼接
									for (String item:registryList) {
										addressListSB.append(item).append(",");
									}
									addressListStr = addressListSB.toString();
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								// 设置地址
								group.setAddressList(addressListStr);
								// 设置注册更新时间
								group.setUpdateTime(new Date());
								// 更新注册的执行器地址
								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});

run()方法一直执行,直到服务停止,主要做了两件事:

  1. 将已经下线的执行器的记录从数据库中删除
  2. 将还在线的执行器机器记录重新设置执行器地址以及更新执行器的时间,然后更新数据库的记录。

怎么判定执行器已经下线了?如果数据库中的update_time字段小于当前时间减去死亡期限,那么说明已经执行器在死亡期限没有进行更新时间,就判定已经下线了。

执行器在启动的时候,会启动一个执行器线程不断的执行注册任务,执行器任务会更新update_time字段。

失败监控器启动

public void start(){
        monitorThread = new Thread(new Runnable() {

            @Override
            public void run() {
                //代码省略
            }
        });
        monitorThread.setDaemon(true);
        monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
        monitorThread.start();
}

上述代码创建了一个名字为monitorThread的线程,并设为守护线程,然后启动这个线程。线程的run方法的代码被省略,run方法的代码如下:

			@Override
			public void run() {

				// monitor
				while (!toStop) {
					try {
						// 获取失败任务日志, 最多1000条
						List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
						if (failLogIds!=null && !failLogIds.isEmpty()) {
							// 遍历失败日志
							for (long failLogId: failLogIds) {
								// lock log
								// 将默认(0)告警状态设置为锁定状态(-1)
								int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
								if (lockRet < 1) {
									continue;
								}
								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
								// 获取任务信息
								XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

								// 1、fail retry monitor
								// 如果失败重试次数大于0
								if (log.getExecutorFailRetryCount() > 0) {
									// 触发任务执行
									JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
									String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
									log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
									// 更新触发日志
									XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
								}

								// 2、fail alarm monitor
								int newAlarmStatus = 0;		// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
								// 如果告警右键不为null
								if (info != null) {
									// 告警
									boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
									newAlarmStatus = alarmResult?2:3;
								} else {
									newAlarmStatus = 1;
								}
								// 将锁定(-1)的日志更新为新的告警状态
								XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
							}
						}

					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
						}
					}

                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

				logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

			}

run方法一直运行,直到线程停止。run方法的首先从数据库中获取失败的调度任务日志列表,每次最多一千条。遍历失败的调度任务日志列表,首先将失败的调度任务日志进行锁定,暂停给告警邮件发送告警信息。如果调度任务的失败重试次数大于0,触发任务执行,更新任务日志信息。当邮件不为空时,触发告警信息,最后将锁定的日志状态更新为告警状态。

日志任务启动

主要做了两件事:

  1. 统计当前时间前三天的触发任务的数量、运行中的任务的数量、成功的任务数量、任务失败的数量,然后保存在数据库中。
  2. 根据配置的保存日志的过期时间,将已经过期的日志从数据库中查出来,然后清理过期的日志。日志任务启动是创意了一个线程,然后一直在后台运行。

调度启动

调度启动创建了两个线程,一个线程是用于不断从数据库把5秒内要执行的任务读出,立即触发或者放到时间轮等待触发,一个是用于触发任务。

总结

调用中心启动就是启动springboot项目。

在启动的过程中加载XxlJobAdminConfig配置类,在配置类中,会进行一系列的初始化工作,加载配置信息,创建以及初始化一系列化线程在后台一直异步运行,提高了性能。

定时任务执行流程分析-客户端触发

客户端触发是记录触发的日志、准备触发参数触发远程服务器的执行。

trigger方法准备触发任务

    /**
     * trigger job
     *  <p>
     *      1.根据任务id从数据库中获取执行的任务
     *      2.根据任务组名字从数据库中获取任务组,如果地址不为空,覆盖原来的地址列表,设置触发类型为手动触发。
     *      3.判断路由策略,如果是分片广播,遍历地址列表,触发所有的机器,否则只触发一台机器。
     *      4.分片广播是要触发所有的机器并行处理任务。
     *  </p>
     *  
     * @param jobId                 工作id
     * @param triggerType           触发类型
     * @param failRetryCount        失败重试计数
     * 			>=0: use this param
     * 			<0: use param from job info config         
     * @param executorShardingParam 遗嘱执行人切分参数
     * @param executorParam         执行器参数
     *          null: use job param
     *          not null: cover job param
     * @param addressList           地址列表
     *          null: use executor addressList
     *          not null: cover
     */
    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {

        // load data
        // 从数据库中获取任务
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
            return;
        }
        // 设置执行参数
        if (executorParam != null) {
            jobInfo.setExecutorParam(executorParam);
        }
        // 重试次数
        int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
        // 如果地址不为空,覆盖原来的地址列表
        // cover addressList
        if (addressList!=null && addressList.trim().length()>0) {
            group.setAddressType(1);
            group.setAddressList(addressList.trim());
        }

        // sharding param
        int[] shardingParam = null;
        // executorShardingParam不等于null
        if (executorShardingParam!=null){
            String[] shardingArr = executorShardingParam.split("/");
            if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                shardingParam = new int[2];
                shardingParam[0] = Integer.valueOf(shardingArr[0]);
                shardingParam[1] = Integer.valueOf(shardingArr[1]);
            }
        }
        // 分片广播
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                && shardingParam==null) {
            // 并行处理
            for (int i = 0; i < group.getRegistryList().size(); i++) {
                // 处理触发
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        } else {
            if (shardingParam == null) {
                shardingParam = new int[]{0, 1};
            }
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
        }

    }

processTrigger触发任务

    /**
     * <p>
     *      获取执行阻塞策略
     *      获取路由策略
     *      保存任务日志
     *      初始化触发参数
     *      初始化执行器的地址:如果路由策略是分片广播,执行地址就为第index的地址,否则从通过路由获取执行地址。
     *      触发远程执行器,即触发远程的定时任务
     *      设置触发信息并保存触发日志
     * </p>
     * @param group                     job group, registry list may be empty
     * @param jobInfo
     * @param finalFailRetryCount
     * @param triggerType
     * @param index                     sharding index
     * @param total                     sharding index
     */
    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

        // param
        // 执行阻塞策略
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        // 路由策略
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        // 分片广播
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

        // 1、save log-id 保存日志
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2、init trigger-param 初始化触发参数
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
        triggerParam.setLogId(jobLog.getId());
        triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
        triggerParam.setGlueType(jobInfo.getGlueType());
        triggerParam.setGlueSource(jobInfo.getGlueSource());
        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
        triggerParam.setBroadcastIndex(index);
        triggerParam.setBroadcastTotal(total);

        // 3、init address 初始化地址
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
                // 路由获取地址
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }

        // 4、trigger remote executor 触发远程执行器
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

        // 5、collection trigger info 触发信息
        StringBuffer triggerMsgSb = new StringBuffer();
        triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
        if (shardingParam != null) {
            triggerMsgSb.append("("+shardingParam+")");
        }
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

        triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

        // 6、save log trigger-info 保存触发日志
        jobLog.setExecutorAddress(address);
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setExecutorShardingParam(shardingParam);
        jobLog.setExecutorFailRetryCount(finalFailRetryCount);
        //jobLog.setTriggerTime();
        jobLog.setTriggerCode(triggerResult.getCode());
        jobLog.setTriggerMsg(triggerMsgSb.toString());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }

    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            // 获取执行器
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            // 执行任务
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }
        // 返回执行结果
        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        return runResult;
    }

runExecutor方法通过 XxlJobScheduler.getExecutorBiz方法获取执行器ExecutorBiz,然后调用执行器ExecutorBiz的run方法执行任务。getExecutorBiz方法首先通过地址从executorBizRepository(map)获取ExecutorBiz,如果获取的ExecutorBiz不为null,则直接返回,否则,创建一个ExecutorBizClient保存在executorBizRepository中,然后将创建的ExecutorBizClient返回。

ExecutorBiz接口有两个实现,分别是ExecutorBizClient(执行器客户端)、ExecutorBizImpl(执行器服务端),ExecutorBizClien类就是客户端操作任务的类,ExecutorBizImpl就是服务端操作任务的类。ExecutorBiz接口有beat(心跳检测)、idleBeat(空闲检测)、run(执行任务)、kill(停止任务)、log(打印日志)这些方法。

我们看看ExecutorBizClien的run方法:

public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

ExecutorBizClien的run方法比较简单,就是调用http请求发送触发参数触发服务端的任务执行,然后将结果返回给客户端。请求的地址为addressUrl + “run”,当客户端发送请求以后,ExecutorBizImpl的run方法将会接收请求处理,然后将处理的结果返回,这篇文章就讲到这里,服务端执行定时任务放到下一篇文章进行讲解。

总结

客户端触发任务执行,首先从数据库中查询出需要执行的任务,然后做好任务执行的准备,如日志的记录、触发参数的初始化、获取执行的地址等,然后发送http请求给服务端执行任务,服务器将处理任务的结果返回给客户端。客户端触发任务执行,是通过http请求触发任务执行,如果请求丢失,那么就会错过任务的执行。

定时任务执行流程分析-服务端执行

执行器启动时,会初始化一个EmbedServer类,该类的start方法会启动netty服务器。netty服务器会接收客户端发送过来的http请求,当接收到触发请求(请求路径是/run)会交给EmbedServer类的process方法处理,process方法将会调用ExecutorBizImpl的run方法处理客户端发送的触发请求。

ExecutorBizImpl的run方法处理流程大致如下:

 /**
     * <p>
     *     1.加载任务处理器与任务执行线程,校验任务处理器与任务执行线程
     *     2.执行阻塞策略
     *     3.注册任务
     *     4.保存触发参数到缓存
     *
     * </p>
     *
     * @param triggerParam 触发参数
     * @return {@link ReturnT}<{@link String}>
     */
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        // 加载旧的任务处理器和任务线程
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            // new jobhandler 从缓存中加载任务处理器,根据处理器名字
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            // valid old jobThread 如果新的任务处理器与旧的任务处理器不同,将旧的任务处理器以及旧的任务线程gc
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

run方法首先根据任务id从缓存jobThreadRepository(map)中获取任务执行线程jobThread,任务执行线程jobThread保存着任务处理器jobHandler,然后进行校验任务执行线程以及任务处理器。在了解校验过程之前,我们先了解下xxl-job定时任务的种类,xxll0job支持java、groovy、脚本(Shell、Python、PHP、NodeJs、PowerShell)的定时任务。

接下来检验任务执行线程以及任务处理器,就是按照Java、groovy、脚本分别进行校验。

当任务的种类是java时,根据任务处理器的名字从jobHandlerRepository(map)中获取任务处理器,如果新的任务处理器与旧的任务处理器不同,将旧的任务处理器以及旧的任务线程设置为null,等待被java虚拟机gc掉,这样做的目的是,如果已经重新设置了新的任务执行线程和任务处理器,那么就旧的gc掉,不至于一直存在内存中。

如果任务的种类是groovy时,判断任务执行线程不等于null、任务处理器已经更改和groovy的代码被更新了,那么就将旧的任务执行线程和任务执行器设置为null,等待被gc,如果任务处理器还是为null,那么新创建GlueJobHandler任务处理器。

如果是任务的种类是脚本类型,判断任务执行线程不等于null、任务处理器已经更改和脚本的代码被更新了,那么就将旧的任务执行线程和任务执行器设置为null,等待被gc,如果任务处理器还是为null,那么新创建ScriptJobHandler任务处理器。

执行阻塞策略

// 执行阻塞策略
        if (jobThread != null) {
            // 阻塞策略
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running 如果任务正在执行,直接返回结果,不再往下执行任务
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread 覆盖之前的
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

xxl-job 有三种阻塞队列,分别为SERIAL_EXECUTION(单机串行)、DISCARD_LATER(丢弃)、COVER_EARLY(覆盖之前的)。当阻塞策略为丢弃,则判断该执行线程是否正在执行,如果是则直接返回结果,不再往下执行任务了。当阻塞策略为覆盖之前的,则判断执行线程是否正在执行,如果是则杀掉原来的执行线程。如果阻塞策略是这俩种之外,则不做什么。

注册任务

        // replace thread (new or exists invalid) 
        // 如果任务线程等于null,注册任务线程并启动线程
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }

        return newJobThread;
    }

如果任务线程等于null,注册任务线程并启动线程。registJobThread方法首先新建一个任务线程,并调用newJobThread的start方法启动任务线程。然后加入jobThreadRepository进行缓存,当旧的oldJobThread不等于null,则停止掉旧的任务线程。

保存触发参数到缓存

        // push data to queue
        // 保存触发参数到缓存
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;

	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
		// avoid repeat
		if (triggerLogIdSet.contains(triggerParam.getLogId())) {
			logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
			return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
		}

		triggerLogIdSet.add(triggerParam.getLogId());
		triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
	}

pushTriggerQueue方法判断任务id是否已经存在triggerLogIdSet中,如果存在就直接返回结果,如果不存在就添加到triggerLogIdSet中,然后将触发参数保存在triggerQueue队列中。

xxl-job 执行器路由选择

现在的服务大部分都是微服务,在分布式环境中,服务在多台服务器上部署。

xxl-job为了防止定时任务在同一时间内多台服务运行定时任务,利用数据库的悲观锁保证同一时间内只有一台服务运行定时任务,在运行定时任务之前首先获取到锁(select lock for update),然后才运行定时任务,当任务运行完成时,释放悲观锁,其他服务就可以去尝试获取锁而执行定时任务。

上述是xxl-job在分布式环境中如何保证同一时间只有一台服务运行定时任务,那么如何从多台服务中选出一台服务来运行定时任务,这就设计到xxl-job执行路由选择的问题,接下来分析xxl-job是如何选择执行器的。

执行器路由抽象类 ExecutorRouterroute 方法是选择服务器地址的,决定哪一台服务器来执行定时任务。子列展示如下:

// com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java
    /**
     * 执行器地址列表的第一个列表
     */
    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    /**
     * 执行器地址列表的最后一个地址
     */
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    /**
     * 轮询路由,轮询选择一个执行器地址
     */
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    /**
     * 随机路由,随机选择一个执行器地址
     */
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    /**
     * 哈希一致性路由,通过哈希一致性算法选择执行器地址
     */
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    /**
     * 最不经常使用路由,使用频率最低的执行器地址
     */
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    /**
     * 最近最少使用(最近最久未使用路由,选择最近最久未被使用的执行器地址)
     */
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    /**
     * 故障转移路由,查找心跳正常的执行器地址
     */
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    /**
     * 忙碌转移路由,从执行器地址列表查找心跳正常的执行器地址
     */
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    /**
     * 分片广播
     */
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);

Xxl-job 执行器路由选择可以去xxl-job的源码仓库观看。

xxl-job定时任务执行流程分析-任务执行

在服务端执行的流程中,将任务交给任务线程池JobThread执行,JobThread的run方法主要做了几件事:

  • 处理器的初始化
  • 任务的执行
  • 销毁清理工作

处理器的初始化

    	// init
    	try {
			handler.init();
		} catch (Throwable e) {
    		logger.error(e.getMessage(), e);
		}

处理器的初始化比较简单,调用IJobHandler的init方法,IJobHandler是接口类型,有三种方法,分别是init(初始化方法)、execute(执行方法)、destroy(销毁)方法。IJobHandler接口将在下面具体分析。

任务的执行

销毁清理工作

// callback trigger request in queue
		// 如果任务停止了,需要将队列中的所有触发删除(所有定时任务删除)
		while(triggerQueue !=null && triggerQueue.size()>0){
			// 从队列中获取触发参数
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam!=null) {
				// is killed
				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
						triggerParam.getLogId(),
						triggerParam.getLogDateTime(),
						XxlJobContext.HANDLE_CODE_FAIL,
						stopReason + " [job not executed, in the job queue, killed.]")
				);
			}
		}

		// destroy
		// 执行器的销毁方法
		try {
			handler.destroy();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/660564.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python高效使用方法03_pandas中的rolling.mean滚动求均值效率究竟如何?

先上图来说明pandas.rolling(n).mean()滚动求均值的方法效率其实并不是最高的&#xff0c;我自己尝试使用cython把滚动求均值的方法重新编译了一下&#xff0c;发现效率总体上是pandas的三倍以上。 总结&#xff1a;pandas比较合适用于普通的研究分析工作&#xff0c;如果用到追…

AcWing802:详解

原题引出 题解分析 首先毫无置疑的是本体需要用到离散化的知识&#xff0c;将输入的索引下标放到一个vector当中存起来&#xff0c;在该vector当中&#xff0c;利用其本身的索引我们对称构造两个数组a和s&#xff08;用于求前缀和&#xff09;。那么最关键的一个问题就是如何通…

NodeJS Socket编程⑩③

文章目录 ✨文章有误请指正&#xff0c;如果觉得对你有用&#xff0c;请点三连一波&#xff0c;蟹蟹支持&#x1f618;前言Socket编程1、WebSocket VS HTTP请求2、浏览器支持3、WebSocket Practice4、WebSocket 授权验证5、SocketIO模块 &#x1f5e1; Express~WebSokcetIOServ…

Java中不支持多重继承原因

在 Java 中回答这种棘手问题的关键是准备好相关主题, 以应对后续的各种可能的问题。 这是非常经典的问题&#xff0c;与为什么 String 在 Java 中是不可变的很类似; 这两个问题之间的相似之处在于它们主要是由 Java 创作者的设计决策使然。 为什么Java不支持类多重继承, 可以考…

UML基本图例

UML基本图例 软件流程和UML基本图例软件流程&#xff1a;下面是九种常用的UML图&#xff1a;1.用例图&#xff08;UseCase Diagrams&#xff09;2.类图&#xff08;Class Diagram&#xff09;3.对象图&#xff08;Object Diagrams&#xff09;4.状态图&#xff08;Statechart D…

6道常见hadoop面试题

Q1.什么是Hadoop&#xff1f;   Hadoop是一个开源软件框架&#xff0c;用于存储大量数据&#xff0c;并发处理/查询在具有多个商用硬件&#xff08;即低成本硬件&#xff09;节点的集群上的那些数据。总之&#xff0c;Hadoop包括以下内容&#xff1a;   HDFS&#xff08;Ha…

Chiplet技术概览

一、概览 chiplet技术顺应了芯片生产与集成技术发展的趋势&#xff0c;也开拓了半导体技术发展的新的发展方向&#xff0c;将创造出一种新的芯片设计和商业模式 1.1 芯片生产与集成技术发展的趋势 &#xff08;1&#xff09;低半径高带宽的物理连线(bandwidth / memory wall…

打开win10“启动”文件夹的2种方式

方式1&#xff1a;通过CMD命令 1.输入代码 按WinR键打开运行窗口&#xff0c;输入“shell:Common Startup”代码。 2.点击确定 点击下面的确定按钮&#xff0c;运行代码。 3.打开文件夹 弹出文件夹即为系统启动文件夹。 方式2&#xff1a;输入路径 1.打开文件管理器 打…

【城市开发者职业成长交流 - 重庆】加强交流,共创成长:重庆程序员的线下交流会总结

周五睡觉前&#xff0c;我把窗帘拉的严严实实&#xff0c;所有的闹钟全部取消&#xff0c;甚至另外一只电话直接关机掉&#xff0c;为的是第二天可以好好的躺到自然醒&#xff0c;我其实也不知道具体的几点&#xff0c;就感觉连上被踩&#xff01;无奈的睁开眼睛&#xff0c;看…

基于WEB的院校课程管理系统设计与实现(论文+源码)_kaic

摘要 课程管理是学校管理工作的核心&#xff0c;是最为繁琐也最容易出错的工作&#xff0c;开发和 应用课程管理系统能够让课程管理工作人员从繁琐的工作中解脱出来&#xff0c;提高课程 管理的工作效率[1]&#xff0c;实现课程管理的信息化。虽然目前高职院校应用的课程管理系…

bigdata.com《大数据计算框架》样题

容量单位从小到大的顺序依次为&#xff1a;TB、PB、EB、ZB。 . 用于设置环境变量的文件是 .bash_profile . 将HDFS文件下载到本地的命令是 hadoop fs -get。 . 不需要Java环境的支持是 MySQL . 通配符是用于模糊匹配的特殊字符&#xff0c;可以在关键字查询中使用。在MySQL…

助你面试一臂之力,认识银行测试

我们知道软件测试是为了保证软件的质量和可靠性而在新软件系统上线之前对软件进行的质量检测工作。通过软件测试这个过程找出软件中的错误&#xff0c;分析错误的产生原因和容易产生错误的区域&#xff0c;进而有针对性的设计测试方法提高软件测试的效率。 以上简单讲述了一下…

k8s 之网络组件-Calico(十九)

下载资源 &#xff1a;安装 kubernetes 网络组件-Calico 一&#xff0c;简介 Calico是Kubernetes生态系统中另一种流行的网络选择。虽然Flannel被公认为是最简单的选择&#xff0c;但Calico以其性能、灵活性而闻名。Calico的功能更为全面&#xff0c;不仅提供主机和pod之间的网…

【深入浅出密码学】离散对数

群相关知识点 离散对数相关 ## 蛮力搜索 对于解决 α x β \alpha^{x} \beta αxβ,我们不断地选取合适地 x x x,计算 a x a^x ax&#xff0c;直到找到满足这个等式的 x x x&#xff0c;时间复杂度 O ( ∣ G ∣ ) O(|G|) O(∣G∣). Baby-Step Giant-Step 对于解决 α x β…

卡尔曼滤波器使用一维与二维以及代码编写

注&#xff1a;要视频学习可以去B站搜索“DR_CAN”讲解的卡尔曼滤波器&#xff0c;深有体会&#xff01; 链接&#xff1a; 1、【学习心得|基于卡尔曼滤波的MPU6050姿态解算】https://www.bilibili.com/video/BV1sL411F7fu?p2&vd_source3d0b47bb7325b7b3a156ba92207bbd6…

【人工智能】— 神经网络、M-P 神经元模型、激活函数、神经网络结构、学习网络参数、代价定义、总代价

【人工智能】— 神经网络 神经网络的历史Neural Network IntroM-P 神经元模型激活函数(Activation function)神经网络结构举例训练神经网络学习网络参数代价定义均方误差交叉熵&#xff08;Cross Entropy&#xff09; 总代价 神经网络的历史 第一阶段 ⚫ 1943年, McCulloch和Pi…

AES入门 万字详解(附推荐论文和研究领域)

目录 前言 加密过程 SubBytes&#xff08;字节替换&#xff09; ShiftRows&#xff08;行移位&#xff09; MixColumns&#xff08;列混淆&#xff09; AddRoundKey&#xff08;轮密钥加&#xff09; 轮密钥生成过程 概述 具体步骤 代码实现方式 Java Java Cryptog…

Build your own unconditional confidence

不要活在既定的社会价值体系中 人类的偏好大多数时候都是愚昧的 I play whatever gods give me 情绪价值稳定 解决问题的能力 Dont label yourself 真正的强者不会吝啬对他人的赞美 敬畏自然&#xff0c;敬畏未知事物 核心是你对这个事情是否感兴趣&#xff0c;觉得有价…

Java-三种基本控制结构及相关面试题

文章目录 前言一、 顺序控制结构1.1 概念1.2 代码1.3 NS图中体现 二、分支控制结构2.1 概念2.2 if语句2.3 switch语句2.4 NS图中的体现 三、循环控制结构3.1 概念3.2 for循环3.3 while循环3.4 do-while循环3.5 增强 for 循环NS图中的体现 四、相关面试题什么是控制流语句&#…

springboot解析@transaction注解原理

目录 第一步、全局搜索Transactional.class 第二步、查看哪里配置BeanFactoryTransactionAttributeSourceAdvisor 第四、SpringTransactionAnnotationParser是什么时候被注入的 第三、总结 先看一下transaction的官网文档 16. Transaction Management 第一步、全局搜索Tr…