Stickler's Source Code Series - PowerJob Startup Flow Source Code Analysis

news2024/11/24 2:33:08

        PowerJob version: 4.3.2-main.


1 Introduction

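        PowerJob is a new-generation distributed task scheduling and computing framework. Its official site is http://www.powerjob.tech/, which describes PowerJob's features and compares it with other scheduling frameworks, so I won't repeat that here.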

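        The figure above is PowerJob's architecture diagram, taken from the official site. As it shows, PowerJob follows a typical client/server architecture (yet, unlike most distributed middleware, it has no registry center). This article starts from the startup flow and explores what PowerJob does during startup.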


2 Server

        Since we are reading the startup flow, let's start with the entry class. PowerJob is built on Spring Boot, and its entry class is PowerJobServerApplication:

/**
 * powerjob-server entry
 *
 * @author tjq
 * @since 2020/3/29
 */
@Slf4j
@EnableScheduling
@SpringBootApplication
public class PowerJobServerApplication {

    private static final String TIPS = "\n\n" +
            "******************* PowerJob Tips *******************\n" +
            "如果应用无法启动,我们建议您仔细阅读以下文档来解决:\n" +
            "if server can't startup, we recommend that you read the documentation to find a solution:\n" +
            "https://www.yuque.com/powerjob/guidence/problem\n" +
            "******************* PowerJob Tips *******************\n\n";

    public static void main(String[] args) {

        pre();

        // Start SpringBoot application.
        try {
            SpringApplication.run(PowerJobServerApplication.class, args);
        } catch (Throwable t) {
            log.error(TIPS);
            throw t;
        }
    }

    private static void pre() {
        log.info(TIPS);
        PropertyUtils.init();
    }

}

/**
 * Load the configuration file
 *
 * @author tjq
 * @since 2020/5/18
 */
@Slf4j
public class PropertyUtils {

    private static final Properties PROPERTIES = new Properties();

    public static Properties getProperties() {
        return PROPERTIES;
    }

    public static void init() {
        URL propertiesURL = PropertyUtils.class.getClassLoader().getResource("application.properties");
        Objects.requireNonNull(propertiesURL);
        try (InputStream is = propertiesURL.openStream()) {
            PROPERTIES.load(is);
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }
}

        The pre method simply loads the contents of application.properties into a Properties cache.

        Since the entry class contains little logic, let's look at what initialization logic runs when the server starts:

2.1 PowerTransportService

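        The PowerTransportService class provides the data transport service, i.e. the communication between clients and the server. It implements the InitializingBean interface, so let's look at its afterPropertiesSet method: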

/**
 * PowerTransportService:
 */
@Override
public void afterPropertiesSet() throws Exception {

    log.info("[PowerTransportService] start to initialize whole PowerTransportService!");
    log.info("[PowerTransportService] activeProtocols: {}", activeProtocols);

    if (StringUtils.isEmpty(activeProtocols)) {
        throw new IllegalArgumentException("activeProtocols can't be empty!");
    }

    for (String protocol : activeProtocols.split(OmsConstant.COMMA)) {
        try {
            final int port = parseProtocolPort(protocol);
            // Initialize network communication
            initRemoteFrameWork(protocol, port);
        } catch (Throwable t) {
            log.error("[PowerTransportService] initialize protocol[{}] failed. If you don't need to use this protocol, you can turn it off by 'oms.transporter.active.protocols'", protocol);
            ExceptionUtils.rethrow(t);
        }
    }

    // Choose the default communication protocol (HTTP by default)
    choseDefault();

    log.info("[PowerTransportService] initialize successfully!");
    log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocolName2Info);
}

/**
 * Called from afterPropertiesSet above (initRemoteFrameWork(protocol, port)):
 */
private void initRemoteFrameWork(String protocol, int port) {

    // Fetched from applicationContext instead of constructor injection to avoid circular dependencies
    // Get all beans annotated with @Actor
    final Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(Actor.class);
    log.info("[PowerTransportService] find Actor num={},names={}", beansWithAnnotation.size(), beansWithAnnotation.keySet());

    Address address = new Address()
            .setHost(NetUtils.getLocalHost())
            .setPort(port);
    EngineConfig engineConfig = new EngineConfig()
            .setServerType(ServerType.SERVER)
            .setType(protocol.toUpperCase())
            .setBindAddress(address)
            .setActorList(Lists.newArrayList(beansWithAnnotation.values()));
    log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address);
    RemoteEngine re = new PowerJobRemoteEngine();
    // Initialize the network layer
    final EngineOutput engineOutput = re.start(engineConfig);
    log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address);

    // Put the engine and protocol info into the relevant caches
    this.engines.add(re);
    this.protocolName2Info.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter()));
}
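        To make the bootstrap loop in afterPropertiesSet concrete, here is a minimal sketch under stated assumptions: ProtocolRegistrySketch, ProtocolInfo and the protocol-to-port map are hypothetical names, and no real RemoteEngine is started. The default-selection rule (HTTP if it is active, otherwise the first configured protocol) is an assumption based on the comment on choseDefault, not its verified implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the afterPropertiesSet loop: one entry per active protocol, then a default.
// ProtocolRegistrySketch and the port map are illustrative names, not PowerJob's API.
final class ProtocolRegistrySketch {

    static final class ProtocolInfo {
        final String protocol;
        final String address;

        ProtocolInfo(String protocol, String address) {
            this.protocol = protocol;
            this.address = address;
        }
    }

    private final Map<String, ProtocolInfo> protocolName2Info = new LinkedHashMap<>();
    private final ProtocolInfo defaultProtocol;

    ProtocolRegistrySketch(String activeProtocols, String host, Map<String, Integer> ports) {
        if (activeProtocols == null || activeProtocols.trim().isEmpty()) {
            throw new IllegalArgumentException("activeProtocols can't be empty!");
        }
        for (String raw : activeProtocols.split(",")) {
            String protocol = raw.trim().toUpperCase();
            Integer port = ports.get(protocol);
            if (port == null) {
                // Fail fast, as afterPropertiesSet does when a protocol can't be initialized.
                throw new IllegalStateException("initialize protocol[" + protocol + "] failed: no port");
            }
            // This is where the real code starts a RemoteEngine for the protocol.
            protocolName2Info.put(protocol, new ProtocolInfo(protocol, host + ":" + port));
        }
        // Assumption: prefer HTTP when it is active, otherwise use the first configured protocol.
        ProtocolInfo http = protocolName2Info.get("HTTP");
        defaultProtocol = http != null ? http : protocolName2Info.values().iterator().next();
    }

    ProtocolInfo defaultProtocol() {
        return defaultProtocol;
    }

    ProtocolInfo get(String protocol) {
        return protocolName2Info.get(protocol.toUpperCase());
    }
}
```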

        One point worth noting: PowerJob's network layer is built on either Akka or Vert.x:

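  • Akka is a toolkit for building highly concurrent, distributed, resilient message-driven applications on the JVM. It is written in Scala and is based on the Actor model;
  • Vert.x is a toolkit for building reactive applications on the JVM, built on top of Netty (I have analyzed Netty's source code before; if you are interested, see "Stickler's Source Code Series - Netty Core Flow Source Code Analysis").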

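        PowerJob hides the underlying implementation behind two custom annotations, @Actor and @Handler. A request sent by the client is automatically routed to the @Actor-annotated class and the @Handler-annotated method whose paths match. Taking the client sending a heartbeat to the server as an example, the flow is shown below: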
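        The routing idea can be sketched with plain JDK reflection. This is not PowerJob's code: SketchActor and SketchHandler are hypothetical stand-ins for @Actor and @Handler, the route key is assumed to be the simple concatenation of the two paths, and the request is a plain String instead of a PowerSerializable object.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for PowerJob's @Actor and @Handler (only `path` is modeled).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchActor {
    String path();
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchHandler {
    String path();
}

// Builds a path -> (actor, method) table once, then dispatches requests by path.
final class SketchRouter {
    private static final class Route {
        final Object actor;
        final Method method;

        Route(Object actor, Method method) {
            this.actor = actor;
            this.method = method;
        }
    }

    private final Map<String, Route> routes = new HashMap<>();

    SketchRouter(List<Object> actors) {
        for (Object actor : actors) {
            SketchActor actorAnno = actor.getClass().getAnnotation(SketchActor.class);
            if (actorAnno == null) {
                continue; // not an actor: nothing to bind
            }
            for (Method m : actor.getClass().getDeclaredMethods()) {
                SketchHandler handlerAnno = m.getAnnotation(SketchHandler.class);
                if (handlerAnno != null) {
                    m.setAccessible(true);
                    // Assumption: the route key is simply actor path + handler path.
                    routes.put(actorAnno.path() + handlerAnno.path(), new Route(actor, m));
                }
            }
        }
    }

    // What the per-request callback does: find the handler by path and invoke it reflectively.
    Object dispatch(String path, Object request) {
        Route route = routes.get(path);
        if (route == null) {
            throw new IllegalArgumentException("no handler bound to " + path);
        }
        try {
            return route.method.invoke(route.actor, request);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invoke handler failed: " + path, e);
        }
    }
}

@SketchActor(path = "/server")
class HeartbeatActorSketch {
    @SketchHandler(path = "/heartbeat")
    String onHeartbeat(String workerAddress) {
        return "ack:" + workerAddress;
    }
}
```

For example, a router built over a single HeartbeatActorSketch returns "ack:10.0.0.1:27777" for dispatch("/server/heartbeat", "10.0.0.1:27777"). The table is built once at startup, so each request costs only a map lookup plus one reflective call.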

        Next, let's see how PowerJob performs this binding in the re.start(engineConfig) call of initRemoteFrameWork above:

/**
 * PowerJobRemoteEngine:
 */
@Override
public EngineOutput start(EngineConfig engineConfig) {

    final String engineType = engineConfig.getType();
    EngineOutput engineOutput = new EngineOutput();
    log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig);
    // Collect all @Actor classes and their methods annotated with @Handler
    List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());
    // Find the CSInitializer for the given protocol
    csInitializer = CSInitializerFactory.build(engineType);

    String type = csInitializer.type();

    Stopwatch sw = Stopwatch.createStarted();
    log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type);

    // Initialize the CSInitializer; the Vert.x implementation is used as the example below
    csInitializer.init(new CSInitializerConfig()
            .setBindAddress(engineConfig.getBindAddress())
            .setServerType(engineConfig.getServerType())
    );

    // Build the transporter
    Transporter transporter = csInitializer.buildTransporter();
    engineOutput.setTransporter(transporter);

    log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);
    actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));

    // Bind handlers
    csInitializer.bindHandlers(actorInfos);

    log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);

    return engineOutput;
}

/**
 * ActorFactory:
 * Called from start above (ActorFactory.load(engineConfig.getActorList())):
 */
static List<ActorInfo> load(List<Object> actorList) {

    List<ActorInfo> actorInfos = Lists.newArrayList();

    actorList.forEach(actor -> {
        final Class<?> clz = actor.getClass();
        try {
            final Actor anno = clz.getAnnotation(Actor.class);

            ActorInfo actorInfo = new ActorInfo().setActor(actor).setAnno(anno);
            // Collect all methods annotated with @Handler and cache them in handlerInfos
            actorInfo.setHandlerInfos(loadHandlerInfos4Actor(actorInfo));

            actorInfos.add(actorInfo);
        } catch (Throwable t) {
            log.error("[ActorFactory] process Actor[{}] failed!", clz);
            ExceptionUtils.rethrow(t);
        }
    });

    return actorInfos;
}

/**
 * CSInitializerFactory:
 * Called from start above (CSInitializerFactory.build(engineType)):
 */
static CSInitializer build(String targetType) {

    Reflections reflections = new Reflections(OmsConstant.PACKAGE);
    // Use the Reflections library to find the implementations of CSInitializer, i.e. AkkaCSInitializer and HttpVertxCSInitializer. For an introduction to Reflections and basic usage, see https://blog.csdn.net/weixin_30342639/article/details/124521467
    Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);

    log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);

    for (Class<? extends CSInitializer> clz : cSInitializerClzSet) {
        try {
            CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();
            // Get the type: AKKA/HTTP
            String type = csInitializer.type();
            log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);
            // Return the CSInitializer that matches the requested protocol
            if (targetType.equalsIgnoreCase(type)) {
                return csInitializer;
            }
        } catch (Exception e) {
            log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);
            ExceptionUtils.rethrow(e);
        }
    }

    throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob' and import the dependencies!", targetType));
}

/**
 * HttpVertxCSInitializer:
 * Called from start above (csInitializer.init(...)):
 * This just initializes Vert.x; we won't go deeper here
 */
@Override
public void init(CSInitializerConfig config) {
    this.config = config;
    vertx = VertxInitializer.buildVertx();
    httpServer = VertxInitializer.buildHttpServer(vertx);
    httpClient = VertxInitializer.buildHttpClient(vertx);
}

/**
 * Called from start above (csInitializer.bindHandlers(actorInfos)):
 */
@Override
@SneakyThrows
public void bindHandlers(List<ActorInfo> actorInfos) {
    Router router = Router.router(vertx);
    // Handle request bodies
    router.route().handler(BodyHandler.create());
    actorInfos.forEach(actorInfo -> {
        Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {
            // Target path: the @Actor path combined with the @Handler path
            String handlerHttpPath = handlerInfo.getLocation().toPath();
            ProcessType processType = handlerInfo.getAnno().processType();

            Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);
            // Add the route binding
            Route route = router.post(handlerHttpPath);
            if (processType == ProcessType.BLOCKING) {
                // Bind a blocking handler
                route.blockingHandler(routingContextHandler, false);
            } else {
                // Bind a non-blocking handler
                route.handler(routingContextHandler);
            }
        });
    });

    // Start the Vert.x HTTP server
    final int port = config.getBindAddress().getPort();
    final String host = config.getBindAddress().getHost();

    httpServer.requestHandler(router)
            .exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e))
            .listen(port, host)
            .toCompletionStage()
            .toCompletableFuture()
            .get(1, TimeUnit.MINUTES);

    log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!");
}

/**
 * Called from bindHandlers above (buildRequestHandler(actorInfo, handlerInfo)):
 */
private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {
    Method method = handlerInfo.getMethod();
    Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());

    // Internal framework, strict mode: fail immediately if binding fails
    if (!powerSerializeClz.isPresent()) {
        throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());
    }

    // This registers an event-driven callback (the Netty style); the code below runs when a request arrives
    return ctx -> {
        final RequestBody body = ctx.body();
        final Object convertResult = body.asPojo(powerSerializeClz.get());
        try {
            // Invoke the @Handler-annotated method via reflection
            Object response = method.invoke(actorInfo.getActor(), convertResult);
            if (response != null) {
                if (response instanceof String) {
                    ctx.end((String) response);
                } else {
                    ctx.json(JsonObject.mapFrom(response));
                }
                return;
            }

            ctx.end();
        } catch (Throwable t) {
            // Note: this runs at framework runtime, so log in the standard PowerJob format
            log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);
            ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);
        }
    };
}

        As buildRequestHandler shows, when Vert.x receives a client request it invokes a callback, and that callback ultimately calls the custom @Handler-annotated method via reflection.
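        The protocol lookup performed by CSInitializerFactory.build earlier in this chain can be reduced to a few lines of reflection. In this sketch an explicit list of candidate classes replaces the classpath scan that PowerJob does with the Reflections library, and TransportInitializerSketch with its two implementations are hypothetical types that only model the type() method.

```java
import java.util.List;

// Hypothetical interface mirroring only CSInitializer#type(); not PowerJob's real interface.
interface TransportInitializerSketch {
    String type();
}

class AkkaInitializerSketch implements TransportInitializerSketch {
    public String type() {
        return "AKKA";
    }
}

class HttpVertxInitializerSketch implements TransportInitializerSketch {
    public String type() {
        return "HTTP";
    }
}

final class InitializerFactorySketch {
    // Instantiate each candidate via its no-arg constructor and return the one whose type matches.
    static TransportInitializerSketch build(String targetType,
                                            List<Class<? extends TransportInitializerSketch>> candidates) {
        for (Class<? extends TransportInitializerSketch> clz : candidates) {
            TransportInitializerSketch instance;
            try {
                instance = clz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("provide a non-parameter constructor: " + clz, e);
            }
            if (targetType.equalsIgnoreCase(instance.type())) {
                return instance;
            }
        }
        throw new IllegalArgumentException("can't load initializer for " + targetType);
    }
}
```

As in the original, every candidate must have a public or otherwise accessible no-arg constructor, and the type comparison ignores case, so "http" selects the Vert.x-style initializer.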

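        So far we have only covered initialization and binding; what remains is the logic of an actual call (it belongs to the same overall flow, so let's analyze it here too). Continuing with the heartbeat example, the method invoked is TransportUtils.reportWorkerHeartbeat: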

/**
 * TransportUtils:
 */
public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) {
    // Build the target URL
    final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_HEARTBEAT, address);
    // Again, we analyze the Vert.x implementation
    transporter.tell(url, req);
}

/**
 * VertxTransporter:
 * Called from reportWorkerHeartbeat above (transporter.tell(url, req)):
 */
@Override
public void tell(URL url, PowerSerializable request) {
    post(url, request, null);
}

/**
 * These are Vert.x invocation details; we won't go deeper
 */
@SuppressWarnings("unchecked")
private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
    final String host = url.getAddress().getHost();
    final int port = url.getAddress().getPort();
    final String path = url.getLocation().toPath();
    RequestOptions requestOptions = new RequestOptions()
            .setMethod(HttpMethod.POST)
            .setHost(host)
            .setPort(port)
            .setURI(path);
    // Get an HTTP connection to the remote server
    Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
    // Compose -> send the request and get the response
    Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->
            httpClientRequest
                    .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                    .send(JsonObject.mapFrom(request).toBuffer())
    );
    return responseFuture.compose(httpClientResponse -> {
        // throw exception
        final int statusCode = httpClientResponse.statusCode();
        if (statusCode != HttpResponseStatus.OK.code()) {
            // This exception is propagated when CompletableFuture.get() is called
            throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
                    host, port, path, statusCode, httpClientResponse.statusMessage()
            ));
        }

        return httpClientResponse.body().compose(x -> {

            if (clz == null) {
                return Future.succeededFuture(null);
            }

            if (clz.equals(String.class)) {
                return Future.succeededFuture((T) x.toString());
            }

            return Future.succeededFuture(x.toJsonObject().mapTo(clz));
        });
    }).toCompletionStage();
}

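        Once the client has sent the request, the server receives it, invokes the corresponding callback, and ultimately reaches the @Handler-annotated method. That ties the whole flow together.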
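        The comment in post notes that a non-200 status surfaces as an exception at CompletableFuture.get(). The sketch below reproduces that behavior with the JDK's CompletableFuture alone; the Vert.x HTTP call is replaced by an already-completed status future, and PostResultSketch with its RemotingException are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the status check in VertxTransporter#post, using only CompletableFuture.
final class PostResultSketch {

    static final class RemotingException extends RuntimeException {
        RemotingException(String message) {
            super(message);
        }
    }

    // statusFuture stands in for the asynchronous HTTP response status.
    static CompletableFuture<String> handleResponse(CompletableFuture<Integer> statusFuture, String body) {
        return statusFuture.thenApply(status -> {
            if (status != 200) {
                // Throwing inside the stage completes the returned future exceptionally.
                throw new RemotingException("request failed, status: " + status);
            }
            return body;
        });
    }
}
```

Because the exception is thrown inside thenApply, the returned future completes exceptionally, and get() rethrows it wrapped in an ExecutionException whose cause is the RemotingException. Note that tell() in the original ignores the returned stage, so for fire-and-forget calls such a failure is never observed by the caller.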

2.2 InstanceMetadataService

        Like PowerTransportService, InstanceMetadataService also implements InitializingBean, so let's look at its afterPropertiesSet method:

/**
 * InstanceMetadataService:
 */
@Override
public void afterPropertiesSet() throws Exception {
    instanceId2JobInfoCache = CacheBuilder.newBuilder()
            .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
            .maximumSize(instanceMetadataCacheSize)
            .softValues()
            .build();
}

        It only initializes a local cache (instanceId2JobInfoCache); there is no other logic.
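        For readers less familiar with Guava, the size-bounded part of this cache can be approximated with the JDK alone. This is a swapped-in technique, not Guava's implementation: an access-ordered LinkedHashMap evicts the least recently used entry once maximumSize is exceeded, while softValues() (values the GC may reclaim under memory pressure) is deliberately not modeled. BoundedCacheSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only approximation of CacheBuilder.maximumSize(...): an access-ordered LinkedHashMap
// that evicts the least recently used entry once the size bound is exceeded.
// Guava's softValues() is not modeled here.
final class BoundedCacheSketch<K, V> {
    private final Map<K, V> map;

    BoundedCacheSketch(int maximumSize) {
        // accessOrder = true: get() moves an entry to the most-recently-used end
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maximumSize;
            }
        };
    }

    // Coarse-grained locking; Guava instead splits the cache into concurrent segments.
    synchronized V getIfPresent(K key) {
        return map.get(key);
    }

    synchronized void put(K key, V value) {
        map.put(key, value);
    }

    synchronized void invalidate(K key) {
        map.remove(key);
    }

    synchronized int size() {
        return map.size();
    }
}
```

The same put / getIfPresent / invalidate lifecycle appears later in dispatch (loadJobInfo) and processFinishedInstance (invalidateJobInfo).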

2.3 CoreScheduleTaskManager

        CoreScheduleTaskManager also implements InitializingBean; here is its afterPropertiesSet method:

/**
 * CoreScheduleTaskManager:
 */
@SuppressWarnings("AlibabaAvoidManuallyCreateThread")
@Override
public void afterPropertiesSet() {
    // Scheduling
    coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)), "Thread-ScheduleCronJob"));
    coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleDailyTimeIntervalJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)), "Thread-ScheduleDailyTimeIntervalJob"));
    coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow"));
    coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob"));
    // Data cleanup
    coreThreadContainer.add(new Thread(new LoopRunnable("CleanWorkerData", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::cleanData), "Thread-CleanWorkerData"));
    // Status checks
    coreThreadContainer.add(new Thread(new LoopRunnable("CheckRunningInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkRunningInstance), "Thread-CheckRunningInstance"));
    coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingDispatchInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingDispatchInstance), "Thread-CheckWaitingDispatchInstance"));
    coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingWorkerReceiveInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingWorkerReceiveInstance), "Thread-CheckWaitingWorkerReceiveInstance"));
    coreThreadContainer.add(new Thread(new LoopRunnable("CheckWorkflowInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWorkflowInstance), "Thread-CheckWorkflowInstance"));

    coreThreadContainer.forEach(Thread::start);
}

        As you can see, it creates a set of threads, each running one periodic task in a loop. Let's pick a few of these tasks to analyze:
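        LoopRunnable's source is not shown here, so the following is only a guess at its shape: a hypothetical LoopRunnableSketch that runs the action, sleeps for the interval, and repeats until its thread is interrupted, catching exceptions so that one failed round does not kill the scheduling thread. PowerJob's real class may handle timing and logging differently.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical LoopRunnable-style loop: run, sleep for the interval, repeat until interrupted.
final class LoopRunnableSketch implements Runnable {
    private final String name;
    private final long intervalMs;
    private final Runnable action;

    LoopRunnableSketch(String name, long intervalMs, Runnable action) {
        this.name = name;
        this.intervalMs = intervalMs;
        this.action = action;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                action.run();
            } catch (Exception e) {
                // Keep the loop alive: one failed round must not kill the scheduling thread.
                System.err.println("[" + name + "] round failed: " + e);
            }
            try {
                TimeUnit.MILLISECONDS.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the while condition exits
            }
        }
    }
}
```

Giving each task its own thread, as CoreScheduleTaskManager does, means a slow database query in one loop cannot delay the others.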

2.3.1 ScheduleCronJob/ScheduleDailyTimeIntervalJob

/**
 * PowerScheduleService:
 */
public void scheduleNormalJob(TimeExpressionType timeExpressionType) {
    long start = System.currentTimeMillis();
    // Schedule jobs of the given time expression type (e.g. CRON)
    try {
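        // Get the appIds (the applications configured in the PowerJob console) that are bound to the current server. PowerJob stores apps, instances and related data in a database; an operation is persisted before it is executed so that it cannot be lost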
        final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[NormalScheduler] current server has no app's job to schedule.");
            return;
        }
        scheduleNormalJob0(timeExpressionType, allAppIds);
    } catch (Exception e) {
        log.error("[NormalScheduler] schedule cron job failed.", e);
    }
    long cost = System.currentTimeMillis() - start;
    log.info("[NormalScheduler] {} job schedule use {} ms.", timeExpressionType, cost);
    if (cost > SCHEDULE_RATE) {
        log.warn("[NormalScheduler] The database query is using too much time({}ms), please check if the database load is too high!", cost);
    }
}

/**
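 * Called from scheduleNormalJob above (scheduleNormalJob0(timeExpressionType, allAppIds)):
 * Schedule jobs whose time expression is computed by the server (CRON, DAILY_TIME_INTERVAL)
 *
 * @param timeExpressionType time expression type
 * @param appIds             appIds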
 */
private void scheduleNormalJob0(TimeExpressionType timeExpressionType, List<Long> appIds) {

    long nowTime = System.currentTimeMillis();
    long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
    // Process the appIds in batches
    Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {

        try {

            // Query conditions: job enabled + given time expression type + specified appIds + due to be scheduled soon
            // Fetch the jobs configured in the PowerJob console
            List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), timeExpressionType.getV(), timeThreshold);

            if (CollectionUtils.isEmpty(jobInfos)) {
                return;
            }

            // 1. Batch-write the instance (log) table
            Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
            log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos);

            jobInfos.forEach(jobInfo -> {
                // Persist to the instance table (each run of a job creates one instance)
                Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId();
                jobId2InstanceId.put(jobInfo.getId(), instanceId);
            });
            instanceInfoRepository.flush();

            // 2. Push into the time wheel to wait for dispatch (for an analysis of the time wheel code, see https://blog.csdn.net/weixin_30342639/article/details/132732836)
            jobInfos.forEach(jobInfoDO -> {

                Long instanceId = jobId2InstanceId.get(jobInfoDO.getId());

                // Get the next trigger time
                long targetTriggerTime = jobInfoDO.getNextTriggerTime();
                long delay = 0;
                if (targetTriggerTime < nowTime) {
                    log.warn("[Job-{}] schedule delay, expect: {}, current: {}", jobInfoDO.getId(), targetTriggerTime, System.currentTimeMillis());
                } else {
                    // Compute the delay until the next trigger time
                    delay = targetTriggerTime - nowTime;
                }

                // Push the instance into the time wheel, where it waits to be executed
                InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId, Optional.empty(), Optional.empty()));
            });

            // 3. Compute the next trigger time (repeats within 5s are ignored, i.e. in CRON mode the minimum interval between consecutive runs is SCHEDULE_RATE ms)
            jobInfos.forEach(jobInfoDO -> {
                try {
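                    // Recompute the next trigger time and persist it
                    // This uses the strategy pattern: each job type (CRON / fixed rate / daily time interval) computes it differently; we won't go deeper, see the source if interested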
                    refreshJob(timeExpressionType, jobInfoDO);
                } catch (Exception e) {
                    log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
                }
            });
            jobInfoRepository.flush();


        } catch (Exception e) {
            log.error("[NormalScheduler] schedule {} job failed.", timeExpressionType.name(), e);
        }
    });
}
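        Steps 2 and 3 rely on two small ideas: a late job gets a delay of 0 rather than a negative one, and the delayed dispatch is held in a time wheel. The sketch below shows both with a single-level hashed wheel advanced by explicit tick() calls, so it runs deterministically without a timer thread. TimeWheelSketch and delayOf are hypothetical names; PowerJob's InstanceTimeWheelService and its wheel implementation differ in detail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Minimal single-level hashed time wheel, advanced by explicit tick() calls (no timer thread).
// Hypothetical sketch; PowerJob's InstanceTimeWheelService and its wheel differ in detail.
final class TimeWheelSketch {
    private static final class Timeout {
        long remainingRounds; // full wheel revolutions to skip before firing
        final Runnable task;

        Timeout(long remainingRounds, Runnable task) {
            this.remainingRounds = remainingRounds;
            this.task = task;
        }
    }

    private final long tickMs;
    private final List<List<Timeout>> buckets = new ArrayList<>();
    private long currentTick = 0;

    TimeWheelSketch(long tickMs, int wheelSize) {
        this.tickMs = tickMs;
        for (int i = 0; i < wheelSize; i++) {
            buckets.add(new ArrayList<>());
        }
    }

    // Same rule as scheduleNormalJob0: a trigger time already in the past means "run now".
    static long delayOf(long targetTriggerTime, long nowTime) {
        return targetTriggerTime < nowTime ? 0 : targetTriggerTime - nowTime;
    }

    void schedule(long delayMs, Runnable task) {
        // Round up to whole ticks; a zero delay fires on the next tick.
        long ticks = Math.max(1, (Math.max(0, delayMs) + tickMs - 1) / tickMs);
        int size = buckets.size();
        long targetTick = currentTick + ticks;
        buckets.get((int) (targetTick % size)).add(new Timeout((ticks - 1) / size, task));
    }

    // Advance one tick; run tasks in the current bucket whose rounds are used up.
    void tick() {
        currentTick++;
        List<Timeout> bucket = buckets.get((int) (currentTick % buckets.size()));
        List<Runnable> due = new ArrayList<>();
        for (Iterator<Timeout> it = bucket.iterator(); it.hasNext(); ) {
            Timeout t = it.next();
            if (t.remainingRounds == 0) {
                it.remove();
                due.add(t.task);
            } else {
                t.remainingRounds--;
            }
        }
        // Run after the scan so a task may safely schedule new timeouts.
        due.forEach(Runnable::run);
    }
}
```

With tickMs = 100 and 4 buckets, a task due in 700 ms lands in bucket 3 with one extra round, so it fires on the 7th tick rather than the 3rd.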

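        At the InstanceTimeWheelService.schedule(...) call above, the instance is pushed into the time wheel (for an analysis of the time wheel code, see "Stickler's Source Code Series - PowerJob Time Wheel Source Code Analysis"). When it is due, DispatchService.dispatch is called to dispatch the job: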

/**
 * DispatchService:
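 * Dispatch a job from the Server to a Worker (TaskTracker)
 * Only dispatches job instances that are currently waiting for dispatch
 * **************************************************
 * 2021-02-03 modify by Echo009
 * 1. Removed the parameters: current running times, workflow instance ID, instance params;
 * these are now read from the current job instance
 * 2. Removed the runningTimes-related handling logic,
 * which has been moved to {@link InstanceManager#updateStatus}
 * **************************************************
 *
 * @param jobInfo              job metadata
 * @param instanceId           job instance ID
 * @param instanceInfoOptional job instance info, optional
 * @param overloadOptional     overload info, optional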
@UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, Long instanceId, Optional<InstanceInfoDO> instanceInfoOptional, Optional<Holder<Boolean>> overloadOptional) {
    // Allow the instance info to be passed in from outside to reduce I/O
    // Check whether the current job has been cancelled
    // Get the instance
    InstanceInfoDO instanceInfo = instanceInfoOptional.orElseGet(() -> instanceInfoRepository.findByInstanceId(instanceId));
    Long jobId = instanceInfo.getJobId();
        
    //...

    // The job info has been deleted
    if (jobInfo.getId() == null) {
        log.warn("[Dispatcher-{}|{}] cancel dispatch due to job(id={}) has been deleted!", jobId, instanceId, jobId);
        instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, "can't find job by id " + jobId);
        return;
    }

    Date now = new Date();
    String dbInstanceParams = instanceInfo.getInstanceParams() == null ? "" : instanceInfo.getInstanceParams();
    log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, dbInstanceParams);

    // Query the number of currently running instances
    long current = System.currentTimeMillis();
    Integer maxInstanceNum = jobInfo.getMaxInstanceNum();
    // Second-level (frequent) jobs are dispatched to a single machine; the actual maxInstanceNum is enforced by the TaskTracker
    if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
        maxInstanceNum = 1;
    }

    //...
        
    // Get the list of the most suitable workers right now
    List<WorkerInfo> suitableWorkers = workerClusterQueryService.getSuitableWorkers(jobInfo);

    //...

    // Check for overload; if every available worker is overloaded, skip this job
    suitableWorkers = filterOverloadWorker(suitableWorkers);
        
    //...

    // Get the worker addresses
    List<String> workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
    // Build the job scheduling request
    ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);

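    // Send the request (unreliable, so a background thread periodically polls the status)
    // Only the first worker is used (it acts as the TaskTracker)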
    WorkerInfo taskTracker = suitableWorkers.get(0);
    String taskTrackerAddress = taskTracker.getAddress();

    URL workerUrl = ServerURLFactory.dispatchJob2Worker(taskTrackerAddress);
    transportService.tell(taskTracker.getProtocol(), workerUrl, req);
    log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);

    // Update the status
    // in the instance table
    instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now, instanceInfo.getStatus());
    // Load the job info into the cache
    instanceMetadataService.loadJobInfo(instanceId, jobInfo);
}

/**
 * InstanceManager:
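 * Called from dispatch above (instanceManager.processFinishedInstance(...)):
 * Finalize a finished job instance
 *
 * @param instanceId   job instance ID
 * @param wfInstanceId workflow instance ID, optional
 * @param status       job status: succeeded / failed / manually stopped
 * @param result       execution result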
 */
public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) {

    log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());

    // Report log data
    // executed later via the time wheel
    HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 60, TimeUnit.SECONDS);

    // Special handling for workflows
    if (wfInstanceId != null) {
        // A manual stop is also treated as a failure within a workflow (should not happen in theory)
        workflowInstanceManager.move(wfInstanceId, instanceId, status, result);
    }

    // Alert
    if (status == InstanceStatus.FAILED) {
        alert(instanceId, result);
    }
    // Proactively evict the cache entry to reduce memory usage
    instanceMetadataService.invalidateJobInfo(instanceId);
}

/**
 * InstanceLogService:
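 * Called from processFinishedInstance above (instanceLogService.sync(instanceId)):
 * Sync the local instance run logs to MongoDB storage; runs asynchronously after the job finishes
 *
 * @param instanceId job instance ID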
 */
@Async(PJThreadPool.BACKGROUND_POOL)
public void sync(Long instanceId) {

    Stopwatch sw = Stopwatch.createStarted();
    try {
        // First persist to a local file
        File stableLogFile = genStableLogFile(instanceId);
        // Push the file to MongoDB
        if (gridFsManager.available()) {
            try {
                gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId));
                log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
            } catch (Exception e) {
                log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
            }
        }
    } catch (Exception e) {
        log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
    }
    // Delete the local database records
    try {
        instanceId2LastReportTime.remove(instanceId);
        CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
        log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
    } catch (Exception e) {
        log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
    }
}

/**
 * WorkerClusterQueryService:
 * Called from dispatch above (workerClusterQueryService.getSuitableWorkers(jobInfo)):
 * get worker for job
 *
 * @param jobInfo job
 * @return worker cluster info, sorted by metrics desc
 */
public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {

    // Get the info of every machine in this app's cluster
    List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());

    workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));

    DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
    switch (dispatchStrategy) {
        case RANDOM:
            // RANDOM simply shuffles the order
            Collections.shuffle(workers);
            break;
        case HEALTH_FIRST:
            // HEALTH_FIRST computes a score for each worker and sorts by score, highest first
            workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
            break;
        default:
            // do nothing
    }

    // Limit the cluster size (0 means no limit)
    if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
        workers = workers.subList(0, jobInfo.getMaxWorkerCount());
    }
    return workers;
}

/**
 * SystemMetrics:
 * Called from getSuitableWorkers above (HEALTH_FIRST sorting):
 * Calculate score, based on CPU and memory info.
 *
 * @return score
 */
public int calculateScore() {
    if (score > 0) {
        return score;
    }
    // Memory is vital to TaskTracker, so we set the multiplier factor as 2.
    double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
    // Calculate the remaining load of CPU. Multiplier is set as 1.
    double cpuScore = cpuProcessors - cpuLoad;
    // Windows can not fetch CPU load, set cpuScore as 1.
    if (cpuScore > cpuProcessors) {
        cpuScore = 1;
    }
    // Final score = memory score (weight 2) + CPU score (weight 1)
    score = (int) (memScore + cpuScore);
    return score;
}

/**
 * ServerURLFactory:
 * Called from dispatch above (ServerURLFactory.dispatchJob2Worker(taskTrackerAddress)):
 */
public static URL dispatchJob2Worker(String address) {
    return simileBuild(address, ServerType.WORKER, WORKER_PATH, WTT_HANDLER_RUN_JOB);
}
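        The worker selection in getSuitableWorkers and the scoring in calculateScore can be mirrored in a self-contained form. This is a simplified sketch: WorkerSelectionSketch and its Worker type are hypothetical, the filterWorker step and the cached score are omitted, and memory is taken in whatever unit the worker reports.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

// Simplified, hypothetical mirror of getSuitableWorkers + SystemMetrics#calculateScore.
final class WorkerSelectionSketch {

    static final class Worker {
        final String address;
        final double jvmMaxMemory;
        final double jvmUsedMemory;
        final int cpuProcessors;
        final double cpuLoad;

        Worker(String address, double jvmMaxMemory, double jvmUsedMemory, int cpuProcessors, double cpuLoad) {
            this.address = address;
            this.jvmMaxMemory = jvmMaxMemory;
            this.jvmUsedMemory = jvmUsedMemory;
            this.cpuProcessors = cpuProcessors;
            this.cpuLoad = cpuLoad;
        }

        // Same formula as calculateScore: free memory weighted by 2, remaining CPU weighted by 1.
        int score() {
            double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
            double cpuScore = cpuProcessors - cpuLoad;
            if (cpuScore > cpuProcessors) {
                cpuScore = 1; // load not available (e.g. on Windows), per the original comment
            }
            return (int) (memScore + cpuScore);
        }
    }

    enum Strategy { RANDOM, HEALTH_FIRST }

    static List<Worker> select(List<Worker> candidates, Strategy strategy, int maxWorkerCount, Random random) {
        List<Worker> workers = new ArrayList<>(candidates);
        switch (strategy) {
            case RANDOM:
                Collections.shuffle(workers, random);
                break;
            case HEALTH_FIRST:
                // Highest score first
                workers.sort(Comparator.comparingInt(Worker::score).reversed());
                break;
            default:
                break;
        }
        // 0 means "no limit", as in the original
        if (maxWorkerCount > 0 && workers.size() > maxWorkerCount) {
            workers = new ArrayList<>(workers.subList(0, maxWorkerCount));
        }
        return workers;
    }
}
```

Comparator.comparingInt compares with Integer.compare, which sidesteps the theoretical overflow of the o2 - o1 subtraction in the original comparator. Since dispatch only sends the request to the first worker, HEALTH_FIRST effectively picks the highest-scoring worker as the TaskTracker.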

        The URL built by dispatchJob2Worker above (WORKER_PATH + WTT_HANDLER_RUN_JOB) routes the request to WorkerActor.onReceiveServerScheduleJobReq:

/**
 * WorkerActor:
 */
@Handler(path = WTT_HANDLER_RUN_JOB)
public void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
    taskTrackerActor.onReceiveServerScheduleJobReq(req);
}

/**
 * TaskTrackerActor:
 * Handler for the server's job scheduling requests
 */
@Handler(path = WTT_HANDLER_RUN_JOB)
public void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
    log.debug("[TaskTrackerActor] server schedule job by request: {}.", req);
    Long instanceId = req.getInstanceId();
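    // Distinguish between the lightweight and heavyweight task models
    // Standalone OpenAPI/CRON/workflow jobs are lightweight tasks; everything else is heavyweight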
    if (isLightweightTask(req)) {
        final LightTaskTracker taskTracker = LightTaskTrackerManager.getTaskTracker(instanceId);
        if (taskTracker != null) {
            log.warn("[TaskTrackerActor] LightTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId);
            return;
        }
        // Check whether the worker is already overloaded
        if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() * LightTaskTrackerManager.OVERLOAD_FACTOR) {
            // ignore this request
            log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={}),current size = {}!", instanceId, LightTaskTrackerManager.currentTaskTrackerSize());
            return;
        }
        if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum()) {
            log.warn("[TaskTrackerActor] this worker will be overload soon,current size = {}!", LightTaskTrackerManager.currentTaskTrackerSize());
        }
        // Create the lightweight task
        // by adding a TaskTracker to the lightweight local cache
        LightTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> LightTaskTracker.create(req, workerRuntime));
    } else {
        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(instanceId);
        if (taskTracker != null) {
            log.warn("[TaskTrackerActor] HeavyTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId);
            return;
        }
        // Check whether the worker is already overloaded
        if (HeavyTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum()) {
            // ignore this request
            log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={})! current size = {},", instanceId, HeavyTaskTrackerManager.currentTaskTrackerSize());
            return;
        }
        // Create atomically to prevent duplicate instances
        // by adding a TaskTracker to the heavyweight local cache
        HeavyTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> HeavyTaskTracker.create(req, workerRuntime));
    }
}

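        As shown above, dispatching a job on the worker side boils down to creating a TaskTracker for the instance and registering it in a local cache.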
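        The create-once-per-instance behavior with an overload guard can be sketched with ConcurrentHashMap.computeIfAbsent. TrackerRegistrySketch is a hypothetical class, not PowerJob's HeavyTaskTrackerManager; as in the code above, the capacity check happens before creation and is therefore best-effort rather than atomic with it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical TaskTracker registry in the style of HeavyTaskTrackerManager:
// reject when over capacity, otherwise create at most one tracker per instanceId.
final class TrackerRegistrySketch<T> {
    private final ConcurrentHashMap<Long, T> trackers = new ConcurrentHashMap<>();
    private final int maxTrackers;

    TrackerRegistrySketch(int maxTrackers) {
        this.maxTrackers = maxTrackers;
    }

    // Returns true only if this call created a new tracker.
    boolean tryCreate(long instanceId, Function<Long, T> factory) {
        if (trackers.containsKey(instanceId)) {
            return false; // duplicate dispatch for the same instance: ignore
        }
        if (trackers.size() >= maxTrackers) {
            return false; // overloaded: ignore the request (best-effort check)
        }
        boolean[] created = {false};
        trackers.computeIfAbsent(instanceId, id -> {
            T tracker = factory.apply(id);
            // A null result (like HeavyTaskTracker.create on error) is not stored by computeIfAbsent.
            created[0] = tracker != null;
            return tracker;
        });
        return created[0];
    }

    T get(long instanceId) {
        return trackers.get(instanceId);
    }

    void remove(long instanceId) {
        trackers.remove(instanceId);
    }

    int size() {
        return trackers.size();
    }
}
```

computeIfAbsent runs the factory at most once per key even under concurrent calls, which is what prevents two TaskTrackers from being created for the same instance.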

        TaskTracker shows up here. For the concepts of TaskTracker, ProcessorTracker and Processor, refer to the article written by PowerJob's author.

        Next, let's look at the HeavyTaskTracker.create method called in the code above:

/**
 * HeavyTaskTracker:
 * Static factory method that creates a TaskTracker
 *
 * @param req the job-scheduling request sent by the server
 * @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker
 */
public static HeavyTaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
    try {
        TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
        switch (timeExpressionType) {
            case FIXED_RATE:
            case FIXED_DELAY:
                return new FrequentTaskTracker(req, workerRuntime);
            default:
                // we analyze the CommonTaskTracker constructor next
                return new CommonTaskTracker(req, workerRuntime);
        }
    } catch (Exception e) {
        reportCreateErrorToServer(req, workerRuntime, e);
    }
    return null;
}

/**
 * CommonTaskTracker:
 * Called from "return new CommonTaskTracker(req, workerRuntime)" in create() above:
 */
protected CommonTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
    super(req, workerRuntime);
}

/**
 * HeavyTaskTracker:
 */
protected HeavyTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
    // initialize member fields
    super(req, workerRuntime);
    // set the time-expression type
    instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV());
    // defensive: thread concurrency must be at least 1
    instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency()));
    this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
    this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
    // build the cache
    taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build();

    // build the segment lock
    // SegmentLock is PowerJob's own array of ReentrantLocks (an array of locks raises concurrency)
    segmentLock = new SegmentLock(UPDATE_CONCURRENCY);

    // subclass-specific initialization
    initTaskTracker(req);

    log.info("[TaskTracker-{}] create TaskTracker successfully.", instanceId);
}

/**
 * CommonTaskTracker:
 * Called from initTaskTracker(req) in the HeavyTaskTracker constructor above:
 *
 * @param req the server's request to run a job instance
 */
@Override
protected void initTaskTracker(ServerScheduleJobReq req) {

    // abbreviation of CommonTaskTrackerTimingPool
    String poolName = String.format("ctttp-%d", req.getInstanceId()) + "-%d";
    ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
    this.scheduledPool = Executors.newScheduledThreadPool(2, factory);

    // persist the root task
    persistenceRootTask();

    // start the periodic status check
    int delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "13"));
    scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 3, delay, TimeUnit.SECONDS);

    // for MR (MAP / MAP_REDUCE) jobs, also start the dynamic worker detector
    ExecuteType executeType = ExecuteType.valueOf(req.getExecuteType());
    if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) {
        scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
    }

    // start the dispatcher last; otherwise a ProcessorTracker could start reporting status before the TaskTracker is fully created
    scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 10, 5000, TimeUnit.MILLISECONDS);
}

/**
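 * Called from persistenceRootTask() in initTaskTracker above:
 * Persist the root task; the instance counts as running only after the root task is persisted (persist first, then report to the server)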
 */
private void persistenceRootTask() {

    TaskDO rootTask = new TaskDO();
    rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
    rootTask.setInstanceId(instanceInfo.getInstanceId());
    rootTask.setTaskId(ROOT_TASK_ID);
    rootTask.setFailedCnt(0);
    rootTask.setAddress(workerRuntime.getWorkerAddress());
    // note: the root task is named OMS_ROOT_TASK (ROOT_TASK_NAME)
    rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
    rootTask.setCreatedTime(System.currentTimeMillis());
    rootTask.setLastModifiedTime(System.currentTimeMillis());
    rootTask.setLastReportTime(-1L);
    rootTask.setSubInstanceId(instanceId);

    if (taskPersistenceService.save(rootTask)) {
        log.info("[TaskTracker-{}] create root task successfully.", instanceId);
    } else {
        log.error("[TaskTracker-{}] create root task failed.", instanceId);
        throw new PowerJobException("create root task failed for instance: " + instanceId);
    }
}

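        The initTaskTracker method above starts three scheduled jobs: StatusCheckRunnable, WorkerDetector (MAP/MAP_REDUCE jobs only), and Dispatcher. Here we mainly analyze StatusCheckRunnable and Dispatcher; WorkerDetector is covered in the companion article "较真儿学源码系列-PowerJob MapReduce源码分析" (PowerJob MapReduce source code analysis).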
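        All of these jobs share one small scheduled pool per instance. The sketch below mirrors the thread naming (ctttp-<instanceId>-<n>) with a hand-written ThreadFactory instead of Guava's ThreadFactoryBuilder, and schedules two placeholder jobs with the delays used above (13 s being the default status-check period). `TimingPoolSketch` and the placeholder Runnables are hypothetical, and the WorkerDetector is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class TimingPoolSketch {

    private TimingPoolSketch() {}

    /** Names threads "ctttp-<instanceId>-0", "ctttp-<instanceId>-1", ... */
    static ThreadFactory namedFactory(long instanceId) {
        AtomicInteger counter = new AtomicInteger(0);
        return runnable -> {
            Thread thread = new Thread(runnable,
                    String.format("ctttp-%d-%d", instanceId, counter.getAndIncrement()));
            thread.setDaemon(true); // assumption for this demo: don't keep the JVM alive
            return thread;
        };
    }

    /** Schedules a status check and a dispatcher with the delays used by initTaskTracker. */
    static ScheduledExecutorService start(long instanceId, Runnable statusCheck, Runnable dispatcher) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2, namedFactory(instanceId));
        // first check after 3 s, then 13 s after each run completes
        pool.scheduleWithFixedDelay(statusCheck, 3, 13, TimeUnit.SECONDS);
        // dispatcher scheduled last: first run after 10 ms, then 5000 ms after each run completes
        pool.scheduleWithFixedDelay(dispatcher, 10, 5000, TimeUnit.MILLISECONDS);
        return pool;
    }
}
```

        scheduleWithFixedDelay measures each period from the end of the previous run, so a slow status check or dispatch round simply pushes the next one back; the WorkerDetector uses scheduleAtFixedRate, which measures from the start of each run instead.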

2.3.1.1 StatusCheckRunnable
/**
 * StatusCheckRunnable:
 */
@Override
public void run() {
    try {
        innerRun();
    } catch (Exception e) {
        log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
    }
}

/**
 * Called from innerRun() in run() above:
 */
@SuppressWarnings("squid:S3776")
private void innerRun() {

    // collect the status counts of all tasks of this instance to assess its progress
    InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);

    long finishedNum = holder.succeedNum + holder.failedNum;
    long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;

    log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);

    // build the status report
    TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
    req.setAppId(workerRuntime.getAppId());
    req.setJobId(instanceInfo.getJobId());
    req.setInstanceId(instanceId);
    req.setWfInstanceId(instanceInfo.getWfInstanceId());
    req.setTotalTaskNum(finishedNum + unfinishedNum);
    req.setSucceedTaskNum(holder.succeedNum);
    req.setFailedTaskNum(holder.failedNum);
    req.setReportTime(System.currentTimeMillis());
    req.setStartTime(createTime);
    req.setSourceAddress(workerRuntime.getWorkerAddress());

    boolean success = false;
    String result = null;

    // 2. if no task is unfinished, decide whether the instance has really finished and fetch its result
    if (unfinishedNum == 0) {

        // no task in the database at all: the root task was never created, so the instance fails
        if (finishedNum == 0) {
            finished.set(true);
            result = SystemInstanceResult.TASK_INIT_FAILED;
        } else {
            ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

            switch (executeType) {

                // STANDALONE has exactly one task; once it finishes, the instance is done
                case STANDALONE:
                    finished.set(true);
                    List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
                    if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                        result = SystemInstanceResult.UNKNOWN_BUG;
                        log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
                    } else {
                        result = allTask.get(0).getResult();
                        success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                    }
                    break;
                case MAP:
                    //...
                    break;
                default:
                    //...
            }
        }
    }

    // 3. check whether the instance as a whole has timed out
    if (isTimeout()) {
        finished.set(true);
        success = false;
        result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
    }

    // 4. finished: report to the server
    if (finished.get()) {
        req.setResult(result);
        // report the appended workflow context
        req.setAppendedWfContext(appendedWfContext);
        req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
        reportFinalStatusThenDestroy(workerRuntime, req);
        return;
    }

    // 5. not finished: report RUNNING status
    req.setInstanceStatus(InstanceStatus.RUNNING.getV());
    TransportUtils.ttReportInstanceStatus(req, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());

    // 6.1 periodic check -> re-dispatch tasks whose dispatch was never acknowledged
    long currentMS = System.currentTimeMillis();
    if (holder.workerUnreceivedNum != 0) {
        taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {

            long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
            if (elapsedTime > DISPATCH_TIME_OUT_MS) {

                TaskDO updateEntity = new TaskDO();
                updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                // the special (last) task may only run on this machine
                if (!TaskConstant.LAST_TASK_NAME.equals(uncheckTask.getTaskName())) {
                    updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                }
                // failure count + 1
                updateEntity.setFailedCnt(uncheckTask.getFailedCnt() + 1);

                taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);

                log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
                        instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
            }

        });
    }

    // 6.2 periodic check -> re-run tasks dispatched to ProcessorTrackers that went down
    List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
    if (!disconnectedPTs.isEmpty()) {
        log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
        if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
            ptStatusHolder.remove(disconnectedPTs);
            log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
        }
    }
}

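        We skip the server-side handling of the report here (it updates the instance_info table). In short, StatusCheckRunnable periodically checks how the instance's tasks are doing, reports the result to the server, and re-dispatches tasks that were never acknowledged or whose ProcessorTracker went down.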
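        Stripped of persistence and networking, the decision innerRun makes for a STANDALONE job reduces to a few comparisons over the task counters. The sketch below is a simplified, hypothetical model (`StatusDecision.decide`, `InstanceState`); it ignores the MAP/MAP_REDUCE/BROADCAST branches, the UNKNOWN_BUG case, and the re-dispatch checks in steps 6.1 and 6.2.

```java
/** Hypothetical, simplified instance states. */
enum InstanceState { RUNNING, SUCCEED, FAILED }

final class StatusDecision {

    private StatusDecision() {}

    /**
     * Simplified model of StatusCheckRunnable for a STANDALONE job.
     *
     * @param finishedNum   succeeded + failed tasks
     * @param unfinishedNum waiting / unreceived / received / running tasks
     * @param rootSucceeded whether the single (root) task succeeded
     * @param timedOut      whether the instance exceeded its overall timeout
     */
    static InstanceState decide(long finishedNum, long unfinishedNum,
                                boolean rootSucceeded, boolean timedOut) {
        // step 3: an overall timeout always fails the instance
        if (timedOut) {
            return InstanceState.FAILED;
        }
        if (unfinishedNum == 0) {
            // no task at all: the root task was never persisted (TASK_INIT_FAILED)
            if (finishedNum == 0) {
                return InstanceState.FAILED;
            }
            // STANDALONE: the single task's status is the instance's status
            return rootSucceeded ? InstanceState.SUCCEED : InstanceState.FAILED;
        }
        // step 5: still working, report RUNNING
        return InstanceState.RUNNING;
    }
}
```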

2.3.1.2 Dispatcher
/**
 * Dispatcher:
 */
@Override
public void run() {

    if (finished.get()) {
        return;
    }

    Stopwatch stopwatch = Stopwatch.createStarted();

    // 1. get the ProcessorTrackers that can accept tasks
    List<String> availablePtIps = ptStatusHolder.getAvailableProcessorTrackers();

    // 2. no ProcessorTracker available: skip this round
    if (availablePtIps.isEmpty()) {
        log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
        return;
    }

    // 3. dispatch in batches to avoid large queries
    long currentDispatchNum = 0;
    // note: max tasks per round = available ProcessorTrackers * thread concurrency * 2
    long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
    AtomicInteger index = new AtomicInteger(0);

    // 4. query the database in a loop for tasks to dispatch
    while (maxDispatchNum > currentDispatchNum) {

        // query at most 100 tasks (DB_QUERY_LIMIT) at a time
        int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
        // fetch the tasks waiting to be dispatched
        List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
        currentDispatchNum += needDispatchTasks.size();

        needDispatchTasks.forEach(task -> {
            // get the ProcessorTracker address; if the task already carries an address, use it
            String ptAddress = task.getAddress();
            if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
                // otherwise pick one of the available ProcessorTrackers round-robin (modulo)
                ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
            }
            // dispatch the task
            dispatchTask(task, ptAddress);
        });

        // fewer tasks than requested, or the query failed: stop the loop
        if (needDispatchTasks.size() < dbQueryLimit) {
            break;
        }
    }

    log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
}

/**
 * ProcessorTrackerStatusHolder:
 * Called from getAvailableProcessorTrackers() in Dispatcher.run() above:
 * Returns the addresses of the available ProcessorTrackers
 */
public List<String> getAvailableProcessorTrackers() {

    List<String> result = Lists.newLinkedList();
    address2Status.forEach((address, ptStatus) -> {
        if (ptStatus.available()) {
            result.add(address);
        }
    });
    return result;
}

/**
 * ProcessorTrackerStatus:
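 * Called from ptStatus.available() in getAvailableProcessorTrackers() above:
 * Whether this ProcessorTracker is available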
 */
public boolean available() {

    // never dispatched to: available by default
    if (!dispatched) {
        return true;
    }

    // dispatched to, but no response received yet: unavailable
    if (!connected) {
        return false;
    }

    // no heartbeat received for a long time: unavailable
    if (isTimeout()) {
        return false;
    }

    // too many pending tasks: unavailable
    if (remainTaskNum >= DISPATCH_THRESHOLD) {
        return false;
    }

    // TODO: consider adding machine health metrics later

    return true;
}

/**
 * Called from isTimeout() in available() above:
 * Whether the heartbeat has timed out (no heartbeat received for too long)
 */
public boolean isTimeout() {
    if (dispatched) {
        // current time - last active time > heartbeat timeout
        return System.currentTimeMillis() - lastActiveTime > HEARTBEAT_TIMEOUT_MS;
    }
    // machines that have never been dispatched to are not checked
    return false;
}

/**
 * HeavyTaskTracker:
 * Called from dispatchTask(task, ptAddress) in Dispatcher.run() above:
 * Dispatch a task to a ProcessorTracker
 *
 * @param task                    the task to execute
 * @param processorTrackerAddress address of the ProcessorTracker (IP:Port)
 */
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {

    // 1. persist: update the database (a failed update may cause duplicate execution; not handled for now)
    TaskDO updateEntity = new TaskDO();
    updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
    // record the ProcessorTracker that handles this task
    updateEntity.setAddress(processorTrackerAddress);
    boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
    if (!success) {
        log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
        return;
    }

    // 2. update the ProcessorTrackerStatus
    ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
    // 3. initialize the cache entry
    taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));

    // 4. dispatch the task
    TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
    TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());

    log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
}

/**
 * TransportUtils:
 * Called from TransportUtils.ttStartPtTask(...) in dispatchTask above:
 */
public static void ttStartPtTask(TaskTrackerStartTaskReq req, String address, Transporter transporter) {
    final URL url = easyBuildUrl(ServerType.WORKER, WPT_PATH, WPT_HANDLER_START_TASK, address);
    transporter.tell(url, req);
}

/**
 * ProcessorTrackerActor:
 * Handler reached by transporter.tell(url, req) in ttStartPtTask above:
 * Handles task-execution requests from the TaskTracker
 *
 * @param req the request
 */
@Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)
public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {

    Long instanceId = req.getInstanceInfo().getInstanceId();

    // creating the ProcessorTracker always succeeds
    ProcessorTracker processorTracker = ProcessorTrackerManager.getProcessorTracker(
            instanceId,
            req.getTaskTrackerAddress(),
            () -> new ProcessorTracker(req, workerRuntime));

    TaskDO task = new TaskDO();

    task.setTaskId(req.getTaskId());
    task.setTaskName(req.getTaskName());
    task.setTaskContent(req.getTaskContent());
    task.setFailedCnt(req.getTaskCurrentRetryNums());
    task.setSubInstanceId(req.getSubInstanceId());

    processorTracker.submitTask(task);
}

/**
 * ProcessorTracker:
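 * Submit a task to the thread pool for execution.
 * Version 1.0: the TaskTracker dispatched whenever it had tasks, so a ProcessorTracker could pile up too many tasks
 * locally and come under memory pressure. To cope, once its thread-pool queue grew past a threshold, the ProcessorTracker
 * persisted tasks to the DB, and an async thread periodically fetched them back and resubmitted them.
 * Related designs (the SPID design in the DB, the TaskStatus segment design, etc.) have all been removed...
 * last commitId: 341953aceceafec0fbe7c3d9a3e26451656b945e
 * Version 2.0: the ProcessorTracker periodically sends heartbeats to the TaskTracker, including the current number of
 * queued tasks, and the TaskTracker uses that status to decide whether to keep dispatching. Tasks therefore no longer
 * pile up locally, so the persistence mechanism was deleted ╥﹏╥...!
 *
 * @param newTask the task to submit to the thread pool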
 */
public void submitTask(TaskDO newTask) {

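    // once the ProcessorTracker has hit a fatal error, every task submitted here fails immediately, to avoid a deadlock
    // deadlock analysis: TT creates PT, PT creation fails, so PT never sends heartbeats; TT gets no heartbeat for a long time and considers PT down (it really is), yet cannot pick an available PT to re-dispatch to, so a deadlock forms. Game over T_T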
    if (lethal) {
        ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
                .setInstanceId(instanceId)
                .setSubInstanceId(newTask.getSubInstanceId())
                .setTaskId(newTask.getTaskId())
                .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                .setResult(lethalReason)
                .setReportTime(System.currentTimeMillis());

        TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
        return;
    }

    boolean success = false;
    // 1. fill in the fields and submit for execution
    newTask.setInstanceId(instanceInfo.getInstanceId());
    newTask.setAddress(taskTrackerAddress);

    HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
    try {
        threadPool.submit(heavyProcessorRunnable);
        success = true;
    } catch (RejectedExecutionException ignore) {
        log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                instanceId, newTask.getTaskId(), newTask.getTaskName());
    } catch (Exception e) {
        log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
    }

    // 2. reply that the task was received
    if (success) {
        ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
        reportReq.setInstanceId(instanceId);
        reportReq.setSubInstanceId(newTask.getSubInstanceId());
        reportReq.setTaskId(newTask.getTaskId());
        reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
        reportReq.setReportTime(System.currentTimeMillis());

        TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);

        log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
    }
}

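        At the end of this chain, the ProcessorTracker wraps the task in a HeavyProcessorRunnable and submits it to its thread pool; the runnable carries the instance info, the TaskTracker address, the processor, and so on. Here is its implementation: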

/**
 * HeavyProcessorRunnable:
 */
@Override
@SuppressWarnings("squid:S2142")
public void run() {
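    // switch the thread context class loader (otherwise the worker's class loader is used, which lacks the container classes and throws ClassNotFoundException during (de)serialization)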
    Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
    try {
        innerRun();
    } catch (InterruptedException ignore) {
        // ignore
    } catch (Throwable e) {
        reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null, null);
        log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e);
    } finally {
        ThreadLocalStore.clear();
    }
}

/**
 * Called from innerRun() in run() above:
 */
public void innerRun() throws InterruptedException {

    // get the processor that executes the task
    final BasicProcessor processor = processorBean.getProcessor();

    String taskId = task.getTaskId();
    Long instanceId = task.getInstanceId();

    log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
    // store the task and runtime metadata in thread-local storage
    ThreadLocalStore.setTask(task);
    ThreadLocalStore.setRuntimeMeta(workerRuntime);

    // 0. build the task context
    WorkflowContext workflowContext = constructWorkflowContext();
    TaskContext taskContext = constructTaskContext();
    taskContext.setWorkflowContext(workflowContext);
    // 1. report that execution has started
    reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);

    ProcessResult processResult;
    ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

    // 2. special handling: root task of a broadcast job
    if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
        // broadcast: run preProcess on this machine first; afterwards the TaskTracker creates a subtask for every worker
        handleBroadcastRootTask(instanceId, taskContext);
        return;
    }

    // 3. special handling for the last task (always on the same machine as the TaskTracker)
    if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {
        handleLastTask(taskId, instanceId, taskContext, executeType);
        return;
    }

    // 4. run the task for real
    try {
        // process() is the business method we implement ourselves
        processResult = processor.process(taskContext);
        if (processResult == null) {
            processResult = new ProcessResult(false, "ProcessResult can't be null");
        }
    } catch (Throwable e) {
        log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
        processResult = new ProcessResult(false, e.toString());
    }
    // report the execution result
    reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
}

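        In short, the Dispatcher periodically fetches tasks in the WAITING_DISPATCH state from the local task table and dispatches them to available ProcessorTrackers. Each ProcessorTracker then wraps a task in a HeavyProcessorRunnable and runs it in its thread pool; the runnable invokes the process method we implement and reports the task status before and after execution.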
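        The Dispatcher throttles itself through two rules: a ProcessorTracker only receives tasks while available() holds, and one round dispatches roughly availablePTs * threadConcurrency * 2 tasks, assigned round-robin. The sketch below models both in memory. `PtStatus`, `DispatchPlanner`, and the two threshold values are hypothetical stand-ins for ProcessorTrackerStatus, DISPATCH_THRESHOLD, and HEARTBEAT_TIMEOUT_MS. It also applies a strict cap, whereas the real loop queries in batches of DB_QUERY_LIMIT and can slightly overshoot maxDispatchNum, and it ignores tasks that already carry an address.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ProcessorTrackerStatus. */
class PtStatus {
    static final int DISPATCH_THRESHOLD = 20;         // assumed value
    static final long HEARTBEAT_TIMEOUT_MS = 60_000L; // assumed value

    boolean dispatched;
    boolean connected;
    long lastActiveTime;
    long remainTaskNum;

    boolean available(long now) {
        if (!dispatched) return true;                                  // never used: available
        if (!connected) return false;                                  // dispatched, no reply yet
        if (now - lastActiveTime > HEARTBEAT_TIMEOUT_MS) return false; // heartbeat timed out
        return remainTaskNum < DISPATCH_THRESHOLD;                     // backlog not too large
    }
}

final class DispatchPlanner {
    private DispatchPlanner() {}

    /** Maps task -> PT address, round-robin over available PTs, capped per round. */
    static Map<String, String> plan(Map<String, PtStatus> pts, List<String> waitingTasks,
                                    int threadConcurrency, long now) {
        List<String> available = new ArrayList<>();
        pts.forEach((address, status) -> {
            if (status.available(now)) available.add(address);
        });
        Map<String, String> taskToPt = new LinkedHashMap<>();
        if (available.isEmpty()) {
            return taskToPt; // no available ProcessorTracker: skip this round
        }
        long maxDispatchNum = available.size() * threadConcurrency * 2L;
        int index = 0;
        for (String task : waitingTasks) {
            if (taskToPt.size() >= maxDispatchNum) break;
            taskToPt.put(task, available.get(index++ % available.size()));
        }
        return taskToPt;
    }
}
```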

2.3.2 CheckRunningInstance

/**
 * InstanceStatusCheckService:
 * Check running instances
 * RUNNING timeout: the TaskTracker is down and has lost its heartbeat connection to the server
 */
public void checkRunningInstance() {
    Stopwatch stopwatch = Stopwatch.createStarted();
    // query the DB for the apps this server is responsible for
    // (i.e. their appIds)
    List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
    if (CollectionUtils.isEmpty(allAppIds)) {
        log.info("[InstanceStatusChecker] current server has no app's job to check");
        return;
    }
    try {
        // check RUNNING instances (no status report from the TaskTracker for a while counts as failure)
        Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleRunningInstance);
    } catch (Exception e) {
        log.error("[InstanceStatusChecker] RunningInstance status check failed.", e);
    }
    log.info("[InstanceStatusChecker] RunningInstance status check using {}.", stopwatch.stop());
}

/**
 * Called from handleRunningInstance in checkRunningInstance above:
 */
private void handleRunningInstance(List<Long> partAppIds) {
    // 3. check RUNNING instances (no status report from the TaskTracker for a while counts as failure)
    long threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
    // find instances last modified more than RUNNING_TIMEOUT_MS (1 minute) ago
    List<BriefInstanceInfo> failedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
    while (!failedInstances.isEmpty()) {
        // collect job id
        Set<Long> jobIds = failedInstances.stream().map(BriefInstanceInfo::getJobId).collect(Collectors.toSet());
        // query job info and map
        Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream().collect(Collectors.toMap(JobInfoDO::getId, e -> e));
        log.warn("[InstanceStatusCheckService] find some instances have not received status report for a long time : {}", failedInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
        failedInstances.forEach(instance -> {
            Optional<JobInfoDO> jobInfoOpt = Optional.ofNullable(jobInfoMap.get(instance.getJobId()));
            if (!jobInfoOpt.isPresent()) {
                final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
                return;
            }
            TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoOpt.get().getTimeExpressionType());
            SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoOpt.get().getStatus());
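            // if the job is disabled, do not retry, just mark the instance failed; frequent (second-level) jobs are also marked failed and rescheduled by the dispatcher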
            if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV())) {
                final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
                return;
            }
            // CRON and API jobs alike: failure count + 1, retry according to the retry setting
            if (instance.getRunningTimes() < jobInfoOpt.get().getInstanceRetryNum()) {
                // set the instance status back to waiting for dispatch
                dispatchService.redispatchAsync(instance.getInstanceId(), InstanceStatus.RUNNING.getV());
            } else {
                final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
            }
        });
        threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
        // query again for instances modified more than 1 minute ago, and loop
        failedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
    }

}
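        For each RUNNING instance whose last report is older than RUNNING_TIMEOUT_MS, handleRunningInstance above makes a two-way choice: redispatch it or mark it failed. The sketch below expresses that choice as a pure function; `RunningTimeoutDecision`, `Action`, and the parameter names are hypothetical, and the real code additionally loads the job and instance from the database and redispatches asynchronously.

```java
/** Hypothetical outcome of the RUNNING-timeout check. */
enum Action { MARK_FAILED, REDISPATCH }

final class RunningTimeoutDecision {
    private RunningTimeoutDecision() {}

    /**
     * @param jobExists        whether the job info could be found
     * @param jobEnabled       whether the job is ENABLE
     * @param frequentJob      whether the job is a frequent (second-level) job
     * @param runningTimes     how many times the instance has run so far
     * @param instanceRetryNum configured instance retry count
     */
    static Action decide(boolean jobExists, boolean jobEnabled, boolean frequentJob,
                         long runningTimes, int instanceRetryNum) {
        // job missing, disabled, or frequent: just fail the instance
        if (!jobExists || !jobEnabled || frequentJob) {
            return Action.MARK_FAILED;
        }
        // CRON / API jobs: retry while the retry budget is not used up
        return runningTimes < instanceRetryNum ? Action.REDISPATCH : Action.MARK_FAILED;
    }
}
```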

3 Client

3.1 PowerJobSpringWorker

        The client's startup class likewise contains little logic, so let's look at the initialization class PowerJobSpringWorker:

/**
 * PowerJobSpringWorker:
 */
@Override
public void afterPropertiesSet() throws Exception {
    powerJobWorker = new PowerJobWorker(config);
    powerJobWorker.init();
}

/**
 * PowerJobWorker:
 */
public void init() throws Exception {

    if (!initialized.compareAndSet(false, true)) {
        log.warn("[PowerJobWorker] please do not repeat the initialization");
        return;
    }

    Stopwatch stopwatch = Stopwatch.createStarted();
    log.info("[PowerJobWorker] start to initialize PowerJobWorker...");

    PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
    CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");

    try {
        // print the banner
        PowerBannerPrinter.print();
        // validate appName
        if (!config.isEnableTestMode()) {
            assertAppName();
        } else {
            log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
        }

        // initialize metadata
        String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
        workerRuntime.setWorkerAddress(workerAddress);

        // initialize the thread pools
        final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
        workerRuntime.setExecutorManager(executorManager);

        // initialize the ProcessorLoader
        // a factory that loads processor classes dynamically and can work without Spring
        ProcessorLoader processorLoader = buildProcessorLoader(workerRuntime);
        workerRuntime.setProcessorLoader(processorLoader);

        // initialize the actors
        TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);
        ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);
        WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor);

        // initialize the communication engine
        EngineConfig engineConfig = new EngineConfig()
                .setType(config.getProtocol().name())
                .setServerType(ServerType.WORKER)
                .setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
                .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));

        // the server also calls PowerJobRemoteEngine.start during its own startup
        EngineOutput engineOutput = remoteEngine.start(engineConfig);
        workerRuntime.setTransporter(engineOutput.getTransporter());

        // connect to the server
        ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());

        serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor());
        workerRuntime.setServerDiscoveryService(serverDiscoveryService);

        log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");

        // initialize the logging system
        OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
        workerRuntime.setOmsLogHandler(omsLogHandler);

        // initialize local storage
        TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
        taskPersistenceService.init();
        workerRuntime.setTaskPersistenceService(taskPersistenceService);
        log.info("[PowerJobWorker] local storage initialized successfully.");


        // start the scheduled jobs
        workerRuntime.getExecutorManager().getCoreExecutor().scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, config.getHealthReportInterval(), TimeUnit.SECONDS);
        workerRuntime.getExecutorManager().getCoreExecutor().scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);

        log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", stopwatch);
    } catch (Exception e) {
        log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", stopwatch, e);
        throw e;
    }
}

/**
 * Called from assertAppName() in init() above:
 */
@SuppressWarnings("rawtypes")
private void assertAppName() {

    PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
    String appName = config.getAppName();
    Objects.requireNonNull(appName, "appName can't be empty!");

    String url = "http://%s/server/assert?appName=%s";
    for (String server : config.getServerAddress()) {
        String realUrl = String.format(url, server, appName);
        try {
            // on startup the client calls the server once to check connectivity
            String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
            ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
            if (resultDTO.isSuccess()) {
                Long appId = Long.valueOf(resultDTO.getData().toString());
                log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
                workerRuntime.setAppId(appId);
                return;
            } else {
                log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
                throw new PowerJobException(resultDTO.getMessage());
            }
        } catch (PowerJobException oe) {
            throw oe;
        } catch (Exception ignore) {
            log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
        }
    }
    log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
    throw new PowerJobException("no server available!");
}

/**
 * ExecutorManager:
 * Called from new ExecutorManager(...) in init() above:
 */
public ExecutorManager(PowerJobWorkerConfig workerConfig) {

    // number of available CPUs
    final int availableProcessors = Runtime.getRuntime().availableProcessors();
    // initialize the scheduled thread pool
    ThreadFactory coreThreadFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-core-%d").build();
    coreExecutor = new ScheduledThreadPoolExecutor(3, coreThreadFactory);

    ThreadFactory lightTaskReportFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-light-task-status-check-%d").build();
    // these are all I/O-bound tasks
    lightweightTaskStatusCheckExecutor = new ScheduledThreadPoolExecutor(availableProcessors * 10, lightTaskReportFactory);

    ThreadFactory lightTaskExecuteFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-light-task-execute-%d").build();
    // most tasks are I/O-bound
    lightweightTaskExecutorService = new ThreadPoolExecutor(availableProcessors * 10, availableProcessors * 10, 120L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>((workerConfig.getMaxLightweightTaskNum() * 2), true), lightTaskExecuteFactory, new ThreadPoolExecutor.AbortPolicy());

}

/**
 * ServerDiscoveryService:
 * Called from serverDiscoveryService.start(...) in init() above:
 */
public void start(ScheduledExecutorService timingPool) {
    this.currentServerAddress = discovery();
    if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
        throw new PowerJobException("can't find any available server, this worker has been quarantined.");
    }
    // this must always succeed
    timingPool.scheduleAtFixedRate(() -> {
                try {
                    this.currentServerAddress = discovery();
                } catch (Exception e) {
                    log.error("[PowerDiscovery] fail to discovery server!", e);
                }
            }
            , 10, 10, TimeUnit.SECONDS);
}
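        The try/catch inside the periodic task in start() matters because of how ScheduledExecutorService treats periodic tasks. According to its Javadoc, if any execution of a task scheduled with scheduleAtFixedRate throws an exception, all subsequent executions are suppressed. The sketch below is plain JDK code, not PowerJob code, and its 10 ms period is illustrative. It runs an always-failing task with and without a try/catch guard and counts how many times the task actually ran.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateSketch {

    /** Schedules an always-failing periodic task, optionally guarded by try/catch, and returns how often it ran. */
    public static int runCount(boolean guarded, long observeMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated discovery failure");
        };
        Runnable task;
        if (guarded) {
            task = () -> {
                try {
                    failing.run();
                } catch (RuntimeException e) {
                    // Real code would log the error here; the next period still runs.
                }
            };
        } else {
            task = failing;
        }
        pool.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(observeMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + runCount(false, 200)); // prints 1
        System.out.println("guarded runs:   " + runCount(true, 200));  // more than 1
    }
}
```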

/**
 * Called from start() above (at startup and in the periodic refresh task):
 */
private String discovery() {

    if (ip2Address.isEmpty()) {
        config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
    }

    String result = null;

    // First send the request to the current server
    String currentServer = currentServerAddress;
    if (!StringUtils.isEmpty(currentServer)) {
        String ip = currentServer.split(":")[0];
        // Calling the current server's HTTP service directly saves a network hop and reduces the server's load
        String firstServerAddress = ip2Address.get(ip);
        if (firstServerAddress != null) {
            result = acquire(firstServerAddress);
        }
    }

    for (String httpServerAddress : config.getServerAddress()) {
        if (StringUtils.isEmpty(result)) {
            result = acquire(httpServerAddress);
        } else {
            break;
        }
    }

    if (StringUtils.isEmpty(result)) {
        log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");

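        // Assuming the servers are highly available, several consecutive failures mean this node has lost contact with the outside world and the server has already moved its second-level (frequent) jobs to other workers, so the local ones must be killed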
        if (FAILED_COUNT++ > MAX_FAILED_COUNT) {

            log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
            List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();
            if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
                frequentInstanceIds.forEach(instanceId -> {
                    HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);
                    taskTracker.destroy();
                    log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
                });
            }

            FAILED_COUNT = 0;
        }
        return null;
    } else {
        // reset the failure counter
        FAILED_COUNT = 0;
        log.debug("[PowerDiscovery] current server is {}.", result);
        return result;
    }
}
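        The discovery order can be reproduced without any networking. In the sketch below, the HTTP call is replaced by a hypothetical Function<String, String> named acquire, and all addresses are made up. The worker first asks the configured HTTP address on the same host as its current server. If that fails, it walks the configured list in order, and it counts consecutive failures. The real code also kills frequent jobs once that count exceeds MAX_FAILED_COUNT. The sketch only notes this in a comment.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DiscoverySketch {

    private final List<String> configuredServers;    // the worker's configured server addresses
    private final Function<String, String> acquire;  // hypothetical stand-in for acquire(httpServerAddress)
    private final Map<String, String> ip2Address = new HashMap<>();
    private String currentServerAddress;
    private int failedCount;

    public DiscoverySketch(List<String> configuredServers, Function<String, String> acquire) {
        this.configuredServers = configuredServers;
        this.acquire = acquire;
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    /** One discovery round; mirrors `this.currentServerAddress = discovery()` in start(). */
    public String discover() {
        if (ip2Address.isEmpty()) {
            // host -> configured HTTP address, e.g. "10.0.0.2" -> "10.0.0.2:7700"
            configuredServers.forEach(x -> ip2Address.put(x.split(":")[0], x));
        }
        String result = null;
        // 1. Ask the configured HTTP address on the same host as the current server first.
        if (!isEmpty(currentServerAddress)) {
            String first = ip2Address.get(currentServerAddress.split(":")[0]);
            if (first != null) {
                result = acquire.apply(first);
            }
        }
        // 2. Otherwise try the configured addresses in order until one answers.
        for (String address : configuredServers) {
            if (!isEmpty(result)) {
                break;
            }
            result = acquire.apply(address);
        }
        if (isEmpty(result)) {
            failedCount++; // the real code kills local frequent jobs after too many failures
            currentServerAddress = null;
            return null;
        }
        failedCount = 0;
        currentServerAddress = result;
        return result;
    }

    public int failedCount() {
        return failedCount;
    }

    public static void main(String[] args) {
        DiscoverySketch d = new DiscoverySketch(Arrays.asList("10.0.0.1:7700", "10.0.0.2:7700"), addr -> {
            System.out.println("asking " + addr);
            // hypothetical cluster: only 10.0.0.2 answers, naming an address on another port
            return addr.startsWith("10.0.0.2") ? "10.0.0.2:10010" : null;
        });
        System.out.println(d.discover()); // asks 10.0.0.1:7700, then 10.0.0.2:7700
        System.out.println(d.discover()); // asks 10.0.0.2:7700 directly
    }
}
```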

/**
 * Called from discovery() above (for the current server and for each configured address):
 */
@SuppressWarnings("rawtypes")
private String acquire(String httpServerAddress) {
    String result = null;
    // build the request URL with its query parameters
    String url = buildServerDiscoveryUrl(httpServerAddress);
    try {
        // calls the server-side ServerController.acquireServer method
        result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
    } catch (Exception ignore) {
    }
    if (!StringUtils.isEmpty(result)) {
        try {
            ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
            if (resultDTO.isSuccess()) {
                return resultDTO.getData().toString();
            }
        } catch (Exception ignore) {
        }
    }
    return null;
}

/**
 * ServerController:
 * Handles the HTTP request sent in acquire() above:
 */
@GetMapping("/acquire")
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
    return ResultDTO.success(serverElectionService.elect(request));
}

/**
 * ServerElectionService:
 */
public String elect(ServerDiscoveryRequest request) {
    if (!accurate()) {
        final String currentServer = request.getCurrentServer();
        // If it is this machine, skip the more complex database check and return success directly
        Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
        // For a non-accurate request whose parameters already carry this server's address, simply return that address
        if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
            log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
            return currentServer;
        }
    }
    // Otherwise, go through the regular election flow
    return getServer0(request);
}

/**
 * Called from elect() above:
 */
private boolean accurate() {
    // accurateSelectServerPercentage defaults to 50; this randomly decides whether the request is treated as an accurate one
    return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage;
}
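        Together, elect() and accurate() form a probabilistic shortcut. The following minimal sketch replaces getServer0 with a hypothetical Supplier<String> and reduces the protocol lookup to a plain address string. With accurateSelectServerPercentage = p, a request skips the fast path with probability p/100. With the default of 50 mentioned in the comment, roughly half of the requests whose currentServer matches this server are therefore answered without touching the database.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

public class ElectFastPathSketch {

    /** True with probability percentage/100, meaning the request must go through the full election. */
    public static boolean accurate(int accurateSelectServerPercentage) {
        return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage;
    }

    /**
     * localProtocolAddress: this server's address for the requested protocol (null if unsupported).
     * fullElection: hypothetical stand-in for getServer0(request).
     */
    public static String elect(String requestCurrentServer, String localProtocolAddress,
                               int accuratePercentage, Supplier<String> fullElection) {
        if (!accurate(accuratePercentage)
                && localProtocolAddress != null
                && localProtocolAddress.equals(requestCurrentServer)) {
            return requestCurrentServer; // fast path: no database read, no ping
        }
        return fullElection.get();
    }

    public static void main(String[] args) {
        int trials = 100_000;
        int fast = 0;
        for (int i = 0; i < trials; i++) {
            if ("self".equals(elect("self", "self", 50, () -> "full"))) {
                fast++;
            }
        }
        // With 50, roughly half of the matching requests take the fast path.
        System.out.printf("fast-path share: %.3f%n", fast / (double) trials);
    }
}
```

        nextInt(100) returns a value in [0, 99], so a percentage of 0 always allows the fast path and 100 always forces the full election.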

/**
 * Called from elect() above:
 */
private String getServer0(ServerDiscoveryRequest discoveryRequest) {

    final Long appId = discoveryRequest.getAppId();
    final String protocol = discoveryRequest.getProtocol();
    Set<String> downServerCache = Sets.newHashSet();

    for (int i = 0; i < RETRY_TIMES; i++) {

        // Read the server currently recorded in the database, without locking
        Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
        if (!appInfoOpt.isPresent()) {
            throw new PowerJobException(appId + " is not registered!");
        }
        String appName = appInfoOpt.get().getAppName();
        String originServer = appInfoOpt.get().getCurrentServer();
        String activeAddress = activeAddress(originServer, downServerCache, protocol);
        if (StringUtils.isNotEmpty(activeAddress)) {
            return activeAddress;
        }

        // No available server: re-run the server election, which requires a lock
        String lockName = String.format(SERVER_ELECT_LOCK, appId);
        // distributed lock obtained through LockService
        boolean lockStatus = lockService.tryLock(lockName, 30000);
        if (!lockStatus) {
            try {
                Thread.sleep(500);
            } catch (Exception ignore) {
            }
            continue;
        }
        try {

            // Another machine may have completed the election in the meantime, so check again
            // (double-checked locking)
            AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
            String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
            if (StringUtils.isNotEmpty(address)) {
                return address;
            }

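            // Take over: if this machine supports the protocol, it becomes the server that schedules this worker
            // (scheduling by the local machine is preferred)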
            final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
            if (targetProtocolInfo != null) {
                // Note: AppInfoDO#currentServer always stores the default protocol's address; it is converted to the requested protocol's address only when returned
                appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
                appInfo.setGmtModified(new Date());

                appInfoRepository.saveAndFlush(appInfo);
                log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                return targetProtocolInfo.getAddress();
            }
        } catch (Exception e) {
            log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
        } finally {
            lockService.unlock(lockName);
        }
    }
    throw new PowerJobException("server elect failed for app " + appId);
}
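        getServer0 follows the double-checked locking pattern: a lock-free read, then tryLock, then a second read under the lock, and only then a write. The single-JVM sketch below keeps that shape but uses stand-ins. A ReentrantLock replaces LockService (in PowerJob, LockService coordinates several server processes), an AtomicReference replaces AppInfoDO#currentServer, and the ping is left out. When several "servers" race, exactly one of them writes, and all of them return the same winner. The 5 ms back-off is illustrative; the listing sleeps for 500 ms.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

public class ElectionLockSketch {

    private final ReentrantLock electLock = new ReentrantLock();                     // stand-in for LockService
    private final AtomicReference<String> dbCurrentServer = new AtomicReference<>(); // stand-in for AppInfoDO#currentServer
    private final AtomicInteger dbWrites = new AtomicInteger();

    public String elect(String self, int retryTimes) {
        for (int i = 0; i < retryTimes; i++) {
            // 1. Lock-free read (the real code also pings the recorded server).
            String recorded = dbCurrentServer.get();
            if (recorded != null) {
                return recorded;
            }
            // 2. Try the lock without waiting; back off and retry if another server holds it.
            if (!electLock.tryLock()) {
                sleepQuietly(5);
                continue;
            }
            try {
                // 3. Double check: someone may have won while we were acquiring the lock.
                recorded = dbCurrentServer.get();
                if (recorded != null) {
                    return recorded;
                }
                // 4. Take over and persist.
                dbCurrentServer.set(self);
                dbWrites.incrementAndGet();
                return self;
            } finally {
                electLock.unlock();
            }
        }
        throw new IllegalStateException("server elect failed");
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Races `servers` elections; returns {distinct winners observed, number of DB writes}. */
    public static int[] simulate(int servers) {
        ElectionLockSketch sketch = new ElectionLockSketch();
        ExecutorService pool = Executors.newFixedThreadPool(servers);
        CountDownLatch start = new CountDownLatch(1);
        List<Future<String>> results = new ArrayList<>();
        for (int t = 0; t < servers; t++) {
            String self = "server-" + t;
            results.add(pool.submit(() -> {
                start.await();
                return sketch.elect(self, 10);
            }));
        }
        start.countDown();
        Set<String> winners = new HashSet<>();
        try {
            for (Future<String> f : results) {
                winners.add(f.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] {winners.size(), sketch.dbWrites.get()};
    }

    public static void main(String[] args) {
        int[] r = simulate(8);
        System.out.println("winners=" + r[0] + ", writes=" + r[1]); // prints winners=1, writes=1
    }
}
```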

/**
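 * Called from getServer0() above:
 * Checks whether the given server is alive
 *
 * @param serverAddress   address of the server to check
 * @param downServerCache cache that avoids sending repeated PINGs (the QPS here can actually get quite high...)
 * @param protocol        protocol, used to return the matching address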
 * @return null or address
 */
private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {

    if (downServerCache.contains(serverAddress)) {
        return null;
    }
    if (StringUtils.isEmpty(serverAddress)) {
        return null;
    }

    Ping ping = new Ping();
    ping.setCurrentTime(System.currentTimeMillis());

    // build the target server URL
    URL targetUrl = ServerURLFactory.ping2Friend(serverAddress);
    try {
        // handled on the target server by FriendActor.onReceivePing
        AskResponse response = transportService.ask(Protocol.HTTP.name(), targetUrl, ping, AskResponse.class)
                .toCompletableFuture()
                .get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (response.isSuccess()) {
            // The check passed for the remote server's exposed address; return the address of the protocol the worker needs
            final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);
            if (protocolInfo != null) {
                downServerCache.remove(serverAddress);
                final String protocolAddress = protocolInfo.toJavaObject(ProtocolInfo.class).getAddress();
                log.info("[ServerElection] server[{}] is active, it will be the master, final protocol address={}", serverAddress, protocolAddress);
                return protocolAddress;
            } else {
                log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress);
            }
        }
    } catch (TimeoutException te) {
        log.warn("[ServerElection] server[{}] was down due to ping timeout!", serverAddress);
    } catch (Exception e) {
        log.warn("[ServerElection] server[{}] was down with unknown case!", serverAddress, e);
    }
    downServerCache.add(serverAddress);
    return null;
}
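        activeAddress combines two ideas. The first is a ping bounded by a timeout (CompletableFuture.get with PING_TIMEOUT_MS). The second is a downServerCache, created per getServer0 call, so that a dead server is pinged at most once per election. In the sketch below, the transport is replaced by a hypothetical fakePing. It either answers with a protocol-to-address map (mirroring what onReceivePing returns) or never answers at all. The addresses and the "HTTP" key are made up.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ActiveAddressSketch {

    /** Counts pings sent, so the effect of downServerCache is visible. */
    public static final AtomicInteger PINGS = new AtomicInteger();

    /** Hypothetical ping: addresses starting with "a" answer at once, all others never answer. */
    public static CompletableFuture<Map<String, String>> fakePing(String serverAddress) {
        PINGS.incrementAndGet();
        if (serverAddress.startsWith("a")) {
            return CompletableFuture.completedFuture(Collections.singletonMap("HTTP", "a:10010"));
        }
        return new CompletableFuture<>(); // never completed: simulates an unresponsive server
    }

    public static String activeAddress(String serverAddress, Set<String> downServerCache, String protocol,
                                       Function<String, CompletableFuture<Map<String, String>>> ping,
                                       long timeoutMs) {
        if (serverAddress == null || serverAddress.isEmpty() || downServerCache.contains(serverAddress)) {
            return null; // already known to be down in this election: don't ping again
        }
        try {
            Map<String, String> protocols = ping.apply(serverAddress).get(timeoutMs, TimeUnit.MILLISECONDS);
            String protocolAddress = protocols.get(protocol);
            if (protocolAddress != null) {
                downServerCache.remove(serverAddress);
                return protocolAddress; // alive and supports the worker's protocol
            }
            // alive but without the requested protocol: treated like a down server
        } catch (TimeoutException | ExecutionException e) {
            // ping timed out or failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        downServerCache.add(serverAddress);
        return null;
    }

    public static void main(String[] args) {
        Set<String> down = new HashSet<>();
        System.out.println(activeAddress("a:7700", down, "HTTP", ActiveAddressSketch::fakePing, 100)); // a:10010
        System.out.println(activeAddress("b:7700", down, "HTTP", ActiveAddressSketch::fakePing, 100)); // null after ~100 ms
        System.out.println(activeAddress("b:7700", down, "HTTP", ActiveAddressSketch::fakePing, 100)); // null, no ping
        System.out.println("pings sent: " + PINGS.get()); // prints pings sent: 2
    }
}
```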

/**
 * FriendActor:
 * Handles the ping sent by transportService.ask in activeAddress() above:
 * processes liveness-check requests
 */
@Handler(path = S4S_HANDLER_PING, processType = ProcessType.NO_BLOCKING)
public AskResponse onReceivePing(Ping ping) {
    return AskResponse.succeed(transportService.allProtocols());
}

/**
 * PowerTransportService:
 */
@Override
public Map<String, ProtocolInfo> allProtocols() {
    // simply returns the protocolName2Info cache
    return protocolName2Info;
}

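        As shown above, when the worker starts, it picks a server as its own through an election. In some cases there is no full election, and the server simply confirms the worker's current server. The worker then re-runs discovery every 10 seconds to keep that assignment up to date.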

        At lines 85 and 86 of the earlier listing, the worker also schedules two periodic tasks at startup. Let's look at how they are implemented:

3.1.1 WorkerHealthReporter

        WorkerHealthReporter periodically reports the worker's health to the server:

/**
 * WorkerHealthReporter:
 */
@Override
public void run() {

    // No available server, so nothing can be reported
    String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
    if (StringUtils.isEmpty(currentServer)) {
        log.warn("[WorkerHealthReporter] no available server,fail to report health info!");
        return;
    }

    SystemMetrics systemMetrics;

    if (workerRuntime.getWorkerConfig().getSystemMetricsCollector() == null) {
        systemMetrics = SystemInfoUtils.getSystemMetrics();
    } else {
        systemMetrics = workerRuntime.getWorkerConfig().getSystemMetricsCollector().collect();
    }

    WorkerHeartbeat heartbeat = new WorkerHeartbeat();

    heartbeat.setSystemMetrics(systemMetrics);
    heartbeat.setWorkerAddress(workerRuntime.getWorkerAddress());
    heartbeat.setAppName(workerRuntime.getWorkerConfig().getAppName());
    heartbeat.setAppId(workerRuntime.getAppId());
    heartbeat.setHeartbeatTime(System.currentTimeMillis());
    heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
    heartbeat.setProtocol(workerRuntime.getWorkerConfig().getProtocol().name());
    heartbeat.setClient("KingPenguin");
    heartbeat.setTag(workerRuntime.getWorkerConfig().getTag());

    // report the number of task trackers
    heartbeat.setLightTaskTrackerNum(LightTaskTrackerManager.currentTaskTrackerSize());
    heartbeat.setHeavyTaskTrackerNum(HeavyTaskTrackerManager.currentTaskTrackerSize());
    // whether the worker is overloaded
    if (workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() <= LightTaskTrackerManager.currentTaskTrackerSize() || workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum() <= HeavyTaskTrackerManager.currentTaskTrackerSize()) {
        heartbeat.setOverload(true);
    }
    // list of currently deployed containers
    heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
    // send the request
    if (StringUtils.isEmpty(currentServer)) {
        return;
    }
    // log
    log.info("[WorkerHealthReporter] report health status,appId:{},appName:{},isOverload:{},maxLightweightTaskNum:{},currentLightweightTaskNum:{},maxHeavyweightTaskNum:{},currentHeavyweightTaskNum:{}",
            heartbeat.getAppId(),
            heartbeat.getAppName(),
            heartbeat.isOverload(),
            workerRuntime.getWorkerConfig().getMaxLightweightTaskNum(),
            heartbeat.getLightTaskTrackerNum(),
            workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum(),
            heartbeat.getHeavyTaskTrackerNum()
    );

    TransportUtils.reportWorkerHeartbeat(heartbeat, currentServer, workerRuntime.getTransporter());
}
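        The overload condition in run() is easy to misread. Because it uses <=, a worker counts as overloaded as soon as either tracker count reaches its configured maximum, not only when the count exceeds it. The tiny sketch below isolates just that predicate. The limits in it are illustrative, not PowerJob defaults.

```java
public class OverloadCheckSketch {

    /**
     * Same condition as WorkerHealthReporter.run(): overloaded as soon as EITHER tracker count
     * has reached its configured maximum ("<=", so hitting the limit already counts).
     */
    public static boolean isOverload(int maxLightweightTaskNum, int currentLightTrackers,
                                     int maxHeavyweightTaskNum, int currentHeavyTrackers) {
        return maxLightweightTaskNum <= currentLightTrackers
                || maxHeavyweightTaskNum <= currentHeavyTrackers;
    }

    public static void main(String[] args) {
        // Illustrative limits only.
        System.out.println(isOverload(1024, 10, 64, 64)); // prints true: heavyweight limit reached
        System.out.println(isOverload(1024, 10, 64, 63)); // prints false: both below their limits
    }
}
```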

/**
 * SystemInfoUtils:
 * Called from WorkerHealthReporter.run() above when no custom SystemMetricsCollector is configured:
 */
public static SystemMetrics getSystemMetrics() {

    SystemMetrics metrics = new SystemMetrics();

    // fill in CPU metrics
    fillCPUInfo(metrics);
    // fill in memory metrics
    fillMemoryInfo(metrics);
    // fill in disk metrics
    fillDiskInfo(metrics);

    // compute the score on the worker to reduce the server's load
    metrics.calculateScore();
    return metrics;
}

/**
 * TransportUtils:
 * Called at the end of WorkerHealthReporter.run() above:
 */
public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) {
    // build the URL for the call
    final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_HEARTBEAT, address);
    transporter.tell(url, req);
}

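        SystemInfoUtils.getSystemMetrics collects a few system metrics through low-level Java APIs, which we won't go into here. It then calculates the health score; we already looked at how the score is calculated in Section 2.3.1. This is where the worker metrics shown on the console home page come from.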
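        The article does not show which JDK calls SystemInfoUtils makes, so the following is only a sketch of the kind of data that plain JDK APIs (OperatingSystemMXBean, Runtime, File) expose. It reads the processor count, the load average, JVM heap usage and the disk space of the working directory. It is not PowerJob's SystemMetrics class, and it does not compute the health score.

```java
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class JdkMetricsSketch {

    /** A few host/JVM metrics readable with plain JDK APIs; not PowerJob's SystemMetrics. */
    public static final class Metrics {
        public final int cpuProcessors;
        public final double systemLoadAverage; // negative if the platform does not provide it
        public final long jvmUsedMemory;       // bytes
        public final long jvmMaxMemory;        // bytes (Long.MAX_VALUE if there is no limit)
        public final long diskTotal;           // bytes, partition holding the working directory
        public final long diskUsable;          // bytes available to this JVM on that partition

        Metrics(int cpuProcessors, double systemLoadAverage, long jvmUsedMemory,
                long jvmMaxMemory, long diskTotal, long diskUsable) {
            this.cpuProcessors = cpuProcessors;
            this.systemLoadAverage = systemLoadAverage;
            this.jvmUsedMemory = jvmUsedMemory;
            this.jvmMaxMemory = jvmMaxMemory;
            this.diskTotal = diskTotal;
            this.diskUsable = diskUsable;
        }

        @Override
        public String toString() {
            return String.format("cpu=%d load=%.2f jvmUsed=%d jvmMax=%d diskTotal=%d diskUsable=%d",
                    cpuProcessors, systemLoadAverage, jvmUsedMemory, jvmMaxMemory, diskTotal, diskUsable);
        }
    }

    public static Metrics collect() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        Runtime rt = Runtime.getRuntime();
        File workDir = new File(".").getAbsoluteFile();
        return new Metrics(
                os.getAvailableProcessors(),
                os.getSystemLoadAverage(),
                rt.totalMemory() - rt.freeMemory(),
                rt.maxMemory(),
                workDir.getTotalSpace(),
                workDir.getUsableSpace());
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```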

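        The transporter.tell call in TransportUtils.reportWorkerHeartbeat above sends the heartbeat to the server. Next, let's see how the server handles it: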

/**
 * AbWorkerRequestHandler:
 */
@Override
@Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
    long startMs = System.currentTimeMillis();
    WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
            .setAppName(heartbeat.getAppName())
            .setAppId(heartbeat.getAppId())
            .setVersion(heartbeat.getVersion())
            .setProtocol(heartbeat.getProtocol())
            .setTag(heartbeat.getTag())
            .setWorkerAddress(heartbeat.getWorkerAddress())
            .setDelayMs(startMs - heartbeat.getHeartbeatTime())
            .setScore(heartbeat.getSystemMetrics().getScore());
    processWorkerHeartbeat0(heartbeat, event);
    // the default implementation monitors by writing the event to the log
    monitorService.monitor(event);
}

/**
 * WorkerRequestHandlerImpl:
 * Called from processWorkerHeartbeat() above:
 */
@Override
protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
    WorkerClusterManagerService.updateStatus(heartbeat);
}

/**
 * WorkerClusterManagerService:
 * Updates the status
 *
 * @param heartbeat the worker's heartbeat packet
 */
public static void updateStatus(WorkerHeartbeat heartbeat) {
    Long appId = heartbeat.getAppId();
    String appName = heartbeat.getAppName();
    ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
    clusterStatusHolder.updateStatus(heartbeat);
}

/**
 * ClusterStatusHolder:
 * Updates the status of a worker machine
 *
 * @param heartbeat heartbeat request
 */
public void updateStatus(WorkerHeartbeat heartbeat) {

    String workerAddress = heartbeat.getWorkerAddress();
    long heartbeatTime = heartbeat.getHeartbeatTime();

    WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
        WorkerInfo wf = new WorkerInfo();
        wf.refresh(heartbeat);
        return wf;
    });
    long oldTime = workerInfo.getLastActiveTime();
    // expired heartbeat, ignore it
    if (heartbeatTime < oldTime) {
        log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
        return;
    }

    workerInfo.refresh(heartbeat);

    List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
    if (!CollectionUtils.isEmpty(containerInfos)) {
        containerInfos.forEach(containerInfo -> {
            Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
            infos.put(workerAddress, containerInfo);
        });
    }
}
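        ClusterStatusHolder.updateStatus keeps one entry per worker address and discards any heartbeat older than the last one it accepted. The sketch below keeps only the fields needed to show that rule; the real WorkerInfo holds much more. There is one deliberate design difference. The listing above shows no locking between reading lastActiveTime and calling refresh. The sketch adds a per-worker synchronized block as one way to make the check and the update atomic when heartbeats arrive concurrently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HeartbeatRegistrySketch {

    /** Minimal per-worker state for this sketch. */
    static final class WorkerState {
        long lastActiveTime;
        boolean overloading;
        long lastOverloadTime;
    }

    private final Map<String, WorkerState> address2State = new ConcurrentHashMap<>();

    /** Returns false if the heartbeat was stale and therefore ignored. */
    public boolean onHeartbeat(String workerAddress, long heartbeatTime, boolean overload) {
        WorkerState state = address2State.computeIfAbsent(workerAddress, ignore -> new WorkerState());
        // Per-worker lock so the staleness check and the update happen atomically.
        synchronized (state) {
            if (heartbeatTime < state.lastActiveTime) {
                return false; // expired heartbeat, e.g. delivered out of order
            }
            state.lastActiveTime = heartbeatTime;
            state.overloading = overload;
            if (overload) {
                state.lastOverloadTime = heartbeatTime;
            }
            return true;
        }
    }

    public long lastActiveTime(String workerAddress) {
        WorkerState state = address2State.get(workerAddress);
        if (state == null) {
            return -1L;
        }
        synchronized (state) {
            return state.lastActiveTime;
        }
    }

    public boolean isOverloading(String workerAddress) {
        WorkerState state = address2State.get(workerAddress);
        if (state == null) {
            return false;
        }
        synchronized (state) {
            return state.overloading;
        }
    }

    public static void main(String[] args) {
        HeartbeatRegistrySketch registry = new HeartbeatRegistrySketch();
        System.out.println(registry.onHeartbeat("10.0.0.5:27777", 1_000L, false)); // prints true
        System.out.println(registry.onHeartbeat("10.0.0.5:27777", 900L, true));    // prints false: older than 1000
        System.out.println(registry.lastActiveTime("10.0.0.5:27777"));            // prints 1000
    }
}
```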

/**
 * WorkerInfo:
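 * Called from updateStatus() above (when the entry is created and on every accepted heartbeat):
 * refreshes the worker data recorded on the server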
 */
public void refresh(WorkerHeartbeat workerHeartbeat) {
    address = workerHeartbeat.getWorkerAddress();
    lastActiveTime = workerHeartbeat.getHeartbeatTime();
    protocol = workerHeartbeat.getProtocol();
    client = workerHeartbeat.getClient();
    tag = workerHeartbeat.getTag();
    systemMetrics = workerHeartbeat.getSystemMetrics();
    containerInfos = workerHeartbeat.getContainerInfos();

    lightTaskTrackerNum = workerHeartbeat.getLightTaskTrackerNum();
    heavyTaskTrackerNum = workerHeartbeat.getHeavyTaskTrackerNum();

    if (workerHeartbeat.isOverload()) {
        overloading = true;
        lastOverloadTime = workerHeartbeat.getHeartbeatTime();
        log.warn("[WorkerInfo] worker {} is overload!", getAddress());
    } else {
        overloading = false;
    }
}

3.1.2 LogSubmitter

/**
 * LogSubmitter:
 */
@Override
public void run() {

    boolean lockResult = reportLock.tryLock();
    if (!lockResult) {
        return;
    }

    try {

        final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
        // no server currently available
        if (StringUtils.isEmpty(currentServerAddress)) {
            if (!logQueue.isEmpty()) {
                logQueue.clear();
                log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
            }
            return;
        }

        List<InstanceLogContent> logs = Lists.newLinkedList();

        while (!logQueue.isEmpty()) {
            try {
                // take an entry from the head of the log queue
                InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                logs.add(logContent);

                // send in batches
                if (logs.size() >= BATCH_SIZE) {
                    WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                    // unreliable request: logs shown in the web console don't need guaranteed delivery
                    TransportUtils.reportLogs(req, currentServerAddress, transporter);
                    logs.clear();
                }

            } catch (Exception ignore) {
                break;
            }
        }

        if (!logs.isEmpty()) {
            WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
            TransportUtils.reportLogs(req, currentServerAddress, transporter);
        }

    } finally {
        reportLock.unlock();
    }
}
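        LogSubmitter.run() follows a common pattern. It uses tryLock so that overlapping scheduled runs skip instead of piling up. It drops everything when no server is reachable. Otherwise it sends the queue in batches of BATCH_SIZE. The sketch below reproduces that pattern with stand-ins: a hypothetical Consumer replaces TransportUtils.reportLogs, and plain strings replace InstanceLogContent. One deliberate difference from the listing is that it uses a non-blocking poll() instead of poll(100, TimeUnit.MILLISECONDS), so the loop ends as soon as the queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

public class LogBatchSketch {

    private final ReentrantLock reportLock = new ReentrantLock();
    private final BlockingQueue<String> logQueue;
    private final int batchSize;
    private final Consumer<List<String>> sender; // hypothetical stand-in for TransportUtils.reportLogs

    public LogBatchSketch(BlockingQueue<String> logQueue, int batchSize, Consumer<List<String>> sender) {
        this.logQueue = logQueue;
        this.batchSize = batchSize;
        this.sender = sender;
    }

    public void drainOnce(boolean serverAvailable) {
        if (!reportLock.tryLock()) {
            return; // a previous run is still draining; skip this tick
        }
        try {
            if (!serverAvailable) {
                logQueue.clear(); // nowhere to send: drop logs instead of letting the queue grow
                return;
            }
            List<String> batch = new ArrayList<>();
            String entry;
            while ((entry = logQueue.poll()) != null) {
                batch.add(entry);
                if (batch.size() >= batchSize) {
                    sender.accept(new ArrayList<>(batch)); // hand over a copy, then reuse the buffer
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                sender.accept(batch); // final partial batch
            }
        } finally {
            reportLock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 250; i++) {
            queue.add("log-" + i);
        }
        // prints "sent 100", "sent 100", "sent 50"
        new LogBatchSketch(queue, 100, batch -> System.out.println("sent " + batch.size())).drainOnce(true);
    }
}
```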

/**
 * TransportUtils:
 * Called from LogSubmitter.run() above (for each full batch and for the final partial batch):
 */
public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
    final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
    transporter.tell(url, req);
}

        The transporter.tell call in TransportUtils.reportLogs above sends the request to the server's AbWorkerRequestHandler.processWorkerLogReport method:

/**
 * AbWorkerRequestHandler:
 */
@Override
@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
public void processWorkerLogReport(WorkerLogReportReq req) {

    WorkerLogReportEvent event = new WorkerLogReportEvent()
            .setWorkerAddress(req.getWorkerAddress())
            .setLogNum(req.getInstanceLogContents().size());
    try {
        processWorkerLogReport0(req, event);
        event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
    } catch (RejectedExecutionException re) {
        event.setStatus(WorkerLogReportEvent.Status.REJECTED);
    } catch (Throwable t) {
        event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
        log.warn("[WorkerRequestHandler] process worker report failed!", t);
    } finally {
        // monitoring via the log
        monitorService.monitor(event);
    }
}

/**
 * WorkerRequestHandlerImpl:
 * Called from processWorkerLogReport() above:
 */
@Override
protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
    // This shouldn't be too slow... it's just a few checks plus a Map#get...
    instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}

/**
 * InstanceLogService:
 * Submits log records and persists them to the local database
 *
 * @param workerAddress address of the reporting worker
 * @param logs          runtime logs of the job instance
 */
@Async(value = PJThreadPool.LOCAL_DB_POOL)
public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {

    List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
        instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());

        LocalInstanceLogDO y = new LocalInstanceLogDO();
        BeanUtils.copyProperties(x, y);
        y.setWorkerAddress(workerAddress);
        return y;
    }).collect(Collectors.toList());

    try {
        // insert into the local_instance_log table
        CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
    } catch (Exception e) {
        log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
    }
}
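        The server persists the logs through CommonUtils.executeWithRetry0, whose retry count and interval are not shown here. The helper below is a hypothetical stand-in written from scratch; the name executeWithRetry and its parameters are assumptions. It only illustrates the general idea: call the action up to N times, pause between failures, and surface the last exception if every attempt fails. In submitLogs, such a final failure is caught, and the logs are dropped with a warning.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Hypothetical helper: calls `action` up to `maxAttempts` times, sleeping `intervalMs`
     * between failed attempts, and throws (wrapping the last failure) if no attempt succeeds.
     */
    public static <T> T executeWithRetry(Callable<T> action, int maxAttempts, long intervalMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(intervalMs);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break; // stop retrying if the thread is interrupted
                    }
                }
            }
        }
        throw new IllegalStateException("all attempts failed", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = executeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("database busy"); // first two attempts fail
            }
            return "saved";
        }, 3, 10);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints saved after 3 attempts
    }
}
```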

Original content takes effort; do not reprint without permission. All rights reserved.

